PHOENIX-3503 PhoenixStorageHandler doesn't work properly when execution engine of Hive is Tez.
Signed-off-by: Sergey Soldatov <s...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2ebd5160 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2ebd5160 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2ebd5160 Branch: refs/heads/4.x-HBase-1.1 Commit: 2ebd5160c6213617c4e18cb12a5c93fbc8ef95f6 Parents: 816840b Author: Jeongdae Kim <kjd9...@gmail.com> Authored: Mon Nov 21 10:24:22 2016 +0900 Committer: Sergey Soldatov <s...@apache.org> Committed: Sun Feb 19 22:51:18 2017 -0800 ---------------------------------------------------------------------- .../hive/mapreduce/PhoenixInputFormat.java | 26 ++++++----- .../phoenix/hive/query/PhoenixQueryBuilder.java | 33 +++++++------- .../hive/util/PhoenixStorageHandlerUtil.java | 46 ++++++++++---------- 3 files changed, 56 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/2ebd5160/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java ---------------------------------------------------------------------- diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java index 3a94655..9ebc3d6 100644 --- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java +++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java @@ -19,6 +19,13 @@ package org.apache.phoenix.hive.mapreduce; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import java.io.IOException; +import java.sql.Connection; +import java.sql.Statement; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -37,9 +44,14 @@ import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapred.*; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.db.DBWritable; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; @@ -57,14 +69,6 @@ import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.util.PhoenixRuntime; -import java.io.IOException; -import java.sql.Connection; -import java.sql.Statement; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Properties; - /** * Custom InputFormat to feed into Hive */ @@ -112,8 +116,8 @@ public class PhoenixInputFormat<T extends DBWritable> implements InputFormat<Wri query = PhoenixQueryBuilder.getInstance().buildQuery(jobConf, tableName, PhoenixStorageHandlerUtil.getReadColumnNames(jobConf), conditionList); } else if (PhoenixStorageHandlerConstants.TEZ.equals(executionEngine)) { - Map<String, String> columnTypeMap = PhoenixStorageHandlerUtil.createColumnTypeMap - (jobConf); + Map<String, TypeInfo> columnTypeMap = + PhoenixStorageHandlerUtil.createColumnTypeMap(jobConf); if (LOG.isDebugEnabled()) { LOG.debug("Column type map for TEZ : " + columnTypeMap); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/2ebd5160/phoenix-hive/src/main/java/org/apache/phoenix/hive/query/PhoenixQueryBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/query/PhoenixQueryBuilder.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/query/PhoenixQueryBuilder.java index a38814d..d1e74d9 100644 --- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/query/PhoenixQueryBuilder.java +++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/query/PhoenixQueryBuilder.java @@ -23,22 +23,26 @@ import com.google.common.base.Predicate; import com.google.common.base.Splitter; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import javax.annotation.Nullable; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.mapred.JobConf; import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants; import org.apache.phoenix.hive.ql.index.IndexSearchCondition; import org.apache.phoenix.hive.util.PhoenixStorageHandlerUtil; import org.apache.phoenix.hive.util.PhoenixUtil; -import javax.annotation.Nullable; -import java.io.IOException; -import java.util.*; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - /** * Query builder. Produces a query depending on the colummn list and conditions */ @@ -81,10 +85,9 @@ public class PhoenixQueryBuilder { private String makeQueryString(JobConf jobConf, String tableName, List<String> readColumnList, String whereClause, String queryTemplate, String hints, Map<String, - String> columnTypeMap) throws IOException { + TypeInfo> columnTypeMap) throws IOException { StringBuilder sql = new StringBuilder(); - List<String> conditionColumnList = buildWhereClause(jobConf, sql, whereClause, - columnTypeMap); + List<String> conditionColumnList = buildWhereClause(jobConf, sql, whereClause,columnTypeMap); if (conditionColumnList.size() > 0) { addConditionColumnToReadColumn(readColumnList, conditionColumnList); @@ -155,7 +158,7 @@ public class PhoenixQueryBuilder { } public String buildQuery(JobConf jobConf, String tableName, List<String> readColumnList, - String whereClause, Map<String, String> columnTypeMap) throws + String whereClause, Map<String, TypeInfo> columnTypeMap) throws IOException { String hints = getHint(jobConf, tableName); @@ -199,7 +202,7 @@ public class PhoenixQueryBuilder { } private List<String> buildWhereClause(JobConf jobConf, StringBuilder sql, String whereClause, - Map<String, String> columnTypeMap) throws IOException { + Map<String, TypeInfo> columnTypeMap) throws IOException { if (whereClause == null || whereClause.isEmpty()) { return Collections.emptyList(); } @@ -214,11 +217,11 @@ public class PhoenixQueryBuilder { if (whereClause.contains(columnName)) { conditionColumnList.add(columnName); - if (PhoenixStorageHandlerConstants.DATE_TYPE.equals(columnTypeMap.get(columnName) - )) { + if (PhoenixStorageHandlerConstants.DATE_TYPE.equals( + columnTypeMap.get(columnName).getTypeName())) { whereClause = applyDateFunctionUsingRegex(whereClause, columnName); - } else if (PhoenixStorageHandlerConstants.TIMESTAMP_TYPE.equals(columnTypeMap.get - (columnName))) { + } else if (PhoenixStorageHandlerConstants.TIMESTAMP_TYPE.equals( + columnTypeMap.get(columnName).getTypeName())) { whereClause = applyTimestampFunctionUsingRegex(whereClause, columnName); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/2ebd5160/phoenix-hive/src/main/java/org/apache/phoenix/hive/util/PhoenixStorageHandlerUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/util/PhoenixStorageHandlerUtil.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/util/PhoenixStorageHandlerUtil.java index 18799a5..1dc8545 100644 --- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/util/PhoenixStorageHandlerUtil.java +++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/util/PhoenixStorageHandlerUtil.java @@ -19,24 +19,6 @@ package org.apache.phoenix.hive.util; import com.google.common.base.Joiner; import com.google.common.collect.Maps; -import org.apache.commons.logging.Log; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.util.Strings; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; -import org.apache.hadoop.hive.ql.io.AcidOutputFormat.Options; -import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; -import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.hive.serde.serdeConstants; -import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.net.DNS; -import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants; -import org.apache.phoenix.hive.ql.index.IndexSearchCondition; -import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; - -import javax.naming.NamingException; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.ObjectInputStream; @@ -54,6 +36,24 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Properties; +import javax.naming.NamingException; +import org.apache.commons.logging.Log; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.util.Strings; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.ql.io.AcidOutputFormat.Options; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.net.DNS; +import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants; +import org.apache.phoenix.hive.ql.index.IndexSearchCondition; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; /** * Misc utils for PhoenixStorageHandler @@ -194,16 +194,16 @@ public class PhoenixStorageHandlerUtil { return new StringBuilder("[").append(sessionId).append("]-").append(tableName).toString(); } - public static Map<String, String> createColumnTypeMap(JobConf jobConf) { - Map<String, String> columnTypeMap = Maps.newHashMap(); + public static Map<String, TypeInfo> createColumnTypeMap(JobConf jobConf) { + Map<String, TypeInfo> columnTypeMap = Maps.newHashMap(); String[] columnNames = jobConf.get(serdeConstants.LIST_COLUMNS).split (PhoenixStorageHandlerConstants.COMMA); - String[] columnTypes = jobConf.get(serdeConstants.LIST_COLUMN_TYPES).split - (PhoenixStorageHandlerConstants.COMMA); + List<TypeInfo> typeInfos = + TypeInfoUtils.getTypeInfosFromTypeString(jobConf.get(serdeConstants.LIST_COLUMN_TYPES)); for (int i = 0, limit = columnNames.length; i < limit; i++) { - columnTypeMap.put(columnNames[i], columnTypes[i]); + columnTypeMap.put(columnNames[i], typeInfos.get(i)); } return columnTypeMap;