Repository: phoenix
Updated Branches:
  refs/heads/calcite 44800825b -> 85937f7e9


Add PhoenixProjectScanMergeRule


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/85937f7e
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/85937f7e
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/85937f7e

Branch: refs/heads/calcite
Commit: 85937f7e95d0527f500c51f95efd8980e9f30cc6
Parents: 4480082
Author: maryannxue <[email protected]>
Authored: Sat Mar 14 23:40:51 2015 -0400
Committer: maryannxue <[email protected]>
Committed: Sat Mar 14 23:40:51 2015 -0400

----------------------------------------------------------------------
 .../org/apache/phoenix/calcite/CalciteTest.java | 169 +++++++++----------
 .../apache/phoenix/calcite/CalciteUtils.java    |   9 +-
 .../calcite/PhoenixFilterScanMergeRule.java     |   2 +-
 .../apache/phoenix/calcite/PhoenixProject.java  |  70 ++++----
 .../calcite/PhoenixProjectScanMergeRule.java    |  37 ++++
 .../org/apache/phoenix/calcite/PhoenixRel.java  |   4 +
 .../calcite/PhoenixRelImplementorImpl.java      |  36 ++++
 .../apache/phoenix/calcite/PhoenixRules.java    |  27 +++
 .../apache/phoenix/calcite/PhoenixTable.java    |   2 +-
 .../phoenix/calcite/PhoenixTableScan.java       |  50 +++---
 .../phoenix/execute/TupleProjectionPlan.java    |  12 ++
 11 files changed, 265 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/85937f7e/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java 
b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
index b125ac9..a236290 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
@@ -5,20 +5,17 @@ import com.google.common.collect.Lists;
 import org.apache.calcite.jdbc.CalciteConnection;
 import org.apache.phoenix.end2end.BaseClientManagedTimeIT;
 import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.query.BaseTest;
-import org.junit.Ignore;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
 import java.io.FileWriter;
 import java.io.PrintWriter;
 import java.sql.*;
-import java.util.Arrays;
 import java.util.List;
 
 import static org.apache.phoenix.util.TestUtil.JOIN_ITEM_TABLE_FULL_NAME;
 import static org.apache.phoenix.util.TestUtil.JOIN_SUPPLIER_TABLE_FULL_NAME;
