Hi all, I'm trying to build a Cascading tap for Cassandra. Cascading is a layer on top of Hadoop. For this purpose I use ColumnFamilyInputFormat and ColumnFamilyRecordReader from Cassandra.
I ran into a problem that the record reader would create an endless iterator because something goes wrong with the starttoken of the batches the ColumnFamilyRecordReader gets out of Cassandra. In this comment on a Jira issue this situation is explained: https://issues.apache.org/jira/browse/CASSANDRA-4229 The reply on the issue says that the behavior is caused by some keys of a row being modified. The suggested solution is to copy all the bytebuffers that are used. I have added ByteBufferUtil.clone liberally, but the problem persists. Any suggestions on what might be causing this? Below the two files that make up the Cascading tap, these use the ColumnFamilyInputFormat and ColumnFamilyRecordReader from Cassandra version 1.1.2:

CassandraScheme.java

package cascalog.cassandra;

import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
import cascading.flow.FlowProcess;
import cascading.scheme.Scheme;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.util.Util;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.fs.Path;
import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.SortedMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import java.nio.ByteBuffer;
import org.apache.cassandra.db.IColumn;

/**
 * Cascading scheme that sources rows from a Cassandra column family via
 * {@link ColumnFamilyInputFormat}.
 *
 * <p>Each sourced tuple holds the row key decoded as a string, followed by one entry per
 * configured column name: the column value decoded as a string, or {@code null} when the
 * row has no such column. Sinking is not implemented.
 */
public class CassandraScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> {

    private String pathUUID;
    private String host;
    private String port;
    private String keyspace;
    private String columnFamily;
    private List<String> columnFieldNames;

    /**
     * @param host             initial Cassandra address handed to {@link ConfigHelper}
     * @param port             Cassandra RPC port handed to {@link ConfigHelper}
     * @param keyspace         keyspace to read from
     * @param columnFamily     column family to read from
     * @param columnFieldNames names of the columns to fetch; also fixes the order of the
     *                         tuple entries that follow the row key
     */
    public CassandraScheme(String host, String port, String keyspace, String columnFamily,
                           List<String> columnFieldNames) {
        this.host = host;
        this.port = port;
        this.keyspace = keyspace;
        this.columnFamily = columnFamily;
        this.columnFieldNames = columnFieldNames;
        // A random UUID stands in for a file-system path; see getPath().
        this.pathUUID = UUID.randomUUID().toString();
        //setSourceFields(new Fields("text3")); // default is unknown
        //setSinkFields
    }

    /**
     * Creates the key/value holders once per source call and stores them as the call
     * context ({@code [0]} = key buffer, {@code [1]} = column map) for reuse in
     * {@link #source}.
     */
    @Override
    public void sourcePrepare(FlowProcess<JobConf> flowProcess,
                              SourceCall<Object[], RecordReader> sourceCall) {
        ByteBuffer key = ByteBufferUtil.clone((ByteBuffer) sourceCall.getInput().createKey());
        // The record reader is a raw type here, so the value type cannot be checked statically.
        @SuppressWarnings("unchecked")
        SortedMap<ByteBuffer, IColumn> value =
                (SortedMap<ByteBuffer, IColumn>) sourceCall.getInput().createValue();
        sourceCall.setContext(new Object[]{key, value});
    }

    /** Drops the key/value holders installed by {@link #sourcePrepare}. */
    @Override
    public void sourceCleanup(FlowProcess<JobConf> flowProcess,
                              SourceCall<Object[], RecordReader> sourceCall) {
        sourceCall.setContext(null);
    }

