[ 
https://issues.apache.org/jira/browse/CASSANDRA-3371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Brandon Williams updated CASSANDRA-3371:
----------------------------------------

    Attachment: 3371-v3.txt

I had what I thought was a good idea to accommodate this: alias the tuples 
themselves after the index names, allowing you to do something like this:

{noformat}
album_votes = FILTER all_votes BY (columns.vote_type.value EQ 'album_like') OR 
(columns.vote_type.value EQ 'album_dislike');
{noformat}

It's not that easy, however.  When you dereference a bag, Pig automatically 
dereferences the tuple inside it (structurally, a bag can only contain one 
tuple, but that tuple can contain anything, and the automatic deref masks 
this from the user.)  Unfortunately, it appears to also deref any other 
tuples inside as well, so you end up in a situation where 'columns.$0' returns 
the column name, and weird aliasing issues arise: 'columns.<first index name>' 
also returns the column name, the second index returns the value, and the 
rest come back null.
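
To illustrate the misalignment, here is a rough sketch using the field names 
from the PhotoVotes example (the comments describe the behavior I observed, 
not exact grunt output):
{noformat}
-- sketch of the misaligned dereferencing under the aliased-tuple scheme
names     = FOREACH all_votes GENERATE columns.$0;             -- column name, as expected
first_idx = FOREACH all_votes GENERATE columns.matched_string; -- also the column name
second    = FOREACH all_votes GENERATE columns.photo_owner;    -- the value instead
-- the remaining indexed columns come back as null
{noformat}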

To get around this, I thought I'd nest each tuple inside another bag.  This of 
course results in a crazy looking schema:
{noformat}
votes: {key: bytearray,columns: {(matched_string: {(name: chararray,value: 
chararray)},photo_owner: {(name: chararray,value: chararray)},pid: {(name: 
chararray,value: chararray)},src_big: {(name: chararray,value: 
chararray)},time: {(name: chararray,value: chararray)},vote_type: {(name: 
chararray,value: chararray)},voter: {(name: chararray,value: chararray)})}}
{noformat}
but if it still derefs as elegantly, it won't be too bad.  The problem is 
that it still can't deref correctly; you end up with nonsensical parsing 
errors such as:
{noformat}
ERROR 1200: Pig script failed to parse: 
<file photo.pig, line 5, column 8> pig script failed to validate: 
org.apache.pig.impl.logicalLayer.FrontendException: ERROR 1128: Cannot find 
field name in matched_string:bag{:tuple(name:chararray,value:chararray)
{noformat}
even though the 'name' alias is clearly there and 'matched_string' was already 
being deref'd.
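
For reference, a minimal script that trips the error above would look roughly 
like this (a hypothetical reconstruction of the photo.pig mentioned in the 
error, not the actual script):
{noformat}
votes = LOAD 'cassandra://Frap/PhotoVotes' USING CassandraStorage();
-- this deref fails to validate even though 'name' exists in the inner schema
bad = FOREACH votes GENERATE columns.matched_string.name;
{noformat}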

So I decided to get rid of all the bags and instead use a tuple of 
tuples.  This worked!  But the problem here is that a tuple has to fit into 
memory, whereas a bag can spill to disk if it is very large.  Currently this 
won't be an issue, since we have to fit the entire row into memory via thrift 
anyway, but in the future, when we have large row support, the contract may 
have to change again.  Maybe this is the best option though, since right now 
it Just Works and nothing else is viable.
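
Under the tuple-of-tuples layout, the original goal from the top of this 
comment should work as a straightforward deref; a sketch, assuming v3 keeps 
the 'columns', 'name', and 'value' aliases:
{noformat}
all_votes = LOAD 'cassandra://Frap/PhotoVotes' USING CassandraStorage();
album_votes = FILTER all_votes BY (columns.vote_type.value EQ 'album_like') OR 
(columns.vote_type.value EQ 'album_dislike');
{noformat}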

While going through all this I noticed two other problems we currently have.  
We need to put the indexed columns at the beginning so they can match the 
schema (since any number of non-indexed columns may fall between them in 
sort order), and the default validator is always being used due to a bad 
lookup.  These are trivial to solve, however.  v3 fixes both issues and takes 
the 'tuple of tuples' approach, for better or worse.
                
> Cassandra inferred schema and actual data don't match
> -----------------------------------------------------
>
>                 Key: CASSANDRA-3371
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-3371
>             Project: Cassandra
>          Issue Type: Bug
>          Components: Hadoop
>    Affects Versions: 0.8.7
>            Reporter: Pete Warden
>            Assignee: Brandon Williams
>         Attachments: 3371-v2.txt, 3371-v3.txt, pig.diff
>
>
> It's looking like there may be a mismatch between the schema that's being 
> reported by the latest CassandraStorage.java, and the data that's actually 
> returned. Here's an example:
> rows = LOAD 'cassandra://Frap/PhotoVotes' USING CassandraStorage();
> DESCRIBE rows;
> rows: {key: chararray,columns: {(name: chararray,value: 
> bytearray,photo_owner: chararray,value_photo_owner: bytearray,pid: 
> chararray,value_pid: bytearray,matched_string: 
> chararray,value_matched_string: bytearray,src_big: chararray,value_src_big: 
> bytearray,time: chararray,value_time: bytearray,vote_type: 
> chararray,value_vote_type: bytearray,voter: chararray,value_voter: 
> bytearray)}}
> DUMP rows;
> (691831038_1317937188.48955,{(photo_owner,1596090180),(pid,6855155124568798560),(matched_string,),(src_big,),(time,Thu
>  Oct 06 14:39:48 -0700 2011),(vote_type,album_dislike),(voter,691831038)})
> getSchema() is reporting the columns as an inner bag of tuples, each of which 
> contains 16 values. In fact, getNext() seems to return an inner bag 
> containing 7 tuples, each of which contains two values. 
> It appears that things got out of sync with this change:
> http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?r1=1177083&r2=1177082&pathrev=1177083
> See more discussion at:
> http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/pig-cassandra-problem-quot-Incompatible-field-schema-quot-error-tc6882703.html
> Here's a patch I ended up creating for my own use, which gives the results I 
> need (though it doesn't handle super-columns):
> DESCRIBE rows;
> rows: {cassandra_key: chararray,photo_owner: bytearray,pid: 
> bytearray,place_matched_string: bytearray,src_big: bytearray,time: 
> bytearray,vote_type: bytearray,voter: bytearray}
> Index: 
> contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
> ===================================================================
> --- 
> contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java    
>     (revision 1185044)
> +++ 
> contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java    
>     (working copy)
> @@ -26,7 +26,7 @@
>  import org.apache.cassandra.db.marshal.IntegerType;
>  import org.apache.cassandra.db.marshal.TypeParser;
>  import org.apache.cassandra.thrift.*;
> -import org.apache.cassandra.utils.Hex;
> +import org.apache.cassandra.utils.FBUtilities;
>  import org.apache.commons.logging.Log;
>  import org.apache.commons.logging.LogFactory;
>  
> @@ -122,15 +122,15 @@
>              assert key != null && cf != null;
>              
>              // and wrap it in a tuple
> -             Tuple tuple = TupleFactory.getInstance().newTuple(2);
> +             Tuple tuple = TupleFactory.getInstance().newTuple(cf.size()+1);
>              ArrayList<Tuple> columns = new ArrayList<Tuple>();
> -            tuple.set(0, new DataByteArray(key.array(), 
> key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
> +            int tupleIndex = 0;
> +            tuple.set(tupleIndex++, new DataByteArray(key.array(), 
> key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));            
>              for (Map.Entry<ByteBuffer, IColumn> entry : cf.entrySet())
>              {
> -                columns.add(columnToTuple(entry.getKey(), entry.getValue(), 
> cfDef));
> +                tuple.set(tupleIndex++, columnToTuple(entry.getKey(), 
> entry.getValue(), cfDef));
>              }
>  
> -            tuple.set(1, new DefaultDataBag(columns));
>              return tuple;
>          }
>          catch (InterruptedException e)
> @@ -139,30 +139,22 @@
>          }
>      }
>  
> -    private Tuple columnToTuple(ByteBuffer name, IColumn col, CfDef cfDef) 
> throws IOException
> +    private Object columnToTuple(ByteBuffer name, IColumn col, CfDef cfDef) 
> throws IOException
>      {
> -        Tuple pair = TupleFactory.getInstance().newTuple(2);
>          List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
>          Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
>  
> -        setTupleValue(pair, 0, marshallers.get(0).compose(name));
>          if (col instanceof Column)
>          {
>              // standard
>              if (validators.get(name) == null)
> -                setTupleValue(pair, 1, 
> marshallers.get(1).compose(col.value()));
> +                return marshallers.get(1).compose(col.value());
>              else
> -                setTupleValue(pair, 1, 
> validators.get(name).compose(col.value()));
> -            return pair;
> +                return validators.get(name).compose(col.value());
>          }
>  
> -        // super
> -        ArrayList<Tuple> subcols = new ArrayList<Tuple>();
> -        for (IColumn subcol : col.getSubColumns())
> -            subcols.add(columnToTuple(subcol.name(), subcol, cfDef));
> -        
> -        pair.set(1, new DefaultDataBag(subcols));
> -        return pair;
> +        // super not currently handled
> +        return null;
>      }
>  
>      private void setTupleValue(Tuple pair, int position, Object value) 
> throws ExecException
> @@ -312,62 +304,32 @@
>          // top-level schema, no type
>          ResourceSchema schema = new ResourceSchema();
>  
> +        ResourceFieldSchema[] tupleFields = new 
> ResourceFieldSchema[cfDef.column_metadata.size()+1];
> +        int tupleIndex = 0;
> +        
>          // add key
>          ResourceFieldSchema keyFieldSchema = new ResourceFieldSchema();
> -        keyFieldSchema.setName("key");
> +        keyFieldSchema.setName("cassandra_key");
>          keyFieldSchema.setType(DataType.CHARARRAY); //TODO: get key type
> +        tupleFields[tupleIndex++] = keyFieldSchema;
>  
> -        // will become the bag of tuples
> -        ResourceFieldSchema bagFieldSchema = new ResourceFieldSchema();
> -        bagFieldSchema.setName("columns");
> -        bagFieldSchema.setType(DataType.BAG);
> -        ResourceSchema bagSchema = new ResourceSchema();
> -
> -
>          List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
>          Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
> -        List<ResourceFieldSchema> tupleFields = new 
> ArrayList<ResourceFieldSchema>();
>  
> -        // default comparator/validator
> -        ResourceSchema innerTupleSchema = new ResourceSchema();
> -        ResourceFieldSchema tupleField = new ResourceFieldSchema();
> -        tupleField.setType(DataType.TUPLE);
> -        tupleField.setSchema(innerTupleSchema);
> -
> -        ResourceFieldSchema colSchema = new ResourceFieldSchema();
> -        colSchema.setName("name");
> -        colSchema.setType(getPigType(marshallers.get(0)));
> -        tupleFields.add(colSchema);
> -
> -        ResourceFieldSchema valSchema = new ResourceFieldSchema();
> -        AbstractType validator = marshallers.get(1);
> -        valSchema.setName("value");
> -        valSchema.setType(getPigType(validator));
> -        tupleFields.add(valSchema);
> -
>          // defined validators/indexes
>          for (ColumnDef cdef : cfDef.column_metadata)
>          {
> -            colSchema = new ResourceFieldSchema();
> -            colSchema.setName(new String(cdef.getName()));
> -            colSchema.setType(getPigType(marshallers.get(0)));
> -            tupleFields.add(colSchema);
> -
> -            valSchema = new ResourceFieldSchema();
> -            validator = validators.get(cdef.getName());
> +            ResourceFieldSchema valSchema = new ResourceFieldSchema();
> +            AbstractType validator = validators.get(cdef.getName());
>              if (validator == null)
>                  validator = marshallers.get(1);
> -            valSchema.setName("value");
> +            valSchema.setName(new String(cdef.getName()));
>              valSchema.setType(getPigType(validator));
> -            tupleFields.add(valSchema);
> +            tupleFields[tupleIndex++] = valSchema;
>          }
> -        innerTupleSchema.setFields(tupleFields.toArray(new 
> ResourceFieldSchema[tupleFields.size()]));
>  
> -        // a bag can contain only one tuple, but that tuple can contain 
> anything
> -        bagSchema.setFields(new ResourceFieldSchema[] { tupleField });
> -        bagFieldSchema.setSchema(bagSchema);
>          // top level schema contains everything
> -        schema.setFields(new ResourceFieldSchema[] { keyFieldSchema, 
> bagFieldSchema });
> +        schema.setFields(tupleFields);
>          return schema;
>      }
>  
> @@ -601,7 +563,7 @@
>          TSerializer serializer = new TSerializer(new 
> TBinaryProtocol.Factory());
>          try
>          {
> -            return Hex.bytesToHex(serializer.serialize(cfDef));
> +            return FBUtilities.bytesToHex(serializer.serialize(cfDef));
>          }
>          catch (TException e)
>          {
> @@ -616,7 +578,7 @@
>          CfDef cfDef = new CfDef();
>          try
>          {
> -            deserializer.deserialize(cfDef, Hex.hexToBytes(st));
> +            deserializer.deserialize(cfDef, FBUtilities.hexToBytes(st));
>          }
>          catch (TException e)
>          {

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        