-import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.*;
 
 /**
@@ -74,42 +71,38 @@ public class CalciteTest extends BaseClientManagedTimeIT {
             this.sql = sql;
         }
 
-        public List<String> getResult(ResultSet resultSet) throws SQLException 
{
-            final List<String> list = Lists.newArrayList();
+        public static List<Object[]> getResult(ResultSet resultSet) throws 
SQLException {
+            final List<Object[]> list = Lists.newArrayList();
             populateResult(resultSet, list);
             return list;
         }
 
-        private void populateResult(ResultSet resultSet, List<String> list) 
throws SQLException {
-            final StringBuilder buf = new StringBuilder();
+        private static void populateResult(ResultSet resultSet, List<Object[]> 
list) throws SQLException {
             final int columnCount = resultSet.getMetaData().getColumnCount();
             while (resultSet.next()) {
+                Object[] row = new Object[columnCount];
                 for (int i = 0; i < columnCount; i++) {
-                    if (i > 0) {
-                        buf.append(", ");
-                    }
-                    buf.append(resultSet.getString(i + 1));
+                    row[i] = resultSet.getObject(i + 1);
                 }
-                list.add(buf.toString());
-                buf.setLength(0);
+                list.add(row);
             }
         }
 
         public Sql explainIs(String expected) {
-            final List<String> list = getResult("explain plan for " + sql);
+            final List<Object[]> list = getResult("explain plan for " + sql);
             if (list.size() != 1) {
                 fail("explain should return 1 row, got " + list.size());
             }
-            String explain = list.get(0);
-            assertThat(explain, equalTo(expected));
+            String explain = (String) (list.get(0)[0]);
+            assertEquals(explain, expected);
             return this;
         }
 
-        public List<String> getResult(String sql) {
+        public List<Object[]> getResult(String sql) {
             try {
                 final Statement statement = 
start.getConnection().createStatement();
                 final ResultSet resultSet = statement.executeQuery(sql);
-                List<String> list = getResult(resultSet);
+                List<Object[]> list = getResult(resultSet);
                 resultSet.close();
                 statement.close();
                 return list;
@@ -122,9 +115,24 @@ public class CalciteTest extends BaseClientManagedTimeIT {
             start.close();
         }
 
-        public Sql resultIs(String... lines) {
-            assertThat(Arrays.asList(lines), equalTo(getResult(sql)));
-            return this;
+        public Sql resultIs(Object[]... expected) {
+            try {
+                final Statement statement = 
start.getConnection().createStatement();
+                final ResultSet resultSet = statement.executeQuery(sql);
+                for (int i = 0; i < expected.length; i++) {
+                    assertTrue(resultSet.next());
+                    Object[] row = expected[i];
+                    for (int j = 0; j < row.length; j++) {
+                        assertEquals(row[j], resultSet.getObject(j + 1));
+                    }
+                }        
+                assertFalse(resultSet.next());
+                resultSet.close();
+                statement.close();
+                return this;
+            } catch (SQLException e) {
+                throw new RuntimeException(e);
+            }
         }
     }
 
@@ -136,7 +144,6 @@ public class CalciteTest extends BaseClientManagedTimeIT {
         final String url = getUrl();
         final PhoenixConnection phoenixConnection =
             DriverManager.getConnection(url).unwrap(PhoenixConnection.class);
-        BaseTest.ensureTableCreated(url, ATABLE_NAME);
         calciteConnection.getRootSchema().add("phoenix",
             new PhoenixSchema(phoenixConnection));
         calciteConnection.setSchema("phoenix");
@@ -167,91 +174,69 @@ public class CalciteTest extends BaseClientManagedTimeIT {
         pw.close();
         final Connection connection =
             DriverManager.getConnection("jdbc:calcite:model=" + 
file.getAbsolutePath());
-        BaseTest.ensureTableCreated(url, ATABLE_NAME);
         return connection;
     }
     
-    private void testConnect(String query, Object[][] expectedValues) throws 
Exception {
-        final Connection connection = 
DriverManager.getConnection("jdbc:calcite:");
-        final CalciteConnection calciteConnection =
-            connection.unwrap(CalciteConnection.class);
-        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
+    @Before
+    public void initTable() throws Exception {
         final String url = getUrl();
-        final PhoenixConnection phoenixConnection =
-            DriverManager.getConnection(url).unwrap(PhoenixConnection.class);
         ensureTableCreated(url, ATABLE_NAME);
         initATableValues(getOrganizationId(), null, url);
-        ensureTableCreated(url, JOIN_ITEM_TABLE_FULL_NAME);
-        ensureTableCreated(url, JOIN_SUPPLIER_TABLE_FULL_NAME);
         initJoinTableValues(url, null, null);
-        calciteConnection.getRootSchema().add("phoenix",
-            new PhoenixSchema(phoenixConnection));
-        calciteConnection.setSchema("phoenix");
-        final Statement statement = calciteConnection.createStatement();
-        final ResultSet resultSet = statement.executeQuery(query);
-
-        for (int i = 0; i < expectedValues.length; i++) {
-            assertTrue(resultSet.next());
-            Object[] row = expectedValues[i];
-            for (int j = 0; j < row.length; j++) {
-                assertEquals(row[j], resultSet.getObject(j + 1));
-            }
-        }        
-        assertFalse(resultSet.next());
-        
-        resultSet.close();
-        statement.close();
-        connection.close();
     }
     
     @Test public void testTableScan() throws Exception {
-        testConnect("select * from aTable where a_string = 'a'", 
-                new Object[][] {{"00D300000000XHP", "00A123122312312", "a"}, 
-                                {"00D300000000XHP", "00A223122312312", "a"}, 
-                                {"00D300000000XHP", "00A323122312312", "a"}, 
-                                {"00D300000000XHP", "00A423122312312", "a"}});
+        start().sql("select * from aTable where a_string = 'a'")
+                .explainIs("PhoenixToEnumerableConverter\n" +
+                           "  PhoenixTableScan(table=[[phoenix, ATABLE]], 
filter=[=($2, 'a')])\n")
+                .resultIs(new Object[][] {
+                          {"00D300000000XHP", "00A123122312312", "a"}, 
+                          {"00D300000000XHP", "00A223122312312", "a"}, 
+                          {"00D300000000XHP", "00A323122312312", "a"}, 
+                          {"00D300000000XHP", "00A423122312312", "a"}})
+                .close();
     }
     
     @Test public void testProject() throws Exception {
-        testConnect("select entity_id, a_string, organization_id from aTable 
where a_string = 'a'", 
-                new Object[][] {{"00A123122312312", "a", "00D300000000XHP"}, 
-                                {"00A223122312312", "a", "00D300000000XHP"}, 
-                                {"00A323122312312", "a", "00D300000000XHP"}, 
-                                {"00A423122312312", "a", "00D300000000XHP"}});
+        start().sql("select entity_id, a_string, organization_id from aTable 
where a_string = 'a'")
+                .explainIs("PhoenixToEnumerableConverter\n" +
+                           "  PhoenixTableScan(table=[[phoenix, ATABLE]], 
filter=[=($2, 'a')], project=[[$1, $2, $0]])\n")
+                .resultIs(new Object[][] {
+                          {"00A123122312312", "a", "00D300000000XHP"}, 
+                          {"00A223122312312", "a", "00D300000000XHP"}, 
+                          {"00A323122312312", "a", "00D300000000XHP"}, 
+                          {"00A423122312312", "a", "00D300000000XHP"}})
+                .close();
     }
     
     @Test public void testJoin() throws Exception {
-        testConnect("select t1.entity_id, t2.a_string, t1.organization_id from 
aTable t1 join aTable t2 on t1.entity_id = t2.entity_id and t1.organization_id 
= t2.organization_id where t1.a_string = 'a'", 
-                new Object[][] {{"00A123122312312", "a", "00D300000000XHP"}, 
-                                {"00A223122312312", "a", "00D300000000XHP"}, 
-                                {"00A323122312312", "a", "00D300000000XHP"}, 
-                                {"00A423122312312", "a", "00D300000000XHP"}});
-        testConnect("SELECT item.\"item_id\", item.name, supp.\"supplier_id\", 
supp.name FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item JOIN " + 
JOIN_SUPPLIER_TABLE_FULL_NAME + " supp ON item.\"supplier_id\" = 
supp.\"supplier_id\"", 
-                new Object[][] {{"0000000001", "T1", "0000000001", "S1"}, 
-                                {"0000000002", "T2", "0000000001", "S1"}, 
-                                {"0000000003", "T3", "0000000002", "S2"}, 
-                                {"0000000004", "T4", "0000000002", "S2"},
-                                {"0000000005", "T5", "0000000005", "S5"},
-                                {"0000000006", "T6", "0000000006", "S6"}});
-    }
-
-    @Test public void testExplainPlanForSelectWhereQuery() {
-        start()
-            .sql("select * from aTable where a_string = 'a'")
-            .explainIs(
-                "PhoenixToEnumerableConverter\n"
-                    + "  PhoenixTableScan(table=[[phoenix, ATABLE]], 
filter=[=($2, 'a')])\n")
-            .close();
-    }
-
-    @Test public void testExplainProject() {
-        start()
-            .sql("select a_string, b_string from aTable where a_string = 'a'")
-            .explainIs(
-                "PhoenixToEnumerableConverter\n"
-                    + "  PhoenixProject(A_STRING=[$2], B_STRING=[$3])\n"
-                    + "    PhoenixTableScan(table=[[phoenix, ATABLE]], 
filter=[=($2, 'a')])\n")
-            .close();
+        start().sql("select t1.entity_id, t2.a_string, t1.organization_id from 
aTable t1 join aTable t2 on t1.entity_id = t2.entity_id and t1.organization_id 
= t2.organization_id where t1.a_string = 'a'") 
+                .explainIs("PhoenixToEnumerableConverter\n" +
+                           "  PhoenixProject(ENTITY_ID=[$4], A_STRING=[$2], 
ORGANIZATION_ID=[$3])\n" +
+                           "    PhoenixJoin(condition=[AND(=($4, $1), =($3, 
$0))], joinType=[inner])\n" +
+                           "      PhoenixTableScan(table=[[phoenix, ATABLE]], 
project=[[$0, $1, $2]])\n" +
+                           "      PhoenixTableScan(table=[[phoenix, ATABLE]], 
filter=[=($2, 'a')], project=[[$0, $1, $2]])\n")
+                .resultIs(new Object[][] {
+                          {"00A123122312312", "a", "00D300000000XHP"}, 
+                          {"00A223122312312", "a", "00D300000000XHP"}, 
+                          {"00A323122312312", "a", "00D300000000XHP"}, 
+                          {"00A423122312312", "a", "00D300000000XHP"}})
+                .close();
+        
+        start().sql("SELECT item.\"item_id\", item.name, supp.\"supplier_id\", 
supp.name FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item JOIN " + 
JOIN_SUPPLIER_TABLE_FULL_NAME + " supp ON item.\"supplier_id\" = 
supp.\"supplier_id\"")
+                .explainIs("PhoenixToEnumerableConverter\n" +
+                           "  PhoenixProject(item_id=[$0], NAME=[$1], 
supplier_id=[$3], NAME0=[$4])\n" +
+                           "    PhoenixJoin(condition=[=($2, $3)], 
joinType=[inner])\n" +
+                           "      PhoenixTableScan(table=[[phoenix, 
ITEMTABLE]], project=[[$0, $1, $5]])\n" +
+                           "      PhoenixTableScan(table=[[phoenix, 
SUPPLIERTABLE]], project=[[$0, $1]])\n")
+                .resultIs(new Object[][] {
+                          {"0000000001", "T1", "0000000001", "S1"}, 
+                          {"0000000002", "T2", "0000000001", "S1"}, 
+                          {"0000000003", "T3", "0000000002", "S2"}, 
+                          {"0000000004", "T4", "0000000002", "S2"},
+                          {"0000000005", "T5", "0000000005", "S5"},
+                          {"0000000006", "T6", "0000000006", "S6"}})
+                .close();
     }
 
     @Test public void testConnectUsingModel() throws Exception {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/85937f7e/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
index 956f317..b6eaf37 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
@@ -3,6 +3,7 @@ package org.apache.phoenix.calcite;
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexInputRef;
@@ -24,7 +25,13 @@ import com.google.common.collect.Maps;
  * Utilities for interacting with Calcite.
  */
 public class CalciteUtils {
-  private CalciteUtils() {}
+    private CalciteUtils() {}
+    
+    private static AtomicInteger tempAliasCounter = new AtomicInteger(0);
+  
+    public static String createTempAlias() {
+        return "$" + tempAliasCounter.incrementAndGet();
+    }
 
        private static final Map<SqlKind, ExpressionFactory> EXPRESSION_MAP = 
Maps
                        
.newHashMapWithExpectedSize(ExpressionType.values().length);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/85937f7e/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixFilterScanMergeRule.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixFilterScanMergeRule.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixFilterScanMergeRule.java
index 808fa99..d35abad 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixFilterScanMergeRule.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixFilterScanMergeRule.java
@@ -31,6 +31,6 @@ public class PhoenixFilterScanMergeRule extends RelOptRule {
         assert scan.filter == null : "predicate should have ensured no filter";
         call.transformTo(new PhoenixTableScan(scan.getCluster(),
                 scan.getTraitSet(), scan.getTable(),
-                filter.getCondition()));
+                filter.getCondition(), scan.projects, scan.getRowType()));
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/85937f7e/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java
index a406456..2dd1c28 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java
@@ -1,6 +1,7 @@
 package org.apache.phoenix.calcite;
 
 import java.sql.SQLException;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.calcite.plan.RelOptCluster;
