Author: olga Date: Wed Sep 10 13:00:36 2008 New Revision: 693961 URL: http://svn.apache.org/viewvc?rev=693961&view=rev Log: streaming merge
Added: incubator/pig/branches/types/test/org/apache/pig/test/PigExecTestCase.java incubator/pig/branches/types/test/org/apache/pig/test/TestStreaming.java Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/PlanSetter.java incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java incubator/pig/branches/types/test/org/apache/pig/test/Util.java Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/PlanSetter.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/PlanSetter.java?rev=693961&r1=693960&r2=693961&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/PlanSetter.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/PlanSetter.java Wed Sep 10 13:00:36 2008 @@ -182,6 +182,10 @@ public void visit(LOUserFunc op) throws VisitorException { op.setPlan(mCurrentWalker.getPlan()); } + + public void visit(LOStream op) throws VisitorException { + op.setPlan(mCurrentWalker.getPlan()); + } } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java?rev=693961&r1=693960&r2=693961&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java Wed Sep 10 13:00:36 2008 @@ -30,6 +30,9 @@ * An optimizer for logical plans. */ public class LogicalOptimizer extends PlanOptimizer<LogicalOperator, LogicalPlan> { + + public static final String LOLOAD_CLASSNAME = "org.apache.pig.impl.logicalLayer.LOLoad"; + public static final String LOSTREAM_CLASSNAME = "org.apache.pig.impl.logicalLayer.LOStream"; public LogicalOptimizer(LogicalPlan plan) { super(plan); @@ -53,12 +56,22 @@ // Add type casting to plans where the schema has been declared (by // user, data, or data catalog). nodes = new ArrayList<String>(1); - nodes.add("org.apache.pig.impl.logicalLayer.LOLoad"); + nodes.add(LOLOAD_CLASSNAME); edges = new HashMap<Integer, Integer>(); required = new ArrayList<Boolean>(1); required.add(true); mRules.add(new Rule<LogicalOperator, LogicalPlan>(nodes, edges, required, - new TypeCastInserter(plan))); + new TypeCastInserter(plan, LOLOAD_CLASSNAME))); + + // Add type casting to plans where the schema has been declared by + // user in a statement with stream operator. + nodes = new ArrayList<String>(1); + nodes.add(LOSTREAM_CLASSNAME); + edges = new HashMap<Integer, Integer>(); + required = new ArrayList<Boolean>(1); + required.add(true); + mRules.add(new Rule(nodes, edges, required, + new TypeCastInserter(plan, LOSTREAM_CLASSNAME))); // Push up limit where ever possible. nodes = new ArrayList<String>(1); Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java?rev=693961&r1=693960&r2=693961&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java Wed Sep 10 13:00:36 2008 @@ -73,7 +73,7 @@ public void transform(List<LogicalOperator> nodes) throws OptimizerException { LogicalOperator lo = nodes.get(0); if (lo == null || !(lo instanceof LOLimit)) { - throw new RuntimeException("Expected load, got " + + throw new RuntimeException("Expected limit, got " + lo.getClass().getName()); } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java?rev=693961&r1=693960&r2=693961&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java Wed Sep 10 13:00:36 2008 @@ -27,10 +27,9 @@ import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.impl.logicalLayer.LOCast; import org.apache.pig.impl.logicalLayer.LOForEach; -import org.apache.pig.impl.logicalLayer.LOGenerate; import org.apache.pig.impl.logicalLayer.LOLoad; import org.apache.pig.impl.logicalLayer.LOProject; -import org.apache.pig.impl.logicalLayer.LOVisitor; +import org.apache.pig.impl.logicalLayer.LOStream; import org.apache.pig.impl.logicalLayer.LogicalOperator; import org.apache.pig.impl.logicalLayer.LogicalPlan; import org.apache.pig.impl.logicalLayer.schema.Schema; @@ -38,9 +37,6 @@ import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.optimizer.OptimizerException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - /** * A visitor to discover if any schema has been specified for a file being * loaded. If so, a projection will be injected into the plan to cast the @@ -52,23 +48,17 @@ */ public class TypeCastInserter extends LogicalTransformer { - private static final Log log = LogFactory.getLog(TypeCastInserter.class); + private String operatorClassName; - public TypeCastInserter(LogicalPlan plan) { + public TypeCastInserter(LogicalPlan plan, String operatorClassName) { super(plan, new DepthFirstWalker<LogicalOperator, LogicalPlan>(plan)); + this.operatorClassName = operatorClassName; } @Override public boolean check(List<LogicalOperator> nodes) throws OptimizerException { try { - LogicalOperator lo = nodes.get(0); - if (lo == null || !(lo instanceof LOLoad)) { - throw new RuntimeException("Expected load, got " + - lo.getClass().getName()); - } - - LOLoad load = (LOLoad)lo; - Schema s = load.getSchema(); + Schema s = getOperator(nodes).getSchema(); if (s == null) return false; boolean sawOne = false; @@ -86,20 +76,36 @@ " check if type casts are needed", fe); } } - - @Override - public void transform(List<LogicalOperator> nodes) throws OptimizerException { - try { - LogicalOperator lo = nodes.get(0); + + private LogicalOperator getOperator(List<LogicalOperator> nodes) throws FrontendException { + LogicalOperator lo = nodes.get(0); + if(operatorClassName == LogicalOptimizer.LOLOAD_CLASSNAME) { if (lo == null || !(lo instanceof LOLoad)) { throw new RuntimeException("Expected load, got " + lo.getClass().getName()); } - - LOLoad load = (LOLoad)lo; - Schema s = load.getSchema(); - String scope = load.getOperatorKey().scope; + return lo; + } else if(operatorClassName == LogicalOptimizer.LOSTREAM_CLASSNAME){ + if (lo == null || !(lo instanceof LOStream)) { + throw new RuntimeException("Expected stream, got " + + lo.getClass().getName()); + } + + return lo; + } else { + // we should never be called with any other operator class name + throw new FrontendException("TypeCastInserter invoked with an invalid operator class name:" + operatorClassName); + } + + } + + @Override + public void transform(List<LogicalOperator> nodes) throws OptimizerException { + try { + LogicalOperator lo = getOperator(nodes); + Schema s = lo.getSchema(); + String scope = lo.getOperatorKey().scope; // For every field, build a logical plan. If the field has a type // other than byte array, then the plan will be cast(project). Else // it will just be project. @@ -113,7 +119,7 @@ List<Integer> toProject = new ArrayList<Integer>(1); toProject.add(i); LOProject proj = new LOProject(p, OperatorKey.genOpKey(scope), - load, toProject); + lo, toProject); p.add(proj); Schema.FieldSchema fs = s.getField(i); if (fs.type != DataType.BYTEARRAY) { @@ -136,7 +142,7 @@ OperatorKey.genOpKey(scope), genPlans, flattens); // Insert the foreach into the plan and patch up the plan. - insertAfter(load, foreach, null); + insertAfter(lo, foreach, null); rebuildSchemas(); Added: incubator/pig/branches/types/test/org/apache/pig/test/PigExecTestCase.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/PigExecTestCase.java?rev=693961&view=auto ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/PigExecTestCase.java (added) +++ incubator/pig/branches/types/test/org/apache/pig/test/PigExecTestCase.java Wed Sep 10 13:00:36 2008 @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.test; + +import static org.apache.pig.ExecType.MAPREDUCE; +import static org.apache.pig.ExecType.LOCAL; +import junit.framework.TestCase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.pig.PigServer; +import org.apache.pig.ExecType; +import org.junit.After; +import org.junit.Before; + +public abstract class PigExecTestCase extends TestCase { + + protected final Log log = LogFactory.getLog(getClass()); + + protected ExecType execType = MAPREDUCE; + + private MiniCluster cluster; + protected PigServer pigServer; + + @Before + @Override + protected void setUp() throws Exception { + + String execTypeString = System.getProperty("test.exectype"); + if(execTypeString!=null && execTypeString.length()>0){ + execType = PigServer.parseExecType(execTypeString); + } + if(execType == MAPREDUCE) { + cluster = MiniCluster.buildCluster(); + pigServer = new PigServer(MAPREDUCE, cluster.getProperties()); + } else { + pigServer = new PigServer(LOCAL); + } + } + + @After + @Override + protected void tearDown() throws Exception { + pigServer.shutdown(); + } +} Added: incubator/pig/branches/types/test/org/apache/pig/test/TestStreaming.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestStreaming.java?rev=693961&view=auto ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/TestStreaming.java (added) +++ incubator/pig/branches/types/test/org/apache/pig/test/TestStreaming.java Wed Sep 10 13:00:36 2008 @@ -0,0 +1,595 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.test; + +import java.io.File; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; + +import org.apache.pig.ExecType; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.builtin.PigStorage; +import org.apache.pig.data.DefaultTupleFactory; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; +import org.apache.pig.impl.io.BufferedPositionedInputStream; +import org.apache.pig.impl.io.FileLocalizer; +import org.junit.Assert; +import org.junit.Test; + +public class TestStreaming extends PigExecTestCase { + + private TupleFactory tf = DefaultTupleFactory.getInstance(); + + private static final String simpleEchoStreamingCommand; + static { + if (System.getProperty("os.name").toUpperCase().startsWith("WINDOWS")) + simpleEchoStreamingCommand = "perl -ne 'print \\\"$_\\\"'"; + else + simpleEchoStreamingCommand = "perl -ne 'print \"$_\"'"; + } + + private Tuple[] setupExpectedResults(Object[] firstField, Object[] secondField) throws ExecException { + Assert.assertEquals(firstField.length, secondField.length); + + Tuple[] expectedResults = new Tuple[firstField.length]; + for (int i=0; i < expectedResults.length; ++i) { + expectedResults[i] = tf.newTuple(2); + expectedResults[i].set(0, firstField[i]); + expectedResults[i].set(1, secondField[i]); + } + + return expectedResults; + } + + @Test + public void testSimpleMapSideStreaming() + throws Exception { + File input = Util.createInputFile("tmp", "", + new String[] {"A,1", "B,2", "C,3", "D,2", + "A,5", "B,5", "C,8", "A,8", + "D,8", "A,9"}); + + // Expected results + String[] expectedFirstFields = new String[] {"A", "B", "C", "A", "D", "A"}; + Integer[] expectedSecondFields = new Integer[] {5, 5, 8, 8, 8, 9}; + boolean[] withTypes = {true, false}; + for (int i = 0; i < withTypes.length; i++) { + Tuple[] expectedResults = null; + if(withTypes[i] == true) { + expectedResults = + setupExpectedResults(expectedFirstFields, expectedSecondFields); + } else { + expectedResults = + setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), Util.toDataByteArrays(expectedSecondFields)); + } + + // Pig query to run + pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using " + + PigStorage.class.getName() + "(',');"); + pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';"); + pigServer.registerQuery("S1 = stream FILTERED_DATA through `" + + simpleEchoStreamingCommand + "`;"); + if(withTypes[i] == true) { + pigServer.registerQuery("OP = stream S1 through `" + + simpleEchoStreamingCommand + "` as (f0:chararray, f1:int);"); + } else { + pigServer.registerQuery("OP = stream S1 through `" + + simpleEchoStreamingCommand + "`;"); + } + + // Run the query and check the results + Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults); + } + } + + @Test + public void testSimpleMapSideStreamingWithOutputSchema() + throws Exception { + File input = Util.createInputFile("tmp", "", + new String[] {"A,1", "B,2", "C,3", "D,2", + "A,5", "B,5", "C,8", "A,8", + "D,8", "A,9"}); + + // Expected results + Object[] expectedFirstFields = new String[] {"C", "A", "D", "A"}; + Object[] expectedSecondFields = new Integer[] {8, 8, 8, 9}; + + boolean[] withTypes = {true, false}; + for (int i = 0; i < withTypes.length; i++) { + Tuple[] expectedResults = null; + if(withTypes[i] == true) { + expectedResults = + setupExpectedResults(expectedFirstFields, expectedSecondFields); + } else { + expectedResults = + setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), Util.toDataByteArrays(expectedSecondFields)); + } + // Pig query to run + pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using " + + PigStorage.class.getName() + "(',');"); + pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';"); + if(withTypes[i] == true) { + pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA through `" + + simpleEchoStreamingCommand + "` as (f0:chararray, f1:int);"); + } else { + pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA through `" + + simpleEchoStreamingCommand + "` as (f0, f1);"); + } + pigServer.registerQuery("OP = filter STREAMED_DATA by f1 > 6;"); + + // Run the query and check the results + Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults); + } + } + + @Test + public void testSimpleReduceSideStreamingAfterFlatten() + throws Exception { + File input = Util.createInputFile("tmp", "", + new String[] {"A,1", "B,2", "C,3", "D,2", + "A,5", "B,5", "C,8", "A,8", + "D,8", "A,9"}); + + // Expected results + String[] expectedFirstFields = new String[] {"A", "A", "A", "B", "C", "D"}; + Integer[] expectedSecondFields = new Integer[] {5, 8, 9, 5, 8, 8}; + boolean[] withTypes = {true, false}; + for (int i = 0; i < withTypes.length; i++) { + Tuple[] expectedResults = null; + if(withTypes[i] == true) { + expectedResults = + setupExpectedResults(expectedFirstFields, expectedSecondFields); + } else { + expectedResults = + setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), Util.toDataByteArrays(expectedSecondFields)); + } + + // Pig query to run + pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using " + + PigStorage.class.getName() + "(',');"); + pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';"); + pigServer.registerQuery("GROUPED_DATA = group FILTERED_DATA by $0;"); + pigServer.registerQuery("FLATTENED_GROUPED_DATA = foreach GROUPED_DATA " + + "generate flatten($1);"); + pigServer.registerQuery("S1 = stream FLATTENED_GROUPED_DATA through `" + + simpleEchoStreamingCommand + "`;"); + if(withTypes[i] == true) { + pigServer.registerQuery("OP = stream S1 through `" + + simpleEchoStreamingCommand + "` as (f0:chararray, f1:int);"); + } else { + pigServer.registerQuery("OP = stream S1 through `" + + simpleEchoStreamingCommand + "`;"); + } + + // Run the query and check the results + Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults); + } + } + + @Test + public void testSimpleOrderedReduceSideStreamingAfterFlatten() throws Exception { + File input = Util.createInputFile("tmp", "", + new String[] {"A,1,2,3", "B,2,4,5", + "C,3,1,2", "D,2,5,2", + "A,5,5,1", "B,5,7,4", + "C,8,9,2", "A,8,4,5", + "D,8,8,3", "A,9,2,5"} + ); + + // Expected results + String[] expectedFirstFields = + new String[] {"A", "A", "A", "A", "B", "B", "C", "C", "D", "D"}; + Integer[] expectedSecondFields = new Integer[] {1, 9, 8, 5, 2, 5, 3, 8, 2, 8}; + Integer[] expectedThirdFields = new Integer[] {2, 2, 4, 5, 4, 7, 1, 9, 5, 8}; + Integer[] expectedFourthFields = new Integer[] {3, 5, 5, 1, 5, 4, 2, 2, 2, 3}; + Tuple[] expectedResults = new Tuple[10]; + for (int i = 0; i < expectedResults.length; ++i) { + expectedResults[i] = tf.newTuple(4); + expectedResults[i].set(0, expectedFirstFields[i]); + expectedResults[i].set(1, expectedSecondFields[i]); + expectedResults[i].set(2, expectedThirdFields[i]); + expectedResults[i].set(3, expectedFourthFields[i]); + } + //setupExpectedResults(expectedFirstFields, expectedSecondFields); + + // Pig query to run + pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using " + + PigStorage.class.getName() + "(',');"); + pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';"); + pigServer.registerQuery("S1 = stream FILTERED_DATA through `" + + simpleEchoStreamingCommand + "`;"); + pigServer.registerQuery("S2 = stream S1 through `" + + simpleEchoStreamingCommand + "`;"); + pigServer.registerQuery("GROUPED_DATA = group IP by $0;"); + pigServer.registerQuery("ORDERED_DATA = foreach GROUPED_DATA { " + + " D = order IP BY $2, $3;" + + " generate flatten(D);" + + "};"); + pigServer.registerQuery("S3 = stream ORDERED_DATA through `" + + simpleEchoStreamingCommand + "`;"); + pigServer.registerQuery("OP = stream S3 through `" + + simpleEchoStreamingCommand + "` as (f0:chararray, f1:int, f2:int, f3:int);"); + + // Run the query and check the results + Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults); + } + + @Test + public void testInputShipSpecs() throws Exception { + // FIXME : this should be tested in all modes + if(execType == ExecType.LOCAL) + return; + File input = Util.createInputFile("tmp", "", + new String[] {"A,1", "B,2", "C,3", + "D,2", "A,5", "B,5", + "C,8", "A,8", "D,8", + "A,9"}); + + // Perl script + String[] script = + new String[] { + "#!/usr/bin/perl", + "open(INFILE, $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";", + "while (<INFILE>) {", + " chomp $_;", + " print STDOUT \"$_\n\";", + " print STDERR \"STDERR: $_\n\";", + "}", + }; + File command1 = Util.createInputFile("script", "pl", script); + File command2 = Util.createInputFile("script", "pl", script); + + // Expected results + String[] expectedFirstFields = + new String[] {"A", "B", "C", "A", "D", "A"}; + Integer[] expectedSecondFields = new Integer[] {5, 5, 8, 8, 8, 9}; + Tuple[] expectedResults = + setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), Util.toDataByteArrays(expectedSecondFields)); + + // Pig query to run + + pigServer.registerQuery( + "define CMD1 `" + command1.getName() + " foo` " + + "ship ('" + Util.encodeEscape(command1.toString()) + "') " + + "input('foo' using " + PigStorage.class.getName() + "(',')) " + + "output(stdout using " + PigStorage.class.getName() + "(',')) " + + "stderr();"); + pigServer.registerQuery( + "define CMD2 `" + command2.getName() + " bar` " + + "ship ('" + Util.encodeEscape(command2.toString()) + "') " + + "input('bar' using " + PigStorage.class.getName() + "(',')) " + + "output(stdout using " + PigStorage.class.getName() + "(',')) " + + "stderr();"); + pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using " + + PigStorage.class.getName() + "(',');"); + pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;"); + pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA " + + "through CMD1;"); + pigServer.registerQuery("OP = stream STREAMED_DATA through CMD2;"); + + String output = "/pig/out"; + pigServer.deleteFile(output); + pigServer.store("OP", output, PigStorage.class.getName() + "(',')"); + + InputStream op = FileLocalizer.open(output, pigServer.getPigContext()); + PigStorage ps = new PigStorage(","); + ps.bindTo("", new BufferedPositionedInputStream(op), 0, Long.MAX_VALUE); + List<Tuple> outputs = new ArrayList<Tuple>(); + Tuple t; + while ((t = ps.getNext()) != null) { + outputs.add(t); + } + + // Run the query and check the results + Util.checkQueryOutputs(outputs.iterator(), expectedResults); + } + + @Test + public void testInputCacheSpecs() throws Exception { + // Can't run this without HDFS + if(execType == ExecType.LOCAL) + return; + + File input = Util.createInputFile("tmp", "", + new String[] {"A,1", "B,2", "C,3", + "D,2", "A,5", "B,5", + "C,8", "A,8", "D,8", + "A,9"}); + + // Perl script + String[] script = + new String[] { + "#!/usr/bin/perl", + "open(INFILE, $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";", + "while (<INFILE>) {", + " chomp $_;", + " print STDOUT \"$_\n\";", + " print STDERR \"STDERR: $_\n\";", + "}", + }; + // Copy the scripts to HDFS + File command1 = Util.createInputFile("script", "pl", script); + File command2 = Util.createInputFile("script", "pl", script); + String c1 = FileLocalizer.hadoopify(command1.toString(), + pigServer.getPigContext()); + String c2 = FileLocalizer.hadoopify(command2.toString(), + pigServer.getPigContext()); + + // Expected results + String[] expectedFirstFields = + new String[] {"A", "B", "C", "A", "D", "A"}; + Integer[] expectedSecondFields = new Integer[] {5, 5, 8, 8, 8, 9}; + Tuple[] expectedResults = + setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), Util.toDataByteArrays(expectedSecondFields)); + + // Pig query to run + pigServer.registerQuery( + "define CMD1 `script1.pl foo` " + + "cache ('" + c1 + "#script1.pl') " + + "input('foo' using " + PigStorage.class.getName() + "(',')) " + + "stderr();"); + pigServer.registerQuery( + "define CMD2 `script2.pl bar` " + + "cache ('" + c2 + "#script2.pl') " + + "input('bar' using " + PigStorage.class.getName() + "(',')) " + + "stderr();"); + pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using " + + PigStorage.class.getName() + "(',');"); + pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;"); + pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA " + + "through CMD1;"); + pigServer.registerQuery("OP = stream STREAMED_DATA through CMD2;"); + + String output = "/pig/out"; + pigServer.deleteFile(output); + pigServer.store("OP", output, PigStorage.class.getName() + "(',')"); + + InputStream op = FileLocalizer.open(output, pigServer.getPigContext()); + PigStorage ps = new PigStorage(","); + ps.bindTo("", new BufferedPositionedInputStream(op), 0, Long.MAX_VALUE); + List<Tuple> outputs = new ArrayList<Tuple>(); + Tuple t; + while ((t = ps.getNext()) != null) { + outputs.add(t); + } + + // Run the query and check the results + Util.checkQueryOutputs(outputs.iterator(), expectedResults); + } + + @Test + public void testOutputShipSpecs() throws Exception { + // FIXME : this should be tested in all modes + if(execType == ExecType.LOCAL) + return; + File input = Util.createInputFile("tmp", "", + new String[] {"A,1", "B,2", "C,3", + "D,2", "A,5", "B,5", + "C,8", "A,8", "D,8", + "A,9"}); + + // Perl script + String[] script = + new String[] { + "#!/usr/bin/perl", + "open(OUTFILE, \">\", $ARGV[0]) or die \"Can't open \".$ARGV[1].\"!: $!\";", + "open(OUTFILE2, \">\", $ARGV[1]) or die \"Can't open \".$ARGV[2].\"!: $!\";", + "while (<STDIN>) {", + " print OUTFILE \"$_\n\";", + " print STDERR \"STDERR: $_\n\";", + " print OUTFILE2 \"A,10\n\";", + "}", + }; + File command = Util.createInputFile("script", "pl", script); + + // Expected results + String[] expectedFirstFields = + new String[] {"A", "A", "A", "A", "A", "A"}; + Integer[] expectedSecondFields = new Integer[] {10, 10, 10, 10, 10, 10}; + Tuple[] expectedResults = + setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), Util.toDataByteArrays(expectedSecondFields)); + + // Pig query to run + pigServer.registerQuery( + "define CMD `" + command.getName() + " foo bar` " + + "ship ('" + Util.encodeEscape(command.toString()) + "') " + + "output('foo' using " + PigStorage.class.getName() + "(','), " + + "'bar' using " + PigStorage.class.getName() + "(',')) " + + "stderr();"); + pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using " + + PigStorage.class.getName() + "(',');"); + pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;"); + pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;"); + + String output = "/pig/out"; + pigServer.deleteFile(output); + pigServer.store("OP", output, PigStorage.class.getName() + "(',')"); + + InputStream op = FileLocalizer.open(output+"/bar", + pigServer.getPigContext()); + PigStorage ps = new PigStorage(","); + ps.bindTo("", new BufferedPositionedInputStream(op), 0, Long.MAX_VALUE); + List<Tuple> outputs = new ArrayList<Tuple>(); + Tuple t; + while ((t = ps.getNext()) != null) { + outputs.add(t); + } + + // Run the query and check the results + Util.checkQueryOutputs(outputs.iterator(), expectedResults); + } + + @Test + public void testInputOutputSpecs() throws Exception { + // FIXME : this should be tested in all modes + if(execType == ExecType.LOCAL) + return; + File input = Util.createInputFile("tmp", "", + new String[] {"A,1", "B,2", "C,3", + "D,2", "A,5", "B,5", + "C,8", "A,8", "D,8", + "A,9"}); + + // Perl script + String[] script = + new String[] { + "#!/usr/bin/perl", + "open(INFILE, $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";", + "open(OUTFILE, \">\", $ARGV[1]) or die \"Can't open \".$ARGV[1].\"!: $!\";", + "open(OUTFILE2, \">\", $ARGV[2]) or die \"Can't open \".$ARGV[2].\"!: $!\";", + "while (<INFILE>) {", + " chomp $_;", + " print OUTFILE \"$_\n\";", + " print STDERR \"STDERR: $_\n\";", + " print OUTFILE2 \"$_\n\";", + "}", + }; + File command = Util.createInputFile("script", "pl", script); + + // Expected results + String[] expectedFirstFields = + new String[] {"A", "B", "C", "A", "D", "A"}; + Integer[] expectedSecondFields = new Integer[] {5, 5, 8, 8, 8, 9}; + Tuple[] expectedResults = + setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), Util.toDataByteArrays(expectedSecondFields)); + // Pig query to run + pigServer.registerQuery( + "define CMD `" + command.getName() + " foo bar foobar` " + + "ship ('" + Util.encodeEscape(command.toString()) + "') " + + "input('foo' using " + PigStorage.class.getName() + "(',')) " + + "output('bar', " + + "'foobar' using " + PigStorage.class.getName() + "(',')) " + + "stderr();"); + pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using " + + PigStorage.class.getName() + "(',');"); + pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;"); + pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;"); + + String output = "/pig/out"; + pigServer.deleteFile(output); + pigServer.store("OP", output, PigStorage.class.getName() + "(',')"); + + InputStream op = FileLocalizer.open(output+"/foobar", + pigServer.getPigContext()); + PigStorage ps = new PigStorage(","); + ps.bindTo("", new BufferedPositionedInputStream(op), 0, Long.MAX_VALUE); + List<Tuple> outputs = new ArrayList<Tuple>(); + Tuple t; + while ((t = ps.getNext()) != null) { + outputs.add(t); + } + + // Run the query and check the results + Util.checkQueryOutputs(outputs.iterator(), expectedResults); + + // Cleanup + pigServer.deleteFile(output); + } + + @Test + public void testSimpleMapSideStreamingWithUnixPipes() + throws Exception { + File input = Util.createInputFile("tmp", "", + new String[] {"A,1", "B,2", "C,3", "D,2", + "A,5", "B,5", "C,8", "A,8", + "D,8", "A,9"}); + + // Expected results + String[] expectedFirstFields = + new String[] {"A", "B", "C", "D", "A", "B", "C", "A", "D", "A"}; + Integer[] expectedSecondFields = new Integer[] {1, 2, 3, 2, 5, 5, 8, 8, 8, 9}; + boolean[] withTypes = {true, false}; + for (int i = 0; i < withTypes.length; i++) { + Tuple[] expectedResults = null; + if(withTypes[i] == true) { + expectedResults = + setupExpectedResults(expectedFirstFields, expectedSecondFields); + } else { + expectedResults = + setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), Util.toDataByteArrays(expectedSecondFields)); + } + + // Pig query to run + pigServer.registerQuery("define CMD `" + simpleEchoStreamingCommand + + " | " + simpleEchoStreamingCommand + "`;"); + pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using " + + PigStorage.class.getName() + "(',');"); + if(withTypes[i] == true) { + pigServer.registerQuery("OP = stream IP through CMD as (f0:chararray, f1:int);"); + } else { + pigServer.registerQuery("OP = stream IP through CMD;"); + } + + // Run the query and check the results + Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults); + } + } + + @Test + public void testLocalNegativeLoadStoreOptimization() throws Exception { + testNegativeLoadStoreOptimization(ExecType.LOCAL); + } + + @Test + public void testMRNegativeLoadStoreOptimization() throws Exception { + testNegativeLoadStoreOptimization(ExecType.MAPREDUCE); + } + + private void testNegativeLoadStoreOptimization(ExecType execType) + throws Exception { + File input = Util.createInputFile("tmp", "", + new String[] {"A,1", "B,2", "C,3", "D,2", + "A,5", "B,5", "C,8", "A,8", + "D,8", "A,9"}); + + // Expected results + String[] expectedFirstFields = new String[] {"A", "B", "C", "A", "D", "A"}; + Integer[] expectedSecondFields = new Integer[] {5, 5, 8, 8, 8, 9}; + boolean[] withTypes = {true, false}; + for (int i = 0; i < withTypes.length; i++) { + Tuple[] expectedResults = null; + if(withTypes[i] == true) { + expectedResults = + setupExpectedResults(expectedFirstFields, expectedSecondFields); + } else { + expectedResults = + setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), Util.toDataByteArrays(expectedSecondFields)); + } + + // Pig query to run + pigServer.registerQuery("define CMD `"+ simpleEchoStreamingCommand + + "` input(stdin using PigDump);"); + pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using " + + PigStorage.class.getName() + "(',') " + + "split by 'file';"); + pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';"); + if(withTypes[i] == true) { + pigServer.registerQuery("OP = stream FILTERED_DATA through `" + + simpleEchoStreamingCommand + "` as (f0:chararray, f1:int);"); + } else { + pigServer.registerQuery("OP = stream FILTERED_DATA through `" + + simpleEchoStreamingCommand + "`;"); + } + + // Run the query and check the results + Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults); + } + } +} Modified: incubator/pig/branches/types/test/org/apache/pig/test/Util.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/Util.java?rev=693961&r1=693960&r2=693961&view=diff ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/Util.java (original) +++ incubator/pig/branches/types/test/org/apache/pig/test/Util.java Wed Sep 10 13:00:36 2008 @@ -122,10 +122,10 @@ return m; } - static public DataByteArray[] toDataByteArrays(String[] input) { + static public<T> DataByteArray[] toDataByteArrays(T[] input) { DataByteArray[] dbas = new DataByteArray[input.length]; for (int i = 0; i < input.length; i++) { - dbas[i] = (input[i] == null)?null:new DataByteArray(input[i].getBytes()); + dbas[i] = (input[i] == null)?null:new DataByteArray(input[i].toString().getBytes()); } return dbas; } @@ -185,7 +185,7 @@ for (Tuple expected : expectedResults) { Tuple actual = actualResults.next(); - Assert.assertEquals(expected.toString(), actual.toString()); + Assert.assertEquals(expected, actual); } }