Hello,
While trying to add support for HBase 0.20.0 to
PIG (trunk), I was following the tutorial from the PIG documentation:
http://hadoop.apache.org/pig/docs/r0.3.0/udf.html#Custom+Slicer
Unfortunately, when I try:
A = LOAD '27' USING RangeSlicer();
dump A;
PIG reports the following error:
2009-09-14 15:33:46,395 [main] ERROR
org.apache.pig.tools.grunt.Grunt - ERROR 2081: Unable to setup the
load function.
If I provide an existing file, instead of '27', I no longer have
this error, but the output of the dump function is empty.
Any ideas?
Here is my RangeSlicer() code:
=========================================================
package com.ubikod.ermin.backend.pigudfs;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.ExecType;
import org.apache.pig.LoadFunc;
import org.apache.pig.Slice;
import org.apache.pig.Slicer;
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.builtin.Utf8StorageConverter;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
import org.apache.pig.impl.logicalLayer.schema.Schema;
public class RangeSlicer extends Utf8StorageConverter implements Slicer,
LoadFunc
{
private static final Log LOG = LogFactory.getLog(RangeSlicer.class);
public RangeSlicer()
{
LOG.info("RangeSlicer");
}
/**
* Expects location to be a Stringified integer, and makes
* Integer.parseInt(location) slices. Each slice generates a
single value, its
* index in the sequence of slices.
*/
public Slice[] slice(DataStorage store, String location) throws
IOException
{
LOG.info("slice #################" + location);
location = "30";
// Note: validate has already made sure that location is an integer
int numslices = Integer.parseInt(location);
LOG.info("slice #################" + numslices);
Slice[] slices = new Slice[numslices];
for (int i = 0; i < slices.length; i++)
{
slices[i] = new SingleValueSlice(i);
}
return slices;
}
public void validate(DataStorage store, String location) throws
IOException
{
try
{
LOG.info("validate #################" + location);
Integer.parseInt("30");
LOG.info("validate #################" + location);
}
catch (NumberFormatException nfe)
{
throw new IOException(nfe.getMessage());
}
}
/**
* A Slice that returns a single value from next.
*/
public static class SingleValueSlice implements Slice
{
// note this value is set by the Slicer and will get serialized and
// deserialized at the remote processing node
public int val;
// since we just have a single value, we can use a boolean
rather than a
// counter
private transient boolean read;
public SingleValueSlice(int value)
{
LOG.info("SingleValueSlice #################" + value);
this.val = value;
}
public void close() throws IOException
{
}
public long getLength()
{
return 1;
}
public String[] getLocations()
{
return new String[0];
}
public long getStart()
{
return 0;
}
public long getPos() throws IOException
{
return read ? 1 : 0;
}
public float getProgress() throws IOException
{
return read ? 1 : 0;
}
public void init(DataStorage store) throws IOException
{
}
public boolean next(Tuple value) throws IOException
{
if (!read)
{
LOG.info("next #################" + value);
value.append(val);
read = true;
return true;
}
return false;
}
private static final long serialVersionUID = 1L;
}
@Override
public void bindTo(String arg0, BufferedPositionedInputStream arg1,
long arg2, long arg3) throws IOException
{
LOG.info("bindTo #################" + arg0);
}
@Override
public Schema determineSchema(String arg0, ExecType arg1,
DataStorage arg2)
throws IOException
{
// TODO Auto-generated method stub
return null;
}
@Override
public void fieldsToRead(Schema arg0)
{
// TODO Auto-generated method stub
}
@Override
public Tuple getNext() throws IOException
{
// TODO Auto-generated method stub
return null;
}
}