Author: hashutosh Date: Tue Mar 16 17:09:16 2010 New Revision: 923872 URL: http://svn.apache.org/viewvc?rev=923872&view=rev Log: PIG-1292: Interface Refinements (hashutosh)
Added: hadoop/pig/trunk/src/org/apache/pig/CollectableLoadFunc.java Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/FileInputLoadFunc.java hadoop/pig/trunk/src/org/apache/pig/IndexableLoadFunc.java hadoop/pig/trunk/src/org/apache/pig/OrderedLoadFunc.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MergeJoinIndexer.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java hadoop/pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=923872&r1=923871&r2=923872&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Tue Mar 16 17:09:16 2010 @@ -22,6 +22,8 @@ Trunk (unreleased changes) INCOMPATIBLE CHANGES +PIG-1292: Interface Refinements (hashutosh) + PIG-1259: ResourceFieldSchema.setSchema should not allow a bag field without a Tuple as its only sub field (the tuple itself can have a schema with > 1 subfields) (pradeepkth) Added: hadoop/pig/trunk/src/org/apache/pig/CollectableLoadFunc.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/CollectableLoadFunc.java?rev=923872&view=auto ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/CollectableLoadFunc.java (added) +++ hadoop/pig/trunk/src/org/apache/pig/CollectableLoadFunc.java Tue Mar 16 17:09:16 2010 @@ -0,0 +1,21 @@ +package org.apache.pig; + +import java.io.IOException; + +/** + * This interface implemented by {...@link LoadFunc} implementations indicates to + * Pig that it has the capability to load data such that all instances of a key + * will occur in same split. + * @since Pig 0.7 + */ +public interface CollectableLoadFunc { + + /** + * When this method is called, Pig is communicating to Loader that it must + * load data such that all instances of a key are in same split. Pig will + * make no further checks at runtime to ensure whether contract is honored + * or not. + * @throws IOException + */ + public void ensureAllKeyInstancesInSameSplit() throws IOException; +} Modified: hadoop/pig/trunk/src/org/apache/pig/FileInputLoadFunc.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/FileInputLoadFunc.java?rev=923872&r1=923871&r2=923872&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/FileInputLoadFunc.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/FileInputLoadFunc.java Tue Mar 16 17:09:16 2010 @@ -19,22 +19,22 @@ package org.apache.pig; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.lib.input.FileSplit; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; /** * This class provides an implementation of OrderedLoadFunc interface * which can be optionally re-used by LoadFuncs that use FileInputFormat, by * having this as a super class */ -public abstract class FileInputLoadFunc extends OrderedLoadFunc { +public abstract class FileInputLoadFunc extends LoadFunc implements OrderedLoadFunc { @Override - public WritableComparable<?> getSplitComparable(PigSplit split) + public WritableComparable<?> getSplitComparable(InputSplit split) throws IOException{ FileSplit fileSplit = null; - if(split.getWrappedSplit() instanceof FileSplit){ - fileSplit = (FileSplit)split.getWrappedSplit(); + if(split instanceof FileSplit){ + fileSplit = (FileSplit)split; }else{ throw new RuntimeException("LoadFunc expected split of type FileSplit"); } @@ -46,5 +46,3 @@ public abstract class FileInputLoadFunc } } - - Modified: hadoop/pig/trunk/src/org/apache/pig/IndexableLoadFunc.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/IndexableLoadFunc.java?rev=923872&r1=923871&r2=923872&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/IndexableLoadFunc.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/IndexableLoadFunc.java Tue Mar 16 17:09:16 2010 @@ -31,15 +31,15 @@ import org.apache.pig.data.Tuple; * needs to perform the merge based join. * * The sequence of calls made from the pig runtime are: - * {...@link IndexableLoadFunc#setUDFContextSignature(String)} + * {...@link LoadFunc#setUDFContextSignature(String)} * {...@link IndexableLoadFunc#initialize(Configuration)} - * {...@link IndexableLoadFunc#setLocation(String, org.apache.hadoop.mapreduce.Job)} + * {...@link LoadFunc#setLocation(String, org.apache.hadoop.mapreduce.Job)} * {...@link IndexableLoadFunc#seekNear(Tuple)} * A series of IndexableLoadFunc.getNext(); calls to perform the join * IndexableLoadFunc.close(); * */ -public abstract class IndexableLoadFunc extends LoadFunc { +public interface IndexableLoadFunc { /** * This method is called by pig run time to allow the Modified: hadoop/pig/trunk/src/org/apache/pig/OrderedLoadFunc.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/OrderedLoadFunc.java?rev=923872&r1=923871&r2=923872&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/OrderedLoadFunc.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/OrderedLoadFunc.java Tue Mar 16 17:09:16 2010 @@ -21,7 +21,8 @@ package org.apache.pig; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; +import org.apache.hadoop.mapreduce.InputSplit; + /** * Implementing this interface indicates to Pig that a given loader @@ -29,8 +30,9 @@ import org.apache.pig.backend.hadoop.exe * WritableComparable object is stored in the index created by * MergeJoin sampling MR job to get an ordered sequence of splits. * This is necessary when the sort key spans multiple splits. + * @since Pig 0.7 */ -public abstract class OrderedLoadFunc extends LoadFunc { +public interface OrderedLoadFunc { /** * The WritableComparable object returned will be used to compare @@ -39,7 +41,7 @@ public abstract class OrderedLoadFunc ex * @return WritableComparable representing the position of the split in input * @throws IOException */ - public abstract WritableComparable<?> getSplitComparable(PigSplit split) + public WritableComparable<?> getSplitComparable(InputSplit split) throws IOException; } Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=923872&r1=923871&r2=923872&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Tue Mar 16 17:09:16 2010 @@ -31,6 +31,7 @@ import java.util.Set; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.mapred.JobConf; +import org.apache.pig.CollectableLoadFunc; import org.apache.pig.ExecType; import org.apache.pig.FuncSpec; import org.apache.pig.IndexableLoadFunc; @@ -906,18 +907,61 @@ public class MRCompiler extends PhyPlanV @Override public void visitCollectedGroup(POCollectedGroup op) throws VisitorException { - try{ - nonBlocking(op); - List<PhysicalPlan> plans = op.getPlans(); - if(plans!=null) - for(PhysicalPlan ep : plans) - addUDFs(ep); - phyToMROpMap.put(op, curMROp); - }catch(Exception e){ - int errCode = 2034; - String msg = "Error compiling operator " + op.getClass().getSimpleName(); - throw new MRCompilerException(msg, errCode, PigException.BUG, e); + + if(!curMROp.mapDone){ + + List<PhysicalOperator> roots = curMROp.mapPlan.getRoots(); + if(roots.size() != 1){ + int errCode = 2171; + String errMsg = "Expected one but found more then one root physical operator in physical plan."; + throw new MRCompilerException(errMsg,errCode,PigException.BUG); + } + + PhysicalOperator phyOp = roots.get(0); + if(! (phyOp instanceof POLoad)){ + int errCode = 2172; + String errMsg = "Expected physical operator at root to be POLoad. Found : "+phyOp.getClass().getCanonicalName(); + throw new MRCompilerException(errMsg,errCode,PigException.BUG); + } + + POLoad loader = (POLoad)phyOp; + LoadFunc loadFunc = (LoadFunc) PigContext.instantiateFuncFromSpec(loader.getLFile().getFuncSpec()); + try { + if(!(loadFunc instanceof CollectableLoadFunc)){ + throw new MRCompilerException("While using 'collected' on group; data must be loaded via loader implementing CollectableLoadFunc."); + } + ((CollectableLoadFunc)loadFunc).ensureAllKeyInstancesInSameSplit(); + } catch (MRCompilerException e){ + throw (e); + } catch (IOException e) { + int errCode = 2034; + String msg = "Error compiling operator " + op.getClass().getSimpleName(); + throw new MRCompilerException(msg, errCode, PigException.BUG, e); + } + + try{ + nonBlocking(op); + List<PhysicalPlan> plans = op.getPlans(); + if(plans!=null) + for(PhysicalPlan ep : plans) + addUDFs(ep); + phyToMROpMap.put(op, curMROp); + }catch(Exception e){ + int errCode = 2034; + String msg = "Error compiling operator " + op.getClass().getSimpleName(); + throw new MRCompilerException(msg, errCode, PigException.BUG, e); + } + } + else if(!curMROp.reduceDone){ + String msg = "Blocking operators are not allowed before Collected Group. Consider dropping using 'collected'."; + throw new MRCompilerException(msg, PigException.BUG); + } + else{ + int errCode = 2022; + String msg = "Both map and reduce phases have been done. This is unexpected while compiling."; + throw new MRCompilerException(msg, errCode, PigException.BUG); } + } @Override Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MergeJoinIndexer.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MergeJoinIndexer.java?rev=923872&r1=923871&r2=923872&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MergeJoinIndexer.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MergeJoinIndexer.java Tue Mar 16 17:09:16 2010 @@ -19,7 +19,6 @@ package org.apache.pig.backend.hadoop.ex import java.io.IOException; import java.util.List; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; @@ -57,7 +56,7 @@ public class MergeJoinIndexer extends L private PhysicalOperator rightPipelineLeaf; private PhysicalOperator rightPipelineRoot; private Tuple dummyTuple = null; - private OrderedLoadFunc loader; + private LoadFunc loader; private PigSplit pigSplit = null; /** @param funcSpec : Loader specification. @@ -70,7 +69,7 @@ public class MergeJoinIndexer extends L @SuppressWarnings("unchecked") public MergeJoinIndexer(String funcSpec, String innerPlan, String serializedPhyPlan) throws ExecException{ - loader = (OrderedLoadFunc)PigContext.instantiateFuncFromSpec(funcSpec); + loader = (LoadFunc)PigContext.instantiateFuncFromSpec(funcSpec); try { List<PhysicalPlan> innerPlans = (List<PhysicalPlan>)ObjectSerializer.deserialize(innerPlan); lr = new POLocalRearrange(new OperatorKey("MergeJoin Indexer",NodeIdGenerator.getGenerator().getNextNodeId("MergeJoin Indexer"))); @@ -101,7 +100,7 @@ public class MergeJoinIndexer extends L if(!firstRec) // We sample only one record per block. return null; - WritableComparable<?> position = loader.getSplitComparable(pigSplit); + WritableComparable<?> position = ((OrderedLoadFunc)loader).getSplitComparable(pigSplit.getWrappedSplit()); Object key = null; Tuple wrapperTuple = mTupleFactory.newTuple(keysCnt+2); Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=923872&r1=923871&r2=923872&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java Tue Mar 16 17:09:16 2010 @@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.mapreduce.Job; import org.apache.pig.FuncSpec; import org.apache.pig.IndexableLoadFunc; +import org.apache.pig.LoadFunc; import org.apache.pig.PigException; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; @@ -69,7 +70,7 @@ public class POMergeJoin extends Physica //The Local Rearrange operators modeling the join key private POLocalRearrange[] LRs; - private transient IndexableLoadFunc rightLoader; + private transient LoadFunc rightLoader; private OperatorKey opKey; private Object prevLeftKey; @@ -245,7 +246,7 @@ public class POMergeJoin extends Physica else{ // This is end of all input and this is last join output. // Right loader in this case wouldn't get a chance to close input stream. So, we close it ourself. try { - rightLoader.close(); + ((IndexableLoadFunc)rightLoader).close(); } catch (IOException e) { // Non-fatal error. We can continue. log.error("Received exception while trying to close right side file: " + e.getMessage()); @@ -377,7 +378,7 @@ public class POMergeJoin extends Physica if(this.parentPlan.endOfAllInput){ // This is end of all input and this is last time we will read right input. // Right loader in this case wouldn't get a chance to close input stream. So, we close it ourself. try { - rightLoader.close(); + ((IndexableLoadFunc)rightLoader).close(); } catch (IOException e) { // Non-fatal error. We can continue. log.error("Received exception while trying to close right side file: " + e.getMessage()); @@ -389,7 +390,7 @@ public class POMergeJoin extends Physica } private void seekInRightStream(Object firstLeftKey) throws IOException{ - rightLoader = (IndexableLoadFunc)PigContext.instantiateFuncFromSpec(rightLoaderFuncSpec); + rightLoader = (LoadFunc)PigContext.instantiateFuncFromSpec(rightLoaderFuncSpec); // check if hadoop distributed cache is used if (indexFile != null && rightLoader instanceof DefaultIndexableLoader) { @@ -399,12 +400,11 @@ public class POMergeJoin extends Physica // Pass signature of the loader to rightLoader // make a copy of the conf to use in calls to rightLoader. - Configuration conf = new Configuration(PigMapReduce.sJobConf); rightLoader.setUDFContextSignature(signature); - Job job = new Job(conf); + Job job = new Job(new Configuration(PigMapReduce.sJobConf)); rightLoader.setLocation(rightInputFileName, job); - rightLoader.initialize(job.getConfiguration()); - rightLoader.seekNear( + ((IndexableLoadFunc)rightLoader).initialize(job.getConfiguration()); + ((IndexableLoadFunc)rightLoader).seekNear( firstLeftKey instanceof Tuple ? (Tuple)firstLeftKey : mTupleFactory.newTuple(firstLeftKey)); } Modified: hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java?rev=923872&r1=923871&r2=923872&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java Tue Mar 16 17:09:16 2010 @@ -57,7 +57,7 @@ import org.apache.pig.impl.util.ObjectSe * finds the splitIndex that can contain the key and initializes ReadToEndLoader * to read from that splitIndex onwards , in the sequence of splits in the index */ -public class DefaultIndexableLoader extends IndexableLoadFunc { +public class DefaultIndexableLoader extends LoadFunc implements IndexableLoadFunc{ // FileSpec of index file which will be read from HDFS. Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java?rev=923872&r1=923871&r2=923872&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java Tue Mar 16 17:09:16 2010 @@ -24,9 +24,8 @@ import java.util.List; import junit.framework.Assert; import junit.framework.TestCase; -import org.apache.pig.EvalFunc; +import org.apache.pig.CollectableLoadFunc; import org.apache.pig.ExecType; -import org.apache.pig.FuncSpec; import org.apache.pig.PigServer; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.BagFactory; @@ -34,8 +33,11 @@ import org.apache.pig.data.DataBag; import org.apache.pig.data.Tuple; import org.apache.pig.test.utils.LogicalPlanTester; import org.apache.pig.test.utils.TestHelper; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompilerException; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.builtin.PigStorage; +import org.apache.pig.impl.PigContext; import org.apache.pig.impl.logicalLayer.LOCogroup; import org.apache.pig.impl.logicalLayer.LogicalPlan; import org.apache.pig.impl.plan.OperatorKey; @@ -79,6 +81,20 @@ public class TestCollectedGroup extends Util.deleteFile(cluster, INPUT_FILE); } + public void testNonCollectableLoader() throws Exception{ + LogicalPlanTester lpt = new LogicalPlanTester(); + lpt.buildPlan("A = LOAD '" + INPUT_FILE + "' as (id, name, grade);"); + LogicalPlan lp = lpt.buildPlan("B = group A by id using 'collected';"); + PigContext pc = new PigContext(ExecType.MAPREDUCE,cluster.getProperties()); + pc.connect(); + try { + Util.buildMRPlan(Util.buildPhysicalPlan(lp, pc),pc); + fail("Must throw MRCompiler Exception"); + } catch (Exception e) { + assertTrue(e instanceof MRCompilerException); + } + } + public void testCollectedGrpSpecifiedInSingleQuotes1(){ LogicalPlanTester lpt = new LogicalPlanTester(); @@ -149,8 +165,7 @@ public class TestCollectedGroup extends public void testMapsideGroupByOneColumn() throws IOException{ pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); - pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, grade);"); - + pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' using "+DummyCollectableLoader.class.getName() +"() as (id, name, grade);"); try { DataBag dbfrj = BagFactory.getInstance().newDefaultBag(); DataBag dbshj = BagFactory.getInstance().newDefaultBag(); @@ -176,6 +191,7 @@ public class TestCollectedGroup extends Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj)); } catch (Exception e) { + e.printStackTrace(); fail(e.getMessage()); } } @@ -183,8 +199,8 @@ public class TestCollectedGroup extends public void testMapsideGroupByMultipleColumns() throws IOException{ pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); - pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, grade);"); - + pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' using "+DummyCollectableLoader.class.getName() +"() as (id, name, grade);"); + try { DataBag dbfrj = BagFactory.getInstance().newDefaultBag(); DataBag dbshj = BagFactory.getInstance().newDefaultBag(); @@ -210,6 +226,7 @@ public class TestCollectedGroup extends Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj)); } catch (Exception e) { + e.printStackTrace(); fail(e.getMessage()); } } @@ -217,8 +234,8 @@ public class TestCollectedGroup extends public void testMapsideGroupByStar() throws IOException{ pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); - pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, grade);"); - + pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' using "+DummyCollectableLoader.class.getName() +"() as (id, name, grade);"); + try { DataBag dbfrj = BagFactory.getInstance().newDefaultBag(); DataBag dbshj = BagFactory.getInstance().newDefaultBag(); @@ -244,8 +261,16 @@ public class TestCollectedGroup extends Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj)); } catch (Exception e) { + e.printStackTrace(); fail(e.getMessage()); } } + public static class DummyCollectableLoader extends PigStorage implements CollectableLoadFunc{ + + @Override + public void ensureAllKeyInstancesInSameSplit() throws IOException { + } + + } } Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java?rev=923872&r1=923871&r2=923872&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java Tue Mar 16 17:09:16 2010 @@ -31,6 +31,7 @@ import org.apache.hadoop.mapreduce.Recor import org.apache.pig.ExecType; import org.apache.pig.IndexableLoadFunc; import org.apache.pig.LoadCaster; +import org.apache.pig.LoadFunc; import org.apache.pig.PigException; import org.apache.pig.PigServer; import org.apache.pig.backend.datastorage.DataStorage; @@ -593,7 +594,7 @@ public class TestMergeJoin { * that expressions are not allowed as merge join keys when the right input's * loader implements {...@link IndexableLoadFunc} */ - public static class DummyIndexableLoader extends IndexableLoadFunc { + public static class DummyIndexableLoader extends LoadFunc implements IndexableLoadFunc{ /** *