HIVE-18423: Support pushing computation from the optimizer for JDBC storage handler tables (Jonathan Doron, reviewed by Jesus Camacho Rodriguez)
Close apache/hive#288 Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/78348d88 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/78348d88 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/78348d88 Branch: refs/heads/branch-3 Commit: 78348d88b61883cb5eaab957f61219bc920b2452 Parents: c08f3b5 Author: Jonathan Doron <msydo...@gmail.com> Authored: Wed Apr 25 07:17:56 2018 -0700 Committer: Jesus Camacho Rodriguez <jcama...@apache.org> Committed: Fri Apr 27 09:10:47 2018 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/Constants.java | 6 + .../hive/storage/jdbc/JdbcInputFormat.java | 2 +- .../hive/storage/jdbc/JdbcRecordReader.java | 6 +- .../org/apache/hive/storage/jdbc/JdbcSerDe.java | 37 ++-- .../hive/storage/jdbc/JdbcStorageHandler.java | 6 + .../hive/storage/jdbc/conf/DatabaseType.java | 3 +- .../storage/jdbc/conf/JdbcStorageConfig.java | 3 +- .../jdbc/conf/JdbcStorageConfigManager.java | 13 +- .../hive/storage/jdbc/dao/DatabaseAccessor.java | 2 +- .../jdbc/dao/DatabaseAccessorFactory.java | 3 + .../jdbc/dao/GenericJdbcDatabaseAccessor.java | 74 ++++++- .../jdbc/dao/JethroDatabaseAccessor.java | 50 +++++ .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 1 + .../metadata/HiveMaterializedViewsRegistry.java | 18 +- .../functions/HiveSqlCountAggFunction.java | 6 + .../reloperators/jdbc/HiveJdbcConverter.java | 107 ++++++++++ .../reloperators/jdbc/JdbcHiveTableScan.java | 58 ++++++ .../calcite/rules/HiveRelColumnsAlignment.java | 4 + .../rules/jdbc/JDBCAbstractSplitFilterRule.java | 208 +++++++++++++++++++ .../rules/jdbc/JDBCAggregationPushDownRule.java | 94 +++++++++ .../rules/jdbc/JDBCExtractJoinFilterRule.java | 67 ++++++ .../calcite/rules/jdbc/JDBCFilterJoinRule.java | 71 +++++++ .../rules/jdbc/JDBCFilterPushDownRule.java | 78 +++++++ .../rules/jdbc/JDBCJoinPushDownRule.java | 99 +++++++++ .../rules/jdbc/JDBCProjectPushDownRule.java | 81 ++++++++ .../rules/jdbc/JDBCRexCallValidator.java | 90 ++++++++ .../rules/jdbc/JDBCSortPushDownRule.java | 84 ++++++++ .../rules/jdbc/JDBCUnionPushDownRule.java | 88 ++++++++ .../calcite/rules/jdbc/package-info.java | 22 ++ .../calcite/translator/ASTBuilder.java | 33 ++- .../calcite/translator/ASTConverter.java | 18 +- .../hadoop/hive/ql/parse/CalcitePlanner.java | 170 ++++++++++----- .../test/queries/clientpositive/jdbc_handler.q | 40 ++++ .../clientpositive/llap/jdbc_handler.q.out | 119 +++++++++++ 34 files changed, 1678 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/78348d88/common/src/java/org/apache/hadoop/hive/conf/Constants.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/Constants.java b/common/src/java/org/apache/hadoop/hive/conf/Constants.java index ff9eb59..3d79eec 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/Constants.java +++ b/common/src/java/org/apache/hadoop/hive/conf/Constants.java @@ -48,6 +48,7 @@ public class Constants { public static final String DRUID_SEGMENT_VERSION = "druid.segment.version"; public static final String DRUID_JOB_WORKING_DIRECTORY = "druid.job.workingDirectory"; + public static final String KAFKA_TOPIC = "kafka.topic"; public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; @@ -55,6 +56,11 @@ public class Constants { /* Kafka Ingestion state - valid values - 
START/STOP/RESET */ public static final String DRUID_KAFKA_INGESTION = "druid.kafka.ingestion"; + public static final String HIVE_JDBC_QUERY = "hive.sql.generated.query"; + public static final String JDBC_QUERY = "hive.sql.query"; + public static final String JDBC_HIVE_STORAGE_HANDLER_ID = + "org.apache.hive.storage.jdbc.JdbcStorageHandler"; + public static final String HIVE_SERVER2_JOB_CREDSTORE_PASSWORD_ENVVAR = "HIVE_JOB_CREDSTORE_PASSWORD"; public static final String HADOOP_CREDENTIAL_PASSWORD_ENVVAR = "HADOOP_CREDSTORE_PASSWORD"; public static final String HADOOP_CREDENTIAL_PROVIDER_PATH_CONFIG = "hadoop.security.credential.provider.path"; http://git-wip-us.apache.org/repos/asf/hive/blob/78348d88/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputFormat.java ---------------------------------------------------------------------- diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputFormat.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputFormat.java index 6def148..caa823f 100644 --- a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputFormat.java +++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcInputFormat.java @@ -66,7 +66,7 @@ public class JdbcInputFormat extends HiveInputFormat<LongWritable, MapWritable> dbAccessor = DatabaseAccessorFactory.getAccessor(job); } - int numRecords = dbAccessor.getTotalNumberOfRecords(job); + int numRecords = numSplits <=1 ? Integer.MAX_VALUE : dbAccessor.getTotalNumberOfRecords(job); if (numRecords < numSplits) { numSplits = numRecords; http://git-wip-us.apache.org/repos/asf/hive/blob/78348d88/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcRecordReader.java ---------------------------------------------------------------------- diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcRecordReader.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcRecordReader.java index 88b2f0a..1da6213 100644 --- a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcRecordReader.java +++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcRecordReader.java @@ -43,7 +43,7 @@ public class JdbcRecordReader implements RecordReader<LongWritable, MapWritable> public JdbcRecordReader(JobConf conf, JdbcInputSplit split) { - LOGGER.debug("Initializing JdbcRecordReader"); + LOGGER.trace("Initializing JdbcRecordReader"); this.split = split; this.conf = conf; } @@ -52,14 +52,14 @@ public class JdbcRecordReader implements RecordReader<LongWritable, MapWritable> @Override public boolean next(LongWritable key, MapWritable value) throws IOException { try { - LOGGER.debug("JdbcRecordReader.next called"); + LOGGER.trace("JdbcRecordReader.next called"); if (dbAccessor == null) { dbAccessor = DatabaseAccessorFactory.getAccessor(conf); iterator = dbAccessor.getRecordIterator(conf, split.getLimit(), split.getOffset()); } if (iterator.hasNext()) { - LOGGER.debug("JdbcRecordReader has more records to read."); + LOGGER.trace("JdbcRecordReader has more records to read."); key.set(pos); pos++; Map<String, Object> record = iterator.next(); http://git-wip-us.apache.org/repos/asf/hive/blob/78348d88/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcSerDe.java ---------------------------------------------------------------------- diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcSerDe.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcSerDe.java index 3764c8c..eac03d2 100644 --- 
a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcSerDe.java +++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcSerDe.java @@ -15,6 +15,7 @@ package org.apache.hive.storage.jdbc; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.AbstractSerDe; import org.apache.hadoop.hive.serde2.SerDeException; @@ -63,25 +64,35 @@ public class JdbcSerDe extends AbstractSerDe { @Override public void initialize(Configuration conf, Properties tbl) throws SerDeException { try { - LOGGER.debug("Initializing the SerDe"); + LOGGER.trace("Initializing the SerDe"); if (tbl.containsKey(JdbcStorageConfig.DATABASE_TYPE.getPropertyName())) { + final boolean hiveQueryExecution = tbl.containsKey(Constants.HIVE_JDBC_QUERY); + Configuration tableConfig = JdbcStorageConfigManager.convertPropertiesToConfiguration(tbl); DatabaseAccessor dbAccessor = DatabaseAccessorFactory.getAccessor(tableConfig); columnNames = dbAccessor.getColumnNames(tableConfig); numColumns = columnNames.size(); - - String[] hiveColumnNameArray = parseProperty(tbl.getProperty(serdeConstants.LIST_COLUMNS), ","); - if (numColumns != hiveColumnNameArray.length) { - throw new SerDeException("Expected " + numColumns + " columns. Table definition has " - + hiveColumnNameArray.length + " columns"); - } - List<String> hiveColumnNames = Arrays.asList(hiveColumnNameArray); - - hiveColumnTypeArray = parseProperty(tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES), ":"); - if (hiveColumnTypeArray.length == 0) { - throw new SerDeException("Received an empty Hive column type definition"); + List<String> hiveColumnNames; + if (hiveQueryExecution) { + hiveColumnNames = columnNames; + final List<String> columnTypes = dbAccessor.getColumnTypes(tableConfig); + hiveColumnTypeArray = new String[columnTypes.size()]; + hiveColumnTypeArray = columnTypes.toArray(hiveColumnTypeArray); + } else { + + String[] hiveColumnNameArray = parseProperty(tbl.getProperty(serdeConstants.LIST_COLUMNS), ","); + if (numColumns != hiveColumnNameArray.length) { + throw new SerDeException("Expected " + numColumns + " columns. Table definition has " + + hiveColumnNameArray.length + " columns"); + } + hiveColumnNames = Arrays.asList(hiveColumnNameArray); + + hiveColumnTypeArray = parseProperty(tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES), ":"); + if (hiveColumnTypeArray.length == 0) { + throw new SerDeException("Received an empty Hive column type definition"); + } } List<ObjectInspector> fieldInspectors = new ArrayList<ObjectInspector>(numColumns); @@ -115,7 +126,7 @@ public class JdbcSerDe extends AbstractSerDe { @Override public Object deserialize(Writable blob) throws SerDeException { - LOGGER.debug("Deserializing from SerDe"); + LOGGER.trace("Deserializing from SerDe"); if (!(blob instanceof MapWritable)) { throw new SerDeException("Expected MapWritable. 
Got " + blob.getClass().getName()); } http://git-wip-us.apache.org/repos/asf/hive/blob/78348d88/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcStorageHandler.java ---------------------------------------------------------------------- diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcStorageHandler.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcStorageHandler.java index 4b03285..df55272 100644 --- a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcStorageHandler.java +++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/JdbcStorageHandler.java @@ -15,6 +15,7 @@ package org.apache.hive.storage.jdbc; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.metastore.HiveMetaHook; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; @@ -124,4 +125,9 @@ public class JdbcStorageHandler implements HiveStorageHandler { } + @Override + public String toString() { + return Constants.JDBC_HIVE_STORAGE_HANDLER_ID; + } + } http://git-wip-us.apache.org/repos/asf/hive/blob/78348d88/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/DatabaseType.java ---------------------------------------------------------------------- diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/DatabaseType.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/DatabaseType.java index c4e97ba..b8b770f 100644 --- a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/DatabaseType.java +++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/DatabaseType.java @@ -21,5 +21,6 @@ public enum DatabaseType { ORACLE, POSTGRES, MSSQL, - METASTORE + METASTORE, + JETHRO_DATA } http://git-wip-us.apache.org/repos/asf/hive/blob/78348d88/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfig.java ---------------------------------------------------------------------- diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfig.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfig.java index ff6357d..1ccbe08 100644 --- a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfig.java +++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfig.java @@ -18,7 +18,8 @@ public enum JdbcStorageConfig { DATABASE_TYPE("database.type", true), JDBC_URL("jdbc.url", true), JDBC_DRIVER_CLASS("jdbc.driver", true), - QUERY("query", true), + QUERY("query", false), + TABLE("table", false), JDBC_FETCH_SIZE("jdbc.fetch.size", false), COLUMN_MAPPING("column.mapping", false); http://git-wip-us.apache.org/repos/asf/hive/blob/78348d88/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfigManager.java ---------------------------------------------------------------------- diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfigManager.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfigManager.java index 350b0c6..55fc0ea 100644 --- a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfigManager.java +++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/conf/JdbcStorageConfigManager.java @@ -44,13 +44,12 @@ public class JdbcStorageConfigManager { public static final String CONFIG_USERNAME = CONFIG_PREFIX + ".dbcp.username"; private static final 
EnumSet<JdbcStorageConfig> DEFAULT_REQUIRED_PROPERTIES = EnumSet.of(JdbcStorageConfig.DATABASE_TYPE,
- JdbcStorageConfig.JDBC_URL,
- JdbcStorageConfig.JDBC_DRIVER_CLASS,
- JdbcStorageConfig.QUERY);
+ JdbcStorageConfig.JDBC_URL,
+ JdbcStorageConfig.JDBC_DRIVER_CLASS);
 private static final EnumSet<JdbcStorageConfig> METASTORE_REQUIRED_PROPERTIES = EnumSet.of(JdbcStorageConfig.DATABASE_TYPE,
- JdbcStorageConfig.QUERY);
+ JdbcStorageConfig.QUERY);
 private JdbcStorageConfigManager() {
 }
@@ -120,6 +119,12 @@ public class JdbcStorageConfigManager {
 public static String getQueryToExecute(Configuration config) {
 String query = config.get(JdbcStorageConfig.QUERY.getPropertyName());
+
+ if (query == null) {
+ String tableName = config.get(JdbcStorageConfig.TABLE.getPropertyName());
+ query = "select * from " + tableName;
+ }
+
 String hiveFilterCondition = QueryConditionBuilder.getInstance().buildCondition(config);
 if ((hiveFilterCondition != null) && (!hiveFilterCondition.trim().isEmpty())) {
 query = query + " WHERE " + hiveFilterCondition;
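For illustration, the new table-name fallback above can be exercised on its own. A minimal sketch (not part of this patch), assuming the handler's usual "hive.sql." property prefix, so "hive.sql.table" and "hive.sql.query" are the fully qualified names of the TABLE and QUERY properties:

    import org.apache.hadoop.conf.Configuration;

    public class QueryFallbackSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration(false);
        conf.set("hive.sql.table", "STUDENT");      // only the table name is given
        String query = conf.get("hive.sql.query");  // no explicit query was set
        if (query == null) {
          query = "select * from " + conf.get("hive.sql.table");
        }
        System.out.println(query);                  // prints: select * from STUDENT
      }
    }

With a filter condition from the plan, the handler then appends " WHERE <condition>" exactly as getQueryToExecute does.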
http://git-wip-us.apache.org/repos/asf/hive/blob/78348d88/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessor.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessor.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessor.java
index f50d53e..fdaa794 100644
--- a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessor.java
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessor.java
@@ -24,10 +24,10 @@ public interface DatabaseAccessor {
 List<String> getColumnNames(Configuration conf) throws HiveJdbcDatabaseAccessException;
+ List<String> getColumnTypes(Configuration conf) throws HiveJdbcDatabaseAccessException;
 int getTotalNumberOfRecords(Configuration conf) throws HiveJdbcDatabaseAccessException;
-
 JdbcRecordIterator getRecordIterator(Configuration conf, int limit, int offset) throws HiveJdbcDatabaseAccessException;

http://git-wip-us.apache.org/repos/asf/hive/blob/78348d88/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessorFactory.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessorFactory.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessorFactory.java
index 7dc690f..6d3c8d9 100644
--- a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessorFactory.java
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/DatabaseAccessorFactory.java
@@ -35,6 +35,9 @@ public class DatabaseAccessorFactory {
 case MYSQL:
 accessor = new MySqlDatabaseAccessor();
 break;
+ case JETHRO_DATA:
+ accessor = new JethroDatabaseAccessor();
+ break;
 default:
 accessor = new GenericJdbcDatabaseAccessor();

http://git-wip-us.apache.org/repos/asf/hive/blob/78348d88/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/GenericJdbcDatabaseAccessor.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/GenericJdbcDatabaseAccessor.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/GenericJdbcDatabaseAccessor.java
index 178c97d..772bc5d 100644
--- a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/GenericJdbcDatabaseAccessor.java
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/GenericJdbcDatabaseAccessor.java
@@ -16,6 +16,7 @@ package org.apache.hive.storage.jdbc.dao;
 import org.apache.commons.dbcp.BasicDataSourceFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -34,6 +35,7 @@
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
+import java.sql.Types;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -64,8 +66,7 @@
 try {
 initializeDatabaseConnection(conf);
- String sql = JdbcStorageConfigManager.getQueryToExecute(conf);
- String metadataQuery = addLimitToQuery(sql, 1);
+ String metadataQuery = getMetaDataQuery(conf);
 LOGGER.debug("Query to execute is [{}]", metadataQuery);
 conn = dbcpDataSource.getConnection();
@@ -92,6 +93,75 @@
 }

+ protected String getMetaDataQuery(Configuration conf) {
+ String sql = JdbcStorageConfigManager.getQueryToExecute(conf);
+ String metadataQuery = addLimitToQuery(sql, 1);
+ return metadataQuery;
+ }
+
+ @Override
+ public List<String> getColumnTypes(Configuration conf) throws HiveJdbcDatabaseAccessException {
+ Connection conn = null;
+ PreparedStatement ps = null;
+ ResultSet rs = null;
+
+ try {
+ initializeDatabaseConnection(conf);
+ String metadataQuery = getMetaDataQuery(conf);
+ LOGGER.debug("Query to execute is [{}]", metadataQuery);
+
+ conn = dbcpDataSource.getConnection();
+ ps = conn.prepareStatement(metadataQuery);
+ rs = ps.executeQuery();
+
+ ResultSetMetaData metadata = rs.getMetaData();
+ int numColumns = metadata.getColumnCount();
+ List<String> columnTypes = new ArrayList<String>(numColumns);
+ for (int i = 0; i < numColumns; i++) {
+ switch (metadata.getColumnType(i + 1)) {
+ case Types.CHAR:
+ columnTypes.add(serdeConstants.STRING_TYPE_NAME);
+ break;
+ case Types.INTEGER:
+ columnTypes.add(serdeConstants.INT_TYPE_NAME);
+ break;
+ case Types.BIGINT:
+ columnTypes.add(serdeConstants.BIGINT_TYPE_NAME);
+ break;
+ case Types.DECIMAL:
+ columnTypes.add(serdeConstants.DECIMAL_TYPE_NAME);
+ break;
+ case Types.FLOAT:
+ case Types.REAL:
+ columnTypes.add(serdeConstants.FLOAT_TYPE_NAME);
+ break;
+ case Types.DOUBLE:
+ columnTypes.add(serdeConstants.DOUBLE_TYPE_NAME);
+ break;
+ case Types.DATE:
+ columnTypes.add(serdeConstants.DATE_TYPE_NAME);
+ break;
+ case Types.TIMESTAMP:
+ columnTypes.add(serdeConstants.TIMESTAMP_TYPE_NAME);
+ break;
+
+ default:
+ columnTypes.add(metadata.getColumnTypeName(i + 1));
+ break;
+ }
+ }
+
+ return columnTypes;
+ } catch (Exception e) {
+ LOGGER.error("Error while trying to get column types.", e);
+ throw new HiveJdbcDatabaseAccessException("Error while trying to get column types: " + e.getMessage(), e);
+ } finally {
+ cleanupResources(conn, ps, rs);
+ }
+
+ }
+
+
 @Override
 public int getTotalNumberOfRecords(Configuration conf) throws HiveJdbcDatabaseAccessException {
 Connection conn = null;
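The new getColumnTypes implementation infers Hive type names from JDBC metadata by running the limit-restricted metadata query. For illustration, a self-contained sketch of the same technique (not part of this patch), assuming an H2 driver on the classpath purely as a stand-in for the external database:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    import java.sql.Statement;
    import java.sql.Types;

    public class ColumnTypeSketch {
      public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:h2:mem:demo");
             Statement stmt = conn.createStatement()) {
          stmt.execute("create table student (name varchar(50), age int, gpa decimal(4, 2))");
          // Same trick as getMetaDataQuery(): run the query with a LIMIT purely
          // to obtain ResultSetMetaData, never to fetch real rows.
          try (ResultSet rs = stmt.executeQuery("select * from student limit 1")) {
            ResultSetMetaData md = rs.getMetaData();
            for (int i = 1; i <= md.getColumnCount(); i++) {
              String hiveType;
              switch (md.getColumnType(i)) {
                case Types.INTEGER: hiveType = "int"; break;
                case Types.DECIMAL: hiveType = "decimal"; break;
                default: hiveType = md.getColumnTypeName(i); break; // fall back to the driver's name
              }
              System.out.println(md.getColumnName(i) + " -> " + hiveType);
            }
          }
        }
      }
    }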
http://git-wip-us.apache.org/repos/asf/hive/blob/78348d88/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/JethroDatabaseAccessor.java
----------------------------------------------------------------------
diff --git a/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/JethroDatabaseAccessor.java b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/JethroDatabaseAccessor.java
new file mode 100644
index 0000000..db0454e
--- /dev/null
+++ b/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc/dao/JethroDatabaseAccessor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.storage.jdbc.dao;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hive.storage.jdbc.conf.JdbcStorageConfigManager;
+
+/**
+ * JethroData-specific data accessor. This is needed because the JethroData JDBC driver does
+ * not support the generic LIMIT and OFFSET escape functions, and it has a special optimization
+ * for getting the query metadata using limit 0.
+ */
+
+public class JethroDatabaseAccessor extends GenericJdbcDatabaseAccessor {
+
+ @Override
+ protected String addLimitAndOffsetToQuery(String sql, int limit, int offset) {
+ if (offset == 0) {
+ return addLimitToQuery(sql, limit);
+ } else {
+ return sql + " LIMIT " + offset + "," + limit;
+ }
+ }
+
+ @Override
+ protected String addLimitToQuery(String sql, int limit) {
+ return "Select * from (" + sql + ") as \"tmp\" limit " + limit;
+ }
+
+ @Override
+ protected String getMetaDataQuery(Configuration conf) {
+ String sql = JdbcStorageConfigManager.getQueryToExecute(conf);
+ return addLimitToQuery(sql, 0);
+ }
+}
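For illustration, the Jethro rewrites produce SQL like the following; a small sketch (not part of this patch) that mirrors the two methods above:

    public class JethroLimitSketch {
      // Mirrors JethroDatabaseAccessor.addLimitToQuery.
      static String addLimit(String sql, int limit) {
        return "Select * from (" + sql + ") as \"tmp\" limit " + limit;
      }

      // Mirrors JethroDatabaseAccessor.addLimitAndOffsetToQuery.
      static String addLimitAndOffset(String sql, int limit, int offset) {
        return offset == 0 ? addLimit(sql, limit) : sql + " LIMIT " + offset + "," + limit;
      }

      public static void main(String[] args) {
        System.out.println(addLimit("select * from student", 0));
        // Select * from (select * from student) as "tmp" limit 0
        System.out.println(addLimitAndOffset("select * from student", 10, 100));
        // select * from student LIMIT 100,10
      }
    }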
http://git-wip-us.apache.org/repos/asf/hive/blob/78348d88/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index d6f5666..ed31348 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -5301,6 +5301,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
 String sh = tbl.getStorageHandler().toString();
 retval = !sh.equals("org.apache.hadoop.hive.hbase.HBaseStorageHandler")
 && !sh.equals(Constants.DRUID_HIVE_STORAGE_HANDLER_ID)
+ && !sh.equals(Constants.JDBC_HIVE_STORAGE_HANDLER_ID)
 && !sh.equals("org.apache.hadoop.hive.accumulo.AccumuloStorageHandler");
 }
 return retval;

http://git-wip-us.apache.org/repos/asf/hive/blob/78348d88/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
index 6e585e5..960ad76 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
@@ -419,18 +419,26 @@ public final class HiveMaterializedViewsRegistry {
 }

 private static TableType obtainTableType(Table tabMetaData) {
- if (tabMetaData.getStorageHandler() != null &&
- tabMetaData.getStorageHandler().toString().equals(
- Constants.DRUID_HIVE_STORAGE_HANDLER_ID)) {
- return TableType.DRUID;
+ if (tabMetaData.getStorageHandler() != null) {
+ final String storageHandlerStr = tabMetaData.getStorageHandler().toString();
+ if (storageHandlerStr.equals(Constants.DRUID_HIVE_STORAGE_HANDLER_ID)) {
+ return TableType.DRUID;
+ }
+
+ if (storageHandlerStr.equals(Constants.JDBC_HIVE_STORAGE_HANDLER_ID)) {
+ return TableType.JDBC;
+ }
+
+ }
+
 return TableType.NATIVE;
 }

 //@TODO this seems to be the same as org.apache.hadoop.hive.ql.parse.CalcitePlanner.TableType.DRUID do we really need both
 private enum TableType {
 DRUID,
- NATIVE
+ NATIVE,
+ JDBC
 }

 private enum OpType {

http://git-wip-us.apache.org/repos/asf/hive/blob/78348d88/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlCountAggFunction.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlCountAggFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlCountAggFunction.java
index c5c17de..615f30d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlCountAggFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlCountAggFunction.java
@@ -29,6 +29,7 @@
 import org.apache.calcite.sql.SqlAggFunction;
 import org.apache.calcite.sql.SqlFunctionCategory;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlSplittableAggFunction;
+import org.apache.calcite.sql.SqlSyntax;
 import org.apache.calcite.sql.SqlSplittableAggFunction.CountSplitter;
 import org.apache.calcite.sql.SqlSplittableAggFunction.Registry;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
@@ -68,6 +69,11 @@ public class HiveSqlCountAggFunction extends SqlAggFunction implements CanAggreg
 }

 @Override
+ public SqlSyntax getSyntax() {
+ return SqlSyntax.FUNCTION_STAR;
+ }
+
+ @Override
 public <T> T unwrap(Class<T> clazz) {
 if (clazz == SqlSplittableAggFunction.class) {
 return clazz.cast(new HiveCountSplitter());

http://git-wip-us.apache.org/repos/asf/hive/blob/78348d88/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/jdbc/HiveJdbcConverter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/jdbc/HiveJdbcConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/jdbc/HiveJdbcConverter.java
new file mode 100644
index 0000000..fc54644
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/jdbc/HiveJdbcConverter.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc;
+
+import java.util.List;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.adapter.jdbc.JdbcConvention;
+import org.apache.calcite.adapter.jdbc.JdbcImplementor;
+import org.apache.calcite.adapter.jdbc.JdbcRel;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelVisitor;
+import org.apache.calcite.rel.convert.ConverterImpl;
+import org.apache.calcite.sql.SqlDialect;
+
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveRelNode;
+
+/**
+ * This is a designated RelNode that splits the plan between the Hive operators and the JDBC
+ * operators; every successor of this node will be a JDBC operator.
+ */
+public class HiveJdbcConverter extends ConverterImpl implements HiveRelNode {
+
+ private final JdbcConvention convention;
+
+ public HiveJdbcConverter(RelOptCluster cluster, RelTraitSet traits,
+ JdbcRel input, JdbcConvention jc) {
+ super(cluster, ConventionTraitDef.INSTANCE, traits, input);
+ convention = jc;
+ }
+
+ private HiveJdbcConverter(RelOptCluster cluster, RelTraitSet traits,
+ RelNode input, JdbcConvention jc) {
+ super(cluster, ConventionTraitDef.INSTANCE, traits, input);
+ convention = jc;
+ }
+
+ public JdbcConvention getJdbcConvention() {
+ return convention;
+ }
+
+ public SqlDialect getJdbcDialect() {
+ return convention.dialect;
+ }
+
+ @Override
+ public void implement(Implementor implementor) {
+
+ }
+
+ @Override
+ public RelNode copy(
+ RelTraitSet traitSet,
+ List<RelNode> inputs) {
+ return new HiveJdbcConverter(getCluster(), traitSet, sole(inputs), convention);
+ }
+
+ public String generateSql() {
+ SqlDialect dialect = getJdbcDialect();
+ final JdbcImplementor jdbcImplementor =
+ new JdbcImplementor(dialect,
+ (JavaTypeFactory) getCluster().getTypeFactory());
+ final JdbcImplementor.Result result =
+ jdbcImplementor.visitChild(0, getInput());
+ return result.asStatement().toSqlString(dialect).getSql();
+ }
+
+ public JdbcHiveTableScan getTableScan() {
+ final JdbcHiveTableScan[] tmpJdbcHiveTableScan = new JdbcHiveTableScan[1];
+ new RelVisitor() {
+
+ public void visit(
+ RelNode node,
+ int ordinal,
+ RelNode parent) {
+ if (node instanceof JdbcHiveTableScan && tmpJdbcHiveTableScan[0] == null) {
+ tmpJdbcHiveTableScan[0] = (JdbcHiveTableScan) node;
+ } else {
+ super.visit(node, ordinal, parent);
+ }
+ }
+ }.go(this);
+
+ JdbcHiveTableScan jdbcHiveTableScan = tmpJdbcHiveTableScan[0];
+
+ assert jdbcHiveTableScan != null;
+ return jdbcHiveTableScan;
+ }
+}
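getTableScan() above uses a standard Calcite RelVisitor walk to locate the single JdbcHiveTableScan beneath the converter. For illustration, the same idiom as a generic, reusable helper (not part of this patch):

    import org.apache.calcite.rel.RelNode;
    import org.apache.calcite.rel.RelVisitor;

    public final class RelFinder {
      private RelFinder() {
      }

      /** Returns the first node of the given type in a pre-order walk, or null if absent. */
      public static <T extends RelNode> T findFirst(RelNode root, final Class<T> clazz) {
        final RelNode[] found = new RelNode[1];
        new RelVisitor() {
          @Override
          public void visit(RelNode node, int ordinal, RelNode parent) {
            if (clazz.isInstance(node) && found[0] == null) {
              found[0] = node;                       // stop descending once a match is found
            } else {
              super.visit(node, ordinal, parent);    // otherwise keep walking the children
            }
          }
        }.go(root);
        return clazz.cast(found[0]);
      }
    }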
http://git-wip-us.apache.org/repos/asf/hive/blob/78348d88/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/jdbc/JdbcHiveTableScan.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/jdbc/JdbcHiveTableScan.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/jdbc/JdbcHiveTableScan.java
new file mode 100644
index 0000000..5b9a635
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/jdbc/JdbcHiveTableScan.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc;
+
+import java.util.List;
+
+import org.apache.calcite.adapter.jdbc.JdbcConvention;
+import org.apache.calcite.adapter.jdbc.JdbcTable;
+import org.apache.calcite.adapter.jdbc.JdbcTableScan;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
+/**
+ * Relational expression representing a scan of a table in a JDBC data source.
+ *
+ * <p>
+ * It extends the Calcite JdbcTableScan and keeps a reference to the original
+ * HiveTableScan that it replaces.
+ * </p>
+ */
+public class JdbcHiveTableScan extends JdbcTableScan {
+
+ private final HiveTableScan hiveTableScan;
+
+ public JdbcHiveTableScan(RelOptCluster cluster, RelOptTable table, JdbcTable jdbcTable,
+ JdbcConvention jdbcConvention, HiveTableScan hiveTableScan) {
+ super(cluster, table, jdbcTable, jdbcConvention);
+ this.hiveTableScan = hiveTableScan;
+ }
+
+ @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ assert inputs.isEmpty();
+ return new JdbcHiveTableScan(
+ getCluster(), table, jdbcTable, (JdbcConvention) getConvention(), this.hiveTableScan);
+ }
+
+ public HiveTableScan getHiveTableScan() {
+ return hiveTableScan;
+ }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/78348d88/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRelColumnsAlignment.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRelColumnsAlignment.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRelColumnsAlignment.java
index bccbde5..1d89ddd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRelColumnsAlignment.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/HiveRelColumnsAlignment.java
@@ -46,6 +46,7 @@
 import org.apache.calcite.tools.RelBuilder;
 import org.apache.calcite.util.ReflectUtil;
 import org.apache.calcite.util.ReflectiveVisitor;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.HiveJdbcConverter;
 import com.google.common.collect.ImmutableList;
@@ -89,6 +90,9 @@
 }

 protected final RelNode dispatchAlign(RelNode node, List<RelFieldCollation> collations) {
+ if (node instanceof HiveJdbcConverter) {
+ return node;
+ }
 return alignDispatcher.invoke(node, collations);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/78348d88/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCAbstractSplitFilterRule.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCAbstractSplitFilterRule.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCAbstractSplitFilterRule.java
new file mode 100644
index 0000000..c167458
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCAbstractSplitFilterRule.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules.jdbc;
+
+import java.util.ArrayList;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.HiveJdbcConverter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * JDBCAbstractSplitFilterRule splits a {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter} into
+ * two {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter} operators where the lower operator
+ * can be pushed down below the
+ * {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.HiveJdbcConverter}
+ * operator and can therefore be sent to the external table.
+ */
+
+public abstract class JDBCAbstractSplitFilterRule extends RelOptRule {
+ private static final Logger LOGGER = LoggerFactory.getLogger(JDBCAbstractSplitFilterRule.class);
+
+ public static final JDBCAbstractSplitFilterRule SPLIT_FILTER_ABOVE_JOIN = new JDBCSplitFilterAboveJoinRule();
+ public static final JDBCAbstractSplitFilterRule SPLIT_FILTER_ABOVE_CONVERTER = new JDBCSplitFilterRule();
+
+ /**
+ * FilterSupportedFunctionsVisitor traverses all of the RexCall nodes and splits them into
+ * two lists: one with supported JDBC calls, and one with unsupported JDBC calls.
+ */
+ public static class FilterSupportedFunctionsVisitor extends RexVisitorImpl<Void> {
+
+ private final SqlDialect dialect;
+
+ public FilterSupportedFunctionsVisitor(SqlDialect dialect) {
+ super(true);
+ this.dialect = dialect;
+ }
+
+ private final ArrayList<RexCall> validJdbcNode = new ArrayList<RexCall>();
+ private final ArrayList<RexCall> invalidJdbcNode = new ArrayList<RexCall>();
+
+ public ArrayList<RexCall> getValidJdbcNode() {
+ return validJdbcNode;
+ }
+
+ public ArrayList<RexCall> getInvalidJdbcNode() {
+ return invalidJdbcNode;
+ }
+
+ @Override
+ public Void visitCall(RexCall call) {
+ if (call.getKind() == SqlKind.AND) {
+ return super.visitCall(call);
+ } else {
+ boolean isValidCall = JDBCRexCallValidator.isValidJdbcOperation(call, dialect);
+ if (isValidCall) {
+ validJdbcNode.add(call);
+ } else {
+ invalidJdbcNode.add(call);
+ }
+ }
+ return null;
+ }
+
+ public boolean canBeSplit() {
+ return !validJdbcNode.isEmpty() && !invalidJdbcNode.isEmpty();
+ }
+ }
+
+ protected JDBCAbstractSplitFilterRule(RelOptRuleOperand operand) {
+ super(operand);
+ }
+
+ public static boolean canSplitFilter(RexNode cond, SqlDialect dialect) {
+ FilterSupportedFunctionsVisitor visitor = new FilterSupportedFunctionsVisitor(dialect);
+ cond.accept(visitor);
+ return visitor.canBeSplit();
+ }
+
+ public boolean matches(RelOptRuleCall call, SqlDialect dialect) {
+ LOGGER.debug("JDBCAbstractSplitFilterRule.matches has been called");
+
+ final HiveFilter filter = call.rel(0);
+
+ RexNode cond = filter.getCondition();
+
+ return canSplitFilter(cond, dialect);
+ }
+
+ public void onMatch(RelOptRuleCall call, SqlDialect dialect) {
+ LOGGER.debug("JDBCAbstractSplitFilterRule.onMatch has been called");
+
+ final HiveFilter filter = call.rel(0);
+
+ RexCall callExpression = (RexCall) filter.getCondition();
+
+ FilterSupportedFunctionsVisitor visitor = new FilterSupportedFunctionsVisitor(dialect);
+ callExpression.accept(visitor);
+
+ ArrayList<RexCall> validJdbcNode = visitor.getValidJdbcNode();
+ ArrayList<RexCall> invalidJdbcNode = visitor.getInvalidJdbcNode();
+
+ assert validJdbcNode.size() != 0 && invalidJdbcNode.size() != 0;
+
+ final RexBuilder rexBuilder = filter.getCluster().getRexBuilder();
+
+ RexNode validCondition;
+ if (validJdbcNode.size() == 1) {
+ validCondition = validJdbcNode.get(0);
+ } else {
+ validCondition = rexBuilder.makeCall(SqlStdOperatorTable.AND, validJdbcNode);
+ }
+
+ HiveFilter newJdbcValidFilter = new HiveFilter(filter.getCluster(), filter.getTraitSet(), filter.getInput(),
+ validCondition);
+
+ RexNode invalidCondition;
+ if (invalidJdbcNode.size() == 1) {
+ invalidCondition = invalidJdbcNode.get(0);
+ } else {
+ invalidCondition = rexBuilder.makeCall(SqlStdOperatorTable.AND, invalidJdbcNode);
+ }
+
+ HiveFilter newJdbcInvalidFilter = new HiveFilter(filter.getCluster(), filter.getTraitSet(),
+ newJdbcValidFilter, invalidCondition);
+
+ call.transformTo(newJdbcInvalidFilter);
+ }
+
+ /**
+ * JDBCSplitFilterAboveJoinRule splits a filter above a HiveJoin operator, so that part of it can be pushed into the HiveJoin.
+ */
+ public static class JDBCSplitFilterAboveJoinRule extends JDBCAbstractSplitFilterRule {
+ public JDBCSplitFilterAboveJoinRule() {
+ super(operand(HiveFilter.class,
+ operand(HiveJoin.class,
+ operand(HiveJdbcConverter.class, any()))));
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ LOGGER.debug("JDBCSplitFilterAboveJoinRule.matches has been called");
+
+ final HiveJoin join = call.rel(1);
+ final HiveJdbcConverter conv = call.rel(2);
+
+ RexNode joinCond = join.getCondition();
+
+ return super.matches(call, conv.getJdbcDialect()) && JDBCRexCallValidator.isValidJdbcOperation(joinCond, conv.getJdbcDialect());
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final HiveJdbcConverter conv = call.rel(2);
+ super.onMatch(call, conv.getJdbcDialect());
+ }
+ }
+
+ /**
+ * JDBCSplitFilterRule splits a HiveFilter so that part of the HiveFilter can be pushed down to the JDBC source.
+ */
+ public static class JDBCSplitFilterRule extends JDBCAbstractSplitFilterRule {
+ public JDBCSplitFilterRule() {
+ super(operand(HiveFilter.class,
+ operand(HiveJdbcConverter.class, any())));
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ final HiveJdbcConverter conv = call.rel(1);
+ return super.matches(call, conv.getJdbcDialect());
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final HiveJdbcConverter conv = call.rel(1);
+ super.onMatch(call, conv.getJdbcDialect());
+ }
+ }
+
+};
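For illustration, the split that this rule performs on a conjunction can be sketched with plain strings, with a hypothetical isPushable check standing in for JDBCRexCallValidator (not part of this patch):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class SplitFilterSketch {
      // Hypothetical stand-in for JDBCRexCallValidator.isValidJdbcOperation.
      static boolean isPushable(String predicate) {
        return !predicate.contains("my_hive_udf");
      }

      public static void main(String[] args) {
        // A condition such as: my_hive_udf(b) = 1 AND a > 3 AND c < 5
        List<String> conjuncts = Arrays.asList("my_hive_udf(b) = 1", "a > 3", "c < 5");
        List<String> pushed = new ArrayList<String>();
        List<String> kept = new ArrayList<String>();
        for (String p : conjuncts) {
          (isPushable(p) ? pushed : kept).add(p);
        }
        // The lower filter goes below the converter and ends up in the external query;
        // the upper filter (the UDF) stays in Hive, exactly as in onMatch above.
        System.out.println("pushed: " + String.join(" AND ", pushed)); // a > 3 AND c < 5
        System.out.println("kept:   " + String.join(" AND ", kept));   // my_hive_udf(b) = 1
      }
    }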
http://git-wip-us.apache.org/repos/asf/hive/blob/78348d88/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCAggregationPushDownRule.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCAggregationPushDownRule.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCAggregationPushDownRule.java
new file mode 100644
index 0000000..8f96288
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCAggregationPushDownRule.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules.jdbc;
+
+import java.util.Arrays;
+
+import org.apache.calcite.adapter.jdbc.JdbcRules.JdbcAggregate;
+import org.apache.calcite.adapter.jdbc.JdbcRules.JdbcAggregateRule;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.hadoop.hive.ql.optimizer.calcite.functions.HiveSqlCountAggFunction;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.HiveJdbcConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JDBCAggregationPushDownRule converts a {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate}
+ * into a {@link org.apache.calcite.adapter.jdbc.JdbcRules.JdbcAggregate}
+ * and pushes it down below the {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.HiveJdbcConverter}
+ * operator so it will be sent to the external table.
+ */
+
+public class JDBCAggregationPushDownRule extends RelOptRule {
+ private static final Logger LOG = LoggerFactory.getLogger(JDBCAggregationPushDownRule.class);
+
+ public static final JDBCAggregationPushDownRule INSTANCE = new JDBCAggregationPushDownRule();
+
+ public JDBCAggregationPushDownRule() {
+ super(operand(HiveAggregate.class,
+ operand(HiveJdbcConverter.class, any())));
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ final HiveAggregate agg = call.rel(0);
+ final HiveJdbcConverter converter = call.rel(1);
+
+ for (AggregateCall aggCall : agg.getAggCallList()) {
+ SqlAggFunction f = aggCall.getAggregation();
+ if (f instanceof HiveSqlCountAggFunction) {
+ // count distinct with more than one argument is not supported
+ HiveSqlCountAggFunction countAgg = (HiveSqlCountAggFunction) f;
+ if (countAgg.isDistinct() && 1 < aggCall.getArgList().size()) {
+ return false;
+ }
+ }
+ SqlKind kind = f.getKind();
+ if (!converter.getJdbcDialect().supportsAggregateFunction(kind)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ LOG.debug("JDBCAggregationPushDownRule.onMatch has been called");
+
+ final HiveAggregate agg = call.rel(0);
+ final HiveJdbcConverter converter = call.rel(1);
+
+ Aggregate newHiveAggregate = agg.copy(agg.getTraitSet(), converter.getInput(),
+ agg.getIndicatorCount() != 0, agg.getGroupSet(), agg.getGroupSets(), agg.getAggCallList());
+ JdbcAggregate newJdbcAggregate =
+ (JdbcAggregate) new JdbcAggregateRule(converter.getJdbcConvention()).convert(newHiveAggregate);
+ if (newJdbcAggregate != null) {
+ RelNode converterRes = converter.copy(converter.getTraitSet(), Arrays.asList(newJdbcAggregate));
+
+ call.transformTo(converterRes);
+ }
+ }
+
+};
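For illustration, the dialect gate used by matches() can be probed directly; a minimal sketch (not part of this patch), assuming a Calcite version that provides SqlDialect.supportsAggregateFunction, which this rule itself relies on:

    import org.apache.calcite.sql.SqlDialect;
    import org.apache.calcite.sql.SqlKind;

    public class AggregatePushDownGuard {
      public static void main(String[] args) {
        SqlDialect dialect = SqlDialect.DatabaseProduct.MYSQL.getDialect();
        // The rule refuses to push the aggregate unless every aggregate call's
        // SqlKind is supported by the target dialect (and COUNT(DISTINCT ...)
        // has at most one argument).
        for (SqlKind kind : new SqlKind[] {SqlKind.COUNT, SqlKind.SUM, SqlKind.MIN}) {
          System.out.println(kind + " pushable: " + dialect.supportsAggregateFunction(kind));
        }
      }
    }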
http://git-wip-us.apache.org/repos/asf/hive/blob/78348d88/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCExtractJoinFilterRule.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCExtractJoinFilterRule.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCExtractJoinFilterRule.java
new file mode 100644
index 0000000..32c486a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCExtractJoinFilterRule.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules.jdbc;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.rules.AbstractJoinExtractFilterRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.HiveJdbcConverter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+
+/**
+ * JDBCExtractJoinFilterRule extracts a
+ * {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter}
+ * out of a {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin} operator,
+ * if that filter can be split into two HiveFilter operators such that one of them can be pushed down below the
+ * {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.HiveJdbcConverter}.
+ */
+
+
+public final class JDBCExtractJoinFilterRule extends AbstractJoinExtractFilterRule {
+ //~ Static fields/initializers ---------------------------------------------
+ public static final JDBCExtractJoinFilterRule INSTANCE = new JDBCExtractJoinFilterRule();
+
+ //~ Constructors -----------------------------------------------------------
+
+ /**
+ * Creates a JDBCExtractJoinFilterRule.
+ */
+ public JDBCExtractJoinFilterRule() {
+ super(operand(HiveJoin.class,
+ operand(HiveJdbcConverter.class, any()),
+ operand(HiveJdbcConverter.class, any())),
+ HiveRelFactories.HIVE_BUILDER, null);
+ }
+
+ //~ Methods ----------------------------------------------------------------
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ final Join join = call.rel(0);
+ final HiveJdbcConverter conv1 = call.rel(1);
+ final HiveJdbcConverter conv2 = call.rel(2);
+ if (!conv1.getJdbcDialect().equals(conv2.getJdbcDialect())) {
+ return false;
+ }
+ return JDBCAbstractSplitFilterRule.canSplitFilter(join.getCondition(), conv1.getJdbcDialect());
+ }
+
+}
+
+// End JDBCExtractJoinFilterRule.java

http://git-wip-us.apache.org/repos/asf/hive/blob/78348d88/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCFilterJoinRule.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCFilterJoinRule.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCFilterJoinRule.java
new file mode 100644
index 0000000..6fcb3a5
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCFilterJoinRule.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules.jdbc;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.HiveJdbcConverter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveFilterJoinRule;
+
+/**
+ * Rule that tries to push filter expressions into a join condition and into
+ * the inputs of the join.
+ */
+
+public class JDBCFilterJoinRule extends HiveFilterJoinRule {
+
+ public static final JDBCFilterJoinRule INSTANCE = new JDBCFilterJoinRule();
+
+ public JDBCFilterJoinRule() {
+ super(RelOptRule.operand(HiveFilter.class,
+ RelOptRule.operand(HiveJoin.class,
+ RelOptRule.operand(HiveJdbcConverter.class, RelOptRule.any()),
+ RelOptRule.operand(HiveJdbcConverter.class, RelOptRule.any()))),
+ "JDBCFilterJoinRule", true, HiveRelFactories.HIVE_BUILDER);
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ Filter filter = call.rel(0);
+ Join join = call.rel(1);
+ HiveJdbcConverter conv1 = call.rel(2);
+ HiveJdbcConverter conv2 = call.rel(3);
+
+ if (!conv1.getJdbcDialect().equals(conv2.getJdbcDialect())) {
+ return false;
+ }
+
+ boolean visitorRes = JDBCRexCallValidator.isValidJdbcOperation(filter.getCondition(), conv1.getJdbcDialect());
+ if (visitorRes) {
+ return JDBCRexCallValidator.isValidJdbcOperation(join.getCondition(), conv1.getJdbcDialect());
+ }
+ return false;
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ Filter filter = call.rel(0);
+ Join join = call.rel(1);
+ super.perform(call, filter, join);
+ }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/78348d88/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCFilterPushDownRule.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCFilterPushDownRule.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCFilterPushDownRule.java
new file mode 100644
index 0000000..acf136f
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCFilterPushDownRule.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules.jdbc;
+
+import java.util.Arrays;
+
+import org.apache.calcite.adapter.jdbc.JdbcRules.JdbcFilter;
+import org.apache.calcite.adapter.jdbc.JdbcRules.JdbcFilterRule;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rex.RexNode;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.HiveJdbcConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JDBCFilterPushDownRule converts a
+ * {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter}
+ * into a {@link org.apache.calcite.adapter.jdbc.JdbcRules.JdbcFilter}
+ * and pushes it down below the {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.HiveJdbcConverter}
+ * operator so it will be sent to the external table.
+ */
+
+public class JDBCFilterPushDownRule extends RelOptRule {
+ private static final Logger LOG = LoggerFactory.getLogger(JDBCFilterPushDownRule.class);
+
+ public static final JDBCFilterPushDownRule INSTANCE = new JDBCFilterPushDownRule();
+
+ public JDBCFilterPushDownRule() {
+ super(operand(HiveFilter.class,
+ operand(HiveJdbcConverter.class, any())));
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ final HiveFilter filter = call.rel(0);
+ final HiveJdbcConverter converter = call.rel(1);
+
+ RexNode cond = filter.getCondition();
+
+ return JDBCRexCallValidator.isValidJdbcOperation(cond, converter.getJdbcDialect());
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ LOG.debug("JDBCFilterPushDownRule.onMatch has been called");
+
+ final HiveFilter filter = call.rel(0);
+ final HiveJdbcConverter converter = call.rel(1);
+
+ Filter newHiveFilter = filter.copy(filter.getTraitSet(), converter.getInput(), filter.getCondition());
+ JdbcFilter newJdbcFilter = (JdbcFilter) new JdbcFilterRule(converter.getJdbcConvention()).convert(newHiveFilter);
+ if (newJdbcFilter != null) {
+ RelNode converterRes = converter.copy(converter.getTraitSet(), Arrays.asList(newJdbcFilter));
+
+ call.transformTo(converterRes);
+ }
+ }
+
+};
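The push-down rules in this package all share one shape: re-parent the Hive operator onto the converter's input, let the corresponding Calcite Jdbc*Rule translate it, and re-wrap the converter on top. For illustration, that skeleton extracted from the filter rule above (it reuses only calls already present in this patch):

    import java.util.Arrays;

    import org.apache.calcite.adapter.jdbc.JdbcRules.JdbcFilter;
    import org.apache.calcite.adapter.jdbc.JdbcRules.JdbcFilterRule;
    import org.apache.calcite.plan.RelOptRuleCall;
    import org.apache.calcite.rel.core.Filter;
    import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter;
    import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.HiveJdbcConverter;

    final class PushDownShape {
      private PushDownShape() {
      }

      static void pushFilter(RelOptRuleCall call, HiveFilter filter, HiveJdbcConverter converter) {
        // 1. Re-parent the Hive operator onto the converter's input (the JDBC subtree).
        Filter reparented = filter.copy(filter.getTraitSet(), converter.getInput(), filter.getCondition());
        // 2. Let Calcite's JdbcFilterRule translate it into a JdbcFilter.
        JdbcFilter jdbcFilter =
            (JdbcFilter) new JdbcFilterRule(converter.getJdbcConvention()).convert(reparented);
        // 3. Re-wrap the converter above the translated operator.
        if (jdbcFilter != null) {
          call.transformTo(converter.copy(converter.getTraitSet(), Arrays.asList(jdbcFilter)));
        }
      }
    }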
http://git-wip-us.apache.org/repos/asf/hive/blob/78348d88/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCJoinPushDownRule.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCJoinPushDownRule.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCJoinPushDownRule.java
new file mode 100644
index 0000000..459be6e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCJoinPushDownRule.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules.jdbc;
+
+import java.util.Arrays;
+
+import org.apache.calcite.adapter.jdbc.JdbcRules.JdbcJoin;
+import org.apache.calcite.adapter.jdbc.JdbcRules.JdbcJoinRule;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.HiveJdbcConverter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JDBCJoinPushDownRule converts a {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin}
+ * into a {@link org.apache.calcite.adapter.jdbc.JdbcRules.JdbcJoin}
+ * and pushes it down below the {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.HiveJdbcConverter}
+ * operator so that the join is executed by the external table.
+ */
+public class JDBCJoinPushDownRule extends RelOptRule {
+  private static final Logger LOG = LoggerFactory.getLogger(JDBCJoinPushDownRule.class);
+
+  public static final JDBCJoinPushDownRule INSTANCE = new JDBCJoinPushDownRule();
+
+  public JDBCJoinPushDownRule() {
+    super(operand(HiveJoin.class,
+        operand(HiveJdbcConverter.class, any()),
+        operand(HiveJdbcConverter.class, any())));
+  }
+
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    final HiveJoin join = call.rel(0);
+    final RexNode cond = join.getCondition();
+    final HiveJdbcConverter converter1 = call.rel(1);
+    final HiveJdbcConverter converter2 = call.rel(2);
+
+    // TODO: the right check would compare the connection strings of the two
+    // external tables; comparing the convention names is an approximation.
+    if (!converter1.getJdbcConvention().getName().equals(converter2.getJdbcConvention().getName())) {
+      return false;
+    }
+
+    if (cond.isAlwaysTrue()) {
+      // Do not push down cross joins.
+      return false;
+    }
+
+    return JDBCRexCallValidator.isValidJdbcOperation(cond, converter1.getJdbcDialect());
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    LOG.debug("JDBCJoinPushDownRule has been called");
+
+    final HiveJoin join = call.rel(0);
+    final HiveJdbcConverter converter1 = call.rel(1);
+    final HiveJdbcConverter converter2 = call.rel(2);
+
+    RelNode input1 = converter1.getInput();
+    RelNode input2 = converter2.getInput();
+
+    HiveJoin newHiveJoin = join.copy(join.getTraitSet(), join.getCondition(), input1, input2, join.getJoinType(),
+        join.isSemiJoinDone());
+    JdbcJoin newJdbcJoin = (JdbcJoin) new JdbcJoinRule(converter1.getJdbcConvention()).convert(newHiveJoin,
+        false);
+    if (newJdbcJoin != null) {
+      RelNode converterRes = converter1.copy(converter1.getTraitSet(), Arrays.asList(newJdbcJoin));
+      if (converterRes != null) {
+        call.transformTo(converterRes);
+      }
+    }
+  }
+}
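The cond.isAlwaysTrue() guard above is what keeps Cartesian products on the Hive side: a join whose condition is a literal TRUE is exactly what a cross join looks like in Calcite. A tiny self-contained illustration (class name and setup are arbitrary, not part of the patch):

    import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
    import org.apache.calcite.rex.RexBuilder;
    import org.apache.calcite.rex.RexNode;

    public class CrossJoinGuardDemo {
      public static void main(String[] args) {
        RexBuilder rexBuilder = new RexBuilder(new JavaTypeFactoryImpl());
        RexNode trueCond = rexBuilder.makeLiteral(true);
        // A join condition that is literally TRUE denotes a cross join,
        // so JDBCJoinPushDownRule.matches would reject it.
        System.out.println(trueCond.isAlwaysTrue()); // prints: true
      }
    }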
http://git-wip-us.apache.org/repos/asf/hive/blob/78348d88/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCProjectPushDownRule.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCProjectPushDownRule.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCProjectPushDownRule.java
new file mode 100644
index 0000000..5c03f87
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCProjectPushDownRule.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules.jdbc;
+
+import java.util.Arrays;
+
+import org.apache.calcite.adapter.jdbc.JdbcRules.JdbcProject;
+import org.apache.calcite.adapter.jdbc.JdbcRules.JdbcProjectRule;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rex.RexNode;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.HiveJdbcConverter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JDBCProjectPushDownRule converts a {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject}
+ * into a {@link org.apache.calcite.adapter.jdbc.JdbcRules.JdbcProject}
+ * and pushes it down below the {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.HiveJdbcConverter}
+ * operator so that the projection is evaluated by the external table.
+ */
+public class JDBCProjectPushDownRule extends RelOptRule {
+  private static final Logger LOG = LoggerFactory.getLogger(JDBCProjectPushDownRule.class);
+
+  public static final JDBCProjectPushDownRule INSTANCE = new JDBCProjectPushDownRule();
+
+  public JDBCProjectPushDownRule() {
+    super(operand(HiveProject.class,
+        operand(HiveJdbcConverter.class, any())));
+  }
+
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    final HiveProject project = call.rel(0);
+    final HiveJdbcConverter conv = call.rel(1);
+    // One unsupported expression is enough to abort the push-down.
+    for (RexNode currProject : project.getProjects()) {
+      if (!JDBCRexCallValidator.isValidJdbcOperation(currProject, conv.getJdbcDialect())) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    LOG.debug("JDBCProjectPushDownRule has been called");
+
+    final HiveProject project = call.rel(0);
+    final HiveJdbcConverter converter = call.rel(1);
+
+    Project newHiveProject = project.copy(project.getTraitSet(), converter.getInput(),
+        project.getProjects(), project.getRowType());
+    JdbcProject newJdbcProject =
+        (JdbcProject) new JdbcProjectRule(converter.getJdbcConvention()).convert(newHiveProject);
+    if (newJdbcProject != null) {
+      RelNode converterRes = converter.copy(converter.getTraitSet(), Arrays.asList(newJdbcProject));
+      call.transformTo(converterRes);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/78348d88/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCRexCallValidator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCRexCallValidator.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCRexCallValidator.java
new file mode 100644
index 0000000..7c72bd3
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCRexCallValidator.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules.jdbc;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexOver;
+import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A utility class that checks whether a row expression uses only operators that the
+ * target JDBC dialect supports, i.e. it identifies Hive/JDBC function gaps.
+ */
+public final class JDBCRexCallValidator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(JDBCRexCallValidator.class);
+
+  private static final class JdbcRexCallValidatorVisitor extends RexVisitorImpl<Void> {
+
+    private final SqlDialect dialect;
+    private boolean res = true;
+
+    private JdbcRexCallValidatorVisitor(SqlDialect dialect) {
+      super(true);
+      this.dialect = dialect;
+    }
+
+    private boolean validRexCall(RexCall call) {
+      if (call instanceof RexOver) {
+        LOG.debug("RexOver operator push down is not supported for now with the following operator: {}", call);
+        return false;
+      }
+      final SqlOperator operator = call.getOperator();
+      List<RexNode> operands = call.getOperands();
+      RelDataType resType = call.getType();
+      List<RelDataType> paramsListType = new ArrayList<>();
+      for (RexNode currNode : operands) {
+        paramsListType.add(currNode.getType());
+      }
+      return dialect.supportsFunction(operator, resType, paramsListType);
+    }
+
+    @Override
+    public Void visitCall(RexCall call) {
+      if (res) {
+        res = validRexCall(call);
+        if (res) {
+          return super.visitCall(call);
+        }
+      }
+      return null;
+    }
+
+    private boolean go(RexNode cond) {
+      cond.accept(this);
+      return res;
+    }
+  }
+
+  private JDBCRexCallValidator() {
+  }
+
+  public static boolean isValidJdbcOperation(RexNode cond, SqlDialect dialect) {
+    return new JdbcRexCallValidatorVisitor(dialect).go(cond);
+  }
+}
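A quick way to see the validator's contract in isolation (the dialect choice and literals below are arbitrary assumptions; whether PLUS is reported as supported depends entirely on the SqlDialect.supportsFunction implementation in the Calcite version Hive bundles):

    import java.math.BigDecimal;

    import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
    import org.apache.calcite.rex.RexBuilder;
    import org.apache.calcite.rex.RexNode;
    import org.apache.calcite.sql.SqlDialect;
    import org.apache.calcite.sql.fun.SqlStdOperatorTable;
    import org.apache.hadoop.hive.ql.optimizer.calcite.rules.jdbc.JDBCRexCallValidator;

    public class ValidatorDemo {
      public static void main(String[] args) {
        RexBuilder rexBuilder = new RexBuilder(new JavaTypeFactoryImpl());
        // 1 + 10 as a Calcite row expression
        RexNode plus = rexBuilder.makeCall(SqlStdOperatorTable.PLUS,
            rexBuilder.makeExactLiteral(BigDecimal.ONE),
            rexBuilder.makeExactLiteral(BigDecimal.TEN));
        SqlDialect dialect = SqlDialect.DatabaseProduct.POSTGRESQL.getDialect();
        // true only if every operator in the expression passes dialect.supportsFunction
        System.out.println(JDBCRexCallValidator.isValidJdbcOperation(plus, dialect));
      }
    }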
http://git-wip-us.apache.org/repos/asf/hive/blob/78348d88/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCSortPushDownRule.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCSortPushDownRule.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCSortPushDownRule.java
new file mode 100644
index 0000000..33dc280
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCSortPushDownRule.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules.jdbc;
+
+import java.util.Arrays;
+
+import org.apache.calcite.adapter.jdbc.JdbcRules.JdbcSort;
+import org.apache.calcite.adapter.jdbc.JdbcRules.JdbcSortRule;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rex.RexNode;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSortLimit;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.HiveJdbcConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JDBCSortPushDownRule converts a {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSortLimit}
+ * into a {@link org.apache.calcite.adapter.jdbc.JdbcRules.JdbcSort}
+ * and pushes it down below the {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.HiveJdbcConverter}
+ * operator so that the sort (including any offset/fetch) is evaluated by the external table.
+ */
+public class JDBCSortPushDownRule extends RelOptRule {
+  private static final Logger LOG = LoggerFactory.getLogger(JDBCSortPushDownRule.class);
+
+  public static final JDBCSortPushDownRule INSTANCE = new JDBCSortPushDownRule();
+
+  public JDBCSortPushDownRule() {
+    super(operand(HiveSortLimit.class,
+        operand(HiveJdbcConverter.class, operand(RelNode.class, any()))));
+  }
+
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    final Sort sort = call.rel(0);
+    final HiveJdbcConverter conv = call.rel(1);
+
+    for (RexNode currCall : sort.getChildExps()) {
+      if (!JDBCRexCallValidator.isValidJdbcOperation(currCall, conv.getJdbcDialect())) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    LOG.debug("JDBCSortPushDownRule has been called");
+
+    final HiveSortLimit sort = call.rel(0);
+    final HiveJdbcConverter converter = call.rel(1);
+    final RelNode input = call.rel(2);
+
+    Sort newHiveSort = sort.copy(sort.getTraitSet(), input, sort.getCollation(), sort.getOffsetExpr(),
+        sort.getFetchExpr());
+    JdbcSort newJdbcSort =
+        (JdbcSort) new JdbcSortRule(converter.getJdbcConvention()).convert(newHiveSort, false);
+    if (newJdbcSort != null) {
+      RelNode converterRes = converter.copy(converter.getTraitSet(), Arrays.asList(newJdbcSort));
+      call.transformTo(converterRes);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/78348d88/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCUnionPushDownRule.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCUnionPushDownRule.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCUnionPushDownRule.java
new file mode 100644
index 0000000..d4f3b0e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/JDBCUnionPushDownRule.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules.jdbc;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.calcite.adapter.jdbc.JdbcRules.JdbcUnion;
+import org.apache.calcite.adapter.jdbc.JdbcRules.JdbcUnionRule;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Union;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.HiveJdbcConverter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveUnion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JDBCUnionPushDownRule converts a {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveUnion}
+ * into a {@link org.apache.calcite.adapter.jdbc.JdbcRules.JdbcUnion}
+ * and pushes it down below the {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.HiveJdbcConverter}
+ * operator so that the union is executed by the external table.
+ */
+public class JDBCUnionPushDownRule extends RelOptRule {
+  private static final Logger LOG = LoggerFactory.getLogger(JDBCUnionPushDownRule.class);
+
+  public static final JDBCUnionPushDownRule INSTANCE = new JDBCUnionPushDownRule();
+
+  public JDBCUnionPushDownRule() {
+    super(operand(HiveUnion.class,
+        operand(HiveJdbcConverter.class, any()),
+        operand(HiveJdbcConverter.class, any())));
+  }
+
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    final HiveUnion union = call.rel(0);
+    final HiveJdbcConverter converter1 = call.rel(1);
+    final HiveJdbcConverter converter2 = call.rel(2);
+
+    // TODO: the right check would compare the connection strings of the two
+    // external tables; comparing the convention names is an approximation.
+    if (!converter1.getJdbcConvention().getName().equals(converter2.getJdbcConvention().getName())) {
+      return false;
+    }
+
+    // The operand pattern matches exactly two converter inputs.
+    return union.getInputs().size() == 2;
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    LOG.debug("JDBCUnionPushDownRule has been called");
+
+    final HiveUnion union = call.rel(0);
+    final HiveJdbcConverter converter1 = call.rel(1);
+    final HiveJdbcConverter converter2 = call.rel(2);
+
+    final List<RelNode> unionInput = Arrays.asList(converter1.getInput(), converter2.getInput());
+    Union newHiveUnion = (Union) union.copy(union.getTraitSet(), unionInput, union.all);
+    JdbcUnion newJdbcUnion = (JdbcUnion) new JdbcUnionRule(converter1.getJdbcConvention()).convert(newHiveUnion);
+    if (newJdbcUnion != null) {
+      RelNode converterRes = converter1.copy(converter1.getTraitSet(), Arrays.asList(newJdbcUnion));
+      call.transformTo(converterRes);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/78348d88/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/package-info.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/package-info.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/package-info.java
new file mode 100644
index 0000000..08a17aa
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/jdbc/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Rules for JDBC external tables that push operators down, as far as possible, to the
+ * external JDBC table.
+ */
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules.jdbc;
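Taken together, the effect of these rules is visible in EXPLAIN output: once a subtree is wrapped in a HiveJdbcConverter, the rewritten table scan carries the SQL destined for the external RDBMS (the new jdbc_handler.q test in this commit exercises exactly that). As a hypothetical standalone check, where the HiveServer2 URL, credentials, and table name are all assumptions:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;

    public class ExplainPushDownDemo {
      public static void main(String[] args) throws SQLException {
        // Assumed local HiveServer2 with the Hive JDBC driver on the classpath.
        // If the filter was pushed down, the WHERE clause shows up in the query
        // shipped to the external RDBMS instead of in a Hive FilterOperator.
        try (Connection conn =
                 DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "hive", "");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                 "EXPLAIN SELECT * FROM jdbc_backed_table WHERE id > 10")) {
          while (rs.next()) {
            System.out.println(rs.getString(1));
          }
        }
      }
    }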
http://git-wip-us.apache.org/repos/asf/hive/blob/78348d88/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java
index 79fcfcf..0408d7c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java
@@ -33,13 +33,19 @@ import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.JdbcHiveTableScan;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.HiveJdbcConverter;
 import org.apache.hadoop.hive.ql.parse.ASTNode;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.HiveParser;
 import org.apache.hadoop.hive.ql.parse.ParseDriver;
 import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class ASTBuilder {
+  public static final Logger LOGGER = LoggerFactory.getLogger(ASTBuilder.class);
 
   public static ASTBuilder construct(int tokenType, String text) {
     ASTBuilder b = new ASTBuilder();
@@ -59,14 +65,20 @@ public class ASTBuilder {
         "TOK_TMP_FILE")).node();
   }
 
-  public static ASTNode table(RelNode scan) {
-    HiveTableScan hts;
-    if (scan instanceof DruidQuery) {
-      hts = (HiveTableScan) ((DruidQuery)scan).getTableScan();
+  public static ASTNode table(final RelNode scan) {
+    HiveTableScan hts = null;
+    if (scan instanceof HiveJdbcConverter) {
+      HiveJdbcConverter jdbcConverter = (HiveJdbcConverter) scan;
+      JdbcHiveTableScan jdbcHiveTableScan = jdbcConverter.getTableScan();
+
+      hts = jdbcHiveTableScan.getHiveTableScan();
+    } else if (scan instanceof DruidQuery) {
+      hts = (HiveTableScan) ((DruidQuery) scan).getTableScan();
     } else {
       hts = (HiveTableScan) scan;
     }
+    assert hts != null;
     RelOptHiveTable hTbl = (RelOptHiveTable) hts.getTable();
     ASTBuilder b = ASTBuilder.construct(HiveParser.TOK_TABREF, "TOK_TABREF").add(
         ASTBuilder.construct(HiveParser.TOK_TABNAME, "TOK_TABNAME")
@@ -100,13 +112,26 @@ public class ASTBuilder {
       propList.add(ASTBuilder.construct(HiveParser.TOK_TABLEPROPERTY, "TOK_TABLEPROPERTY")
           .add(HiveParser.StringLiteral, "\"" + Constants.DRUID_QUERY_TYPE + "\"")
           .add(HiveParser.StringLiteral, "\"" + dq.getQueryType().getQueryName() + "\""));
+    } else if (scan instanceof HiveJdbcConverter) {
+      HiveJdbcConverter jdbcConverter = (HiveJdbcConverter) scan;
+      final String query = jdbcConverter.generateSql();
+      LOGGER.info("The HiveJdbcConverter generated sql message is: " + System.lineSeparator() + query);
+      propList.add(ASTBuilder.construct(HiveParser.TOK_TABLEPROPERTY, "TOK_TABLEPROPERTY")
+          .add(HiveParser.StringLiteral, "\"" + Constants.JDBC_QUERY + "\"")
+          .add(HiveParser.StringLiteral, "\"" + SemanticAnalyzer.escapeSQLString(query) + "\""));
+
+      propList.add(ASTBuilder.construct(HiveParser.TOK_TABLEPROPERTY, "TOK_TABLEPROPERTY")
+          .add(HiveParser.StringLiteral, "\"" + Constants.HIVE_JDBC_QUERY + "\"")
+          .add(HiveParser.StringLiteral, "\"" + SemanticAnalyzer.escapeSQLString(query) + "\""));
     }
+
     if (hts.isInsideView()) {
       // We need to carry the insideView information from calcite into the ast.
       propList.add(ASTBuilder.construct(HiveParser.TOK_TABLEPROPERTY, "TOK_TABLEPROPERTY")
          .add(HiveParser.StringLiteral, "\"insideView\"")
          .add(HiveParser.StringLiteral, "\"TRUE\""));
     }
+
     b.add(ASTBuilder.construct(HiveParser.TOK_TABLEPROPERTIES, "TOK_TABLEPROPERTIES").add(propList));
 
     // NOTE: Calcite considers tbls to be equal if their names are the same. Hence
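ASTBuilder stores the generated SQL in the rewritten TOK_TABREF as table properties keyed by Constants.JDBC_QUERY and Constants.HIVE_JDBC_QUERY, which is how the storage handler later learns which query to ship. A hypothetical reader, assuming the entries eventually surface as plain java.util.Properties on the table:

    import java.util.Properties;

    import org.apache.hadoop.hive.conf.Constants;

    public final class PushedQueryLookup {
      private PushedQueryLookup() {
      }

      // Returns the pushed-down external query, or null if the optimizer left
      // the scan untouched. The Properties source is an assumption for this
      // sketch; in Hive the entries travel as table properties on the TOK_TABREF.
      public static String pushedQuery(Properties tableProperties) {
        return tableProperties.getProperty(Constants.JDBC_QUERY);
      }
    }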
http://git-wip-us.apache.org/repos/asf/hive/blob/78348d88/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java
index 47c00aa..141ebe5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTConverter.java
@@ -64,9 +64,11 @@ import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveGroupingID;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.HiveJdbcConverter;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSortLimit;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableFunctionScan;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.jdbc.JdbcHiveTableScan;
 import org.apache.hadoop.hive.ql.optimizer.calcite.translator.SqlFunctionConverter.HiveToken;
 import org.apache.hadoop.hive.ql.parse.ASTNode;
 import org.apache.hadoop.hive.ql.parse.HiveParser;
@@ -344,6 +346,10 @@ public class ASTConverter {
       TableScan f = (TableScan) r;
       s = new Schema(f);
       ast = ASTBuilder.table(f);
+    } else if (r instanceof HiveJdbcConverter) {
+      HiveJdbcConverter f = (HiveJdbcConverter) r;
+      s = new Schema(f);
+      ast = ASTBuilder.table(f);
     } else if (r instanceof DruidQuery) {
       DruidQuery f = (DruidQuery) r;
       s = new Schema(f);
@@ -425,7 +431,8 @@ public class ASTConverter {
     public void visit(RelNode node, int ordinal, RelNode parent) {
       if (node instanceof TableScan ||
-          node instanceof DruidQuery) {
+          node instanceof DruidQuery ||
+          node instanceof HiveJdbcConverter) {
         ASTConverter.this.from = node;
       } else if (node instanceof Filter) {
         handle((Filter) node);
@@ -765,6 +772,15 @@ public class ASTConverter {
       }
     }
 
+    Schema(HiveJdbcConverter scan) {
+      HiveJdbcConverter jdbcHiveConverter = scan;
+      final JdbcHiveTableScan jdbcTableScan = jdbcHiveConverter.getTableScan();
+      String tabName = jdbcTableScan.getHiveTableScan().getTableAlias();
+      for (RelDataTypeField field : jdbcHiveConverter.getRowType().getFieldList()) {
+        add(new ColumnInfo(tabName, field.getName()));
+      }
+    }
+
     Schema(Project select, String alias) {
       for (RelDataTypeField field : select.getRowType().getFieldList()) {
         add(new ColumnInfo(alias, field.getName()));