It seems I have my answer: custom load functions can only be used in MapReduce mode, not in local mode. In local mode, the specified file must be a real file.

Vincent BARAT wrote:
Hello,

While trying to add support for HBase 0.20.0 to PIG (trunk), I was working through the tutorial from the PIG documentation:

http://hadoop.apache.org/pig/docs/r0.3.0/udf.html#Custom+Slicer

Unfortunately, when I try:

A = LOAD '27' USING RangeSlicer();
dump A;

PIG reports the following error:

2009-09-14 15:33:46,395 [main] ERROR org.apache.pig.tools.grunt.Grunt - ERROR 2081: Unable to setup the load function.

If I provide an existing file, instead of '27', I no longer have this error, but the output of the dump function is empty.

Any ideas?


Here is my RangeSlicer() code:

=========================================================


package com.ubikod.ermin.backend.pigudfs;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.ExecType;
import org.apache.pig.LoadFunc;
import org.apache.pig.Slice;
import org.apache.pig.Slicer;
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.builtin.Utf8StorageConverter;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class RangeSlicer extends Utf8StorageConverter implements Slicer,
  LoadFunc
{
  private static final Log LOG = LogFactory.getLog(RangeSlicer.class);

  public RangeSlicer()
  {
    LOG.info("RangeSlicer");
  }

  /**
   * Expects location to be a stringified integer, and makes
   * Integer.parseInt(location) slices. Each slice generates a single value, its
   * index in the sequence of slices.
   */
  public Slice[] slice(DataStorage store, String location) throws IOException
  {
    LOG.info("slice #################" + location);
    location = "30";
    // Note: validate has already made sure that location is an integer
    int numslices = Integer.parseInt(location);
    LOG.info("slice #################" + numslices);
    Slice[] slices = new Slice[numslices];
    for (int i = 0; i < slices.length; i++)
    {
      slices[i] = new SingleValueSlice(i);
    }
    return slices;
  }

  public void validate(DataStorage store, String location) throws IOException
  {
    try
    {
      LOG.info("validate #################" + location);
      Integer.parseInt("30");
      LOG.info("validate #################" + location);
    }
    catch (NumberFormatException nfe)
    {
      throw new IOException(nfe.getMessage());
    }
  }

  /**
   * A Slice that returns a single value from next.
   */
  public static class SingleValueSlice implements Slice
  {
    // note this value is set by the Slicer and will get serialized and
    // deserialized at the remote processing node
    public int val;
    // since we just have a single value, we can use a boolean rather than a
    // counter
    private transient boolean read;

    public SingleValueSlice(int value)
    {
      LOG.info("SingleValueSlice #################" + value);

      this.val = value;
    }

    public void close() throws IOException
    {
    }

    public long getLength()
    {
      return 1;
    }

    public String[] getLocations()
    {
      return new String[0];
    }

    public long getStart()
    {
      return 0;
    }

    public long getPos() throws IOException
    {
      return read ? 1 : 0;
    }

    public float getProgress() throws IOException
    {
      return read ? 1 : 0;
    }

    public void init(DataStorage store) throws IOException
    {
    }

    public boolean next(Tuple value) throws IOException
    {
      if (!read)
      {
        LOG.info("next #################" + value);

        value.append(val);
        read = true;
        return true;
      }
      return false;
    }

    private static final long serialVersionUID = 1L;
  }

  @Override
  public void bindTo(String arg0, BufferedPositionedInputStream arg1,
    long arg2, long arg3) throws IOException
  {
    LOG.info("bindTo #################" + arg0);
  }

  @Override
  public Schema determineSchema(String arg0, ExecType arg1, DataStorage arg2)
    throws IOException
  {
    // TODO Auto-generated method stub
    return null;
  }

  @Override
  public void fieldsToRead(Schema arg0)
  {
    // TODO Auto-generated method stub
  }

  @Override
  public Tuple getNext() throws IOException
  {
    // TODO Auto-generated method stub
    return null;
  }
}
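
For comparison, the output the slicer is meant to produce can be sketched without any Pig dependency. The following is only a stand-alone illustration under stated assumptions: RangeSliceSketch, SingleValue and readAll are hypothetical stand-ins, not Pig classes, and a plain List<Integer> replaces Pig's Tuple. It mirrors the slice()/next() logic above: N slices are created, and each one yields its own index exactly once.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-alone sketch: none of these types come from Pig.
public class RangeSliceSketch {

    /** Hypothetical stand-in for SingleValueSlice: yields its value exactly once. */
    static final class SingleValue {
        private final int val;
        private boolean read;

        SingleValue(int val) {
            this.val = val;
        }

        /** Appends the value to out on the first call, then reports exhaustion. */
        boolean next(List<Integer> out) {
            if (read) {
                return false;
            }
            out.add(val);
            read = true;
            return true;
        }
    }

    /** Mirrors slice(): parses location as an integer and builds that many slices. */
    static SingleValue[] slice(String location) {
        int numSlices = Integer.parseInt(location);
        SingleValue[] slices = new SingleValue[numSlices];
        for (int i = 0; i < slices.length; i++) {
            slices[i] = new SingleValue(i);
        }
        return slices;
    }

    /** Drains every slice in order, collecting the values they yield. */
    public static List<Integer> readAll(String location) {
        List<Integer> out = new ArrayList<>();
        for (SingleValue s : slice(location)) {
            while (s.next(out)) {
                // each slice contributes a single value, so this body runs once
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(readAll("3")); // prints [0, 1, 2]
    }
}
```

By the same logic, a location of '27' should yield 27 single-value rows (0 through 26) if the slices are actually read. Note that the posted slice() overrides location with "30", so as written it would build 30 slices whatever location is passed. An empty dump suggests the slices were never consulted, which fits the local-mode explanation at the top of this message.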
