This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 92273d469511d32c5291f8b856ed0f4fba98b5e6
Author: XiaoxiangYu <hit_la...@126.com>
AuthorDate: Fri Sep 20 16:57:46 2019 +0800

    KYLIN-4010 Auto adjust offset according to query server's timezone for time 
derived column
---
 .../org/apache/kylin/common/KylinConfigBase.java   |   7 +
 .../kylin/dimension}/TimeDerivedColumnType.java    |   2 +-
 .../dimension}/TimeDerivedColumnTypeTest.java      |   2 +-
 .../kylin/storage/gtrecord/CubeTupleConverter.java |  32 +-
 .../optrule/visitor/TimezoneRewriteVisitor.java    | 151 +++++
 .../apache/kylin/query/relnode/OLAPFilterRel.java  |   5 +
 .../query/relnode/visitor/TupleFilterVisitor.java  |  24 +-
 .../core/query/StreamingDataQueryPlanner.java      |   2 +-
 .../stream/core/query/StreamingTupleConverter.java |  29 +-
 .../core/storage/StreamingSegmentManager.java      |  16 +-
 .../storage/columnar/ColumnarStoreDimDesc.java     |   2 +-
 .../storage/columnar/FragmentFileSearcher.java     | 718 ++++++++++-----------
 .../columnar/TimeDerivedColumnEncoding.java        |   2 +-
 .../core/util/CompareFilterTimeRangeChecker.java   |   1 +
 .../kylin/stream/server/StreamingServer.java       |   6 +-
 .../stream/source/kafka/TimedJsonStreamParser.java |   2 +-
 16 files changed, 620 insertions(+), 381 deletions(-)

diff --git 
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 00afb58..9d6dc41 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2228,6 +2228,13 @@ public abstract class KylinConfigBase implements 
Serializable {
         return Long.parseLong(getOptional("kylin.stream.metrics.interval", 
"5"));
     }
 
+    /**
+     * Whether realtime (streaming) queries should automatically add the time zone offset of
+     * kylin's web-timezone to time values; please refer to KYLIN-4010 for detail.
+     * Read from "kylin.stream.auto.just.by.timezone"; the feature is off ("false") when unset.
+     *
+     * @return the property value parsed with Boolean.parseBoolean
+     */
+    public boolean isStreamingAutoJustTimezone() {
+        return 
Boolean.parseBoolean(getOptional("kylin.stream.auto.just.by.timezone", 
"false"));
+    }
+
     // 
============================================================================
     // Health Check CLI
     // 
============================================================================
diff --git 
a/stream-core/src/main/java/org/apache/kylin/stream/core/util/TimeDerivedColumnType.java
 
b/core-metadata/src/main/java/org/apache/kylin/dimension/TimeDerivedColumnType.java
similarity index 99%
rename from 
stream-core/src/main/java/org/apache/kylin/stream/core/util/TimeDerivedColumnType.java
rename to 
core-metadata/src/main/java/org/apache/kylin/dimension/TimeDerivedColumnType.java
index 902caf4..01953e7 100644
--- 
a/stream-core/src/main/java/org/apache/kylin/stream/core/util/TimeDerivedColumnType.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/dimension/TimeDerivedColumnType.java
@@ -16,7 +16,7 @@
  * limitations under the License.
 */
 
-package org.apache.kylin.stream.core.util;
+package org.apache.kylin.dimension;
 
 import java.util.Locale;
 import java.util.Map;
diff --git 
a/stream-core/src/test/java/org/apache/kylin/stream/core/util/TimeDerivedColumnTypeTest.java
 
b/core-metadata/src/test/java/org/apache/kylin/dimension/TimeDerivedColumnTypeTest.java
similarity index 99%
rename from 
stream-core/src/test/java/org/apache/kylin/stream/core/util/TimeDerivedColumnTypeTest.java
rename to 
core-metadata/src/test/java/org/apache/kylin/dimension/TimeDerivedColumnTypeTest.java
index 414de0f..15f57af 100644
--- 
a/stream-core/src/test/java/org/apache/kylin/stream/core/util/TimeDerivedColumnTypeTest.java
+++ 
b/core-metadata/src/test/java/org/apache/kylin/dimension/TimeDerivedColumnTypeTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
 */
 
-package org.apache.kylin.stream.core.util;
+package org.apache.kylin.dimension;
 
 import org.apache.kylin.common.util.DateFormat;
 import org.junit.Test;
diff --git 
a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
 
b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
index 05fc90f..2e04daa 100644
--- 
a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
+++ 
b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
@@ -19,10 +19,12 @@
 package org.apache.kylin.storage.gtrecord;
 
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.TimeZone;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Array;
@@ -32,6 +34,7 @@ import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
 import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.dimension.TimeDerivedColumnType;
 import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
 import org.apache.kylin.metadata.model.FunctionDesc;
@@ -65,6 +68,11 @@ public class CubeTupleConverter implements ITupleConverter {
     public final List<Integer> advMeasureIndexInGTValues;
     private List<ILookupTable> usedLookupTables;
 
+    final Set<Integer> timestampColumn = new HashSet<>();
+    boolean autoJustByTimezone;
+    private static final long TIME_ZONE_OFFSET = 
TimeZone.getTimeZone(KylinConfig.getInstanceFromEnv().getTimeZone())
+            .getRawOffset();
+
     public final int nSelectedDims;
 
     public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, //
@@ -84,7 +92,12 @@ public class CubeTupleConverter implements ITupleConverter {
         advMeasureFillers = Lists.newArrayListWithCapacity(1);
         advMeasureIndexInGTValues = Lists.newArrayListWithCapacity(1);
         usedLookupTables = Lists.newArrayList();
-
+        autoJustByTimezone = cubeSeg.getConfig().isStreamingAutoJustTimezone();
+        autoJustByTimezone = autoJustByTimezone
+                && 
cubeSeg.getCubeDesc().getModel().getRootFactTable().getTableDesc().isStreamingTable();
+        if (autoJustByTimezone) {
+            logger.debug("Will ajust dimsension for Time Derived Column.");
+        }
         ////////////
 
         int i = 0;
@@ -92,6 +105,8 @@ public class CubeTupleConverter implements ITupleConverter {
         // pre-calculate dimension index mapping to tuple
         for (TblColRef dim : selectedDimensions) {
             tupleIdx[i] = tupleInfo.hasColumn(dim) ? 
tupleInfo.getColumnIndex(dim) : -1;
+            if (dim.getType().isDateTimeFamily() && 
TimeDerivedColumnType.isTimeDerivedColumn(dim.getName()))
+                timestampColumn.add(tupleIdx[i]);
             i++;
         }
 
@@ -147,7 +162,20 @@ public class CubeTupleConverter implements ITupleConverter 
{
         for (int i = 0; i < nSelectedDims; i++) {
             int ti = tupleIdx[i];
             if (ti >= 0) {
-                tuple.setDimensionValue(ti, toString(gtValues[i]));
+                // add offset to return result according to timezone
+                String v = toString(gtValues[i]);
+                // timestampColumn is filled with tuple indexes (tupleIdx[i]), so it must be
+                // queried with ti, not with the GT index i. Null values skip the shift but are
+                // still written, so the slot is assigned on every path.
+                if (autoJustByTimezone && v != null && timestampColumn.contains(ti)) {
+                    try {
+                        v = Long.toString(Long.parseLong(v) + TIME_ZONE_OFFSET);
+                    } catch (NumberFormatException nfe) {
+                        // keep the unshifted value when it is not a plain long
+                        logger.warn("{} is not a long value.", gtValues[i]);
+                    }
+                }
+                tuple.setDimensionValue(ti, v);
             }
         }
 
diff --git 
a/query/src/main/java/org/apache/kylin/query/optrule/visitor/TimezoneRewriteVisitor.java
 
b/query/src/main/java/org/apache/kylin/query/optrule/visitor/TimezoneRewriteVisitor.java
new file mode 100644
index 0000000..cb75b34
--- /dev/null
+++ 
b/query/src/main/java/org/apache/kylin/query/optrule/visitor/TimezoneRewriteVisitor.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.query.optrule.visitor;
+
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexCorrelVariable;
+import org.apache.calcite.rex.RexDynamicParam;
+import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexLocalRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexOver;
+import org.apache.calcite.rex.RexPatternFieldRef;
+import org.apache.calcite.rex.RexRangeRef;
+import org.apache.calcite.rex.RexSubQuery;
+import org.apache.calcite.rex.RexTableInputRef;
+import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.type.SqlTypeFamily;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Rebuilds a Calcite row expression, re-creating date/time string literals that appear
+ * directly under a CAST to a DATE, DATETIME or TIMESTAMP type so that they can be shifted
+ * by a number of hours.
+ * <p>
+ * Only literals formatted as {@code yyyy-MM-dd HH:mm:ss} are rewritten; NULL literals and
+ * literals in any other format are kept as they are. Every node that is not a call is
+ * returned unchanged. The shift is currently hard-coded to {@code 0} hours, so a rewritten
+ * literal carries the same value as the original one.
+ */
+public class TimezoneRewriteVisitor extends RexVisitorImpl<RexNode> {
+    public static final Logger logger = LoggerFactory.getLogger(TimezoneRewriteVisitor.class);
+
+    /** Textual format a literal must have to be rewritten (19 characters). */
+    private static final String DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
+
+    /** DateTimeFormatter is immutable and thread-safe, so one shared instance is enough. */
+    private static final DateTimeFormatter DATETIME_FORMAT = DateTimeFormatter.ofPattern(DATETIME_PATTERN);
+
+    /** Hours subtracted from each rewritten literal; zero means the value is not changed. */
+    private static final int SHIFT_HOURS = 0;
+
+    public TimezoneRewriteVisitor(boolean deep) {
+        super(deep);
+    }
+
+    /**
+     * Clones the call with rewritten operands.
+     * <p>
+     * Literal operands of a CAST to a date/time type go through {@link #shiftLiteral}; all
+     * other operands are visited recursively. A CAST to any other type is handled like an
+     * ordinary call, so its operands are preserved instead of being dropped.
+     *
+     * @param call the call to rewrite
+     * @return a clone of {@code call} with the same type and the rewritten operands
+     */
+    @Override
+    public RexNode visitCall(RexCall call) {
+        boolean dateTimeCast = call.getKind() == SqlKind.CAST
+                && isDateTimeFamily(call.getType().getSqlTypeName());
+        List<RexNode> operands = new ArrayList<>(call.getOperands().size());
+        for (RexNode operand : call.getOperands()) {
+            if (dateTimeCast && operand instanceof RexLiteral) {
+                operands.add(shiftLiteral((RexLiteral) operand));
+            } else {
+                operands.add(operand.accept(this));
+            }
+        }
+        return call.clone(call.type, operands);
+    }
+
+    /** Tells whether the type belongs to the DATE, DATETIME or TIMESTAMP family. */
+    private static boolean isDateTimeFamily(SqlTypeName typeName) {
+        SqlTypeFamily family = typeName.getFamily();
+        return family == SqlTypeFamily.DATE || family == SqlTypeFamily.DATETIME
+                || family == SqlTypeFamily.TIMESTAMP;
+    }
+
+    /**
+     * Subtracts {@code SHIFT_HOURS} from a {@code yyyy-MM-dd HH:mm:ss} literal.
+     *
+     * @param literal the literal operand of a date/time CAST
+     * @return a new literal of the same type holding the shifted value, or {@code literal}
+     *         itself when it is NULL or not in the expected format
+     */
+    private static RexNode shiftLiteral(RexLiteral literal) {
+        Object value = literal.getValue2();
+        // a NULL literal has no value to shift; other lengths cannot match the pattern
+        if (value == null || value.toString().length() != DATETIME_PATTERN.length()) {
+            return literal;
+        }
+        String before = value.toString();
+        String after;
+        try {
+            // minus offset by timezone in RelNode level, this will affect code gen
+            after = LocalDateTime.parse(before, DATETIME_FORMAT).minusHours(SHIFT_HOURS).format(DATETIME_FORMAT);
+        } catch (java.time.format.DateTimeParseException e) {
+            // right length but not a date time: keep the literal instead of failing the query
+            logger.debug("'{}' does not match {}, literal is kept as is", before, DATETIME_PATTERN);
+            return literal;
+        }
+        logger.debug("{}  ->  {}", before, after);
+        return RexLiteral.fromJdbcString(literal.getType(), literal.getTypeName(), after);
+    }
+
+    @Override
+    public RexNode visitLiteral(RexLiteral literal) {
+        return literal;
+    }
+
+    @Override
+    public RexNode visitInputRef(RexInputRef inputRef) {
+        return inputRef;
+    }
+
+    @Override
+    public RexNode visitLocalRef(RexLocalRef localRef) {
+        return localRef;
+    }
+
+    @Override
+    public RexNode visitOver(RexOver over) {
+        return over;
+    }
+
+    @Override
+    public RexNode visitCorrelVariable(RexCorrelVariable correlVariable) {
+        return correlVariable;
+    }
+
+    @Override
+    public RexNode visitDynamicParam(RexDynamicParam dynamicParam) {
+        return dynamicParam;
+    }
+
+    @Override
+    public RexNode visitRangeRef(RexRangeRef rangeRef) {
+        return rangeRef;
+    }
+
+    @Override
+    public RexNode visitFieldAccess(RexFieldAccess fieldAccess) {
+        return fieldAccess;
+    }
+
+    @Override
+    public RexNode visitSubQuery(RexSubQuery subQuery) {
+        return subQuery;
+    }
+
+    @Override
+    public RexNode visitTableInputRef(RexTableInputRef ref) {
+        return ref;
+    }
+
+    @Override
+    public RexNode visitPatternFieldRef(RexPatternFieldRef fieldRef) {
+        return fieldRef;
+    }
+
+    /** Empty entry point, kept only so the class keeps its existing public members. */
+    public static void main(String[] args) {
+
+    }
+}
diff --git 
a/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java 
b/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java
index b34b42e..3a76be6 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java
@@ -38,6 +38,7 @@ import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexProgram;
 import org.apache.calcite.rex.RexProgramBuilder;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.metadata.filter.FilterOptimizeTransformer;
 import org.apache.kylin.metadata.filter.LogicalTupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilter;
@@ -54,6 +55,7 @@ public class OLAPFilterRel extends Filter implements OLAPRel {
 
     ColumnRowType columnRowType;
     OLAPContext context;
+    boolean autoJustTimezone = 
KylinConfig.getInstanceFromEnv().isStreamingAutoJustTimezone();
 
     public OLAPFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode 
child, RexNode condition) {
         super(cluster, traits, child, condition);
@@ -105,6 +107,9 @@ public class OLAPFilterRel extends Filter implements 
OLAPRel {
         }
 
         TupleFilterVisitor visitor = new 
TupleFilterVisitor(this.columnRowType);
+        boolean isRealtimeTable = 
columnRowType.getColumnByIndex(0).getColumnDesc().getTable().isStreamingTable() 
;
+        autoJustTimezone = isRealtimeTable && autoJustTimezone;
+        visitor.setAutoJustByTimezone(autoJustTimezone);
         TupleFilter filter = this.condition.accept(visitor);
 
         // optimize the filter, the optimization has to be segment-irrelevant
diff --git 
a/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleFilterVisitor.java
 
b/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleFilterVisitor.java
index 7634023..3e07eca 100644
--- 
a/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleFilterVisitor.java
+++ 
b/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleFilterVisitor.java
@@ -37,6 +37,7 @@ import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.type.SqlTypeFamily;
 import org.apache.calcite.util.NlsString;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.metadata.filter.CaseTupleFilter;
@@ -57,11 +58,17 @@ import java.util.GregorianCalendar;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TimeZone;
 
 public class TupleFilterVisitor extends RexVisitorImpl<TupleFilter> {
 
     final ColumnRowType inputRowType;
 
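+    // whether date/time filter constants are shifted by TIME_ZONE_OFFSET; set through setAutoJustByTimezone (OLAPFilterRel turns it on only for streaming tables)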
+    private boolean autoJustByTimezone = false;
+    private static final long TIME_ZONE_OFFSET = 
TimeZone.getTimeZone(KylinConfig.getInstanceFromEnv().getTimeZone())
+            .getRawOffset();
+
     public TupleFilterVisitor(ColumnRowType inputRowType) {
         super(true);
         this.inputRowType = inputRowType;
@@ -210,10 +217,17 @@ public class TupleFilterVisitor extends 
RexVisitorImpl<TupleFilter> {
                 || type.getFamily() == SqlTypeFamily.TIMESTAMP) {
             List<String> newValues = Lists.newArrayList();
             for (Object v : constFilter.getValues()) {
-                if (v == null)
+                if (v == null) {
                     newValues.add(null);
-                else
-                    
newValues.add(String.valueOf(DateFormat.stringToMillis(v.toString())));
+                } else {
+                    long ts = DateFormat.stringToMillis(v.toString());
+                    //  minus offset by timezone in RelNode level
+                    // this will affect request sent to storage level
+                    if (autoJustByTimezone) {
+                        ts -= TIME_ZONE_OFFSET;
+                    }
+                    newValues.add(String.valueOf(ts));
+                }
             }
             constFilter = new ConstantTupleFilter(newValues);
         }
@@ -314,4 +328,8 @@ public class TupleFilterVisitor extends 
RexVisitorImpl<TupleFilter> {
         TupleFilter filter = new DynamicTupleFilter(name);
         return filter;
     }
+
+    /**
+     * Turns the time zone adjustment of date/time filter constants on or off. When enabled,
+     * TIME_ZONE_OFFSET is subtracted from each constant after it is converted to milliseconds.
+     *
+     * @param autoJustByTimezone true to shift the constants, false to leave them untouched
+     */
+    public void setAutoJustByTimezone(boolean autoJustByTimezone) {
+        this.autoJustByTimezone = autoJustByTimezone;
+    }
 }
diff --git 
a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingDataQueryPlanner.java
 
b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingDataQueryPlanner.java
index 2b1c286..98b4f4b 100644
--- 
a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingDataQueryPlanner.java
+++ 
b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingDataQueryPlanner.java
@@ -27,7 +27,7 @@ import 
org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.stream.core.util.CompareFilterTimeRangeChecker;
 import 
org.apache.kylin.stream.core.util.CompareFilterTimeRangeChecker.CheckResult;
-import org.apache.kylin.stream.core.util.TimeDerivedColumnType;
+import org.apache.kylin.dimension.TimeDerivedColumnType;
 
 /**
  * Scan planner for Streaming data segments, take derived time columns into 
consideration.
diff --git 
a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingTupleConverter.java
 
b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingTupleConverter.java
index 4788b21..110cc8e 100755
--- 
a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingTupleConverter.java
+++ 
b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingTupleConverter.java
@@ -18,8 +18,12 @@
 
 package org.apache.kylin.stream.core.query;
 
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
+import java.util.TimeZone;
 
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
 import org.apache.kylin.metadata.model.FunctionDesc;
@@ -27,14 +31,19 @@ import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.tuple.Tuple;
 import org.apache.kylin.metadata.tuple.TupleInfo;
 import org.apache.kylin.stream.core.storage.Record;
+import org.apache.kylin.dimension.TimeDerivedColumnType;
 
 import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Convert Streaming Record to Tuple
- * 
  */
 public class StreamingTupleConverter {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(StreamingTupleConverter.class);
+
     final TupleInfo tupleInfo;
 
     final int[] dimTupleIdx;
@@ -42,10 +51,13 @@ public class StreamingTupleConverter {
     final int dimCnt;
     final int metricsCnt;
     final MeasureType<?>[] measureTypes;
+    final Set<Integer> timestampColumn = new HashSet<>();
 
     final List<MeasureType.IAdvMeasureFiller> advMeasureFillers;
     final List<Integer> advMeasureIndexInGTValues;
-
+    final boolean autoTimezone = 
KylinConfig.getInstanceFromEnv().isStreamingAutoJustTimezone();
+    private static final long TIME_ZONE_OFFSET = 
TimeZone.getTimeZone(KylinConfig.getInstanceFromEnv().getTimeZone())
+            .getRawOffset();
 
     public StreamingTupleConverter(ResponseResultSchema schema, TupleInfo 
returnTupleInfo) {
         this.tupleInfo = returnTupleInfo;
@@ -64,6 +76,8 @@ public class StreamingTupleConverter {
         // pre-calculate dimension index mapping to tuple
         for (TblColRef dim : schema.getDimensions()) {
             dimTupleIdx[idx] = tupleInfo.hasColumn(dim) ? 
tupleInfo.getColumnIndex(dim) : -1;
+            if (dim.getType().isDateTimeFamily() && 
TimeDerivedColumnType.isTimeDerivedColumn(dim.getName()))
+                timestampColumn.add(dimTupleIdx[idx]);
             idx++;
         }
 
@@ -95,7 +109,16 @@ public class StreamingTupleConverter {
         for (int i = 0; i < dimCnt; i++) {
             int ti = dimTupleIdx[i];
             if (ti >= 0) {
-                tuple.setDimensionValue(ti, dimValues[i]);
+                if (autoTimezone && timestampColumn.contains(ti)) {
+                    try {
+                        tuple.setDimensionValue(ti, 
Long.toString(Long.parseLong(dimValues[i]) + TIME_ZONE_OFFSET));
+                    } catch (NumberFormatException nfe) {
+                        logger.warn("{} is not a long value.", dimValues[i]);
+                        tuple.setDimensionValue(ti, dimValues[i]);
+                    }
+                } else {
+                    tuple.setDimensionValue(ti, dimValues[i]);
+                }
             }
         }
 
diff --git 
a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java
 
b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java
index 9276d27..fa9b9ab 100644
--- 
a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java
+++ 
b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java
@@ -29,11 +29,11 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.TimeZone;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.annotation.Nullable;
-
 import org.apache.commons.io.FileUtils;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
@@ -93,8 +93,8 @@ public class StreamingSegmentManager implements Closeable {
 
     private final ISourcePositionHandler sourcePositionHandler;
     private final ISourcePosition consumePosition;
-
-    //TODO long latency event may not be dropped directly
+    private static final long TIME_ZONE_OFFSET = 
TimeZone.getTimeZone(KylinConfig.getInstanceFromEnv().getTimeZone())
+            .getRawOffset();
     private volatile LongLatencyInfo longLatencyInfo;
     private volatile long nextCheckPoint = 0;
     private volatile long lastCheckPointCount = 0;
@@ -543,9 +543,9 @@ public class StreamingSegmentManager implements Closeable {
         for (StreamingCubeSegment segment : allSegments) {
             SegmentStats segmentStats = new SegmentStats();
             segmentStats.setSegmentState(segment.getState().name());
-            segmentStats.setSegmentCreateTime(segment.getCreateTime());
-            segmentStats.setSegmentLastUpdateTime(segment.getLastUpdateTime());
-            segmentStats.setLatestEventTime(segment.getLatestEventTimeStamp());
+            
segmentStats.setSegmentCreateTime(resetTimestampByTimeZone(segment.getCreateTime()));
+            
segmentStats.setSegmentLastUpdateTime(resetTimestampByTimeZone(segment.getLastUpdateTime()));
+            
segmentStats.setLatestEventTime(resetTimestampByTimeZone(segment.getLatestEventTimeStamp()));
             segmentStats.setLatestEventLatency(segment.getLatestEventLatecy());
             SegmentStoreStats storeStats = 
segment.getSegmentStore().getStoreStats();
             segmentStats.setStoreStats(storeStats);
@@ -623,4 +623,8 @@ public class StreamingSegmentManager implements Closeable {
     private long truncateTime(long timestamp, long windowLength) {
         return (timestamp / windowLength) * windowLength;
     }
+
+    /**
+     * Shifts a timestamp by TIME_ZONE_OFFSET, the raw offset of the configured time zone.
+     * NOTE(review): the offset comes from TimeZone.getRawOffset(), which does not include
+     * daylight saving time; confirm that this is acceptable for zones that observe DST.
+     *
+     * @param ts timestamp in milliseconds (the unit of the raw offset)
+     * @return ts plus TIME_ZONE_OFFSET
+     */
+    public static long resetTimestampByTimeZone(long ts) {
+        return ts + TIME_ZONE_OFFSET;
+    }
 }
diff --git 
a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarStoreDimDesc.java
 
b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarStoreDimDesc.java
index b3d1f80..0406ecb 100644
--- 
a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarStoreDimDesc.java
+++ 
b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarStoreDimDesc.java
@@ -35,7 +35,7 @@ import 
org.apache.kylin.stream.core.storage.columnar.compress.NoCompressedColumn
 import 
org.apache.kylin.stream.core.storage.columnar.compress.NoCompressedColumnWriter;
 import 
org.apache.kylin.stream.core.storage.columnar.compress.RunLengthCompressedColumnReader;
 import 
org.apache.kylin.stream.core.storage.columnar.compress.RunLengthCompressedColumnWriter;
-import org.apache.kylin.stream.core.util.TimeDerivedColumnType;
+import org.apache.kylin.dimension.TimeDerivedColumnType;
 
 public class ColumnarStoreDimDesc {
     private int fixLen;
diff --git 
a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentFileSearcher.java
 
b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentFileSearcher.java
index 20b123a..9b0a912 100644
--- 
a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentFileSearcher.java
+++ 
b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentFileSearcher.java
@@ -1,360 +1,360 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.stream.core.storage.columnar;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.cube.kv.CubeDimEncMap;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.dict.BuiltInFunctionTransformer;
-import org.apache.kylin.dimension.DimensionEncoding;
-import org.apache.kylin.dimension.IDimensionEncodingMap;
-import org.apache.kylin.metadata.filter.ColumnTupleFilter;
-import org.apache.kylin.metadata.filter.CompareTupleFilter;
-import org.apache.kylin.metadata.filter.ConstantTupleFilter;
-import org.apache.kylin.metadata.filter.FilterOptimizeTransformer;
-import org.apache.kylin.metadata.filter.ITupleFilterTransformer;
-import org.apache.kylin.metadata.filter.LogicalTupleFilter;
-import org.apache.kylin.metadata.filter.StringCodeSystem;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
-import org.apache.kylin.metadata.filter.TupleFilterSerializer;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.stream.core.query.IStreamingGTSearcher;
-import org.apache.kylin.stream.core.query.IStreamingSearchResult;
-import org.apache.kylin.stream.core.query.ResponseResultSchema;
-import org.apache.kylin.stream.core.query.ResultCollector;
-import org.apache.kylin.stream.core.query.StreamingSearchContext;
-import org.apache.kylin.stream.core.storage.columnar.protocol.CuboidMetaInfo;
-import org.apache.kylin.stream.core.storage.columnar.protocol.FragmentMetaInfo;
-import org.apache.kylin.stream.core.util.CompareFilterTimeRangeChecker;
-import 
org.apache.kylin.stream.core.util.CompareFilterTimeRangeChecker.CheckResult;
-import org.apache.kylin.stream.core.util.TimeDerivedColumnType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-/**
- * FragmentFileSearcher is responsible to scan the columnar based storage on 
disk and deal with all the bytes level details for each DataFragment and return 
the result as GTRecords.
- * 
- */
-public class FragmentFileSearcher implements IStreamingGTSearcher {
-    private static Logger logger = 
LoggerFactory.getLogger(FragmentFileSearcher.class);
-
-    private FragmentData fragmentData;
-    private DataSegmentFragment fragment;
-
-    public FragmentFileSearcher(DataSegmentFragment fragment, FragmentData 
fragmentData) {
-        this.fragment = fragment;
-        this.fragmentData = fragmentData;
-    }
-
-    @Override
-    public void search(StreamingSearchContext searchContext, ResultCollector 
collector) throws IOException {
-        FragmentMetaInfo fragmentMetaInfo = fragmentData.getFragmentMetaInfo();
-        CuboidMetaInfo cuboidMetaInfo;
-        if (searchContext.hitBasicCuboid()) {
-            cuboidMetaInfo = fragmentMetaInfo.getBasicCuboidMetaInfo();
-        } else {
-            cuboidMetaInfo = 
fragmentMetaInfo.getCuboidMetaInfo(searchContext.getHitCuboid());
-            if (cuboidMetaInfo == null) {
-                logger.warn("the cuboid:{} is not exist in the fragment:{}, 
use basic cuboid instead",
-                        searchContext.getHitCuboid(), 
fragment.getFragmentId());
-                cuboidMetaInfo = fragmentMetaInfo.getBasicCuboidMetaInfo();
-            }
-        }
-
-        ResponseResultSchema responseSchema = 
searchContext.getRespResultSchema();
-        TblColRef[] dimensions = responseSchema.getDimensions();
-        FunctionDesc[] metrics = responseSchema.getMetrics();
-        Map<TblColRef, Dictionary<String>> dictMap = 
fragmentData.getDimensionDictionaries(dimensions);
-
-        CubeDesc cubeDesc = responseSchema.getCubeDesc();
-        List<MeasureDesc> allMeasures = cubeDesc.getMeasures();
-        Map<FunctionDesc, MeasureDesc> funcMeasureMap = Maps.newHashMap();
-        for (MeasureDesc measure : allMeasures) {
-            funcMeasureMap.put(measure.getFunction(), measure);
-        }
-        MeasureDesc[] measures = new MeasureDesc[metrics.length];
-        for (int i = 0; i < measures.length; i++) {
-            measures[i] = funcMeasureMap.get(metrics[i]);
-        }
-        DimensionEncoding[] dimensionEncodings = 
ParsedStreamingCubeInfo.getDimensionEncodings(cubeDesc, dimensions,
-                dictMap);
-        ColumnarMetricsEncoding[] metricsEncodings = 
ParsedStreamingCubeInfo.getMetricsEncodings(measures);
-        ColumnarRecordCodec recordCodec = new 
ColumnarRecordCodec(dimensionEncodings, metricsEncodings);
-
-        // change the unEvaluable dimensions to groupBy
-        Set<TblColRef> unEvaluateDims = Sets.newHashSet();
-        TupleFilter fragmentFilter = null;
-        if (searchContext.getFilter() != null) {
-            fragmentFilter = convertFilter(fragmentMetaInfo, 
searchContext.getFilter(), recordCodec,
-                    dimensions, new CubeDimEncMap(cubeDesc, dictMap), 
unEvaluateDims);
-        }
-        if (ConstantTupleFilter.TRUE == fragmentFilter) {
-            fragmentFilter = null;
-        } else if (ConstantTupleFilter.FALSE == fragmentFilter) {
-            collector.collectSearchResult(IStreamingSearchResult.EMPTY_RESULT);
-        }
-        Set<TblColRef> groups = searchContext.getGroups();
-        if (!unEvaluateDims.isEmpty()) {
-            searchContext.addNewGroups(unEvaluateDims);
-            groups = Sets.union(groups, unEvaluateDims);
-        }
-        collector.collectSearchResult(new FragmentSearchResult(fragment, 
fragmentData, cuboidMetaInfo, responseSchema, fragmentFilter, groups, 
searchContext.getHavingFilter(),
-                recordCodec));
-    }
-
-    private TupleFilter convertFilter(FragmentMetaInfo fragmentMetaInfo, 
TupleFilter rootFilter,
-                                      ColumnarRecordCodec recordCodec, final 
TblColRef[] dimensions, final IDimensionEncodingMap dimEncodingMap, //
-                                      final Set<TblColRef> 
unEvaluableColumnCollector) {
-        Map<TblColRef, Integer> colMapping = Maps.newHashMap();
-        for (int i = 0; i < dimensions.length; i++) {
-            colMapping.put(dimensions[i], i);
-        }
-        byte[] bytes = TupleFilterSerializer.serialize(rootFilter, null, 
StringCodeSystem.INSTANCE);
-        TupleFilter filter = TupleFilterSerializer.deserialize(bytes, 
StringCodeSystem.INSTANCE);
-
-        BuiltInFunctionTransformer builtInFunctionTransformer = new 
BuiltInFunctionTransformer(dimEncodingMap);
-        filter = builtInFunctionTransformer.transform(filter);
-        FragmentFilterConverter fragmentFilterConverter = new 
FragmentFilterConverter(fragmentMetaInfo, unEvaluableColumnCollector,
-                colMapping, recordCodec);
-        filter = fragmentFilterConverter.transform(filter);
-
-        filter = new FilterOptimizeTransformer().transform(filter);
-        return filter;
-    }
-
-    protected static class FragmentFilterConverter implements 
ITupleFilterTransformer {
-        protected final Set<TblColRef> unEvaluableColumnCollector;
-        protected final Map<TblColRef, Integer> colMapping;
-        private CompareFilterTimeRangeChecker filterTimeRangeChecker;
-        private ColumnarRecordCodec recordCodec;
-        transient ByteBuffer buf;
-
-        public FragmentFilterConverter(FragmentMetaInfo fragmentMetaInfo, 
Set<TblColRef> unEvaluableColumnCollector,
-                                       Map<TblColRef, Integer> colMapping, 
ColumnarRecordCodec recordCodec) {
-            this.unEvaluableColumnCollector = unEvaluableColumnCollector;
-            this.recordCodec = recordCodec;
-            this.colMapping = colMapping;
-            if (fragmentMetaInfo.hasValidEventTimeRange()) {
-                this.filterTimeRangeChecker = new 
CompareFilterTimeRangeChecker(fragmentMetaInfo.getMinEventTime(),
-                        fragmentMetaInfo.getMaxEventTime(), true);
-            }
-            buf = ByteBuffer.allocate(recordCodec.getMaxDimLength());
-        }
-
-        protected int mapCol(TblColRef col) {
-            Integer i = colMapping.get(col);
-            return i == null ? -1 : i;
-        }
-
-        @Override
-        public TupleFilter transform(TupleFilter filter) {
-            if (filter.getOperator() == TupleFilter.FilterOperatorEnum.NOT
-                    && !TupleFilter.isEvaluableRecursively(filter)) {
-                TupleFilter.collectColumns(filter, unEvaluableColumnCollector);
-                return ConstantTupleFilter.TRUE;
-            }
-
-            // shortcut for unEvaluatable filter
-            if (!filter.isEvaluable()) {
-                TupleFilter.collectColumns(filter, unEvaluableColumnCollector);
-                return ConstantTupleFilter.TRUE;
-            }
-
-            if (filter instanceof CompareTupleFilter) {
-                return translateCompareFilter((CompareTupleFilter) filter);
-            } else if (filter instanceof LogicalTupleFilter) {
-                @SuppressWarnings("unchecked")
-                ListIterator<TupleFilter> childIterator = 
(ListIterator<TupleFilter>) filter.getChildren().listIterator();
-                while (childIterator.hasNext()) {
-                    TupleFilter transformed = transform(childIterator.next());
-                    if (transformed != null) {
-                        childIterator.set(transformed);
-                    } else {
-                        throw new IllegalStateException("Should not be null");
-                    }
-                }
-            }
-            return filter;
-        }
-
-
-        @SuppressWarnings({ "rawtypes", "unchecked" })
-        protected TupleFilter translateCompareFilter(CompareTupleFilter 
oldCompareFilter) {
-            // extract ColumnFilter & ConstantFilter
-            TblColRef externalCol = oldCompareFilter.getColumn();
-
-            if (externalCol == null) {
-                return oldCompareFilter;
-            }
-
-            Collection constValues = oldCompareFilter.getValues();
-            if (constValues == null || constValues.isEmpty()) {
-                return oldCompareFilter;
-            }
-
-            if 
(TimeDerivedColumnType.isTimeDerivedColumn(externalCol.getName()) && 
filterTimeRangeChecker != null) {
-                CheckResult checkResult = 
filterTimeRangeChecker.check(oldCompareFilter,
-                        
TimeDerivedColumnType.getTimeDerivedColumnType(externalCol.getName()));
-                if (checkResult == CheckResult.INCLUDED) {
-                    return ConstantTupleFilter.TRUE;
-                } else if (checkResult == CheckResult.EXCLUDED) {
-                    return ConstantTupleFilter.FALSE;
-                }
-            }
-
-            //CompareTupleFilter containing BuiltInFunctionTupleFilter will 
not reach here caz it will be transformed by BuiltInFunctionTransformer
-            CompareTupleFilter newCompareFilter = new 
CompareTupleFilter(oldCompareFilter.getOperator());
-            newCompareFilter.addChild(new ColumnTupleFilter(externalCol));
-
-            //for CompareTupleFilter containing dynamicVariables, the below 
codes will actually replace dynamicVariables
-            //with normal ConstantTupleFilter
-
-            Object firstValue = constValues.iterator().next();
-            int col = mapCol(externalCol);
-
-            TupleFilter result;
-            ByteArray code;
-
-            // translate constant into code
-            switch (newCompareFilter.getOperator()) {
-            case EQ:
-            case IN:
-                Set newValues = Sets.newHashSet();
-                for (Object value : constValues) {
-                    code = translate(col, value, 0);
-                    if (code != null)
-                        newValues.add(code);
-                }
-                if (newValues.isEmpty()) {
-                    result = ConstantTupleFilter.FALSE;
-                } else {
-                    newCompareFilter.addChild(new 
ConstantTupleFilter(newValues));
-                    result = newCompareFilter;
-                }
-                break;
-            case NOTIN:
-                Set notInValues = Sets.newHashSet();
-                for (Object value : constValues) {
-                    code = translate(col, value, 0);
-                    if (code != null)
-                        notInValues.add(code);
-                }
-                if (notInValues.isEmpty()) {
-                    result = ConstantTupleFilter.TRUE;
-                } else {
-                    newCompareFilter.addChild(new 
ConstantTupleFilter(notInValues));
-                    result = newCompareFilter;
-                }
-                break;
-            case NEQ:
-                code = translate(col, firstValue, 0);
-                if (code == null) {
-                    result = ConstantTupleFilter.TRUE;
-                } else {
-                    newCompareFilter.addChild(new ConstantTupleFilter(code));
-                    result = newCompareFilter;
-                }
-                break;
-            case LT:
-                code = translate(col, firstValue, 0);
-                if (code == null) {
-                    code = translate(col, firstValue, -1);
-                    if (code == null)
-                        result = ConstantTupleFilter.FALSE;
-                    else
-                        result = newCompareFilter(FilterOperatorEnum.LTE, 
externalCol, code);
-                } else {
-                    newCompareFilter.addChild(new ConstantTupleFilter(code));
-                    result = newCompareFilter;
-                }
-                break;
-            case LTE:
-                code = translate(col, firstValue, -1);
-                if (code == null) {
-                    result = ConstantTupleFilter.FALSE;
-                } else {
-                    newCompareFilter.addChild(new ConstantTupleFilter(code));
-                    result = newCompareFilter;
-                }
-                break;
-            case GT:
-                code = translate(col, firstValue, 0);
-                if (code == null) {
-                    code = translate(col, firstValue, 1);
-                    if (code == null)
-                        result = ConstantTupleFilter.FALSE;
-                    else
-                        result = newCompareFilter(FilterOperatorEnum.GTE, 
externalCol, code);
-                } else {
-                    newCompareFilter.addChild(new ConstantTupleFilter(code));
-                    result = newCompareFilter;
-                }
-                break;
-            case GTE:
-                code = translate(col, firstValue, 1);
-                if (code == null) {
-                    result = ConstantTupleFilter.FALSE;
-                } else {
-                    newCompareFilter.addChild(new ConstantTupleFilter(code));
-                    result = newCompareFilter;
-                }
-                break;
-            default:
-                throw new IllegalStateException("Cannot handle operator " + 
newCompareFilter.getOperator());
-            }
-            return result;
-        }
-
-        private TupleFilter newCompareFilter(FilterOperatorEnum op, TblColRef 
col, ByteArray code) {
-            CompareTupleFilter r = new CompareTupleFilter(op);
-            r.addChild(new ColumnTupleFilter(col));
-            r.addChild(new ConstantTupleFilter(code));
-            return r;
-        }
-
-        protected ByteArray translate(int col, Object value, int roundingFlag) 
{
-            try {
-                buf.clear();
-                recordCodec.encodeDimension(col, value, roundingFlag, buf);
-                int length = buf.position();
-                return ByteArray.copyOf(buf.array(), 0, length);
-            } catch (IllegalArgumentException ex) {
-                return null;
-            }
-        }
-
-
-    }
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.stream.core.storage.columnar;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.cube.kv.CubeDimEncMap;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.dict.BuiltInFunctionTransformer;
+import org.apache.kylin.dimension.DimensionEncoding;
+import org.apache.kylin.dimension.IDimensionEncodingMap;
+import org.apache.kylin.metadata.filter.ColumnTupleFilter;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.ConstantTupleFilter;
+import org.apache.kylin.metadata.filter.FilterOptimizeTransformer;
+import org.apache.kylin.metadata.filter.ITupleFilterTransformer;
+import org.apache.kylin.metadata.filter.LogicalTupleFilter;
+import org.apache.kylin.metadata.filter.StringCodeSystem;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.filter.TupleFilterSerializer;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.stream.core.query.IStreamingGTSearcher;
+import org.apache.kylin.stream.core.query.IStreamingSearchResult;
+import org.apache.kylin.stream.core.query.ResponseResultSchema;
+import org.apache.kylin.stream.core.query.ResultCollector;
+import org.apache.kylin.stream.core.query.StreamingSearchContext;
+import org.apache.kylin.stream.core.storage.columnar.protocol.CuboidMetaInfo;
+import org.apache.kylin.stream.core.storage.columnar.protocol.FragmentMetaInfo;
+import org.apache.kylin.stream.core.util.CompareFilterTimeRangeChecker;
+import 
org.apache.kylin.stream.core.util.CompareFilterTimeRangeChecker.CheckResult;
+import org.apache.kylin.dimension.TimeDerivedColumnType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * FragmentFileSearcher scans the columnar storage on disk, handles the
+ * byte-level details of each DataFragment and returns the result as GTRecords.
+ * 
+ */
+public class FragmentFileSearcher implements IStreamingGTSearcher {
+    private static Logger logger = 
LoggerFactory.getLogger(FragmentFileSearcher.class);
+
+    private FragmentData fragmentData;
+    private DataSegmentFragment fragment;
+
+    public FragmentFileSearcher(DataSegmentFragment fragment, FragmentData 
fragmentData) {
+        this.fragment = fragment;
+        this.fragmentData = fragmentData;
+    }
+
+    /**
+     * Searches this fragment and hands the outcome to {@code collector}.
+     * <p>
+     * Resolves the cuboid metadata to read (the basic cuboid when the query
+     * hits it, or when the hit cuboid is absent from this fragment), builds
+     * a {@link ColumnarRecordCodec} for the response schema, converts the
+     * query filter into a fragment-level filter and registers exactly one
+     * search result with the collector.
+     *
+     * @param searchContext supplies the hit cuboid, response schema, filter,
+     *                      groups and having filter of the query
+     * @param collector     receives the search result for this fragment
+     * @throws IOException propagated from the calls made here
+     *                     (NOTE(review): confirm which call declares it)
+     */
+    @Override
+    public void search(StreamingSearchContext searchContext,
+            ResultCollector collector) throws IOException {
+        FragmentMetaInfo fragmentMetaInfo = fragmentData.getFragmentMetaInfo();
+        CuboidMetaInfo cuboidMetaInfo;
+        if (searchContext.hitBasicCuboid()) {
+            cuboidMetaInfo = fragmentMetaInfo.getBasicCuboidMetaInfo();
+        } else {
+            cuboidMetaInfo = fragmentMetaInfo
+                    .getCuboidMetaInfo(searchContext.getHitCuboid());
+            if (cuboidMetaInfo == null) {
+                logger.warn("the cuboid:{} is not exist in the fragment:{}, use basic cuboid instead",
+                        searchContext.getHitCuboid(), fragment.getFragmentId());
+                cuboidMetaInfo = fragmentMetaInfo.getBasicCuboidMetaInfo();
+            }
+        }
+
+        ResponseResultSchema responseSchema = searchContext.getRespResultSchema();
+        TblColRef[] dimensions = responseSchema.getDimensions();
+        FunctionDesc[] metrics = responseSchema.getMetrics();
+        Map<TblColRef, Dictionary<String>> dictMap =
+                fragmentData.getDimensionDictionaries(dimensions);
+
+        CubeDesc cubeDesc = responseSchema.getCubeDesc();
+        List<MeasureDesc> allMeasures = cubeDesc.getMeasures();
+        Map<FunctionDesc, MeasureDesc> funcMeasureMap = Maps.newHashMap();
+        for (MeasureDesc measure : allMeasures) {
+            funcMeasureMap.put(measure.getFunction(), measure);
+        }
+        // Resolve every requested metric to the measure that declares it.
+        MeasureDesc[] measures = new MeasureDesc[metrics.length];
+        for (int i = 0; i < measures.length; i++) {
+            measures[i] = funcMeasureMap.get(metrics[i]);
+        }
+        DimensionEncoding[] dimensionEncodings = ParsedStreamingCubeInfo
+                .getDimensionEncodings(cubeDesc, dimensions, dictMap);
+        ColumnarMetricsEncoding[] metricsEncodings =
+                ParsedStreamingCubeInfo.getMetricsEncodings(measures);
+        ColumnarRecordCodec recordCodec =
+                new ColumnarRecordCodec(dimensionEncodings, metricsEncodings);
+
+        // change the unEvaluable dimensions to groupBy
+        Set<TblColRef> unEvaluateDims = Sets.newHashSet();
+        TupleFilter fragmentFilter = null;
+        if (searchContext.getFilter() != null) {
+            fragmentFilter = convertFilter(fragmentMetaInfo,
+                    searchContext.getFilter(), recordCodec, dimensions,
+                    new CubeDimEncMap(cubeDesc, dictMap), unEvaluateDims);
+        }
+        Set<TblColRef> groups = searchContext.getGroups();
+        if (!unEvaluateDims.isEmpty()) {
+            searchContext.addNewGroups(unEvaluateDims);
+            groups = Sets.union(groups, unEvaluateDims);
+        }
+
+        if (ConstantTupleFilter.FALSE == fragmentFilter) {
+            // The filter can never match in this fragment: report the empty
+            // result and stop. Previously execution fell through and a second,
+            // always-empty FragmentSearchResult was collected as well.
+            collector.collectSearchResult(IStreamingSearchResult.EMPTY_RESULT);
+            return;
+        }
+        if (ConstantTupleFilter.TRUE == fragmentFilter) {
+            // An always-true filter is passed on as "no filter".
+            fragmentFilter = null;
+        }
+        collector.collectSearchResult(new FragmentSearchResult(fragment,
+                fragmentData, cuboidMetaInfo, responseSchema, fragmentFilter,
+                groups, searchContext.getHavingFilter(), recordCodec));
+    }
+
+    /**
+     * Converts the query filter into a filter that can be evaluated against
+     * this fragment.
+     * <p>
+     * The filter is passed, in order, through
+     * {@link BuiltInFunctionTransformer}, {@link FragmentFilterConverter} and
+     * {@link FilterOptimizeTransformer}.
+     *
+     * @param fragmentMetaInfo           fragment metadata handed to the converter
+     * @param rootFilter                 the filter of the query
+     * @param recordCodec                codec used to encode filter constants
+     * @param dimensions                 dimensions of the response schema; the
+     *                                   array index becomes the column index
+     * @param dimEncodingMap             encodings given to the built-in
+     *                                   function transformer
+     * @param unEvaluableColumnCollector receives the columns of filters that
+     *                                   cannot be evaluated here
+     * @return the transformed filter
+     */
+    private TupleFilter convertFilter(FragmentMetaInfo fragmentMetaInfo, 
TupleFilter rootFilter,
+                                      ColumnarRecordCodec recordCodec, final 
TblColRef[] dimensions, final IDimensionEncodingMap dimEncodingMap, //
+                                      final Set<TblColRef> 
unEvaluableColumnCollector) {
+        // Column -> position in the dimensions array.
+        Map<TblColRef, Integer> colMapping = Maps.newHashMap();
+        for (int i = 0; i < dimensions.length; i++) {
+            colMapping.put(dimensions[i], i);
+        }
+        // Serialize/deserialize round trip; presumably yields a separate copy so
+        // that the in-place child replacement done by FragmentFilterConverter
+        // does not touch rootFilter -- TODO confirm.
+        byte[] bytes = TupleFilterSerializer.serialize(rootFilter, null, 
StringCodeSystem.INSTANCE);
+        TupleFilter filter = TupleFilterSerializer.deserialize(bytes, 
StringCodeSystem.INSTANCE);
+
+        BuiltInFunctionTransformer builtInFunctionTransformer = new 
BuiltInFunctionTransformer(dimEncodingMap);
+        filter = builtInFunctionTransformer.transform(filter);
+        FragmentFilterConverter fragmentFilterConverter = new 
FragmentFilterConverter(fragmentMetaInfo, unEvaluableColumnCollector,
+                colMapping, recordCodec);
+        filter = fragmentFilterConverter.transform(filter);
+
+        filter = new FilterOptimizeTransformer().transform(filter);
+        return filter;
+    }
+
+    protected static class FragmentFilterConverter implements 
ITupleFilterTransformer {
+        protected final Set<TblColRef> unEvaluableColumnCollector;
+        protected final Map<TblColRef, Integer> colMapping;
+        private CompareFilterTimeRangeChecker filterTimeRangeChecker;
+        private ColumnarRecordCodec recordCodec;
+        transient ByteBuffer buf;
+
+        /**
+         * Creates a converter for one fragment.
+         *
+         * @param fragmentMetaInfo           supplies the event time range of
+         *                                   the fragment, if it has a valid one
+         * @param unEvaluableColumnCollector receives the columns of filters
+         *                                   that cannot be evaluated
+         * @param colMapping                 column to column-index mapping
+         * @param recordCodec                codec used to encode constants
+         */
+        public FragmentFilterConverter(FragmentMetaInfo fragmentMetaInfo, 
Set<TblColRef> unEvaluableColumnCollector,
+                                       Map<TblColRef, Integer> colMapping, 
ColumnarRecordCodec recordCodec) {
+            this.unEvaluableColumnCollector = unEvaluableColumnCollector;
+            this.recordCodec = recordCodec;
+            this.colMapping = colMapping;
+            // Without a valid event time range the checker stays null and
+            // translateCompareFilter skips its time-range shortcut.
+            if (fragmentMetaInfo.hasValidEventTimeRange()) {
+                this.filterTimeRangeChecker = new 
CompareFilterTimeRangeChecker(fragmentMetaInfo.getMinEventTime(),
+                        fragmentMetaInfo.getMaxEventTime(), true);
+            }
+            // Scratch buffer reused by every translate() call.
+            buf = ByteBuffer.allocate(recordCodec.getMaxDimLength());
+        }
+
+        /**
+         * Looks up the index of a column in {@code colMapping}.
+         *
+         * @param col the column to look up
+         * @return the mapped index, or -1 when the column has no mapping
+         */
+        protected int mapCol(TblColRef col) {
+            Integer i = colMapping.get(col);
+            return i == null ? -1 : i;
+        }
+
+        /**
+         * Transforms a filter tree for evaluation against the fragment.
+         * <p>
+         * Filters that cannot be evaluated are replaced by
+         * {@link ConstantTupleFilter#TRUE} and their columns are added to
+         * {@code unEvaluableColumnCollector}. Compare filters are translated by
+         * {@link #translateCompareFilter}; the children of logical filters are
+         * transformed recursively and replaced in place.
+         *
+         * @param filter the filter to transform
+         * @return the replacement filter, or {@code filter} itself (possibly
+         *         with replaced children)
+         * @throws IllegalStateException if transforming a child yields null
+         */
+        @Override
+        public TupleFilter transform(TupleFilter filter) {
+            // A NOT over something that is not evaluable recursively is given
+            // up as a whole: record its columns and accept every row.
+            if (filter.getOperator() == TupleFilter.FilterOperatorEnum.NOT
+                    && !TupleFilter.isEvaluableRecursively(filter)) {
+                TupleFilter.collectColumns(filter, unEvaluableColumnCollector);
+                return ConstantTupleFilter.TRUE;
+            }
+
+            // shortcut for a filter that is not evaluable
+            if (!filter.isEvaluable()) {
+                TupleFilter.collectColumns(filter, unEvaluableColumnCollector);
+                return ConstantTupleFilter.TRUE;
+            }
+
+            if (filter instanceof CompareTupleFilter) {
+                return translateCompareFilter((CompareTupleFilter) filter);
+            } else if (filter instanceof LogicalTupleFilter) {
+                @SuppressWarnings("unchecked")
+                ListIterator<TupleFilter> childIterator = 
(ListIterator<TupleFilter>) filter.getChildren().listIterator();
+                while (childIterator.hasNext()) {
+                    TupleFilter transformed = transform(childIterator.next());
+                    if (transformed != null) {
+                        // Replaces the child inside the parent's own list.
+                        childIterator.set(transformed);
+                    } else {
+                        throw new IllegalStateException("Should not be null");
+                    }
+                }
+            }
+            return filter;
+        }
+
+
+        /**
+         * Rewrites a compare filter so that its constants are the encoded
+         * (ByteArray) form produced by {@link #translate}.
+         * <p>
+         * The filter is returned unchanged when it has no column or no constant
+         * values. For a time derived column, the time range checker (when
+         * present) may settle the filter to TRUE or FALSE without any encoding.
+         * When a constant cannot be encoded, the result depends on the
+         * operator: see the individual cases below.
+         *
+         * @param oldCompareFilter the compare filter to translate
+         * @return the original filter, a new compare filter over encoded
+         *         constants, or a constant TRUE/FALSE filter
+         * @throws IllegalStateException for an operator not handled below
+         */
+        @SuppressWarnings({ "rawtypes", "unchecked" })
+        protected TupleFilter translateCompareFilter(CompareTupleFilter 
oldCompareFilter) {
+            // extract ColumnFilter & ConstantFilter
+            TblColRef externalCol = oldCompareFilter.getColumn();
+
+            if (externalCol == null) {
+                return oldCompareFilter;
+            }
+
+            Collection constValues = oldCompareFilter.getValues();
+            if (constValues == null || constValues.isEmpty()) {
+                return oldCompareFilter;
+            }
+
+            // Time-range shortcut: any check result other than INCLUDED or
+            // EXCLUDED falls through to the normal translation.
+            if 
(TimeDerivedColumnType.isTimeDerivedColumn(externalCol.getName()) && 
filterTimeRangeChecker != null) {
+                CheckResult checkResult = 
filterTimeRangeChecker.check(oldCompareFilter,
+                        
TimeDerivedColumnType.getTimeDerivedColumnType(externalCol.getName()));
+                if (checkResult == CheckResult.INCLUDED) {
+                    return ConstantTupleFilter.TRUE;
+                } else if (checkResult == CheckResult.EXCLUDED) {
+                    return ConstantTupleFilter.FALSE;
+                }
+            }
+
+            // A CompareTupleFilter containing a BuiltInFunctionTupleFilter never
+            // reaches this point because BuiltInFunctionTransformer has already
+            // transformed it.
+            CompareTupleFilter newCompareFilter = new 
CompareTupleFilter(oldCompareFilter.getOperator());
+            newCompareFilter.addChild(new ColumnTupleFilter(externalCol));
+
+            // For a CompareTupleFilter containing dynamic variables, the code
+            // below effectively replaces them with a plain ConstantTupleFilter.
+
+            Object firstValue = constValues.iterator().next();
+            int col = mapCol(externalCol);
+
+            TupleFilter result;
+            ByteArray code;
+
+            // translate constant into code
+            // NOTE(review): the third argument of translate() is a rounding
+            // flag; presumably 0 = exact, -1 = round down, 1 = round up --
+            // confirm against ColumnarRecordCodec.encodeDimension.
+            switch (newCompareFilter.getOperator()) {
+            case EQ:
+            case IN:
+                // Values that cannot be encoded are dropped; if none is left
+                // nothing can match.
+                Set newValues = Sets.newHashSet();
+                for (Object value : constValues) {
+                    code = translate(col, value, 0);
+                    if (code != null)
+                        newValues.add(code);
+                }
+                if (newValues.isEmpty()) {
+                    result = ConstantTupleFilter.FALSE;
+                } else {
+                    newCompareFilter.addChild(new 
ConstantTupleFilter(newValues));
+                    result = newCompareFilter;
+                }
+                break;
+            case NOTIN:
+                // Values that cannot be encoded are dropped; if none is left
+                // everything matches.
+                Set notInValues = Sets.newHashSet();
+                for (Object value : constValues) {
+                    code = translate(col, value, 0);
+                    if (code != null)
+                        notInValues.add(code);
+                }
+                if (notInValues.isEmpty()) {
+                    result = ConstantTupleFilter.TRUE;
+                } else {
+                    newCompareFilter.addChild(new 
ConstantTupleFilter(notInValues));
+                    result = newCompareFilter;
+                }
+                break;
+            case NEQ:
+                code = translate(col, firstValue, 0);
+                if (code == null) {
+                    result = ConstantTupleFilter.TRUE;
+                } else {
+                    newCompareFilter.addChild(new ConstantTupleFilter(code));
+                    result = newCompareFilter;
+                }
+                break;
+            case LT:
+                // If the exact value cannot be encoded, retry with flag -1 and
+                // compare with LTE against that code instead.
+                code = translate(col, firstValue, 0);
+                if (code == null) {
+                    code = translate(col, firstValue, -1);
+                    if (code == null)
+                        result = ConstantTupleFilter.FALSE;
+                    else
+                        result = newCompareFilter(FilterOperatorEnum.LTE, 
externalCol, code);
+                } else {
+                    newCompareFilter.addChild(new ConstantTupleFilter(code));
+                    result = newCompareFilter;
+                }
+                break;
+            case LTE:
+                code = translate(col, firstValue, -1);
+                if (code == null) {
+                    result = ConstantTupleFilter.FALSE;
+                } else {
+                    newCompareFilter.addChild(new ConstantTupleFilter(code));
+                    result = newCompareFilter;
+                }
+                break;
+            case GT:
+                // If the exact value cannot be encoded, retry with flag 1 and
+                // compare with GTE against that code instead.
+                code = translate(col, firstValue, 0);
+                if (code == null) {
+                    code = translate(col, firstValue, 1);
+                    if (code == null)
+                        result = ConstantTupleFilter.FALSE;
+                    else
+                        result = newCompareFilter(FilterOperatorEnum.GTE, 
externalCol, code);
+                } else {
+                    newCompareFilter.addChild(new ConstantTupleFilter(code));
+                    result = newCompareFilter;
+                }
+                break;
+            case GTE:
+                code = translate(col, firstValue, 1);
+                if (code == null) {
+                    result = ConstantTupleFilter.FALSE;
+                } else {
+                    newCompareFilter.addChild(new ConstantTupleFilter(code));
+                    result = newCompareFilter;
+                }
+                break;
+            default:
+                throw new IllegalStateException("Cannot handle operator " + 
newCompareFilter.getOperator());
+            }
+            return result;
+        }
+
+        /**
+         * Builds a compare filter {@code col <op> code}.
+         *
+         * @param op   the comparison operator
+         * @param col  the column on the left-hand side
+         * @param code the encoded constant on the right-hand side
+         * @return a new compare filter with the column and constant as children
+         */
+        private TupleFilter newCompareFilter(FilterOperatorEnum op, TblColRef 
col, ByteArray code) {
+            CompareTupleFilter r = new CompareTupleFilter(op);
+            r.addChild(new ColumnTupleFilter(col));
+            r.addChild(new ConstantTupleFilter(code));
+            return r;
+        }
+
+        /**
+         * Encodes a constant for the given column with the record codec.
+         *
+         * @param col          column index passed to the codec
+         * @param value        the constant to encode
+         * @param roundingFlag rounding flag passed through to the codec
+         * @return a copy of the encoded bytes, or {@code null} when the codec
+         *         throws IllegalArgumentException; callers treat null as
+         *         "value cannot be encoded"
+         */
+        protected ByteArray translate(int col, Object value, int roundingFlag) 
{
+            try {
+                // buf is shared scratch space: reset it, then copy out exactly
+                // the bytes written by this call.
+                buf.clear();
+                recordCodec.encodeDimension(col, value, roundingFlag, buf);
+                int length = buf.position();
+                return ByteArray.copyOf(buf.array(), 0, length);
+            } catch (IllegalArgumentException ex) {
+                return null;
+            }
+        }
+
+
+    }
+
 }
\ No newline at end of file
diff --git 
a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/TimeDerivedColumnEncoding.java
 
b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/TimeDerivedColumnEncoding.java
index c342c04..22025ff 100644
--- 
a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/TimeDerivedColumnEncoding.java
+++ 
b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/TimeDerivedColumnEncoding.java
@@ -19,7 +19,7 @@
 package org.apache.kylin.stream.core.storage.columnar;
 
 import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.stream.core.util.TimeDerivedColumnType;
+import org.apache.kylin.dimension.TimeDerivedColumnType;
 
 public class TimeDerivedColumnEncoding {
     private String columnName;
diff --git 
a/stream-core/src/main/java/org/apache/kylin/stream/core/util/CompareFilterTimeRangeChecker.java
 
b/stream-core/src/main/java/org/apache/kylin/stream/core/util/CompareFilterTimeRangeChecker.java
index 5be5fd1..c01459d 100644
--- 
a/stream-core/src/main/java/org/apache/kylin/stream/core/util/CompareFilterTimeRangeChecker.java
+++ 
b/stream-core/src/main/java/org/apache/kylin/stream/core/util/CompareFilterTimeRangeChecker.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.Set;
 
 import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.dimension.TimeDerivedColumnType;
 import org.apache.kylin.metadata.filter.CompareTupleFilter;
 
 /**
diff --git 
a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
 
b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
index 2a3b077..408a5e7a 100644
--- 
a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
+++ 
b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
@@ -477,8 +477,10 @@ public class StreamingServer implements ReplicaSetLeaderSelector.LeaderChangeLis
             Map<String, SegmentStats> segmentStatsMap = segmentManager.getSegmentStats();
             receiverCubeStats.setSegmentStatsMap(segmentStatsMap);
             receiverCubeStats.setTotalIngest(segmentManager.getIngestCount());
-            receiverCubeStats.setLatestEventTime(segmentManager.getLatestEventTime());
-            receiverCubeStats.setLatestEventIngestTime(segmentManager.getLatestEventIngestTime());
+            receiverCubeStats.setLatestEventTime(
+                    StreamingSegmentManager.resetTimestampByTimeZone(segmentManager.getLatestEventTime()));
+            receiverCubeStats.setLatestEventIngestTime(
+                    StreamingSegmentManager.resetTimestampByTimeZone(segmentManager.getLatestEventIngestTime()));
             receiverCubeStats.setLongLatencyInfo(segmentManager.getLongLatencyInfo());
         }
         return receiverCubeStats;
diff --git a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java
index 32e4111..809844c 100644
--- a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java
+++ b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java
@@ -40,7 +40,7 @@ import org.apache.kylin.stream.core.exception.StreamingException;
 import org.apache.kylin.stream.core.model.StreamingMessage;
 import org.apache.kylin.stream.core.source.IStreamingMessageParser;
 import org.apache.kylin.stream.core.source.MessageParserInfo;
-import org.apache.kylin.stream.core.util.TimeDerivedColumnType;
+import org.apache.kylin.dimension.TimeDerivedColumnType;
 import org.apache.kylin.stream.source.kafka.KafkaPosition.KafkaPartitionPosition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

Reply via email to