Kevin,

Please take a look at the proposal for reworking load and store functions that was posted a couple of days ago, and see whether it addresses your issues with the pluggability of load functions.

http://wiki.apache.org/pig/LoadStoreRedesignProposal

Alan.

On Sep 14, 2009, at 8:58 AM, Kevin Weil wrote:

+1 on this. I'm writing a bunch of LZO-based LoadFuncs/Slicers (commit
coming soon) and it's so much faster to test/verify in local mode.

Btw, I believe there's already a ticket somewhere for this, but while I'm at it: +1 on further separation of the LoadFunc/Pig output from the Slicer/disk reads. Right now there are some one-offs in the Pig code to deal with gzipped file input. Writing loaders to deal with LZO (with indexed blocks that mean shifting the InputSplits slightly) has meant copying a bunch of the tuple-parsing logic from PigStorage, etc., because the lower layer isn't as pluggable as it could be.

Kevin

On Mon, Sep 14, 2009 at 8:35 AM, Dmitriy Ryaboy <dvrya...@gmail.com> wrote:

There's a ticket for that: https://issues.apache.org/jira/browse/PIG-612

Vote it up so that the pig developers have a record of user interest
in this feature.

-D

On Mon, Sep 14, 2009 at 10:08 AM, Vincent BARAT
<vincent.ba...@ubikod.com> wrote:
It seems that I got my answer: custom load functions can only be used in MapReduce mode, not in local mode. In local mode, the location specified must be a real file.
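
For reference, a sketch of the MapReduce-mode workaround (myscript.pig is a hypothetical file name; the custom slice() is only invoked in this mode):

```pig
-- run with: pig -x mapreduce myscript.pig
A = LOAD '27' USING RangeSlicer();
DUMP A;
```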

Vincent BARAT wrote:

Hello,

While trying to add support for HBase 0.20.0 to Pig (trunk), I followed the custom Slicer tutorial from the Pig documentation:

http://hadoop.apache.org/pig/docs/r0.3.0/udf.html#Custom+Slicer

Unfortunately, when I try:

A = LOAD '27' USING RangeSlicer();
dump A;

Pig reports the following error:

2009-09-14 15:33:46,395 [main] ERROR org.apache.pig.tools.grunt.Grunt -
ERROR 2081: Unable to setup the load function.

If I provide an existing file instead of '27', I no longer get this error, but the output of DUMP is empty.

Any ideas?


Here is my RangeSlicer() code:

=========================================================


package com.ubikod.ermin.backend.pigudfs;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.ExecType;
import org.apache.pig.LoadFunc;
import org.apache.pig.Slice;
import org.apache.pig.Slicer;
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.builtin.Utf8StorageConverter;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class RangeSlicer extends Utf8StorageConverter implements Slicer,
LoadFunc
{
private static final Log LOG = LogFactory.getLog(RangeSlicer.class);

public RangeSlicer()
{
  LOG.info("RangeSlicer");
}

/**
 * Expects location to be a stringified integer, and creates
 * Integer.parseInt(location) slices. Each slice generates a single value:
 * its index in the sequence of slices.
 */
public Slice[] slice(DataStorage store, String location) throws IOException
{
  LOG.info("slice: location=" + location);
  // Note: validate has already made sure that location is an integer
  int numslices = Integer.parseInt(location);
  Slice[] slices = new Slice[numslices];
  for (int i = 0; i < slices.length; i++)
  {
    slices[i] = new SingleValueSlice(i);
  }
  return slices;
}

public void validate(DataStorage store, String location) throws IOException
{
  try
  {
    LOG.info("validate: location=" + location);
    Integer.parseInt(location);
  }
  catch (NumberFormatException nfe)
  {
    throw new IOException(nfe.getMessage());
  }
}

/**
 * A Slice that returns a single value from next.
 */
public static class SingleValueSlice implements Slice
{
  // note: this value is set by the Slicer and will get serialized and
  // deserialized at the remote processing node
  public int val;
  // since we just have a single value, we can use a boolean rather than
  // a counter
  private transient boolean read;

  public SingleValueSlice(int value)
  {
    LOG.info("SingleValueSlice: value=" + value);

    this.val = value;
  }

  public void close() throws IOException
  {
  }

  public long getLength()
  {
    return 1;
  }

  public String[] getLocations()
  {
    return new String[0];
  }

  public long getStart()
  {
    return 0;
  }

  public long getPos() throws IOException
  {
    return read ? 1 : 0;
  }

  public float getProgress() throws IOException
  {
    return read ? 1 : 0;
  }

  public void init(DataStorage store) throws IOException
  {
  }

  public boolean next(Tuple value) throws IOException
  {
    if (!read)
    {
      LOG.info("next: value=" + value);

      value.append(val);
      read = true;
      return true;
    }
    return false;
  }

  private static final long serialVersionUID = 1L;
}

@Override
public void bindTo(String arg0, BufferedPositionedInputStream arg1,
  long arg2, long arg3) throws IOException
{
  LOG.info("bindTo: location=" + arg0);
}

@Override
public Schema determineSchema(String arg0, ExecType arg1, DataStorage arg2)
  throws IOException
{
  // TODO Auto-generated method stub
  return null;
}

@Override
public void fieldsToRead(Schema arg0)
{
  // TODO Auto-generated method stub
}

@Override
public Tuple getNext() throws IOException
{
  // Always reports end-of-data; in local mode Pig calls the LoadFunc
  // directly (bypassing the Slicer), so this stub leaves DUMP empty.
  return null;
}
}
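
A tiny standalone sketch of the parse-and-wrap pattern that validate() and slice() rely on: parse the actual location string (never a hard-coded constant) and wrap NumberFormatException in IOException. The class and method names here are hypothetical, not part of the Pig API:

```java
import java.io.IOException;

public class SliceCountParser
{
  // Parse the slice count from the location string, wrapping the
  // unchecked NumberFormatException in the checked IOException that
  // validate()/slice() are declared to throw.
  public static int parseSliceCount(String location) throws IOException
  {
    try
    {
      return Integer.parseInt(location);
    }
    catch (NumberFormatException nfe)
    {
      throw new IOException("location is not an integer: " + location, nfe);
    }
  }

  public static void main(String[] args) throws IOException
  {
    System.out.println(parseSliceCount("27")); // prints 27
  }
}
```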


