+1 on this. I'm writing a bunch of LZO-based LoadFuncs/Slicers (commit coming soon) and it's so much faster to test/verify in local mode.
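Since indexed LZO can only be decompressed starting at a compressed block boundary, the split handling mostly comes down to snapping a split's start offset to the next entry in the block index. A rough standalone sketch of that adjustment (hypothetical helper class, not the actual loader code or any Pig/Hadoop API):

```java
// Hypothetical helper, not part of Pig or Hadoop: illustrates the
// split-boundary adjustment an indexed-LZO loader has to do, since the
// decompressor can only start reading at a compressed block boundary.
public class LzoSplitAligner
{
  /**
   * Returns the first indexed block offset at or after splitStart,
   * or -1 if the split starts past the last indexed block.
   */
  public static long alignToBlock(long[] blockStarts, long splitStart)
  {
    for (long start : blockStarts)
    {
      if (start >= splitStart)
      {
        return start;
      }
    }
    return -1;
  }

  public static void main(String[] args)
  {
    // compressed block offsets, as read from the .index file
    long[] index = { 0, 100, 250, 400 };
    // a naive split starting at byte 120 gets shifted to block offset 250
    System.out.println(alignToBlock(index, 120));
  }
}
```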
Btw, I believe there's already a ticket somewhere for this, but while I'm at it: +1 on further separation of the LoadFunc/Pig output from the Slicer/disk reads. Right now there are some one-offs in the Pig code to deal with gzipped file input. Writing loaders to handle LZO (with indexed blocks that mean shifting the InputSplits slightly) has meant copying a bunch of the tuple-parsing logic from PigStorage, etc., because the lower layer isn't as pluggable as it could be.

Kevin

On Mon, Sep 14, 2009 at 8:35 AM, Dmitriy Ryaboy <dvrya...@gmail.com> wrote:
> There's a ticket for that: https://issues.apache.org/jira/browse/PIG-612
>
> Vote it up so that the pig developers have a record of user interest
> in this feature.
>
> -D
>
> On Mon, Sep 14, 2009 at 10:08 AM, Vincent BARAT
> <vincent.ba...@ubikod.com> wrote:
> > It seems that I got my answer: custom loader functions can only be used in
> > map reduce mode, not local mode: in local mode, the file specified must be
> > a real file.
> >
> > Vincent BARAT wrote:
> >>
> >> Hello,
> >>
> >> In the process of trying to add support for HBase 0.20.0 to Pig (trunk),
> >> I was trying the tutorial from the Pig documentation:
> >>
> >> http://hadoop.apache.org/pig/docs/r0.3.0/udf.html#Custom+Slicer
> >>
> >> Unfortunately, when I try:
> >>
> >> A = LOAD '27' USING RangeSlicer();
> >> dump A;
> >>
> >> Pig reports the following error:
> >>
> >> 2009-09-14 15:33:46,395 [main] ERROR org.apache.pig.tools.grunt.Grunt -
> >> ERROR 2081: Unable to setup the load function.
> >>
> >> If I provide an existing file instead of '27', I no longer get this
> >> error, but the output of dump is empty.
> >>
> >> Any ideas?
> >>
> >> Here is my RangeSlicer() code:
> >>
> >> =========================================================
> >>
> >> package com.ubikod.ermin.backend.pigudfs;
> >>
> >> import java.io.IOException;
> >>
> >> import org.apache.commons.logging.Log;
> >> import org.apache.commons.logging.LogFactory;
> >> import org.apache.pig.ExecType;
> >> import org.apache.pig.LoadFunc;
> >> import org.apache.pig.Slice;
> >> import org.apache.pig.Slicer;
> >> import org.apache.pig.backend.datastorage.DataStorage;
> >> import org.apache.pig.builtin.Utf8StorageConverter;
> >> import org.apache.pig.data.Tuple;
> >> import org.apache.pig.impl.io.BufferedPositionedInputStream;
> >> import org.apache.pig.impl.logicalLayer.schema.Schema;
> >>
> >> public class RangeSlicer extends Utf8StorageConverter implements Slicer, LoadFunc
> >> {
> >>   private static final Log LOG = LogFactory.getLog(RangeSlicer.class);
> >>
> >>   public RangeSlicer()
> >>   {
> >>     LOG.info("RangeSlicer");
> >>   }
> >>
> >>   /**
> >>    * Expects location to be a stringified integer, and makes
> >>    * Integer.parseInt(location) slices. Each slice generates a single
> >>    * value, its index in the sequence of slices.
> >>    */
> >>   public Slice[] slice(DataStorage store, String location) throws IOException
> >>   {
> >>     LOG.info("slice #################" + location);
> >>     location = "30";
> >>     // Note: validate has already made sure that location is an integer
> >>     int numslices = Integer.parseInt(location);
> >>     LOG.info("slice #################" + numslices);
> >>     Slice[] slices = new Slice[numslices];
> >>     for (int i = 0; i < slices.length; i++)
> >>     {
> >>       slices[i] = new SingleValueSlice(i);
> >>     }
> >>     return slices;
> >>   }
> >>
> >>   public void validate(DataStorage store, String location) throws IOException
> >>   {
> >>     try
> >>     {
> >>       LOG.info("validate #################" + location);
> >>       Integer.parseInt("30");
> >>       LOG.info("validate #################" + location);
> >>     }
> >>     catch (NumberFormatException nfe)
> >>     {
> >>       throw new IOException(nfe.getMessage());
> >>     }
> >>   }
> >>
> >>   /**
> >>    * A Slice that returns a single value from next.
> >>    */
> >>   public static class SingleValueSlice implements Slice
> >>   {
> >>     // note this value is set by the Slicer and will get serialized and
> >>     // deserialized at the remote processing node
> >>     public int val;
> >>     // since we just have a single value, we can use a boolean rather than a
> >>     // counter
> >>     private transient boolean read;
> >>
> >>     public SingleValueSlice(int value)
> >>     {
> >>       LOG.info("SingleValueSlice #################" + value);
> >>       this.val = value;
> >>     }
> >>
> >>     public void close() throws IOException
> >>     {
> >>     }
> >>
> >>     public long getLength()
> >>     {
> >>       return 1;
> >>     }
> >>
> >>     public String[] getLocations()
> >>     {
> >>       return new String[0];
> >>     }
> >>
> >>     public long getStart()
> >>     {
> >>       return 0;
> >>     }
> >>
> >>     public long getPos() throws IOException
> >>     {
> >>       return read ? 1 : 0;
> >>     }
> >>
> >>     public float getProgress() throws IOException
> >>     {
> >>       return read ? 1 : 0;
> >>     }
> >>
> >>     public void init(DataStorage store) throws IOException
> >>     {
> >>     }
> >>
> >>     public boolean next(Tuple value) throws IOException
> >>     {
> >>       if (!read)
> >>       {
> >>         LOG.info("next #################" + value);
> >>         value.append(val);
> >>         read = true;
> >>         return true;
> >>       }
> >>       return false;
> >>     }
> >>
> >>     private static final long serialVersionUID = 1L;
> >>   }
> >>
> >>   @Override
> >>   public void bindTo(String arg0, BufferedPositionedInputStream arg1,
> >>     long arg2, long arg3) throws IOException
> >>   {
> >>     LOG.info("bindTo #################" + arg0);
> >>   }
> >>
> >>   @Override
> >>   public Schema determineSchema(String arg0, ExecType arg1, DataStorage arg2)
> >>     throws IOException
> >>   {
> >>     // TODO Auto-generated method stub
> >>     return null;
> >>   }
> >>
> >>   @Override
> >>   public void fieldsToRead(Schema arg0)
> >>   {
> >>     // TODO Auto-generated method stub
> >>   }
> >>
> >>   @Override
> >>   public Tuple getNext() throws IOException
> >>   {
> >>     // TODO Auto-generated method stub
> >>     return null;
> >>   }
> >> }
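One thing worth noting in the quoted code: validate() parses the constant "30" instead of the passed location, and slice() overwrites location with "30", so the actual LOAD argument (e.g. '27') is never really checked or used. A minimal standalone sketch of the intended check (plain Java only; the Pig DataStorage parameter and Slice wiring are omitted, and the class name is made up for illustration):

```java
import java.io.IOException;

// Standalone sketch: validate/parse the *passed* location, so a
// non-integer location fails validation instead of sneaking through.
public class RangeCheck
{
  public static int validateAndCount(String location) throws IOException
  {
    try
    {
      // e.g. LOAD '27' USING RangeSlicer() -> 27 slices
      return Integer.parseInt(location);
    }
    catch (NumberFormatException nfe)
    {
      throw new IOException("location is not an integer: " + location, nfe);
    }
  }

  public static void main(String[] args) throws IOException
  {
    System.out.println(validateAndCount("27")); // prints 27
  }
}
```

With this shape, slice() can reuse the same parsed count instead of re-parsing a hard-coded string; it doesn't change the local-mode limitation discussed above, but it makes errors surface where they belong.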