Author: pradeepkth
Date: Fri Dec  4 20:12:48 2009
New Revision: 887338

URL: http://svn.apache.org/viewvc?rev=887338&view=rev
Log:
 PIG-1090: Update sources to reflect recent changes in load-store interfaces -
 incremental commit to address getSchema() functionality in BinStorage (pradeepkth)


Modified:
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceSchema.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
    hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java
    hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/Util.java

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceSchema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceSchema.java?rev=887338&r1=887337&r2=887338&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceSchema.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceSchema.java Fri Dec  4 20:12:48 2009
@@ -18,32 +18,151 @@
 
 package org.apache.pig;
 
-import java.util.Map;
+import java.util.Arrays;
+import java.util.List;
 
 import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
 
-/**
- *
- */
 public class ResourceSchema {
 
-    int version;
+    /* Array Getters intentionally return mutable arrays instead of copies,
+     * to simplify updates without unnecessary copying.
+     * Setters make a copy of the arrays in order to prevent an array
+     * from being shared by two objects, with modifications in one
+     * accidentally changing the other.
+     */
+    
+    // initializing arrays to empty so we don't have to worry about NPEs
+    // setters won't set to null
+    public ResourceFieldSchema[] fields = new ResourceFieldSchema[0];
+
+    public enum Order { ASCENDING, DESCENDING }
+    public int[] sortKeys = {}; // each entry is an offset into the fields array.
+    public Order[] sortKeyOrders = new Order[0];
+    
+    
+    public int version = 0;
 
-    public class ResourceFieldSchema {
+    public static class ResourceFieldSchema {
         public String name;
         
-        /**
-         * byte representing type as enumerated in {...@link DataType}
-         */
-        public byte type; 
+        // values are constants from DataType
+        public byte type;
         
         public String description;
-        public ResourceFieldSchema schema; // nested tuples and bags will have their own schema
+
+        // nested tuples and bags will have their own schema
+        public ResourceSchema schema; 
+
+        public ResourceFieldSchema() {
+            
+        }
+        
+        public ResourceFieldSchema(FieldSchema fieldSchema) {
+            type = fieldSchema.type;
+            name = fieldSchema.alias;
+            description = "autogenerated from Pig Field Schema";
+            if (type == DataType.BAG || type == DataType.TUPLE) {
+                schema = new ResourceSchema(fieldSchema.schema);
+            } else {
+                schema = null;
+            }
+        }
+        
+        public String getName() {
+            return name;
+        }
+        public ResourceFieldSchema setName(String name) {
+            this.name = name;
+            return this;
+        }
+        
+        public byte getType() {
+            return type;
+        }
+        public ResourceFieldSchema setType(byte type) {
+            this.type = type;
+            return this;
+        }
+        
+        public String getDescription() {
+            return description;
+        }
+        
+        public ResourceFieldSchema setDescription(String description) {
+            this.description = description;
+            return this;
+        }
+
+        public ResourceSchema getSchema() {
+            return schema;
+        }
+
+        public ResourceFieldSchema setSchema(ResourceSchema schema) {
+            this.schema = schema;
+            return this;
+        }
     }
 
-    public ResourceFieldSchema[] fields;
+
+    public ResourceSchema() {
+        
+    }
+    
+    public ResourceSchema(Schema pigSchema) {
+        List<FieldSchema> pigSchemaFields = pigSchema.getFields();
+        fields = new ResourceFieldSchema[pigSchemaFields.size()];
+        for (int i=0; i<fields.length; i++) {
+            fields[i] = new ResourceFieldSchema(pigSchemaFields.get(i));
+        }
+        
+    }
+    
+    public int getVersion() {
+        return version;
+    }
+    
+    public ResourceSchema setVersion(int version) {
+        this.version = version;
+        return this;
+    }
+    
+    public ResourceFieldSchema[] getFields() {
+        return fields;
+    }
+    
+    public String[] fieldNames() {
+        String[] names = new String[fields.length];
+        for (int i=0; i<fields.length; i++) {
+            names[i] = fields[i].getName();
+        }
+        return names;
+    }
+    
+    public ResourceSchema setFields(ResourceFieldSchema[] fields) {
+        if (fields != null)
+            this.fields = Arrays.copyOf(fields, fields.length);
+        return this;
+    }
+    
+    public int[] getSortKeys() {
+        return sortKeys;
+    }
+    public  ResourceSchema setSortKeys(int[] sortKeys) {
+        if (sortKeys != null)
+            this.sortKeys = Arrays.copyOf(sortKeys, sortKeys.length);
+        return this;
+    }
+    
+    public Order[] getSortKeyOrders() {
+        return sortKeyOrders;
+    }
     
-    enum Order { ASCENDING, DESCENDING }
-    public int[] sortKeys; // each entry is an offset into the fields array.
-    public Order[] sortKeyOrders; 
+    public ResourceSchema setSortKeyOrders(Order[] sortKeyOrders) {
+        if (sortKeyOrders != null) 
+            this.sortKeyOrders = Arrays.copyOf(sortKeyOrders, sortKeyOrders.length);
+        return this;
+    } 
 }
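
For context, a minimal sketch of how the fluent setters and defensive array
copies added above are meant to behave (the usage itself is illustrative and
not part of this commit):

    import org.apache.pig.ResourceSchema;
    import org.apache.pig.ResourceSchema.ResourceFieldSchema;
    import org.apache.pig.data.DataType;

    public class ResourceSchemaUsageSketch {
        public static void main(String[] args) {
            ResourceFieldSchema f = new ResourceFieldSchema()
                    .setName("age")                 // field alias
                    .setType(DataType.INTEGER)      // constant from DataType
                    .setDescription("user age");

            ResourceFieldSchema[] fields = { f };
            ResourceSchema rs = new ResourceSchema()
                    .setFields(fields)              // setter copies the array
                    .setSortKeys(new int[] { 0 });  // offset into fields

            // setFields() copied the caller's array, so mutating it later
            // does not change the schema object.
            fields[0] = null;
            System.out.println(rs.getFields()[0].getName()); // prints "age"
        }
    }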

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java?rev=887338&r1=887337&r2=887338&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java Fri Dec  4 20:12:48 2009
@@ -24,9 +24,11 @@
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
@@ -38,23 +40,32 @@
 import org.apache.pig.FileInputLoadFunc;
 import org.apache.pig.LoadCaster;
 import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
 import org.apache.pig.PigException;
 import org.apache.pig.PigWarning;
 import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceStatistics;
 import org.apache.pig.StoreFunc;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataReaderWriter;
+import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.BinStorageInputFormat;
 import org.apache.pig.impl.io.BinStorageOutputFormat;
 import org.apache.pig.impl.io.BinStorageRecordReader;
 import org.apache.pig.impl.io.BinStorageRecordWriter;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.ReadToEndLoader;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.plan.OperatorPlan;
 import org.apache.pig.impl.util.LogUtils;
 
 public class BinStorage extends FileInputLoadFunc 
-implements LoadCaster, StoreFunc {
+implements LoadCaster, StoreFunc, LoadMetadata {
 
     
     public static final int RECORD_1 = 0x01;
@@ -321,6 +332,7 @@
         return new BinStorageInputFormat();
     }
 
+    @Override
     public int hashCode() {
         return 42; 
     }
@@ -365,4 +377,58 @@
             throws IOException {
         return LoadFunc.getAbsolutePath(location, curDir);
     }
+
+    @Override
+    public String[] getPartitionKeys(String location, Configuration conf)
+            throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ResourceSchema getSchema(String location, Configuration conf)
+            throws IOException {
+        Properties props = ConfigurationUtil.toProperties(conf);
+        // since local mode now is implemented as hadoop's local mode
+        // we can treat either local or hadoop mode as hadoop mode - hence
+        // we can use HDataStorage and FileLocalizer.openDFSFile below
+        HDataStorage storage = new HDataStorage(props);
+        if (!FileLocalizer.fileExists(location, storage)) {
+            // At compile time in batch mode, the file may not exist
+            // (such as intermediate file). Just return null - the
+            // same way as we would if we did not get a valid record
+            return null;
+        }
+        ReadToEndLoader loader = new ReadToEndLoader(this, conf, location, 0);
+        // get the first record from the input file
+        // and figure out the schema from the data in
+        // the first record
+        Tuple t = loader.getNext();
+        if(t == null) {
+            // we couldn't get a valid record from the input
+            return null;
+        }
+        int numFields = t.size();
+        Schema s = new Schema();
+        for (int i = 0; i < numFields; i++) {
+            try {
+                s.add(DataType.determineFieldSchema(t.get(i)));
+            } catch (Exception e) {
+                int errCode = 2104;
+                String msg = "Error while determining schema of BinStorage data.";
+                throw new ExecException(msg, errCode, PigException.BUG, e);
+            } 
+        }
+        return new ResourceSchema(s);
+    }
+
+    @Override
+    public ResourceStatistics getStatistics(String location, Configuration conf)
+            throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setParitionFilter(OperatorPlan plan) throws IOException {
+        throw new UnsupportedOperationException();
+    }
 }
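
For reference, a minimal sketch of driving the new getSchema() path from
client code (the location and Configuration setup here are illustrative
assumptions, not part of this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.pig.ResourceSchema;
    import org.apache.pig.builtin.BinStorage;
    import org.apache.pig.data.DataType;

    public class BinStorageSchemaSketch {
        public static void main(String[] args) throws Exception {
            // Hypothetical path to data previously stored with BinStorage().
            String location = "/tmp/binstorage-output";
            Configuration conf = new Configuration();

            // getSchema() reads the first record and infers the schema from
            // it; it returns null if the file is missing or has no records.
            ResourceSchema rs = new BinStorage().getSchema(location, conf);
            if (rs != null) {
                for (ResourceSchema.ResourceFieldSchema f : rs.getFields()) {
                    System.out.println(DataType.findTypeName(f.getType()));
                }
            }
        }
    }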

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=887338&r1=887337&r2=887338&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/schema/Schema.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/schema/Schema.java Fri Dec  4 20:12:48 2009
@@ -850,6 +850,7 @@
      * Make a deep copy of a schema.
      * @throws CloneNotSupportedException
      */
+    @Override
     public Schema clone() throws CloneNotSupportedException {
         Schema s = new Schema();
 
@@ -1620,13 +1621,6 @@
         return new Schema(fsList);
     }
     
-    private static Schema getPigSchema(ResourceFieldSchema rfSchema) 
-    throws FrontendException {
-        return new Schema(new FieldSchema(rfSchema.name, 
-                rfSchema.schema == null ? null : getPigSchema(rfSchema.schema),
-                        rfSchema.type));
-    }
-    
 }
 
 

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java?rev=887338&r1=887337&r2=887338&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java Fri Dec  4 20:12:48 2009
@@ -32,8 +32,11 @@
 
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
+import org.apache.pig.ResourceSchema;
 import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
@@ -45,6 +48,8 @@
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.test.utils.GenRandomData;
 import org.apache.pig.test.utils.TestHelper;
@@ -61,6 +66,7 @@
     String inputFileName;
     String outputFileName;
     
+    @Override
     @Before
     public void setUp() throws Exception {
         pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
@@ -70,6 +76,7 @@
         ".txt";
     }
 
+    @Override
     @After
     public void tearDown() throws Exception {
         Util.deleteFile(cluster, inputFileName);
@@ -201,7 +208,41 @@
             ++size;
         }
     }
+    @Test
+    public void testBinStorageGetSchema() throws IOException, ParseException {
+        String input[] = new String[] { "hello\t1\t10.1", "bye\t2\t20.2" };
+        String inputFileName = "testGetSchema-input.txt";
+        String outputFileName = "testGetSchema-output.txt";
+        try {
+            Util.createInputFile(pig.getPigContext(), 
+                    inputFileName, input);
+            String query = "a = load '" + inputFileName + "' as (c:chararray, " +
+                    "i:int,d:double);store a into '" + outputFileName + "' using " +
+                            "BinStorage();";
+            pig.setBatchOn();
+            Util.registerMultiLineQuery(pig, query);
+            pig.executeBatch();
+            ResourceSchema rs = new BinStorage().getSchema(outputFileName,
+                    ConfigurationUtil.toConfiguration(pig.getPigContext().
+                            getProperties()));
+            Schema expectedSchema = Util.getSchemaFromString(
+                    "c:chararray,i:int,d:double");
+            Assert.assertTrue("Checking binstorage getSchema output", Schema.equals(
+                    expectedSchema, Schema.getPigSchema(rs), true, true));
+        } finally {
+            Util.deleteFile(pig.getPigContext(), inputFileName);
+            Util.deleteFile(pig.getPigContext(), outputFileName);
+        }
+    }
 
+    private static void randomizeBytes(byte[] data, int offset, int length) {
+        Random random = new Random();
+        for(int i=offset + length - 1; i >= offset; --i) {
+            data[i] = (byte) random.nextInt(256);
+        }
+    }
+
+    
     @Test
     public void testStoreRemoteRel() throws Exception {
         checkStorePath("test","/tmp/test");

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/Util.java?rev=887338&r1=887337&r2=887338&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/Util.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/Util.java Fri Dec  4 20:12:48 2009
@@ -20,10 +20,8 @@
 import static java.util.regex.Matcher.quoteReplacement;
 
 import java.io.BufferedReader;
-import java.io.BufferedWriter;
 import java.io.ByteArrayInputStream;
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.FileReader;
 import java.io.FileWriter;
@@ -32,7 +30,6 @@
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
-import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -41,6 +38,7 @@
 
 import junit.framework.Assert;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -250,6 +248,11 @@
                                        String[] inputData) 
     throws IOException {
         FileSystem fs = miniCluster.getFileSystem();
+        createInputFile(fs, fileName, inputData);
+    }
+    
+    static public void createInputFile(FileSystem fs, String fileName, 
+            String[] inputData) throws IOException {
         if(fs.exists(new Path(fileName))) {
             throw new IOException("File " + fileName + " already exists on the minicluster");
         }
@@ -259,6 +262,7 @@
             pw.println(inputData[i]);
         }
         pw.close();
+
     }
     
     /**
@@ -296,7 +300,15 @@
         fs.delete(new Path(fileName), true);
     }
 
-       /**
+    static public void deleteFile(PigContext pigContext, String fileName) 
+    throws IOException {
+        Configuration conf = ConfigurationUtil.toConfiguration(
+                pigContext.getProperties());
+        FileSystem fs = FileSystem.get(conf);
+        fs.delete(new Path(fileName), true);
+    }
+    
+    /**
         * Helper function to check if the result of a Pig Query is in line with
         * expected results.
         * 
@@ -489,4 +501,17 @@
         }
         return(path.delete());
     }
+
+    /**
+     * @param pigContext
+     * @param fileName
+     * @param input
+     * @throws IOException 
+     */
+    public static void createInputFile(PigContext pigContext,
+            String fileName, String[] input) throws IOException {
+        Configuration conf = ConfigurationUtil.toConfiguration(
+                pigContext.getProperties());
+        createInputFile(FileSystem.get(conf), fileName, input); 
+    }
 }
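
A minimal sketch of how a test might use the new PigContext-based helpers
added above (the file name and data are illustrative, not from the commit):

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.impl.PigContext;
    import org.apache.pig.test.Util;

    public class UtilHelpersSketch {
        public static void main(String[] args) throws Exception {
            // Hypothetical local-mode server; the helpers resolve the
            // FileSystem from the PigContext properties.
            PigServer pig = new PigServer(ExecType.LOCAL);
            PigContext ctx = pig.getPigContext();

            String file = "util-sketch-input.txt";
            Util.createInputFile(ctx, file, new String[] { "a\t1", "b\t2" });
            try {
                // ... run queries that load 'file' ...
            } finally {
                Util.deleteFile(ctx, file);
            }
        }
    }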

