PHOENIX-2584 - Support Array datatype in phoenix-pig module
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8574d431
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8574d431
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8574d431

Branch: refs/heads/calcite
Commit: 8574d4311dcb5fc7a76465b32f24daf045ffb562
Parents: 4625096
Author: ravimagham <ravi.mag...@bazaarvoice.com>
Authored: Thu Jan 21 18:31:28 2016 -0800
Committer: ravimagham <ravi.mag...@bazaarvoice.com>
Committed: Thu Jan 21 18:31:28 2016 -0800

----------------------------------------------------------------------
 .../mapreduce/PhoenixRecordWritable.java        |  28 ++-
 .../phoenix/pig/PhoenixHBaseLoaderIT.java       | 167 ++++++++++++++++++-
 .../phoenix/pig/PhoenixHBaseStorerIT.java       |  37 ++++
 .../apache/phoenix/pig/PhoenixHBaseLoader.java  |  22 +--
 .../apache/phoenix/pig/PhoenixHBaseStorage.java |  33 +++-
 .../org/apache/phoenix/pig/util/TypeUtil.java   |  49 ++++--
 .../pig/writable/PhoenixPigDBWritable.java      | 121 --------------
 .../apache/phoenix/pig/util/TypeUtilTest.java   |  39 +++--
 8 files changed, 319 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/8574d431/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWritable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWritable.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWritable.java
index 8d7d97a..0d3e724 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWritable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWritable.java
@@ -17,11 +17,6 @@
  */
 package org.apache.phoenix.mapreduce;
 
-import org.apache.hadoop.mapreduce.lib.db.DBWritable;
-import org.apache.phoenix.schema.types.*;
-import org.apache.phoenix.util.ColumnInfo;
-import org.joda.time.DateTime;
-
 import java.sql.Array;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -29,16 +24,24 @@ import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.schema.types.PBinary;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PDate;
+import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.util.ColumnInfo;
+import org.joda.time.DateTime;
+
 public class PhoenixRecordWritable implements DBWritable {
 
     private final List<Object> upsertValues = new ArrayList<>();
-    private final Map<String, Object> resultMap = new HashMap<>();
-    private List<ColumnInfo> columnMetaDataList;
+    private final Map<String, Object> resultMap = new LinkedHashMap<>();
+    private List<ColumnInfo> columnMetaDataList;
 
     /** For serialization; do not use. */
     public PhoenixRecordWritable() {
@@ -147,9 +150,10 @@ public class PhoenixRecordWritable implements DBWritable {
                 // PVarbinary and PBinary are provided as byte[] but are treated as SQL objects
                 if (PDataType.equalsAny(finalType, PVarbinary.INSTANCE, PBinary.INSTANCE)) {
                     statement.setObject(i + 1, finalObj);
+                } else {
+                    // otherwise set as array type
+                    setArrayInStatement(statement, finalType, primativeArrayToObjectArray((byte[]) finalObj), i + 1);
                 }
-                // otherwise set as array type
-                setArrayInStatement(statement, finalType, primativeArrayToObjectArray((byte[]) finalObj), i + 1);
             } else if (finalObj instanceof short[]) {
                 setArrayInStatement(statement, finalType, primativeArrayToObjectArray((short[]) finalObj), i + 1);
             } else if (finalObj instanceof int[]) {
@@ -171,10 +175,6 @@ public class PhoenixRecordWritable implements DBWritable {
         for (int i = 1; i <= metaData.getColumnCount(); i++) {
             // return the contents of a PhoenixArray, if necessary
             Object value = resultSet.getObject(i);
-            if (value instanceof PhoenixArray) {
-                value = ((PhoenixArray) value).getArray();
-            }
-
             // put a (ColumnLabel -> value) entry into the result map
             resultMap.put(metaData.getColumnLabel(i), value);
         }
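
[Editor's note] Three coordinated changes land in PhoenixRecordWritable: resultMap becomes a LinkedHashMap so that readFields() preserves the projection order of the columns; write() now reaches setArrayInStatement() only through an else branch, where the old code called it unconditionally even after binding a VARBINARY/BINARY value; and readFields() no longer unwraps PhoenixArray, leaving array conversion to the consumer. A minimal sketch of why the map type matters (plain Java; the column names are invented for illustration):

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Pig addresses fields positionally, so the result map must iterate in
    // the same order as the projected columns; a HashMap offers no such order.
    public class ColumnOrderSketch {
        public static void main(String[] args) {
            Map<String, Object> resultMap = new LinkedHashMap<>();
            resultMap.put("ID", 1);
            resultMap.put("A_DOUBLE_ARRAY", new double[] { 2.2, 4.4 });
            resultMap.put("A_VARCHAR_ARRAY", new String[] { "ABC" });

            int i = 0;
            for (Map.Entry<String, Object> e : resultMap.entrySet()) {
                // index i lines up with projectedColumns[i] in TypeUtil.transformToTuple
                System.out.println((i++) + " -> " + e.getKey());
            }
        }
    }
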
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8574d431/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java
index 606282a..5de323e 100644
--- a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java
+++ b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java
@@ -22,6 +22,7 @@ package org.apache.phoenix.pig;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.sql.Array;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.util.ArrayList;
@@ -39,6 +40,8 @@ import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
 import org.junit.Test;
 
+import com.google.common.collect.Lists;
+
 /**
  *
  * Test class to run all the integration tests against a virtual map reduce cluster.
@@ -672,4 +675,166 @@ public class PhoenixHBaseLoaderIT extends BasePigIT {
         List<Tuple> actualList = data.get("out");
         assertEquals(expectedList.size(), actualList.size());
     }
-}
+
+    /**
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testLoadForArrayWithQuery() throws Exception {
+        //create the table
+        final String TABLE = "TABLE14";
+        String ddl = "CREATE TABLE " + TABLE
+                + " ( ID INTEGER PRIMARY KEY, a_double_array double array[] , a_varchar_array varchar array, a_concat_str varchar, sep varchar)";
+
+        conn.createStatement().execute(ddl);
+
+        Double[] doubleArr = new Double[3];
+        doubleArr[0] = 2.2;
+        doubleArr[1] = 4.4;
+        doubleArr[2] = 6.6;
+        Array doubleArray = conn.createArrayOf("DOUBLE", doubleArr);
+        Tuple doubleArrTuple = Storage.tuple(2.2d, 4.4d, 6.6d);
+
+        Double[] doubleArr2 = new Double[2];
+        doubleArr2[0] = 12.2;
+        doubleArr2[1] = 22.2;
+        Array doubleArray2 = conn.createArrayOf("DOUBLE", doubleArr2);
+        Tuple doubleArrTuple2 = Storage.tuple(12.2d, 22.2d);
+
+        String[] strArr = new String[4];
+        strArr[0] = "ABC";
+        strArr[1] = "DEF";
+        strArr[2] = "GHI";
+        strArr[3] = "JKL";
+        Array strArray = conn.createArrayOf("VARCHAR", strArr);
+        Tuple strArrTuple = Storage.tuple("ABC", "DEF", "GHI", "JKL");
+
+        String[] strArr2 = new String[2];
+        strArr2[0] = "ABC";
+        strArr2[1] = "XYZ";
+        Array strArray2 = conn.createArrayOf("VARCHAR", strArr2);
+        Tuple strArrTuple2 = Storage.tuple("ABC", "XYZ");
+
+        //upsert data.
+        final String dml = "UPSERT INTO " + TABLE + " VALUES(?, ?, ?, ?, ?) ";
+        PreparedStatement stmt = conn.prepareStatement(dml);
+        stmt.setInt(1, 1);
+        stmt.setArray(2, doubleArray);
+        stmt.setArray(3, strArray);
+        stmt.setString(4, "ONE,TWO,THREE");
+        stmt.setString(5, ",");
+        stmt.execute();
+
+        stmt.setInt(1, 2);
+        stmt.setArray(2, doubleArray2);
+        stmt.setArray(3, strArray2);
+        stmt.setString(4, "FOUR:five:six");
+        stmt.setString(5, ":");
+        stmt.execute();
+
+        conn.commit();
+
+        Tuple dynArrTuple = Storage.tuple("ONE", "TWO", "THREE");
+        Tuple dynArrTuple2 = Storage.tuple("FOUR", "five", "six");
+
+        //sql query
+        final String sqlQuery = String.format(" SELECT ID, A_DOUBLE_ARRAY, A_VARCHAR_ARRAY, REGEXP_SPLIT(a_concat_str, sep) AS flattend_str FROM %s ", TABLE);
+
+        final Data data = Storage.resetData(pigServer);
+        List<Tuple> expectedList = new ArrayList<Tuple>();
+        expectedList.add(Storage.tuple(1, 3L, 4L, dynArrTuple));
+        expectedList.add(Storage.tuple(2, 2L, 2L, dynArrTuple2));
+        final String load = String.format("A = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');",sqlQuery,zkQuorum);
+        pigServer.setBatchOn();
+        pigServer.registerQuery(load);
+        pigServer.registerQuery("B = FOREACH A GENERATE ID, SIZE(A_DOUBLE_ARRAY), SIZE(A_VARCHAR_ARRAY), FLATTEND_STR;");
+        pigServer.registerQuery("STORE B INTO 'out' using mock.Storage();");
+        pigServer.executeBatch();
+
+        List<Tuple> actualList = data.get("out");
+        assertEquals(expectedList.size(), actualList.size());
+        assertEquals(expectedList, actualList);
+
+        Schema schema = pigServer.dumpSchema("A");
+        List<FieldSchema> fields = schema.getFields();
+        assertEquals(4, fields.size());
+        assertTrue(fields.get(0).alias.equalsIgnoreCase("ID"));
+        assertTrue(fields.get(0).type == DataType.INTEGER);
+        assertTrue(fields.get(1).alias.equalsIgnoreCase("A_DOUBLE_ARRAY"));
+        assertTrue(fields.get(1).type == DataType.TUPLE);
+        assertTrue(fields.get(2).alias.equalsIgnoreCase("A_VARCHAR_ARRAY"));
+        assertTrue(fields.get(2).type == DataType.TUPLE);
+        assertTrue(fields.get(3).alias.equalsIgnoreCase("FLATTEND_STR"));
+        assertTrue(fields.get(3).type == DataType.TUPLE);
+
+        Iterator<Tuple> iterator = pigServer.openIterator("A");
+        Tuple firstTuple = Storage.tuple(1, doubleArrTuple, strArrTuple, dynArrTuple);
+        Tuple secondTuple = Storage.tuple(2, doubleArrTuple2, strArrTuple2, dynArrTuple2);
+        List<Tuple> expectedRows = Lists.newArrayList(firstTuple, secondTuple);
+        List<Tuple> actualRows = Lists.newArrayList();
+        while (iterator.hasNext()) {
+            Tuple tuple = iterator.next();
+            actualRows.add(tuple);
+        }
+        assertEquals(expectedRows, actualRows);
+    }
+
+
+    /**
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testLoadForArrayWithTable() throws Exception {
+        //create the table
+        final String TABLE = "TABLE15";
+        String ddl = "CREATE TABLE " + TABLE
+                + " ( ID INTEGER PRIMARY KEY, a_double_array double array[])";
+
+        conn.createStatement().execute(ddl);
+
+        Double[] doubleArr = new Double[3];
+        doubleArr[0] = 2.2;
+        doubleArr[1] = 4.4;
+        doubleArr[2] = 6.6;
+        Array doubleArray = conn.createArrayOf("DOUBLE", doubleArr);
+        Tuple doubleArrTuple = Storage.tuple(2.2d, 4.4d, 6.6d);
+
+        Double[] doubleArr2 = new Double[2];
+        doubleArr2[0] = 12.2;
+        doubleArr2[1] = 22.2;
+        Array doubleArray2 = conn.createArrayOf("DOUBLE", doubleArr2);
+        Tuple doubleArrTuple2 = Storage.tuple(12.2d, 22.2d);
+
+        //upsert data.
+        final String dml = "UPSERT INTO " + TABLE + " VALUES(?, ?) ";
+        PreparedStatement stmt = conn.prepareStatement(dml);
+        stmt.setInt(1, 1);
+        stmt.setArray(2, doubleArray);
+        stmt.execute();
+
+        stmt.setInt(1, 2);
+        stmt.setArray(2, doubleArray2);
+        stmt.execute();
+
+        conn.commit();
+
+        final Data data = Storage.resetData(pigServer);
+        List<Tuple> expectedList = new ArrayList<Tuple>();
+        expectedList.add(Storage.tuple(1, doubleArrTuple));
+        expectedList.add(Storage.tuple(2, doubleArrTuple2));
+
+        pigServer.setBatchOn();
+        pigServer.registerQuery(String.format(
+                "A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", TABLE,
+                zkQuorum));
+
+        pigServer.registerQuery("STORE A INTO 'out' using mock.Storage();");
+        pigServer.executeBatch();
+
+        List<Tuple> actualList = data.get("out");
+        assertEquals(expectedList.size(), actualList.size());
+        assertEquals(expectedList, actualList);
+    }
+}
\ No newline at end of file
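
[Editor's note] The two loader tests above cover both location schemes, hbase://query/... and hbase://table/..., and both rest on the same rule: the elements of a SQL array become the fields of a Pig tuple, so SIZE() and positional access work on them. A standalone sketch of that expectation (illustrative shape, not the committed implementation):

    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    // A DOUBLE ARRAY of {2.2, 4.4, 6.6} compares equal to Storage.tuple(2.2d, 4.4d, 6.6d).
    public class ArrayAsTupleSketch {
        public static Tuple toTuple(Object[] elements) throws Exception {
            Tuple t = TupleFactory.getInstance().newTuple(elements.length);
            for (int i = 0; i < elements.length; i++) {
                t.set(i, elements[i]);
            }
            return t;
        }

        public static void main(String[] args) throws Exception {
            System.out.println(toTuple(new Object[] { 2.2d, 4.4d, 6.6d })); // (2.2,4.4,6.6)
        }
    }
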
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8574d431/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseStorerIT.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseStorerIT.java b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseStorerIT.java
index 9e9434c..b9339bc 100644
--- a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseStorerIT.java
+++ b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseStorerIT.java
@@ -19,10 +19,12 @@
  */
 package org.apache.phoenix.pig;
 
+import static org.apache.pig.builtin.mock.Storage.tuple;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.sql.Array;
 import java.sql.ResultSet;
 import java.sql.Statement;
 import java.util.Collection;
@@ -254,4 +256,39 @@ public class PhoenixHBaseStorerIT extends BasePigIT {
         assertEquals(now, rs.getTimestamp(4).getTime());
     }
 
+
+    @Test
+    public void testStoreForArray() throws Exception {
+
+        final String tableName = "TABLE5";
+        final Statement stmt = conn.createStatement();
+        String ddl = "CREATE TABLE " + tableName
+                + " ( ID INTEGER PRIMARY KEY, dbl double array[], a_varchar_array varchar array)";
+
+        stmt.execute(ddl);
+
+        final Data data = Storage.resetData(pigServer);
+        data.set("in", tuple(1, tuple(2.2)),
+                tuple(2, tuple(2.4, 2.5)),
+                tuple(3, tuple(2.3)));
+
+        pigServer.setBatchOn();
+        pigServer.registerQuery("A = LOAD 'in' USING mock.Storage() as (id:int, dbl:tuple());");
+        pigServer.registerQuery("Store A into 'hbase://" + tableName + "/ID,DBL"
+                + "' using " + PhoenixHBaseStorage.class.getName() + "('"
+                + zkQuorum + "', '-batchSize 1000');");
+
+        if (pigServer.executeBatch().get(0).getStatus() != JOB_STATUS.COMPLETED) {
+            throw new RuntimeException("Job failed", pigServer.executeBatch()
+                    .get(0).getException());
+        }
+
+        final ResultSet rs = stmt
+                .executeQuery(String.format("SELECT id , dbl FROM %s where id = 2" , tableName));
+
+        assertTrue(rs.next());
+        assertEquals(2, rs.getInt(1));
+        Array expectedDoubleArr = conn.createArrayOf("DOUBLE", new Double[] { 2.4, 2.5 });
+        assertEquals(expectedDoubleArr,rs.getArray(2));
+    }
 }
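
[Editor's note] testStoreForArray drives the write path: the Pig schema declares dbl as tuple(), and the storer persists it as a Phoenix DOUBLE ARRAY. The same verification the test performs can be run standalone over plain JDBC (the connection URL below is an assumption; the table and values come from the test):

    import java.sql.Array;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class ReadArrayBack {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost:2181");
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery("SELECT id, dbl FROM TABLE5 WHERE id = 2")) {
                while (rs.next()) {
                    Array expected = conn.createArrayOf("DOUBLE", new Double[] { 2.4, 2.5 });
                    // array equality is what the assertEquals in the IT relies on
                    System.out.println(rs.getInt(1) + " matches: " + expected.equals(rs.getArray(2)));
                }
            }
        }
    }
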
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8574d431/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseLoader.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseLoader.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseLoader.java
index adbb556..2be2317 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseLoader.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseLoader.java
@@ -29,20 +29,19 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.phoenix.mapreduce.PhoenixInputFormat;
+import org.apache.phoenix.mapreduce.PhoenixRecordWritable;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;
 import org.apache.phoenix.pig.util.PhoenixPigSchemaUtil;
 import org.apache.phoenix.pig.util.QuerySchemaParserFunction;
 import org.apache.phoenix.pig.util.TableSchemaParserFunction;
 import org.apache.phoenix.pig.util.TypeUtil;
-import org.apache.phoenix.pig.writable.PhoenixPigDBWritable;
 import org.apache.pig.Expression;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.LoadMetadata;
@@ -57,6 +56,7 @@ import org.apache.pig.impl.util.UDFContext;
 
 import com.google.common.base.Preconditions;
 
+
 /**
  * LoadFunc to load data from HBase using Phoenix .
  *
@@ -90,8 +90,8 @@ public final class PhoenixHBaseLoader extends LoadFunc implements LoadMetadata {
     private String tableName;
     private String selectQuery;
     private String zkQuorum ;
-    private PhoenixInputFormat<PhoenixPigDBWritable> inputFormat;
-    private RecordReader<NullWritable,PhoenixPigDBWritable> reader;
+    private PhoenixInputFormat<PhoenixRecordWritable> inputFormat;
+    private RecordReader<NullWritable,PhoenixRecordWritable> reader;
     private String contextSignature;
     private ResourceSchema schema;
 
@@ -122,13 +122,13 @@ public final class PhoenixHBaseLoader extends LoadFunc implements LoadMetadata {
      * @param configuration
      * @throws PigException
      */
-    private void initializePhoenixPigConfiguration(final String location, final Configuration configuration) throws PigException {
+    private void initializePhoenixPigConfiguration(final String location, final Configuration configuration) throws IOException {
         if(this.config != null) {
             return;
         }
        this.config = configuration;
        this.config.set(HConstants.ZOOKEEPER_QUORUM,this.zkQuorum);
-        PhoenixConfigurationUtil.setInputClass(this.config, PhoenixPigDBWritable.class);
+        PhoenixConfigurationUtil.setInputClass(this.config, PhoenixRecordWritable.class);
         Pair<String,String> pair = null;
         try {
             if (location.startsWith(PHOENIX_TABLE_NAME_SCHEME)) {
@@ -167,8 +167,8 @@ public final class PhoenixHBaseLoader extends LoadFunc implements LoadMetadata {
     @Override
     public InputFormat getInputFormat() throws IOException {
         if(inputFormat == null) {
-            inputFormat = new PhoenixInputFormat<PhoenixPigDBWritable>();
-            PhoenixConfigurationUtil.setInputClass(this.config,PhoenixPigDBWritable.class);
+            inputFormat = new PhoenixInputFormat<PhoenixRecordWritable>();
+            PhoenixConfigurationUtil.setInputClass(this.config, PhoenixRecordWritable.class);
         }
         return inputFormat;
     }
@@ -198,18 +198,18 @@ public final class PhoenixHBaseLoader extends LoadFunc implements LoadMetadata {
             if(!reader.nextKeyValue()) {
                 return null;
             }
-            final PhoenixPigDBWritable record = reader.getCurrentValue();
+            final PhoenixRecordWritable record = reader.getCurrentValue();
             if(record == null) {
                 return null;
             }
-            final Tuple tuple = TypeUtil.transformToTuple(record,schema.getFields());
+            final Tuple tuple = TypeUtil.transformToTuple(record, schema.getFields());
             return tuple;
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             int errCode = 6018;
             final String errMsg = "Error while reading input";
             throw new ExecException(errMsg, errCode,PigException.REMOTE_ENVIRONMENT, e);
-        }
+        }
     }
 
     private void printUsage(final String location) throws PigException {
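
[Editor's note] With PhoenixPigDBWritable gone, PhoenixHBaseLoader reads phoenix-core's PhoenixRecordWritable and defers all Pig-side conversion to TypeUtil.transformToTuple(); script authors see no API change. A usage sketch of the two load forms (quorum, table, and query strings are placeholders):

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class LoaderUsage {
        public static void main(String[] args) throws Exception {
            PigServer pig = new PigServer(ExecType.LOCAL);
            // whole-table load:
            pig.registerQuery("A = LOAD 'hbase://table/TABLE15' USING "
                    + "org.apache.phoenix.pig.PhoenixHBaseLoader('localhost:2181');");
            // query load; ARRAY columns arrive as Pig tuples:
            pig.registerQuery("B = LOAD 'hbase://query/SELECT ID, A_DOUBLE_ARRAY FROM TABLE14' USING "
                    + "org.apache.phoenix.pig.PhoenixHBaseLoader('localhost:2181');");
            pig.registerQuery("C = FOREACH B GENERATE ID, SIZE(A_DOUBLE_ARRAY);");
            pig.dumpSchema("C");
        }
    }
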
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8574d431/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
index 28060f3..a9f0c8f 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
@@ -37,16 +37,21 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.phoenix.mapreduce.PhoenixOutputFormat;
+import org.apache.phoenix.mapreduce.PhoenixRecordWritable;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.pig.util.TableSchemaParserFunction;
-import org.apache.phoenix.pig.writable.PhoenixPigDBWritable;
+import org.apache.phoenix.pig.util.TypeUtil;
+import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.UDFContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * StoreFunc that uses Phoenix to store data into HBase.
@@ -81,8 +86,10 @@ import org.apache.pig.impl.util.UDFContext;
 @SuppressWarnings("rawtypes")
 public class PhoenixHBaseStorage implements StoreFuncInterface {
 
+    private static final Logger LOG = LoggerFactory.getLogger(PhoenixHBaseStorage.class);
+
     private Configuration config;
-    private RecordWriter<NullWritable, PhoenixPigDBWritable> writer;
+    private RecordWriter<NullWritable, PhoenixRecordWritable> writer;
     private List<ColumnInfo> columnInfo = null;
     private String contextSignature = null;
     private ResourceSchema schema;
@@ -167,16 +174,28 @@ public class PhoenixHBaseStorage implements StoreFuncInterface {
 
     @Override
     public void putNext(Tuple t) throws IOException {
-        ResourceFieldSchema[] fieldSchemas = (schema == null) ? null : schema.getFields();
-        PhoenixPigDBWritable record = PhoenixPigDBWritable.newInstance(fieldSchemas,this.columnInfo);
-        for(int i=0; i<t.size(); i++) {
-            record.add(t.get(i));
-        }
+        ResourceFieldSchema[] fieldSchemas = (schema == null) ? null : schema.getFields();
+        PhoenixRecordWritable record = new PhoenixRecordWritable(this.columnInfo);
         try {
+            for(int i=0; i<t.size(); i++) {
+                Object value = t.get(i);
+                if(value == null) {
+                    record.add(null);
+                    continue;
+                }
+                ColumnInfo cinfo = this.columnInfo.get(i);
+                byte type = (fieldSchemas == null) ? DataType.findType(value) : fieldSchemas[i].getType();
+                PDataType pDataType = PDataType.fromTypeId(cinfo.getSqlType());
+                Object v = TypeUtil.castPigTypeToPhoenix(value, type, pDataType);
+                record.add(v);
+            }
             this.writer.write(null, record);
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new RuntimeException(e);
+        } catch (SQLException e) {
+            LOG.error("Error on tuple {} .",t);
+            throw new IOException(e);
         }
     }
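
[Editor's note] putNext() now performs the per-field cast itself: nulls pass straight through, and every other value is converted by TypeUtil.castPigTypeToPhoenix() using the Pig type byte and the column's PDataType before being added to the record. For a tuple field the cast reduces to Tuple.getAll().toArray(), which PhoenixRecordWritable.write() later binds as a SQL ARRAY. A small self-contained illustration of that step (simplified; not the committed code):

    import java.util.Arrays;
    import java.util.List;

    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class TupleToArraySketch {

        static Object[] toSqlArray(Tuple field) {
            List<Object> data = field.getAll(); // the same call the committed cast makes
            return data.toArray();
        }

        public static void main(String[] args) {
            Tuple t = TupleFactory.getInstance().newTuple(Arrays.<Object>asList(2.4d, 2.5d));
            System.out.println(toSqlArray(t).length + " elements -> DOUBLE ARRAY[2.4, 2.5]");
        }
    }
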
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8574d431/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
index 53d3ae3..8c9bd6a 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
@@ -20,14 +20,17 @@ package org.apache.phoenix.pig.util;
 
 import java.io.IOException;
 import java.sql.Date;
+import java.sql.SQLException;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.sql.Types;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.phoenix.pig.writable.PhoenixPigDBWritable;
+import org.apache.phoenix.mapreduce.PhoenixRecordWritable;
+import org.apache.phoenix.schema.types.PArrayDataType;
 import org.apache.phoenix.schema.types.PBinary;
 import org.apache.phoenix.schema.types.PBoolean;
 import org.apache.phoenix.schema.types.PChar;
@@ -53,6 +56,7 @@ import org.apache.phoenix.schema.types.PUnsignedTimestamp;
 import org.apache.phoenix.schema.types.PUnsignedTinyint;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.schema.types.PVarchar;
+import org.apache.phoenix.schema.types.PhoenixArray;
 import org.apache.pig.PigException;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.backend.hadoop.hbase.HBaseBinaryConverter;
@@ -72,7 +76,7 @@ public final class TypeUtil {
     private static final HBaseBinaryConverter BINARY_CONVERTER = new HBaseBinaryConverter();
     private static final ImmutableMap<PDataType, Byte> PHOENIX_TO_PIG_TYPE = init();
     private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
-    
+
     private TypeUtil() {}
 
     /**
@@ -157,15 +161,24 @@
     }
 
     /**
-     * This method encodes a value with Phoenix data type. It begins with checking whether an object is BINARY and makes
+     * This method encodes a value with Phoenix data type. It begins with checking whether an object is TUPLE. A {@link Tuple} is mapped
+     * to a {@link PArrayDataType} . It then checks if it is BINARY and makes
      * a call to {@link #castBytes(Object, PDataType)} to convert bytes to targetPhoenixType. It returns a {@link RuntimeException}
      * when object can not be coerced.
      *
      * @param o
      * @param targetPhoenixType
      * @return Object
+     * @throws SQLException
      */
-    public static Object castPigTypeToPhoenix(Object o, byte objectType, PDataType targetPhoenixType) {
+    public static Object castPigTypeToPhoenix(Object o, byte objectType, PDataType targetPhoenixType) throws SQLException {
+
+        if(DataType.TUPLE == objectType) {
+            Tuple tuple = (Tuple)o;
+            List<Object> data = tuple.getAll();
+            return data.toArray();
+        }
+
         PDataType inferredPType = getType(o, objectType);
         if (inferredPType == null) { return null; }
@@ -237,20 +250,22 @@
      * @return
      * @throws IOException
      */
-    public static Tuple transformToTuple(final PhoenixPigDBWritable record, final ResourceFieldSchema[] projectedColumns)
+    public static Tuple transformToTuple(final PhoenixRecordWritable record, final ResourceFieldSchema[] projectedColumns)
             throws IOException {
-        List<Object> columnValues = record.getValues();
+        Map<String, Object> columnValues = record.getResultMap();
+
         if (columnValues == null || columnValues.size() == 0 || projectedColumns == null || projectedColumns.length != columnValues.size()) { return null; }
         int numColumns = columnValues.size();
         Tuple tuple = TUPLE_FACTORY.newTuple(numColumns);
         try {
-            for (int i = 0; i < numColumns; i++) {
+            int i = 0;
+            for (Map.Entry<String,Object> entry : columnValues.entrySet()) {
                 final ResourceFieldSchema fieldSchema = projectedColumns[i];
-                Object object = columnValues.get(i);
+                Object object = entry.getValue();
                 if (object == null) {
-                    tuple.set(i, null);
+                    tuple.set(i++, null);
                     continue;
                 }
@@ -289,9 +304,20 @@
                 case DataType.BIGINTEGER:
                     tuple.set(i, DataType.toBigInteger(object));
                     break;
+                case DataType.TUPLE:
+                {
+                    PhoenixArray array = (PhoenixArray)object;
+                    Tuple t = TUPLE_FACTORY.newTuple(array.getDimensions());;
+                    for(int j = 0 ; j < array.getDimensions() ; j++) {
+                        t.set(j,array.getElement(j));
+                    }
+                    tuple.set(i, t);
+                    break;
+                }
                 default:
                     throw new RuntimeException(String.format(" Not supported [%s] pig type", fieldSchema));
                 }
+                i++;
             }
         } catch (Exception ex) {
             final String errorMsg = String.format(" Error transforming PhoenixRecord to Tuple [%s] ", ex.getMessage());
@@ -300,7 +326,7 @@
         }
         return tuple;
     }
-    
+
     /**
      * Returns the mapping pig data type for a given phoenix data type.
     *
@@ -309,6 +335,9 @@
      */
     public static Byte getPigDataTypeForPhoenixType(final PDataType phoenixDataType) {
         Preconditions.checkNotNull(phoenixDataType);
+        if(phoenixDataType instanceof PArrayDataType) {
+            return DataType.TUPLE;
+        }
         final Byte pigDataType = PHOENIX_TO_PIG_TYPE.get(phoenixDataType);
         if (LOG.isDebugEnabled()) {
             LOG.debug(String.format(" For PhoenixDataType [%s] , pigDataType is [%s] ",
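
[Editor's note] TypeUtil carries both directions of the new mapping: castPigTypeToPhoenix() short-circuits a TUPLE into an Object[], transformToTuple() walks the ordered result map and, for DataType.TUPLE fields, unpacks a PhoenixArray element by element, and getPigDataTypeForPhoenixType() reports every PArrayDataType as DataType.TUPLE. The read-side unpacking, isolated as a runnable fragment (assembled here for illustration):

    import org.apache.phoenix.schema.types.PArrayDataType;
    import org.apache.phoenix.schema.types.PDouble;
    import org.apache.phoenix.schema.types.PhoenixArray;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class ArrayToTupleLoop {
        public static void main(String[] args) throws Exception {
            PhoenixArray arr = PArrayDataType.instantiatePhoenixArray(
                    PDouble.INSTANCE, new Double[] { 64.87, 89.96 });
            Tuple t = TupleFactory.getInstance().newTuple(arr.getDimensions());
            for (int j = 0; j < arr.getDimensions(); j++) {
                t.set(j, arr.getElement(j)); // same loop as the DataType.TUPLE case above
            }
            System.out.println(t); // (64.87,89.96)
        }
    }
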
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8574d431/phoenix-pig/src/main/java/org/apache/phoenix/pig/writable/PhoenixPigDBWritable.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/writable/PhoenixPigDBWritable.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/writable/PhoenixPigDBWritable.java
deleted file mode 100644
index 566e427..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/writable/PhoenixPigDBWritable.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.pig.writable;
-
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.lib.db.DBWritable;
-import org.apache.phoenix.pig.util.TypeUtil;
-import org.apache.phoenix.schema.types.PDataType;
-import org.apache.phoenix.util.ColumnInfo;
-import org.apache.pig.ResourceSchema.ResourceFieldSchema;
-import org.apache.pig.data.DataType;
-
-import com.google.common.base.Preconditions;
-
-/**
- * A {@link Writable} representing a Phoenix record. This class
- * a) does a type mapping and sets the value accordingly in the {@link PreparedStatement}
- * b) reads the column values from the {@link ResultSet}
- *
- */
-public class PhoenixPigDBWritable implements DBWritable {
-
-    private final List<Object> values;
-    private ResourceFieldSchema[] fieldSchemas;
-    private List<ColumnInfo> columnMetadataList;
-
-    public PhoenixPigDBWritable() {
-        this.values = new ArrayList<Object>();
-    }
-
-    @Override
-    public void write(PreparedStatement statement) throws SQLException {
-        for (int i = 0; i < columnMetadataList.size(); i++) {
-            Object o = values.get(i);
-            ColumnInfo columnInfo = columnMetadataList.get(i);
-            byte type = (fieldSchemas == null) ? DataType.findType(o) : fieldSchemas[i].getType();
-            try {
-                Object upsertValue = convertTypeSpecificValue(o, type, columnInfo.getSqlType());
-                if (upsertValue != null) {
-                    statement.setObject(i + 1, upsertValue, columnInfo.getSqlType());
-                } else {
-                    statement.setNull(i + 1, columnInfo.getSqlType());
-                }
-            } catch (RuntimeException re) {
-                throw new RuntimeException(String.format("Unable to process column %s, innerMessage=%s"
-                        ,columnInfo.toString(),re.getMessage()),re);
-
-            }
-        }
-    }
-
-    public void add(Object value) {
-        values.add(value);
-    }
-
-    private Object convertTypeSpecificValue(Object o, byte type, Integer sqlType) {
-        PDataType pDataType = PDataType.fromTypeId(sqlType);
-        return TypeUtil.castPigTypeToPhoenix(o, type, pDataType);
-    }
-
-    public List<Object> getValues() {
-        return values;
-    }
-
-    @Override
-    public void readFields(final ResultSet rs) throws SQLException {
-        Preconditions.checkNotNull(rs);
-        final int noOfColumns = rs.getMetaData().getColumnCount();
-        values.clear();
-        for(int i = 1 ; i <= noOfColumns ; i++) {
-            Object obj = rs.getObject(i);
-            values.add(obj);
-        }
-    }
-
-    public ResourceFieldSchema[] getFieldSchemas() {
-        return fieldSchemas;
-    }
-
-    public void setFieldSchemas(ResourceFieldSchema[] fieldSchemas) {
-        this.fieldSchemas = fieldSchemas;
-    }
-
-    public List<ColumnInfo> getColumnMetadataList() {
-        return columnMetadataList;
-    }
-
-    public void setColumnMetadataList(List<ColumnInfo> columnMetadataList) {
-        this.columnMetadataList = columnMetadataList;
-    }
-
-    public static PhoenixPigDBWritable newInstance(final ResourceFieldSchema[] fieldSchemas,
-            final List<ColumnInfo> columnMetadataList) {
-        final PhoenixPigDBWritable dbWritable = new PhoenixPigDBWritable ();
-        dbWritable.setFieldSchemas(fieldSchemas);
-        dbWritable.setColumnMetadataList(columnMetadataList);
-        return dbWritable;
-    }
-
-}
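
[Editor's note] PhoenixPigDBWritable is removed outright rather than deprecated: its readFields()/write() duties now live in phoenix-core's PhoenixRecordWritable, and the per-field Pig cast it performed moved into PhoenixHBaseStorage.putNext(). For code that referenced the deleted class directly, the replacement is a plain constructor call (sketch; only the constructor shown is confirmed by this diff):

    import java.util.List;

    import org.apache.phoenix.mapreduce.PhoenixRecordWritable;
    import org.apache.phoenix.util.ColumnInfo;

    public class WritableMigration {
        // was: PhoenixPigDBWritable.newInstance(fieldSchemas, columnInfo)
        public static PhoenixRecordWritable newRecord(List<ColumnInfo> columnInfo) {
            return new PhoenixRecordWritable(columnInfo);
        }
    }
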
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8574d431/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/TypeUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/TypeUtilTest.java b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/TypeUtilTest.java
index 0b44d2b..e459dc1 100644
--- a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/TypeUtilTest.java
+++ b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/TypeUtilTest.java
@@ -24,44 +24,57 @@ import static org.mockito.Mockito.when;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
-import java.util.List;
+import java.util.Map;
 
-import org.apache.phoenix.pig.writable.PhoenixPigDBWritable;
+import org.apache.phoenix.mapreduce.PhoenixRecordWritable;
+import org.apache.phoenix.schema.types.PArrayDataType;
+import org.apache.phoenix.schema.types.PDouble;
+import org.apache.phoenix.schema.types.PhoenixArray;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.junit.Test;
 
-import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 public class TypeUtilTest {
 
     @Test
     public void testTransformToTuple() throws Exception {
-        PhoenixPigDBWritable record = mock(PhoenixPigDBWritable.class);
-        List<Object> values = Lists.newArrayList();
-        values.add("213123");
-        values.add(1231123);
-        values.add(31231231232131L);
-        values.add("bytearray".getBytes());
-        when(record.getValues()).thenReturn(values);
+        PhoenixRecordWritable record = mock(PhoenixRecordWritable.class);
+        Double[] doubleArr = new Double[2];
+        doubleArr[0] = 64.87;
+        doubleArr[1] = 89.96;
+        PhoenixArray arr = PArrayDataType.instantiatePhoenixArray(PDouble.INSTANCE, doubleArr);
+        Map<String,Object> values = Maps.newLinkedHashMap();
+        values.put("first", "213123");
+        values.put("second", 1231123);
+        values.put("third", 31231231232131L);
+        values.put("four", "bytearray".getBytes());
+        values.put("five", arr);
+        when(record.getResultMap()).thenReturn(values);
 
         ResourceFieldSchema field = new ResourceFieldSchema().setType(DataType.CHARARRAY);
         ResourceFieldSchema field1 = new ResourceFieldSchema().setType(DataType.INTEGER);
         ResourceFieldSchema field2 = new ResourceFieldSchema().setType(DataType.LONG);
         ResourceFieldSchema field3 = new ResourceFieldSchema().setType(DataType.BYTEARRAY);
-        ResourceFieldSchema[] projectedColumns = { field, field1, field2, field3 };
+        ResourceFieldSchema field4 = new ResourceFieldSchema().setType(DataType.TUPLE);
+        ResourceFieldSchema[] projectedColumns = { field, field1, field2, field3 , field4 };
 
         Tuple t = TypeUtil.transformToTuple(record, projectedColumns);
 
         assertEquals(DataType.LONG, DataType.findType(t.get(2)));
+        assertEquals(DataType.TUPLE, DataType.findType(t.get(4)));
+        Tuple doubleArrayTuple = (Tuple)t.get(4);
+        assertEquals(2,doubleArrayTuple.size());
 
         field = new ResourceFieldSchema().setType(DataType.BIGDECIMAL);
         field1 = new ResourceFieldSchema().setType(DataType.BIGINTEGER);
         values.clear();
-        values.add(new BigDecimal(123123123.123213));
-        values.add(new BigInteger("1312313231312"));
+        values.put("first", new BigDecimal(123123123.123213));
+        values.put("second", new BigInteger("1312313231312"));
         ResourceFieldSchema[] columns = { field, field1 };
+
         t = TypeUtil.transformToTuple(record, columns);
 
         assertEquals(DataType.BIGDECIMAL, DataType.findType(t.get(0)));
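
[Editor's note] The updated unit test above checks the read side (a PhoenixArray in the mocked result map surfaces as a tuple). A complementary write-side check would exercise the TUPLE branch of castPigTypeToPhoenix(); the sketch below is a hypothetical test, not part of this commit, and PDoubleArray.INSTANCE is only a plausible stand-in for the target array type:

    import static org.junit.Assert.assertEquals;

    import java.util.Arrays;

    import org.apache.phoenix.pig.util.TypeUtil;
    import org.apache.phoenix.schema.types.PDoubleArray;
    import org.apache.pig.data.DataType;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;
    import org.junit.Test;

    public class CastTupleToArrayTest {

        @Test
        public void tupleBecomesObjectArray() throws Exception {
            Tuple t = TupleFactory.getInstance().newTuple(Arrays.<Object>asList(2.4d, 2.5d));
            Object[] cast = (Object[]) TypeUtil.castPigTypeToPhoenix(t, DataType.TUPLE, PDoubleArray.INSTANCE);
            assertEquals(2, cast.length);
        }
    }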