Repository: carbondata Updated Branches: refs/heads/master 088465f0c -> 7269c0627
[CARBONDATA-2200] Fix bug of LIKE operation on streaming table Fix bug of LIKE operation on streaming table, LIKE operation will be converted to StartsWith / EndsWith / Contains expression. Carbon will use RowLevelFilterExecuterImpl to evaluate this expression. Streaming table also should implement RowLevelFilterExecuterImpl. This closes #1996 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7269c062 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7269c062 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7269c062 Branch: refs/heads/master Commit: 7269c0627656e2752bd35c0830cfb134da0aa848 Parents: 088465f Author: QiangCai <qiang...@qq.com> Authored: Sun Feb 25 18:53:41 2018 +0800 Committer: Jacky Li <jacky.li...@qq.com> Committed: Sun Feb 25 23:10:58 2018 +0800 ---------------------------------------------------------------------- .../executer/RowLevelFilterExecuterImpl.java | 82 +++++++++++++++++++- .../TestStreamingTableOperation.scala | 20 ++++- 2 files changed, 100 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/7269c062/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java index 8956f30..de97e82 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java @@ -60,6 +60,7 @@ import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnRes import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo; import org.apache.carbondata.core.scan.processor.BlocksChunkHolder; import org.apache.carbondata.core.util.BitSetGroup; +import org.apache.carbondata.core.util.ByteUtil; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.DataTypeUtil; @@ -276,13 +277,92 @@ public class RowLevelFilterExecuterImpl implements FilterExecuter { public boolean applyFilter(RowIntf value, int dimOrdinalMax) throws FilterUnsupportedException, IOException { try { - return exp.evaluate(value).getBoolean(); + Boolean result = exp.evaluate(convertRow(value, dimOrdinalMax)).getBoolean(); + return result == null ? false : result; } catch (FilterIllegalMemberException e) { throw new FilterUnsupportedException(e); } } /** + * convert encoded row to actual value row for filter to evaluate expression + * @param value this row will be converted to actual value + * @param dimOrdinalMax for measure column, its index in row = dimOrdinalMax + its ordinal + * @return actual value row + * @throws IOException + */ + private RowIntf convertRow(RowIntf value, int dimOrdinalMax) throws IOException { + Object[] record = new Object[value.size()]; + String memberString; + for (int i = 0; i < dimColEvaluatorInfoList.size(); i++) { + DimColumnResolvedFilterInfo dimColumnEvaluatorInfo = dimColEvaluatorInfoList.get(i); + int index = dimColumnEvaluatorInfo.getDimension().getOrdinal(); + // if filter dimension is not present in the current add its default value + if (!isDimensionPresentInCurrentBlock[i]) { + // fill default value here + record[index] = getDimensionDefaultValue(dimColumnEvaluatorInfo); + // already set value, so continue to set next dimension + continue; + } + if (!dimColumnEvaluatorInfo.getDimension().getDataType().isComplexType()) { + if (!dimColumnEvaluatorInfo.isDimensionExistsInCurrentSilce()) { + record[index] = dimColumnEvaluatorInfo.getDimension().getDefaultValue(); + } + byte[] memberBytes = (byte[]) value.getVal(index); + if (!dimColumnEvaluatorInfo.getDimension().hasEncoding(Encoding.DICTIONARY)) { + // no dictionary + if (null != memberBytes) { + if (Arrays.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, memberBytes)) { + memberBytes = null; + } else if (memberBytes.length == 0) { + memberBytes = null; + } + record[index] = DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(memberBytes, + dimColumnEvaluatorInfo.getDimension().getDataType()); + } + } else { + // dictionary + int dictionaryValue = ByteUtil.toInt(memberBytes, 0); + if (dimColumnEvaluatorInfo.getDimension().hasEncoding(Encoding.DICTIONARY) + && !dimColumnEvaluatorInfo.getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) { + memberString = + getFilterActualValueFromDictionaryValue(dimColumnEvaluatorInfo, dictionaryValue); + record[index] = DataTypeUtil.getDataBasedOnDataType(memberString, + dimColumnEvaluatorInfo.getDimension().getDataType()); + } else if ( + dimColumnEvaluatorInfo.getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) { + Object member = getFilterActualValueFromDirectDictionaryValue(dimColumnEvaluatorInfo, + dictionaryValue); + record[index] = member; + } + } + } else { + // complex + record[index] = value.getVal(index); + } + } + + for (int i = 0; i < msrColEvalutorInfoList.size(); i++) { + MeasureColumnResolvedFilterInfo msrColumnEvalutorInfo = msrColEvalutorInfoList.get(i); + int index = msrColumnEvalutorInfo.getMeasure().getOrdinal() + dimOrdinalMax; + // add default value for the measure in case filter measure is not present + // in the current block measure list + if (!isMeasurePresentInCurrentBlock[i]) { + byte[] defaultValue = msrColumnEvalutorInfo.getCarbonColumn().getDefaultValue(); + record[index] = RestructureUtil.getMeasureDefaultValue( + msrColumnEvalutorInfo.getCarbonColumn().getColumnSchema(), defaultValue); + // already set value, so continue to set next measure + continue; + } + // measure + record[index] = value.getVal(index); + } + RowIntf row = new RowImpl(); + row.setValues(record); + return row; + } + + /** * Method will read the members of particular dimension block and create * a row instance for further processing of the filters * http://git-wip-us.apache.org/repos/asf/carbondata/blob/7269c062/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala index 94baf86..5644302 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala @@ -332,6 +332,12 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { sql("select * from stream_table_filter where name like '%me_3%' and id < 30"), Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")))) + checkAnswer(sql("select count(*) from stream_table_filter where name like '%ame%'"), + Seq(Row(49))) + + checkAnswer(sql("select count(*) from stream_table_filter where name like '%batch%'"), + Seq(Row(5))) + checkAnswer( sql("select * from stream_table_filter where name >= 'name_3' and id < 4"), Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")))) @@ -350,6 +356,9 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")), Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")))) + checkAnswer(sql("select count(*) from stream_table_filter where city like '%city%'"), + Seq(Row(54))) + checkAnswer( sql("select * from stream_table_filter where city > 'city_09' and city < 'city_10'"), Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")), @@ -649,6 +658,12 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { sql("select * from stream_table_filter_complex where name like '%me_3%' and id < 30"), Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_3", "school_33")), 3)))) + checkAnswer(sql("select count(*) from stream_table_filter_complex where name like '%ame%'"), + Seq(Row(49))) + + checkAnswer(sql("select count(*) from stream_table_filter_complex where name like '%batch%'"), + Seq(Row(5))) + checkAnswer( sql("select * from stream_table_filter_complex where name >= 'name_3' and id < 4"), Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_3", "school_33")), 3)))) @@ -663,6 +678,9 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 1)), Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20)))) + checkAnswer(sql("select count(*) from stream_table_filter_complex where city like '%city%'"), + Seq(Row(54))) + checkAnswer( sql("select * from stream_table_filter_complex where city > 'city_09' and city < 'city_10'"), Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 1)), @@ -1056,7 +1074,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { //Verify MergeTO column entry for compacted Segments newSegments.filter(_.getString(1).equals("Compacted")).foreach{ rw => assertResult("Compacted")(rw.getString(1)) - assertResult((Integer.parseInt(rw.getString(0))+2).toString)(rw.getString(4)) + assert(Integer.parseInt(rw.getString(0)) < Integer.parseInt(rw.getString(4))) } checkAnswer( sql("select count(*) from streaming.stream_table_reopen"),