Repository: phoenix Updated Branches: refs/heads/calcite c9f117e5b -> aa0bf5834
http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java index 1981c4b..dcab6a6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java @@ -150,7 +150,7 @@ public class CorrelatePlan extends DelegateQueryPlan { close(); return null; } - runtimeContext.setCorrelateVariableValue(variableId, current); + runtimeContext.getCorrelateVariable(variableId).setValue(current); rhsIter = rhs.iterator(); rhsCurrent = rhsIter.next(); if ((rhsCurrent == null && (joinType == JoinType.Inner || joinType == JoinType.Semi)) http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContext.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContext.java index 99f409e..ea2b74a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContext.java @@ -18,17 +18,16 @@ package org.apache.phoenix.execute; import org.apache.phoenix.expression.Expression; -import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.tuple.Tuple; public interface RuntimeContext { + + public interface CorrelateVariable { + public Expression newExpression(int index); + public Tuple getValue(); + public void setValue(Tuple value); + } - public abstract void defineCorrelateVariable(String variableId, TableRef def); - - public abstract Expression newCorrelateVariableReference(String variableId, int index); - - public abstract void setCorrelateVariableValue(String variableId, Tuple value); - - public abstract Tuple getCorrelateVariableValue(String variableId); - + public void defineCorrelateVariable(String variableId, CorrelateVariable def); + public CorrelateVariable getCorrelateVariable(String variableId); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContextImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContextImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContextImpl.java index 0accea6..86097bd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContextImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContextImpl.java @@ -17,73 +17,28 @@ */ package org.apache.phoenix.execute; -import java.util.List; import java.util.Map; -import org.apache.phoenix.calcite.PhoenixTable; -import org.apache.phoenix.expression.Expression; -import org.apache.phoenix.schema.ColumnRef; -import org.apache.phoenix.schema.PColumn; -import org.apache.phoenix.schema.TableRef; -import org.apache.phoenix.schema.tuple.Tuple; - import com.google.common.collect.Maps; public class RuntimeContextImpl implements RuntimeContext { - Map<String, VariableEntry> correlateVariables; + Map<String, CorrelateVariable> correlateVariables; public RuntimeContextImpl() { this.correlateVariables = Maps.newHashMap(); } @Override - public void defineCorrelateVariable(String variableId, TableRef def) { - this.correlateVariables.put(variableId, new VariableEntry(def)); + public void defineCorrelateVariable(String variableId, CorrelateVariable def) { + this.correlateVariables.put(variableId, def); } @Override - public Expression newCorrelateVariableReference(String variableId, int index) { - VariableEntry entry = this.correlateVariables.get(variableId); - if (entry == null) - throw new RuntimeException("Variable '" + variableId + "' undefined."); - - return new ColumnRef(entry.def, entry.mappedColumns.get(index).getPosition()).newColumnExpression(); - } - - @Override - public void setCorrelateVariableValue(String variableId, Tuple value) { - VariableEntry entry = this.correlateVariables.get(variableId); - if (entry == null) - throw new RuntimeException("Variable '" + variableId + "' undefined."); - - entry.setValue(value); - } - - @Override - public Tuple getCorrelateVariableValue(String variableId) { - VariableEntry entry = this.correlateVariables.get(variableId); + public CorrelateVariable getCorrelateVariable(String variableId) { + CorrelateVariable entry = this.correlateVariables.get(variableId); if (entry == null) throw new RuntimeException("Variable '" + variableId + "' undefined."); - return entry.getValue(); - } - - private static class VariableEntry { - private final TableRef def; - private final List<PColumn> mappedColumns; - private Tuple value; - - VariableEntry(TableRef def) { - this.def = def; - this.mappedColumns = PhoenixTable.getMappedColumns(def.getTable()); - } - - Tuple getValue() { - return value; - } - - void setValue(Tuple value) { - this.value = value; - } + return entry; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java index a884949..641bc9d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java @@ -111,6 +111,10 @@ public class TupleProjector { } public static void serializeProjectorIntoScan(Scan scan, TupleProjector projector) { + serializeProjectorIntoScan(scan, projector, SCAN_PROJECTOR); + } + + public static void serializeProjectorIntoScan(Scan scan, TupleProjector projector, String attr) { ByteArrayOutputStream stream = new ByteArrayOutputStream(); try { DataOutputStream output = new DataOutputStream(stream); @@ -121,7 +125,7 @@ public class TupleProjector { WritableUtils.writeVInt(output, ExpressionType.valueOf(projector.expressions[i]).ordinal()); projector.expressions[i].write(output); } - scan.setAttribute(SCAN_PROJECTOR, stream.toByteArray()); + scan.setAttribute(attr, stream.toByteArray()); } catch (IOException e) { throw new RuntimeException(e); } finally { @@ -135,7 +139,11 @@ public class TupleProjector { } public static TupleProjector deserializeProjectorFromScan(Scan scan) { - byte[] proj = scan.getAttribute(SCAN_PROJECTOR); + return deserializeProjectorFromScan(scan, SCAN_PROJECTOR); + } + + public static TupleProjector deserializeProjectorFromScan(Scan scan, String attr) { + byte[] proj = scan.getAttribute(attr); if (proj == null) { return null; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/expression/CorrelateVariableFieldAccessExpression.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/CorrelateVariableFieldAccessExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/CorrelateVariableFieldAccessExpression.java index 7ba43c7..185e0a1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/CorrelateVariableFieldAccessExpression.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/CorrelateVariableFieldAccessExpression.java @@ -41,7 +41,7 @@ public class CorrelateVariableFieldAccessExpression extends BaseTerminalExpressi @Override public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) { - Tuple variable = runtimeContext.getCorrelateVariableValue(variableId); + Tuple variable = runtimeContext.getCorrelateVariable(variableId).getValue(); if (variable == null) throw new RuntimeException("Variable '" + variableId + "' not set."); http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java index 8fc0c7e..f9d5cc2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java @@ -94,6 +94,7 @@ import co.cask.tephra.TxConstants; public class IndexUtil { public static final String INDEX_COLUMN_NAME_SEP = ":"; public static final byte[] INDEX_COLUMN_NAME_SEP_BYTES = Bytes.toBytes(INDEX_COLUMN_NAME_SEP); + public static final String INDEX_PROJECTOR = "indexProjector"; private IndexUtil() { } @@ -453,6 +454,11 @@ public class IndexUtil { public static TupleProjector getTupleProjector(Scan scan, ColumnReference[] dataColumns) { if (dataColumns != null && dataColumns.length != 0) { + TupleProjector projector = TupleProjector.deserializeProjectorFromScan(scan, INDEX_PROJECTOR); + if (projector != null) { + return projector; + } + KeyValueSchema keyValueSchema = deserializeLocalIndexJoinSchemaFromScan(scan); KeyValueColumnExpression[] keyValueColumns = new KeyValueColumnExpression[dataColumns.length]; for (int i = 0; i < dataColumns.length; i++) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/test/java/org/apache/phoenix/calcite/ToExpressionTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/calcite/ToExpressionTest.java b/phoenix-core/src/test/java/org/apache/phoenix/calcite/ToExpressionTest.java index 92a2479..4244877 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/calcite/ToExpressionTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/calcite/ToExpressionTest.java @@ -4,6 +4,7 @@ import static org.junit.Assert.assertEquals; import java.sql.Connection; import java.sql.DriverManager; +import java.sql.SQLException; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -39,7 +40,7 @@ import org.apache.phoenix.query.BaseConnectionlessQueryTest; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableKey; -import org.apache.phoenix.schema.TableRef; + import org.junit.Test; @@ -81,7 +82,7 @@ public class ToExpressionTest extends BaseConnectionlessQueryTest { public ExpressionChecker checkExpressionEquality() { Implementor implementor = new PhoenixRelImplementorImpl(new RuntimeContextImpl()); - implementor.setTableRef(new TableRef(table)); + implementor.setTableMapping(new TableMapping(table)); Expression e = CalciteUtils.toExpression(this.calciteExpr, implementor); assertEquals(this.phoenixExpr,e); return this; @@ -165,7 +166,11 @@ public class ToExpressionTest extends BaseConnectionlessQueryTest { return null; PTable table = rootTables.get(name); - return new PhoenixTable(pc, table); + try { + return new PhoenixTable(pc, table); + } catch (SQLException e) { + throw new RuntimeException(e); + } } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java index a8757ab..c5c6dc4 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java @@ -44,6 +44,7 @@ import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.compile.TupleProjectionCompiler; import org.apache.phoenix.coprocessor.MetaDataProtocol; import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.execute.RuntimeContext.CorrelateVariable; import org.apache.phoenix.expression.ComparisonExpression; import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression; import org.apache.phoenix.expression.Expression; @@ -180,7 +181,7 @@ public class CorrelatePlanTest { TableRef rightTable = createProjectedTableFromLiterals(rightRelation[0]); String varName = "$cor0"; RuntimeContext runtimeContext = new RuntimeContextImpl(); - runtimeContext.defineCorrelateVariable(varName, leftTable); + runtimeContext.defineCorrelateVariable(varName, new CorrelateVariableImpl(leftTable)); QueryPlan leftPlan = newLiteralResultIterationPlan(leftRelation); QueryPlan rightPlan = newLiteralResultIterationPlan(rightRelation); Expression columnExpr = new ColumnRef(rightTable, rightCorrelColumn).newColumnExpression(); @@ -245,4 +246,28 @@ public class CorrelatePlanTest { throw new RuntimeException(e); } } + + private static class CorrelateVariableImpl implements CorrelateVariable { + private final TableRef tableRef; + private Tuple value; + + CorrelateVariableImpl(TableRef tableRef) { + this.tableRef = tableRef; + } + + @Override + public Expression newExpression(int index) { + return new ColumnRef(tableRef, index).newColumnExpression(); + } + + @Override + public Tuple getValue() { + return value; + } + + @Override + public void setValue(Tuple value) { + this.value = value; + } + } }