Author: gates Date: Wed May 7 16:18:45 2008 New Revision: 654320 URL: http://svn.apache.org/viewvc?rev=654320&view=rev Log: Added equals and merge to schema class. Work done by Pi.
Added: incubator/pig/branches/types/test/org/apache/pig/test/TestSchema.java Modified: incubator/pig/branches/types/build.xml incubator/pig/branches/types/src/org/apache/pig/data/DataType.java incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java Modified: incubator/pig/branches/types/build.xml URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/build.xml?rev=654320&r1=654319&r2=654320&view=diff ============================================================================== --- incubator/pig/branches/types/build.xml (original) +++ incubator/pig/branches/types/build.xml Wed May 7 16:18:45 2008 @@ -144,6 +144,7 @@ **/test/FakeFSOutputStream.java, **/test/TestPackage.java, **/test/TestForEach.java, **/test/TestLocalRearrange.java, **/test/TestPOUserFunc.java, **/test/TestPODistinct.java, **/test/TestPOSort.java, + **/test/TestSchema.java, **/test/FakeFSInputStream.java, **/test/Util.java, **/logicalLayer/*.java, **/logicalLayer/parser/NodeIdGenerator.java, **/logicalLayer/schema/*.java, **/physicalLayer/topLevelOperators/*.java, @@ -263,6 +264,7 @@ <include name="**/TestForEach.java" /> <include name="**/TestInputOutputFileValidator.java" /> <include name="**/TestTypeCheckingValidator.java" /> + <include name="**/TestSchema.java" /> <!-- <include name="**/*Test*.java" /> <exclude name="**/TestLargeFile.java" /> Modified: incubator/pig/branches/types/src/org/apache/pig/data/DataType.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DataType.java?rev=654320&r1=654319&r2=654320&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/data/DataType.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/data/DataType.java Wed May 7 16:18:45 2008 @@ -572,4 +572,5 @@ default :return true ; } } + } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=654320&r1=654319&r2=654320&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java Wed May 7 16:18:45 2008 @@ -78,6 +78,44 @@ return true; } + + /*** + * Compare two field schema for equality + * @param fschema + * @param fother + * @param relaxInner If true, we don't check inner tuple schemas + * @param relaxAlias If true, we don't check aliases + * @return + */ + public static boolean equals(FieldSchema fschema, + FieldSchema fother, + boolean relaxInner, + boolean relaxAlias) { + if (fschema == null) { + return false ; + } + + if (fother == null) { + return false ; + } + + if (fschema.type != fother.type) { + return false ; + } + + if ( (!relaxAlias) && (fschema.alias != fother.alias) ) { + return false ; + } + + if ( (!relaxInner) && (fschema.type == DataType.TUPLE) ) { + // compare recursively using schema + if (!Schema.equals(fschema.schema, fother.schema, false, relaxAlias)) { + return false ; + } + } + + return true ; + } } private List<FieldSchema> mFields; @@ -181,6 +219,196 @@ } return true; } + + /** + * Recursively compare two schemas for equality + * @param schema + * @param other + * @param relaxInner if true, inner schemas will not be checked + * @return + */ + public static boolean equals(Schema schema, + Schema other, + boolean relaxInner, + boolean relaxAlias) { + if (schema == null) { + return false ; + } + + if (other == null) { + return false ; + } + + if (schema.size() != other.size()) return false; + + Iterator<FieldSchema> i = schema.mFields.iterator(); + Iterator<FieldSchema> j = other.mFields.iterator(); + + while (i.hasNext()) { + + FieldSchema myFs = i.next() ; + FieldSchema otherFs = j.next() ; + + if ( (!relaxAlias) && (myFs.alias != otherFs.alias) ) { + return false ; + } + + if (myFs.type != otherFs.type) { + return false ; + } + + if (!relaxInner) { + // Compare recursively using field schema + if (!FieldSchema.equals(myFs, otherFs, false, relaxAlias)) { + return false ; + } + } + + } + return true; + } + + + /*** + * Merge this schema with the other schema + * @param other the other schema to be merged with + * @param otherTakesAliasPrecedence true if aliases from the other + * schema take precedence + * @return the merged schema, null if they are not compatible + */ + public Schema merge(Schema other, boolean otherTakesAliasPrecedence) { + return mergeSchema(this, other, otherTakesAliasPrecedence) ; + } + + /*** + * Recursively merge two schemas + * @param schema the initial schema + * @param other the other schema to be merged with + * @param otherTakesAliasPrecedence true if aliases from the other + * schema take precedence + * @return the merged schema, null if they are not compatible + */ + private Schema mergeSchema(Schema schema, Schema other, + boolean otherTakesAliasPrecedence) { + + if (other == null) { + return null ; + } + + if (schema.size() != other.size()) { + return null ; + } + + List<FieldSchema> outputList = new ArrayList<FieldSchema>() ; + + Iterator<FieldSchema> mylist = schema.mFields.iterator() ; + Iterator<FieldSchema> otherlist = other.mFields.iterator() ; + + while (mylist.hasNext()) { + + FieldSchema myFs = mylist.next() ; + FieldSchema otherFs = otherlist.next() ; + + byte mergedType = mergeType(myFs.type, otherFs.type) ; + // if the types cannot be merged, the schemas cannot be merged + if (mergedType == DataType.ERROR) { + return null ; + } + + String mergedAlias = mergeAlias(myFs.alias, + otherFs.alias, + otherTakesAliasPrecedence) ; + + FieldSchema mergedFs = null ; + if (mergedType != DataType.TUPLE) { + // just normal merge + mergedFs = new FieldSchema(mergedAlias, mergedType) ; + } + else { + // merge inner tuple because both sides are tuples + Schema mergedSubSchema = mergeSchema(myFs.schema, + otherFs.schema, + otherTakesAliasPrecedence) ; + // return null if they cannot be merged + if (mergedSubSchema == null) { + return null ; + } + + mergedFs = new FieldSchema(mergedAlias, mergedSubSchema) ; + + } + outputList.add(mergedFs) ; + } + + return new Schema(outputList) ; + } + + /*** + * Merge two aliases. If one of aliases is null, return the other. + * Otherwise check the precedence condition + * @param alias + * @param other + * @param otherTakesPrecedence + * @return + */ + private String mergeAlias(String alias, String other + ,boolean otherTakesPrecedence) { + if (alias == null) { + return other ; + } + else if (other == null) { + return alias ; + } + else if (otherTakesPrecedence) { + return other ; + } + else { + return alias ; + } + } + + /*** + * Merge types if possible + * @param type1 + * @param type2 + * @return the merged type, or DataType.ERROR if not successful + */ + private byte mergeType(byte type1, byte type2) { + // Only legal types can be merged + if ( (!DataType.isUsableType(type1)) || + (!DataType.isUsableType(type2)) ) { + return DataType.ERROR ; + } + + // Same type is OK + if (type1==type2) { + return type1 ; + } + + // Both are number so we return the bigger type + if ( (DataType.isNumberType(type1)) && + (DataType.isNumberType(type2)) ) { + return type1>type2 ? type1:type2 ; + } + + // One is bytearray and the other is (number or chararray) + if ( (type1 == DataType.BYTEARRAY) && + ( (type2 == DataType.CHARARRAY) || (DataType.isNumberType(type2)) ) + ) { + return type2 ; + } + + if ( (type2 == DataType.BYTEARRAY) && + ( (type1 == DataType.CHARARRAY) || (DataType.isNumberType(type1)) ) + ) { + return type1 ; + } + + // else return just ERROR + return DataType.ERROR ; + } + + } Added: incubator/pig/branches/types/test/org/apache/pig/test/TestSchema.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestSchema.java?rev=654320&view=auto ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/TestSchema.java (added) +++ incubator/pig/branches/types/test/org/apache/pig/test/TestSchema.java Wed May 7 16:18:45 2008 @@ -0,0 +1,113 @@ +package org.apache.pig.test; + +import java.util.* ; + +import org.apache.pig.data.* ; +import org.apache.pig.impl.logicalLayer.schema.* ; +import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema; + +import org.junit.* ; + +import junit.framework.Assert; +import junit.framework.TestCase ; + +public class TestSchema extends TestCase { + + @Test + public void testSchemaEqual1() { + + List<FieldSchema> innerList1 = new ArrayList<FieldSchema>() ; + innerList1.add(new FieldSchema("11a", DataType.INTEGER)) ; + innerList1.add(new FieldSchema("11b", DataType.LONG)) ; + + List<FieldSchema> innerList2 = new ArrayList<FieldSchema>() ; + innerList2.add(new FieldSchema("11a", DataType.INTEGER)) ; + innerList2.add(new FieldSchema("11b", DataType.LONG)) ; + + Schema innerSchema1 = new Schema(innerList1) ; + Schema innerSchema2 = new Schema(innerList2) ; + + List<FieldSchema> list1 = new ArrayList<FieldSchema>() ; + list1.add(new FieldSchema("1a", DataType.BYTEARRAY)) ; + list1.add(new FieldSchema("1b", innerSchema1)) ; + list1.add(new FieldSchema("1c", DataType.INTEGER)) ; + + List<FieldSchema> list2 = new ArrayList<FieldSchema>() ; + list2.add(new FieldSchema("1a", DataType.BYTEARRAY)) ; + list2.add(new FieldSchema("1b", innerSchema2)) ; + list2.add(new FieldSchema("1c", DataType.INTEGER)) ; + + Schema schema1 = new Schema(list1) ; + Schema schema2 = new Schema(list2) ; + + Assert.assertTrue(Schema.equals(schema1, schema2, false, false)) ; + + innerList2.get(1).alias = "pi" ; + + Assert.assertFalse(Schema.equals(schema1, schema2, false, false)) ; + Assert.assertTrue(Schema.equals(schema1, schema2, false, true)) ; + + innerList2.get(1).alias = "11b" ; + innerList2.get(1).type = DataType.BYTEARRAY ; + + Assert.assertFalse(Schema.equals(schema1, schema2, false, false)) ; + Assert.assertTrue(Schema.equals(schema1, schema2, true, false)) ; + + innerList2.get(1).type = DataType.LONG ; + + Assert.assertTrue(Schema.equals(schema1, schema2, false, false)) ; + + list2.get(0).type = DataType.CHARARRAY ; + Assert.assertFalse(Schema.equals(schema1, schema2, false, false)) ; + } + + @Test + public void testMerge1() { + + // Generate two schemas + List<FieldSchema> innerList1 = new ArrayList<FieldSchema>() ; + innerList1.add(new FieldSchema("11a", DataType.INTEGER)) ; + innerList1.add(new FieldSchema("11b", DataType.FLOAT)) ; + + List<FieldSchema> innerList2 = new ArrayList<FieldSchema>() ; + innerList2.add(new FieldSchema("22a", DataType.DOUBLE)) ; + innerList2.add(new FieldSchema(null, DataType.LONG)) ; + + Schema innerSchema1 = new Schema(innerList1) ; + Schema innerSchema2 = new Schema(innerList2) ; + + List<FieldSchema> list1 = new ArrayList<FieldSchema>() ; + list1.add(new FieldSchema("1a", DataType.BYTEARRAY)) ; + list1.add(new FieldSchema("1b", innerSchema1)) ; + list1.add(new FieldSchema("1c", DataType.LONG)) ; + + List<FieldSchema> list2 = new ArrayList<FieldSchema>() ; + list2.add(new FieldSchema("2a", DataType.BYTEARRAY)) ; + list2.add(new FieldSchema("2b", innerSchema2)) ; + list2.add(new FieldSchema("2c", DataType.INTEGER)) ; + + Schema schema1 = new Schema(list1) ; + Schema schema2 = new Schema(list2) ; + + // Merge + Schema mergedSchema = schema1.merge(schema2, true) ; + + + // Generate expected schema + List<FieldSchema> expectedInnerList = new ArrayList<FieldSchema>() ; + expectedInnerList.add(new FieldSchema("22a", DataType.DOUBLE)) ; + expectedInnerList.add(new FieldSchema("11b", DataType.FLOAT)) ; + + Schema expectedInner = new Schema(expectedInnerList) ; + + List<FieldSchema> expectedList = new ArrayList<FieldSchema>() ; + expectedList.add(new FieldSchema("2a", DataType.BYTEARRAY)) ; + expectedList.add(new FieldSchema("2b", expectedInner)) ; + expectedList.add(new FieldSchema("2c", DataType.LONG)) ; + + Schema expected = new Schema(expectedList) ; + + // Compare + Assert.assertTrue(Schema.equals(mergedSchema, expected, false, false)) ; + } +}