PHOENIX-2584 - Support Array datatype in phoenix-pig module

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8574d431
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8574d431
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8574d431

Branch: refs/heads/calcite
Commit: 8574d4311dcb5fc7a76465b32f24daf045ffb562
Parents: 4625096
Author: ravimagham <ravi.mag...@bazaarvoice.com>
Authored: Thu Jan 21 18:31:28 2016 -0800
Committer: ravimagham <ravi.mag...@bazaarvoice.com>
Committed: Thu Jan 21 18:31:28 2016 -0800

----------------------------------------------------------------------
 .../mapreduce/PhoenixRecordWritable.java        |  28 ++--
 .../phoenix/pig/PhoenixHBaseLoaderIT.java       | 167 ++++++++++++++++++-
 .../phoenix/pig/PhoenixHBaseStorerIT.java       |  37 ++++
 .../apache/phoenix/pig/PhoenixHBaseLoader.java  |  22 +--
 .../apache/phoenix/pig/PhoenixHBaseStorage.java |  33 +++-
 .../org/apache/phoenix/pig/util/TypeUtil.java   |  49 ++++--
 .../pig/writable/PhoenixPigDBWritable.java      | 121 --------------
 .../apache/phoenix/pig/util/TypeUtilTest.java   |  39 +++--
 8 files changed, 319 insertions(+), 177 deletions(-)
----------------------------------------------------------------------
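A minimal sketch of what this change enables from the Pig side, assuming a hypothetical table EXAMPLE with an A_DOUBLE_ARRAY column of type DOUBLE ARRAY and a locally reachable ZooKeeper quorum (neither is part of this commit); with the patch, the array column surfaces in Pig as a tuple:

    import java.util.Iterator;

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.data.Tuple;

    public class ArrayLoadSketch {
        public static void main(String[] args) throws Exception {
            String zkQuorum = "localhost:2181";   // assumed quorum
            PigServer pigServer = new PigServer(ExecType.LOCAL);
            // EXAMPLE and its columns are illustrative only
            pigServer.registerQuery(String.format(
                    "A = load 'hbase://table/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');",
                    "EXAMPLE", zkQuorum));
            // the DOUBLE ARRAY column maps to a Pig tuple, so SIZE() applies directly
            pigServer.registerQuery("B = FOREACH A GENERATE ID, SIZE(A_DOUBLE_ARRAY);");
            Iterator<Tuple> rows = pigServer.openIterator("B");
            while (rows.hasNext()) {
                System.out.println(rows.next());   // e.g. (1,3)
            }
        }
    }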


http://git-wip-us.apache.org/repos/asf/phoenix/blob/8574d431/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWritable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWritable.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWritable.java
index 8d7d97a..0d3e724 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWritable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWritable.java
@@ -17,11 +17,6 @@
  */
 package org.apache.phoenix.mapreduce;
 
-import org.apache.hadoop.mapreduce.lib.db.DBWritable;
-import org.apache.phoenix.schema.types.*;
-import org.apache.phoenix.util.ColumnInfo;
-import org.joda.time.DateTime;
-
 import java.sql.Array;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -29,16 +24,24 @@ import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.schema.types.PBinary;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PDate;
+import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.util.ColumnInfo;
+import org.joda.time.DateTime;
+
 
 public class PhoenixRecordWritable implements DBWritable {
 
     private final List<Object> upsertValues = new ArrayList<>();
-    private final Map<String, Object> resultMap = new HashMap<>();
-    private List<ColumnInfo> columnMetaDataList;
+    private final Map<String, Object> resultMap = new LinkedHashMap<>();
+    private List<ColumnInfo> columnMetaDataList; 
 
     /** For serialization; do not use. */
     public PhoenixRecordWritable() {
@@ -147,9 +150,10 @@ public class PhoenixRecordWritable implements DBWritable {
                 // PVarbinary and PBinary are provided as byte[] but are treated as SQL objects
                 if (PDataType.equalsAny(finalType, PVarbinary.INSTANCE, PBinary.INSTANCE)) {
                     statement.setObject(i + 1, finalObj);
+                } else {
+                    // otherwise set as array type
+                    setArrayInStatement(statement, finalType, primativeArrayToObjectArray((byte[]) finalObj), i + 1);
                 }
-                // otherwise set as array type
-                setArrayInStatement(statement, finalType, primativeArrayToObjectArray((byte[]) finalObj), i + 1);
             } else if (finalObj instanceof short[]) {
                 setArrayInStatement(statement, finalType, primativeArrayToObjectArray((short[]) finalObj), i + 1);
             } else if (finalObj instanceof int[]) {
@@ -171,10 +175,6 @@ public class PhoenixRecordWritable implements DBWritable {
         for (int i = 1; i <= metaData.getColumnCount(); i++) {
             // return the contents of a PhoenixArray, if necessary
             Object value = resultSet.getObject(i);
-            if (value instanceof PhoenixArray) {
-                value = ((PhoenixArray) value).getArray();
-            }
-
             // put a (ColumnLabel -> value) entry into the result map
             resultMap.put(metaData.getColumnLabel(i), value);
         }

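Note that readFields() above no longer unwraps PhoenixArray values, and the result map is now insertion-ordered, so callers see columns in SELECT order and decide for themselves how to consume arrays. A hedged consumer sketch (the helper below is hypothetical, not part of the patch):

    // Iterates a populated PhoenixRecordWritable after this change; array
    // columns arrive as PhoenixArray rather than a pre-unwrapped Object[].
    static void dumpRow(org.apache.phoenix.mapreduce.PhoenixRecordWritable record)
            throws java.sql.SQLException {
        for (java.util.Map.Entry<String, Object> entry : record.getResultMap().entrySet()) {
            Object value = entry.getValue();
            if (value instanceof org.apache.phoenix.schema.types.PhoenixArray) {
                // unwrap only when a plain Object[] is wanted
                value = ((org.apache.phoenix.schema.types.PhoenixArray) value).getArray();
            }
            System.out.println(entry.getKey() + " = " + value);
        }
    }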
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8574d431/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java
index 606282a..5de323e 100644
--- a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java
+++ b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java
@@ -22,6 +22,7 @@ package org.apache.phoenix.pig;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.sql.Array;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.util.ArrayList;
@@ -39,6 +40,8 @@ import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
 import org.junit.Test;
 
+import com.google.common.collect.Lists;
+
 /**
  * 
 * Test class to run all the integration tests against a virtual map reduce cluster.
@@ -672,4 +675,166 @@ public class PhoenixHBaseLoaderIT extends BasePigIT {
         List<Tuple> actualList = data.get("out");
         assertEquals(expectedList.size(), actualList.size());
     }
-}
+    
+   /**
+    * Tests loading Phoenix array columns through a SELECT query.
+    * @throws Exception
+    */
+    @Test
+    public void testLoadForArrayWithQuery() throws Exception {
+         //create the table
+        final String TABLE = "TABLE14";
+        String ddl = "CREATE TABLE  " + TABLE
+                + " ( ID INTEGER PRIMARY KEY, a_double_array double array[] , 
a_varchar_array varchar array, a_concat_str varchar, sep varchar)";
+                
+        conn.createStatement().execute(ddl);
+        
+        Double[] doubleArr =  new Double[3];
+        doubleArr[0] = 2.2;
+        doubleArr[1] = 4.4;
+        doubleArr[2] = 6.6;
+        Array doubleArray = conn.createArrayOf("DOUBLE", doubleArr);
+        Tuple doubleArrTuple = Storage.tuple(2.2d, 4.4d, 6.6d);
+        
+        Double[] doubleArr2 =  new Double[2];
+        doubleArr2[0] = 12.2;
+        doubleArr2[1] = 22.2;
+        Array doubleArray2 = conn.createArrayOf("DOUBLE", doubleArr2);
+        Tuple doubleArrTuple2 = Storage.tuple(12.2d, 22.2d);
+        
+        String[] strArr =  new String[4];
+        strArr[0] = "ABC";
+        strArr[1] = "DEF";
+        strArr[2] = "GHI";
+        strArr[3] = "JKL";
+        Array strArray  = conn.createArrayOf("VARCHAR", strArr);
+        Tuple strArrTuple = Storage.tuple("ABC", "DEF", "GHI", "JKL");
+        
+        String[] strArr2 =  new String[2];
+        strArr2[0] = "ABC";
+        strArr2[1] = "XYZ";
+        Array strArray2  = conn.createArrayOf("VARCHAR", strArr2);
+        Tuple strArrTuple2 = Storage.tuple("ABC", "XYZ");
+        
+        //upsert data.
+        final String dml = "UPSERT INTO " + TABLE + " VALUES(?, ?, ?, ?, ?) ";
+        PreparedStatement stmt = conn.prepareStatement(dml);
+        stmt.setInt(1, 1);
+        stmt.setArray(2, doubleArray);
+        stmt.setArray(3, strArray);
+        stmt.setString(4, "ONE,TWO,THREE");
+        stmt.setString(5, ",");
+        stmt.execute();
+        
+        stmt.setInt(1, 2);
+        stmt.setArray(2, doubleArray2);
+        stmt.setArray(3, strArray2);
+        stmt.setString(4, "FOUR:five:six");
+        stmt.setString(5, ":");
+        stmt.execute();
+       
+        conn.commit();
+        
+        Tuple dynArrTuple = Storage.tuple("ONE", "TWO", "THREE");
+        Tuple dynArrTuple2 = Storage.tuple("FOUR", "five", "six");
+        
+        //sql query
+        final String sqlQuery = String.format(" SELECT ID, A_DOUBLE_ARRAY, A_VARCHAR_ARRAY, REGEXP_SPLIT(a_concat_str, sep) AS flattend_str FROM %s ", TABLE);
+      
+        final Data data = Storage.resetData(pigServer);
+        List<Tuple> expectedList = new ArrayList<Tuple>();
+        expectedList.add(Storage.tuple(1, 3L, 4L, dynArrTuple));
+        expectedList.add(Storage.tuple(2, 2L, 2L, dynArrTuple2));
+        final String load = String.format("A = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');",sqlQuery,zkQuorum);
+        pigServer.setBatchOn();
+        pigServer.registerQuery(load);
+        pigServer.registerQuery("B = FOREACH A GENERATE ID, 
SIZE(A_DOUBLE_ARRAY), SIZE(A_VARCHAR_ARRAY), FLATTEND_STR;");
+        pigServer.registerQuery("STORE B INTO 'out' using mock.Storage();");
+        pigServer.executeBatch();
+        
+        List<Tuple> actualList = data.get("out");
+        assertEquals(expectedList.size(), actualList.size());
+        assertEquals(expectedList, actualList);
+        
+        Schema schema = pigServer.dumpSchema("A");
+        List<FieldSchema> fields = schema.getFields();
+        assertEquals(4, fields.size());
+        assertTrue(fields.get(0).alias.equalsIgnoreCase("ID"));
+        assertTrue(fields.get(0).type == DataType.INTEGER);
+        assertTrue(fields.get(1).alias.equalsIgnoreCase("A_DOUBLE_ARRAY"));
+        assertTrue(fields.get(1).type == DataType.TUPLE);
+        assertTrue(fields.get(2).alias.equalsIgnoreCase("A_VARCHAR_ARRAY"));
+        assertTrue(fields.get(2).type == DataType.TUPLE);
+        assertTrue(fields.get(3).alias.equalsIgnoreCase("FLATTEND_STR"));
+        assertTrue(fields.get(3).type == DataType.TUPLE);
+        
+        Iterator<Tuple> iterator = pigServer.openIterator("A");
+        Tuple firstTuple = Storage.tuple(1, doubleArrTuple, strArrTuple, dynArrTuple);
+        Tuple secondTuple = Storage.tuple(2, doubleArrTuple2, strArrTuple2, dynArrTuple2);
+        List<Tuple> expectedRows = Lists.newArrayList(firstTuple, secondTuple);
+        List<Tuple> actualRows = Lists.newArrayList();
+        while (iterator.hasNext()) {
+            Tuple tuple = iterator.next();
+            actualRows.add(tuple);
+        }
+        assertEquals(expectedRows, actualRows);
+    }
+    
+    
+    /**
+     * Tests loading Phoenix array columns through a full-table load.
+     * @throws Exception
+     */
+     @Test
+     public void testLoadForArrayWithTable() throws Exception {
+          //create the table
+         final String TABLE = "TABLE15";
+         String ddl = "CREATE TABLE  " + TABLE
+                 + " ( ID INTEGER PRIMARY KEY, a_double_array double array[])";
+                 
+         conn.createStatement().execute(ddl);
+         
+         Double[] doubleArr =  new Double[3];
+         doubleArr[0] = 2.2;
+         doubleArr[1] = 4.4;
+         doubleArr[2] = 6.6;
+         Array doubleArray = conn.createArrayOf("DOUBLE", doubleArr);
+         Tuple doubleArrTuple = Storage.tuple(2.2d, 4.4d, 6.6d);
+         
+         Double[] doubleArr2 =  new Double[2];
+         doubleArr2[0] = 12.2;
+         doubleArr2[1] = 22.2;
+         Array doubleArray2 = conn.createArrayOf("DOUBLE", doubleArr2);
+         Tuple doubleArrTuple2 = Storage.tuple(12.2d, 22.2d);
+         
+         //upsert data.
+         final String dml = "UPSERT INTO " + TABLE + " VALUES(?, ?) ";
+         PreparedStatement stmt = conn.prepareStatement(dml);
+         stmt.setInt(1, 1);
+         stmt.setArray(2, doubleArray);
+         stmt.execute();
+         
+         stmt.setInt(1, 2);
+         stmt.setArray(2, doubleArray2);
+         stmt.execute();
+        
+         conn.commit();
+         
+         final Data data = Storage.resetData(pigServer);
+         List<Tuple> expectedList = new ArrayList<Tuple>();
+         expectedList.add(Storage.tuple(1, doubleArrTuple));
+         expectedList.add(Storage.tuple(2, doubleArrTuple2));
+         
+         pigServer.setBatchOn();
+         pigServer.registerQuery(String.format(
+             "A = load 'hbase://table/%s' using " + 
PhoenixHBaseLoader.class.getName() + "('%s');", TABLE,
+             zkQuorum));
+
+         pigServer.registerQuery("STORE A INTO 'out' using mock.Storage();");
+         pigServer.executeBatch();
+         
+         List<Tuple> actualList = data.get("out");
+         assertEquals(expectedList.size(), actualList.size());
+         assertEquals(expectedList, actualList);
+     }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8574d431/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseStorerIT.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseStorerIT.java b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseStorerIT.java
index 9e9434c..b9339bc 100644
--- a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseStorerIT.java
+++ b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseStorerIT.java
@@ -19,10 +19,12 @@
  */
 package org.apache.phoenix.pig;
 
+import static org.apache.pig.builtin.mock.Storage.tuple;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.sql.Array;
 import java.sql.ResultSet;
 import java.sql.Statement;
 import java.util.Collection;
@@ -254,4 +256,39 @@ public class PhoenixHBaseStorerIT extends BasePigIT {
         assertEquals(now, rs.getTimestamp(4).getTime());
      
     }
+    
+    @Test
+    public void testStoreForArray() throws Exception {
+     
+        final String tableName = "TABLE5";
+        final Statement stmt = conn.createStatement();
+        String ddl = "CREATE TABLE  " + tableName
+                + " ( ID INTEGER PRIMARY KEY, dbl double array[], 
a_varchar_array varchar array)";
+      
+        stmt.execute(ddl);
+      
+        final Data data = Storage.resetData(pigServer);
+        data.set("in",  tuple(1, tuple(2.2)),
+                        tuple(2, tuple(2.4, 2.5)),
+                        tuple(3, tuple(2.3)));
+
+        pigServer.setBatchOn();
+        pigServer.registerQuery("A = LOAD 'in' USING mock.Storage() as 
(id:int, dbl:tuple());");
+        pigServer.registerQuery("Store A into 'hbase://" + tableName + 
"/ID,DBL"
+                               + "' using " + 
PhoenixHBaseStorage.class.getName() + "('"
+                                + zkQuorum + "', '-batchSize 1000');");
+
+        if (pigServer.executeBatch().get(0).getStatus() != JOB_STATUS.COMPLETED) {
+             throw new RuntimeException("Job failed", pigServer.executeBatch()
+                    .get(0).getException());
+        }
+
+        final ResultSet rs = stmt
+                .executeQuery(String.format("SELECT id , dbl FROM %s where id = 2" , tableName));
+
+        assertTrue(rs.next());
+        assertEquals(2, rs.getInt(1));
+        Array expectedDoubleArr = conn.createArrayOf("DOUBLE", new Double[] { 
2.4, 2.5 });
+        assertEquals(expectedDoubleArr,rs.getArray(2));
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8574d431/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseLoader.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseLoader.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseLoader.java
index adbb556..2be2317 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseLoader.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseLoader.java
@@ -29,20 +29,19 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.phoenix.mapreduce.PhoenixInputFormat;
+import org.apache.phoenix.mapreduce.PhoenixRecordWritable;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;
 import org.apache.phoenix.pig.util.PhoenixPigSchemaUtil;
 import org.apache.phoenix.pig.util.QuerySchemaParserFunction;
 import org.apache.phoenix.pig.util.TableSchemaParserFunction;
 import org.apache.phoenix.pig.util.TypeUtil;
-import org.apache.phoenix.pig.writable.PhoenixPigDBWritable;
 import org.apache.pig.Expression;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.LoadMetadata;
@@ -57,6 +56,7 @@ import org.apache.pig.impl.util.UDFContext;
 
 import com.google.common.base.Preconditions;
 
+
 /**
  * LoadFunc to load data from HBase using Phoenix .
  * 
@@ -90,8 +90,8 @@ public final class PhoenixHBaseLoader extends LoadFunc implements LoadMetadata {
     private String tableName;
     private String selectQuery;
     private String zkQuorum ;
-    private PhoenixInputFormat<PhoenixPigDBWritable> inputFormat;
-    private RecordReader<NullWritable,PhoenixPigDBWritable> reader;
+    private PhoenixInputFormat<PhoenixRecordWritable> inputFormat;
+    private RecordReader<NullWritable,PhoenixRecordWritable> reader;
     private String contextSignature;
     private ResourceSchema schema;
        
@@ -122,13 +122,13 @@ public final class PhoenixHBaseLoader extends LoadFunc implements LoadMetadata {
      * @param configuration
      * @throws PigException
      */
-    private void initializePhoenixPigConfiguration(final String location, final Configuration configuration) throws PigException {
+    private void initializePhoenixPigConfiguration(final String location, final Configuration configuration) throws IOException {
         if(this.config != null) {
             return;
         }
         this.config = configuration;
         this.config.set(HConstants.ZOOKEEPER_QUORUM,this.zkQuorum);
-        PhoenixConfigurationUtil.setInputClass(this.config, PhoenixPigDBWritable.class);
+        PhoenixConfigurationUtil.setInputClass(this.config, PhoenixRecordWritable.class);
         Pair<String,String> pair = null;
         try {
             if (location.startsWith(PHOENIX_TABLE_NAME_SCHEME)) {
@@ -167,8 +167,8 @@ public final class PhoenixHBaseLoader extends LoadFunc implements LoadMetadata {
     @Override
     public InputFormat getInputFormat() throws IOException {
         if(inputFormat == null) {
-            inputFormat = new PhoenixInputFormat<PhoenixPigDBWritable>();
-            PhoenixConfigurationUtil.setInputClass(this.config,PhoenixPigDBWritable.class);
+            inputFormat = new PhoenixInputFormat<PhoenixRecordWritable>();
+            PhoenixConfigurationUtil.setInputClass(this.config, PhoenixRecordWritable.class);
         }
         return inputFormat;
     }
@@ -198,18 +198,18 @@ public final class PhoenixHBaseLoader extends LoadFunc implements LoadMetadata {
             if(!reader.nextKeyValue()) {
                 return null; 
              }
-             final PhoenixPigDBWritable record = reader.getCurrentValue();
+            final PhoenixRecordWritable record = reader.getCurrentValue();
             if(record == null) {
                 return null;
             }
-            final Tuple tuple = TypeUtil.transformToTuple(record,schema.getFields());
+            final Tuple tuple = TypeUtil.transformToTuple(record, schema.getFields());
             return tuple;
        } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             int errCode = 6018;
             final String errMsg = "Error while reading input";
             throw new ExecException(errMsg, errCode,PigException.REMOTE_ENVIRONMENT, e);
-        }
+       } 
     }
     
     private void printUsage(final String location) throws PigException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8574d431/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
index 28060f3..a9f0c8f 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
@@ -37,16 +37,21 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.phoenix.mapreduce.PhoenixOutputFormat;
+import org.apache.phoenix.mapreduce.PhoenixRecordWritable;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.pig.util.TableSchemaParserFunction;
-import org.apache.phoenix.pig.writable.PhoenixPigDBWritable;
+import org.apache.phoenix.pig.util.TypeUtil;
+import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.UDFContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * StoreFunc that uses Phoenix to store data into HBase.
@@ -81,8 +86,10 @@ import org.apache.pig.impl.util.UDFContext;
 @SuppressWarnings("rawtypes")
 public class PhoenixHBaseStorage implements StoreFuncInterface {
 
+    private static final Logger LOG = LoggerFactory.getLogger(PhoenixHBaseStorage.class);
+    
     private Configuration config;
-    private RecordWriter<NullWritable, PhoenixPigDBWritable> writer;
+    private RecordWriter<NullWritable, PhoenixRecordWritable> writer;
     private List<ColumnInfo> columnInfo = null;
     private String contextSignature = null;
     private ResourceSchema schema;  
@@ -167,16 +174,28 @@ public class PhoenixHBaseStorage implements StoreFuncInterface {
 
     @Override
     public void putNext(Tuple t) throws IOException {
-        ResourceFieldSchema[] fieldSchemas = (schema == null) ? null : schema.getFields();
-        PhoenixPigDBWritable record = PhoenixPigDBWritable.newInstance(fieldSchemas,this.columnInfo);
-        for(int i=0; i<t.size(); i++) {
-            record.add(t.get(i));
-        }
+        ResourceFieldSchema[] fieldSchemas = (schema == null) ? null : schema.getFields();
+        PhoenixRecordWritable record = new PhoenixRecordWritable(this.columnInfo);
         try {
+            for(int i=0; i<t.size(); i++) {
+                Object value = t.get(i);
+                if(value == null) {
+                    record.add(null);
+                    continue;
+                }
+                ColumnInfo cinfo = this.columnInfo.get(i);
+                byte type = (fieldSchemas == null) ? DataType.findType(value) : fieldSchemas[i].getType();
+                PDataType pDataType = PDataType.fromTypeId(cinfo.getSqlType());
+                Object v =  TypeUtil.castPigTypeToPhoenix(value, type, pDataType);
+                record.add(v);
+            }
             this.writer.write(null, record);
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new RuntimeException(e);
+        } catch (SQLException e) {
+            LOG.error("Error on tuple {} .",t);
+            throw new IOException(e);
         }
         
     }

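On the store side, putNext() now pushes each Pig field through TypeUtil.castPigTypeToPhoenix, so a nested tuple can land in an ARRAY column. A hedged end-to-end sketch mirroring the integration test above; the table EXAMPLE, its ID/DBL columns, input.tsv and the quorum are assumptions:

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class ArrayStoreSketch {
        public static void main(String[] args) throws Exception {
            String zkQuorum = "localhost:2181";   // assumed quorum
            PigServer pigServer = new PigServer(ExecType.LOCAL);
            pigServer.setBatchOn();
            // scalar doubles packed into a tuple become a DOUBLE ARRAY value
            pigServer.registerQuery("A = LOAD 'input.tsv' AS (id:int, d1:double, d2:double);");
            pigServer.registerQuery("B = FOREACH A GENERATE id, TOTUPLE(d1, d2) AS dbl;");
            pigServer.registerQuery("STORE B INTO 'hbase://EXAMPLE/ID,DBL' USING "
                    + "org.apache.phoenix.pig.PhoenixHBaseStorage('" + zkQuorum + "', '-batchSize 1000');");
            pigServer.executeBatch();
        }
    }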
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8574d431/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
index 53d3ae3..8c9bd6a 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
@@ -20,14 +20,17 @@ package org.apache.phoenix.pig.util;
 
 import java.io.IOException;
 import java.sql.Date;
+import java.sql.SQLException;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.sql.Types;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.phoenix.pig.writable.PhoenixPigDBWritable;
+import org.apache.phoenix.mapreduce.PhoenixRecordWritable;
+import org.apache.phoenix.schema.types.PArrayDataType;
 import org.apache.phoenix.schema.types.PBinary;
 import org.apache.phoenix.schema.types.PBoolean;
 import org.apache.phoenix.schema.types.PChar;
@@ -53,6 +56,7 @@ import org.apache.phoenix.schema.types.PUnsignedTimestamp;
 import org.apache.phoenix.schema.types.PUnsignedTinyint;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.schema.types.PVarchar;
+import org.apache.phoenix.schema.types.PhoenixArray;
 import org.apache.pig.PigException;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.backend.hadoop.hbase.HBaseBinaryConverter;
@@ -72,7 +76,7 @@ public final class TypeUtil {
     private static final HBaseBinaryConverter BINARY_CONVERTER = new HBaseBinaryConverter();
     private static final ImmutableMap<PDataType, Byte> PHOENIX_TO_PIG_TYPE = init();
     private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
-
+    
     private TypeUtil() {}
 
     /**
@@ -157,15 +161,24 @@ public final class TypeUtil {
     }
 
     /**
-     * This method encodes a value with Phoenix data type. It begins with checking whether an object is BINARY and makes
+     * This method encodes a value with Phoenix data type. It begins with checking whether an object is TUPLE. A {@link Tuple} is mapped
+     * to a {@link PArrayDataType} .  It then checks if it is BINARY and makes
      * a call to {@link #castBytes(Object, PDataType)} to convert bytes to targetPhoenixType. It returns a {@link RuntimeException}
      * when object can not be coerced.
      * 
      * @param o
      * @param targetPhoenixType
      * @return Object
+     * @throws SQLException 
      */
-    public static Object castPigTypeToPhoenix(Object o, byte objectType, PDataType targetPhoenixType) {
+    public static Object castPigTypeToPhoenix(Object o, byte objectType, PDataType targetPhoenixType) throws SQLException {
+        
+        if(DataType.TUPLE == objectType) {
+            Tuple tuple = (Tuple)o;
+            List<Object> data = tuple.getAll();
+            return data.toArray();
+        }
+        
         PDataType inferredPType = getType(o, objectType);
 
         if (inferredPType == null) { return null; }
@@ -237,20 +250,22 @@ public final class TypeUtil {
      * @return
      * @throws IOException
      */
-    public static Tuple transformToTuple(final PhoenixPigDBWritable record, final ResourceFieldSchema[] projectedColumns)
+    public static Tuple transformToTuple(final PhoenixRecordWritable record, final ResourceFieldSchema[] projectedColumns)
             throws IOException {
 
-        List<Object> columnValues = record.getValues();
+        Map<String, Object> columnValues = record.getResultMap();
+        
         if (columnValues == null || columnValues.size() == 0 || projectedColumns == null
                 || projectedColumns.length != columnValues.size()) { return null; }
         int numColumns = columnValues.size();
         Tuple tuple = TUPLE_FACTORY.newTuple(numColumns);
         try {
-            for (int i = 0; i < numColumns; i++) {
+            int i = 0;
+            for (Map.Entry<String,Object> entry : columnValues.entrySet()) {
                 final ResourceFieldSchema fieldSchema = projectedColumns[i];
-                Object object = columnValues.get(i);
+                Object object = entry.getValue();
                 if (object == null) {
-                    tuple.set(i, null);
+                    tuple.set(i++, null);
                     continue;
                 }
 
@@ -289,9 +304,20 @@ public final class TypeUtil {
                 case DataType.BIGINTEGER:
                     tuple.set(i, DataType.toBigInteger(object));
                     break;
+                case DataType.TUPLE:
+                {
+                    PhoenixArray array = (PhoenixArray)object;
+                    Tuple t = TUPLE_FACTORY.newTuple(array.getDimensions());
+                    for(int j = 0 ; j < array.getDimensions() ; j++) {
+                        t.set(j,array.getElement(j));
+                    }
+                    tuple.set(i, t);
+                    break;
+                }
                 default:
                     throw new RuntimeException(String.format(" Not supported [%s] pig type", fieldSchema));
                 }
+                i++;
             }
         } catch (Exception ex) {
             final String errorMsg = String.format(" Error transforming PhoenixRecord to Tuple [%s] ", ex.getMessage());
@@ -300,7 +326,7 @@ public final class TypeUtil {
         }
         return tuple;
     }
-
+    
     /**
      * Returns the mapping pig data type for a given phoenix data type.
      * 
@@ -309,6 +335,9 @@ public final class TypeUtil {
      */
     public static Byte getPigDataTypeForPhoenixType(final PDataType phoenixDataType) {
         Preconditions.checkNotNull(phoenixDataType);
+        if(phoenixDataType instanceof PArrayDataType) {
+            return DataType.TUPLE;
+        }
         final Byte pigDataType = PHOENIX_TO_PIG_TYPE.get(phoenixDataType);
         if (LOG.isDebugEnabled()) {
             LOG.debug(String.format(" For PhoenixDataType [%s] , pigDataType 
is [%s] ",

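Pulling the two conversions out of the diff above: castPigTypeToPhoenix turns a Pig tuple into an Object[] for the upsert path, and transformToTuple rebuilds a PhoenixArray into a Pig tuple on the load path. A hedged, self-contained sketch (the wrapper class and sample values are illustrative only):

    import java.util.Arrays;

    import org.apache.phoenix.pig.util.TypeUtil;
    import org.apache.phoenix.schema.types.PArrayDataType;
    import org.apache.phoenix.schema.types.PDouble;
    import org.apache.phoenix.schema.types.PDoubleArray;
    import org.apache.phoenix.schema.types.PhoenixArray;
    import org.apache.pig.data.DataType;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class ArrayConversionSketch {
        public static void main(String[] args) throws Exception {
            // Store path: a Pig tuple bound for an ARRAY column becomes an Object[]
            // (the new DataType.TUPLE short-circuit), later bound via setArrayInStatement.
            Tuple pigValue = TupleFactory.getInstance().newTuple(Arrays.asList(2.2d, 4.4d));
            Object[] upsertValue = (Object[]) TypeUtil.castPigTypeToPhoenix(
                    pigValue, DataType.TUPLE, PDoubleArray.INSTANCE);

            // Load path: a PhoenixArray from the ResultSet is rebuilt element by
            // element into a Pig tuple, as in the DataType.TUPLE case of transformToTuple.
            PhoenixArray arr = PArrayDataType.instantiatePhoenixArray(
                    PDouble.INSTANCE, new Double[] { 2.2, 4.4 });
            Tuple asTuple = TupleFactory.getInstance().newTuple(arr.getDimensions());
            for (int j = 0; j < arr.getDimensions(); j++) {
                asTuple.set(j, arr.getElement(j));
            }
            System.out.println(upsertValue.length + " elements, tuple = " + asTuple);
        }
    }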
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8574d431/phoenix-pig/src/main/java/org/apache/phoenix/pig/writable/PhoenixPigDBWritable.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/writable/PhoenixPigDBWritable.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/writable/PhoenixPigDBWritable.java
deleted file mode 100644
index 566e427..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/writable/PhoenixPigDBWritable.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.pig.writable;
-
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.lib.db.DBWritable;
-import org.apache.phoenix.pig.util.TypeUtil;
-import org.apache.phoenix.schema.types.PDataType;
-import org.apache.phoenix.util.ColumnInfo;
-import org.apache.pig.ResourceSchema.ResourceFieldSchema;
-import org.apache.pig.data.DataType;
-
-import com.google.common.base.Preconditions;
-
-/**
- * A {@link Writable} representing a Phoenix record. This class
- * a) does a type mapping and sets the value accordingly in the {@link PreparedStatement}
- * b) reads the column values from the {@link ResultSet}
- * 
- */
-public class PhoenixPigDBWritable implements DBWritable {
-    
-    private final List<Object> values;
-    private ResourceFieldSchema[] fieldSchemas;
-    private List<ColumnInfo> columnMetadataList;
-  
-    public PhoenixPigDBWritable() {
-        this.values = new ArrayList<Object>();
-    }
-    
-    @Override
-    public void write(PreparedStatement statement) throws SQLException {
-        for (int i = 0; i < columnMetadataList.size(); i++) {
-            Object o = values.get(i);
-            ColumnInfo columnInfo = columnMetadataList.get(i);
-            byte type = (fieldSchemas == null) ? DataType.findType(o) : fieldSchemas[i].getType();
-            try {
-                Object upsertValue = convertTypeSpecificValue(o, type, columnInfo.getSqlType());
-                if (upsertValue != null) {
-                    statement.setObject(i + 1, upsertValue, columnInfo.getSqlType());
-                } else {
-                    statement.setNull(i + 1, columnInfo.getSqlType());
-                }
-            } catch (RuntimeException re) {
-                throw new RuntimeException(String.format("Unable to process 
column %s, innerMessage=%s"
-                        ,columnInfo.toString(),re.getMessage()),re);
-                
-            }
-        }
-    }
-    
-    public void add(Object value) {
-        values.add(value);
-    }
-
-    private Object convertTypeSpecificValue(Object o, byte type, Integer sqlType) {
-        PDataType pDataType = PDataType.fromTypeId(sqlType);
-        return TypeUtil.castPigTypeToPhoenix(o, type, pDataType);
-    }
-
-    public List<Object> getValues() {
-        return values;
-    }
-
-    @Override
-    public void readFields(final ResultSet rs) throws SQLException {
-        Preconditions.checkNotNull(rs);
-        final int noOfColumns = rs.getMetaData().getColumnCount();
-        values.clear();
-        for(int i = 1 ; i <= noOfColumns ; i++) {
-            Object obj = rs.getObject(i);
-            values.add(obj);
-        }
-    }
-
-    public ResourceFieldSchema[] getFieldSchemas() {
-        return fieldSchemas;
-    }
-
-    public void setFieldSchemas(ResourceFieldSchema[] fieldSchemas) {
-        this.fieldSchemas = fieldSchemas;
-    }
-
-    public List<ColumnInfo> getColumnMetadataList() {
-        return columnMetadataList;
-    }
-
-    public void setColumnMetadataList(List<ColumnInfo> columnMetadataList) {
-        this.columnMetadataList = columnMetadataList;
-    }
-
-    public static PhoenixPigDBWritable newInstance(final ResourceFieldSchema[] fieldSchemas,
-            final List<ColumnInfo> columnMetadataList) {
-        final PhoenixPigDBWritable dbWritable = new PhoenixPigDBWritable ();
-        dbWritable.setFieldSchemas(fieldSchemas);
-        dbWritable.setColumnMetadataList(columnMetadataList);
-        return dbWritable;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8574d431/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/TypeUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/TypeUtilTest.java b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/TypeUtilTest.java
index 0b44d2b..e459dc1 100644
--- a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/TypeUtilTest.java
+++ b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/TypeUtilTest.java
@@ -24,44 +24,57 @@ import static org.mockito.Mockito.when;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
-import java.util.List;
+import java.util.Map;
 
-import org.apache.phoenix.pig.writable.PhoenixPigDBWritable;
+import org.apache.phoenix.mapreduce.PhoenixRecordWritable;
+import org.apache.phoenix.schema.types.PArrayDataType;
+import org.apache.phoenix.schema.types.PDouble;
+import org.apache.phoenix.schema.types.PhoenixArray;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.junit.Test;
 
-import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 public class TypeUtilTest {
 
     @Test
     public void testTransformToTuple() throws Exception {
-        PhoenixPigDBWritable record = mock(PhoenixPigDBWritable.class);
-        List<Object> values = Lists.newArrayList();
-        values.add("213123");
-        values.add(1231123);
-        values.add(31231231232131L);
-        values.add("bytearray".getBytes());
-        when(record.getValues()).thenReturn(values);
+        PhoenixRecordWritable record = mock(PhoenixRecordWritable.class);
+        Double[] doubleArr =  new Double[2];
+        doubleArr[0] = 64.87;
+        doubleArr[1] = 89.96;
+        PhoenixArray arr = PArrayDataType.instantiatePhoenixArray(PDouble.INSTANCE, doubleArr);
+        Map<String,Object> values = Maps.newLinkedHashMap();
+        values.put("first", "213123");
+        values.put("second", 1231123);
+        values.put("third", 31231231232131L);
+        values.put("four", "bytearray".getBytes());
+        values.put("five", arr);
+        when(record.getResultMap()).thenReturn(values);
 
         ResourceFieldSchema field = new ResourceFieldSchema().setType(DataType.CHARARRAY);
         ResourceFieldSchema field1 = new ResourceFieldSchema().setType(DataType.INTEGER);
         ResourceFieldSchema field2 = new ResourceFieldSchema().setType(DataType.LONG);
         ResourceFieldSchema field3 = new ResourceFieldSchema().setType(DataType.BYTEARRAY);
-        ResourceFieldSchema[] projectedColumns = { field, field1, field2, field3 };
+        ResourceFieldSchema field4 = new ResourceFieldSchema().setType(DataType.TUPLE);
+        ResourceFieldSchema[] projectedColumns = { field, field1, field2, field3 , field4 };
 
         Tuple t = TypeUtil.transformToTuple(record, projectedColumns);
 
         assertEquals(DataType.LONG, DataType.findType(t.get(2)));
+        assertEquals(DataType.TUPLE, DataType.findType(t.get(4)));
+        Tuple doubleArrayTuple = (Tuple)t.get(4);
+        assertEquals(2,doubleArrayTuple.size());
 
         field = new ResourceFieldSchema().setType(DataType.BIGDECIMAL);
         field1 = new ResourceFieldSchema().setType(DataType.BIGINTEGER);
         values.clear();
-        values.add(new BigDecimal(123123123.123213));
-        values.add(new BigInteger("1312313231312"));
+        values.put("first", new BigDecimal(123123123.123213));
+        values.put("second", new BigInteger("1312313231312"));
         ResourceFieldSchema[] columns = { field, field1 };
+        
         t = TypeUtil.transformToTuple(record, columns);
 
         assertEquals(DataType.BIGDECIMAL, DataType.findType(t.get(0)));
