+1 on this.  I'm writing a bunch of LZO-based LoadFuncs/Slicers (commit
coming soon) and it's so much faster to test/verify in local mode.

Btw, I believe there's already a ticket somewhere for this, but while I'm at
it: +1 on further separation of the LoadFunc/pig output from the Slicer/disk
reads.  Right now there are some one-offs in the Pig code to deal with
gzipped file input.  Writing loaders to deal with Lzo (with indexed blocks
that mean shifting the InputSplits slightly) has meant copying a bunch of
the logic for parsing tuples from PigStorage, etc, because the lower layer
isn't as pluggable as it could be.

Kevin

On Mon, Sep 14, 2009 at 8:35 AM, Dmitriy Ryaboy <dvrya...@gmail.com> wrote:

> There's a ticket for that: https://issues.apache.org/jira/browse/PIG-612
>
> Vote it up so that the pig developers have a record of user interest
> in this feature.
>
> -D
>
> On Mon, Sep 14, 2009 at 10:08 AM, Vincent BARAT
> <vincent.ba...@ubikod.com> wrote:
> > It seems that I got my answer: custom loader functions can only be used
> in
> > map reduce mode, not local mode: in local mode, the file specified must
> be a
> > real file.
> >
> > Vincent BARAT a écrit :
> >>
> >> Hello,
> >>
> >> In the process of to trying to add the support for HBase 0.20.0 in PIG
> >> (trunk) I was trying the tutorial from PIG documentation:
> >>
> >> http://hadoop.apache.org/pig/docs/r0.3.0/udf.html#Custom+Slicer
> >>
> >> Unfortunately, when I try:
> >>
> >> A = LOAD '27' USING RangeSlicer();
> >> dump A;
> >>
> >> PIG reports the following error:
> >>
> >> 2009-09-14 15:33:46,395 [main] ERROR org.apache.pig.tools.grunt.Grunt -
> >> ERROR 2081: Unable to setup the load function.
> >>
> >> If I provide an existing file, instead of '27', I no longer have this
> >> error, but the output of the dump function is empty.
> >>
> >> Any idea ?
> >>
> >>
> >> Here is my RangeSlicer() code:
> >>
> >> =========================================================
> >>
> >>
> >> package com.ubikod.ermin.backend.pigudfs;
> >>
> >> import java.io.IOException;
> >>
> >> import org.apache.commons.logging.Log;
> >> import org.apache.commons.logging.LogFactory;
> >> import org.apache.pig.ExecType;
> >> import org.apache.pig.LoadFunc;
> >> import org.apache.pig.Slice;
> >> import org.apache.pig.Slicer;
> >> import org.apache.pig.backend.datastorage.DataStorage;
> >> import org.apache.pig.builtin.Utf8StorageConverter;
> >> import org.apache.pig.data.Tuple;
> >> import org.apache.pig.impl.io.BufferedPositionedInputStream;
> >> import org.apache.pig.impl.logicalLayer.schema.Schema;
> >>
> >> public class RangeSlicer extends Utf8StorageConverter implements Slicer,
> >>  LoadFunc
> >> {
> >>  private static final Log LOG = LogFactory.getLog(RangeSlicer.class);
> >>
> >>  public RangeSlicer()
> >>  {
> >>    LOG.info("RangeSlicer");
> >>  }
> >>
> >>  /**
> >>   * Expects location to be a Stringified integer, and makes
> >>   * Integer.parseInt(location) slices. Each slice generates a single
> >> value, its
> >>   * index in the sequence of slices.
> >>   */
> >>  public Slice[] slice(DataStorage store, String location) throws
> >> IOException
> >>  {
> >>    LOG.info("slice #################" + location);
> >>    location = "30";
> >>    // Note: validate has already made sure that location is an integer
> >>    int numslices = Integer.parseInt(location);
> >>    LOG.info("slice #################" + numslices);
> >>    Slice[] slices = new Slice[numslices];
> >>    for (int i = 0; i < slices.length; i++)
> >>    {
> >>      slices[i] = new SingleValueSlice(i);
> >>    }
> >>    return slices;
> >>  }
> >>
> >>  public void validate(DataStorage store, String location) throws
> >> IOException
> >>  {
> >>    try
> >>    {
> >>      LOG.info("validate #################" + location);
> >>      Integer.parseInt("30");
> >>      LOG.info("validate #################" + location);
> >>    }
> >>    catch (NumberFormatException nfe)
> >>    {
> >>      throw new IOException(nfe.getMessage());
> >>    }
> >>  }
> >>
> >>  /**
> >>   * A Slice that returns a single value from next.
> >>   */
> >>  public static class SingleValueSlice implements Slice
> >>  {
> >>    // note this value is set by the Slicer and will get serialized and
> >>    // deserialized at the remote processing node
> >>    public int val;
> >>    // since we just have a single value, we can use a boolean rather
> than
> >> a
> >>    // counter
> >>    private transient boolean read;
> >>
> >>    public SingleValueSlice(int value)
> >>    {
> >>      LOG.info("SingleValueSlice #################" + value);
> >>
> >>      this.val = value;
> >>    }
> >>
> >>    public void close() throws IOException
> >>    {
> >>    }
> >>
> >>    public long getLength()
> >>    {
> >>      return 1;
> >>    }
> >>
> >>    public String[] getLocations()
> >>    {
> >>      return new String[0];
> >>    }
> >>
> >>    public long getStart()
> >>    {
> >>      return 0;
> >>    }
> >>
> >>    public long getPos() throws IOException
> >>    {
> >>      return read ? 1 : 0;
> >>    }
> >>
> >>    public float getProgress() throws IOException
> >>    {
> >>      return read ? 1 : 0;
> >>    }
> >>
> >>    public void init(DataStorage store) throws IOException
> >>    {
> >>    }
> >>
> >>    public boolean next(Tuple value) throws IOException
> >>    {
> >>      if (!read)
> >>      {
> >>        LOG.info("next #################" + value);
> >>
> >>        value.append(val);
> >>        read = true;
> >>        return true;
> >>      }
> >>      return false;
> >>    }
> >>
> >>    private static final long serialVersionUID = 1L;
> >>  }
> >>
> >>  @Override
> >>  public void bindTo(String arg0, BufferedPositionedInputStream arg1,
> >>    long arg2, long arg3) throws IOException
> >>  {
> >>    LOG.info("bindTo #################" + arg0);
> >>  }
> >>
> >>  @Override
> >>  public Schema determineSchema(String arg0, ExecType arg1, DataStorage
> >> arg2)
> >>    throws IOException
> >>  {
> >>    // TODO Auto-generated method stub
> >>    return null;
> >>  }
> >>
> >>  @Override
> >>  public void fieldsToRead(Schema arg0)
> >>  {
> >>    // TODO Auto-generated method stub
> >>  }
> >>
> >>  @Override
> >>  public Tuple getNext() throws IOException
> >>  {
> >>    // TODO Auto-generated method stub
> >>    return null;
> >>  }
> >> }
> >
>

Reply via email to