@@ -11,14 +12,21 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexNode;
-import org.apache.phoenix.compile.ColumnProjector;
-import org.apache.phoenix.compile.ExplainPlan;
-import org.apache.phoenix.compile.ExpressionProjector;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.phoenix.compile.QueryPlan;
-import org.apache.phoenix.compile.RowProjector;
-import org.apache.phoenix.execute.DelegateQueryPlan;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.execute.TupleProjectionPlan;
+import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.Expression;
-import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.schema.KeyValueSchema;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PColumnImpl;
+import org.apache.phoenix.schema.PName;
+import org.apache.phoenix.schema.PNameFactory;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableImpl;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.TableRef;
 
 import com.google.common.collect.Lists;
 
@@ -46,32 +54,36 @@ public class PhoenixProject extends Project implements 
PhoenixRel {
     public QueryPlan implement(Implementor implementor) {
         QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
         
-        List<RexNode> projects = getProjects();
-        List<ColumnProjector> columnProjectors = 
Lists.<ColumnProjector>newArrayList();
+        TupleProjector tupleProjector = project(implementor, getProjects());
+        PTable projectedTable = implementor.createProjectedTable();
+        implementor.setTableRef(new TableRef(projectedTable));
+        return new TupleProjectionPlan(plan, tupleProjector, null, 
implementor.createRowProjector());
+    }
+    
+    protected static TupleProjector project(Implementor implementor, 
List<RexNode> projects) {
+        KeyValueSchema.KeyValueSchemaBuilder builder = new 
KeyValueSchema.KeyValueSchemaBuilder(0);
+        Expression[] exprs = new Expression[projects.size()];
+        List<PColumn> columns = Lists.<PColumn>newArrayList();
         for (int i = 0; i < projects.size(); i++) {
             String name = projects.get(i).toString();
             Expression expr = CalciteUtils.toExpression(projects.get(i), 
implementor);
-            columnProjectors.add(new ExpressionProjector(name, "", expr, 
false));
+            builder.addField(expr);
+            exprs[i] = expr;
+            columns.add(new PColumnImpl(PNameFactory.newName(name), 
PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY),
+                    expr.getDataType(), expr.getMaxLength(), expr.getScale(), 
expr.isNullable(),
+                    i, expr.getSortOrder(), null, null, false, name));
         }
-        final RowProjector rowProjector = new RowProjector(columnProjectors, 
plan.getProjector().getEstimatedRowByteSize(), 
plan.getProjector().isProjectEmptyKeyValue());
-
-        return new DelegateQueryPlan(plan) {
-            
-            @Override
-            public RowProjector getProjector() {
-                return rowProjector;
-            }
-
-            @Override
-            public ResultIterator iterator() throws SQLException {
-                return delegate.iterator();
-            }
-
-            @Override
-            public ExplainPlan getExplainPlan() throws SQLException {
-                return delegate.getExplainPlan();
-            }
-            
-        };
+        try {
+            PTable pTable = PTableImpl.makePTable(null, PName.EMPTY_NAME, 
PName.EMPTY_NAME,
+                    PTableType.SUBQUERY, null, 
MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM,
+                    null, null, columns, null, null, 
Collections.<PTable>emptyList(),
+                    false, Collections.<PName>emptyList(), null, null, false, 
false, false, null,
+                    null, null);
+            implementor.setTableRef(new 
TableRef(CalciteUtils.createTempAlias(), pTable, HConstants.LATEST_TIMESTAMP, 
false));
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+        
+        return new TupleProjector(builder.build(), exprs);        
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/85937f7e/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProjectScanMergeRule.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProjectScanMergeRule.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProjectScanMergeRule.java
new file mode 100644
index 0000000..d28159d
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProjectScanMergeRule.java
@@ -0,0 +1,37 @@
+package org.apache.phoenix.calcite;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Project;
+
+import com.google.common.base.Predicate;
+
+public class PhoenixProjectScanMergeRule extends RelOptRule {
+
+    /** Predicate that returns true if a table scan has no project. */
+    private static final Predicate<PhoenixTableScan> NO_PROJECT =
+        new Predicate<PhoenixTableScan>() {
+            @Override
+            public boolean apply(PhoenixTableScan phoenixTableScan) {
+                return phoenixTableScan.projects == null;
+            }
+        };
+
+    public static final PhoenixProjectScanMergeRule INSTANCE = new 
PhoenixProjectScanMergeRule();
+
+    private PhoenixProjectScanMergeRule() {
+        super(
+            operand(Project.class,
+                operand(PhoenixTableScan.class, null, NO_PROJECT, any())));
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        Project project = call.rel(0);
+        PhoenixTableScan scan = call.rel(1);
+        assert scan.projects == null : "predicate should have ensured no 
project";
+        call.transformTo(new PhoenixTableScan(scan.getCluster(),
+                scan.getTraitSet(), scan.getTable(),
+                scan.filter, project.getProjects(), project.getRowType()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/85937f7e/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java
index 4909d64..27a7b0e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java
@@ -3,7 +3,9 @@ package org.apache.phoenix.calcite;
 import org.apache.calcite.plan.Convention;
 import org.apache.calcite.rel.RelNode;
 import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.expression.ColumnExpression;
+import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.TableRef;
 
 /**
@@ -47,5 +49,7 @@ public interface PhoenixRel extends RelNode {
     void pushContext(ImplementorContext context);
     ImplementorContext popContext();
     ImplementorContext getCurrentContext();
+    PTable createProjectedTable();
+    RowProjector createRowProjector();
   }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/85937f7e/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRelImplementorImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRelImplementorImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRelImplementorImpl.java
index 2a403ad..ef92f34 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRelImplementorImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRelImplementorImpl.java
@@ -1,13 +1,24 @@
 package org.apache.phoenix.calcite;
 
+import java.sql.SQLException;
+import java.util.List;
 import java.util.Stack;
 
 import org.apache.phoenix.calcite.PhoenixRel.ImplementorContext;
+import org.apache.phoenix.compile.ColumnProjector;
+import org.apache.phoenix.compile.ExpressionProjector;
 import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.TupleProjectionCompiler;
 import org.apache.phoenix.expression.ColumnExpression;
+import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.schema.ColumnRef;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.TableRef;
 
+import com.google.common.collect.Lists;
+
 class PhoenixRelImplementorImpl implements PhoenixRel.Implementor {
        private TableRef tableRef;
        private Stack<ImplementorContext> contextStack;
@@ -53,5 +64,30 @@ class PhoenixRelImplementorImpl implements 
PhoenixRel.Implementor {
     public ImplementorContext getCurrentContext() {
         return contextStack.peek();
     }
+    
+    @Override
+    public PTable createProjectedTable() {
+        List<ColumnRef> sourceColumnRefs = Lists.<ColumnRef> newArrayList();
+        for (PColumn column : getTableRef().getTable().getColumns()) {
+            sourceColumnRefs.add(new ColumnRef(getTableRef(), 
column.getPosition()));
+        }
+        
+        try {
+            return TupleProjectionCompiler.createProjectedTable(getTableRef(), 
sourceColumnRefs, getCurrentContext().isRetainPKColumns());
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+    
+    @Override
+    public RowProjector createRowProjector() {
+        List<ColumnProjector> columnProjectors = 
Lists.<ColumnProjector>newArrayList();
+        for (PColumn column : getTableRef().getTable().getColumns()) {
+            Expression expr = newColumnExpression(column.getPosition());
+            columnProjectors.add(new 
ExpressionProjector(column.getName().getString(), 
getTableRef().getTable().getName().getString(), expr, false));
+        }
+        // TODO get estimate row size
+        return new RowProjector(columnProjectors, 0, false);        
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/85937f7e/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRules.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRules.java 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRules.java
index 77a8b7b..b8551a6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRules.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRules.java
@@ -6,6 +6,7 @@ import org.apache.calcite.rel.InvalidRelException;
 import org.apache.calcite.rel.RelCollationImpl;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rel.core.Union;
 import org.apache.calcite.rel.logical.LogicalAggregate;
@@ -32,6 +33,7 @@ public class PhoenixRules {
         PhoenixProjectRule.INSTANCE,
         PhoenixAggregateRule.INSTANCE,
         PhoenixUnionRule.INSTANCE,
+        PhoenixJoinRule.INSTANCE,
     };
 
     /** Base class for planner rules that convert a relational expression to
@@ -168,6 +170,31 @@ public class PhoenixRules {
     }
 
     /**
+     * Rule to convert a {@link org.apache.calcite.rel.core.Sort} to a
+     * {@link PhoenixSort}.
+     */
+    private static class PhoenixJoinRule extends PhoenixConverterRule {
+        public static final PhoenixJoinRule INSTANCE = new PhoenixJoinRule();
+
+        private PhoenixJoinRule() {
+            super(Join.class, Convention.NONE, PhoenixRel.CONVENTION,
+                "PhoenixJoinRule");
+        }
+
+        public RelNode convert(RelNode rel) {
+            final Join join = (Join) rel;
+            final RelTraitSet traitSet =
+                join.getTraitSet().replace(out);
+            return new PhoenixJoin(rel.getCluster(), traitSet,
+                convert(join.getLeft(), traitSet),
+                convert(join.getRight(), traitSet),
+                join.getCondition(),
+                join.getJoinType(),
+                join.getVariablesStopped());
+        }
+    }
+
+    /**
      * Rule to convert an {@link 
org.apache.calcite.rel.logical.LogicalIntersect}
      * to an {@link PhoenixIntersectRel}.
      o/

http://git-wip-us.apache.org/repos/asf/phoenix/blob/85937f7e/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java
index 730f642..8a63aad 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java
@@ -57,7 +57,7 @@ public class PhoenixTable extends AbstractTable implements 
TranslatableTable {
     @Override
     public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable 
relOptTable) {
         final RelOptCluster cluster = context.getCluster();
-        return new PhoenixTableScan(cluster, 
cluster.traitSetOf(PhoenixRel.CONVENTION), relOptTable, null);
+        return new PhoenixTableScan(cluster, 
cluster.traitSetOf(PhoenixRel.CONVENTION), relOptTable, null, null, null);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/85937f7e/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java
index 4a4a729..9646541 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java
@@ -13,18 +13,17 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexNode;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.compile.ColumnResolver;
 import org.apache.phoenix.compile.FromCompiler;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
-import org.apache.phoenix.compile.ColumnProjector;
-import org.apache.phoenix.compile.ExpressionProjector;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.SequenceManager;
 import org.apache.phoenix.compile.StatementContext;
-import org.apache.phoenix.compile.TupleProjectionCompiler;
 import org.apache.phoenix.compile.WhereCompiler;
 import org.apache.phoenix.compile.WhereOptimizer;
 import org.apache.phoenix.execute.ScanPlan;
@@ -33,7 +32,6 @@ import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.iterate.ParallelIteratorFactory;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.SelectStatement;
-import org.apache.phoenix.schema.ColumnRef;
 import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnFamily;
@@ -48,10 +46,15 @@ import com.google.common.collect.Lists;
  */
 public class PhoenixTableScan extends TableScan implements PhoenixRel {
     public final RexNode filter;
+    public final List<RexNode> projects;
 
-    protected PhoenixTableScan(RelOptCluster cluster, RelTraitSet traits, 
RelOptTable table, RexNode filter) {
+    protected PhoenixTableScan(RelOptCluster cluster, RelTraitSet traits, 
RelOptTable table, RexNode filter, List<RexNode> projects, RelDataType rowType) 
{
         super(cluster, traits, table);
         this.filter = filter;
+        this.projects = projects;
+        if (rowType != null) {
+            this.rowType = rowType;
+        }
     }
 
     @Override
@@ -67,12 +70,14 @@ public class PhoenixTableScan extends TableScan implements 
PhoenixRel {
             planner.addRule(rule);
         }
         planner.addRule(PhoenixFilterScanMergeRule.INSTANCE);
+        planner.addRule(PhoenixProjectScanMergeRule.INSTANCE);
     }
 
     @Override
     public RelWriter explainTerms(RelWriter pw) {
         return super.explainTerms(pw)
-            .itemIf("filter", filter, filter != null);
+            .itemIf("filter", filter, filter != null)
+            .itemIf("project", projects, projects != null);
     }
 
     @Override
@@ -89,7 +94,7 @@ public class PhoenixTableScan extends TableScan implements 
PhoenixRel {
     public QueryPlan implement(Implementor implementor) {
         final PhoenixTable phoenixTable = table.unwrap(PhoenixTable.class);
         PTable pTable = phoenixTable.getTable();
-        TableRef tableRef = new TableRef(pTable);
+        TableRef tableRef = new TableRef(CalciteUtils.createTempAlias(), 
pTable, HConstants.LATEST_TIMESTAMP, false);
         implementor.setTableRef(tableRef);
         try {
             PhoenixStatement stmt = new PhoenixStatement(phoenixTable.pc);
@@ -102,11 +107,16 @@ public class PhoenixTableScan extends TableScan 
implements PhoenixRel {
                 WhereCompiler.setScanFilter(context, select, filterExpr, true, 
false);
             }
             projectAllColumnFamilies(context.getScan(), 
phoenixTable.getTable());
-            TupleProjector tupleProjector = createTupleProjector(implementor, 
phoenixTable.getTable());
+            TupleProjector tupleProjector;
+            if (projects == null) {
+                tupleProjector = createTupleProjector(implementor, 
phoenixTable.getTable());
+            } else {
+                tupleProjector = PhoenixProject.project(implementor, 
this.projects);
+            }
             TupleProjector.serializeProjectorIntoScan(context.getScan(), 
tupleProjector);
-            PTable projectedTable = createProjectedTable(tableRef, 
implementor.getCurrentContext().isRetainPKColumns());
+            PTable projectedTable = implementor.createProjectedTable();
             implementor.setTableRef(new TableRef(projectedTable));
-            RowProjector rowProjector = createRowProjector(implementor, 
pTable);
+            RowProjector rowProjector = implementor.createRowProjector();
             Integer limit = null;
             OrderBy orderBy = OrderBy.EMPTY_ORDER_BY;
             ParallelIteratorFactory iteratorFactory = null;
@@ -130,25 +140,7 @@ public class PhoenixTableScan extends TableScan implements 
PhoenixRel {
         return new TupleProjector(builder.build(), exprs.toArray(new 
Expression[exprs.size()]));
     }
     
-    private PTable createProjectedTable(TableRef tableRef, boolean 
retainPKColumns) throws SQLException {
-        List<ColumnRef> sourceColumnRefs = Lists.<ColumnRef> newArrayList();
-        for (PColumn column : tableRef.getTable().getColumns()) {
-            sourceColumnRefs.add(new ColumnRef(tableRef, 
column.getPosition()));
-        }
-        
-        return TupleProjectionCompiler.createProjectedTable(tableRef, 
sourceColumnRefs, retainPKColumns);
-    }
-    
-    private RowProjector createRowProjector(Implementor implementor, PTable 
table) {
-        List<ColumnProjector> columnProjectors = 
Lists.<ColumnProjector>newArrayList();
-        for (PColumn column : table.getColumns()) {
-            Expression expr = 
implementor.newColumnExpression(column.getPosition());
-            columnProjectors.add(new 
ExpressionProjector(column.getName().getString(), table.getName().getString(), 
expr, false));
-        }
-        // TODO get estimate row size
-        return new RowProjector(columnProjectors, 0, false);        
-    }
-    
+    // TODO only project needed columns
     private void projectAllColumnFamilies(Scan scan, PTable table) {
         scan.getFamilyMap().clear();
         for (PColumnFamily family : table.getColumnFamilies()) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/85937f7e/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
index c9cbd15..b2eba2c 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.iterate.DelegateResultIterator;
 import org.apache.phoenix.iterate.FilterResultIterator;
@@ -33,12 +34,23 @@ import com.google.common.collect.Lists;
 public class TupleProjectionPlan extends DelegateQueryPlan {
     private final TupleProjector tupleProjector;
     private final Expression postFilter;
+    private final RowProjector rowProjector;
 
     public TupleProjectionPlan(QueryPlan plan, TupleProjector tupleProjector, 
Expression postFilter) {
+        this(plan, tupleProjector, postFilter, plan.getProjector());
+    }
+    
+    public TupleProjectionPlan(QueryPlan plan, TupleProjector tupleProjector, 
Expression postFilter, RowProjector rowProjector) {
         super(plan);
         if (tupleProjector == null) throw new 
IllegalArgumentException("tupleProjector is null");
         this.tupleProjector = tupleProjector;
         this.postFilter = postFilter;
+        this.rowProjector = rowProjector;
+    }
+    
+    @Override
+    public RowProjector getProjector() {
+        return this.rowProjector;
     }
 
     @Override

Reply via email to