From Hussain Towaileb <[email protected]>:
Hussain Towaileb has uploaded this change for review.
(https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20966?usp=email)
Change subject: [ASTERIXDB-3634][EXT]: Support passing time travel in query
......................................................................
[ASTERIXDB-3634][EXT]: Support passing time travel in query
Ext-ref: MB-70761
Change-Id: I77d243a49a5aa9497839a1c50e4f0e148e5fd06e
---
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetRewriter.java
A
asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/JoinClauseRightInputModel.java
M
asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/clause/AbstractBinaryCorrelateClause.java
M
asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/clause/AbstractBinaryCorrelateWithConditionClause.java
M
asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/clause/FromTerm.java
M
asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/clause/JoinClause.java
M
asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/clause/NestClause.java
M asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
A
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/TimeTravel.java
12 files changed, 275 insertions(+), 25 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/66/20966/1
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
index 59777c0..5b9383f9 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
@@ -135,6 +135,7 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.TimeTravel;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
@@ -336,6 +337,10 @@
if (hint != null) {
unnestOp.getAnnotations().put(SUBPATH, hint.getSubPath());
}
+ TimeTravel timeTravel = fromTerm.getTimeTravel();
+ if (timeTravel != null) {
+ unnestOp.setTimeTravel(timeTravel);
+ }
unnestOp.getInputs().add(pUnnestExpr.second);
unnestOp.setSourceLocation(sourceLoc);
@@ -594,6 +599,10 @@
}
unnestOp.getInputs().add(pUnnestExpr.second);
unnestOp.setSourceLocation(binaryCorrelate.getRightVariable().getSourceLocation());
+ TimeTravel timeTravel = binaryCorrelate.getTimeTravel();
+ if (timeTravel != null) {
+ unnestOp.setTimeTravel(timeTravel);
+ }
return new Pair<>(unnestOp, rightVar);
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetRewriter.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetRewriter.java
index 3e8b02a..4a1e02d 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetRewriter.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetRewriter.java
@@ -22,6 +22,8 @@
import static
org.apache.asterix.common.api.IIdentifierMapper.Modifier.SINGULAR;
import static org.apache.asterix.common.utils.IdentifierUtil.dataset;
import static org.apache.asterix.external.util.ExternalDataConstants.SUBPATH;
+import static
org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_SNAPSHOT_ID_PROPERTY_KEY;
+import static
org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY;
import java.io.Serializable;
import java.util.ArrayList;
@@ -56,6 +58,7 @@
import
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.TimeTravel;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
import
org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
@@ -116,6 +119,16 @@
if (externalSubpath instanceof String) {
dataSourceProperties.put(SUBPATH, (String) externalSubpath);
}
+ Object timeTravelObj = unnest.getTimeTravel();
+ if (timeTravelObj instanceof TimeTravel timeTravel) {
+ if (timeTravel.getType().equals(TimeTravel.Type.SNAPSHOT_ID)) {
+ dataSourceProperties.put(ICEBERG_SNAPSHOT_ID_PROPERTY_KEY,
timeTravel.getSnapshotIdOrTimestamp());
+ } else if
(timeTravel.getType().equals(TimeTravel.Type.SNAPSHOT_TIMESTAMP)) {
+
dataSourceProperties.put(ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY,
timeTravel.getSnapshotIdOrTimestamp());
+ } else {
+ throw new IllegalStateException("Unknown snapshot type");
+ }
+ }
}
List<Mutable<ILogicalOperator>> scanInpList = scan.getInputs();
scanInpList.addAll(unnest.getInputs());
diff --git
a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/JoinClauseRightInputModel.java
b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/JoinClauseRightInputModel.java
new file mode 100644
index 0000000..50eb98b
--- /dev/null
+++
b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/JoinClauseRightInputModel.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.lang.sqlpp;
+
+import org.apache.asterix.lang.common.base.Expression;
+import org.apache.asterix.lang.common.expression.VariableExpr;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.TimeTravel;
+
+public class JoinClauseRightInputModel {
+ Expression rightExpr;
+ VariableExpr rightVar;
+ VariableExpr posVar;
+ TimeTravel timeTravel;
+
+ public JoinClauseRightInputModel(Expression rightExpr, VariableExpr
rightVar, VariableExpr posVar,
+ TimeTravel timeTravel) {
+ this.rightExpr = rightExpr;
+ this.rightVar = rightVar;
+ this.posVar = posVar;
+ this.timeTravel = timeTravel;
+ }
+
+ public Expression getRightExpr() {
+ return rightExpr;
+ }
+
+ public VariableExpr getRightVar() {
+ return rightVar;
+ }
+
+ public VariableExpr getPosVar() {
+ return posVar;
+ }
+
+ public TimeTravel getTimeTravel() {
+ return timeTravel;
+ }
+}
diff --git
a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/clause/AbstractBinaryCorrelateClause.java
b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/clause/AbstractBinaryCorrelateClause.java
index 21770c3..d3a17a6 100644
---
a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/clause/AbstractBinaryCorrelateClause.java
+++
b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/clause/AbstractBinaryCorrelateClause.java
@@ -24,17 +24,25 @@
import org.apache.asterix.lang.common.base.AbstractClause;
import org.apache.asterix.lang.common.base.Expression;
import org.apache.asterix.lang.common.expression.VariableExpr;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.TimeTravel;
public abstract class AbstractBinaryCorrelateClause extends AbstractClause {
private Expression rightExpr;
- private VariableExpr rightVar;
- private VariableExpr rightPosVar;
+ private final VariableExpr rightVar;
+ private final VariableExpr rightPosVar;
+ private final TimeTravel timeTravel;
public AbstractBinaryCorrelateClause(Expression rightExpr, VariableExpr
rightVar, VariableExpr rightPosVar) {
+ this(rightExpr, rightVar, rightPosVar, null);
+ }
+
+ public AbstractBinaryCorrelateClause(Expression rightExpr, VariableExpr
rightVar, VariableExpr rightPosVar,
+ TimeTravel timeTravel) {
this.rightExpr = rightExpr;
this.rightVar = rightVar;
this.rightPosVar = rightPosVar;
+ this.timeTravel = timeTravel;
}
public Expression getRightExpression() {
@@ -53,6 +61,10 @@
return rightPosVar;
}
+ public TimeTravel getTimeTravel() {
+ return timeTravel;
+ }
+
public boolean hasPositionalVariable() {
return rightPosVar != null;
}
diff --git
a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/clause/AbstractBinaryCorrelateWithConditionClause.java
b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/clause/AbstractBinaryCorrelateWithConditionClause.java
index 581efdd..cfebe24 100644
---
a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/clause/AbstractBinaryCorrelateWithConditionClause.java
+++
b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/clause/AbstractBinaryCorrelateWithConditionClause.java
@@ -21,14 +21,15 @@
import org.apache.asterix.lang.common.base.Expression;
import org.apache.asterix.lang.common.expression.VariableExpr;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.TimeTravel;
public abstract class AbstractBinaryCorrelateWithConditionClause extends
AbstractBinaryCorrelateClause {
private Expression conditionExpr;
public AbstractBinaryCorrelateWithConditionClause(Expression rightExpr,
VariableExpr rightVar,
- VariableExpr rightPosVar, Expression conditionExpr) {
- super(rightExpr, rightVar, rightPosVar);
+ VariableExpr rightPosVar, Expression conditionExpr, TimeTravel
timeTravel) {
+ super(rightExpr, rightVar, rightPosVar, timeTravel);
this.conditionExpr = conditionExpr;
}
diff --git
a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/clause/FromTerm.java
b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/clause/FromTerm.java
index 3e26371..e74c44e 100644
---
a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/clause/FromTerm.java
+++
b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/clause/FromTerm.java
@@ -29,21 +29,29 @@
import org.apache.asterix.lang.common.expression.VariableExpr;
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
import org.apache.asterix.lang.sqlpp.visitor.base.ISqlppVisitor;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.TimeTravel;
public class FromTerm extends AbstractClause {
private Expression leftExpr;
- private VariableExpr leftVar;
- private VariableExpr posVar;
- private List<AbstractBinaryCorrelateClause> correlateClauses = new
ArrayList<>();
+ private final VariableExpr leftVar;
+ private final VariableExpr posVar;
+ private final List<AbstractBinaryCorrelateClause> correlateClauses = new
ArrayList<>();
+ private TimeTravel timeTravel;
public FromTerm(Expression leftExpr, VariableExpr leftVar, VariableExpr
posVar,
List<AbstractBinaryCorrelateClause> correlateClauses) {
+ this(leftExpr, leftVar, posVar, correlateClauses, null);
+ }
+
+ public FromTerm(Expression leftExpr, VariableExpr leftVar, VariableExpr
posVar,
+ List<AbstractBinaryCorrelateClause> correlateClauses, TimeTravel
timeTravel) {
this.leftExpr = leftExpr;
this.leftVar = leftVar;
this.posVar = posVar;
if (correlateClauses != null) {
this.correlateClauses.addAll(correlateClauses);
}
+ this.timeTravel = timeTravel;
}
@Override
@@ -73,7 +81,7 @@
}
public boolean hasCorrelateClauses() {
- return correlateClauses != null && !correlateClauses.isEmpty();
+ return !correlateClauses.isEmpty();
}
public List<AbstractBinaryCorrelateClause> getCorrelateClauses() {
@@ -84,9 +92,13 @@
return posVar != null;
}
+ public TimeTravel getTimeTravel() {
+ return timeTravel;
+ }
+
@Override
public String toString() {
- return String.valueOf(leftExpr) + " AS " + leftVar;
+ return leftExpr + " AS " + leftVar;
}
@Override
@@ -99,11 +111,11 @@
if (this == object) {
return true;
}
- if (!(object instanceof FromTerm)) {
+ if (!(object instanceof FromTerm target)) {
return false;
}
- FromTerm target = (FromTerm) object;
return Objects.equals(correlateClauses, target.correlateClauses) &&
Objects.equals(leftExpr, target.leftExpr)
- && Objects.equals(leftVar, target.leftVar) &&
Objects.equals(posVar, target.posVar);
+ && Objects.equals(leftVar, target.leftVar) &&
Objects.equals(posVar, target.posVar)
+ && Objects.equals(timeTravel, target.timeTravel);
}
}
diff --git
a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/clause/JoinClause.java
b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/clause/JoinClause.java
index be3ebda..ca0ad70 100644
---
a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/clause/JoinClause.java
+++
b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/clause/JoinClause.java
@@ -28,6 +28,7 @@
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
import org.apache.asterix.lang.sqlpp.optype.JoinType;
import org.apache.asterix.lang.sqlpp.visitor.base.ISqlppVisitor;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.TimeTravel;
public class JoinClause extends AbstractBinaryCorrelateWithConditionClause {
@@ -37,7 +38,12 @@
public JoinClause(JoinType joinType, Expression rightExpr, VariableExpr
rightVar, VariableExpr rightPosVar,
Expression conditionExpr, Literal.Type outerJoinMissingValueType) {
- super(rightExpr, rightVar, rightPosVar, conditionExpr);
+ this(joinType, rightExpr, rightVar, rightPosVar, conditionExpr,
outerJoinMissingValueType, null);
+ }
+
+ public JoinClause(JoinType joinType, Expression rightExpr, VariableExpr
rightVar, VariableExpr rightPosVar,
+ Expression conditionExpr, Literal.Type outerJoinMissingValueType,
TimeTravel timeTravel) {
+ super(rightExpr, rightVar, rightPosVar, conditionExpr, timeTravel);
this.joinType = joinType;
setOuterJoinMissingValueType(outerJoinMissingValueType);
}
diff --git
a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/clause/NestClause.java
b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/clause/NestClause.java
index 5f0afb0..8f559509 100644
---
a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/clause/NestClause.java
+++
b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/clause/NestClause.java
@@ -32,7 +32,7 @@
public NestClause(UnnestType nestType, Expression rightExpr, VariableExpr
rightVar, VariableExpr rightPosVar,
Expression conditionExpr) {
- super(rightExpr, rightVar, rightPosVar, conditionExpr);
+ super(rightExpr, rightVar, rightPosVar, conditionExpr, null);
this.nestType = nestType;
}
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
index 23105fd..a8ef74a 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
+++ b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
@@ -194,6 +194,7 @@
import org.apache.asterix.lang.common.util.DatasetDeclParametersUtil;
import org.apache.asterix.lang.common.util.ExpressionUtils;
import org.apache.asterix.lang.common.util.RangeMapBuilder;
+import org.apache.asterix.lang.sqlpp.JoinClauseRightInputModel;
import org.apache.asterix.lang.sqlpp.clause.AbstractBinaryCorrelateClause;
import org.apache.asterix.lang.sqlpp.clause.FromClause;
import org.apache.asterix.lang.sqlpp.clause.FromTerm;
@@ -205,6 +206,7 @@
import org.apache.asterix.lang.sqlpp.clause.SelectElement;
import org.apache.asterix.lang.sqlpp.clause.SelectRegular;
import org.apache.asterix.lang.sqlpp.clause.SelectSetOperation;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.TimeTravel;
import org.apache.asterix.lang.sqlpp.clause.UnnestClause;
import org.apache.asterix.lang.common.clause.WhereClause;
import org.apache.asterix.lang.sqlpp.expression.ChangeExpression;
@@ -5939,6 +5941,9 @@
VariableExpr posVar = null;
AbstractBinaryCorrelateClause correlateClause = null;
List<AbstractBinaryCorrelateClause> correlateClauses = new
ArrayList<AbstractBinaryCorrelateClause>();
+
+ String snapshotOrTimestamp = null;
+ TimeTravel.Type timeTravelType = TimeTravel.Type.SNAPSHOT_ID;
}
{
leftExpr = Expression()
@@ -5950,7 +5955,19 @@
((VariableExpr) leftExpr).addHint(new
ExternalSubpathAnnotation(subPath));
}
}
- } ((<AS>)? leftVar = Variable())? (<AT> posVar = Variable())?
+ } ((<AS>)? leftVar = Variable())?
+ (
+ <AT>
+ (
+ LOOKAHEAD( { laIdentifier("SNAPSHOT") } )
+ <IDENTIFIER> snapshotOrTimestamp = StringLiteral()
+ { timeTravelType = TimeTravel.Type.SNAPSHOT_ID; }
+ | LOOKAHEAD( { laIdentifier("TIMESTAMP") } )
+ <IDENTIFIER> snapshotOrTimestamp = StringLiteral()
+ { timeTravelType = TimeTravel.Type.SNAPSHOT_TIMESTAMP; }
+ | posVar = Variable()
+ )
+ )?
(
(
correlateClause = JoinOrUnnestClause(JoinType.INNER, UnnestType.INNER)
@@ -5967,7 +5984,13 @@
if (leftVar == null) {
leftVar = ExpressionToVariableUtil.getGeneratedVariable(leftExpr,
true);
}
- FromTerm fromTerm = new FromTerm(leftExpr, leftVar, posVar,
correlateClauses);
+ FromTerm fromTerm;
+ if (snapshotOrTimestamp != null) {
+ TimeTravel timeTravel = new TimeTravel(snapshotOrTimestamp,
timeTravelType);
+ fromTerm = new FromTerm(leftExpr, leftVar, posVar, correlateClauses,
timeTravel);
+ } else {
+ fromTerm = new FromTerm(leftExpr, leftVar, posVar, correlateClauses);
+ }
fromTerm.setSourceLocation(leftExpr.getSourceLocation());
return fromTerm;
}
@@ -5987,14 +6010,14 @@
JoinClause JoinClause(JoinType joinType) throws ParseException :
{
Token startToken = null;
- Triple<Expression, VariableExpr, VariableExpr> rightInput = null;
+ JoinClauseRightInputModel rightInput = null;
Expression conditionExpr = null;
}
{
<JOIN> { startToken = token; } rightInput = JoinClauseRightInput() <ON>
conditionExpr = Expression()
{
- JoinClause joinClause = new JoinClause(joinType, rightInput.first,
rightInput.second, rightInput.third,
- conditionExpr, joinType == JoinType.INNER ? null : Literal.Type.MISSING);
+ JoinClause joinClause = new JoinClause(joinType,
rightInput.getRightExpr(), rightInput.getRightVar(), rightInput.getPosVar(),
+ conditionExpr, joinType == JoinType.INNER ? null : Literal.Type.MISSING,
rightInput.getTimeTravel());
return addSourceLocation(joinClause, startToken);
}
}
@@ -6002,23 +6025,26 @@
JoinClause CrossJoinClause() throws ParseException :
{
Token startToken = null;
- Triple<Expression, VariableExpr, VariableExpr> rightInput = null;
+ JoinClauseRightInputModel rightInput = null;
Expression conditionExpr = null;
}
{
<JOIN> { startToken = token; } rightInput = JoinClauseRightInput()
{
- JoinClause joinClause = new JoinClause(JoinType.INNER, rightInput.first,
rightInput.second, rightInput.third,
- new LiteralExpr(TrueLiteral.INSTANCE), null);
+ JoinClause joinClause = new JoinClause(JoinType.INNER,
rightInput.getRightExpr(), rightInput.getRightVar(), rightInput.getPosVar(),
+ new LiteralExpr(TrueLiteral.INSTANCE), null, rightInput.getTimeTravel());
return addSourceLocation(joinClause, startToken);
}
}
-Triple<Expression, VariableExpr, VariableExpr> JoinClauseRightInput() throws
ParseException :
+JoinClauseRightInputModel JoinClauseRightInput() throws ParseException :
{
Expression rightExpr = null;
VariableExpr rightVar = null;
VariableExpr posVar = null;
+
+ String snapshotOrTimestamp = null;
+ TimeTravel.Type timeTravelType = TimeTravel.Type.SNAPSHOT_ID;
}
{
rightExpr = Expression()
@@ -6030,12 +6056,28 @@
((VariableExpr) rightExpr).addHint(new
ExternalSubpathAnnotation(subPath));
}
}
- } ((<AS>)? rightVar = Variable())? (<AT> posVar = Variable())?
+ } ((<AS>)? rightVar = Variable())?
+ (
+ <AT>
+ (
+ LOOKAHEAD( { laIdentifier("SNAPSHOT") } )
+ <IDENTIFIER> snapshotOrTimestamp = StringLiteral()
+ { timeTravelType = TimeTravel.Type.SNAPSHOT_ID; }
+ | LOOKAHEAD( { laIdentifier("TIMESTAMP") } )
+ <IDENTIFIER> snapshotOrTimestamp = StringLiteral()
+ { timeTravelType = TimeTravel.Type.SNAPSHOT_TIMESTAMP; }
+ | posVar = Variable()
+ )
+ )?
{
if (rightVar == null) {
rightVar = ExpressionToVariableUtil.getGeneratedVariable(rightExpr,
true);
}
- return new Triple<Expression, VariableExpr, VariableExpr>(rightExpr,
rightVar, posVar);
+ TimeTravel timeTravel = null;
+ if (snapshotOrTimestamp != null) {
+ timeTravel = new TimeTravel(snapshotOrTimestamp, timeTravelType);
+ }
+ return new JoinClauseRightInputModel(rightExpr, rightVar, posVar,
timeTravel);
}
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
index f7bf0f6..85941cf 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
@@ -33,6 +33,7 @@
import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.external.api.ITypedAdapterFactory;
import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.iceberg.IcebergConstants;
import org.apache.asterix.metadata.IDatasetDetails;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.entities.Dataset;
@@ -139,6 +140,7 @@
Map<String, String> properties =
addExternalProjectionInfo(projectionFiltrationInfo,
edd.getProperties());
properties = addSubPath(externalDataSource.getProperties(),
properties);
+ properties = addTimeTravel(externalDataSource.getProperties(),
properties);
properties.put(KEY_EXTERNAL_SCAN_BUFFER_SIZE,
String.valueOf(externalScanBufferSize));
setExternalCollectionCompilerProperties(metadataProvider,
properties);
IExternalFilterEvaluatorFactory filterEvaluatorFactory =
metadataProvider
@@ -209,6 +211,32 @@
return propertiesCopy;
}
+ private Map<String, String> addTimeTravel(Map<String, Serializable>
dataSourceProps,
+ Map<String, String> properties) {
+ Serializable snapshotId =
dataSourceProps.get(IcebergConstants.ICEBERG_SNAPSHOT_ID_PROPERTY_KEY);
+ Serializable snapshotTimestamp =
dataSourceProps.get(IcebergConstants.ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY);
+ if (snapshotId == null && snapshotTimestamp == null) {
+ return properties;
+ }
+
+ if (snapshotId != null && !(snapshotId instanceof String)) {
+ return properties;
+ }
+ if (snapshotTimestamp != null && !(snapshotTimestamp instanceof
String)) {
+ return properties;
+ }
+
+ Map<String, String> propertiesCopy = new HashMap<>(properties);
+
propertiesCopy.remove(IcebergConstants.ICEBERG_SNAPSHOT_ID_PROPERTY_KEY);
+
propertiesCopy.remove(IcebergConstants.ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY);
+ if (snapshotId != null) {
+
propertiesCopy.put(IcebergConstants.ICEBERG_SNAPSHOT_ID_PROPERTY_KEY,
snapshotId.toString());
+ } else {
+
propertiesCopy.put(IcebergConstants.ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY,
snapshotTimestamp.toString());
+ }
+ return propertiesCopy;
+ }
+
private int[] createFilterIndexes(List<LogicalVariable> filterVars,
IOperatorSchema opSchema) {
if (filterVars != null && !filterVars.isEmpty()) {
final int size = filterVars.size();
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
index 571a30a..5cf7d45 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
@@ -73,6 +73,7 @@
protected List<LogicalVariable> schema;
private SourceLocation sourceLoc;
+ private TimeTravel timeTravel;
public AbstractLogicalOperator() {
inputs = new ArrayList<>();
@@ -160,6 +161,14 @@
annotations.remove(annotationName);
}
+ public TimeTravel getTimeTravel() {
+ return timeTravel;
+ }
+
+ public void setTimeTravel(TimeTravel timeTravel) {
+ this.timeTravel = timeTravel;
+ }
+
@Override
public final void contributeRuntimeOperator(IHyracksJobBuilder builder,
JobGenContext context,
IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
IOperatorSchema outerPlanSchema)
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/TimeTravel.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/TimeTravel.java
new file mode 100644
index 0000000..79822b6
--- /dev/null
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/TimeTravel.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.operators.logical;
+
+import java.util.Objects;
+
+public class TimeTravel {
+
+ public enum Type {
+ SNAPSHOT_ID,
+ SNAPSHOT_TIMESTAMP
+ }
+
+ private final String snapshotIdOrTimestamp;
+ private final Type type;
+
+ public TimeTravel(String snapshotIdOrTimestamp, Type type) {
+ this.snapshotIdOrTimestamp = snapshotIdOrTimestamp;
+ this.type = type;
+ }
+
+ public String getSnapshotIdOrTimestamp() {
+ return snapshotIdOrTimestamp;
+ }
+
+ public Type getType() {
+ return type;
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ if (this == object) {
+ return true;
+ }
+ if (!(object instanceof TimeTravel target)) {
+ return false;
+ }
+ return Objects.equals(snapshotIdOrTimestamp,
target.getSnapshotIdOrTimestamp())
+ && Objects.equals(type, target.getType());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(snapshotIdOrTimestamp, type);
+ }
+}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20966?usp=email
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings?usp=email
Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: lumina
Gerrit-Change-Id: I77d243a49a5aa9497839a1c50e4f0e148e5fd06e
Gerrit-Change-Number: 20966
Gerrit-PatchSet: 1
Gerrit-Owner: Hussain Towaileb <[email protected]>