Author: pradeepkth Date: Fri Jan 29 19:40:17 2010 New Revision: 904607 URL: http://svn.apache.org/viewvc?rev=904607&view=rev Log: PIG-1090: additional commit to enable storeFunc interacting with metadata systems (daijy via pradeepkth)
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreFunc.java hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOStore.java hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreFunc.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreFunc.java?rev=904607&r1=904606&r2=904607&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreFunc.java (original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreFunc.java Fri Jan 29 19:40:17 2010 @@ -73,7 +73,12 @@ * <b>store A into 'bla'</b> * then 'bla' is the location. This location should be either a file name * or a URI. If it does not have a URI scheme Pig will assume it is a - * filename. This will be called multiple times during execution on the backend. + * filename. + * This method will be called in the frontend and backend multiple times. Implementations + * should bear in mind that this method is called multiple times and should + * ensure there are no inconsistent side effects due to the multiple calls. + * + * @param location Location indicated in store statement. * @param job The {@link Job} object * @throws IOException if the location is not valid. 
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=904607&r1=904606&r2=904607&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Fri Jan 29 19:40:17 2010 @@ -446,6 +446,7 @@ StoreFunc sFunc = st.getStoreFunc(); if (st.getSchema()!=null) sFunc.checkSchema(new ResourceSchema(st.getSchema(), st.getSortInfo())); + sFunc.setStoreLocation(st.getSFile().getFileName(), nwJob); } for (POStore st: reduceStores) { @@ -453,6 +454,7 @@ StoreFunc sFunc = st.getStoreFunc(); if (st.getSchema()!=null) sFunc.checkSchema(new ResourceSchema(st.getSchema(), st.getSortInfo())); + sFunc.setStoreLocation(st.getSFile().getFileName(), nwJob); } // the OutputFormat we report to Hadoop is always PigOutputFormat Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOStore.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOStore.java?rev=904607&r1=904606&r2=904607&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOStore.java (original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOStore.java Fri Jan 29 19:40:17 2010 @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; +import org.apache.pig.FuncSpec; import org.apache.pig.StoreFunc; import org.apache.pig.impl.PigContext; 
import org.apache.pig.impl.io.FileSpec; @@ -74,7 +75,7 @@ try { mStoreFunc = (StoreFunc) PigContext.instantiateFuncFromSpec(outputFileSpec.getFuncSpec()); this.mAlias = alias; - this.signature = constructSignature(mAlias, mOutputFile.getFileName(), mOutputFile.getFuncSpec().getCtorArgs()); + this.signature = constructSignature(mAlias, outputFileSpec.getFileName(), mOutputFile.getFuncSpec()); mStoreFunc.setStoreFuncUDFContextSignature(this.signature); } catch (Exception e) { IOException ioe = new IOException(e.getMessage()); @@ -83,16 +84,8 @@ } } - private String constructSignature(String alias, String filename, String[] args) { - String s = alias+"_"+filename+"_"; - if (args!=null) { - for (int i=0;i<args.length;i++) { - s = s+args[i]; - if (i!=args.length-1) - s = s+"_"; - } - } - return s; + public static String constructSignature(String alias, String filename, FuncSpec funcSpec) { + return alias+"_"+filename+"_"+funcSpec.toString(); } public FileSpec getOutputFile() { @@ -219,9 +212,8 @@ @Override public void setAlias(String newAlias) { super.setAlias(newAlias); - mStoreFunc.setStoreFuncUDFContextSignature(getAlias()+"_"+mOutputFile.getFileName()); - signature = constructSignature(mAlias, mOutputFile.getFileName(), mOutputFile.getFuncSpec().getCtorArgs()); - signature = getAlias()+"_"+mOutputFile.getFileName(); + signature = constructSignature(mAlias, mOutputFile.getFileName(), mOutputFile.getFuncSpec()); + mStoreFunc.setStoreFuncUDFContextSignature(signature); } public String getSignature() { Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=904607&r1=904606&r2=904607&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt 
(original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Fri Jan 29 19:40:17 2010 @@ -619,7 +619,10 @@ } return true; } - + + static String constructFileNameSignature(String fileName, FuncSpec funcSpec) { + return fileName+"_"+funcSpec.toString(); + } } @@ -1270,10 +1273,10 @@ // so translating 'foo' to its absolute path based on that dir would give incorrect // results - hence we store the translations into a map during the first parse for // use during the reparse. - String absolutePath = fileNameMap.get(filename); + String absolutePath = fileNameMap.get(constructFileNameSignature(filename, funcSpec)); if (absolutePath == null) { absolutePath = loFunc.relativeToAbsolutePath(filename, getCurrentDir(pigContext)); - fileNameMap.put(filename, absolutePath); + fileNameMap.put(constructFileNameSignature(filename, funcSpec), absolutePath); } lo = new LOLoad(lp, new OperatorKey(scope, getNextId()), new FileSpec(absolutePath, funcSpec), ConfigurationUtil.toConfiguration(pigContext.getProperties())); @@ -2411,12 +2414,13 @@ Object obj = PigContext.instantiateFuncFromSpec(funcSpec); StoreFunc stoFunc = (StoreFunc)obj; + stoFunc.setStoreFuncUDFContextSignature(LOStore.constructSignature(t.image, fileName, funcSpec)); // see the comments in LoadClause for reasons to cache absolutePath - String absolutePath = fileNameMap.get(fileName); + String absolutePath = fileNameMap.get(constructFileNameSignature(fileName, funcSpec)); if (absolutePath == null) { absolutePath = stoFunc.relToAbsPathForStoreLocation(fileName, getCurrentDir(pigContext)); - fileNameMap.put(fileName, absolutePath); + fileNameMap.put(constructFileNameSignature(fileName, funcSpec), absolutePath); } LogicalOperator store = new LOStore(lp, new OperatorKey(scope, getNextId()), new FileSpec(absolutePath, funcSpec), t.image); @@ -3954,4 +3958,4 @@ log.trace("Exiting AliasFieldOrSpec"); return item; } -} +} \ No newline at end of file