    /**
     * Reads the next row from the record reader and publishes it as the incoming tuple.
     *
     * @return {@code true} when a tuple was produced, {@code false} when the reader is exhausted
     * @throws IOException if the reader fails or a key/value cannot be decoded as a string
     */
    @Override
    public boolean source(FlowProcess<JobConf> flowProcess,
                          SourceCall<Object[], RecordReader> sourceCall) throws IOException {
        Object[] context = sourceCall.getContext();
        Object key = context[0];
        Object value = context[1];

        // NOTE(review): the same key/value holders are handed to next() on every call; the
        // clones below only protect the copies used to build the tuple. If the endless
        // iteration is caused by the reader modifying its own key buffers inside next(),
        // cloning here cannot prevent it - TODO confirm against the 1.1.2
        // ColumnFamilyRecordReader source.
        if (!sourceCall.getInput().next(key, value)) {
            return false;
        }

        ByteBuffer orgkey = (ByteBuffer) key;
        ByteBuffer rowkey = ByteBufferUtil.clone(orgkey);
        // Same holder that sourcePrepare stored; the cast cannot be checked statically.
        @SuppressWarnings("unchecked")
        SortedMap<ByteBuffer, IColumn> columns = (SortedMap<ByteBuffer, IColumn>) value;

        // Only allocate the tuple once we know there is a row to emit.
        Tuple result = new Tuple();
        result.add(ByteBufferUtil.string(rowkey));

        for (String columnFieldName : columnFieldNames) {
            IColumn col = columns.get(ByteBufferUtil.bytes(columnFieldName));
            if (col != null) {
                result.add(ByteBufferUtil.string(ByteBufferUtil.clone(col.value())));
            } else {
                // Keep the tuple positions aligned with columnFieldNames.
                result.add(null);
            }
        }

        sourceCall.getIncomingEntry().setTuple(result);
        return true;
    }

    /**
     * Sinking is not implemented.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void sink(FlowProcess<JobConf> flowProcess,
                     SinkCall<Object[], OutputCollector> sinkCall) throws IOException {
        System.out.println("sink");
        throw new UnsupportedOperationException("TODO");
        //outputCollector.collect(null, put);
    }

    /** No sink configuration yet; only logs that it was called. */
    @Override
    public void sinkConfInit(FlowProcess<JobConf> process,
                             Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
        System.out.println("sinkConfInit");
    }

    /**
     * Configures the job to read the column family through {@link ColumnFamilyInputFormat},
     * restricted to the configured column names.
     */
    @Override
    public void sourceConfInit(FlowProcess<JobConf> process,
                               Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
        FileInputFormat.addInputPaths(conf, getPath().toString());
        conf.setInputFormat(ColumnFamilyInputFormat.class);

        ConfigHelper.setRangeBatchSize(conf, 100);
        ConfigHelper.setInputSplitSize(conf, 30);
        ConfigHelper.setInputRpcPort(conf, port);
        ConfigHelper.setInputInitialAddress(conf, host);
        ConfigHelper.setInputPartitioner(conf, "RandomPartitioner");
        ConfigHelper.setInputColumnFamily(conf, keyspace, columnFamily);

        List<ByteBuffer> columnNames = new ArrayList<ByteBuffer>();
        for (String columnFieldName : columnFieldNames) {
            columnNames.add(ByteBufferUtil.bytes(columnFieldName));
        }

        SlicePredicate predicate = new SlicePredicate().setColumn_names(columnNames);
        ConfigHelper.setInputSlicePredicate(conf, predicate);
    }

    /** Returns a path built from this scheme's random UUID. */
    public Path getPath() {
        return new Path(pathUUID);
    }

    /** Returns {@code host_port_keyspace_columnFamily}. */
    public String getIdentifier() {
        return host + "_" + port + "_" + keyspace + "_" + columnFamily;
    }

