[ https://issues.apache.org/jira/browse/CASSANDRA-3371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13128995#comment-13128995 ]
Brandon Williams commented on CASSANDRA-3371:
---------------------------------------------

Pete, can you attach this patch as a file? It's difficult to read in the description.

> Cassandra inferred schema and actual data don't match
> -----------------------------------------------------
>
> Key: CASSANDRA-3371
> URL: https://issues.apache.org/jira/browse/CASSANDRA-3371
> Project: Cassandra
> Issue Type: Bug
> Components: Hadoop
> Affects Versions: 0.8.7
> Reporter: Pete Warden
> Assignee: Brandon Williams
>
> It's looking like there may be a mismatch between the schema that's being reported by the latest CassandraStorage.java, and the data that's actually returned. Here's an example:
> rows = LOAD 'cassandra://Frap/PhotoVotes' USING CassandraStorage();
> DESCRIBE rows;
> rows: {key: chararray,columns: {(name: chararray,value: bytearray,photo_owner: chararray,value_photo_owner: bytearray,pid: chararray,value_pid: bytearray,matched_string: chararray,value_matched_string: bytearray,src_big: chararray,value_src_big: bytearray,time: chararray,value_time: bytearray,vote_type: chararray,value_vote_type: bytearray,voter: chararray,value_voter: bytearray)}}
> DUMP rows;
> (691831038_1317937188.48955,{(photo_owner,1596090180),(pid,6855155124568798560),(matched_string,),(src_big,),(time,Thu Oct 06 14:39:48 -0700 2011),(vote_type,album_dislike),(voter,691831038)})
> getSchema() is reporting the columns as an inner bag of tuples, each of which contains 16 values. In fact, getNext() seems to return an inner bag containing 7 tuples, each of which contains two values.
> It appears that things got out of sync with this change:
> http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?r1=1177083&r2=1177082&pathrev=1177083
> See more discussion at:
> http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/pig-cassandra-problem-quot-Incompatible-field-schema-quot-error-tc6882703.html
> Here's a patch I ended up creating for my own use, which gives the results I need (though it doesn't handle super-columns):
> DESCRIBE rows;
> rows: {cassandra_key: chararray,photo_owner: bytearray,pid: bytearray,place_matched_string: bytearray,src_big: bytearray,time: bytearray,vote_type: bytearray,voter: bytearray}
> Index: contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
> ===================================================================
> --- contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (revision 1185044)
> +++ contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (working copy)
> @@ -26,7 +26,7 @@
> import org.apache.cassandra.db.marshal.IntegerType;
> import org.apache.cassandra.db.marshal.TypeParser;
> import org.apache.cassandra.thrift.*;
> -import org.apache.cassandra.utils.Hex;
> +import org.apache.cassandra.utils.FBUtilities;
> import org.apache.commons.logging.Log;
> import org.apache.commons.logging.LogFactory;
>
> @@ -122,15 +122,15 @@
> assert key != null && cf != null;
>
> // and wrap it in a tuple
> - Tuple tuple = TupleFactory.getInstance().newTuple(2);
> + Tuple tuple = TupleFactory.getInstance().newTuple(cf.size()+1);
> ArrayList<Tuple> columns = new ArrayList<Tuple>();
> - tuple.set(0, new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
> + int tupleIndex = 0;
> + tuple.set(tupleIndex++, new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
> for
(Map.Entry<ByteBuffer, IColumn> entry : cf.entrySet()) > { > - columns.add(columnToTuple(entry.getKey(), entry.getValue(), > cfDef)); > + tuple.set(tupleIndex++, columnToTuple(entry.getKey(), > entry.getValue(), cfDef)); > } > > - tuple.set(1, new DefaultDataBag(columns)); > return tuple; > } > catch (InterruptedException e) > @@ -139,30 +139,22 @@ > } > } > > - private Tuple columnToTuple(ByteBuffer name, IColumn col, CfDef cfDef) > throws IOException > + private Object columnToTuple(ByteBuffer name, IColumn col, CfDef cfDef) > throws IOException > { > - Tuple pair = TupleFactory.getInstance().newTuple(2); > List<AbstractType> marshallers = getDefaultMarshallers(cfDef); > Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef); > > - setTupleValue(pair, 0, marshallers.get(0).compose(name)); > if (col instanceof Column) > { > // standard > if (validators.get(name) == null) > - setTupleValue(pair, 1, > marshallers.get(1).compose(col.value())); > + return marshallers.get(1).compose(col.value()); > else > - setTupleValue(pair, 1, > validators.get(name).compose(col.value())); > - return pair; > + return validators.get(name).compose(col.value()); > } > > - // super > - ArrayList<Tuple> subcols = new ArrayList<Tuple>(); > - for (IColumn subcol : col.getSubColumns()) > - subcols.add(columnToTuple(subcol.name(), subcol, cfDef)); > - > - pair.set(1, new DefaultDataBag(subcols)); > - return pair; > + // super not currently handled > + return null; > } > > private void setTupleValue(Tuple pair, int position, Object value) > throws ExecException > @@ -312,62 +304,32 @@ > // top-level schema, no type > ResourceSchema schema = new ResourceSchema(); > > + ResourceFieldSchema[] tupleFields = new > ResourceFieldSchema[cfDef.column_metadata.size()+1]; > + int tupleIndex = 0; > + > // add key > ResourceFieldSchema keyFieldSchema = new ResourceFieldSchema(); > - keyFieldSchema.setName("key"); > + keyFieldSchema.setName("cassandra_key"); > 
keyFieldSchema.setType(DataType.CHARARRAY); //TODO: get key type > + tupleFields[tupleIndex++] = keyFieldSchema; > > - // will become the bag of tuples > - ResourceFieldSchema bagFieldSchema = new ResourceFieldSchema(); > - bagFieldSchema.setName("columns"); > - bagFieldSchema.setType(DataType.BAG); > - ResourceSchema bagSchema = new ResourceSchema(); > - > - > List<AbstractType> marshallers = getDefaultMarshallers(cfDef); > Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef); > - List<ResourceFieldSchema> tupleFields = new > ArrayList<ResourceFieldSchema>(); > > - // default comparator/validator > - ResourceSchema innerTupleSchema = new ResourceSchema(); > - ResourceFieldSchema tupleField = new ResourceFieldSchema(); > - tupleField.setType(DataType.TUPLE); > - tupleField.setSchema(innerTupleSchema); > - > - ResourceFieldSchema colSchema = new ResourceFieldSchema(); > - colSchema.setName("name"); > - colSchema.setType(getPigType(marshallers.get(0))); > - tupleFields.add(colSchema); > - > - ResourceFieldSchema valSchema = new ResourceFieldSchema(); > - AbstractType validator = marshallers.get(1); > - valSchema.setName("value"); > - valSchema.setType(getPigType(validator)); > - tupleFields.add(valSchema); > - > // defined validators/indexes > for (ColumnDef cdef : cfDef.column_metadata) > { > - colSchema = new ResourceFieldSchema(); > - colSchema.setName(new String(cdef.getName())); > - colSchema.setType(getPigType(marshallers.get(0))); > - tupleFields.add(colSchema); > - > - valSchema = new ResourceFieldSchema(); > - validator = validators.get(cdef.getName()); > + ResourceFieldSchema valSchema = new ResourceFieldSchema(); > + AbstractType validator = validators.get(cdef.getName()); > if (validator == null) > validator = marshallers.get(1); > - valSchema.setName("value"); > + valSchema.setName(new String(cdef.getName())); > valSchema.setType(getPigType(validator)); > - tupleFields.add(valSchema); > + tupleFields[tupleIndex++] = valSchema; > } > - 
innerTupleSchema.setFields(tupleFields.toArray(new > ResourceFieldSchema[tupleFields.size()])); > > - // a bag can contain only one tuple, but that tuple can contain > anything > - bagSchema.setFields(new ResourceFieldSchema[] { tupleField }); > - bagFieldSchema.setSchema(bagSchema); > // top level schema contains everything > - schema.setFields(new ResourceFieldSchema[] { keyFieldSchema, > bagFieldSchema }); > + schema.setFields(tupleFields); > return schema; > } > > @@ -601,7 +563,7 @@ > TSerializer serializer = new TSerializer(new > TBinaryProtocol.Factory()); > try > { > - return Hex.bytesToHex(serializer.serialize(cfDef)); > + return FBUtilities.bytesToHex(serializer.serialize(cfDef)); > } > catch (TException e) > { > @@ -616,7 +578,7 @@ > CfDef cfDef = new CfDef(); > try > { > - deserializer.deserialize(cfDef, Hex.hexToBytes(st)); > + deserializer.deserialize(cfDef, FBUtilities.hexToBytes(st)); > } > catch (TException e) > { -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira