This is an automated email from the ASF dual-hosted git repository. nic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 92273d469511d32c5291f8b856ed0f4fba98b5e6 Author: XiaoxiangYu <hit_la...@126.com> AuthorDate: Fri Sep 20 16:57:46 2019 +0800 KYLIN-4010 Auto adjust offset according to query server's timezone for time derived column --- .../org/apache/kylin/common/KylinConfigBase.java | 7 + .../kylin/dimension}/TimeDerivedColumnType.java | 2 +- .../dimension}/TimeDerivedColumnTypeTest.java | 2 +- .../kylin/storage/gtrecord/CubeTupleConverter.java | 32 +- .../optrule/visitor/TimezoneRewriteVisitor.java | 151 +++++ .../apache/kylin/query/relnode/OLAPFilterRel.java | 5 + .../query/relnode/visitor/TupleFilterVisitor.java | 24 +- .../core/query/StreamingDataQueryPlanner.java | 2 +- .../stream/core/query/StreamingTupleConverter.java | 29 +- .../core/storage/StreamingSegmentManager.java | 16 +- .../storage/columnar/ColumnarStoreDimDesc.java | 2 +- .../storage/columnar/FragmentFileSearcher.java | 718 ++++++++++----------- .../columnar/TimeDerivedColumnEncoding.java | 2 +- .../core/util/CompareFilterTimeRangeChecker.java | 1 + .../kylin/stream/server/StreamingServer.java | 6 +- .../stream/source/kafka/TimedJsonStreamParser.java | 2 +- 16 files changed, 620 insertions(+), 381 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 00afb58..9d6dc41 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -2228,6 +2228,13 @@ public abstract class KylinConfigBase implements Serializable { return Long.parseLong(getOptional("kylin.stream.metrics.interval", "5")); } + /** + * whether realtime query should add timezone offset by kylin's web-timezone, please refer to KYLIN-4010 for detail + */ + public boolean isStreamingAutoJustTimezone() { + return Boolean.parseBoolean(getOptional("kylin.stream.auto.just.by.timezone", "false")); + } + // ============================================================================ // Health Check CLI // ============================================================================ diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/util/TimeDerivedColumnType.java b/core-metadata/src/main/java/org/apache/kylin/dimension/TimeDerivedColumnType.java similarity index 99% rename from stream-core/src/main/java/org/apache/kylin/stream/core/util/TimeDerivedColumnType.java rename to core-metadata/src/main/java/org/apache/kylin/dimension/TimeDerivedColumnType.java index 902caf4..01953e7 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/util/TimeDerivedColumnType.java +++ b/core-metadata/src/main/java/org/apache/kylin/dimension/TimeDerivedColumnType.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.kylin.stream.core.util; +package org.apache.kylin.dimension; import java.util.Locale; import java.util.Map; diff --git a/stream-core/src/test/java/org/apache/kylin/stream/core/util/TimeDerivedColumnTypeTest.java b/core-metadata/src/test/java/org/apache/kylin/dimension/TimeDerivedColumnTypeTest.java similarity index 99% rename from stream-core/src/test/java/org/apache/kylin/stream/core/util/TimeDerivedColumnTypeTest.java rename to core-metadata/src/test/java/org/apache/kylin/dimension/TimeDerivedColumnTypeTest.java index 414de0f..15f57af 100644 --- a/stream-core/src/test/java/org/apache/kylin/stream/core/util/TimeDerivedColumnTypeTest.java +++ b/core-metadata/src/test/java/org/apache/kylin/dimension/TimeDerivedColumnTypeTest.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.kylin.stream.core.util; +package org.apache.kylin.dimension; import org.apache.kylin.common.util.DateFormat; import org.junit.Test; diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java index 05fc90f..2e04daa 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java @@ -19,10 +19,12 @@ package org.apache.kylin.storage.gtrecord; import java.io.IOException; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.TimeZone; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Array; @@ -32,6 +34,7 @@ import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.model.CubeDesc.DeriveInfo; import org.apache.kylin.dict.lookup.ILookupTable; +import org.apache.kylin.dimension.TimeDerivedColumnType; import org.apache.kylin.measure.MeasureType; import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller; import org.apache.kylin.metadata.model.FunctionDesc; @@ -65,6 +68,11 @@ public class CubeTupleConverter implements ITupleConverter { public final List<Integer> advMeasureIndexInGTValues; private List<ILookupTable> usedLookupTables; + final Set<Integer> timestampColumn = new HashSet<>(); + boolean autoJustByTimezone; + private static final long TIME_ZONE_OFFSET = TimeZone.getTimeZone(KylinConfig.getInstanceFromEnv().getTimeZone()) + .getRawOffset(); + public final int nSelectedDims; public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, // @@ -84,7 +92,12 @@ public class CubeTupleConverter implements ITupleConverter { advMeasureFillers = Lists.newArrayListWithCapacity(1); advMeasureIndexInGTValues = Lists.newArrayListWithCapacity(1); usedLookupTables = Lists.newArrayList(); - + autoJustByTimezone = cubeSeg.getConfig().isStreamingAutoJustTimezone(); + autoJustByTimezone = autoJustByTimezone + && cubeSeg.getCubeDesc().getModel().getRootFactTable().getTableDesc().isStreamingTable(); + if (autoJustByTimezone) { + logger.debug("Will ajust dimsension for Time Derived Column."); + } //////////// int i = 0; @@ -92,6 +105,8 @@ public class CubeTupleConverter implements ITupleConverter { // pre-calculate dimension index mapping to tuple for (TblColRef dim : selectedDimensions) { tupleIdx[i] = tupleInfo.hasColumn(dim) ? tupleInfo.getColumnIndex(dim) : -1; + if (dim.getType().isDateTimeFamily() && TimeDerivedColumnType.isTimeDerivedColumn(dim.getName())) + timestampColumn.add(tupleIdx[i]); i++; } @@ -147,7 +162,20 @@ public class CubeTupleConverter implements ITupleConverter { for (int i = 0; i < nSelectedDims; i++) { int ti = tupleIdx[i]; if (ti >= 0) { - tuple.setDimensionValue(ti, toString(gtValues[i])); + // add offset to return result according to timezone + if (autoJustByTimezone && timestampColumn.contains(i)) { + try { + String v = toString(gtValues[i]); + if (v != null) { + tuple.setDimensionValue(ti, Long.toString(Long.parseLong(v) + TIME_ZONE_OFFSET)); + } + } catch (NumberFormatException nfe) { + logger.warn("{} is not a long value.", gtValues[i]); + tuple.setDimensionValue(ti, toString(gtValues[i])); + } + } else { + tuple.setDimensionValue(ti, toString(gtValues[i])); + } } } diff --git a/query/src/main/java/org/apache/kylin/query/optrule/visitor/TimezoneRewriteVisitor.java b/query/src/main/java/org/apache/kylin/query/optrule/visitor/TimezoneRewriteVisitor.java new file mode 100644 index 0000000..cb75b34 --- /dev/null +++ b/query/src/main/java/org/apache/kylin/query/optrule/visitor/TimezoneRewriteVisitor.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kylin.query.optrule.visitor; + +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexCorrelVariable; +import org.apache.calcite.rex.RexDynamicParam; +import org.apache.calcite.rex.RexFieldAccess; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexLocalRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexOver; +import org.apache.calcite.rex.RexPatternFieldRef; +import org.apache.calcite.rex.RexRangeRef; +import org.apache.calcite.rex.RexSubQuery; +import org.apache.calcite.rex.RexTableInputRef; +import org.apache.calcite.rex.RexVisitorImpl; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.type.SqlTypeFamily; +import org.apache.calcite.sql.type.SqlTypeName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; + +public class TimezoneRewriteVisitor extends RexVisitorImpl<RexNode> { + public static final Logger logger = LoggerFactory.getLogger(TimezoneRewriteVisitor.class); + + public TimezoneRewriteVisitor(boolean deep) { + super(deep); + } + + @Override + public RexNode visitCall(RexCall call) { + List<RexNode> subList = new ArrayList<>(); + SqlTypeName type = call.getType().getSqlTypeName(); + if (call.getKind() == SqlKind.CAST) { + if (type.getFamily() == SqlTypeFamily.DATE || type.getFamily() == SqlTypeFamily.DATETIME + || type.getFamily() == SqlTypeFamily.TIMESTAMP) { + for (RexNode node : call.getOperands()) { + if (node instanceof RexLiteral) { + RexLiteral literal = (RexLiteral) node; + String toBeModify = literal.getValue2().toString(); + logger.info(toBeModify); + if (toBeModify.length() != 19) { + subList.add(node); + continue; + } + // minus offset by timezone in RelNode level + // this will affect code gen + int minusHours = 0; + String afetrModify = LocalDateTime + .parse(toBeModify, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")).minusHours(minusHours) + .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); + logger.info("{} -> {}", toBeModify, afetrModify); + RexLiteral newliteral = RexLiteral.fromJdbcString(literal.getType(), literal.getTypeName(), + afetrModify); + subList.add(newliteral); + } else { + subList.add(node); + } + } + } + return call.clone(call.type, subList); + } + + for (RexNode operand : call.operands) { + RexNode node = operand.accept(this); + subList.add(node); + } + return call.clone(call.type, subList); + } + + @Override + public RexNode visitLiteral(RexLiteral literal) { + return literal; + } + + @Override + public RexNode visitInputRef(RexInputRef inputRef) { + return inputRef; + } + + @Override + public RexNode visitLocalRef(RexLocalRef localRef) { + return localRef; + } + + @Override + public RexNode visitOver(RexOver over) { + return over; + } + + @Override + public RexNode visitCorrelVariable(RexCorrelVariable correlVariable) { + return correlVariable; + } + + @Override + public RexNode visitDynamicParam(RexDynamicParam dynamicParam) { + return dynamicParam; + } + + @Override + public RexNode visitRangeRef(RexRangeRef rangeRef) { + return rangeRef; + } + + @Override + public RexNode visitFieldAccess(RexFieldAccess fieldAccess) { + return fieldAccess; + } + + @Override + public RexNode visitSubQuery(RexSubQuery subQuery) { + return subQuery; + } + + @Override + public RexNode visitTableInputRef(RexTableInputRef ref) { + return ref; + } + + @Override + public RexNode visitPatternFieldRef(RexPatternFieldRef fieldRef) { + return fieldRef; + } + + public static void main(String[] args) { + + } +} diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java index b34b42e..3a76be6 100644 --- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java @@ -38,6 +38,7 @@ import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexProgram; import org.apache.calcite.rex.RexProgramBuilder; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.metadata.filter.FilterOptimizeTransformer; import org.apache.kylin.metadata.filter.LogicalTupleFilter; import org.apache.kylin.metadata.filter.TupleFilter; @@ -54,6 +55,7 @@ public class OLAPFilterRel extends Filter implements OLAPRel { ColumnRowType columnRowType; OLAPContext context; + boolean autoJustTimezone = KylinConfig.getInstanceFromEnv().isStreamingAutoJustTimezone(); public OLAPFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexNode condition) { super(cluster, traits, child, condition); @@ -105,6 +107,9 @@ public class OLAPFilterRel extends Filter implements OLAPRel { } TupleFilterVisitor visitor = new TupleFilterVisitor(this.columnRowType); + boolean isRealtimeTable = columnRowType.getColumnByIndex(0).getColumnDesc().getTable().isStreamingTable() ; + autoJustTimezone = isRealtimeTable && autoJustTimezone; + visitor.setAutoJustByTimezone(autoJustTimezone); TupleFilter filter = this.condition.accept(visitor); // optimize the filter, the optimization has to be segment-irrelevant diff --git a/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleFilterVisitor.java b/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleFilterVisitor.java index 7634023..3e07eca 100644 --- a/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleFilterVisitor.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleFilterVisitor.java @@ -37,6 +37,7 @@ import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlOperator; import org.apache.calcite.sql.type.SqlTypeFamily; import org.apache.calcite.util.NlsString; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.DateFormat; import org.apache.kylin.common.util.Pair; import org.apache.kylin.metadata.filter.CaseTupleFilter; @@ -57,11 +58,17 @@ import java.util.GregorianCalendar; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TimeZone; public class TupleFilterVisitor extends RexVisitorImpl<TupleFilter> { final ColumnRowType inputRowType; + // is the fact table is a streamingv2 table + private boolean autoJustByTimezone = false; + private static final long TIME_ZONE_OFFSET = TimeZone.getTimeZone(KylinConfig.getInstanceFromEnv().getTimeZone()) + .getRawOffset(); + public TupleFilterVisitor(ColumnRowType inputRowType) { super(true); this.inputRowType = inputRowType; @@ -210,10 +217,17 @@ public class TupleFilterVisitor extends RexVisitorImpl<TupleFilter> { || type.getFamily() == SqlTypeFamily.TIMESTAMP) { List<String> newValues = Lists.newArrayList(); for (Object v : constFilter.getValues()) { - if (v == null) + if (v == null) { newValues.add(null); - else - newValues.add(String.valueOf(DateFormat.stringToMillis(v.toString()))); + } else { + long ts = DateFormat.stringToMillis(v.toString()); + // minus offset by timezone in RelNode level + // this will affect request sent to storage level + if (autoJustByTimezone) { + ts -= TIME_ZONE_OFFSET; + } + newValues.add(String.valueOf(ts)); + } } constFilter = new ConstantTupleFilter(newValues); } @@ -314,4 +328,8 @@ public class TupleFilterVisitor extends RexVisitorImpl<TupleFilter> { TupleFilter filter = new DynamicTupleFilter(name); return filter; } + + public void setAutoJustByTimezone(boolean autoJustByTimezone) { + this.autoJustByTimezone = autoJustByTimezone; + } } diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingDataQueryPlanner.java b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingDataQueryPlanner.java index 2b1c286..98b4f4b 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingDataQueryPlanner.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingDataQueryPlanner.java @@ -27,7 +27,7 @@ import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.stream.core.util.CompareFilterTimeRangeChecker; import org.apache.kylin.stream.core.util.CompareFilterTimeRangeChecker.CheckResult; -import org.apache.kylin.stream.core.util.TimeDerivedColumnType; +import org.apache.kylin.dimension.TimeDerivedColumnType; /** * Scan planner for Streaming data segments, take derived time columns into consideration. diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingTupleConverter.java b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingTupleConverter.java index 4788b21..110cc8e 100755 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingTupleConverter.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingTupleConverter.java @@ -18,8 +18,12 @@ package org.apache.kylin.stream.core.query; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import java.util.TimeZone; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.measure.MeasureType; import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller; import org.apache.kylin.metadata.model.FunctionDesc; @@ -27,14 +31,19 @@ import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.tuple.Tuple; import org.apache.kylin.metadata.tuple.TupleInfo; import org.apache.kylin.stream.core.storage.Record; +import org.apache.kylin.dimension.TimeDerivedColumnType; import com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Convert Streaming Record to Tuple - * */ public class StreamingTupleConverter { + + private static final Logger logger = LoggerFactory.getLogger(StreamingTupleConverter.class); + final TupleInfo tupleInfo; final int[] dimTupleIdx; @@ -42,10 +51,13 @@ public class StreamingTupleConverter { final int dimCnt; final int metricsCnt; final MeasureType<?>[] measureTypes; + final Set<Integer> timestampColumn = new HashSet<>(); final List<MeasureType.IAdvMeasureFiller> advMeasureFillers; final List<Integer> advMeasureIndexInGTValues; - + final boolean autoTimezone = KylinConfig.getInstanceFromEnv().isStreamingAutoJustTimezone(); + private static final long TIME_ZONE_OFFSET = TimeZone.getTimeZone(KylinConfig.getInstanceFromEnv().getTimeZone()) + .getRawOffset(); public StreamingTupleConverter(ResponseResultSchema schema, TupleInfo returnTupleInfo) { this.tupleInfo = returnTupleInfo; @@ -64,6 +76,8 @@ public class StreamingTupleConverter { // pre-calculate dimension index mapping to tuple for (TblColRef dim : schema.getDimensions()) { dimTupleIdx[idx] = tupleInfo.hasColumn(dim) ? tupleInfo.getColumnIndex(dim) : -1; + if (dim.getType().isDateTimeFamily() && TimeDerivedColumnType.isTimeDerivedColumn(dim.getName())) + timestampColumn.add(dimTupleIdx[idx]); idx++; } @@ -95,7 +109,16 @@ public class StreamingTupleConverter { for (int i = 0; i < dimCnt; i++) { int ti = dimTupleIdx[i]; if (ti >= 0) { - tuple.setDimensionValue(ti, dimValues[i]); + if (autoTimezone && timestampColumn.contains(ti)) { + try { + tuple.setDimensionValue(ti, Long.toString(Long.parseLong(dimValues[i]) + TIME_ZONE_OFFSET)); + } catch (NumberFormatException nfe) { + logger.warn("{} is not a long value.", dimValues[i]); + tuple.setDimensionValue(ti, dimValues[i]); + } + } else { + tuple.setDimensionValue(ti, dimValues[i]); + } } } diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java index 9276d27..fa9b9ab 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java @@ -29,11 +29,11 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.TimeZone; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; - import org.apache.commons.io.FileUtils; import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeInstance; @@ -93,8 +93,8 @@ public class StreamingSegmentManager implements Closeable { private final ISourcePositionHandler sourcePositionHandler; private final ISourcePosition consumePosition; - - //TODO long latency event may not be dropped directly + private static final long TIME_ZONE_OFFSET = TimeZone.getTimeZone(KylinConfig.getInstanceFromEnv().getTimeZone()) + .getRawOffset(); private volatile LongLatencyInfo longLatencyInfo; private volatile long nextCheckPoint = 0; private volatile long lastCheckPointCount = 0; @@ -543,9 +543,9 @@ public class StreamingSegmentManager implements Closeable { for (StreamingCubeSegment segment : allSegments) { SegmentStats segmentStats = new SegmentStats(); segmentStats.setSegmentState(segment.getState().name()); - segmentStats.setSegmentCreateTime(segment.getCreateTime()); - segmentStats.setSegmentLastUpdateTime(segment.getLastUpdateTime()); - segmentStats.setLatestEventTime(segment.getLatestEventTimeStamp()); + segmentStats.setSegmentCreateTime(resetTimestampByTimeZone(segment.getCreateTime())); + segmentStats.setSegmentLastUpdateTime(resetTimestampByTimeZone(segment.getLastUpdateTime())); + segmentStats.setLatestEventTime(resetTimestampByTimeZone(segment.getLatestEventTimeStamp())); segmentStats.setLatestEventLatency(segment.getLatestEventLatecy()); SegmentStoreStats storeStats = segment.getSegmentStore().getStoreStats(); segmentStats.setStoreStats(storeStats); @@ -623,4 +623,8 @@ public class StreamingSegmentManager implements Closeable { private long truncateTime(long timestamp, long windowLength) { return (timestamp / windowLength) * windowLength; } + + public static long resetTimestampByTimeZone(long ts) { + return ts + TIME_ZONE_OFFSET; + } } diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarStoreDimDesc.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarStoreDimDesc.java index b3d1f80..0406ecb 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarStoreDimDesc.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarStoreDimDesc.java @@ -35,7 +35,7 @@ import org.apache.kylin.stream.core.storage.columnar.compress.NoCompressedColumn import org.apache.kylin.stream.core.storage.columnar.compress.NoCompressedColumnWriter; import org.apache.kylin.stream.core.storage.columnar.compress.RunLengthCompressedColumnReader; import org.apache.kylin.stream.core.storage.columnar.compress.RunLengthCompressedColumnWriter; -import org.apache.kylin.stream.core.util.TimeDerivedColumnType; +import org.apache.kylin.dimension.TimeDerivedColumnType; public class ColumnarStoreDimDesc { private int fixLen; diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentFileSearcher.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentFileSearcher.java index 20b123a..9b0a912 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentFileSearcher.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentFileSearcher.java @@ -1,360 +1,360 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.stream.core.storage.columnar; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Collection; -import java.util.List; -import java.util.ListIterator; -import java.util.Map; -import java.util.Set; - -import org.apache.kylin.common.util.ByteArray; -import org.apache.kylin.common.util.Dictionary; -import org.apache.kylin.cube.kv.CubeDimEncMap; -import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.dict.BuiltInFunctionTransformer; -import org.apache.kylin.dimension.DimensionEncoding; -import org.apache.kylin.dimension.IDimensionEncodingMap; -import org.apache.kylin.metadata.filter.ColumnTupleFilter; -import org.apache.kylin.metadata.filter.CompareTupleFilter; -import org.apache.kylin.metadata.filter.ConstantTupleFilter; -import org.apache.kylin.metadata.filter.FilterOptimizeTransformer; -import org.apache.kylin.metadata.filter.ITupleFilterTransformer; -import org.apache.kylin.metadata.filter.LogicalTupleFilter; -import org.apache.kylin.metadata.filter.StringCodeSystem; -import org.apache.kylin.metadata.filter.TupleFilter; -import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum; -import org.apache.kylin.metadata.filter.TupleFilterSerializer; -import org.apache.kylin.metadata.model.FunctionDesc; -import org.apache.kylin.metadata.model.MeasureDesc; -import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.stream.core.query.IStreamingGTSearcher; -import org.apache.kylin.stream.core.query.IStreamingSearchResult; -import org.apache.kylin.stream.core.query.ResponseResultSchema; -import org.apache.kylin.stream.core.query.ResultCollector; -import org.apache.kylin.stream.core.query.StreamingSearchContext; -import org.apache.kylin.stream.core.storage.columnar.protocol.CuboidMetaInfo; -import org.apache.kylin.stream.core.storage.columnar.protocol.FragmentMetaInfo; -import org.apache.kylin.stream.core.util.CompareFilterTimeRangeChecker; -import org.apache.kylin.stream.core.util.CompareFilterTimeRangeChecker.CheckResult; -import org.apache.kylin.stream.core.util.TimeDerivedColumnType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -/** - * FragmentFileSearcher is responsible to scan the columnar based storage on disk and deal with all the bytes level details for each DataFragment and return the result as GTRecords. - * - */ -public class FragmentFileSearcher implements IStreamingGTSearcher { - private static Logger logger = LoggerFactory.getLogger(FragmentFileSearcher.class); - - private FragmentData fragmentData; - private DataSegmentFragment fragment; - - public FragmentFileSearcher(DataSegmentFragment fragment, FragmentData fragmentData) { - this.fragment = fragment; - this.fragmentData = fragmentData; - } - - @Override - public void search(StreamingSearchContext searchContext, ResultCollector collector) throws IOException { - FragmentMetaInfo fragmentMetaInfo = fragmentData.getFragmentMetaInfo(); - CuboidMetaInfo cuboidMetaInfo; - if (searchContext.hitBasicCuboid()) { - cuboidMetaInfo = fragmentMetaInfo.getBasicCuboidMetaInfo(); - } else { - cuboidMetaInfo = fragmentMetaInfo.getCuboidMetaInfo(searchContext.getHitCuboid()); - if (cuboidMetaInfo == null) { - logger.warn("the cuboid:{} is not exist in the fragment:{}, use basic cuboid instead", - searchContext.getHitCuboid(), fragment.getFragmentId()); - cuboidMetaInfo = fragmentMetaInfo.getBasicCuboidMetaInfo(); - } - } - - ResponseResultSchema responseSchema = searchContext.getRespResultSchema(); - TblColRef[] dimensions = responseSchema.getDimensions(); - FunctionDesc[] metrics = responseSchema.getMetrics(); - Map<TblColRef, Dictionary<String>> dictMap = fragmentData.getDimensionDictionaries(dimensions); - - CubeDesc cubeDesc = responseSchema.getCubeDesc(); - List<MeasureDesc> allMeasures = cubeDesc.getMeasures(); - Map<FunctionDesc, MeasureDesc> funcMeasureMap = Maps.newHashMap(); - for (MeasureDesc measure : allMeasures) { - funcMeasureMap.put(measure.getFunction(), measure); - } - MeasureDesc[] measures = new MeasureDesc[metrics.length]; - for (int i = 0; i < measures.length; i++) { - measures[i] = funcMeasureMap.get(metrics[i]); - } - DimensionEncoding[] dimensionEncodings = ParsedStreamingCubeInfo.getDimensionEncodings(cubeDesc, dimensions, - dictMap); - ColumnarMetricsEncoding[] metricsEncodings = ParsedStreamingCubeInfo.getMetricsEncodings(measures); - ColumnarRecordCodec recordCodec = new ColumnarRecordCodec(dimensionEncodings, metricsEncodings); - - // change the unEvaluable dimensions to groupBy - Set<TblColRef> unEvaluateDims = Sets.newHashSet(); - TupleFilter fragmentFilter = null; - if (searchContext.getFilter() != null) { - fragmentFilter = convertFilter(fragmentMetaInfo, searchContext.getFilter(), recordCodec, - dimensions, new CubeDimEncMap(cubeDesc, dictMap), unEvaluateDims); - } - if (ConstantTupleFilter.TRUE == fragmentFilter) { - fragmentFilter = null; - } else if (ConstantTupleFilter.FALSE == fragmentFilter) { - collector.collectSearchResult(IStreamingSearchResult.EMPTY_RESULT); - } - Set<TblColRef> groups = searchContext.getGroups(); - if (!unEvaluateDims.isEmpty()) { - searchContext.addNewGroups(unEvaluateDims); - groups = Sets.union(groups, unEvaluateDims); - } - collector.collectSearchResult(new FragmentSearchResult(fragment, fragmentData, cuboidMetaInfo, responseSchema, fragmentFilter, groups, searchContext.getHavingFilter(), - recordCodec)); - } - - private TupleFilter convertFilter(FragmentMetaInfo fragmentMetaInfo, TupleFilter rootFilter, - ColumnarRecordCodec recordCodec, final TblColRef[] dimensions, final IDimensionEncodingMap dimEncodingMap, // - final Set<TblColRef> unEvaluableColumnCollector) { - Map<TblColRef, Integer> colMapping = Maps.newHashMap(); - for (int i = 0; i < dimensions.length; i++) { - colMapping.put(dimensions[i], i); - } - byte[] bytes = TupleFilterSerializer.serialize(rootFilter, null, StringCodeSystem.INSTANCE); - TupleFilter filter = TupleFilterSerializer.deserialize(bytes, StringCodeSystem.INSTANCE); - - BuiltInFunctionTransformer builtInFunctionTransformer = new BuiltInFunctionTransformer(dimEncodingMap); - filter = builtInFunctionTransformer.transform(filter); - FragmentFilterConverter fragmentFilterConverter = new FragmentFilterConverter(fragmentMetaInfo, unEvaluableColumnCollector, - colMapping, recordCodec); - filter = fragmentFilterConverter.transform(filter); - - filter = new FilterOptimizeTransformer().transform(filter); - return filter; - } - - protected static class FragmentFilterConverter implements ITupleFilterTransformer { - protected final Set<TblColRef> unEvaluableColumnCollector; - protected final Map<TblColRef, Integer> colMapping; - private CompareFilterTimeRangeChecker filterTimeRangeChecker; - private ColumnarRecordCodec recordCodec; - transient ByteBuffer buf; - - public FragmentFilterConverter(FragmentMetaInfo fragmentMetaInfo, Set<TblColRef> unEvaluableColumnCollector, - Map<TblColRef, Integer> colMapping, ColumnarRecordCodec recordCodec) { - this.unEvaluableColumnCollector = unEvaluableColumnCollector; - this.recordCodec = recordCodec; - this.colMapping = colMapping; - if (fragmentMetaInfo.hasValidEventTimeRange()) { - this.filterTimeRangeChecker = new CompareFilterTimeRangeChecker(fragmentMetaInfo.getMinEventTime(), - fragmentMetaInfo.getMaxEventTime(), true); - } - buf = ByteBuffer.allocate(recordCodec.getMaxDimLength()); - } - - protected int mapCol(TblColRef col) { - Integer i = colMapping.get(col); - return i == null ? -1 : i; - } - - @Override - public TupleFilter transform(TupleFilter filter) { - if (filter.getOperator() == TupleFilter.FilterOperatorEnum.NOT - && !TupleFilter.isEvaluableRecursively(filter)) { - TupleFilter.collectColumns(filter, unEvaluableColumnCollector); - return ConstantTupleFilter.TRUE; - } - - // shortcut for unEvaluatable filter - if (!filter.isEvaluable()) { - TupleFilter.collectColumns(filter, unEvaluableColumnCollector); - return ConstantTupleFilter.TRUE; - } - - if (filter instanceof CompareTupleFilter) { - return translateCompareFilter((CompareTupleFilter) filter); - } else if (filter instanceof LogicalTupleFilter) { - @SuppressWarnings("unchecked") - ListIterator<TupleFilter> childIterator = (ListIterator<TupleFilter>) filter.getChildren().listIterator(); - while (childIterator.hasNext()) { - TupleFilter transformed = transform(childIterator.next()); - if (transformed != null) { - childIterator.set(transformed); - } else { - throw new IllegalStateException("Should not be null"); - } - } - } - return filter; - } - - - @SuppressWarnings({ "rawtypes", "unchecked" }) - protected TupleFilter translateCompareFilter(CompareTupleFilter oldCompareFilter) { - // extract ColumnFilter & ConstantFilter - TblColRef externalCol = oldCompareFilter.getColumn(); - - if (externalCol == null) { - return oldCompareFilter; - } - - Collection constValues = oldCompareFilter.getValues(); - if (constValues == null || constValues.isEmpty()) { - return oldCompareFilter; - } - - if (TimeDerivedColumnType.isTimeDerivedColumn(externalCol.getName()) && filterTimeRangeChecker != null) { - CheckResult checkResult = filterTimeRangeChecker.check(oldCompareFilter, - TimeDerivedColumnType.getTimeDerivedColumnType(externalCol.getName())); - if (checkResult == CheckResult.INCLUDED) { - return ConstantTupleFilter.TRUE; - } else if (checkResult == CheckResult.EXCLUDED) { - return ConstantTupleFilter.FALSE; - } - } - - //CompareTupleFilter containing BuiltInFunctionTupleFilter will not reach here caz it will be transformed by BuiltInFunctionTransformer - CompareTupleFilter newCompareFilter = new CompareTupleFilter(oldCompareFilter.getOperator()); - newCompareFilter.addChild(new ColumnTupleFilter(externalCol)); - - //for CompareTupleFilter containing dynamicVariables, the below codes will actually replace dynamicVariables - //with normal ConstantTupleFilter - - Object firstValue = constValues.iterator().next(); - int col = mapCol(externalCol); - - TupleFilter result; - ByteArray code; - - // translate constant into code - switch (newCompareFilter.getOperator()) { - case EQ: - case IN: - Set newValues = Sets.newHashSet(); - for (Object value : constValues) { - code = translate(col, value, 0); - if (code != null) - newValues.add(code); - } - if (newValues.isEmpty()) { - result = ConstantTupleFilter.FALSE; - } else { - newCompareFilter.addChild(new ConstantTupleFilter(newValues)); - result = newCompareFilter; - } - break; - case NOTIN: - Set notInValues = Sets.newHashSet(); - for (Object value : constValues) { - code = translate(col, value, 0); - if (code != null) - notInValues.add(code); - } - if (notInValues.isEmpty()) { - result = ConstantTupleFilter.TRUE; - } else { - newCompareFilter.addChild(new ConstantTupleFilter(notInValues)); - result = newCompareFilter; - } - break; - case NEQ: - code = translate(col, firstValue, 0); - if (code == null) { - result = ConstantTupleFilter.TRUE; - } else { - newCompareFilter.addChild(new ConstantTupleFilter(code)); - result = newCompareFilter; - } - break; - case LT: - code = translate(col, firstValue, 0); - if (code == null) { - code = translate(col, firstValue, -1); - if (code == null) - result = ConstantTupleFilter.FALSE; - else - result = newCompareFilter(FilterOperatorEnum.LTE, externalCol, code); - } else { - newCompareFilter.addChild(new ConstantTupleFilter(code)); - result = newCompareFilter; - } - break; - case LTE: - code = translate(col, firstValue, -1); - if (code == null) { - result = ConstantTupleFilter.FALSE; - } else { - newCompareFilter.addChild(new ConstantTupleFilter(code)); - result = newCompareFilter; - } - break; - case GT: - code = translate(col, firstValue, 0); - if (code == null) { - code = translate(col, firstValue, 1); - if (code == null) - result = ConstantTupleFilter.FALSE; - else - result = newCompareFilter(FilterOperatorEnum.GTE, externalCol, code); - } else { - newCompareFilter.addChild(new ConstantTupleFilter(code)); - result = newCompareFilter; - } - break; - case GTE: - code = translate(col, firstValue, 1); - if (code == null) { - result = ConstantTupleFilter.FALSE; - } else { - newCompareFilter.addChild(new ConstantTupleFilter(code)); - result = newCompareFilter; - } - break; - default: - throw new IllegalStateException("Cannot handle operator " + newCompareFilter.getOperator()); - } - return result; - } - - private TupleFilter newCompareFilter(FilterOperatorEnum op, TblColRef col, ByteArray code) { - CompareTupleFilter r = new CompareTupleFilter(op); - r.addChild(new ColumnTupleFilter(col)); - r.addChild(new ConstantTupleFilter(code)); - return r; - } - - protected ByteArray translate(int col, Object value, int roundingFlag) { - try { - buf.clear(); - recordCodec.encodeDimension(col, value, roundingFlag, buf); - int length = buf.position(); - return ByteArray.copyOf(buf.array(), 0, length); - } catch (IllegalArgumentException ex) { - return null; - } - } - - - } - +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.stream.core.storage.columnar; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Set; + +import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.common.util.Dictionary; +import org.apache.kylin.cube.kv.CubeDimEncMap; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.dict.BuiltInFunctionTransformer; +import org.apache.kylin.dimension.DimensionEncoding; +import org.apache.kylin.dimension.IDimensionEncodingMap; +import org.apache.kylin.metadata.filter.ColumnTupleFilter; +import org.apache.kylin.metadata.filter.CompareTupleFilter; +import org.apache.kylin.metadata.filter.ConstantTupleFilter; +import org.apache.kylin.metadata.filter.FilterOptimizeTransformer; +import org.apache.kylin.metadata.filter.ITupleFilterTransformer; +import org.apache.kylin.metadata.filter.LogicalTupleFilter; +import org.apache.kylin.metadata.filter.StringCodeSystem; +import org.apache.kylin.metadata.filter.TupleFilter; +import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum; +import org.apache.kylin.metadata.filter.TupleFilterSerializer; +import org.apache.kylin.metadata.model.FunctionDesc; +import org.apache.kylin.metadata.model.MeasureDesc; +import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.stream.core.query.IStreamingGTSearcher; +import org.apache.kylin.stream.core.query.IStreamingSearchResult; +import org.apache.kylin.stream.core.query.ResponseResultSchema; +import org.apache.kylin.stream.core.query.ResultCollector; +import org.apache.kylin.stream.core.query.StreamingSearchContext; +import org.apache.kylin.stream.core.storage.columnar.protocol.CuboidMetaInfo; +import org.apache.kylin.stream.core.storage.columnar.protocol.FragmentMetaInfo; +import org.apache.kylin.stream.core.util.CompareFilterTimeRangeChecker; +import org.apache.kylin.stream.core.util.CompareFilterTimeRangeChecker.CheckResult; +import org.apache.kylin.dimension.TimeDerivedColumnType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +/** + * FragmentFileSearcher is responsible to scan the columnar based storage on disk and deal with all the bytes level details for each DataFragment and return the result as GTRecords. + * + */ +public class FragmentFileSearcher implements IStreamingGTSearcher { + private static Logger logger = LoggerFactory.getLogger(FragmentFileSearcher.class); + + private FragmentData fragmentData; + private DataSegmentFragment fragment; + + public FragmentFileSearcher(DataSegmentFragment fragment, FragmentData fragmentData) { + this.fragment = fragment; + this.fragmentData = fragmentData; + } + + @Override + public void search(StreamingSearchContext searchContext, ResultCollector collector) throws IOException { + FragmentMetaInfo fragmentMetaInfo = fragmentData.getFragmentMetaInfo(); + CuboidMetaInfo cuboidMetaInfo; + if (searchContext.hitBasicCuboid()) { + cuboidMetaInfo = fragmentMetaInfo.getBasicCuboidMetaInfo(); + } else { + cuboidMetaInfo = fragmentMetaInfo.getCuboidMetaInfo(searchContext.getHitCuboid()); + if (cuboidMetaInfo == null) { + logger.warn("the cuboid:{} is not exist in the fragment:{}, use basic cuboid instead", + searchContext.getHitCuboid(), fragment.getFragmentId()); + cuboidMetaInfo = fragmentMetaInfo.getBasicCuboidMetaInfo(); + } + } + + ResponseResultSchema responseSchema = searchContext.getRespResultSchema(); + TblColRef[] dimensions = responseSchema.getDimensions(); + FunctionDesc[] metrics = responseSchema.getMetrics(); + Map<TblColRef, Dictionary<String>> dictMap = fragmentData.getDimensionDictionaries(dimensions); + + CubeDesc cubeDesc = responseSchema.getCubeDesc(); + List<MeasureDesc> allMeasures = cubeDesc.getMeasures(); + Map<FunctionDesc, MeasureDesc> funcMeasureMap = Maps.newHashMap(); + for (MeasureDesc measure : allMeasures) { + funcMeasureMap.put(measure.getFunction(), measure); + } + MeasureDesc[] measures = new MeasureDesc[metrics.length]; + for (int i = 0; i < measures.length; i++) { + measures[i] = funcMeasureMap.get(metrics[i]); + } + DimensionEncoding[] dimensionEncodings = ParsedStreamingCubeInfo.getDimensionEncodings(cubeDesc, dimensions, + dictMap); + ColumnarMetricsEncoding[] metricsEncodings = ParsedStreamingCubeInfo.getMetricsEncodings(measures); + ColumnarRecordCodec recordCodec = new ColumnarRecordCodec(dimensionEncodings, metricsEncodings); + + // change the unEvaluable dimensions to groupBy + Set<TblColRef> unEvaluateDims = Sets.newHashSet(); + TupleFilter fragmentFilter = null; + if (searchContext.getFilter() != null) { + fragmentFilter = convertFilter(fragmentMetaInfo, searchContext.getFilter(), recordCodec, + dimensions, new CubeDimEncMap(cubeDesc, dictMap), unEvaluateDims); + } + if (ConstantTupleFilter.TRUE == fragmentFilter) { + fragmentFilter = null; + } else if (ConstantTupleFilter.FALSE == fragmentFilter) { + collector.collectSearchResult(IStreamingSearchResult.EMPTY_RESULT); + } + Set<TblColRef> groups = searchContext.getGroups(); + if (!unEvaluateDims.isEmpty()) { + searchContext.addNewGroups(unEvaluateDims); + groups = Sets.union(groups, unEvaluateDims); + } + collector.collectSearchResult(new FragmentSearchResult(fragment, fragmentData, cuboidMetaInfo, responseSchema, fragmentFilter, groups, searchContext.getHavingFilter(), + recordCodec)); + } + + private TupleFilter convertFilter(FragmentMetaInfo fragmentMetaInfo, TupleFilter rootFilter, + ColumnarRecordCodec recordCodec, final TblColRef[] dimensions, final IDimensionEncodingMap dimEncodingMap, // + final Set<TblColRef> unEvaluableColumnCollector) { + Map<TblColRef, Integer> colMapping = Maps.newHashMap(); + for (int i = 0; i < dimensions.length; i++) { + colMapping.put(dimensions[i], i); + } + byte[] bytes = TupleFilterSerializer.serialize(rootFilter, null, StringCodeSystem.INSTANCE); + TupleFilter filter = TupleFilterSerializer.deserialize(bytes, StringCodeSystem.INSTANCE); + + BuiltInFunctionTransformer builtInFunctionTransformer = new BuiltInFunctionTransformer(dimEncodingMap); + filter = builtInFunctionTransformer.transform(filter); + FragmentFilterConverter fragmentFilterConverter = new FragmentFilterConverter(fragmentMetaInfo, unEvaluableColumnCollector, + colMapping, recordCodec); + filter = fragmentFilterConverter.transform(filter); + + filter = new FilterOptimizeTransformer().transform(filter); + return filter; + } + + protected static class FragmentFilterConverter implements ITupleFilterTransformer { + protected final Set<TblColRef> unEvaluableColumnCollector; + protected final Map<TblColRef, Integer> colMapping; + private CompareFilterTimeRangeChecker filterTimeRangeChecker; + private ColumnarRecordCodec recordCodec; + transient ByteBuffer buf; + + public FragmentFilterConverter(FragmentMetaInfo fragmentMetaInfo, Set<TblColRef> unEvaluableColumnCollector, + Map<TblColRef, Integer> colMapping, ColumnarRecordCodec recordCodec) { + this.unEvaluableColumnCollector = unEvaluableColumnCollector; + this.recordCodec = recordCodec; + this.colMapping = colMapping; + if (fragmentMetaInfo.hasValidEventTimeRange()) { + this.filterTimeRangeChecker = new CompareFilterTimeRangeChecker(fragmentMetaInfo.getMinEventTime(), + fragmentMetaInfo.getMaxEventTime(), true); + } + buf = ByteBuffer.allocate(recordCodec.getMaxDimLength()); + } + + protected int mapCol(TblColRef col) { + Integer i = colMapping.get(col); + return i == null ? -1 : i; + } + + @Override + public TupleFilter transform(TupleFilter filter) { + if (filter.getOperator() == TupleFilter.FilterOperatorEnum.NOT + && !TupleFilter.isEvaluableRecursively(filter)) { + TupleFilter.collectColumns(filter, unEvaluableColumnCollector); + return ConstantTupleFilter.TRUE; + } + + // shortcut for unEvaluatable filter + if (!filter.isEvaluable()) { + TupleFilter.collectColumns(filter, unEvaluableColumnCollector); + return ConstantTupleFilter.TRUE; + } + + if (filter instanceof CompareTupleFilter) { + return translateCompareFilter((CompareTupleFilter) filter); + } else if (filter instanceof LogicalTupleFilter) { + @SuppressWarnings("unchecked") + ListIterator<TupleFilter> childIterator = (ListIterator<TupleFilter>) filter.getChildren().listIterator(); + while (childIterator.hasNext()) { + TupleFilter transformed = transform(childIterator.next()); + if (transformed != null) { + childIterator.set(transformed); + } else { + throw new IllegalStateException("Should not be null"); + } + } + } + return filter; + } + + + @SuppressWarnings({ "rawtypes", "unchecked" }) + protected TupleFilter translateCompareFilter(CompareTupleFilter oldCompareFilter) { + // extract ColumnFilter & ConstantFilter + TblColRef externalCol = oldCompareFilter.getColumn(); + + if (externalCol == null) { + return oldCompareFilter; + } + + Collection constValues = oldCompareFilter.getValues(); + if (constValues == null || constValues.isEmpty()) { + return oldCompareFilter; + } + + if (TimeDerivedColumnType.isTimeDerivedColumn(externalCol.getName()) && filterTimeRangeChecker != null) { + CheckResult checkResult = filterTimeRangeChecker.check(oldCompareFilter, + TimeDerivedColumnType.getTimeDerivedColumnType(externalCol.getName())); + if (checkResult == CheckResult.INCLUDED) { + return ConstantTupleFilter.TRUE; + } else if (checkResult == CheckResult.EXCLUDED) { + return ConstantTupleFilter.FALSE; + } + } + + //CompareTupleFilter containing BuiltInFunctionTupleFilter will not reach here caz it will be transformed by BuiltInFunctionTransformer + CompareTupleFilter newCompareFilter = new CompareTupleFilter(oldCompareFilter.getOperator()); + newCompareFilter.addChild(new ColumnTupleFilter(externalCol)); + + //for CompareTupleFilter containing dynamicVariables, the below codes will actually replace dynamicVariables + //with normal ConstantTupleFilter + + Object firstValue = constValues.iterator().next(); + int col = mapCol(externalCol); + + TupleFilter result; + ByteArray code; + + // translate constant into code + switch (newCompareFilter.getOperator()) { + case EQ: + case IN: + Set newValues = Sets.newHashSet(); + for (Object value : constValues) { + code = translate(col, value, 0); + if (code != null) + newValues.add(code); + } + if (newValues.isEmpty()) { + result = ConstantTupleFilter.FALSE; + } else { + newCompareFilter.addChild(new ConstantTupleFilter(newValues)); + result = newCompareFilter; + } + break; + case NOTIN: + Set notInValues = Sets.newHashSet(); + for (Object value : constValues) { + code = translate(col, value, 0); + if (code != null) + notInValues.add(code); + } + if (notInValues.isEmpty()) { + result = ConstantTupleFilter.TRUE; + } else { + newCompareFilter.addChild(new ConstantTupleFilter(notInValues)); + result = newCompareFilter; + } + break; + case NEQ: + code = translate(col, firstValue, 0); + if (code == null) { + result = ConstantTupleFilter.TRUE; + } else { + newCompareFilter.addChild(new ConstantTupleFilter(code)); + result = newCompareFilter; + } + break; + case LT: + code = translate(col, firstValue, 0); + if (code == null) { + code = translate(col, firstValue, -1); + if (code == null) + result = ConstantTupleFilter.FALSE; + else + result = newCompareFilter(FilterOperatorEnum.LTE, externalCol, code); + } else { + newCompareFilter.addChild(new ConstantTupleFilter(code)); + result = newCompareFilter; + } + break; + case LTE: + code = translate(col, firstValue, -1); + if (code == null) { + result = ConstantTupleFilter.FALSE; + } else { + newCompareFilter.addChild(new ConstantTupleFilter(code)); + result = newCompareFilter; + } + break; + case GT: + code = translate(col, firstValue, 0); + if (code == null) { + code = translate(col, firstValue, 1); + if (code == null) + result = ConstantTupleFilter.FALSE; + else + result = newCompareFilter(FilterOperatorEnum.GTE, externalCol, code); + } else { + newCompareFilter.addChild(new ConstantTupleFilter(code)); + result = newCompareFilter; + } + break; + case GTE: + code = translate(col, firstValue, 1); + if (code == null) { + result = ConstantTupleFilter.FALSE; + } else { + newCompareFilter.addChild(new ConstantTupleFilter(code)); + result = newCompareFilter; + } + break; + default: + throw new IllegalStateException("Cannot handle operator " + newCompareFilter.getOperator()); + } + return result; + } + + private TupleFilter newCompareFilter(FilterOperatorEnum op, TblColRef col, ByteArray code) { + CompareTupleFilter r = new CompareTupleFilter(op); + r.addChild(new ColumnTupleFilter(col)); + r.addChild(new ConstantTupleFilter(code)); + return r; + } + + protected ByteArray translate(int col, Object value, int roundingFlag) { + try { + buf.clear(); + recordCodec.encodeDimension(col, value, roundingFlag, buf); + int length = buf.position(); + return ByteArray.copyOf(buf.array(), 0, length); + } catch (IllegalArgumentException ex) { + return null; + } + } + + + } + } \ No newline at end of file diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/TimeDerivedColumnEncoding.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/TimeDerivedColumnEncoding.java index c342c04..22025ff 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/TimeDerivedColumnEncoding.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/TimeDerivedColumnEncoding.java @@ -19,7 +19,7 @@ package org.apache.kylin.stream.core.storage.columnar; import org.apache.kylin.common.util.DateFormat; -import org.apache.kylin.stream.core.util.TimeDerivedColumnType; +import org.apache.kylin.dimension.TimeDerivedColumnType; public class TimeDerivedColumnEncoding { private String columnName; diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/util/CompareFilterTimeRangeChecker.java b/stream-core/src/main/java/org/apache/kylin/stream/core/util/CompareFilterTimeRangeChecker.java index 5be5fd1..c01459d 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/util/CompareFilterTimeRangeChecker.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/util/CompareFilterTimeRangeChecker.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.Set; import org.apache.kylin.common.util.Pair; +import org.apache.kylin.dimension.TimeDerivedColumnType; import org.apache.kylin.metadata.filter.CompareTupleFilter; /** diff --git a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java index 2a3b077..408a5e7a 100644 --- a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java +++ b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java @@ -477,8 +477,10 @@ public class StreamingServer implements ReplicaSetLeaderSelector.LeaderChangeLis Map<String, SegmentStats> segmentStatsMap = segmentManager.getSegmentStats(); receiverCubeStats.setSegmentStatsMap(segmentStatsMap); receiverCubeStats.setTotalIngest(segmentManager.getIngestCount()); - receiverCubeStats.setLatestEventTime(segmentManager.getLatestEventTime()); - receiverCubeStats.setLatestEventIngestTime(segmentManager.getLatestEventIngestTime()); + receiverCubeStats.setLatestEventTime( + StreamingSegmentManager.resetTimestampByTimeZone(segmentManager.getLatestEventTime())); + receiverCubeStats.setLatestEventIngestTime( + StreamingSegmentManager.resetTimestampByTimeZone(segmentManager.getLatestEventIngestTime())); receiverCubeStats.setLongLatencyInfo(segmentManager.getLongLatencyInfo()); } return receiverCubeStats; diff --git a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java index 32e4111..809844c 100644 --- a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java +++ b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java @@ -40,7 +40,7 @@ import org.apache.kylin.stream.core.exception.StreamingException; import org.apache.kylin.stream.core.model.StreamingMessage; import org.apache.kylin.stream.core.source.IStreamingMessageParser; import org.apache.kylin.stream.core.source.MessageParserInfo; -import org.apache.kylin.stream.core.util.TimeDerivedColumnType; +import org.apache.kylin.dimension.TimeDerivedColumnType; import org.apache.kylin.stream.source.kafka.KafkaPosition.KafkaPartitionPosition; import org.slf4j.Logger; import org.slf4j.LoggerFactory;