    /**
     * Two schemes are equal when the superclass considers them equal and they share the same
     * path, host, port, keyspace and column family - the same fields that feed
     * {@link #hashCode()}, so equal schemes always hash alike.
     */
    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof CassandraScheme)) {
            return false;
        }
        if (!super.equals(other)) {
            return false;
        }

        CassandraScheme that = (CassandraScheme) other;
        return getPath().toString().equals(that.getPath().toString())
                && nullSafeEquals(host, that.host)
                && nullSafeEquals(port, that.port)
                && nullSafeEquals(keyspace, that.keyspace)
                && nullSafeEquals(columnFamily, that.columnFamily);
    }

    @Override
    public int hashCode() {
        int result = super.hashCode();
        result = 31 * result + getPath().toString().hashCode();
        result = 31 * result + (host != null ? host.hashCode() : 0);
        result = 31 * result + (port != null ? port.hashCode() : 0);
        result = 31 * result + (keyspace != null ? keyspace.hashCode() : 0);
        result = 31 * result + (columnFamily != null ? columnFamily.hashCode() : 0);
        return result;
    }

    /** Null-tolerant string comparison, mirroring the null handling in {@link #hashCode()}. */
    private static boolean nullSafeEquals(String left, String right) {
        return left == null ? right == null : left.equals(right);
    }
}

CassandraTap.java

package cascalog.cassandra;

import cascading.tap.Tap;
import cascading.scheme.Scheme;
import cascading.tap.SinkMode;
import cascading.flow.FlowProcess;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.OutputCollector;
import cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator;
import cascading.tuple.TupleEntryIterator;
import cascading.tuple.TupleEntrySchemeIterator;
import cascading.tuple.TupleEntryCollector;
import java.io.IOException;

/**
 * Source-only Cascading tap backed by a {@link CassandraScheme}. Writing is not implemented,
 * and the resource-management callbacks are placeholders that always report success.
 */
public class CassandraTap extends Tap<JobConf, RecordReader, OutputCollector> {

    public final String id = "TEMP_ID";
    public CassandraScheme scheme;

    /** @param scheme scheme that configures and performs the actual reads */
    public CassandraTap(CassandraScheme scheme) {
        super(scheme);
        this.scheme = scheme;
    }

    /** Returns the tap id joined with the scheme's identifier by an underscore. */
    @Override
    public String getIdentifier() {
        return id + "_" + scheme.getIdentifier();
    }

    /** Wraps the given record reader in a {@link HadoopTupleEntrySchemeIterator}. */
    @Override
    public TupleEntryIterator openForRead(FlowProcess<JobConf> jobConfFlowProcess,
                                          RecordReader recordReader) throws IOException {
        return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this, recordReader);
    }

    /**
     * Writing is not implemented.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public TupleEntryCollector openForWrite(FlowProcess<JobConf> jobConfFlowProcess,
                                            OutputCollector outputCollector) throws IOException {
        throw new UnsupportedOperationException("TODO");
    }

    /** Placeholder: creates nothing and reports success. */
    @Override
    public boolean createResource(JobConf jobConf) throws IOException {
        // TODO
        return true;
    }

    /** Placeholder: deletes nothing and reports success. */
    @Override
    public boolean deleteResource(JobConf jobConf) throws IOException {
        // TODO
        return true;
    }

    /** Placeholder: always reports that the resource exists. */
    @Override
    public boolean resourceExists(JobConf jobConf) throws IOException {
        // TODO check if column-family exists
        return true;
    }

    /** Returns the current time, because no real modification time is available. */
    @Override
    public long getModifiedTime(JobConf jobConf) throws IOException {
        // TODO could read this from tables
        return System.currentTimeMillis(); // currently unable to find last mod time on a table
    }

    @Override
    public boolean isSource() {
        return true;
    }

    @Override
    public boolean isSink() {
        return false;
    }

    /**
     * Two taps are equal when the superclass considers them equal and their identifiers match.
     */
    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof CassandraTap)) {
            return false;
        }
        if (!super.equals(other)) {
            return false;
        }

        CassandraTap otherTap = (CassandraTap) other;
        return otherTap.getIdentifier().equals(getIdentifier());
    }

    @Override
    public int hashCode() {
        int result = super.hashCode();
        result = 31 * result + getIdentifier().hashCode();
        return result;
    }
}