[ https://issues.apache.org/jira/browse/CASSANDRA-3371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Brandon Williams updated CASSANDRA-3371:
----------------------------------------

    Attachment: 3371-v3.txt

I had what I thought was a good idea to accommodate this: alias the tuples themselves after the index names, allowing you to do something like this:

{noformat}
album_votes = FILTER all_votes BY (columns.vote_type.value EQ 'album_like') OR (columns.vote_type.value EQ 'album_dislike');
{noformat}

It's not that easy, however. When you dereference a bag, Pig automatically dereferences the tuple inside it (programmatically, a bag can only hold one tuple, but that tuple can contain anything, so the automatic deref masks this from the user). Unfortunately, it appears to also deref any other tuples inside as well, so you end up in a situation where 'columns.$0' returns the column name, plus weird aliasing issues: 'columns.<first index name>' also returns the column name, the second index returns the value, and the rest are null.

To get around this, I thought I'd nest each tuple inside another bag. This of course results in a crazy-looking schema:

{noformat}
votes: {key: bytearray,columns: {(matched_string: {(name: chararray,value: chararray)},photo_owner: {(name: chararray,value: chararray)},pid: {(name: chararray,value: chararray)},src_big: {(name: chararray,value: chararray)},time: {(name: chararray,value: chararray)},vote_type: {(name: chararray,value: chararray)},voter: {(name: chararray,value: chararray)})}}
{noformat}

but if it still derefs as elegantly, it won't be too bad.
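The structure being wrangled here can be modeled in plain Java (hypothetical stand-ins only — Pig's real classes are Tuple and DataBag built via TupleFactory and DefaultDataBag; the class and method names below are invented for illustration): a bag holds a single tuple whose fields are (name, value) pairs, and a projection like columns.vote_type.value is supposed to unwrap the bag, locate the pair by name, and yield the value — the intended behavior that the deref/aliasing bug breaks:

```java
import java.util.Arrays;
import java.util.List;

public class BagDerefSketch {
    // Hypothetical stand-ins for Pig's DataBag/Tuple: the "bag" is a list
    // holding a single "tuple" (Object[]), whose fields are (name, value) pairs.
    static Object derefColumn(List<Object[]> bag, String wantedName) {
        Object[] innerTuple = bag.get(0);       // the implicit bag -> tuple deref
        for (Object field : innerTuple) {
            Object[] pair = (Object[]) field;   // each column is a (name, value) pair
            if (pair[0].equals(wantedName))
                return pair[1];                 // project the value, not the name
        }
        return null;
    }

    public static void main(String[] args) {
        List<Object[]> columns = Arrays.asList(new Object[][] {
            { new Object[] { "vote_type", "album_like" },
              new Object[] { "voter", "691831038" } }
        });
        // columns.vote_type.value should yield the value, not the column name
        System.out.println(derefColumn(columns, "vote_type")); // prints album_like
    }
}
```

The bug described above is that Pig's automatic deref reaches one level too deep, so the name (pair[0]) leaks out where the value (pair[1]) is expected.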
The problem here is that it still can't deref correctly; you end up with nonsensical parsing errors such as:

{noformat}
ERROR 1200: Pig script failed to parse:
<file photo.pig, line 5, column 8> pig script failed to validate: org.apache.pig.impl.logicalLayer.FrontendException: ERROR 1128: Cannot find field name in matched_string:bag{:tuple(name:chararray,value:chararray)
{noformat}

even though the 'name' alias is clearly there and 'matched_string' was already being deref'd.

So I decided to just get rid of all the bags and instead used a tuple of tuples. This worked! The problem here is that a tuple has to fit into memory, whereas a bag can spill to disk if it is very large. Currently this isn't an issue, since we have to fit the entire row into memory via thrift anyway, but in the future when we have large row support the contract may have to change again. Maybe this is the best option, though, since right now it Just Works and nothing else is viable.

While going through all this I noticed two other problems we currently have: we need to put the indexed columns at the beginning so they can match the schema (since any number of non-indexed columns may fall between them in sort order), and the default validator is always being used due to a bad lookup. These are trivial to solve, however.

v3 fixes these issues and takes the 'tuple of tuples' approach, for better or worse.

> Cassandra inferred schema and actual data don't match
> -----------------------------------------------------
>
>                 Key: CASSANDRA-3371
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-3371
>             Project: Cassandra
>          Issue Type: Bug
>          Components: Hadoop
>    Affects Versions: 0.8.7
>            Reporter: Pete Warden
>            Assignee: Brandon Williams
>         Attachments: 3371-v2.txt, 3371-v3.txt, pig.diff
>
>
> It's looking like there may be a mismatch between the schema that's being
> reported by the latest CassandraStorage.java, and the data that's actually
> returned.
> Here's an example:
> rows = LOAD 'cassandra://Frap/PhotoVotes' USING CassandraStorage();
> DESCRIBE rows;
> rows: {key: chararray,columns: {(name: chararray,value: bytearray,photo_owner: chararray,value_photo_owner: bytearray,pid: chararray,value_pid: bytearray,matched_string: chararray,value_matched_string: bytearray,src_big: chararray,value_src_big: bytearray,time: chararray,value_time: bytearray,vote_type: chararray,value_vote_type: bytearray,voter: chararray,value_voter: bytearray)}}
> DUMP rows;
> (691831038_1317937188.48955,{(photo_owner,1596090180),(pid,6855155124568798560),(matched_string,),(src_big,),(time,Thu Oct 06 14:39:48 -0700 2011),(vote_type,album_dislike),(voter,691831038)})
> getSchema() is reporting the columns as an inner bag of tuples, each of which
> contains 16 values. In fact, getNext() seems to return an inner bag
> containing 7 tuples, each of which contains two values.
> It appears that things got out of sync with this change:
> http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?r1=1177083&r2=1177082&pathrev=1177083
> See more discussion at:
> http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/pig-cassandra-problem-quot-Incompatible-field-schema-quot-error-tc6882703.html
> Here's a patch I ended up creating for my own use, which gives the results I
> need (though it doesn't handle super-columns):
> DESCRIBE rows;
> rows: {cassandra_key: chararray,photo_owner: bytearray,pid: bytearray,place_matched_string: bytearray,src_big: bytearray,time: bytearray,vote_type: bytearray,voter: bytearray}
> Index: contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
> ===================================================================
> --- contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (revision 1185044)
> +++ contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (working copy)
> @@ -26,7 +26,7 @@
>  import org.apache.cassandra.db.marshal.IntegerType;
>  import org.apache.cassandra.db.marshal.TypeParser;
>  import org.apache.cassandra.thrift.*;
> -import org.apache.cassandra.utils.Hex;
> +import org.apache.cassandra.utils.FBUtilities;
>  import org.apache.commons.logging.Log;
>  import org.apache.commons.logging.LogFactory;
>
> @@ -122,15 +122,15 @@
>              assert key != null && cf != null;
>
>              // and wrap it in a tuple
> -            Tuple tuple = TupleFactory.getInstance().newTuple(2);
> +            Tuple tuple = TupleFactory.getInstance().newTuple(cf.size()+1);
>              ArrayList<Tuple> columns = new ArrayList<Tuple>();
> -            tuple.set(0, new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
> +            int tupleIndex = 0;
> +            tuple.set(tupleIndex++, new DataByteArray(key.array(), key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
>              for (Map.Entry<ByteBuffer, IColumn> entry : cf.entrySet())
>              {
> -                columns.add(columnToTuple(entry.getKey(), entry.getValue(), cfDef));
> +                tuple.set(tupleIndex++, columnToTuple(entry.getKey(), entry.getValue(), cfDef));
>              }
>
> -            tuple.set(1, new DefaultDataBag(columns));
>              return tuple;
>          }
>          catch (InterruptedException e)
> @@ -139,30 +139,22 @@
>          }
>      }
>
> -    private Tuple columnToTuple(ByteBuffer name, IColumn col, CfDef cfDef) throws IOException
> +    private Object columnToTuple(ByteBuffer name, IColumn col, CfDef cfDef) throws IOException
>      {
> -        Tuple pair = TupleFactory.getInstance().newTuple(2);
>          List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
>          Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
>
> -        setTupleValue(pair, 0, marshallers.get(0).compose(name));
>          if (col instanceof Column)
>          {
>              // standard
>              if (validators.get(name) == null)
> -                setTupleValue(pair, 1, marshallers.get(1).compose(col.value()));
> +                return marshallers.get(1).compose(col.value());
>              else
> -                setTupleValue(pair, 1, validators.get(name).compose(col.value()));
> -            return pair;
> +                return validators.get(name).compose(col.value());
>          }
>
> -        // super
> -        ArrayList<Tuple> subcols = new ArrayList<Tuple>();
> -        for (IColumn subcol : col.getSubColumns())
> -            subcols.add(columnToTuple(subcol.name(), subcol, cfDef));
> -
> -        pair.set(1, new DefaultDataBag(subcols));
> -        return pair;
> +        // super not currently handled
> +        return null;
>      }
>
>      private void setTupleValue(Tuple pair, int position, Object value) throws ExecException
> @@ -312,62 +304,32 @@
>          // top-level schema, no type
>          ResourceSchema schema = new ResourceSchema();
>
> +        ResourceFieldSchema[] tupleFields = new ResourceFieldSchema[cfDef.column_metadata.size()+1];
> +        int tupleIndex = 0;
> +
>          // add key
>          ResourceFieldSchema keyFieldSchema = new ResourceFieldSchema();
> -        keyFieldSchema.setName("key");
> +        keyFieldSchema.setName("cassandra_key");
>          keyFieldSchema.setType(DataType.CHARARRAY); //TODO: get key type
> +        tupleFields[tupleIndex++] = keyFieldSchema;
>
> -        // will become the bag of tuples
> -        ResourceFieldSchema bagFieldSchema = new ResourceFieldSchema();
> -        bagFieldSchema.setName("columns");
> -        bagFieldSchema.setType(DataType.BAG);
> -        ResourceSchema bagSchema = new ResourceSchema();
> -
> -
>          List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
>          Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
> -        List<ResourceFieldSchema> tupleFields = new ArrayList<ResourceFieldSchema>();
>
> -        // default comparator/validator
> -        ResourceSchema innerTupleSchema = new ResourceSchema();
> -        ResourceFieldSchema tupleField = new ResourceFieldSchema();
> -        tupleField.setType(DataType.TUPLE);
> -        tupleField.setSchema(innerTupleSchema);
> -
> -        ResourceFieldSchema colSchema = new ResourceFieldSchema();
> -        colSchema.setName("name");
> -        colSchema.setType(getPigType(marshallers.get(0)));
> -        tupleFields.add(colSchema);
> -
> -        ResourceFieldSchema valSchema = new ResourceFieldSchema();
> -        AbstractType validator = marshallers.get(1);
> -        valSchema.setName("value");
> -        valSchema.setType(getPigType(validator));
> -        tupleFields.add(valSchema);
> -
>          // defined validators/indexes
>          for (ColumnDef cdef : cfDef.column_metadata)
>          {
> -            colSchema = new ResourceFieldSchema();
> -            colSchema.setName(new String(cdef.getName()));
> -            colSchema.setType(getPigType(marshallers.get(0)));
> -            tupleFields.add(colSchema);
> -
> -            valSchema = new ResourceFieldSchema();
> -            validator = validators.get(cdef.getName());
> +            ResourceFieldSchema valSchema = new ResourceFieldSchema();
> +            AbstractType validator = validators.get(cdef.getName());
>              if (validator == null)
>                  validator = marshallers.get(1);
> -            valSchema.setName("value");
> +            valSchema.setName(new String(cdef.getName()));
>              valSchema.setType(getPigType(validator));
> -            tupleFields.add(valSchema);
> +            tupleFields[tupleIndex++] = valSchema;
>          }
> -        innerTupleSchema.setFields(tupleFields.toArray(new ResourceFieldSchema[tupleFields.size()]));
>
> -        // a bag can contain only one tuple, but that tuple can contain anything
> -        bagSchema.setFields(new ResourceFieldSchema[] { tupleField });
> -        bagFieldSchema.setSchema(bagSchema);
>          // top level schema contains everything
> -        schema.setFields(new ResourceFieldSchema[] { keyFieldSchema, bagFieldSchema });
> +        schema.setFields(tupleFields);
>          return schema;
>      }
>
> @@ -601,7 +563,7 @@
>          TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
>          try
>          {
> -            return Hex.bytesToHex(serializer.serialize(cfDef));
> +            return FBUtilities.bytesToHex(serializer.serialize(cfDef));
>          }
>          catch (TException e)
>          {
> @@ -616,7 +578,7 @@
>          CfDef cfDef = new CfDef();
>          try
>          {
> -            deserializer.deserialize(cfDef, Hex.hexToBytes(st));
> +            deserializer.deserialize(cfDef, FBUtilities.hexToBytes(st));
>          }
>          catch (TException e)
>          {

--
This message is automatically generated by JIRA.