http://git-wip-us.apache.org/repos/asf/phoenix/blob/678563f5/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableIT.java index c9168f1..69c9869 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableIT.java @@ -37,104 +37,18 @@ import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.SchemaUtil; import org.junit.Test; - /** * Tests for table with transparent salting. */ -public class SaltedTableIT extends ParallelStatsDisabledIT { - - private static String getUniqueTableName() { - return SchemaUtil.getTableName(generateUniqueName(), generateUniqueName()); - } - - private static String initTableValues(byte[][] splits) throws Exception { - String tableName = getUniqueTableName(); - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); - - // Rows we inserted: - // 1ab123abc111 - // 1abc456abc111 - // 1de123abc111 - // 2abc123def222 - // 3abc123ghi333 - // 4abc123jkl444 - try { - // Upsert with no column specifies. - ensureTableCreated(getUrl(), tableName, TABLE_WITH_SALTING, splits, null, null); - String query = "UPSERT INTO " + tableName + " VALUES(?,?,?,?,?)"; - PreparedStatement stmt = conn.prepareStatement(query); - stmt.setInt(1, 1); - stmt.setString(2, "ab"); - stmt.setString(3, "123"); - stmt.setString(4, "abc"); - stmt.setInt(5, 111); - stmt.execute(); - conn.commit(); - - stmt.setInt(1, 1); - stmt.setString(2, "abc"); - stmt.setString(3, "456"); - stmt.setString(4, "abc"); - stmt.setInt(5, 111); - stmt.execute(); - conn.commit(); - - // Test upsert when statement explicitly specifies the columns to upsert into. - query = "UPSERT INTO " + tableName + - " (a_integer, a_string, a_id, b_string, b_integer) " + - " VALUES(?,?,?,?,?)"; - stmt = conn.prepareStatement(query); - - stmt.setInt(1, 1); - stmt.setString(2, "de"); - stmt.setString(3, "123"); - stmt.setString(4, "abc"); - stmt.setInt(5, 111); - stmt.execute(); - conn.commit(); - - stmt.setInt(1, 2); - stmt.setString(2, "abc"); - stmt.setString(3, "123"); - stmt.setString(4, "def"); - stmt.setInt(5, 222); - stmt.execute(); - conn.commit(); - - // Test upsert when order of column is shuffled. 
- query = "UPSERT INTO " + tableName + - " (a_string, a_integer, a_id, b_string, b_integer) " + - " VALUES(?,?,?,?,?)"; - stmt = conn.prepareStatement(query); - stmt.setString(1, "abc"); - stmt.setInt(2, 3); - stmt.setString(3, "123"); - stmt.setString(4, "ghi"); - stmt.setInt(5, 333); - stmt.execute(); - conn.commit(); - - stmt.setString(1, "abc"); - stmt.setInt(2, 4); - stmt.setString(3, "123"); - stmt.setString(4, "jkl"); - stmt.setInt(5, 444); - stmt.execute(); - conn.commit(); - } finally { - conn.close(); - } - return tableName; - } +public class SaltedTableIT extends BaseSaltedTableIT { @Test public void testTableWithInvalidBucketNumber() throws Exception { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); Connection conn = DriverManager.getConnection(getUrl(), props); try { - String query = "create table " + getUniqueTableName() + " (a_integer integer not null CONSTRAINT pk PRIMARY KEY (a_integer)) SALT_BUCKETS = 257"; + String query = "create table " + generateUniqueName() + " (a_integer integer not null CONSTRAINT pk PRIMARY KEY (a_integer)) SALT_BUCKETS = 257"; PreparedStatement stmt = conn.prepareStatement(query); stmt.execute(); fail("Should have caught exception"); @@ -148,370 +62,12 @@ public class SaltedTableIT extends ParallelStatsDisabledIT { @Test public void testTableWithSplit() throws Exception { try { - createTestTable(getUrl(), "create table " + getUniqueTableName() + " (a_integer integer not null primary key) SALT_BUCKETS = 4", + createTestTable(getUrl(), "create table " + generateUniqueName() + " (a_integer integer not null primary key) SALT_BUCKETS = 4", new byte[][] {{1}, {2,3}, {2,5}, {3}}, null); fail("Should have caught exception"); } catch (SQLException e) { assertTrue(e.getMessage(), e.getMessage().contains("ERROR 1022 (42Y81): Should not specify split points on salted table with default row key order.")); } } - - @Test - public void testSelectValueNoWhereClause() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); - try { - String tableName = initTableValues(null); - - String query = "SELECT * FROM " + tableName; - PreparedStatement statement = conn.prepareStatement(query); - ResultSet rs = statement.executeQuery(); - - assertTrue(rs.next()); - assertEquals(1, rs.getInt(1)); - assertEquals("ab", rs.getString(2)); - assertEquals("123", rs.getString(3)); - assertEquals("abc", rs.getString(4)); - assertEquals(111, rs.getInt(5)); - - assertTrue(rs.next()); - assertEquals(1, rs.getInt(1)); - assertEquals("abc", rs.getString(2)); - assertEquals("456", rs.getString(3)); - assertEquals("abc", rs.getString(4)); - assertEquals(111, rs.getInt(5)); - - assertTrue(rs.next()); - assertEquals(1, rs.getInt(1)); - assertEquals("de", rs.getString(2)); - assertEquals("123", rs.getString(3)); - assertEquals("abc", rs.getString(4)); - assertEquals(111, rs.getInt(5)); - - assertTrue(rs.next()); - assertEquals(2, rs.getInt(1)); - assertEquals("abc", rs.getString(2)); - assertEquals("123", rs.getString(3)); - assertEquals("def", rs.getString(4)); - assertEquals(222, rs.getInt(5)); - - assertTrue(rs.next()); - assertEquals(3, rs.getInt(1)); - assertEquals("abc", rs.getString(2)); - assertEquals("123", rs.getString(3)); - assertEquals("ghi", rs.getString(4)); - assertEquals(333, rs.getInt(5)); - - assertTrue(rs.next()); - assertEquals(4, rs.getInt(1)); - assertEquals("abc", rs.getString(2)); - assertEquals("123", rs.getString(3)); - assertEquals("jkl", rs.getString(4)); - 
assertEquals(444, rs.getInt(5)); - - assertFalse(rs.next()); - } finally { - conn.close(); - } - } - - @Test - public void testSelectValueWithFullyQualifiedWhereClause() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); - try { - String tableName = initTableValues(null); - String query; - PreparedStatement stmt; - ResultSet rs; - - // Variable length slot with bounded ranges. - query = "SELECT * FROM " + tableName + - " WHERE a_integer = 1 AND a_string >= 'ab' AND a_string < 'de' AND a_id = '123'"; - stmt = conn.prepareStatement(query); - rs = stmt.executeQuery(); - assertTrue(rs.next()); - assertEquals(1, rs.getInt(1)); - assertEquals("ab", rs.getString(2)); - assertEquals("123", rs.getString(3)); - assertEquals("abc", rs.getString(4)); - assertEquals(111, rs.getInt(5)); - assertFalse(rs.next()); - - // all single slots with one value. - query = "SELECT * FROM " + tableName + - " WHERE a_integer = 1 AND a_string = 'ab' AND a_id = '123'"; - stmt = conn.prepareStatement(query); - - rs = stmt.executeQuery(); - assertTrue(rs.next()); - assertEquals(1, rs.getInt(1)); - assertEquals("ab", rs.getString(2)); - assertEquals("123", rs.getString(3)); - assertEquals("abc", rs.getString(4)); - assertEquals(111, rs.getInt(5)); - assertFalse(rs.next()); - - // all single slots with multiple values. - query = "SELECT * FROM " + tableName + - " WHERE a_integer in (2, 4) AND a_string = 'abc' AND a_id = '123'"; - stmt = conn.prepareStatement(query); - rs = stmt.executeQuery(); - - assertTrue(rs.next()); - assertEquals(2, rs.getInt(1)); - assertEquals("abc", rs.getString(2)); - assertEquals("123", rs.getString(3)); - assertEquals("def", rs.getString(4)); - assertEquals(222, rs.getInt(5)); - - assertTrue(rs.next()); - assertEquals(4, rs.getInt(1)); - assertEquals("abc", rs.getString(2)); - assertEquals("123", rs.getString(3)); - assertEquals("jkl", rs.getString(4)); - assertEquals(444, rs.getInt(5)); - assertFalse(rs.next()); - - query = "SELECT a_integer, a_string FROM " + tableName + - " WHERE a_integer in (1,2,3,4) AND a_string in ('a', 'abc', 'de') AND a_id = '123'"; - stmt = conn.prepareStatement(query); - rs = stmt.executeQuery(); - - assertTrue(rs.next()); - assertEquals(1, rs.getInt(1)); - assertEquals("de", rs.getString(2)); - - assertTrue(rs.next()); - assertEquals(2, rs.getInt(1)); - assertEquals("abc", rs.getString(2)); - - assertTrue(rs.next()); - assertEquals(3, rs.getInt(1)); - assertEquals("abc", rs.getString(2)); - - assertTrue(rs.next()); - assertEquals(4, rs.getInt(1)); - assertEquals("abc", rs.getString(2)); - assertFalse(rs.next()); - - // fixed length slot with bounded ranges. - query = "SELECT a_string, a_id FROM " + tableName + - " WHERE a_integer > 1 AND a_integer < 4 AND a_string = 'abc' AND a_id = '123'"; - stmt = conn.prepareStatement(query); - rs = stmt.executeQuery(); - assertTrue(rs.next()); - assertEquals("abc", rs.getString(1)); - assertEquals("123", rs.getString(2)); - - assertTrue(rs.next()); - assertEquals("abc", rs.getString(1)); - assertEquals("123", rs.getString(2)); - assertFalse(rs.next()); - - // fixed length slot with unbound ranges. 
- query = "SELECT b_string, b_integer FROM " + tableName + - " WHERE a_integer > 1 AND a_string = 'abc' AND a_id = '123'"; - stmt = conn.prepareStatement(query); - rs = stmt.executeQuery(); - assertTrue(rs.next()); - assertEquals("def", rs.getString(1)); - assertEquals(222, rs.getInt(2)); - - assertTrue(rs.next()); - assertEquals("ghi", rs.getString(1)); - assertEquals(333, rs.getInt(2)); - - assertTrue(rs.next()); - assertEquals("jkl", rs.getString(1)); - assertEquals(444, rs.getInt(2)); - assertFalse(rs.next()); - - // Variable length slot with unbounded ranges. - query = "SELECT * FROM " + tableName + - " WHERE a_integer = 1 AND a_string > 'ab' AND a_id = '123'"; - stmt = conn.prepareStatement(query); - rs = stmt.executeQuery(); - assertTrue(rs.next()); - assertEquals(1, rs.getInt(1)); - assertEquals("de", rs.getString(2)); - assertEquals("123", rs.getString(3)); - assertEquals("abc", rs.getString(4)); - assertEquals(111, rs.getInt(5)); - assertFalse(rs.next()); - - } finally { - conn.close(); - } - } - - @Test - public void testSelectValueWithNotFullyQualifiedWhereClause() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); - try { - String tableName = initTableValues(null); - - // Where without fully qualified key, point query. - String query = "SELECT * FROM " + tableName + " WHERE a_integer = ? AND a_string = ?"; - PreparedStatement stmt = conn.prepareStatement(query); - - stmt.setInt(1, 1); - stmt.setString(2, "abc"); - ResultSet rs = stmt.executeQuery(); - assertTrue(rs.next()); - assertEquals(1, rs.getInt(1)); - assertEquals("abc", rs.getString(2)); - assertEquals("456", rs.getString(3)); - assertEquals("abc", rs.getString(4)); - assertEquals(111, rs.getInt(5)); - assertFalse(rs.next()); - - // Where without fully qualified key, range query. - query = "SELECT * FROM " + tableName + " WHERE a_integer >= 2"; - stmt = conn.prepareStatement(query); - rs = stmt.executeQuery(); - assertTrue(rs.next()); - assertEquals(2, rs.getInt(1)); - assertEquals("abc", rs.getString(2)); - assertEquals("123", rs.getString(3)); - assertEquals("def", rs.getString(4)); - assertEquals(222, rs.getInt(5)); - - assertTrue(rs.next()); - assertEquals(3, rs.getInt(1)); - assertEquals("abc", rs.getString(2)); - assertEquals("123", rs.getString(3)); - assertEquals("ghi", rs.getString(4)); - assertEquals(333, rs.getInt(5)); - - assertTrue(rs.next()); - assertEquals(4, rs.getInt(1)); - assertEquals("abc", rs.getString(2)); - assertEquals("123", rs.getString(3)); - assertEquals("jkl", rs.getString(4)); - assertEquals(444, rs.getInt(5)); - assertFalse(rs.next()); - - // With point query. 
- query = "SELECT a_string FROM " + tableName + " WHERE a_string = ?"; - stmt = conn.prepareStatement(query); - stmt.setString(1, "de"); - rs = stmt.executeQuery(); - assertTrue(rs.next()); - assertEquals("de", rs.getString(1)); - assertFalse(rs.next()); - - query = "SELECT a_id FROM " + tableName + " WHERE a_id = ?"; - stmt = conn.prepareStatement(query); - stmt.setString(1, "456"); - rs = stmt.executeQuery(); - assertTrue(rs.next()); - assertEquals("456", rs.getString(1)); - assertFalse(rs.next()); - } finally { - conn.close(); - } - } - - @Test - public void testSelectWithGroupBy() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); - try { - String tableName = initTableValues(null); - - String query = "SELECT a_integer FROM " + tableName + " GROUP BY a_integer"; - PreparedStatement stmt = conn.prepareStatement(query); - ResultSet rs = stmt.executeQuery(); - int count = 0; - while (rs.next()) { - count++; - } - assertEquals("Group by does not return the right count.", count, 4); - } finally { - conn.close(); - } - } - @Test - public void testLimitScan() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); - try { - String tableName = initTableValues(null); - - String query = "SELECT a_integer FROM " + tableName + " WHERE a_string='abc' LIMIT 1"; - PreparedStatement stmt = conn.prepareStatement(query); - ResultSet rs = stmt.executeQuery(); - assertTrue(rs.next()); - assertEquals(1, rs.getInt(1)); - assertFalse(rs.next()); - } finally { - conn.close(); - } - } - - @Test - public void testSelectWithOrderByRowKey() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); - try { - String tableName = initTableValues(null); - - String query = "SELECT * FROM " + tableName + " ORDER BY a_integer, a_string, a_id"; - PreparedStatement statement = conn.prepareStatement(query); - ResultSet explainPlan = statement.executeQuery("EXPLAIN " + query); - // Confirm that ORDER BY in row key order will be optimized out for salted table - assertEquals("CLIENT PARALLEL 4-WAY FULL SCAN OVER " + tableName + "\n" + - "CLIENT MERGE SORT", QueryUtil.getExplainPlan(explainPlan)); - ResultSet rs = statement.executeQuery(); - - assertTrue(rs.next()); - assertEquals(1, rs.getInt(1)); - assertEquals("ab", rs.getString(2)); - assertEquals("123", rs.getString(3)); - assertEquals("abc", rs.getString(4)); - assertEquals(111, rs.getInt(5)); - - assertTrue(rs.next()); - assertEquals(1, rs.getInt(1)); - assertEquals("abc", rs.getString(2)); - assertEquals("456", rs.getString(3)); - assertEquals("abc", rs.getString(4)); - assertEquals(111, rs.getInt(5)); - - assertTrue(rs.next()); - assertEquals(1, rs.getInt(1)); - assertEquals("de", rs.getString(2)); - assertEquals("123", rs.getString(3)); - assertEquals("abc", rs.getString(4)); - assertEquals(111, rs.getInt(5)); - - assertTrue(rs.next()); - assertEquals(2, rs.getInt(1)); - assertEquals("abc", rs.getString(2)); - assertEquals("123", rs.getString(3)); - assertEquals("def", rs.getString(4)); - assertEquals(222, rs.getInt(5)); - - assertTrue(rs.next()); - assertEquals(3, rs.getInt(1)); - assertEquals("abc", rs.getString(2)); - assertEquals("123", rs.getString(3)); - assertEquals("ghi", rs.getString(4)); - assertEquals(333, rs.getInt(5)); - - assertTrue(rs.next()); - assertEquals(4, rs.getInt(1)); - 
assertEquals("abc", rs.getString(2)); - assertEquals("123", rs.getString(3)); - assertEquals("jkl", rs.getString(4)); - assertEquals(444, rs.getInt(5)); - - assertFalse(rs.next()); - } finally { - conn.close(); - } - } }
http://git-wip-us.apache.org/repos/asf/phoenix/blob/678563f5/phoenix-core/src/main/java/org/apache/phoenix/util/QueryBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryBuilder.java new file mode 100644 index 0000000..afce0dd --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryBuilder.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.util; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.phoenix.parse.HintNode; + +import java.util.Collections; +import java.util.List; + +import static org.apache.phoenix.util.SchemaUtil.getEscapedFullColumnName; + +public class QueryBuilder { + + private String fullTableName; + // regular columns that are in the select clause + private List<String> selectColumns = Collections.emptyList(); + + // columns that are required for expressions in the select clause + private List<String> selectExpressionColumns = Collections.emptyList(); + // expression string in the select clause (for eg COL1 || COL2) + private String selectExpression; + private String whereClause; + private String orderByClause; + private String groupByClause; + private String havingClause; + private HintNode.Hint hint; + private boolean escapeCols; + private boolean distinct; + private int limit; + + public String getFullTableName() { + return fullTableName; + } + + /** + * @return column names required to evaluate this select statement + */ + public List<String> getRequiredColumns() { + List<String> allColumns = Lists.newArrayList(selectColumns); + if (!CollectionUtils.isEmpty(selectExpressionColumns)) { + allColumns.addAll(selectExpressionColumns); + } + return allColumns; + } + + public String getWhereClause() { + return whereClause; + } + + public HintNode.Hint getHint() { + return hint; + } + + public String getOrderByClause() { + return orderByClause; + } + + public String getGroupByClause() { + return groupByClause; + } + + public QueryBuilder setOrderByClause(String orderByClause) { + this.orderByClause = orderByClause; + return this; + } + + public QueryBuilder setFullTableName(String fullTableName) { + this.fullTableName = fullTableName; + return this; + } + + public QueryBuilder setSelectColumns(List<String> columns) { + this.selectColumns = columns; + return this; + } + + public QueryBuilder setWhereClause(String whereClause) { + this.whereClause = whereClause; + return this; + } + + public QueryBuilder setHint(HintNode.Hint hint) { + 
this.hint = hint; + return this; + } + + public QueryBuilder setEscapeCols(boolean escapeCols) { + this.escapeCols = escapeCols; + return this; + } + + public QueryBuilder setGroupByClause(String groupByClause) { + this.groupByClause = groupByClause; + return this; + } + + public QueryBuilder setHavingClause(String havingClause) { + this.havingClause = havingClause; + return this; + } + + public List<String> getSelectExpressionColumns() { + return selectExpressionColumns; + } + + public QueryBuilder setSelectExpressionColumns(List<String> selectExpressionColumns) { + this.selectExpressionColumns = selectExpressionColumns; + return this; + } + + public String getSelectExpression() { + return selectExpression; + } + + public QueryBuilder setSelectExpression(String selectExpression) { + this.selectExpression = selectExpression; + return this; + } + + public QueryBuilder setDistinct(boolean distinct) { + this.distinct = distinct; + return this; + } + + public QueryBuilder setLimit(int limit) { + this.limit = limit; + return this; + } + + public String build() { + Preconditions.checkNotNull(fullTableName, "Table name cannot be null"); + if (CollectionUtils.isEmpty(selectColumns) && StringUtils.isBlank(selectExpression)) { + throw new IllegalArgumentException("At least one column or select expression must be provided"); + } + StringBuilder query = new StringBuilder(); + query.append("SELECT "); + + if (distinct) { + query.append(" DISTINCT "); + } + + if (hint != null) { + final HintNode node = new HintNode(hint.name()); + String hintStr = node.toString(); + query.append(hintStr); + } + + StringBuilder selectClauseBuilder = new StringBuilder(); + if (StringUtils.isNotBlank(selectExpression)) { + if (selectClauseBuilder.length()!=0) { + selectClauseBuilder.append(" , "); + } + selectClauseBuilder.append(selectExpression); + } + + boolean first = true; + for (String col : selectColumns) { + if (StringUtils.isNotBlank(col)) { + if ((first && selectClauseBuilder.length()!=0) || !first) { + selectClauseBuilder.append(" , "); + } + String fullColumnName = col; + if (escapeCols) { + fullColumnName = getEscapedFullColumnName(col); + } + selectClauseBuilder.append(fullColumnName); + first = false; + } + } + + query.append(selectClauseBuilder); + query.append(" FROM "); + query.append(fullTableName); + if (StringUtils.isNotBlank(whereClause)) { + query.append(" WHERE (").append(whereClause).append(")"); + } + if (StringUtils.isNotBlank(groupByClause)) { + query.append(" GROUP BY ").append(groupByClause); + } + if (StringUtils.isNotBlank(havingClause)) { + query.append(" HAVING ").append(havingClause); + } + if (StringUtils.isNotBlank(orderByClause)) { + query.append(" ORDER BY ").append(orderByClause); + } + if (limit > 0) { + query.append(" LIMIT ").append(limit); + } + return query.toString(); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/678563f5/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java index 94cbfea..4501158 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java @@ -217,45 +217,15 @@ public final class QueryUtil { * * @param fullTableName name of the table for which the select statement needs to be created. 
* @param columns list of columns to be projected in the select statement. - * @param conditions The condition clause to be added to the WHERE condition + * @param whereClause The condition clause to be added to the WHERE condition * @param hint hint to use * @param escapeCols whether to escape the projected columns * @return Select Query */ public static String constructSelectStatement(String fullTableName, List<String> columns, - final String conditions, Hint hint, boolean escapeCols) { - Preconditions.checkNotNull(fullTableName, "Table name cannot be null"); - if (columns == null || columns.isEmpty()) { - throw new IllegalArgumentException("At least one column must be provided"); - } - StringBuilder query = new StringBuilder(); - query.append("SELECT "); - - String hintStr = ""; - if (hint != null) { - final HintNode node = new HintNode(hint.name()); - hintStr = node.toString(); - } - query.append(hintStr); - - for (String col : columns) { - if (col != null) { - String fullColumnName = col; - if (escapeCols) { - fullColumnName = getEscapedFullColumnName(col); - } - query.append(fullColumnName); - query.append(","); - } - } - // Remove the trailing comma - query.setLength(query.length() - 1); - query.append(" FROM "); - query.append(fullTableName); - if (conditions != null && conditions.length() > 0) { - query.append(" WHERE (").append(conditions).append(")"); - } - return query.toString(); + final String whereClause, Hint hint, boolean escapeCols) { + return new QueryBuilder().setFullTableName(fullTableName).setSelectColumns(columns) + .setWhereClause(whereClause).setHint(hint).setEscapeCols(escapeCols).build(); } /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/678563f5/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutputTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutputTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutputTest.java index c6bb739..a904bca 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutputTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutputTest.java @@ -47,7 +47,7 @@ public class IndexScrutinyTableOutputTest extends BaseIndexTest { String sqlStr = IndexScrutinyTableOutput.getSqlQueryAllInvalidRows(conn, columnNames, SCRUTINY_TIME_MILLIS); - assertEquals("SELECT \"SOURCE_TABLE\",\"TARGET_TABLE\",\"SCRUTINY_EXECUTE_TIME\",\"SOURCE_ROW_PK_HASH\",\"SOURCE_TS\",\"TARGET_TS\",\"HAS_TARGET_ROW\",\"ID\",\"PK_PART2\",\"NAME\",\"ZIP\",\":ID\",\":PK_PART2\",\"0:NAME\",\"0:ZIP\" FROM PHOENIX_INDEX_SCRUTINY(\"ID\" INTEGER,\"PK_PART2\" TINYINT,\"NAME\" VARCHAR,\"ZIP\" BIGINT,\":ID\" INTEGER,\":PK_PART2\" TINYINT,\"0:NAME\" VARCHAR,\"0:ZIP\" BIGINT) WHERE (\"SOURCE_TABLE\",\"TARGET_TABLE\",\"SCRUTINY_EXECUTE_TIME\") IN (('TEST_SCHEMA.TEST_INDEX_COLUMN_NAMES_UTIL','TEST_SCHEMA.TEST_ICN_INDEX',1502908914193))", + assertEquals("SELECT \"SOURCE_TABLE\" , \"TARGET_TABLE\" , \"SCRUTINY_EXECUTE_TIME\" , \"SOURCE_ROW_PK_HASH\" , \"SOURCE_TS\" , \"TARGET_TS\" , \"HAS_TARGET_ROW\" , \"ID\" , \"PK_PART2\" , \"NAME\" , \"ZIP\" , \":ID\" , \":PK_PART2\" , \"0:NAME\" , \"0:ZIP\" FROM PHOENIX_INDEX_SCRUTINY(\"ID\" INTEGER,\"PK_PART2\" TINYINT,\"NAME\" VARCHAR,\"ZIP\" BIGINT,\":ID\" INTEGER,\":PK_PART2\" TINYINT,\"0:NAME\" VARCHAR,\"0:ZIP\" BIGINT) WHERE 
(\"SOURCE_TABLE\",\"TARGET_TABLE\",\"SCRUTINY_EXECUTE_TIME\") IN (('TEST_SCHEMA.TEST_INDEX_COLUMN_NAMES_UTIL','TEST_SCHEMA.TEST_ICN_INDEX',1502908914193))", sqlStr); } @@ -58,7 +58,7 @@ public class IndexScrutinyTableOutputTest extends BaseIndexTest { String query = IndexScrutinyTableOutput.getSqlQueryMissingTargetRows(conn, columnNames, SCRUTINY_TIME_MILLIS); - assertEquals("SELECT \"SOURCE_TABLE\",\"TARGET_TABLE\",\"SCRUTINY_EXECUTE_TIME\",\"SOURCE_ROW_PK_HASH\",\"SOURCE_TS\",\"TARGET_TS\",\"HAS_TARGET_ROW\",\"ID\",\"PK_PART2\",\"NAME\",\"ZIP\",\":ID\",\":PK_PART2\",\"0:NAME\",\"0:ZIP\" FROM PHOENIX_INDEX_SCRUTINY(\"ID\" INTEGER,\"PK_PART2\" TINYINT,\"NAME\" VARCHAR,\"ZIP\" BIGINT,\":ID\" INTEGER,\":PK_PART2\" TINYINT,\"0:NAME\" VARCHAR,\"0:ZIP\" BIGINT) WHERE (\"SOURCE_TABLE\",\"TARGET_TABLE\",\"SCRUTINY_EXECUTE_TIME\", \"HAS_TARGET_ROW\") IN (('TEST_SCHEMA.TEST_INDEX_COLUMN_NAMES_UTIL','TEST_SCHEMA.TEST_ICN_INDEX',1502908914193,false))", + assertEquals("SELECT \"SOURCE_TABLE\" , \"TARGET_TABLE\" , \"SCRUTINY_EXECUTE_TIME\" , \"SOURCE_ROW_PK_HASH\" , \"SOURCE_TS\" , \"TARGET_TS\" , \"HAS_TARGET_ROW\" , \"ID\" , \"PK_PART2\" , \"NAME\" , \"ZIP\" , \":ID\" , \":PK_PART2\" , \"0:NAME\" , \"0:ZIP\" FROM PHOENIX_INDEX_SCRUTINY(\"ID\" INTEGER,\"PK_PART2\" TINYINT,\"NAME\" VARCHAR,\"ZIP\" BIGINT,\":ID\" INTEGER,\":PK_PART2\" TINYINT,\"0:NAME\" VARCHAR,\"0:ZIP\" BIGINT) WHERE (\"SOURCE_TABLE\",\"TARGET_TABLE\",\"SCRUTINY_EXECUTE_TIME\", \"HAS_TARGET_ROW\") IN (('TEST_SCHEMA.TEST_INDEX_COLUMN_NAMES_UTIL','TEST_SCHEMA.TEST_ICN_INDEX',1502908914193,false))", query); } @@ -69,7 +69,7 @@ public class IndexScrutinyTableOutputTest extends BaseIndexTest { String query = IndexScrutinyTableOutput.getSqlQueryBadCoveredColVal(conn, columnNames, SCRUTINY_TIME_MILLIS); - assertEquals("SELECT \"SOURCE_TABLE\",\"TARGET_TABLE\",\"SCRUTINY_EXECUTE_TIME\",\"SOURCE_ROW_PK_HASH\",\"SOURCE_TS\",\"TARGET_TS\",\"HAS_TARGET_ROW\",\"ID\",\"PK_PART2\",\"NAME\",\"ZIP\",\":ID\",\":PK_PART2\",\"0:NAME\",\"0:ZIP\" FROM PHOENIX_INDEX_SCRUTINY(\"ID\" INTEGER,\"PK_PART2\" TINYINT,\"NAME\" VARCHAR,\"ZIP\" BIGINT,\":ID\" INTEGER,\":PK_PART2\" TINYINT,\"0:NAME\" VARCHAR,\"0:ZIP\" BIGINT) WHERE (\"SOURCE_TABLE\",\"TARGET_TABLE\",\"SCRUTINY_EXECUTE_TIME\", \"HAS_TARGET_ROW\") IN (('TEST_SCHEMA.TEST_INDEX_COLUMN_NAMES_UTIL','TEST_SCHEMA.TEST_ICN_INDEX',1502908914193,true))", + assertEquals("SELECT \"SOURCE_TABLE\" , \"TARGET_TABLE\" , \"SCRUTINY_EXECUTE_TIME\" , \"SOURCE_ROW_PK_HASH\" , \"SOURCE_TS\" , \"TARGET_TS\" , \"HAS_TARGET_ROW\" , \"ID\" , \"PK_PART2\" , \"NAME\" , \"ZIP\" , \":ID\" , \":PK_PART2\" , \"0:NAME\" , \"0:ZIP\" FROM PHOENIX_INDEX_SCRUTINY(\"ID\" INTEGER,\"PK_PART2\" TINYINT,\"NAME\" VARCHAR,\"ZIP\" BIGINT,\":ID\" INTEGER,\":PK_PART2\" TINYINT,\"0:NAME\" VARCHAR,\"0:ZIP\" BIGINT) WHERE (\"SOURCE_TABLE\",\"TARGET_TABLE\",\"SCRUTINY_EXECUTE_TIME\", \"HAS_TARGET_ROW\") IN (('TEST_SCHEMA.TEST_INDEX_COLUMN_NAMES_UTIL','TEST_SCHEMA.TEST_ICN_INDEX',1502908914193,true))", query); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/678563f5/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java index f864dd5..0c4c004 100644 --- 
a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java @@ -145,7 +145,7 @@ public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest { configuration.set(HConstants.ZOOKEEPER_QUORUM, getUrl()); PhoenixConfigurationUtil.setInputTableName(configuration, tableName); final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration); - final String expectedSelectStatement = "SELECT \"A_STRING\",\"A_BINARY\",\"0\".\"COL1\" FROM " + tableName ; + final String expectedSelectStatement = "SELECT \"A_STRING\" , \"A_BINARY\" , \"0\".\"COL1\" FROM " + tableName ; assertEquals(expectedSelectStatement, selectStatement); } finally { conn.close(); @@ -167,7 +167,7 @@ public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest { configuration.set(HConstants.ZOOKEEPER_QUORUM, getUrl()); PhoenixConfigurationUtil.setInputTableName(configuration, fullTableName); final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration); - final String expectedSelectStatement = "SELECT \"A_STRING\",\"A_BINARY\",\"0\".\"COL1\" FROM " + fullTableName; + final String expectedSelectStatement = "SELECT \"A_STRING\" , \"A_BINARY\" , \"0\".\"COL1\" FROM " + fullTableName; assertEquals(expectedSelectStatement, selectStatement); } finally { conn.close(); @@ -209,7 +209,7 @@ public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest { PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.QUERY); PhoenixConfigurationUtil.setInputTableName(configuration, tableName); final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration); - final String expectedSelectStatement = "SELECT \"ID\",\"0\".\"VCARRAY\" FROM " + tableName ; + final String expectedSelectStatement = "SELECT \"ID\" , \"0\".\"VCARRAY\" FROM " + tableName ; assertEquals(expectedSelectStatement, selectStatement); } finally { conn.close(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/678563f5/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java index 2d094f6..8ee8f97 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java @@ -66,14 +66,14 @@ public class QueryUtilTest { @Test public void testConstructSelectStatement() { assertEquals( - "SELECT \"ID\",\"NAME\" FROM MYTAB", + "SELECT \"ID\" , \"NAME\" FROM MYTAB", QueryUtil.constructSelectStatement("MYTAB", ImmutableList.of(ID_COLUMN,NAME_COLUMN),null)); } @Test public void testConstructSelectStatementWithSchema() { assertEquals( - "SELECT \"ID\",\"NAME\" FROM A.MYTAB", + "SELECT \"ID\" , \"NAME\" FROM A.MYTAB", QueryUtil.constructSelectStatement("A.MYTAB", ImmutableList.of(ID_COLUMN,NAME_COLUMN),null)); } @@ -83,7 +83,7 @@ public class QueryUtilTest { final String schemaName = SchemaUtil.getEscapedArgument("a"); final String fullTableName = SchemaUtil.getTableName(schemaName, tableName); assertEquals( - "SELECT \"ID\",\"NAME\" FROM \"a\".MYTAB", + "SELECT \"ID\" , \"NAME\" FROM \"a\".MYTAB", QueryUtil.constructSelectStatement(fullTableName, ImmutableList.of(ID_COLUMN,NAME_COLUMN),null)); } @@ -93,14 
+93,14 @@ public class QueryUtilTest { final String schemaName = SchemaUtil.getEscapedArgument("a"); final String fullTableName = SchemaUtil.getTableName(schemaName, tableName); assertEquals( - "SELECT \"ID\",\"NAME\" FROM \"a\".\"mytab\"", + "SELECT \"ID\" , \"NAME\" FROM \"a\".\"mytab\"", QueryUtil.constructSelectStatement(fullTableName, ImmutableList.of(ID_COLUMN,NAME_COLUMN),null)); } @Test public void testConstructSelectWithHint() { assertEquals( - "SELECT /*+ NO_INDEX */ \"col1\",\"col2\" FROM MYTAB WHERE (\"col2\"=? and \"col3\" is null)", + "SELECT /*+ NO_INDEX */ \"col1\" , \"col2\" FROM MYTAB WHERE (\"col2\"=? and \"col3\" is null)", QueryUtil.constructSelectStatement("MYTAB", Lists.newArrayList("col1", "col2"), "\"col2\"=? and \"col3\" is null", Hint.NO_INDEX, true)); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/678563f5/phoenix-spark/pom.xml ---------------------------------------------------------------------- diff --git a/phoenix-spark/pom.xml b/phoenix-spark/pom.xml index 6b9d58b..858895a 100644 --- a/phoenix-spark/pom.xml +++ b/phoenix-spark/pom.xml @@ -488,6 +488,14 @@ <testResources><testResource><directory>src/it/resources</directory></testResource></testResources> <plugins> <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-failsafe-plugin</artifactId> + </plugin> + <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> </plugin> http://git-wip-us.apache.org/repos/asf/phoenix/blob/678563f5/phoenix-spark/src/it/java/org/apache/phoenix/spark/AggregateIT.java ---------------------------------------------------------------------- diff --git a/phoenix-spark/src/it/java/org/apache/phoenix/spark/AggregateIT.java b/phoenix-spark/src/it/java/org/apache/phoenix/spark/AggregateIT.java new file mode 100644 index 0000000..e4b96a3 --- /dev/null +++ b/phoenix-spark/src/it/java/org/apache/phoenix/spark/AggregateIT.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.spark; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; + +import org.apache.phoenix.end2end.BaseAggregateIT; +import org.apache.phoenix.util.QueryBuilder; + +public class AggregateIT extends BaseAggregateIT { + + @Override + protected ResultSet executeQueryThrowsException(Connection conn, QueryBuilder queryBuilder, + String expectedPhoenixExceptionMsg, String expectedSparkExceptionMsg) { + ResultSet rs = null; + try { + rs = executeQuery(conn, queryBuilder); + fail(); + } + catch(Exception e) { + assertTrue(e.getMessage().contains(expectedSparkExceptionMsg)); + } + return rs; + } + + @Override + protected ResultSet executeQuery(Connection conn, QueryBuilder queryBuilder) throws SQLException { + return SparkUtil.executeQuery(conn, queryBuilder, getUrl(), config); + } + + @Override + protected void testCountNullInNonEmptyKeyValueCF(int columnEncodedBytes) throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + //Type is INT + String intTableName=generateUniqueName(); + String sql="create table " + intTableName + " (mykey integer not null primary key, A.COLA integer, B.COLB integer) " + + "IMMUTABLE_ROWS=true, IMMUTABLE_STORAGE_SCHEME = ONE_CELL_PER_COLUMN, COLUMN_ENCODED_BYTES = " + columnEncodedBytes + ", DISABLE_WAL=true"; + + conn.createStatement().execute(sql); + conn.createStatement().execute("UPSERT INTO "+intTableName+" VALUES (1,1)"); + conn.createStatement().execute("UPSERT INTO "+intTableName+" VALUES (2,1)"); + conn.createStatement().execute("UPSERT INTO "+intTableName+" VALUES (3,1,2)"); + conn.createStatement().execute("UPSERT INTO "+intTableName+" VALUES (4,1)"); + conn.createStatement().execute("UPSERT INTO "+intTableName+" VALUES (5,1)"); + conn.commit(); + + sql="select count(*) from "+intTableName; + QueryBuilder queryBuilder = new QueryBuilder() + .setSelectExpression("COUNT(*)") + .setFullTableName(intTableName); + ResultSet rs = executeQuery(conn, queryBuilder); + assertTrue(rs.next()); + assertEquals(5, rs.getLong(1)); + + sql="select count(*) from "+intTableName + " where b.colb is not null"; + queryBuilder.setWhereClause("`B.COLB` IS NOT NULL"); + rs = executeQuery(conn, queryBuilder); + assertTrue(rs.next()); + assertEquals(1, rs.getLong(1)); + + sql="select count(*) from "+intTableName + " where b.colb is null"; + queryBuilder.setWhereClause("`B.COLB` IS NULL"); + rs = executeQuery(conn, queryBuilder); + assertTrue(rs.next()); + assertEquals(4, rs.getLong(1)); + } + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/678563f5/phoenix-spark/src/it/java/org/apache/phoenix/spark/OrderByIT.java ---------------------------------------------------------------------- diff --git a/phoenix-spark/src/it/java/org/apache/phoenix/spark/OrderByIT.java b/phoenix-spark/src/it/java/org/apache/phoenix/spark/OrderByIT.java new file mode 100644 index 0000000..bdffaf5 --- /dev/null +++ b/phoenix-spark/src/it/java/org/apache/phoenix/spark/OrderByIT.java @@ -0,0 +1,460 @@ +package org.apache.phoenix.spark; + +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.sql.Connection; +import java.sql.Date; +import 
java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; +import java.util.Properties; + +import org.apache.phoenix.end2end.BaseOrderByIT; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.QueryBuilder; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SQLContext; +import org.junit.Ignore; +import org.junit.Test; + +import com.google.common.collect.Lists; + +import scala.Option; +import scala.collection.JavaConverters; + +public class OrderByIT extends BaseOrderByIT { + + @Override + protected ResultSet executeQueryThrowsException(Connection conn, QueryBuilder queryBuilder, + String expectedPhoenixExceptionMsg, String expectedSparkExceptionMsg) { + ResultSet rs = null; + try { + rs = executeQuery(conn, queryBuilder); + fail(); + } + catch(Exception e) { + assertTrue(e.getMessage().contains(expectedSparkExceptionMsg)); + } + return rs; + } + + @Override + protected ResultSet executeQuery(Connection conn, QueryBuilder queryBuilder) throws SQLException { + return SparkUtil.executeQuery(conn, queryBuilder, getUrl(), config); + } + + @Test + public void testOrderByWithJoin() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.setAutoCommit(false); + String tableName1 = generateUniqueName(); + String ddl = "CREATE TABLE " + tableName1 + + " (a_string varchar not null, cf1.a integer, cf1.b varchar, col1 integer, cf2.c varchar, cf2.d integer " + + " CONSTRAINT pk PRIMARY KEY (a_string))\n"; + createTestTable(getUrl(), ddl); + String dml = "UPSERT INTO " + tableName1 + " VALUES(?,?,?,?,?,?)"; + PreparedStatement stmt = conn.prepareStatement(dml); + stmt.setString(1, "a"); + stmt.setInt(2, 40); + stmt.setString(3, "aa"); + stmt.setInt(4, 10); + stmt.setString(5, "bb"); + stmt.setInt(6, 20); + stmt.execute(); + stmt.setString(1, "c"); + stmt.setInt(2, 30); + stmt.setString(3, "cc"); + stmt.setInt(4, 50); + stmt.setString(5, "dd"); + stmt.setInt(6, 60); + stmt.execute(); + stmt.setString(1, "b"); + stmt.setInt(2, 40); + stmt.setString(3, "bb"); + stmt.setInt(4, 5); + stmt.setString(5, "aa"); + stmt.setInt(6, 80); + stmt.execute(); + conn.commit(); + + String tableName2 = generateUniqueName(); + ddl = "CREATE TABLE " + tableName2 + + " (a_string varchar not null, col1 integer" + + " CONSTRAINT pk PRIMARY KEY (a_string))\n"; + createTestTable(getUrl(), ddl); + + dml = "UPSERT INTO " + tableName2 + " VALUES(?, ?)"; + stmt = conn.prepareStatement(dml); + stmt.setString(1, "a"); + stmt.setInt(2, 40); + stmt.execute(); + stmt.setString(1, "b"); + stmt.setInt(2, 20); + stmt.execute(); + stmt.setString(1, "c"); + stmt.setInt(2, 30); + stmt.execute(); + conn.commit(); + + // create two PhoenixRDDs using the table names and columns that are required for the JOIN query + List<String> table1Columns = Lists.newArrayList("A_STRING", "CF1.A", "CF1.B", "COL1", "CF2.C", "CF2.D"); + SQLContext sqlContext = SparkUtil.getSqlContext(); + Dataset phoenixDataSet = + new PhoenixRDD(SparkUtil.getSparkContext(), tableName1, + JavaConverters.collectionAsScalaIterableConverter(table1Columns) + .asScala().toSeq(), + Option.apply((String) null), Option.apply(getUrl()), config, false, + null).toDataFrame(sqlContext); + phoenixDataSet.registerTempTable(tableName1); + List<String> table2Columns = Lists.newArrayList("A_STRING", "COL1"); + phoenixDataSet = + new 
PhoenixRDD(SparkUtil.getSparkContext(), tableName2, + JavaConverters.collectionAsScalaIterableConverter(table2Columns) + .asScala().toSeq(), + Option.apply((String) null), Option.apply(getUrl()), config, false, + null).toDataFrame(sqlContext); + phoenixDataSet.registerTempTable(tableName2); + + String query = + "SELECT T1.* FROM " + tableName1 + " T1 JOIN " + tableName2 + + " T2 ON T1.A_STRING = T2.A_STRING ORDER BY T1.`CF1.B`"; + Dataset<Row> dataset = + sqlContext.sql(query); + List<Row> rows = dataset.collectAsList(); + ResultSet rs = new SparkResultSet(rows, dataset.columns()); + + assertTrue(rs.next()); + assertEquals("a",rs.getString(1)); + assertEquals(40,rs.getInt(2)); + assertEquals("aa",rs.getString(3)); + assertEquals(10,rs.getInt(4)); + assertEquals("bb",rs.getString(5)); + assertEquals(20,rs.getInt(6)); + assertTrue(rs.next()); + assertEquals("b",rs.getString(1)); + assertEquals(40,rs.getInt(2)); + assertEquals("bb",rs.getString(3)); + assertEquals(5,rs.getInt(4)); + assertEquals("aa",rs.getString(5)); + assertEquals(80,rs.getInt(6)); + assertTrue(rs.next()); + assertEquals("c",rs.getString(1)); + assertEquals(30,rs.getInt(2)); + assertEquals("cc",rs.getString(3)); + assertEquals(50,rs.getInt(4)); + assertEquals("dd",rs.getString(5)); + assertEquals(60,rs.getInt(6)); + assertFalse(rs.next()); + + query = + "select t1.a_string, t2.col1 from " + tableName1 + " t1 join " + tableName2 + + " t2 on t1.a_string = t2.a_string order by t2.col1"; + dataset = sqlContext.sql(query); + rows = dataset.collectAsList(); + rs = new SparkResultSet(rows, dataset.columns()); + assertTrue(rs.next()); + assertEquals("b",rs.getString(1)); + assertEquals(20,rs.getInt(2)); + assertTrue(rs.next()); + assertEquals("c",rs.getString(1)); + assertEquals(30,rs.getInt(2)); + assertTrue(rs.next()); + assertEquals("a",rs.getString(1)); + assertEquals(40,rs.getInt(2)); + assertFalse(rs.next()); + } + } + + @Test + public void testOrderByWithUnionAll() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)){ + conn.setAutoCommit(false); + String tableName1 = generateUniqueName(); + String ddl = "CREATE TABLE " + tableName1 + + " (a_string varchar not null, cf1.a integer, cf1.b varchar, col1 integer, cf2.c varchar, cf2.d integer " + + " CONSTRAINT pk PRIMARY KEY (a_string))\n"; + createTestTable(getUrl(), ddl); + String dml = "UPSERT INTO " + tableName1 + " VALUES(?,?,?,?,?,?)"; + PreparedStatement stmt = conn.prepareStatement(dml); + stmt.setString(1, "a"); + stmt.setInt(2, 40); + stmt.setString(3, "aa"); + stmt.setInt(4, 10); + stmt.setString(5, "bb"); + stmt.setInt(6, 20); + stmt.execute(); + stmt.setString(1, "c"); + stmt.setInt(2, 30); + stmt.setString(3, "cc"); + stmt.setInt(4, 50); + stmt.setString(5, "dd"); + stmt.setInt(6, 60); + stmt.execute(); + stmt.setString(1, "b"); + stmt.setInt(2, 40); + stmt.setString(3, "bb"); + stmt.setInt(4, 5); + stmt.setString(5, "aa"); + stmt.setInt(6, 80); + stmt.execute(); + conn.commit(); + + String tableName2 = generateUniqueName(); + ddl = "CREATE TABLE " + tableName2 + + " (a_string varchar not null, col1 integer" + + " CONSTRAINT pk PRIMARY KEY (a_string))\n"; + createTestTable(getUrl(), ddl); + + dml = "UPSERT INTO " + tableName2 + " VALUES(?, ?)"; + stmt = conn.prepareStatement(dml); + stmt.setString(1, "aa"); + stmt.setInt(2, 40); + stmt.execute(); + stmt.setString(1, "bb"); + stmt.setInt(2, 10); + stmt.execute(); + stmt.setString(1, "cc"); + stmt.setInt(2, 30); + 
stmt.execute(); + conn.commit(); + + + List<String> table1Columns = Lists.newArrayList("A_STRING", "CF1.A", "CF1.B", "COL1", "CF2.C", "CF2.D"); + SQLContext sqlContext = SparkUtil.getSqlContext(); + Dataset phoenixDataSet = + new PhoenixRDD(SparkUtil.getSparkContext(), tableName1, + JavaConverters.collectionAsScalaIterableConverter(table1Columns) + .asScala().toSeq(), + Option.apply((String) null), Option.apply(getUrl()), config, false, + null).toDataFrame(sqlContext); + phoenixDataSet.registerTempTable(tableName1); + List<String> table2Columns = Lists.newArrayList("A_STRING", "COL1"); + phoenixDataSet = + new PhoenixRDD(SparkUtil.getSparkContext(), tableName2, + JavaConverters.collectionAsScalaIterableConverter(table2Columns) + .asScala().toSeq(), + Option.apply((String) null), Option.apply(getUrl()), config, false, + null).toDataFrame(sqlContext); + phoenixDataSet.registerTempTable(tableName2); + + String query = + "select a_string, `cf2.d` from " + tableName1 + " union all select * from " + + tableName2 + " order by `cf2.d`"; + Dataset<Row> dataset = + sqlContext.sql(query); + List<Row> rows = dataset.collectAsList(); + ResultSet rs = new SparkResultSet(rows, dataset.columns()); + assertTrue(rs.next()); + assertEquals("bb",rs.getString(1)); + assertEquals(10,rs.getInt(2)); + assertTrue(rs.next()); + assertEquals("a",rs.getString(1)); + assertEquals(20,rs.getInt(2)); + assertTrue(rs.next()); + assertEquals("cc",rs.getString(1)); + assertEquals(30,rs.getInt(2)); + assertTrue(rs.next()); + assertEquals("aa",rs.getString(1)); + assertEquals(40,rs.getInt(2)); + assertTrue(rs.next()); + assertEquals("c",rs.getString(1)); + assertEquals(60,rs.getInt(2)); + assertTrue(rs.next()); + assertEquals("b",rs.getString(1)); + assertEquals(80,rs.getInt(2)); + assertFalse(rs.next()); + } + } + + @Test + public void testOrderByWithExpression() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.setAutoCommit(false); + + try { + String tableName = generateUniqueName(); + String ddl = "CREATE TABLE " + tableName + + " (a_string varchar not null, col1 integer, col2 integer, col3 timestamp, col4 varchar" + + " CONSTRAINT pk PRIMARY KEY (a_string))\n"; + createTestTable(getUrl(), ddl); + + Date date = new Date(System.currentTimeMillis()); + String dml = "UPSERT INTO " + tableName + " VALUES(?, ?, ?, ?, ?)"; + PreparedStatement stmt = conn.prepareStatement(dml); + stmt.setString(1, "a"); + stmt.setInt(2, 40); + stmt.setInt(3, 20); + stmt.setDate(4, new Date(date.getTime())); + stmt.setString(5, "xxyy"); + stmt.execute(); + stmt.setString(1, "b"); + stmt.setInt(2, 50); + stmt.setInt(3, 30); + stmt.setDate(4, new Date(date.getTime()-500)); + stmt.setString(5, "yyzz"); + stmt.execute(); + stmt.setString(1, "c"); + stmt.setInt(2, 60); + stmt.setInt(3, 20); + stmt.setDate(4, new Date(date.getTime()-300)); + stmt.setString(5, "ddee"); + stmt.execute(); + conn.commit(); + + SQLContext sqlContext = SparkUtil.getSqlContext(); + Dataset phoenixDataSet = + new PhoenixRDD(SparkUtil.getSparkContext(), tableName, + JavaConverters + .collectionAsScalaIterableConverter( + Lists.newArrayList("col1", "col2", "col4")) + .asScala().toSeq(), + Option.apply((String) null), Option.apply(getUrl()), config, false, + null).toDataFrame(sqlContext); + + phoenixDataSet.registerTempTable(tableName); + Dataset<Row> dataset = + sqlContext.sql("SELECT col1+col2, col4, a_string FROM " + tableName + + " ORDER BY col1+col2, col4"); + List<Row> 
rows = dataset.collectAsList(); + ResultSet rs = new SparkResultSet(rows, dataset.columns()); + assertTrue(rs.next()); + assertEquals("a", rs.getString(3)); + assertTrue(rs.next()); + assertEquals("c", rs.getString(3)); + assertTrue(rs.next()); + assertEquals("b", rs.getString(3)); + assertFalse(rs.next()); + } catch (SQLException e) { + } finally { + conn.close(); + } + } + + @Test + public void testColumnFamily() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.setAutoCommit(false); + String tableName = generateUniqueName(); + String ddl = "CREATE TABLE " + tableName + + " (a_string varchar not null, cf1.a integer, cf1.b varchar, col1 integer, cf2.c varchar, cf2.d integer, col2 integer" + + " CONSTRAINT pk PRIMARY KEY (a_string))\n"; + createTestTable(getUrl(), ddl); + String dml = "UPSERT INTO " + tableName + " VALUES(?,?,?,?,?,?,?)"; + PreparedStatement stmt = conn.prepareStatement(dml); + stmt.setString(1, "a"); + stmt.setInt(2, 40); + stmt.setString(3, "aa"); + stmt.setInt(4, 10); + stmt.setString(5, "bb"); + stmt.setInt(6, 20); + stmt.setInt(7, 1); + stmt.execute(); + stmt.setString(1, "c"); + stmt.setInt(2, 30); + stmt.setString(3, "cc"); + stmt.setInt(4, 50); + stmt.setString(5, "dd"); + stmt.setInt(6, 60); + stmt.setInt(7, 3); + stmt.execute(); + stmt.setString(1, "b"); + stmt.setInt(2, 40); + stmt.setString(3, "bb"); + stmt.setInt(4, 5); + stmt.setString(5, "aa"); + stmt.setInt(6, 80); + stmt.setInt(7, 2); + stmt.execute(); + conn.commit(); + + + List<String> columns = + Lists.newArrayList("A_STRING", "CF1.A", "CF1.B", "COL1", "CF2.C", "CF2.D", + "COL2"); + + SQLContext sqlContext = SparkUtil.getSqlContext(); + Dataset phoenixDataSet = + new PhoenixRDD(SparkUtil.getSparkContext(), tableName, + JavaConverters.collectionAsScalaIterableConverter(columns).asScala() + .toSeq(), + Option.apply((String) null), Option.apply(url), config, false, null) + .toDataFrame(sqlContext); + + phoenixDataSet.registerTempTable(tableName); + Dataset<Row> dataset = + sqlContext.sql("SELECT A_STRING, `CF1.A`, `CF1.B`, COL1, `CF2.C`, `CF2.D`, COL2 from " + + tableName + " ORDER BY `CF1.A`,`CF2.C`"); + List<Row> rows = dataset.collectAsList(); + ResultSet rs = new SparkResultSet(rows, dataset.columns()); + + assertTrue(rs.next()); + assertEquals("c",rs.getString(1)); + assertEquals(30,rs.getInt(2)); + assertEquals("cc",rs.getString(3)); + assertEquals(50,rs.getInt(4)); + assertEquals("dd",rs.getString(5)); + assertEquals(60,rs.getInt(6)); + assertEquals(3,rs.getInt(7)); + assertTrue(rs.next()); + assertEquals("b",rs.getString(1)); + assertEquals(40,rs.getInt(2)); + assertEquals("bb",rs.getString(3)); + assertEquals(5,rs.getInt(4)); + assertEquals("aa",rs.getString(5)); + assertEquals(80,rs.getInt(6)); + assertEquals(2,rs.getInt(7)); + assertTrue(rs.next()); + assertEquals("a",rs.getString(1)); + assertEquals(40,rs.getInt(2)); + assertEquals("aa",rs.getString(3)); + assertEquals(10,rs.getInt(4)); + assertEquals("bb",rs.getString(5)); + assertEquals(20,rs.getInt(6)); + assertEquals(1,rs.getInt(7)); + assertFalse(rs.next()); + + dataset = + sqlContext.sql("SELECT A_STRING, `CF1.A`, `CF1.B`, COL1, `CF2.C`, `CF2.D`, COL2 from " + + tableName + " ORDER BY COL2"); + rows = dataset.collectAsList(); + rs = new SparkResultSet(rows, dataset.columns()); + + assertTrue(rs.next()); + assertEquals("a",rs.getString(1)); + assertEquals(40,rs.getInt(2)); + assertEquals("aa",rs.getString(3)); + 
assertEquals(10,rs.getInt(4)); + assertEquals("bb",rs.getString(5)); + assertEquals(20,rs.getInt(6)); + assertEquals(1,rs.getInt(7)); + assertTrue(rs.next()); + assertEquals("b",rs.getString(1)); + assertEquals(40,rs.getInt(2)); + assertEquals("bb",rs.getString(3)); + assertEquals(5,rs.getInt(4)); + assertEquals("aa",rs.getString(5)); + assertEquals(80,rs.getInt(6)); + assertEquals(2,rs.getInt(7)); + assertTrue(rs.next()); + assertEquals("c",rs.getString(1)); + assertEquals(30,rs.getInt(2)); + assertEquals("cc",rs.getString(3)); + assertEquals(50,rs.getInt(4)); + assertEquals("dd",rs.getString(5)); + assertEquals(60,rs.getInt(6)); + assertEquals(3,rs.getInt(7)); + assertFalse(rs.next()); + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/678563f5/phoenix-spark/src/it/java/org/apache/phoenix/spark/SaltedTableIT.java ---------------------------------------------------------------------- diff --git a/phoenix-spark/src/it/java/org/apache/phoenix/spark/SaltedTableIT.java b/phoenix-spark/src/it/java/org/apache/phoenix/spark/SaltedTableIT.java new file mode 100644 index 0000000..d72acbd --- /dev/null +++ b/phoenix-spark/src/it/java/org/apache/phoenix/spark/SaltedTableIT.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.spark; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; + +import org.apache.phoenix.end2end.salted.BaseSaltedTableIT; +import org.apache.phoenix.util.QueryBuilder; + +public class SaltedTableIT extends BaseSaltedTableIT { + + @Override + protected ResultSet executeQueryThrowsException(Connection conn, QueryBuilder queryBuilder, + String expectedPhoenixExceptionMsg, String expectedSparkExceptionMsg) { + ResultSet rs = null; + try { + rs = executeQuery(conn, queryBuilder); + fail(); + } + catch(Exception e) { + assertTrue(e.getMessage().contains(expectedSparkExceptionMsg)); + } + return rs; + } + + @Override + protected ResultSet executeQuery(Connection conn, QueryBuilder queryBuilder) throws SQLException { + return SparkUtil.executeQuery(conn, queryBuilder, getUrl(), config); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/678563f5/phoenix-spark/src/it/java/org/apache/phoenix/spark/SparkUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-spark/src/it/java/org/apache/phoenix/spark/SparkUtil.java b/phoenix-spark/src/it/java/org/apache/phoenix/spark/SparkUtil.java new file mode 100644 index 0000000..6285209 --- /dev/null +++ b/phoenix-spark/src/it/java/org/apache/phoenix/spark/SparkUtil.java @@ -0,0 +1,87 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.spark; + +import com.google.common.base.Joiner; +import org.apache.hadoop.conf.Configuration; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.QueryBuilder; +import org.apache.spark.SparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.execution.SparkPlan; +import scala.Option; +import scala.collection.JavaConverters; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; + +public class SparkUtil { + + public static final String APP_NAME = "Java Spark Tests"; + public static final String NUM_EXECUTORS = "local[2]"; + public static final String UI_SHOW_CONSOLE_PROGRESS = "spark.ui.showConsoleProgress"; + + public static SparkContext getSparkContext() { + return SparkSession.builder().appName(APP_NAME).master(NUM_EXECUTORS) + .config(UI_SHOW_CONSOLE_PROGRESS, false).getOrCreate().sparkContext(); + } + + public static SQLContext getSqlContext() { + return SparkSession.builder().appName(APP_NAME).master(NUM_EXECUTORS) + .config(UI_SHOW_CONSOLE_PROGRESS, false).getOrCreate().sqlContext(); + } + + public static ResultSet executeQuery(Connection conn, QueryBuilder queryBuilder, String url, Configuration config) + throws SQLException { + SQLContext sqlContext = SparkUtil.getSqlContext(); + + boolean forceRowKeyOrder = + conn.unwrap(PhoenixConnection.class).getQueryServices().getProps() + .getBoolean(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, false); + // if we are forcing row key order we have to add an ORDER BY + // here we assume that the required columns are in the primary key column order + String prevOrderBy = queryBuilder.getOrderByClause(); + if (forceRowKeyOrder && (queryBuilder.getOrderByClause()==null || queryBuilder.getOrderByClause().isEmpty())) { + queryBuilder.setOrderByClause(Joiner.on(", ").join(queryBuilder.getRequiredColumns())); + } + + // create PhoenixRDD using the table name and columns that are required by the query + // since we don't set the predicate filtering is done after rows are returned from spark + Dataset phoenixDataSet = + new PhoenixRDD(SparkUtil.getSparkContext(), queryBuilder.getFullTableName(), + JavaConverters.collectionAsScalaIterableConverter(queryBuilder.getRequiredColumns()).asScala() + .toSeq(), + Option.apply((String) null), Option.apply(url), config, false, + null).toDataFrame(sqlContext); + + phoenixDataSet.registerTempTable(queryBuilder.getFullTableName()); + Dataset<Row> dataset = sqlContext.sql(queryBuilder.build()); + SparkPlan plan = dataset.queryExecution().executedPlan(); + List<Row> rows = dataset.collectAsList(); + queryBuilder.setOrderByClause(prevOrderBy); + ResultSet rs = new SparkResultSet(rows, dataset.columns()); + return rs; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/678563f5/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala ---------------------------------------------------------------------- diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala index 4e11acc..d1e38fa 100644 --- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala +++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala @@ -23,6 +23,7 @@ import org.joda.time.DateTime 
import org.apache.spark.{SparkConf, SparkContext} import scala.collection.mutable.ListBuffer import org.apache.hadoop.conf.Configuration + /** * Note: If running directly from an IDE, these are the recommended VM parameters: * -Xmx1536m -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m @@ -287,11 +288,11 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT { val plan = res.queryExecution.sparkPlan // filters should be pushed into phoenix relation - assert(plan.toString.contains("PushedFilters: [IsNotNull(COL1), IsNotNull(ID), " + - "EqualTo(COL1,test_row_1), EqualTo(ID,1)]")) + assert(plan.toString.contains("PushedFilters: [*IsNotNull(COL1), *IsNotNull(ID), " + + "*EqualTo(COL1,test_row_1), *EqualTo(ID,1)]")) // spark should run the filters on the rows returned by Phoenix - assert(!plan.toString.contains("Filter (((isnotnull(COL1#8) && isnotnull(ID#7L)) " + - "&& (COL1#8 = test_row_1)) && (ID#7L = 1))")) + assert(!plan.toString.matches(".*Filter (((isnotnull(COL1.*) && isnotnull(ID.*)) " + + " && (COL1.* = test_row_1)) && (ID.* = 1)).*")) } test("Can persist a dataframe using 'DataFrame.saveToPhoenix'") {
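As an illustration of how the new QueryBuilder fluent API added in this change (and consumed by SparkUtil.executeQuery above) fits together, here is a minimal sketch; the table and column names are made up for the example and the commented output is only the approximate shape produced by QueryBuilder.build():

    // Illustrative sketch only; names below are hypothetical.
    import com.google.common.collect.Lists;
    import org.apache.phoenix.parse.HintNode;
    import org.apache.phoenix.util.QueryBuilder;

    public class QueryBuilderExample {
        public static void main(String[] args) {
            String sql = new QueryBuilder()
                .setFullTableName("MY_SCHEMA.MY_TABLE")            // required
                .setSelectColumns(Lists.newArrayList("ID", "NAME")) // projected columns
                .setWhereClause("NAME = 'abc'")                     // appended as WHERE (...)
                .setOrderByClause("ID")
                .setLimit(10)
                .setEscapeCols(true)                                // quote projected column names
                .setHint(HintNode.Hint.NO_INDEX)
                .build();
            // Roughly: SELECT /*+ NO_INDEX */ "ID" , "NAME" FROM MY_SCHEMA.MY_TABLE
            //          WHERE (NAME = 'abc') ORDER BY ID LIMIT 10
            System.out.println(sql);
        }
    }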