[ 
https://issues.apache.org/jira/browse/CASSANDRA-3371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13150651#comment-13150651
 ] 

Jeremy Hanna commented on CASSANDRA-3371:
-----------------------------------------

I think with the pygmalion style of specification, it might require a bit more 
on the load, but less on the filter.  The columns var wouldn't be needed on the 
filter, for example.  It would just look like the pygmalion examples except you 
wouldn't need FromCassandraBag, you would specify that in "USING 
CassandraStorage('vote_type')"

So I think it would just be:
all_votes = LOAD 'cassandra://Frap/PhotoVotes' USING 
CassandraStorage('vote_type'); -- you can add any additional columns
album_votes = FILTER all_votes BY (vote_type EQ 'album_like') OR (vote_type EQ 
'album_dislike');

You can add other columns as you need them in the CassandraStorage section.
                
> Cassandra inferred schema and actual data don't match
> -----------------------------------------------------
>
>                 Key: CASSANDRA-3371
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-3371
>             Project: Cassandra
>          Issue Type: Bug
>          Components: Hadoop
>    Affects Versions: 0.8.7
>            Reporter: Pete Warden
>            Assignee: Brandon Williams
>         Attachments: 3371-v2.txt, 3371-v3.txt, pig.diff
>
>
> It's looking like there may be a mismatch between the schema that's being 
> reported by the latest CassandraStorage.java, and the data that's actually 
> returned. Here's an example:
> rows = LOAD 'cassandra://Frap/PhotoVotes' USING CassandraStorage();
> DESCRIBE rows;
> rows: {key: chararray,columns: {(name: chararray,value: 
> bytearray,photo_owner: chararray,value_photo_owner: bytearray,pid: 
> chararray,value_pid: bytearray,matched_string: 
> chararray,value_matched_string: bytearray,src_big: chararray,value_src_big: 
> bytearray,time: chararray,value_time: bytearray,vote_type: 
> chararray,value_vote_type: bytearray,voter: chararray,value_voter: 
> bytearray)}}
> DUMP rows;
> (691831038_1317937188.48955,{(photo_owner,1596090180),(pid,6855155124568798560),(matched_string,),(src_big,),(time,Thu
>  Oct 06 14:39:48 -0700 2011),(vote_type,album_dislike),(voter,691831038)})
> getSchema() is reporting the columns as an inner bag of tuples, each of which 
> contains 16 values. In fact, getNext() seems to return an inner bag 
> containing 7 tuples, each of which contains two values. 
> It appears that things got out of sync with this change:
> http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?r1=1177083&r2=1177082&pathrev=1177083
> See more discussion at:
> http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/pig-cassandra-problem-quot-Incompatible-field-schema-quot-error-tc6882703.html
> Here's a patch I ended up creating for my own use, which gives the results I 
> need (though it doesn't handle super-columns):
> DESCRIBE rows;
> rows: {cassandra_key: chararray,photo_owner: bytearray,pid: 
> bytearray,place_matched_string: bytearray,src_big: bytearray,time: 
> bytearray,vote_type: bytearray,voter: bytearray}
> Index: 
> contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
> ===================================================================
> --- 
> contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java    
>     (revision 1185044)
> +++ 
> contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java    
>     (working copy)
> @@ -26,7 +26,7 @@
>  import org.apache.cassandra.db.marshal.IntegerType;
>  import org.apache.cassandra.db.marshal.TypeParser;
>  import org.apache.cassandra.thrift.*;
> -import org.apache.cassandra.utils.Hex;
> +import org.apache.cassandra.utils.FBUtilities;
>  import org.apache.commons.logging.Log;
>  import org.apache.commons.logging.LogFactory;
>  
> @@ -122,15 +122,15 @@
>              assert key != null && cf != null;
>              
>              // and wrap it in a tuple
> -             Tuple tuple = TupleFactory.getInstance().newTuple(2);
> +             Tuple tuple = TupleFactory.getInstance().newTuple(cf.size()+1);
>              ArrayList<Tuple> columns = new ArrayList<Tuple>();
> -            tuple.set(0, new DataByteArray(key.array(), 
> key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
> +            int tupleIndex = 0;
> +            tuple.set(tupleIndex++, new DataByteArray(key.array(), 
> key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));            
>              for (Map.Entry<ByteBuffer, IColumn> entry : cf.entrySet())
>              {
> -                columns.add(columnToTuple(entry.getKey(), entry.getValue(), 
> cfDef));
> +                tuple.set(tupleIndex++, columnToTuple(entry.getKey(), 
> entry.getValue(), cfDef));
>              }
>  
> -            tuple.set(1, new DefaultDataBag(columns));
>              return tuple;
>          }
>          catch (InterruptedException e)
> @@ -139,30 +139,22 @@
>          }
>      }
>  
> -    private Tuple columnToTuple(ByteBuffer name, IColumn col, CfDef cfDef) 
> throws IOException
> +    private Object columnToTuple(ByteBuffer name, IColumn col, CfDef cfDef) 
> throws IOException
>      {
> -        Tuple pair = TupleFactory.getInstance().newTuple(2);
>          List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
>          Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
>  
> -        setTupleValue(pair, 0, marshallers.get(0).compose(name));
>          if (col instanceof Column)
>          {
>              // standard
>              if (validators.get(name) == null)
> -                setTupleValue(pair, 1, 
> marshallers.get(1).compose(col.value()));
> +                return marshallers.get(1).compose(col.value());
>              else
> -                setTupleValue(pair, 1, 
> validators.get(name).compose(col.value()));
> -            return pair;
> +                return validators.get(name).compose(col.value());
>          }
>  
> -        // super
> -        ArrayList<Tuple> subcols = new ArrayList<Tuple>();
> -        for (IColumn subcol : col.getSubColumns())
> -            subcols.add(columnToTuple(subcol.name(), subcol, cfDef));
> -        
> -        pair.set(1, new DefaultDataBag(subcols));
> -        return pair;
> +        // super not currently handled
> +        return null;
>      }
>  
>      private void setTupleValue(Tuple pair, int position, Object value) 
> throws ExecException
> @@ -312,62 +304,32 @@
>          // top-level schema, no type
>          ResourceSchema schema = new ResourceSchema();
>  
> +        ResourceFieldSchema[] tupleFields = new 
> ResourceFieldSchema[cfDef.column_metadata.size()+1];
> +        int tupleIndex = 0;
> +        
>          // add key
>          ResourceFieldSchema keyFieldSchema = new ResourceFieldSchema();
> -        keyFieldSchema.setName("key");
> +        keyFieldSchema.setName("cassandra_key");
>          keyFieldSchema.setType(DataType.CHARARRAY); //TODO: get key type
> +        tupleFields[tupleIndex++] = keyFieldSchema;
>  
> -        // will become the bag of tuples
> -        ResourceFieldSchema bagFieldSchema = new ResourceFieldSchema();
> -        bagFieldSchema.setName("columns");
> -        bagFieldSchema.setType(DataType.BAG);
> -        ResourceSchema bagSchema = new ResourceSchema();
> -
> -
>          List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
>          Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
> -        List<ResourceFieldSchema> tupleFields = new 
> ArrayList<ResourceFieldSchema>();
>  
> -        // default comparator/validator
> -        ResourceSchema innerTupleSchema = new ResourceSchema();
> -        ResourceFieldSchema tupleField = new ResourceFieldSchema();
> -        tupleField.setType(DataType.TUPLE);
> -        tupleField.setSchema(innerTupleSchema);
> -
> -        ResourceFieldSchema colSchema = new ResourceFieldSchema();
> -        colSchema.setName("name");
> -        colSchema.setType(getPigType(marshallers.get(0)));
> -        tupleFields.add(colSchema);
> -
> -        ResourceFieldSchema valSchema = new ResourceFieldSchema();
> -        AbstractType validator = marshallers.get(1);
> -        valSchema.setName("value");
> -        valSchema.setType(getPigType(validator));
> -        tupleFields.add(valSchema);
> -
>          // defined validators/indexes
>          for (ColumnDef cdef : cfDef.column_metadata)
>          {
> -            colSchema = new ResourceFieldSchema();
> -            colSchema.setName(new String(cdef.getName()));
> -            colSchema.setType(getPigType(marshallers.get(0)));
> -            tupleFields.add(colSchema);
> -
> -            valSchema = new ResourceFieldSchema();
> -            validator = validators.get(cdef.getName());
> +            ResourceFieldSchema valSchema = new ResourceFieldSchema();
> +            AbstractType validator = validators.get(cdef.getName());
>              if (validator == null)
>                  validator = marshallers.get(1);
> -            valSchema.setName("value");
> +            valSchema.setName(new String(cdef.getName()));
>              valSchema.setType(getPigType(validator));
> -            tupleFields.add(valSchema);
> +            tupleFields[tupleIndex++] = valSchema;
>          }
> -        innerTupleSchema.setFields(tupleFields.toArray(new 
> ResourceFieldSchema[tupleFields.size()]));
>  
> -        // a bag can contain only one tuple, but that tuple can contain 
> anything
> -        bagSchema.setFields(new ResourceFieldSchema[] { tupleField });
> -        bagFieldSchema.setSchema(bagSchema);
>          // top level schema contains everything
> -        schema.setFields(new ResourceFieldSchema[] { keyFieldSchema, 
> bagFieldSchema });
> +        schema.setFields(tupleFields);
>          return schema;
>      }
>  
> @@ -601,7 +563,7 @@
>          TSerializer serializer = new TSerializer(new 
> TBinaryProtocol.Factory());
>          try
>          {
> -            return Hex.bytesToHex(serializer.serialize(cfDef));
> +            return FBUtilities.bytesToHex(serializer.serialize(cfDef));
>          }
>          catch (TException e)
>          {
> @@ -616,7 +578,7 @@
>          CfDef cfDef = new CfDef();
>          try
>          {
> -            deserializer.deserialize(cfDef, Hex.hexToBytes(st));
> +            deserializer.deserialize(cfDef, FBUtilities.hexToBytes(st));
>          }
>          catch (TException e)
>          {

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

Reply via email to