This is an automated email from the ASF dual-hosted git repository. danny0405 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/calcite.git
commit ebbb7bb88eb5a732ff954790373fe8d62eb5b8ab Author: yuzhao.cyz <yuzhao....@gmail.com> AuthorDate: Mon Aug 17 13:44:41 2020 +0800 [CALCITE-4171] Support named parameters for table window functions [CALCITE-4171] Support named parameters for table window functions * Changes SqlArgumentAssignmentOperator to allow non-scalar query as operand * In SqlCallBinding, matches the permuted operand by name with name matcher * Refactor SqlWindowTableFunction and its subclasses to reuse the same logic * Do not patch up the SqlWindowTableFunction with DEFAULTs during SQL validation --- .../calcite/adapter/enumerable/EnumUtils.java | 1 - .../apache/calcite/runtime/CalciteResource.java | 4 + .../org/apache/calcite/sql/SqlCallBinding.java | 45 +++++++-- .../apache/calcite/sql/SqlHopTableFunction.java | 88 ++++++++-------- .../calcite/sql/SqlSessionTableFunction.java | 78 ++++++++------ .../apache/calcite/sql/SqlTumbleTableFunction.java | 82 ++++++++------- .../apache/calcite/sql/SqlWindowTableFunction.java | 77 +++++++++++++- .../sql/fun/SqlArgumentAssignmentOperator.java | 4 + .../calcite/sql/validate/SqlValidatorImpl.java | 8 +- .../calcite/runtime/CalciteResource.properties | 1 + .../apache/calcite/test/SqlToRelConverterTest.java | 64 ++++++++++++ .../org/apache/calcite/test/SqlValidatorTest.java | 98 ++++++++++++++++-- .../apache/calcite/test/SqlToRelConverterTest.xml | 112 +++++++++++++++++++++ core/src/test/resources/sql/stream.iq | 61 +++++++++++ site/_docs/reference.md | 63 ++++++++++-- 15 files changed, 646 insertions(+), 140 deletions(-) diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java index 87fd196..ae9455f 100644 --- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java +++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java @@ -804,7 +804,6 @@ public class EnumUtils { expressions.add(expression); } final Expression 
wmColExprToLong = EnumUtils.convert(wmColExpr, long.class); - final Expression shiftExpr = Expressions.constant(1, long.class); // Find the fixed window for a timestamp given a window size and an offset, and return the // window start. diff --git a/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java b/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java index 7987546..3aaec91 100644 --- a/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java +++ b/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java @@ -220,6 +220,10 @@ public interface CalciteResource { @BaseMessage("Column ''{0}'' is ambiguous") ExInst<SqlValidatorException> columnAmbiguous(String a0); + @BaseMessage("Param ''{0}'' not found in function ''{1}''; did you mean ''{2}''?") + ExInst<SqlValidatorException> paramNotFoundInFunctionDidYouMean(String a0, + String a1, String a2); + @BaseMessage("Operand {0} must be a query") ExInst<SqlValidatorException> needQueryOp(String a0); diff --git a/core/src/main/java/org/apache/calcite/sql/SqlCallBinding.java b/core/src/main/java/org/apache/calcite/sql/SqlCallBinding.java index d944812..ea6dad8 100644 --- a/core/src/main/java/org/apache/calcite/sql/SqlCallBinding.java +++ b/core/src/main/java/org/apache/calcite/sql/SqlCallBinding.java @@ -27,6 +27,7 @@ import org.apache.calcite.sql.parser.SqlParserPos; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.sql.validate.SelectScope; import org.apache.calcite.sql.validate.SqlMonotonicity; +import org.apache.calcite.sql.validate.SqlNameMatcher; import org.apache.calcite.sql.validate.SqlValidator; import org.apache.calcite.sql.validate.SqlValidatorException; import org.apache.calcite.sql.validate.SqlValidatorNamespace; @@ -34,6 +35,7 @@ import org.apache.calcite.sql.validate.SqlValidatorScope; import org.apache.calcite.sql.validate.SqlValidatorUtil; import org.apache.calcite.util.ImmutableNullableList; import org.apache.calcite.util.NlsString; 
+import org.apache.calcite.util.Pair; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -161,17 +163,42 @@ public class SqlCallBinding extends SqlOperatorBinding { * formal parameters of the function. */ private List<SqlNode> permutedOperands(final SqlCall call) { final SqlFunction operator = (SqlFunction) call.getOperator(); - return Lists.transform(operator.getParamNames(), paramName -> { - for (SqlNode operand2 : call.getOperandList()) { - final SqlCall call2 = (SqlCall) operand2; - assert operand2.getKind() == SqlKind.ARGUMENT_ASSIGNMENT; - final SqlIdentifier id = call2.operand(1); - if (id.getSimple().equals(paramName)) { - return call2.operand(0); + final List<String> paramNames = operator.getParamNames(); + final List<SqlNode> permuted = new ArrayList<>(); + final SqlNameMatcher nameMatcher = validator.getCatalogReader().nameMatcher(); + for (final String paramName : paramNames) { + Pair<String, SqlIdentifier> args = null; + for (int j = 0; j < call.getOperandList().size(); j++) { + final SqlCall call2 = call.operand(j); + assert call2.getKind() == SqlKind.ARGUMENT_ASSIGNMENT; + final SqlIdentifier operandID = call2.operand(1); + final String operandName = operandID.getSimple(); + if (nameMatcher.matches(operandName, paramName)) { + permuted.add(call2.operand(0)); + break; + } else if (args == null + && nameMatcher.isCaseSensitive() + && operandName.equalsIgnoreCase(paramName)) { + args = Pair.of(paramName, operandID); + } + // the last operand, there is still no match. + if (j == call.getOperandList().size() - 1) { + if (args != null) { + throw SqlUtil.newContextException(args.right.getParserPosition(), + RESOURCE.paramNotFoundInFunctionDidYouMean(args.right.getSimple(), + call.getOperator().getName(), args.left)); + } + if (!(operator instanceof SqlWindowTableFunction)) { + // Not like user defined functions, we do not patch up the operands + // with DEFAULT and then convert to nulls during sql-to-rel conversion. 
+ // Thus, there is no need to show the optional operands in the plan and + // decide if the optional operand is null when code generation. + permuted.add(DEFAULT_CALL); + } } } - return DEFAULT_CALL; - }); + } + return permuted; } /** diff --git a/core/src/main/java/org/apache/calcite/sql/SqlHopTableFunction.java b/core/src/main/java/org/apache/calcite/sql/SqlHopTableFunction.java index f5936ae..816140b 100644 --- a/core/src/main/java/org/apache/calcite/sql/SqlHopTableFunction.java +++ b/core/src/main/java/org/apache/calcite/sql/SqlHopTableFunction.java @@ -16,60 +16,68 @@ */ package org.apache.calcite.sql; -import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.sql.type.SqlOperandCountRanges; -import org.apache.calcite.sql.type.SqlTypeName; -import org.apache.calcite.sql.type.SqlTypeUtil; -import org.apache.calcite.sql.validate.SqlValidator; +import org.apache.calcite.sql.type.SqlOperandTypeChecker; + +import com.google.common.collect.ImmutableList; + +import java.util.List; /** - * SqlHopTableFunction implements an operator for hopping. It allows four parameters: - * 1. a table; - * 2. a descriptor to provide a watermarked column name from the input table; - * 3. an interval parameter to specify the length of window shifting; - * 4. an interval parameter to specify the length of window size. + * SqlHopTableFunction implements an operator for hopping. 
+ * + * <p>It allows four parameters: + * + * <ol> + * <li>a table</li> + * <li>a descriptor to provide a watermarked column name from the input table</li> + * <li>an interval parameter to specify the length of window shifting</li> + * <li>an interval parameter to specify the length of window size</li> + * </ol> */ public class SqlHopTableFunction extends SqlWindowTableFunction { public SqlHopTableFunction() { - super(SqlKind.HOP.name()); + super(SqlKind.HOP.name(), OperandTypeCheckerImpl.INSTANCE); } - @Override public SqlOperandCountRange getOperandCountRange() { - return SqlOperandCountRanges.between(4, 5); + @Override public List<String> getParamNames() { + return ImmutableList.of(PARAM_DATA, PARAM_TIMECOL, PARAM_SLIDE, PARAM_SIZE, PARAM_OFFSET); } - @Override public boolean checkOperandTypes(SqlCallBinding callBinding, - boolean throwOnFailure) { - final SqlNode operand0 = callBinding.operand(0); - final SqlValidator validator = callBinding.getValidator(); - final RelDataType type = validator.getValidatedNodeType(operand0); - if (type.getSqlTypeName() != SqlTypeName.ROW) { - return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure); - } - final SqlNode operand1 = callBinding.operand(1); - if (operand1.getKind() != SqlKind.DESCRIPTOR) { - return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure); + // ------------------------------------------------------------------------- + // Inner Class + // ------------------------------------------------------------------------- + + /** Operand type checker for HOP. 
*/ + private static class OperandTypeCheckerImpl implements SqlOperandTypeChecker { + static final OperandTypeCheckerImpl INSTANCE = new OperandTypeCheckerImpl(); + + @Override public boolean checkOperandTypes( + SqlCallBinding callBinding, boolean throwOnFailure) { + if (!validateTableWithFollowingDescriptors(callBinding, 1)) { + return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure); + } + if (!validateTailingIntervals(callBinding, 2)) { + return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure); + } + return true; } - validateColumnNames(validator, type.getFieldNames(), ((SqlCall) operand1).getOperandList()); - final RelDataType type2 = validator.getValidatedNodeType(callBinding.operand(2)); - if (!SqlTypeUtil.isInterval(type2)) { - return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure); + + @Override public SqlOperandCountRange getOperandCountRange() { + return SqlOperandCountRanges.between(4, 5); } - final RelDataType type3 = validator.getValidatedNodeType(callBinding.operand(3)); - if (!SqlTypeUtil.isInterval(type3)) { - return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure); + + @Override public String getAllowedSignatures(SqlOperator op, String opName) { + return opName + "(TABLE table_name, DESCRIPTOR(timecol), " + + "datetime interval, datetime interval[, datetime interval])"; } - if (callBinding.getOperandCount() > 4) { - final RelDataType type4 = validator.getValidatedNodeType(callBinding.operand(4)); - if (!SqlTypeUtil.isInterval(type4)) { - return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure); - } + + @Override public Consistency getConsistency() { + return Consistency.NONE; } - return true; - } - @Override public String getAllowedSignatures(String opNameToUse) { - return getName() + "(TABLE table_name, DESCRIPTOR(col), " - + "datetime interval, datetime interval[, datetime interval])"; + @Override public boolean isOptional(int i) { 
+ return i == 4; + } } } diff --git a/core/src/main/java/org/apache/calcite/sql/SqlSessionTableFunction.java b/core/src/main/java/org/apache/calcite/sql/SqlSessionTableFunction.java index fef50cc..61a2ba6 100644 --- a/core/src/main/java/org/apache/calcite/sql/SqlSessionTableFunction.java +++ b/core/src/main/java/org/apache/calcite/sql/SqlSessionTableFunction.java @@ -18,54 +18,70 @@ package org.apache.calcite.sql; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.sql.type.SqlOperandCountRanges; -import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.sql.type.SqlOperandTypeChecker; import org.apache.calcite.sql.type.SqlTypeUtil; import org.apache.calcite.sql.validate.SqlValidator; +import com.google.common.collect.ImmutableList; + +import java.util.List; + /** * SqlSessionTableFunction implements an operator for per-key sessionization. It allows * four parameters: - * 1. a table. - * 2. a descriptor to provide a watermarked column name from the input table. - * 3. a descriptor to provide a column as key, on which sessionization will be applied. - * 4. an interval parameter to specify a inactive activity gap to break sessions. 
+ * + * <ol> + * <li>table as data source</li> + * <li>a descriptor to provide a watermarked column name from the input table</li> + * <li>a descriptor to provide a column as key, on which sessionization will be applied</li> + * <li>an interval parameter to specify a inactive activity gap to break sessions</li> + * </ol> */ public class SqlSessionTableFunction extends SqlWindowTableFunction { public SqlSessionTableFunction() { - super(SqlKind.SESSION.name()); + super(SqlKind.SESSION.name(), OperandTypeCheckerImpl.INSTANCE); } - @Override public SqlOperandCountRange getOperandCountRange() { - return SqlOperandCountRanges.of(4); + @Override public List<String> getParamNames() { + return ImmutableList.of(PARAM_DATA, PARAM_TIMECOL, PARAM_KEY, PARAM_SIZE); } - @Override public boolean checkOperandTypes(SqlCallBinding callBinding, - boolean throwOnFailure) { - final SqlNode operand0 = callBinding.operand(0); - final SqlValidator validator = callBinding.getValidator(); - final RelDataType type = validator.getValidatedNodeType(operand0); - if (type.getSqlTypeName() != SqlTypeName.ROW) { - return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure); + // ------------------------------------------------------------------------- + // Inner Class + // ------------------------------------------------------------------------- + + /** Operand type checker for SESSION. 
*/ + private static class OperandTypeCheckerImpl implements SqlOperandTypeChecker { + static final OperandTypeCheckerImpl INSTANCE = new OperandTypeCheckerImpl(); + + @Override public boolean checkOperandTypes( + SqlCallBinding callBinding, boolean throwOnFailure) { + final SqlValidator validator = callBinding.getValidator(); + if (!validateTableWithFollowingDescriptors(callBinding, 2)) { + return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure); + } + final RelDataType type3 = validator.getValidatedNodeType(callBinding.operand(3)); + if (!SqlTypeUtil.isInterval(type3)) { + return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure); + } + return true; } - final SqlNode operand1 = callBinding.operand(1); - if (operand1.getKind() != SqlKind.DESCRIPTOR) { - return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure); + + @Override public SqlOperandCountRange getOperandCountRange() { + return SqlOperandCountRanges.of(4); } - validateColumnNames(validator, type.getFieldNames(), ((SqlCall) operand1).getOperandList()); - final SqlNode operand2 = callBinding.operand(2); - if (operand2.getKind() != SqlKind.DESCRIPTOR) { - return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure); + + @Override public String getAllowedSignatures(SqlOperator op, String opName) { + return opName + "(TABLE table_name, DESCRIPTOR(timecol), " + + "DESCRIPTOR(key), datetime interval)"; } - validateColumnNames(validator, type.getFieldNames(), ((SqlCall) operand2).getOperandList()); - final RelDataType type3 = validator.getValidatedNodeType(callBinding.operand(3)); - if (!SqlTypeUtil.isInterval(type3)) { - return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure); + + @Override public Consistency getConsistency() { + return Consistency.NONE; } - return true; - } - @Override public String getAllowedSignatures(String opNameToUse) { - return getName() + "(TABLE table_name, DESCRIPTOR(col), " - + 
"DESCRIPTOR(col), datetime interval)"; + @Override public boolean isOptional(int i) { + return false; + } } } diff --git a/core/src/main/java/org/apache/calcite/sql/SqlTumbleTableFunction.java b/core/src/main/java/org/apache/calcite/sql/SqlTumbleTableFunction.java index 75f746d..f32f13d 100644 --- a/core/src/main/java/org/apache/calcite/sql/SqlTumbleTableFunction.java +++ b/core/src/main/java/org/apache/calcite/sql/SqlTumbleTableFunction.java @@ -16,59 +16,73 @@ */ package org.apache.calcite.sql; -import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.sql.type.SqlOperandCountRanges; -import org.apache.calcite.sql.type.SqlTypeName; -import org.apache.calcite.sql.type.SqlTypeUtil; -import org.apache.calcite.sql.validate.SqlValidator; +import org.apache.calcite.sql.type.SqlOperandTypeChecker; + +import com.google.common.collect.ImmutableList; + +import java.util.List; /** * SqlTumbleTableFunction implements an operator for tumbling. * * <p>It allows three parameters: - * 1. a table; - * 2. a descriptor to provide a watermarked column name from the input table; - * 3. an interval parameter to specify the length of window size. + * + * <ol> + * <li>a table</li> + * <li>a descriptor to provide a watermarked column name from the input table</li> + * <li>an interval parameter to specify the length of window size</li> + * </ol> */ public class SqlTumbleTableFunction extends SqlWindowTableFunction { public SqlTumbleTableFunction() { - super(SqlKind.TUMBLE.name()); + super(SqlKind.TUMBLE.name(), OperandTypeCheckerImpl.INSTANCE); } @Override public SqlOperandCountRange getOperandCountRange() { return SqlOperandCountRanges.between(3, 4); } - @Override public boolean checkOperandTypes(SqlCallBinding callBinding, - boolean throwOnFailure) { - // There should only be three operands, and number of operands are checked before - // this call. 
- final SqlNode operand0 = callBinding.operand(0); - final SqlValidator validator = callBinding.getValidator(); - final RelDataType type = validator.getValidatedNodeType(operand0); - if (type.getSqlTypeName() != SqlTypeName.ROW) { - return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure); + @Override public List<String> getParamNames() { + return ImmutableList.of(PARAM_DATA, PARAM_TIMECOL, PARAM_SIZE, PARAM_OFFSET); + } + + // ------------------------------------------------------------------------- + // Inner Class + // ------------------------------------------------------------------------- + + /** Operand type checker for SESSION. */ + private static class OperandTypeCheckerImpl implements SqlOperandTypeChecker { + static final OperandTypeCheckerImpl INSTANCE = new OperandTypeCheckerImpl(); + + @Override public boolean checkOperandTypes( + SqlCallBinding callBinding, boolean throwOnFailure) { + // There should only be three operands, and number of operands are checked before + // this call. 
+ if (!validateTableWithFollowingDescriptors(callBinding, 1)) { + return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure); + } + if (!validateTailingIntervals(callBinding, 2)) { + return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure); + } + return true; } - final SqlNode operand1 = callBinding.operand(1); - if (operand1.getKind() != SqlKind.DESCRIPTOR) { - return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure); + + @Override public SqlOperandCountRange getOperandCountRange() { + return SqlOperandCountRanges.of(4); } - validateColumnNames(validator, type.getFieldNames(), ((SqlCall) operand1).getOperandList()); - final RelDataType type2 = validator.getValidatedNodeType(callBinding.operand(2)); - if (!SqlTypeUtil.isInterval(type2)) { - return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure); + + @Override public String getAllowedSignatures(SqlOperator op, String opName) { + return opName + "(TABLE table_name, DESCRIPTOR(col1, col2 ...), datetime interval" + + "[, datetime interval])"; } - if (callBinding.getOperandCount() > 3) { - final RelDataType type3 = validator.getValidatedNodeType(callBinding.operand(3)); - if (!SqlTypeUtil.isInterval(type3)) { - return throwValidationSignatureErrorOrReturnFalse(callBinding, throwOnFailure); - } + + @Override public Consistency getConsistency() { + return Consistency.NONE; } - return true; - } - @Override public String getAllowedSignatures(String opNameToUse) { - return getName() + "(TABLE table_name, DESCRIPTOR(col1, col2 ...), datetime interval" - + "[, datetime interval])"; + @Override public boolean isOptional(int i) { + return i == 3; + } } } diff --git a/core/src/main/java/org/apache/calcite/sql/SqlWindowTableFunction.java b/core/src/main/java/org/apache/calcite/sql/SqlWindowTableFunction.java index a1670c9..2b4dfd3 100644 --- a/core/src/main/java/org/apache/calcite/sql/SqlWindowTableFunction.java +++ 
b/core/src/main/java/org/apache/calcite/sql/SqlWindowTableFunction.java @@ -19,8 +19,10 @@ package org.apache.calcite.sql; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.sql.type.ReturnTypes; +import org.apache.calcite.sql.type.SqlOperandTypeChecker; import org.apache.calcite.sql.type.SqlReturnTypeInference; import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.sql.type.SqlTypeUtil; import org.apache.calcite.sql.validate.SqlNameMatcher; import org.apache.calcite.sql.validate.SqlValidator; @@ -34,6 +36,25 @@ import static org.apache.calcite.util.Static.RESOURCE; */ public class SqlWindowTableFunction extends SqlFunction implements SqlTableFunction { + + /** The data source which the table function computes with. */ + protected static final String PARAM_DATA = "DATA"; + + /** The time attribute column. Also known as the event time. */ + protected static final String PARAM_TIMECOL = "TIMECOL"; + + /** The window duration INTERVAL. */ + protected static final String PARAM_SIZE = "SIZE"; + + /** The optional align offset for each window. */ + protected static final String PARAM_OFFSET = "OFFSET"; + + /** The session key(s), only used for SESSION window. */ + protected static final String PARAM_KEY = "KEY"; + + /** The slide interval, only used for HOP window. */ + protected static final String PARAM_SLIDE = "SLIDE"; + /** * Type-inference strategy whereby the row type of a table function call is a * ROW, which is combined from the row type of operand #0 (which is a TABLE) @@ -48,16 +69,16 @@ public class SqlWindowTableFunction extends SqlFunction SqlWindowTableFunction::inferRowType; /** Creates a window table function with a given name. 
*/ - public SqlWindowTableFunction(String name) { - super(name, SqlKind.OTHER_FUNCTION, ReturnTypes.CURSOR, null, null, - SqlFunctionCategory.SYSTEM); + public SqlWindowTableFunction(String name, SqlOperandTypeChecker operandTypeChecker) { + super(name, SqlKind.OTHER_FUNCTION, ReturnTypes.CURSOR, null, + operandTypeChecker, SqlFunctionCategory.SYSTEM); } @Override public SqlReturnTypeInference getRowTypeInference() { return ARG0_TABLE_FUNCTION_WINDOWING; } - protected boolean throwValidationSignatureErrorOrReturnFalse(SqlCallBinding callBinding, + protected static boolean throwValidationSignatureErrorOrReturnFalse(SqlCallBinding callBinding, boolean throwOnFailure) { if (throwOnFailure) { throw callBinding.newValidationSignatureError(); @@ -66,7 +87,53 @@ public class SqlWindowTableFunction extends SqlFunction } } - protected void validateColumnNames(SqlValidator validator, + /** + * Validate the heading operands are in the form: + * (ROW, DESCRIPTOR, DESCRIPTOR ..., other params). + * + * @param callBinding The call binding + * @param descriptors The number of descriptors following the first operand (e.g. the table) + * + * @return true if validation passes + */ + protected static boolean validateTableWithFollowingDescriptors( + SqlCallBinding callBinding, int descriptors) { + final SqlNode operand0 = callBinding.operand(0); + final SqlValidator validator = callBinding.getValidator(); + final RelDataType type = validator.getValidatedNodeType(operand0); + if (type.getSqlTypeName() != SqlTypeName.ROW) { + return false; + } + for (int i = 1; i < descriptors + 1; i++) { + final SqlNode operand = callBinding.operand(i); + if (operand.getKind() != SqlKind.DESCRIPTOR) { + return false; + } + validateColumnNames(validator, type.getFieldNames(), ((SqlCall) operand).getOperandList()); + } + return true; + } + + /** + * Validate the operands starting from position {@code startPos} are all INTERVAL. 
+ * + * @param callBinding The call binding + * @param startPos The start position to validate (starting index is 0) + * + * @return true if validation passes + */ + protected static boolean validateTailingIntervals(SqlCallBinding callBinding, int startPos) { + final SqlValidator validator = callBinding.getValidator(); + for (int i = startPos; i < callBinding.getOperandCount(); i++) { + final RelDataType type = validator.getValidatedNodeType(callBinding.operand(i)); + if (!SqlTypeUtil.isInterval(type)) { + return false; + } + } + return true; + } + + private static void validateColumnNames(SqlValidator validator, List<String> fieldNames, List<SqlNode> columnNames) { final SqlNameMatcher matcher = validator.getCatalogReader().nameMatcher(); for (SqlNode columnName : columnNames) { diff --git a/core/src/main/java/org/apache/calcite/sql/fun/SqlArgumentAssignmentOperator.java b/core/src/main/java/org/apache/calcite/sql/fun/SqlArgumentAssignmentOperator.java index f303eda..4d5e3f5 100644 --- a/core/src/main/java/org/apache/calcite/sql/fun/SqlArgumentAssignmentOperator.java +++ b/core/src/main/java/org/apache/calcite/sql/fun/SqlArgumentAssignmentOperator.java @@ -47,4 +47,8 @@ class SqlArgumentAssignmentOperator extends SqlAsOperator { writer.keyword(getName()); call.operand(0).unparse(writer, getRightPrec(), rightPrec); } + + @Override public boolean argumentMustBeScalar(int ordinal) { + return false; + } } diff --git a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java index 19e0e21..9123f92 100644 --- a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java +++ b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java @@ -3318,7 +3318,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints { * @param clause Name of clause: "WHERE", "GROUP BY", "ON" */ private void validateNoAggs(AggFinder aggFinder, SqlNode node, - String 
clause) { + String clause) { final SqlCall agg = aggFinder.findAgg(node); if (agg == null) { return; @@ -3500,7 +3500,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints { } private void checkRollUp(SqlNode grandParent, SqlNode parent, - SqlNode current, SqlValidatorScope scope, String optionalClause) { + SqlNode current, SqlValidatorScope scope, String optionalClause) { current = stripAs(current); if (current instanceof SqlCall && !(current instanceof SqlSelect)) { // Validate OVER separately @@ -3525,7 +3525,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints { } private void checkRollUp(SqlNode grandParent, SqlNode parent, - SqlNode current, SqlValidatorScope scope) { + SqlNode current, SqlValidatorScope scope) { checkRollUp(grandParent, parent, current, scope, null); } @@ -3568,7 +3568,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints { // Returns true iff the given column is valid inside the given aggCall. private boolean isRolledUpColumnAllowedInAgg(SqlIdentifier identifier, SqlValidatorScope scope, - SqlCall aggCall, SqlNode parent) { + SqlCall aggCall, SqlNode parent) { Pair<String, String> pair = findTableColumnPair(identifier, scope); if (pair == null) { diff --git a/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties b/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties index 3a42da3..5a91208 100644 --- a/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties +++ b/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties @@ -77,6 +77,7 @@ ColumnNotFoundDidYouMean=Column ''{0}'' not found in any table; did you mean ''{ ColumnNotFoundInTable=Column ''{0}'' not found in table ''{1}'' ColumnNotFoundInTableDidYouMean=Column ''{0}'' not found in table ''{1}''; did you mean ''{2}''? 
ColumnAmbiguous=Column ''{0}'' is ambiguous +ParamNotFoundInFunctionDidYouMean = Param ''{0}'' not found in function ''{1}''; did you mean ''{2}''? NeedQueryOp=Operand {0} must be a query NeedSameTypeParameter=Parameters must be of the same type CanNotApplyOp2Type=Cannot apply ''{0}'' to arguments of type {1}. Supported form(s): {2} diff --git a/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java b/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java index 4100557..44124a6 100644 --- a/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java +++ b/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java @@ -1829,6 +1829,26 @@ class SqlToRelConverterTest extends SqlToRelTestBase { sql(sql).ok(); } + @Test void testTableFunctionTumbleWithParamNames() { + final String sql = "select *\n" + + "from table(\n" + + "tumble(\n" + + " DATA => table Shipments,\n" + + " TIMECOL => descriptor(rowtime),\n" + + " SIZE => INTERVAL '1' MINUTE))"; + sql(sql).ok(); + } + + @Test void testTableFunctionTumbleWithParamReordered() { + final String sql = "select *\n" + + "from table(\n" + + "tumble(\n" + + " DATA => table Shipments,\n" + + " SIZE => INTERVAL '1' MINUTE,\n" + + " TIMECOL => descriptor(rowtime)))"; + sql(sql).ok(); + } + @Test void testTableFunctionTumbleWithInnerJoin() { final String sql = "select *\n" + "from table(tumble(table Shipments, descriptor(rowtime), INTERVAL '1' MINUTE)) a\n" @@ -1858,6 +1878,28 @@ class SqlToRelConverterTest extends SqlToRelTestBase { sql(sql).ok(); } + @Test void testTableFunctionHopWithParamNames() { + final String sql = "select *\n" + + "from table(\n" + + "hop(\n" + + " DATA => table Shipments,\n" + + " TIMECOL => descriptor(rowtime),\n" + + " SLIDE => INTERVAL '1' MINUTE,\n" + + " SIZE => INTERVAL '2' MINUTE))"; + sql(sql).ok(); + } + + @Test void testTableFunctionHopWithParamReordered() { + final String sql = "select *\n" + + "from table(\n" + + "hop(\n" + + " DATA => table 
Shipments,\n" + + " SLIDE => INTERVAL '1' MINUTE,\n" + + " TIMECOL => descriptor(rowtime),\n" + + " SIZE => INTERVAL '2' MINUTE))"; + sql(sql).ok(); + } + @Test void testTableFunctionSession() { final String sql = "select *\n" + "from table(session(table Shipments, descriptor(rowtime), " @@ -1865,6 +1907,28 @@ class SqlToRelConverterTest extends SqlToRelTestBase { sql(sql).ok(); } + @Test void testTableFunctionSessionWithParamNames() { + final String sql = "select *\n" + + "from table(\n" + + "session(\n" + + " DATA => table Shipments,\n" + + " TIMECOL => descriptor(rowtime),\n" + + " KEY => descriptor(orderId),\n" + + " SIZE => INTERVAL '10' MINUTE))"; + sql(sql).ok(); + } + + @Test void testTableFunctionSessionWithParamReordered() { + final String sql = "select *\n" + + "from table(\n" + + "session(\n" + + " DATA => table Shipments,\n" + + " KEY => descriptor(orderId),\n" + + " TIMECOL => descriptor(rowtime),\n" + + " SIZE => INTERVAL '10' MINUTE))"; + sql(sql).ok(); + } + @Test void testTableFunctionTumbleWithSubQueryParam() { final String sql = "select *\n" + "from table(tumble((select * from Shipments), descriptor(rowtime), INTERVAL '1' MINUTE))"; diff --git a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java index 32cc678..bb6a82f 100644 --- a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java +++ b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java @@ -10287,13 +10287,41 @@ public class SqlValidatorTest extends SqlValidatorTestCase { } @Test public void testTumbleTableFunction() { - sql("select * from table(\n" - + "^tumble(table orders, descriptor(rowtime))^)") - .fails("Invalid number of arguments to function 'TUMBLE'. 
Was expecting 3 arguments"); sql("select rowtime, productid, orderid, 'window_start', 'window_end' from table(\n" + "tumble(table orders, descriptor(rowtime), interval '2' hour))").ok(); sql("select rowtime, productid, orderid, 'window_start', 'window_end' from table(\n" + "tumble(table orders, descriptor(rowtime), interval '2' hour, interval '1' hour))").ok(); + // test named params. + sql("select rowtime, productid, orderid, 'window_start', 'window_end'\n" + + "from table(\n" + + "tumble(\n" + + "data => table orders,\n" + + "timecol => descriptor(rowtime),\n" + + "size => interval '2' hour))").ok(); + sql("select rowtime, productid, orderid, 'window_start', 'window_end'\n" + + "from table(\n" + + "tumble(\n" + + "data => table orders,\n" + + "timecol => descriptor(rowtime),\n" + + "size => interval '2' hour,\n" + + "\"OFFSET\" => interval '1' hour))").ok(); + // negative tests. + sql("select rowtime, productid, orderid, 'window_start', 'window_end'\n" + + "from table(\n" + + "tumble(\n" + + "^\"data\"^ => table orders,\n" + + "TIMECOL => descriptor(rowtime),\n" + + "SIZE => interval '2' hour))") + .fails("Param 'data' not found in function 'TUMBLE'; did you mean 'DATA'\\?"); + sql("select rowtime, productid, orderid, 'window_start', 'window_end'\n" + + "from table(\n" + + "^tumble(\n" + + "data => table orders,\n" + + "SIZE => interval '2' hour)^)") + .fails("Invalid number of arguments to function 'TUMBLE'. Was expecting 3 arguments"); + sql("select * from table(\n" + + "^tumble(table orders, descriptor(rowtime))^)") + .fails("Invalid number of arguments to function 'TUMBLE'. 
Was expecting 3 arguments"); sql("select * from table(\n" + "^tumble(table orders, descriptor(rowtime), 'test')^)") .fails("Cannot apply 'TUMBLE' to arguments of type 'TUMBLE\\(<RECORDTYPE\\" @@ -10326,6 +10354,34 @@ public class SqlValidatorTest extends SqlValidatorTestCase { sql("select * from table(\n" + "hop(table orders, descriptor(rowtime), interval '2' hour, interval '1' hour, " + "interval '20' minute))").ok(); + // test named params. + sql("select * from table(\n" + + "hop(\n" + + "data => table orders,\n" + + "timecol => descriptor(rowtime),\n" + + "slide => interval '2' hour,\n" + + "size => interval '1' hour))").ok(); + sql("select * from table(\n" + + "hop(\n" + + "data => table orders,\n" + + "timecol => descriptor(rowtime),\n" + + "slide => interval '2' hour,\n" + + "size => interval '1' hour,\n" + + "\"OFFSET\" => interval '20' minute))").ok(); + // negative tests. + sql("select * from table(\n" + + "hop(\n" + + "^\"data\"^ => table orders,\n" + + "timecol => descriptor(rowtime),\n" + + "slide => interval '2' hour,\n" + + "size => interval '1' hour))") + .fails("Param 'data' not found in function 'HOP'; did you mean 'DATA'\\?"); + sql("select * from table(\n" + + "^hop(\n" + + "data => table orders,\n" + + "slide => interval '2' hour,\n" + + "size => interval '1' hour)^)") + .fails("Invalid number of arguments to function 'HOP'. Was expecting 4 arguments"); sql("select * from table(\n" + "^hop(table orders, descriptor(rowtime), interval '2' hour)^)") .fails("Invalid number of arguments to function 'HOP'. Was expecting 4 arguments"); @@ -10334,25 +10390,25 @@ public class SqlValidatorTest extends SqlValidatorTestCase { .fails("Cannot apply 'HOP' to arguments of type 'HOP\\(<RECORDTYPE\\(TIMESTAMP\\(0\\) " + "ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, <COLUMN_LIST>, <INTERVAL HOUR>, " + "<CHAR\\(4\\)>\\)'. 
Supported form\\(s\\): HOP\\(TABLE table_name, DESCRIPTOR\\(" - + "col\\), datetime interval, datetime interval\\[, datetime interval\\]\\)"); + + "timecol\\), datetime interval, datetime interval\\[, datetime interval\\]\\)"); sql("select * from table(\n" + "^hop(table orders, descriptor(rowtime), 'test', interval '2' hour)^)") .fails("Cannot apply 'HOP' to arguments of type 'HOP\\(<RECORDTYPE\\(TIMESTAMP\\(0\\) " + "ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, <COLUMN_LIST>, <CHAR\\(4\\)>, " + "<INTERVAL HOUR>\\)'. Supported form\\(s\\): HOP\\(TABLE table_name, DESCRIPTOR\\(" - + "col\\), datetime interval, datetime interval\\[, datetime interval\\]\\)"); + + "timecol\\), datetime interval, datetime interval\\[, datetime interval\\]\\)"); sql("select * from table(\n" + "^hop(table orders, 'test', interval '2' hour, interval '2' hour)^)") .fails("Cannot apply 'HOP' to arguments of type 'HOP\\(<RECORDTYPE\\(TIMESTAMP\\(0\\) " + "ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, <CHAR\\(4\\)>, <INTERVAL HOUR>, " + "<INTERVAL HOUR>\\)'. Supported form\\(s\\): HOP\\(TABLE table_name, DESCRIPTOR\\(" - + "col\\), datetime interval, datetime interval\\[, datetime interval\\]\\)"); + + "timecol\\), datetime interval, datetime interval\\[, datetime interval\\]\\)"); sql("select * from table(\n" + "^hop(table orders, descriptor(rowtime), interval '2' hour, interval '1' hour, 'test')^)") .fails("Cannot apply 'HOP' to arguments of type 'HOP\\(<RECORDTYPE\\(TIMESTAMP\\(0\\) " + "ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, <COLUMN_LIST>, <INTERVAL HOUR>, " + "<INTERVAL HOUR>, <CHAR\\(4\\)>\\)'. 
Supported form\\(s\\): HOP\\(TABLE table_name, " - + "DESCRIPTOR\\(col\\), datetime interval, datetime interval\\[, datetime interval\\]\\)"); + + "DESCRIPTOR\\(timecol\\), datetime interval, datetime interval\\[, datetime interval\\]\\)"); sql("select * from table(\n" + "hop(TABLE ^tabler_not_exist^, descriptor(rowtime), interval '2' hour, interval '1' hour))") .fails("Object 'TABLER_NOT_EXIST' not found"); @@ -10362,6 +10418,28 @@ public class SqlValidatorTest extends SqlValidatorTestCase { sql("select * from table(\n" + "session(table orders, descriptor(rowtime), descriptor(productid), interval '1' hour))") .ok(); + // test named params. + sql("select * from table(\n" + + "session(\n" + + "data => table orders,\n" + + "timecol => descriptor(rowtime),\n" + + "key => descriptor(productid),\n" + + "size => interval '1' hour))") + .ok(); + // negative tests. + sql("select * from table(\n" + + "session(\n" + + "^\"data\"^ => table orders,\n" + + "timecol => descriptor(rowtime),\n" + + "key => descriptor(productid),\n" + + "size => interval '1' hour))") + .fails("Param 'data' not found in function 'SESSION'; did you mean 'DATA'\\?"); + sql("select * from table(\n" + + "^session(\n" + + "data => table orders,\n" + + "key => descriptor(productid),\n" + + "size => interval '1' hour)^)") + .fails("Invalid number of arguments to function 'SESSION'. Was expecting 4 arguments"); sql("select * from table(\n" + "^session(table orders, descriptor(rowtime), interval '2' hour)^)") .fails("Invalid number of arguments to function 'SESSION'. Was expecting 4 arguments"); @@ -10370,19 +10448,19 @@ public class SqlValidatorTest extends SqlValidatorTestCase { .fails("Cannot apply 'SESSION' to arguments of type 'SESSION\\(<RECORDTYPE\\(TIMESTAMP\\(" + "0\\) ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, <COLUMN_LIST>, <COLUMN_LIST>, " + "<CHAR\\(4\\)>\\)'. 
Supported form\\(s\\): SESSION\\(TABLE table_name, DESCRIPTOR\\(" - + "col\\), DESCRIPTOR\\(col\\), datetime interval\\)"); + + "timecol\\), DESCRIPTOR\\(key\\), datetime interval\\)"); sql("select * from table(\n" + "^session(table orders, descriptor(rowtime), 'test', interval '2' hour)^)") .fails("Cannot apply 'SESSION' to arguments of type 'SESSION\\(<RECORDTYPE\\(TIMESTAMP\\(" + "0\\) ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, <COLUMN_LIST>, <CHAR\\(4\\)>, " + "<INTERVAL HOUR>\\)'. Supported form\\(s\\): SESSION\\(TABLE table_name, DESCRIPTOR\\(" - + "col\\), DESCRIPTOR\\(col\\), datetime interval\\)"); + + "timecol\\), DESCRIPTOR\\(key\\), datetime interval\\)"); sql("select * from table(\n" + "^session(table orders, 'test', descriptor(productid), interval '2' hour)^)") .fails("Cannot apply 'SESSION' to arguments of type 'SESSION\\(<RECORDTYPE\\(TIMESTAMP\\(" + "0\\) ROWTIME, INTEGER PRODUCTID, INTEGER ORDERID\\)>, <CHAR\\(4\\)>, <COLUMN_LIST>, " + "<INTERVAL HOUR>\\)'. Supported form\\(s\\): SESSION\\(TABLE table_name, DESCRIPTOR\\(" - + "col\\), DESCRIPTOR\\(col\\), datetime interval\\)"); + + "timecol\\), DESCRIPTOR\\(key\\), datetime interval\\)"); sql("select * from table(\n" + "session(TABLE ^tabler_not_exist^, descriptor(rowtime), descriptor(productid), interval '1' hour))") .fails("Object 'TABLER_NOT_EXIST' not found"); diff --git a/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml b/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml index 2758df7..69c57e2 100644 --- a/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml +++ b/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml @@ -5019,6 +5019,42 @@ LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3]) ]]> </Resource> </TestCase> + <TestCase name="testTableFunctionTumbleWithParamNames"> + <Resource name="sql"> + <![CDATA[select * +from table( +tumble( + DATA => table Shipments, + 
TIMECOL => descriptor(rowtime), + SIZE => INTERVAL '1' MINUTE))]]> + </Resource> + <Resource name="plan"> + <![CDATA[ +LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3]) + LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($1), 60000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)]) + LogicalProject(ORDERID=[$0], ROWTIME=[$1]) + LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]]) +]]> + </Resource> + </TestCase> + <TestCase name="testTableFunctionTumbleWithParamReordered"> + <Resource name="sql"> + <![CDATA[select * +from table( +tumble( + DATA => table Shipments, + SIZE => INTERVAL '1' MINUTE, + TIMECOL => descriptor(rowtime)))]]> + </Resource> + <Resource name="plan"> + <![CDATA[ +LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3]) + LogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($1), 60000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)]) + LogicalProject(ORDERID=[$0], ROWTIME=[$1]) + LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]]) +]]> + </Resource> + </TestCase> <TestCase name="testTableFunctionTumbleWithInnerJoin"> <Resource name="sql"> <![CDATA[select * @@ -5067,6 +5103,44 @@ LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3]) ]]> </Resource> </TestCase> + <TestCase name="testTableFunctionHopWithParamNames"> + <Resource name="sql"> + <![CDATA[select * +from table( +hop( + TIMECOL => descriptor(rowtime), + SLIDE => INTERVAL '1' MINUTE, + DATA => table Shipments, + SIZE => INTERVAL '2' MINUTE))]]> + </Resource> + <Resource name="plan"> + <![CDATA[ +LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3]) + LogicalTableFunctionScan(invocation=[HOP(DESCRIPTOR($1), 60000:INTERVAL MINUTE, 120000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, 
TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)]) + LogicalProject(ORDERID=[$0], ROWTIME=[$1]) + LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]]) +]]> + </Resource> + </TestCase> + <TestCase name="testTableFunctionHopWithParamReordered"> + <Resource name="sql"> + <![CDATA[select * +from table( +hop( + DATA => table Shipments, + SLIDE => INTERVAL '1' MINUTE, + TIMECOL => descriptor(rowtime), + SIZE => INTERVAL '2' MINUTE))]]> + </Resource> + <Resource name="plan"> + <![CDATA[ +LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3]) + LogicalTableFunctionScan(invocation=[HOP(DESCRIPTOR($1), 60000:INTERVAL MINUTE, 120000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)]) + LogicalProject(ORDERID=[$0], ROWTIME=[$1]) + LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]]) +]]> + </Resource> + </TestCase> <TestCase name="testTableFunctionHopWithOffset"> <Resource name="sql"> <![CDATA[select * @@ -5095,6 +5169,44 @@ LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3]) ]]> </Resource> </TestCase> + <TestCase name="testTableFunctionSessionWithParamNames"> + <Resource name="sql"> + <![CDATA[select * +from table( +session( + DATA => table Shipments, + TIMECOL => descriptor(rowtime), + KEY => descriptor(orderId), + SIZE => INTERVAL '10' MINUTE))]]> + </Resource> + <Resource name="plan"> + <![CDATA[ +LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3]) + LogicalTableFunctionScan(invocation=[SESSION(DESCRIPTOR($1), DESCRIPTOR($0), 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)]) + LogicalProject(ORDERID=[$0], ROWTIME=[$1]) + LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]]) +]]> + </Resource> + </TestCase> + <TestCase name="testTableFunctionSessionWithParamReordered"> + <Resource name="sql"> + <![CDATA[select * +from table( 
+session( + DATA => table Shipments, + KEY => descriptor(orderId), + TIMECOL => descriptor(rowtime), + SIZE => INTERVAL '10' MINUTE))]]> + </Resource> + <Resource name="plan"> + <![CDATA[ +LogicalProject(ORDERID=[$0], ROWTIME=[$1], window_start=[$2], window_end=[$3]) + LogicalTableFunctionScan(invocation=[SESSION(DESCRIPTOR($1), DESCRIPTOR($0), 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER ORDERID, TIMESTAMP(0) ROWTIME, TIMESTAMP(0) window_start, TIMESTAMP(0) window_end)]) + LogicalProject(ORDERID=[$0], ROWTIME=[$1]) + LogicalTableScan(table=[[CATALOG, SALES, SHIPMENTS]]) +]]> + </Resource> + </TestCase> <TestCase name="testTableFunctionTumbleWithSubQueryParam"> <Resource name="sql"> <![CDATA[select * diff --git a/core/src/test/resources/sql/stream.iq b/core/src/test/resources/sql/stream.iq index 724eabd..365994e 100644 --- a/core/src/test/resources/sql/stream.iq +++ b/core/src/test/resources/sql/stream.iq @@ -17,6 +17,24 @@ # !use orinoco !set outputformat mysql +SELECT * FROM TABLE( + TUMBLE( + DATA => TABLE ORDERS, + TIMECOL => DESCRIPTOR(ROWTIME), + SIZE => INTERVAL '1' MINUTE)); ++---------------------+----+---------+-------+---------------------+---------------------+ +| ROWTIME | ID | PRODUCT | UNITS | window_start | window_end | ++---------------------+----+---------+-------+---------------------+---------------------+ +| 2015-02-15 10:15:00 | 1 | paint | 10 | 2015-02-15 10:15:00 | 2015-02-15 10:16:00 | +| 2015-02-15 10:24:15 | 2 | paper | 5 | 2015-02-15 10:24:00 | 2015-02-15 10:25:00 | +| 2015-02-15 10:24:45 | 3 | brush | 12 | 2015-02-15 10:24:00 | 2015-02-15 10:25:00 | +| 2015-02-15 10:58:00 | 4 | paint | 3 | 2015-02-15 10:58:00 | 2015-02-15 10:59:00 | +| 2015-02-15 11:10:00 | 5 | paint | 3 | 2015-02-15 11:10:00 | 2015-02-15 11:11:00 | ++---------------------+----+---------+-------+---------------------+---------------------+ +(5 rows) + +!ok + SELECT * FROM TABLE(TUMBLE(TABLE ORDERS, DESCRIPTOR(ROWTIME), INTERVAL '1' MINUTE)); 
+---------------------+----+---------+-------+---------------------+---------------------+ | ROWTIME | ID | PRODUCT | UNITS | window_start | window_end | @@ -78,6 +96,30 @@ SELECT * FROM TABLE(HOP(TABLE ORDERS, DESCRIPTOR(ROWTIME), INTERVAL '5' MINUTE, !ok +SELECT * FROM TABLE( + HOP( + DATA => TABLE ORDERS, + TIMECOL => DESCRIPTOR(ROWTIME), + SLIDE => INTERVAL '5' MINUTE, + SIZE => INTERVAL '10' MINUTE)); ++---------------------+----+---------+-------+---------------------+---------------------+ +| ROWTIME | ID | PRODUCT | UNITS | window_start | window_end | ++---------------------+----+---------+-------+---------------------+---------------------+ +| 2015-02-15 10:15:00 | 1 | paint | 10 | 2015-02-15 10:10:00 | 2015-02-15 10:20:00 | +| 2015-02-15 10:15:00 | 1 | paint | 10 | 2015-02-15 10:15:00 | 2015-02-15 10:25:00 | +| 2015-02-15 10:24:15 | 2 | paper | 5 | 2015-02-15 10:15:00 | 2015-02-15 10:25:00 | +| 2015-02-15 10:24:15 | 2 | paper | 5 | 2015-02-15 10:20:00 | 2015-02-15 10:30:00 | +| 2015-02-15 10:24:45 | 3 | brush | 12 | 2015-02-15 10:15:00 | 2015-02-15 10:25:00 | +| 2015-02-15 10:24:45 | 3 | brush | 12 | 2015-02-15 10:20:00 | 2015-02-15 10:30:00 | +| 2015-02-15 10:58:00 | 4 | paint | 3 | 2015-02-15 10:50:00 | 2015-02-15 11:00:00 | +| 2015-02-15 10:58:00 | 4 | paint | 3 | 2015-02-15 10:55:00 | 2015-02-15 11:05:00 | +| 2015-02-15 11:10:00 | 5 | paint | 3 | 2015-02-15 11:05:00 | 2015-02-15 11:15:00 | +| 2015-02-15 11:10:00 | 5 | paint | 3 | 2015-02-15 11:10:00 | 2015-02-15 11:20:00 | ++---------------------+----+---------+-------+---------------------+---------------------+ +(10 rows) + +!ok + SELECT * FROM TABLE(HOP(TABLE ORDERS, DESCRIPTOR(ROWTIME), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE, INTERVAL '2' MINUTE)); +---------------------+----+---------+-------+---------------------+---------------------+ | ROWTIME | ID | PRODUCT | UNITS | window_start | window_end | @@ -130,6 +172,25 @@ SELECT * FROM TABLE(SESSION(TABLE ORDERS, DESCRIPTOR(ROWTIME), 
DESCRIPTOR(PRODUC !ok +SELECT * FROM TABLE( + SESSION( + DATA => TABLE ORDERS, + TIMECOL => DESCRIPTOR(ROWTIME), + KEY => DESCRIPTOR(PRODUCT), + SIZE => INTERVAL '20' MINUTE)); ++---------------------+----+---------+-------+---------------------+---------------------+ +| ROWTIME | ID | PRODUCT | UNITS | window_start | window_end | ++---------------------+----+---------+-------+---------------------+---------------------+ +| 2015-02-15 10:15:00 | 1 | paint | 10 | 2015-02-15 10:15:00 | 2015-02-15 10:35:00 | +| 2015-02-15 10:24:15 | 2 | paper | 5 | 2015-02-15 10:24:15 | 2015-02-15 10:44:15 | +| 2015-02-15 10:24:45 | 3 | brush | 12 | 2015-02-15 10:24:45 | 2015-02-15 10:44:45 | +| 2015-02-15 10:58:00 | 4 | paint | 3 | 2015-02-15 10:58:00 | 2015-02-15 11:30:00 | +| 2015-02-15 11:10:00 | 5 | paint | 3 | 2015-02-15 10:58:00 | 2015-02-15 11:30:00 | ++---------------------+----+---------+-------+---------------------+---------------------+ +(5 rows) + +!ok + SELECT * FROM TABLE(SESSION((SELECT * FROM ORDERS), DESCRIPTOR(ROWTIME), DESCRIPTOR(PRODUCT), INTERVAL '20' MINUTE)); +---------------------+----+---------+-------+---------------------+---------------------+ | ROWTIME | ID | PRODUCT | UNITS | window_start | window_end | diff --git a/site/_docs/reference.md b/site/_docs/reference.md index 3bf384f..d5928aa 100644 --- a/site/_docs/reference.md +++ b/site/_docs/reference.md @@ -1876,10 +1876,24 @@ is named as "fixed windowing". | Operator syntax | Description |:-------------------- |:----------- -| TUMBLE(table, DESCRIPTOR(datetime), interval [, offset ]) | Indicates a tumbling window of *interval* for *datetime*, optionally aligned at offset. +| TUMBLE(data, DESCRIPTOR(timecol), size [, offset ]) | Indicates a tumbling window of *size* interval for *timecol*, optionally aligned at *offset*. 
Here is an example: -`SELECT * FROM TABLE(TUMBLE(TABLE orders, DESCRIPTOR(rowtime), INTERVAL '1' MINUTE))`, +```SQL +SELECT * FROM TABLE( + TUMBLE( + TABLE orders, + DESCRIPTOR(rowtime), + INTERVAL '1' MINUTE)); + +-- or with the named params +-- note: the DATA param must be the first +SELECT * FROM TABLE( + TUMBLE( + DATA => TABLE orders, + TIMECOL => DESCRIPTOR(rowtime), + SIZE => INTERVAL '1' MINUTE)); +``` will apply tumbling with 1 minute window size on rows from table orders. rowtime is the watermarked column of table orders that tells data completeness. @@ -1890,10 +1904,26 @@ on a timestamp column. Windows assigned could have overlapping so hopping someti | Operator syntax | Description |:-------------------- |:----------- -| HOP(table, DESCRIPTOR(datetime), slide, size [, offset ]) | Indicates a hopping window for *datetime*, covering rows within the interval of *size*, shifting every *slide* and optionally aligned at offset. +| HOP(data, DESCRIPTOR(timecol), slide, size [, offset ]) | Indicates a hopping window for *timecol*, covering rows within the interval of *size*, shifting every *slide* and optionally aligned at *offset*. Here is an example: -`SELECT * FROM TABLE(HOP(TABLE orders, DESCRIPTOR(rowtime), INTERVAL '2' MINUTE, INTERVAL '5' MINUTE))`, +```SQL +SELECT * FROM TABLE( + HOP( + TABLE orders, + DESCRIPTOR(rowtime), + INTERVAL '2' MINUTE, + INTERVAL '5' MINUTE)); + +-- or with the named params +-- note: the DATA param must be the first +SELECT * FROM TABLE( + HOP( + DATA => TABLE orders, + TIMECOL => DESCRIPTOR(rowtime), + SLIDE => INTERVAL '2' MINUTE, + SIZE => INTERVAL '5' MINUTE)); +``` will apply hopping with 5-minute interval size on rows from table orders and shifting every 2 minutes. rowtime is the watermarked column of table orders that tells data completeness. @@ -1904,13 +1934,34 @@ of rows are less than *interval*. Session window is applied per *key*. 
| Operator syntax | Description |:-------------------- |:----------- -| session(table, DESCRIPTOR(datetime), DESCRIPTOR(key), interval) | Indicates a session window of *interval* for *datetime*. Session window is applied per *key*. +| session(data, DESCRIPTOR(timecol), DESCRIPTOR(key), size) | Indicates a session window of *size* interval for *timecol*. Session window is applied per *key*. Here is an example: -`SELECT * FROM TABLE(SESSION(TABLE orders, DESCRIPTOR(rowtime), DESCRIPTOR(product), INTERVAL '20' MINUTE))`, +```SQL +SELECT * FROM TABLE( + SESSION( + TABLE orders, + DESCRIPTOR(rowtime), + DESCRIPTOR(product), + INTERVAL '20' MINUTE)); + +-- or with the named params +-- note: the DATA param must be the first +SELECT * FROM TABLE( + SESSION( + DATA => TABLE orders, + TIMECOL => DESCRIPTOR(rowtime), + KEY => DESCRIPTOR(product), + SIZE => INTERVAL '20' MINUTE)); +``` will apply session with 20-minute inactive gap on rows from table orders. rowtime is the watermarked column of table orders that tells data completeness. Session is applied per product. +**Note**: The `Tumble`, `Hop` and `Session` window table functions assign +each row in the original table to a window. The output table has all +the same columns as the original table plus two additional columns `window_start` +and `window_end`, which represent the start and end of the window interval, respectively. ### Grouped window functions **warning**: grouped window functions are deprecated.