Repository: phoenix Updated Branches: refs/heads/calcite c65d23b4a -> 739b68cf2
Add more UPSERT SELECT tests; code refine (PHOENIX-2197 Support DML in Phoenix/Calcite integration) Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/739b68cf Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/739b68cf Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/739b68cf Branch: refs/heads/calcite Commit: 739b68cf252ae995cc4e74ae867aeabeeeba9a32 Parents: c65d23b Author: maryannxue <maryann....@gmail.com> Authored: Wed May 4 13:42:12 2016 -0400 Committer: maryannxue <maryann....@gmail.com> Committed: Wed May 4 13:42:12 2016 -0400 ---------------------------------------------------------------------- .../apache/phoenix/calcite/BaseCalciteIT.java | 14 ++++---- .../apache/phoenix/calcite/CalciteDMLIT.java | 29 +++++++++++++++- .../calcite/jdbc/PhoenixCalciteFactory.java | 36 +++++++++++++++----- 3 files changed, 62 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/739b68cf/phoenix-core/src/it/java/org/apache/phoenix/calcite/BaseCalciteIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/BaseCalciteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/BaseCalciteIT.java index b171714..cb7d01d 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/BaseCalciteIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/BaseCalciteIT.java @@ -161,13 +161,6 @@ public class BaseCalciteIT extends BaseClientManagedTimeIT { return this; } - public boolean execute() throws SQLException { - final Statement statement = start.getConnection().createStatement(); - final boolean execute = statement.execute(sql); - statement.close(); - return execute; - } - public List<Object[]> getResult() throws SQLException { final Statement statement = start.getConnection().createStatement(); final ResultSet resultSet = statement.executeQuery(sql); @@ -177,6 +170,13 @@ public class BaseCalciteIT extends BaseClientManagedTimeIT { statement.close(); return list; } + + public Sql execute() throws SQLException { + final Statement statement = start.getConnection().createStatement(); + statement.execute(sql); + statement.close(); + return this; + } public Sql executeUpdate() throws SQLException { final Statement statement = start.getConnection().createStatement(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/739b68cf/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteDMLIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteDMLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteDMLIT.java index 5588188..4354fd5 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteDMLIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteDMLIT.java @@ -26,7 +26,34 @@ public class CalciteDMLIT extends BaseCalciteIT { .executeUpdate() .close(); start(false, 1L).sql("select organization_id, entity_id from aTable") - .resultIs(new Object[][] {{"1 ", "1 "}}) + .resultIs(0, new Object[][] {{"1 ", "1 "}}) + .close(); + } + + @Test + public void testUpsertSelect() throws Exception { + startPhoenixStandalone(PROPS).sql("create table srcTable(pk0 integer not null, pk1 integer not null, c0 varchar(5), c1 varchar(5) constraint pk primary key (pk0, pk1))") + .execute() + .close(); + startPhoenixStandalone(PROPS).sql("create table tgtTable(pk0 integer not null, pk1 integer not null, f0 varchar(5), f1 varchar(5), f2 varchar(5) constraint pk primary key (pk0, pk1))") + .execute() + .close(); + start(PROPS).sql("upsert into srcTable values(1, 10, '00100', '01000')") + .executeUpdate() + .close(); + start(PROPS).sql("upsert into srcTable values(2, 20, '00200', '02000')") + .executeUpdate() + .close(); + start(PROPS).sql("upsert into srcTable values(3, 30, '00300', '03000')") + .executeUpdate() + .close(); + start(PROPS).sql("upsert into tgtTable(pk0, pk1, f0, f2) select * from srcTable where pk1 <> 20") + .executeUpdate() + .close(); + start(false, 1L).sql("select * from tgtTable") + .resultIs(0, new Object[][] { + {1, 10, "00100", null, "01000"}, + {3, 30, "00300", null, "03000"}}) .close(); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/739b68cf/phoenix-core/src/main/java/org/apache/calcite/jdbc/PhoenixCalciteFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/calcite/jdbc/PhoenixCalciteFactory.java b/phoenix-core/src/main/java/org/apache/calcite/jdbc/PhoenixCalciteFactory.java index 5f86303..d153903 100644 --- a/phoenix-core/src/main/java/org/apache/calcite/jdbc/PhoenixCalciteFactory.java +++ b/phoenix-core/src/main/java/org/apache/calcite/jdbc/PhoenixCalciteFactory.java @@ -26,6 +26,7 @@ import org.apache.calcite.jdbc.CalciteConnectionImpl; import org.apache.calcite.jdbc.CalciteFactory; import org.apache.calcite.jdbc.Driver; import org.apache.phoenix.calcite.PhoenixSchema; +import org.apache.phoenix.jdbc.PhoenixConnection; public class PhoenixCalciteFactory extends CalciteFactory { @@ -97,27 +98,44 @@ public class PhoenixCalciteFactory extends CalciteFactory { CalciteSchema.createRootSchema(true, false), typeFactory); } + public void setAutoCommit(final boolean isAutoCommit) throws SQLException { + call(new PhoenixConnectionCallable() { + @Override + public void call(PhoenixConnection conn) throws SQLException { + conn.setAutoCommit(isAutoCommit);; + }}); + } + public void commit() throws SQLException { - for (String subSchemaName : getRootSchema().getSubSchemaNames()) { - try { - PhoenixSchema phoenixSchema = getRootSchema() - .getSubSchema(subSchemaName).unwrap(PhoenixSchema.class); - phoenixSchema.pc.commit(); - } catch (ClassCastException e) { - } - } + call(new PhoenixConnectionCallable() { + @Override + public void call(PhoenixConnection conn) throws SQLException { + conn.commit(); + }}); } public void close() throws SQLException { + call(new PhoenixConnectionCallable() { + @Override + public void call(PhoenixConnection conn) throws SQLException { + conn.close(); + }}); + } + + private void call(PhoenixConnectionCallable callable) throws SQLException { for (String subSchemaName : getRootSchema().getSubSchemaNames()) { try { PhoenixSchema phoenixSchema = getRootSchema() .getSubSchema(subSchemaName).unwrap(PhoenixSchema.class); - phoenixSchema.pc.close(); + callable.call(phoenixSchema.pc); } catch (ClassCastException e) { } } } + + private static interface PhoenixConnectionCallable { + void call(PhoenixConnection conn) throws SQLException; + } } private static class PhoenixCalciteStatement extends CalciteStatement {