This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch 5.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.3 by this push:
new ff19521816 PHOENIX-7705 Support for a row size function (#2292)
ff19521816 is described below
commit ff195218168cc4436bad63a941bff7da4dad98f7
Author: Kadir Ozdemir <[email protected]>
AuthorDate: Mon Oct 6 14:02:41 2025 -0700
PHOENIX-7705 Support for a row size function (#2292)
---
.../org/apache/phoenix/compile/QueryCompiler.java | 11 ++
.../apache/phoenix/compile/StatementContext.java | 20 +++
.../org/apache/phoenix/compile/WhereCompiler.java | 12 +-
.../org/apache/phoenix/expression/Expression.java | 12 ++
.../apache/phoenix/expression/ExpressionType.java | 4 +-
.../expression/function/RawRowSizeFunction.java | 40 ++++++
.../expression/function/RowSizeFunction.java | 82 +++++++++++
.../org/apache/phoenix/filter/RowLevelFilter.java | 93 ++++++++++++
.../phoenix/iterate/BaseResultIterators.java | 16 ++-
.../apache/phoenix/parse/RawRowSizeParseNode.java | 43 ++++++
.../org/apache/phoenix/parse/RowSizeParseNode.java | 48 +++++++
.../apache/phoenix/util/EncodedColumnsUtil.java | 8 +-
.../apache/phoenix/end2end/BaseAggregateIT.java | 157 ++++++++++++++++++++-
.../org/apache/phoenix/end2end/CDCQueryIT.java | 28 ++--
14 files changed, 556 insertions(+), 18 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 6a7d9ca9de..a02d544f53 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -70,6 +70,7 @@ import org.apache.phoenix.parse.NamedTableNode;
import org.apache.phoenix.parse.OrderByNode;
import org.apache.phoenix.parse.ParseNode;
import org.apache.phoenix.parse.ParseNodeFactory;
+import org.apache.phoenix.parse.RowSizeParseNode;
import org.apache.phoenix.parse.SQLParser;
import org.apache.phoenix.parse.SelectStatement;
import org.apache.phoenix.parse.SubqueryParseNode;
@@ -752,6 +753,16 @@ public class QueryCompiler {
protected QueryPlan compileSingleFlatQuery(StatementContext context,
SelectStatement select,
boolean asSubquery, boolean allowPageFilter, QueryPlan innerPlan, boolean
inJoin,
boolean inUnion) throws SQLException {
+ for (AliasedNode node : select.getSelect()) {
+ if (node.getNode() instanceof RowSizeParseNode) {
+ throw new SQLException(
+ "ROW_SIZE() can only be an argument to an aggregation function in a
select clause. \n"
+ + "To get the size of a single row, an aggregation function can be
used over the row, e.g., \n"
+ + "SELECT SUM(ROW_SIZE()) ... WHERE PK = <my PK>. To return the
row sizes for multiple rows, \n"
+ + "a group by clause can be used to have single row groups, e.g.,
\n"
+ + "SELECT SUM(ROW_SIZE()) ... WHERE PK = <my PK> GROUP BY PK ");
+ }
+ }
boolean isApplicable = true;
PTable projectedTable = null;
if (this.projectTuples) {
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/StatementContext.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/StatementContext.java
index 0c09a4f941..85f1ae3a34 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/StatementContext.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -89,6 +89,8 @@ public class StatementContext {
private Set<StatementContext> subStatementContexts;
private boolean totalSegmentsFunction = false;
private Integer totalSegmentsValue;
+ private boolean hasRowSizeFunction = false;
+ private boolean hasRawRowSizeFunction = false;
public StatementContext(PhoenixStatement statement) {
this(statement, new Scan());
@@ -121,6 +123,8 @@ public class StatementContext {
this.subStatementContexts = Sets.newHashSet();
this.totalSegmentsFunction = context.totalSegmentsFunction;
this.totalSegmentsValue = context.totalSegmentsValue;
+ this.hasRowSizeFunction = context.hasRowSizeFunction;
+ this.hasRawRowSizeFunction = context.hasRawRowSizeFunction;
}
/**
@@ -448,6 +452,22 @@ public class StatementContext {
this.totalSegmentsFunction = totalSegmentsFunction;
}
+ public boolean hasRowSizeFunction() {
+ return hasRowSizeFunction;
+ }
+
+ public boolean hasRawRowSizeFunction() {
+ return hasRawRowSizeFunction;
+ }
+
+ public void setHasRowSizeFunctionFunction(boolean hasRowSizeFunction) {
+ this.hasRowSizeFunction = hasRowSizeFunction;
+ }
+
+ public void setHasRawRowSizeFunctionFunction(boolean hasRawRowSizeFunction) {
+ this.hasRawRowSizeFunction = hasRawRowSizeFunction;
+ }
+
public Integer getTotalSegmentsValue() {
return totalSegmentsValue;
}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
index 9cb55e9aaf..6dee0c2f51 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
@@ -87,6 +87,7 @@ import
org.apache.phoenix.filter.MultiCFCQKeyValueComparisonFilter;
import org.apache.phoenix.filter.MultiCQKeyValueComparisonFilter;
import org.apache.phoenix.filter.MultiEncodedCQKeyValueComparisonFilter;
import org.apache.phoenix.filter.RowKeyComparisonFilter;
+import org.apache.phoenix.filter.RowLevelFilter;
import org.apache.phoenix.filter.SingleCFCQKeyValueComparisonFilter;
import org.apache.phoenix.filter.SingleCQKeyValueComparisonFilter;
import org.apache.phoenix.parse.ColumnParseNode;
@@ -540,7 +541,10 @@ public class WhereCompiler {
scan.setAttribute(BaseScannerRegionObserverConstants.INDEX_FILTER_STR,
Bytes.toBytes(whereClause.toString()));
}
- } else if (whereClause != null &&
!ExpressionUtil.evaluatesToTrue(whereClause)) {
+ } else if (
+ whereClause != null && !ExpressionUtil.evaluatesToTrue(whereClause)
+ && !context.hasRowSizeFunction() && !context.hasRawRowSizeFunction()
+ ) {
Filter filter = null;
final Counter counter = new Counter();
whereClause.accept(new KeyValueExpressionVisitor() {
@@ -599,6 +603,12 @@ public class WhereCompiler {
break;
}
scan.setFilter(filter);
+ } else if (whereClause != null &&
!ExpressionUtil.evaluatesToTrue(whereClause)) {
+ if (context.hasRawRowSizeFunction()) {
+ scan.setFilter(new RowLevelFilter(whereClause, true));
+ } else {
+ scan.setFilter(new RowLevelFilter(whereClause, false));
+ }
}
ScanRanges scanRanges = context.getScanRanges();
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/expression/Expression.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/Expression.java
index 8277573c7d..ccc5a0c264 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/expression/Expression.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/Expression.java
@@ -88,4 +88,16 @@ public interface Expression extends PDatum, Writable {
default boolean contains(Expression other) {
return this.equals(other);
}
+
+ /**
+ * Determine if the expression should be evaluated over the entire row
+ */
+ default boolean isRowLevel() {
+ for (Expression child : getChildren()) {
+ if (child.isRowLevel()) {
+ return true;
+ }
+ }
+ return false;
+ }
}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/expression/ExpressionType.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/ExpressionType.java
index 50112cc939..2491c71a2d 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/expression/ExpressionType.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/ExpressionType.java
@@ -206,7 +206,9 @@ public enum ExpressionType {
SubBinaryFunction(SubBinaryFunction.class),
ScanStartKeyFunction(ScanStartKeyFunction.class),
ScanEndKeyFunction(ScanEndKeyFunction.class),
- TotalSegmentsFunction(TotalSegmentsFunction.class);
+ TotalSegmentsFunction(TotalSegmentsFunction.class),
+ RowSizeFunction(RowSizeFunction.class),
+ RawRowSizeFunction(RawRowSizeFunction.class);
ExpressionType(Class<? extends Expression> clazz) {
this.clazz = clazz;
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/RawRowSizeFunction.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/RawRowSizeFunction.java
new file mode 100644
index 0000000000..5e9cf5a2cc
--- /dev/null
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/RawRowSizeFunction.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.function;
+
+import java.util.List;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
+import org.apache.phoenix.parse.RawRowSizeParseNode;
+
+/**
+ * Function to return the total size of all HBase cell versions and
delete markers for a given
+ * row
+ */
+@BuiltInFunction(name = RawRowSizeFunction.NAME, nodeClass =
RawRowSizeParseNode.class, args = {})
+public class RawRowSizeFunction extends RowSizeFunction {
+
+ public static final String NAME = "RAW_ROW_SIZE";
+
+ public RawRowSizeFunction() {
+ }
+
+ public RawRowSizeFunction(List<Expression> children) {
+ super(children);
+ }
+}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/RowSizeFunction.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/RowSizeFunction.java
new file mode 100644
index 0000000000..df40e47152
--- /dev/null
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/RowSizeFunction.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.function;
+
+import java.util.List;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.expression.Determinism;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
+import org.apache.phoenix.parse.RowSizeParseNode;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PUnsignedLong;
+
+/**
+ * Function to return the total size of the HBase cells that constitute a
given row
+ */
+@BuiltInFunction(name = RowSizeFunction.NAME, nodeClass =
RowSizeParseNode.class, args = {})
+public class RowSizeFunction extends ScalarFunction {
+
+ public static final String NAME = "ROW_SIZE";
+
+ public RowSizeFunction() {
+ }
+
+ public RowSizeFunction(List<Expression> children) {
+ super(children);
+ }
+
+ @Override
+ public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+ if (tuple == null) {
+ return false;
+ }
+ long size = 0;
+ for (int i = 0; i < tuple.size(); i++) {
+ size += tuple.getValue(i).getSerializedSize();
+ }
+ ptr.set(PUnsignedLong.INSTANCE.toBytes(size));
+ return true;
+ }
+
+ @Override
+ public PDataType getDataType() {
+ return PUnsignedLong.INSTANCE;
+ }
+
+ @Override
+ public String getName() {
+ return NAME;
+ }
+
+ @Override
+ public boolean isStateless() {
+ return false;
+ }
+
+ @Override
+ public Determinism getDeterminism() {
+ return Determinism.PER_ROW;
+ }
+
+ @Override
+ public boolean isRowLevel() {
+ return true;
+ }
+}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/filter/RowLevelFilter.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/filter/RowLevelFilter.java
new file mode 100644
index 0000000000..2efb6101cc
--- /dev/null
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/filter/RowLevelFilter.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.filter;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ * Filter used when expressions reference the entire row
+ */
+public class RowLevelFilter extends BooleanExpressionFilter {
+ private boolean allVersions = false;
+ private boolean keepRow = false;
+
+ public RowLevelFilter() {
+ }
+
+ public RowLevelFilter(Expression expression, boolean allVersions) {
+ super(expression);
+ this.allVersions = allVersions;
+ }
+
+ @Override
+ public void reset() {
+ super.reset();
+ keepRow = false;
+ }
+
+ // No @Override for HBase 3 compatibility
+ public ReturnCode filterKeyValue(Cell v) {
+ return filterCell(v);
+ }
+
+ @Override
+ public ReturnCode filterCell(Cell v) {
+ return allVersions ? ReturnCode.INCLUDE : ReturnCode.INCLUDE_AND_NEXT_COL;
+ }
+
+ @Override
+ public void filterRowCells(List<Cell> kvs) throws IOException {
+ Tuple tuple = new MultiKeyValueTuple();
+ tuple.setKeyValues(kvs);
+ keepRow = Boolean.TRUE.equals(evaluate(tuple));
+ }
+
+ @Override
+ public boolean filterRow() {
+ return !this.keepRow;
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ super.readFields(input);
+ allVersions = input.readBoolean();
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ super.write(output);
+ output.writeBoolean(allVersions);
+ }
+
+ public static RowLevelFilter parseFrom(final byte[] pbBytes) throws
DeserializationException {
+ try {
+ return (RowLevelFilter) Writables.getWritable(pbBytes, new
RowLevelFilter());
+ } catch (IOException e) {
+ throw new DeserializationException(e);
+ }
+ }
+}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index ac9c5140f7..328b2eb870 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -285,7 +285,7 @@ public abstract class BaseResultIterators extends
ExplainTable implements Result
}
// Add FirstKeyOnlyFilter or EmptyColumnOnlyFilter if there are no
references
// to key value columns. We use FirstKeyOnlyFilter when possible
- if (keyOnlyFilter) {
+ if (keyOnlyFilter && !context.hasRowSizeFunction() &&
!context.hasRawRowSizeFunction()) {
byte[] ecf = SchemaUtil.getEmptyColumnFamily(table);
byte[] ecq = table.getEncodingScheme() == NON_ENCODED_QUALIFIERS
? QueryConstants.EMPTY_COLUMN_BYTES
@@ -361,12 +361,22 @@ public abstract class BaseResultIterators extends
ExplainTable implements Result
if (optimizeProjection) {
optimizeProjection(context, scan, table, statement);
}
+ if (context.hasRowSizeFunction() || context.hasRawRowSizeFunction()) {
+ scan.getFamilyMap().clear();
+ if (context.hasRawRowSizeFunction()) {
+ scan.setRaw(true);
+ scan.readAllVersions();
+ }
+ }
}
}
private static void setQualifierRanges(boolean keyOnlyFilter, PTable table,
Scan scan,
StatementContext context) throws SQLException {
- if (EncodedColumnsUtil.useEncodedQualifierListOptimization(table, scan)) {
+ if (
+ !context.hasRowSizeFunction() && !context.hasRawRowSizeFunction()
+ && EncodedColumnsUtil.useEncodedQualifierListOptimization(table, scan,
context)
+ ) {
Pair<Integer, Integer> minMaxQualifiers = new Pair<>();
for (Pair<byte[], byte[]> whereCol : context.getWhereConditionColumns())
{
byte[] cq = whereCol.getSecond();
@@ -476,7 +486,7 @@ public abstract class BaseResultIterators extends
ExplainTable implements Result
if (statement.getHint().hasHint(Hint.SEEK_TO_COLUMN)) {
// Allow seeking to column during filtering
preventSeekToColumn = false;
- } else if (!EncodedColumnsUtil.useEncodedQualifierListOptimization(table,
scan)) {
+ } else if (!EncodedColumnsUtil.useEncodedQualifierListOptimization(table,
scan, context)) {
/*
* preventSeekToColumn cannot be true, even if hinted, when encoded
qualifier list
* optimization is being used. When using the optimization, it is
necessary that we explicitly
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/RawRowSizeParseNode.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/RawRowSizeParseNode.java
new file mode 100644
index 0000000000..ec076fa911
--- /dev/null
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/RawRowSizeParseNode.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import java.sql.SQLException;
+import java.util.List;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.function.FunctionExpression;
+import org.apache.phoenix.expression.function.RawRowSizeFunction;
+
+public class RawRowSizeParseNode extends RowSizeParseNode {
+
+ RawRowSizeParseNode(String name, List<ParseNode> children,
BuiltInFunctionInfo info) {
+ super(name, children, info);
+ }
+
+ @Override
+ public FunctionExpression create(List<Expression> children, StatementContext
context)
+ throws SQLException {
+ // It does not take any parameters.
+ if (children.size() != 0) {
+ throw new IllegalArgumentException("RawRowSizeFunction does not take any
parameters");
+ }
+ context.setHasRawRowSizeFunctionFunction(true);
+ return new RawRowSizeFunction(children);
+ }
+}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/RowSizeParseNode.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/RowSizeParseNode.java
new file mode 100644
index 0000000000..c0274ecee7
--- /dev/null
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/RowSizeParseNode.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import java.sql.SQLException;
+import java.util.List;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.function.FunctionExpression;
+import org.apache.phoenix.expression.function.RowSizeFunction;
+
+public class RowSizeParseNode extends FunctionParseNode {
+
+ RowSizeParseNode(String name, List<ParseNode> children, BuiltInFunctionInfo
info) {
+ super(name, children, info);
+ }
+
+ @Override
+ public FunctionExpression create(List<Expression> children, StatementContext
context)
+ throws SQLException {
+ // It does not take any parameters.
+ if (children.size() != 0) {
+ throw new IllegalArgumentException("RowSizeFunction does not take any
parameters");
+ }
+ context.setHasRowSizeFunctionFunction(true);
+ return new RowSizeFunction(children);
+ }
+
+ @Override
+ public boolean isStateless() {
+ return false;
+ }
+}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java
index 5717e00155..ae03d3dd52 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/EncodedColumnsUtil.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
import org.apache.phoenix.expression.DelegateExpression;
import org.apache.phoenix.expression.Expression;
@@ -133,14 +134,15 @@ public class EncodedColumnsUtil {
return new Pair<>(minQ, maxQ);
}
- public static boolean useEncodedQualifierListOptimization(PTable table, Scan
scan) {
+ public static boolean useEncodedQualifierListOptimization(PTable table, Scan
scan,
+ StatementContext context) {
/*
* HBase doesn't allow raw scans to have columns set. And we need columns
to be set explicitly
* on the scan to use this optimization. Disabling this optimization for
tables with more than
* one column family. See PHOENIX-3890.
*/
- return !scan.isRaw() && table.getColumnFamilies().size() == 1
- && table.getImmutableStorageScheme() != null
+ return !scan.isRaw() && !context.hasRowSizeFunction() &&
!context.hasRawRowSizeFunction()
+ && table.getColumnFamilies().size() == 1 &&
table.getImmutableStorageScheme() != null
&& table.getImmutableStorageScheme() ==
ImmutableStorageScheme.ONE_CELL_PER_COLUMN
&& usesEncodedColumnNames(table) && !table.isTransactional()
&& !ScanUtil.hasDynamicColumns(table);
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseAggregateIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseAggregateIT.java
index 74fa49b50b..fde20a6130 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseAggregateIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseAggregateIT.java
@@ -17,12 +17,12 @@
*/
package org.apache.phoenix.end2end;
-import static
org.apache.phoenix.end2end.ParallelStatsDisabledIT.validateQueryPlan;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.apache.phoenix.util.TestUtil.assertResultSet;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.IOException;
import java.sql.Connection;
@@ -33,9 +33,17 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Properties;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.phoenix.compile.ExplainPlan;
import org.apache.phoenix.compile.ExplainPlanAttributes;
+import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
+import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.types.PChar;
@@ -258,6 +266,153 @@ public abstract class BaseAggregateIT extends
ParallelStatsDisabledIT {
conn.close();
}
+ @Test
+ public void testRowSizeFunctions() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ String tableName = generateUniqueName();
+ initData(conn, tableName);
+
+ // There are 8 rows
+ QueryBuilder queryBuilder =
+ new
QueryBuilder().setSelectExpression("count(1)").setFullTableName(tableName);
+ ResultSet rs = executeQuery(conn, queryBuilder);
+ assertTrue(rs.next());
+ assertEquals(8, rs.getLong(1));
+ assertFalse(rs.next());
+
+ // Row sizes from HBase
+ int[] rowSizes = new int[8];
+ int rowIndex = 0;
+ ConnectionQueryServices cqs =
conn.unwrap(PhoenixConnection.class).getQueryServices();
+ TableName hTableName = TableName.valueOf(tableName);
+ Table table = cqs.getTable(hTableName.getName());
+ Scan scan = new Scan();
+ try (ResultScanner scanner = table.getScanner(scan)) {
+ Result result;
+ while ((result = scanner.next()) != null) {
+ int size = 0;
+ for (Cell cell : result.rawCells()) {
+ size += cell.getSerializedSize();
+ }
+ rowSizes[rowIndex++] = size;
+ }
+ }
+
+ // Verify that each row size is computed correctly by the function
row_size()
+ queryBuilder = new QueryBuilder().setSelectExpression("sum(row_size())")
+ .setFullTableName(tableName).setGroupByClause("ID");
+ rs = executeQuery(conn, queryBuilder);
+ for (int i = 0; i < 8; i++) {
+ assertTrue(rs.next());
+ assertEquals(rowSizes[i], rs.getLong(1));
+ }
+ assertFalse(rs.next());
+
+ // Find the total of row sizes
+ int totalRowSize = 0;
+ for (int i = 0; i < 8; i++) {
+ totalRowSize += rowSizes[i];
+ }
+
+ // Verify that the sum function over row sizes returns the expected total
+ queryBuilder =
+ new
QueryBuilder().setSelectExpression("sum(row_size())").setFullTableName(tableName);
+ rs = executeQuery(conn, queryBuilder);
+ assertTrue(rs.next());
+ assertEquals(totalRowSize, rs.getLong(1));
+ assertFalse(rs.next());
+
+ // Verify that some other aggregation functions work with row_size()
+ queryBuilder =
+ new QueryBuilder().setSelectExpression("avg(row_size()),
min(row_size()), max(row_size())")
+ .setFullTableName(tableName);
+ rs = executeQuery(conn, queryBuilder);
+ assertTrue(rs.next());
+ assertEquals(totalRowSize / 8, rs.getLong(1));
+
+ // There are four 90 byte rows and four 92 byte rows
+ assertEquals(90, rs.getLong(2));
+ assertEquals(92, rs.getLong(3));
+ assertFalse(rs.next());
+
+ queryBuilder = new
QueryBuilder().setSelectExpression("count(1)").setFullTableName(tableName)
+ .setWhereClause("row_size() = 90 and appcpu = 10");
+ rs = executeQuery(conn, queryBuilder);
+ assertTrue(rs.next());
+ assertEquals(2, rs.getLong(1));
+ assertFalse(rs.next());
+
+ // Verify that the row size function is not allowed without an aggregation
function in a select
+ // clause
+ queryBuilder = new
QueryBuilder().setSelectExpression("row_size()").setFullTableName(tableName);
+ try {
+ executeQuery(conn, queryBuilder);
+ fail();
+ } catch (SQLException e) {
+ // expected
+ }
+
+ // Make sure the row size function works with multi-tenant tables
+ tableName = generateUniqueName();
+ String sql = "CREATE TABLE " + tableName
+ + " (ORGANIZATION_ID VARCHAR NOT NULL, CONTAINER_ID VARCHAR, ENTITY_ID
INTEGER,"
+ + " CONSTRAINT PK PRIMARY KEY (ORGANIZATION_ID, CONTAINER_ID))
MULTI_TENANT = TRUE";
+ conn.createStatement().execute(sql);
+
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES
('a','1', 11)");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES
('b','2', 22)");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES
('c','3', 33)");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES
('d','1', 44)");
+ conn.commit();
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES
('a','1', 12)");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES
('a','2', 11)");
+ conn.commit();
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES
('c','9', 11)");
+ conn.commit();
+ conn.createStatement().execute("DELETE FROM " + tableName + " WHERE
organization_id='d'");
+ conn.commit();
+
+ queryBuilder = new QueryBuilder().setSelectExpression("ORGANIZATION_ID,
sum(row_size())")
+ .setFullTableName(tableName).setGroupByClause("ORGANIZATION_ID");
+ rs = executeQuery(conn, queryBuilder);
+ assertTrue(rs.next());
+ assertEquals("a", rs.getString(1));
+ assertEquals(118, rs.getLong(2));
+ assertTrue(rs.next());
+ assertEquals("b", rs.getString(1));
+ assertEquals(59, rs.getLong(2));
+ assertTrue(rs.next());
+ assertEquals("c", rs.getString(1));
+ assertEquals(118, rs.getLong(2));
+ assertFalse(rs.next());
+
+ // Make sure raw_row_size() computation includes all cell versions and
delete markers
+ queryBuilder = new QueryBuilder().setSelectExpression("ORGANIZATION_ID,
sum(raw_row_size())")
+ .setFullTableName(tableName).setGroupByClause("ORGANIZATION_ID");
+ rs = executeQuery(conn, queryBuilder);
+
+ assertTrue(rs.next());
+ assertEquals("a", rs.getString(1));
+ // There are 3 row versions, each version is 59 bytes and so 3 * 59 = 177
+ assertEquals(177, rs.getLong(2));
+ assertTrue(rs.next());
+ assertEquals("b", rs.getString(1));
+ // There is one row version and one version is 59 bytes
+ assertEquals(59, rs.getLong(2));
+ assertTrue(rs.next());
+ assertEquals("c", rs.getString(1));
+ // There are two versions, 2 * 59 = 118
+ assertEquals(118, rs.getLong(2));
+ assertTrue(rs.next());
+ assertEquals("d", rs.getString(1));
+ // One row version (59 bytes) plus a delete family marker
+ assertEquals(83, rs.getLong(2));
+ assertFalse(rs.next());
+
+ conn.close();
+ }
+
@Test
public void testGroupByCase() throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
index 5992073953..9a86354778 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
@@ -47,7 +47,10 @@ import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
@@ -57,6 +60,7 @@ import
org.apache.phoenix.iterate.RowKeyOrderedAggregateResultIterator;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.CDCUtil;
@@ -531,15 +535,21 @@ public class CDCQueryIT extends CDCBaseIT {
if (tableSaltBuckets == null) {
return 1;
}
- Set<String> nonEmptySaltBuckets = Sets.newHashSet();
- String query = "SELECT /*+ NO_INDEX */ ROWKEY_BYTES_STRING() FROM " +
tableName;
- try (ResultSet rs = conn.createStatement().executeQuery(query)) {
- while (rs.next()) {
- String rowKey = rs.getString(1); // StringBinary format
- // the first 4 bytes will have the salt bucket id like "\x02"
- String bucketID = rowKey.substring(0, 4);
- nonEmptySaltBuckets.add(bucketID);
- }
+ Set<Integer> nonEmptySaltBuckets = Sets.newHashSet();
+ TableName hTableName = TableName.valueOf(tableName);
+ ConnectionQueryServices cqs =
conn.unwrap(PhoenixConnection.class).getQueryServices();
+ Table table = cqs.getTable(hTableName.getName());
+ Scan scan = new Scan();
+ scan.setRaw(true);
+ try (ResultScanner scanner = table.getScanner(scan)) {
+ Result result;
+ while ((result = scanner.next()) != null) {
+ byte[] row = result.getRow();
+ Integer bucketId = ((int) row[0]);
+ nonEmptySaltBuckets.add(bucketId);
+ }
+ } catch (Exception e) {
+ throw new SQLException(e);
}
return nonEmptySaltBuckets.size();
}