Author: gates Date: Thu Aug 28 16:44:13 2008 New Revision: 690049 URL: http://svn.apache.org/viewvc?rev=690049&view=rev Log: PIG-400 Fix issues with flatten and schema naming.
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGenerate.java incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidatorNoSchema.java incubator/pig/branches/types/test/org/apache/pig/test/utils/dotGraph/LogicalPlanLoader.java Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java?rev=690049&r1=690048&r2=690049&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java Thu Aug 28 16:44:13 2008 @@ -72,7 +72,6 @@ @Override public Result getNext(Tuple t) throws ExecException { - log.info("inputsAccumulated: " + inputsAccumulated); if (!inputsAccumulated) { Result in = processInput(); distinctBag = BagFactory.getInstance().newDistinctBag(); @@ -84,11 +83,9 @@ continue; } distinctBag.add((Tuple) in.result); - log.info("Added tuple" + in.result + " to the distinct bag"); in = processInput(); } inputsAccumulated = true; - 
log.info("Distinct bag: " + distinctBag); } if (it == null) { it = distinctBag.iterator(); Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java?rev=690049&r1=690048&r2=690049&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java Thu Aug 28 16:44:13 2008 @@ -25,10 +25,12 @@ import java.util.Iterator; import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.apache.pig.impl.logicalLayer.schema.SchemaMergeException; import org.apache.pig.impl.logicalLayer.optimizer.SchemaRemover; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.PlanVisitor; import org.apache.pig.impl.plan.VisitorException; +import org.apache.pig.impl.logicalLayer.parser.ParseException; import org.apache.pig.data.DataType; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -46,6 +48,7 @@ private ArrayList<LogicalPlan> mForEachPlans; private ArrayList<Boolean> mFlatten; + private ArrayList<Schema> mUserDefinedSchema = null; private static Log log = LogFactory.getLog(LOForEach.class); /** @@ -65,6 +68,16 @@ mFlatten = flattenList; } + public LOForEach(LogicalPlan plan, OperatorKey k, + ArrayList<LogicalPlan> foreachPlans, ArrayList<Boolean> flattenList, + ArrayList<Schema> userDefinedSchemaList) { + + super(plan, k); + mForEachPlans = foreachPlans; + mFlatten = flattenList; + mUserDefinedSchema = userDefinedSchemaList; + } + public ArrayList<LogicalPlan> getForEachPlans() { return mForEachPlans; } @@ -92,6 +105,16 @@ return DataType.BAG ; } + private void updateAliasCount(Map<String, Integer> aliases, String 
alias) { + if((null == aliases) || (null == alias)) return; + Integer count = aliases.get(alias); + if(null == count) { + aliases.put(alias, 1); + } else { + aliases.put(alias, ++count); + } + } + @Override public Schema getSchema() throws FrontendException { log.debug("Entering getSchema"); @@ -124,6 +147,10 @@ try { planFs = ((ExpressionOperator)op).getFieldSchema(); log.debug("planFs: " + planFs); + Schema userDefinedSchema = null; + if(null != mUserDefinedSchema) { + userDefinedSchema = mUserDefinedSchema.get(planCtr); + } if(null != planFs) { String outerCanonicalAlias = op.getAlias(); if(null == outerCanonicalAlias) { @@ -137,56 +164,108 @@ //flatten(B.(x,y,z)) Schema s = planFs.schema; if(null != s) { - for(Schema.FieldSchema fs: s.getFields()) { + for(int i = 0; i < s.size(); ++i) { + Schema.FieldSchema fs; + try { + fs = s.getField(i); + } catch (ParseException pe) { + throw new FrontendException(pe.getMessage()); + } log.debug("fs: " + fs); - log.debug("fs.alias: " + fs.alias); + if(null != userDefinedSchema) { + Schema.FieldSchema userDefinedFieldSchema; + try { + if(i < userDefinedSchema.size()) { + userDefinedFieldSchema = userDefinedSchema.getField(i); + fs = fs.mergePrefixFieldSchema(userDefinedFieldSchema); + } + } catch (ParseException pe) { + throw new FrontendException(pe.getMessage()); + } catch (SchemaMergeException sme) { + throw new FrontendException(sme.getMessage()); + } + outerCanonicalAlias = null; + } String innerCanonicalAlias = fs.alias; + Schema.FieldSchema newFs; if((null != outerCanonicalAlias) && (null != innerCanonicalAlias)) { String disambiguatorAlias = outerCanonicalAlias + "::" + innerCanonicalAlias; - Schema.FieldSchema newFs = new Schema.FieldSchema(disambiguatorAlias, fs.schema, fs.type); + newFs = new Schema.FieldSchema(disambiguatorAlias, fs.schema, fs.type); fss.add(newFs); - Integer count; - count = aliases.get(innerCanonicalAlias); - if(null == count) { - aliases.put(innerCanonicalAlias, 1); - } else { - 
aliases.put(innerCanonicalAlias, ++count); - } - count = aliases.get(disambiguatorAlias); - if(null == count) { - aliases.put(disambiguatorAlias, 1); - } else { - aliases.put(disambiguatorAlias, ++count); - } - flattenAlias.put(newFs, innerCanonicalAlias); - inverseFlattenAlias.put(innerCanonicalAlias, true); + updateAliasCount(aliases, disambiguatorAlias); //it's fine if there are duplicates //we just need to record if its due to //flattening } else { - Schema.FieldSchema newFs = new Schema.FieldSchema(null, fs.schema, fs.type); + newFs = new Schema.FieldSchema(fs.alias, fs.schema, fs.type); fss.add(newFs); } + updateAliasCount(aliases, innerCanonicalAlias); + flattenAlias.put(newFs, innerCanonicalAlias); + inverseFlattenAlias.put(innerCanonicalAlias, true); } } else { - Schema.FieldSchema newFs = new Schema.FieldSchema(null, DataType.BYTEARRAY); - fss.add(newFs); + Schema.FieldSchema newFs; + if(null != userDefinedSchema) { + if(!DataType.isSchemaType(planFs.type)) { + if(userDefinedSchema.size() > 1) { + throw new FrontendException("Schema mismatch. A basic type on flattening cannot have more than one column. 
User defined schema: " + userDefinedSchema); + } + newFs = new Schema.FieldSchema(null, planFs.type); + try { + newFs = newFs.mergePrefixFieldSchema(userDefinedSchema.getField(0)); + } catch (SchemaMergeException sme) { + throw new FrontendException(sme.getMessage()); + } catch (ParseException pe) { + throw new FrontendException(pe.getMessage()); + } + updateAliasCount(aliases, newFs.alias); + fss.add(newFs); + } else { + for(Schema.FieldSchema ufs: userDefinedSchema.getFields()) { + fss.add(new Schema.FieldSchema(ufs.alias, ufs.schema, ufs.type)); + updateAliasCount(aliases, ufs.alias); + } + } + } else { + if(!DataType.isSchemaType(planFs.type)) { + newFs = new Schema.FieldSchema(null, planFs.type); + } else { + newFs = new Schema.FieldSchema(null, DataType.BYTEARRAY); + } + fss.add(newFs); + } } } else { //just populate the schema with the field schema of the expression operator + //check if the user has defined a schema for the operator; compare the schema + //with that of the expression operator field schema and then add it to the list + if(null != userDefinedSchema) { + try { + planFs = planFs.mergePrefixFieldSchema(userDefinedSchema.getField(0)); + updateAliasCount(aliases, planFs.alias); + } catch (SchemaMergeException sme) { + throw new FrontendException(sme.getMessage()); + } catch (ParseException pe) { + throw new FrontendException(pe.getMessage()); + } + } fss.add(planFs); - if(null != outerCanonicalAlias) { - Integer count = aliases.get(outerCanonicalAlias); - if(null == count) { - aliases.put(outerCanonicalAlias, 1); - } else { - aliases.put(outerCanonicalAlias, ++count); - } - } } } else { //did not get a valid list of field schemas - fss.add(new Schema.FieldSchema(null, DataType.BYTEARRAY)); + String outerCanonicalAlias = null; + if(null != userDefinedSchema) { + try { + Schema.FieldSchema userDefinedFieldSchema = userDefinedSchema.getField(0); + fss.add(userDefinedFieldSchema); + updateAliasCount(aliases, userDefinedFieldSchema.alias); + } catch 
(ParseException pe) { + throw new FrontendException(pe.getMessage()); + } + } else { + fss.add(new Schema.FieldSchema(null, DataType.BYTEARRAY)); + } } } catch (FrontendException fee) { mSchema = null; Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGenerate.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGenerate.java?rev=690049&r1=690048&r2=690049&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGenerate.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGenerate.java Thu Aug 28 16:44:13 2008 @@ -43,6 +43,7 @@ //private ArrayList<ExpressionOperator> mProjections; private ArrayList<LogicalPlan> mGeneratePlans; private ArrayList<Boolean> mFlatten; + private ArrayList<Schema> mUserDefinedSchema = null; private static Log log = LogFactory.getLog(LOGenerate.class); /** @@ -64,6 +65,16 @@ mFlatten = flatten; } + public LOGenerate(LogicalPlan plan, OperatorKey key, + ArrayList<LogicalPlan> generatePlans, ArrayList<Boolean> flatten, + ArrayList<Schema> userDefinedSchemaList) { + super(plan, key); + mGeneratePlans = generatePlans; + mFlatten = flatten; + mUserDefinedSchema = userDefinedSchemaList; + } + + /** * * @param plan @@ -94,6 +105,10 @@ return mFlatten; } + public List<Schema> getUserDefinedSchema() { + return mUserDefinedSchema; + } + @Override public String name() { return "Generate " + mKey.scope + "-" + mKey.id; Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=690049&r1=690048&r2=690049&view=diff ============================================================================== --- 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Thu Aug 28 16:44:13 2008 @@ -391,6 +391,26 @@ log.trace("Exiting attachPlan"); } + public static class SchemaUtils { + public static void setSchemaDefaultType(Schema s, byte t) { + if(null == s) return; + for(Schema.FieldSchema fs: s.getFields()) { + setFieldSchemaDefaultType(fs, t); + } + } + + public static void setFieldSchemaDefaultType(Schema.FieldSchema fs, byte t) { + if(null == fs) return; + if(DataType.NULL == fs.type) { + fs.type = t; + } + if(DataType.isSchemaType(fs.type)) { + setSchemaDefaultType(fs.schema, t); + } + } + } + + } @@ -478,7 +498,6 @@ } } - PARSER_END(QueryParser) // Skip all the new lines, tabs and spaces @@ -670,12 +689,12 @@ LogicalOperator Expr(LogicalPlan lp) : { LogicalOperator op; - Schema schema; + Schema schema = null; log.trace("Entering Expr"); } { ( - ( op = NestedExpr(lp) [ <AS> "(" schema = TupleSchema() ")" {op.setSchema(schema);} ] ) + ( op = NestedExpr(lp) [ <AS> "(" schema = TupleSchema() ")" {SchemaUtils.setSchemaDefaultType(schema, DataType.BYTEARRAY); op.setSchema(schema);} ] ) | op = BaseExpr(lp) ) {log.trace("Exiting Expr"); return op;} @@ -736,7 +755,25 @@ ( ( (<DEFINE> op = DefineClause(lp)) -| (<LOAD> op = LoadClause(lp) [<AS> (LOOKAHEAD(2) "(" schema = TupleSchema() ")" {op.setSchema(schema); op.setCanonicalNames(); log.debug("Load as schema()");schema.printAliases();} | fs = AtomSchema() {schema = new Schema(fs); op.setSchema(schema); log.debug("Load as atomschema()");schema.printAliases();}) ]) +| (<LOAD> op = LoadClause(lp) + [ <AS> + ( + LOOKAHEAD(2) "(" schema = TupleSchema() ")" + { + SchemaUtils.setSchemaDefaultType(schema, DataType.BYTEARRAY); + op.setSchema(schema); + op.setCanonicalNames(); + log.debug("Load as schema" + schema); + } + | fs = AtomSchema() + { + schema = new Schema(fs); + op.setSchema(schema); + 
log.debug("Load as atomschema" + schema); + } + ) + ] + ) | ((<GROUP> | <COGROUP>) op = CogroupClause(lp)) | (<FILTER> op = FilterClause(lp)) | (<LIMIT> op = LimitClause(lp)) @@ -1149,6 +1186,7 @@ ArrayList<LogicalPlan> listPlans = new ArrayList<LogicalPlan>(); LogicalPlan groupByPlan; ArrayList<Boolean> flattenList = new ArrayList<Boolean>(); + ArrayList<Schema> userDefinedSchemaList = new ArrayList<Schema>(); log.trace("Entering GroupItem"); log.debug("LogicalPlan: " + lp); } @@ -1159,16 +1197,16 @@ ( <BY> ( LOOKAHEAD ( "(" FlattenedGenerateItemList(cgOp.getSchema(), null, groupByPlan, cgOp) ")" ) - ( "(" es = FlattenedGenerateItem(cgOp.getSchema(), null, groupByPlan = new LogicalPlan(), cgOp, flattenList) + ( "(" es = FlattenedGenerateItem(cgOp.getSchema(), null, groupByPlan = new LogicalPlan(), cgOp, flattenList, userDefinedSchemaList) {listPlans.add(groupByPlan);} ( - "," es = FlattenedGenerateItem(cgOp.getSchema(), null, groupByPlan = new LogicalPlan(), cgOp, flattenList) + "," es = FlattenedGenerateItem(cgOp.getSchema(), null, groupByPlan = new LogicalPlan(), cgOp, flattenList, userDefinedSchemaList) {listPlans.add(groupByPlan);} )* ")" ) | ( - es = FlattenedGenerateItem(cgOp.getSchema(), null, groupByPlan = new LogicalPlan(), cgOp, flattenList) + es = FlattenedGenerateItem(cgOp.getSchema(), null, groupByPlan = new LogicalPlan(), cgOp, flattenList, userDefinedSchemaList) {listPlans.add(groupByPlan);} ) ) @@ -1412,6 +1450,7 @@ LOGenerate generate = (LOGenerate)specList.get(specList.size() - 1); List<LogicalPlan> generatePlans = generate.getGeneratePlans(); List<Boolean> flattenList = generate.getFlatten(); + List<Schema> userDefinedSchemaList = generate.getUserDefinedSchema(); /* Generate's nested plans will be translated to foreach's nested plan If generate contains an expression that does not require generate's @@ -1461,7 +1500,7 @@ } resetGenerateState(); - foreach = new LOForEach(lp, new OperatorKey(scope, getNextId()), (ArrayList)foreachPlans, 
(ArrayList)flattenList); + foreach = new LOForEach(lp, new OperatorKey(scope, getNextId()), (ArrayList)foreachPlans, (ArrayList)flattenList, (ArrayList) userDefinedSchemaList); try { lp.add(foreach); log.debug("Added operator " + foreach.getClass().getName() + " to the logical plan"); @@ -1802,17 +1841,18 @@ { ArrayList<LogicalPlan> generatePlans = new ArrayList<LogicalPlan>(); ArrayList<Boolean> flattenList = new ArrayList<Boolean>(); + ArrayList<Schema> userDefinedSchemaList = new ArrayList<Schema>(); ExpressionOperator item; LogicalPlan generatePlan; log.trace("Entering FlattenedGenerateItemList"); } { ( - item = FlattenedGenerateItem(over, specs, generatePlan = new LogicalPlan(), input, flattenList) + item = FlattenedGenerateItem(over, specs, generatePlan = new LogicalPlan(), input, flattenList, userDefinedSchemaList) { generatePlans.add(generatePlan); } - ("," item = FlattenedGenerateItem(over, specs, generatePlan = new LogicalPlan(), input, flattenList) + ("," item = FlattenedGenerateItem(over, specs, generatePlan = new LogicalPlan(), input, flattenList, userDefinedSchemaList) { generatePlans.add(generatePlan); } @@ -1820,7 +1860,7 @@ ) { - LogicalOperator generate = new LOGenerate(lp, new OperatorKey(scope, getNextId()), generatePlans, flattenList); + LogicalOperator generate = new LOGenerate(lp, new OperatorKey(scope, getNextId()), generatePlans, flattenList, userDefinedSchemaList); lp.add(generate); log.debug("Added operator " + generate.getClass().getName() + " to the logical plan"); log.trace("Exiting FlattenedGenerateItemList"); @@ -1829,7 +1869,7 @@ } -ExpressionOperator FlattenedGenerateItem(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input, ArrayList<Boolean> flattenList): +ExpressionOperator FlattenedGenerateItem(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input, ArrayList<Boolean> flattenList, ArrayList<Schema> userDefinedSchemaList): { ExpressionOperator item; Schema schema 
= null; @@ -1841,13 +1881,16 @@ { ( ( - ( <FLATTEN> "(" item = InfixExpr(over,specs,lp,input) ")" { flatten = true; } + [ <AS> "(" schema = TupleSchema() ")" ] ) -| (item = InfixExpr(over,specs,lp,input)) +| + ( + ( + (item = InfixExpr(over,specs,lp,input)) | ( <STAR> { LOProject project = new LOProject(lp, new OperatorKey(scope, getNextId()), input, -1); @@ -1861,15 +1904,17 @@ log.debug("FGItem: Added operator " + project.getClass().getName() + " " + project + " to logical plan " + lp); } ) + ) + [ <AS> fs = FieldSchema() ] ) - [ <AS> (fs = FieldSchema()) ] ) { log.debug("item: " + item.getClass().getName()); if(null != fs) { - item.setFieldSchema(fs); + schema = new Schema(fs); } - flattenList.add(flatten); + flattenList.add(flatten); + userDefinedSchemaList.add(schema); log.trace("Exiting FlattenedGenerateItem"); return item; } @@ -2289,7 +2334,7 @@ Schema.FieldSchema AtomSchema() : { Token t1 = null; - byte type = DataType.BYTEARRAY; + byte type = DataType.NULL; Schema.FieldSchema fs; log.trace("Entering AtomSchema"); } @@ -2316,7 +2361,7 @@ log.trace("Entering SchemaMap"); } { - ( t1 = <IDENTIFIER> ) [":" <MAP>] "[" "]" + ( t1 = <IDENTIFIER> ) [LOOKAHEAD(2) ":" <MAP>| ":"] "[" "]" { if (null != t1) { log.debug("MAP alias " + t1.image); Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=690049&r1=690048&r2=690049&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java Thu Aug 28 16:44:13 2008 @@ -335,6 +335,83 @@ } } + /*** + * Recursively prefix merge two schemas + * @param other the other field schema to be merged with + * @return the prefix merged field schema this can be 
null if one schema is null and + * allowIncompatibleTypes is true + * + * @throws SchemaMergeException if they cannot be merged + */ + + public Schema.FieldSchema mergePrefixFieldSchema(Schema.FieldSchema otherFs) throws SchemaMergeException { + return mergePrefixFieldSchema(otherFs, true); + } + + /*** + * Recursively prefix merge two schemas + * @param other the other field schema to be merged with + * @param otherTakesAliasPrecedence true if aliases from the other + * field schema take precedence + * @return the prefix merged field schema this can be null if one schema is null and + * allowIncompatibleTypes is true + * + * @throws SchemaMergeException if they cannot be merged + */ + + public Schema.FieldSchema mergePrefixFieldSchema(Schema.FieldSchema otherFs, + boolean otherTakesAliasPrecedence) + throws SchemaMergeException { + Schema.FieldSchema myFs = this; + Schema.FieldSchema mergedFs = null; + byte mergedType = DataType.NULL; + + if(null == otherFs) { + return myFs; + } + + if((myFs.type == DataType.NULL || myFs.type == DataType.UNKNOWN) + && (otherFs.type == DataType.NULL || otherFs.type == DataType.UNKNOWN)) { + throw new SchemaMergeException("Type mismatch. No useful type for merging. Field Schema: " + myFs + ". Other Field Schema: " + otherFs); + } else if(myFs.type == otherFs.type) { + mergedType = myFs.type; + } else if((myFs.type == DataType.NULL || myFs.type == DataType.UNKNOWN) + && (otherFs.type == DataType.NULL)) { + mergedType = DataType.BYTEARRAY; + } else if ((myFs.type != DataType.NULL && myFs.type != DataType.UNKNOWN) + && (otherFs.type == DataType.NULL)) { + mergedType = myFs.type; + } else { + throw new SchemaMergeException("Type mismatch. Field Schema: " + myFs + ". 
Other Field Schema: " + otherFs); + } + + String mergedAlias = mergeAlias(myFs.alias, + otherFs.alias, + otherTakesAliasPrecedence) ; + + if (!DataType.isSchemaType(mergedType)) { + // just normal merge + mergedFs = new FieldSchema(mergedAlias, mergedType) ; + } + else { + Schema mergedSubSchema = null; + // merge inner schemas because both sides have schemas + if(null != myFs.schema) { + mergedSubSchema = myFs.schema.mergePrefixSchema(otherFs.schema, + otherTakesAliasPrecedence); + } else { + mergedSubSchema = otherFs.schema; + } + // create the merged field + try { + mergedFs = new FieldSchema(mergedAlias, mergedSubSchema, mergedType) ; + } catch (FrontendException fee) { + throw new SchemaMergeException(fee.getMessage()); + } + } + return mergedFs; + } + } private List<FieldSchema> mFields; @@ -1030,6 +1107,70 @@ return new Schema(outerSchema); } + /*** + * Recursively prefix merge two schemas + * @param other the other schema to be merged with + * @param otherTakesAliasPrecedence true if aliases from the other + * schema take precedence + * @return the prefix merged schema this can be null if one schema is null and + * allowIncompatibleTypes is true + * + * @throws SchemaMergeException if they cannot be merged + */ + + public Schema mergePrefixSchema(Schema other, + boolean otherTakesAliasPrecedence) + throws SchemaMergeException { + Schema schema = this; + + if (other == null) { + return this ; + } + + if (schema.size() < other.size()) { + throw new SchemaMergeException("Schema size mismatch. Other schema size greater than schema size. Schema: " + this + ". 
Other schema: " + other); + } + + List<FieldSchema> outputList = new ArrayList<FieldSchema>() ; + + List<FieldSchema> mylist = schema.mFields ; + List<FieldSchema> otherlist = other.mFields ; + + // We iterate up to the smaller one's size + int iterateLimit = other.mFields.size(); + + int idx = 0; + for (; idx< iterateLimit ; idx ++) { + + // Just for readability + FieldSchema myFs = mylist.get(idx) ; + FieldSchema otherFs = otherlist.get(idx) ; + + FieldSchema mergedFs = myFs.mergePrefixFieldSchema(otherFs, otherTakesAliasPrecedence); + outputList.add(mergedFs) ; + } + // if the first schema has leftover, then append the rest + for(int i=idx; i < mylist.size(); i++) { + + FieldSchema fs = mylist.get(i) ; + + // for non-schema types + if (!DataType.isSchemaType(fs.type)) { + outputList.add(new FieldSchema(fs.alias, fs.type)) ; + } + // for TUPLE & BAG + else { + try { + FieldSchema tmp = new FieldSchema(fs.alias, fs.schema, fs.type) ; + outputList.add(tmp) ; + } catch (FrontendException fee) { + throw new SchemaMergeException(fee.getMessage()); + } + } + } + + return new Schema(outputList) ; + } } Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=690049&r1=690048&r2=690049&view=diff ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java (original) +++ incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java Thu Aug 28 16:44:13 2008 @@ -19,6 +19,7 @@ import java.io.IOException; import java.io.InputStream; +import java.io.ByteArrayInputStream; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -56,6 +57,8 @@ import org.apache.pig.impl.logicalLayer.LOPrinter; import org.apache.pig.impl.logicalLayer.schema.Schema; import org.apache.pig.data.DataType; 
+import org.apache.pig.impl.logicalLayer.parser.QueryParser ; +import org.apache.pig.impl.logicalLayer.parser.ParseException ; public class TestLogicalPlanBuilder extends junit.framework.TestCase { @@ -863,7 +866,7 @@ @Test public void testQuery70() { buildPlan(" a = load 'input1';"); - buildPlan(" b = foreach a generate [10L#'hello', 4.0e-2#10L, 0.5f#(1), 'world'#42, 42#{('guide')}] as mymap;"); + buildPlan(" b = foreach a generate [10L#'hello', 4.0e-2#10L, 0.5f#(1), 'world'#42, 42#{('guide')}] as mymap:map[];"); buildPlan(" c = foreach b generate mymap#10L;"); } @@ -1157,6 +1160,112 @@ } } + @Test + public void testQuery90() throws FrontendException, ParseException { + LogicalPlan lp; + LOForEach foreach; + + buildPlan("a = load 'myfile' as (name:Chararray, age:Int, gpa:Float);"); + buildPlan("b = group a by (name, age);"); + + //the first element in group, i.e., name is renamed as myname + lp = buildPlan("c = foreach b generate flatten(group) as (myname), COUNT(a) as mycount;"); + foreach = (LOForEach) lp.getLeaves().get(0); + assertTrue(foreach.getSchema().equals(getSchemaFromString("myname: chararray, age: int, mycount: long"))); + + //the first and second elements in group, i.e., name and age are renamed as myname and myage + lp = buildPlan("c = foreach b generate flatten(group) as (myname, myage), COUNT(a) as mycount;"); + foreach = (LOForEach) lp.getLeaves().get(0); + assertTrue(foreach.getSchema().equals(getSchemaFromString("myname: chararray, myage: int, mycount: long"))); + + //the schema of group is unchanged + lp = buildPlan("c = foreach b generate flatten(group) as (), COUNT(a) as mycount;"); + foreach = (LOForEach) lp.getLeaves().get(0); + assertTrue(foreach.getSchema().equals(getSchemaFromString("name: chararray, age: int, mycount: long"))); + + //group is renamed as mygroup + lp = buildPlan("c = foreach b generate group as mygroup, COUNT(a) as mycount;"); + foreach = (LOForEach) lp.getLeaves().get(0); + 
assertTrue(foreach.getSchema().equals(getSchemaFromString("mygroup:(name: chararray, age: int), mycount: long"))); + + //group is renamed as mygroup and the first element is renamed as myname + lp = buildPlan("c = foreach b generate group as mygroup:(myname), COUNT(a) as mycount;"); + foreach = (LOForEach) lp.getLeaves().get(0); + assertTrue(foreach.getSchema().equals(getSchemaFromString("mygroup:(myname: chararray, age: int), mycount: long"))); + + //group is renamed as mygroup and the elements are renamed as myname and myage + lp = buildPlan("c = foreach b generate group as mygroup:(myname, myage), COUNT(a) as mycount;"); + foreach = (LOForEach) lp.getLeaves().get(0); + assertTrue(foreach.getSchema().equals(getSchemaFromString("mygroup:(myname: chararray, myage: int), mycount: long"))); + + //group is renamed to mygroup as the tuple schema is empty + lp = buildPlan("c = foreach b generate group as mygroup:(), COUNT(a) as mycount;"); + foreach = (LOForEach) lp.getLeaves().get(0); + assertTrue(foreach.getSchema().equals(getSchemaFromString("mygroup:(name: chararray, age: int), mycount: long"))); + + /* + //forcing an error by having more elements in the schema + lp = buildPlan("c = foreach B generate group as mygroup:(myname, myage, mygpa), COUNT(A) as mycount;"); + lp = buildPlan("c = foreach B generate group as mygroup:(myname: int, myage), COUNT(A) as mycount;"); + lp = buildPlan("c = foreach B generate group as mygroup:(myname, myage: chararray), COUNT(A) as mycount;"); + lp = buildPlan("c = foreach B generate group as mygroup:{t: (myname, myage)}, COUNT(A) as mycount;"); + lp = buildPlan("c = foreach B generate flatten(group) as (myname, myage, mygpa), COUNT(A) as mycount;"); + + foreach = (LOForEach) lp.getLeaves().get(0); + + assertTrue(foreach.getSchema().equals(getSchemaFromString())); + */ + + } + + @Test + public void testQueryFail90() throws FrontendException, ParseException { + buildPlan("a = load 'myfile' as (name:Chararray, age:Int, 
gpa:Float);"); + buildPlan("b = group a by (name, age);"); + + try { + buildPlan("c = foreach b generate group as mygroup:(myname, myage, mygpa), COUNT(a) as mycount;"); + } catch (AssertionFailedError e) { + assertTrue(e.getMessage().contains("Schema size mismatch")); + } + + try { + buildPlan("c = foreach b generate group as mygroup:(myname: int, myage), COUNT(a) as mycount;"); + } catch (AssertionFailedError e) { + assertTrue(e.getMessage().contains("Type mismatch")); + } + + try { + buildPlan("c = foreach b generate group as mygroup:(myname, myage: chararray), COUNT(a) as mycount;"); + } catch (AssertionFailedError e) { + assertTrue(e.getMessage().contains("Type mismatch")); + } + + try { + buildPlan("c = foreach b generate group as mygroup:{t: (myname, myage)}, COUNT(a) as mycount;"); + } catch (AssertionFailedError e) { + assertTrue(e.getMessage().contains("Type mismatch")); + } + + try { + buildPlan("c = foreach b generate flatten(group) as (myname, myage, mygpa), COUNT(a) as mycount;"); + } catch (AssertionFailedError e) { + assertTrue(e.getMessage().contains("Schema size mismatch")); + } + } + + private Schema getSchemaFromString(String schemaString) throws ParseException { + return getSchemaFromString(schemaString, DataType.BYTEARRAY); + } + + private Schema getSchemaFromString(String schemaString, byte defaultType) throws ParseException { + ByteArrayInputStream stream = new ByteArrayInputStream(schemaString.getBytes()) ; + QueryParser queryParser = new QueryParser(stream) ; + Schema schema = queryParser.TupleSchema() ; + QueryParser.SchemaUtils.setSchemaDefaultType(schema, defaultType); + return schema; + } + private void printPlan(LogicalPlan lp) { LOPrinter graphPrinter = new LOPrinter(System.err, lp); System.err.println("Printing the logical plan"); Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java?rev=690049&r1=690048&r2=690049&view=diff ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java (original) +++ incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java Thu Aug 28 16:44:13 2008 @@ -2418,7 +2418,7 @@ // check outer schema Schema endResultSchema = foreach1.getSchema() ; - assertEquals(endResultSchema.getField(0).type, DataType.BYTEARRAY) ; + assertEquals(endResultSchema.getField(0).type, DataType.FLOAT) ; assertEquals(endResultSchema.getField(1).type, DataType.LONG) ; } Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidatorNoSchema.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidatorNoSchema.java?rev=690049&r1=690048&r2=690049&view=diff ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidatorNoSchema.java (original) +++ incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidatorNoSchema.java Thu Aug 28 16:44:13 2008 @@ -989,7 +989,7 @@ // check outer schema Schema endResultSchema = foreach1.getSchema() ; - assertEquals(endResultSchema.getField(0).type, DataType.BYTEARRAY) ; + assertEquals(endResultSchema.getField(0).type, DataType.FLOAT) ; assertEquals(endResultSchema.getField(1).type, DataType.DOUBLE) ; } } Modified: incubator/pig/branches/types/test/org/apache/pig/test/utils/dotGraph/LogicalPlanLoader.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/utils/dotGraph/LogicalPlanLoader.java?rev=690049&r1=690048&r2=690049&view=diff ============================================================================== --- 
incubator/pig/branches/types/test/org/apache/pig/test/utils/dotGraph/LogicalPlanLoader.java (original) +++ incubator/pig/branches/types/test/org/apache/pig/test/utils/dotGraph/LogicalPlanLoader.java Thu Aug 28 16:44:13 2008 @@ -180,6 +180,7 @@ Schema schema = null ; try { schema = queryParser.TupleSchema() ; + QueryParser.SchemaUtils.setSchemaDefaultType(schema, DataType.BYTEARRAY); // set all the [NoAlias] to null for(int i=0; i < dummyAliasCounter; i++) {