[ 
https://issues.apache.org/jira/browse/CASSANDRA-3371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13161252#comment-13161252
 ] 

Ian F commented on CASSANDRA-3371:
----------------------------------

I've been following this thread and am actively trying to use CassandraStorage. 
Before I found this thread, I tried a bunch of stuff on my own with varying 
degrees of success. My most pressing issue right now concerns rows that 
have different columns from one another. I do not believe the Pig schema 
that gets generated supports that case. Please advise. Thanks!
                
> Cassandra inferred schema and actual data don't match
> -----------------------------------------------------
>
>                 Key: CASSANDRA-3371
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-3371
>             Project: Cassandra
>          Issue Type: Bug
>          Components: Hadoop
>    Affects Versions: 0.8.7
>            Reporter: Pete Warden
>            Assignee: Brandon Williams
>         Attachments: 3371-v2.txt, 3371-v3.txt, pig.diff
>
>
> It's looking like there may be a mismatch between the schema that's being 
> reported by the latest CassandraStorage.java, and the data that's actually 
> returned. Here's an example:
> rows = LOAD 'cassandra://Frap/PhotoVotes' USING CassandraStorage();
> DESCRIBE rows;
> rows: {key: chararray,columns: {(name: chararray,value: 
> bytearray,photo_owner: chararray,value_photo_owner: bytearray,pid: 
> chararray,value_pid: bytearray,matched_string: 
> chararray,value_matched_string: bytearray,src_big: chararray,value_src_big: 
> bytearray,time: chararray,value_time: bytearray,vote_type: 
> chararray,value_vote_type: bytearray,voter: chararray,value_voter: 
> bytearray)}}
> DUMP rows;
> (691831038_1317937188.48955,{(photo_owner,1596090180),(pid,6855155124568798560),(matched_string,),(src_big,),(time,Thu
>  Oct 06 14:39:48 -0700 2011),(vote_type,album_dislike),(voter,691831038)})
> getSchema() is reporting the columns as an inner bag of tuples, each of which 
> contains 16 values. In fact, getNext() seems to return an inner bag 
> containing 7 tuples, each of which contains two values. 
> It appears that things got out of sync with this change:
> http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?r1=1177083&r2=1177082&pathrev=1177083
> See more discussion at:
> http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/pig-cassandra-problem-quot-Incompatible-field-schema-quot-error-tc6882703.html
> Here's a patch I ended up creating for my own use, which gives the results I 
> need (though it doesn't handle super-columns):
> DESCRIBE rows;
> rows: {cassandra_key: chararray,photo_owner: bytearray,pid: 
> bytearray,place_matched_string: bytearray,src_big: bytearray,time: 
> bytearray,vote_type: bytearray,voter: bytearray}
> Index: 
> contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
> ===================================================================
> --- 
> contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java    
>     (revision 1185044)
> +++ 
> contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java    
>     (working copy)
> @@ -26,7 +26,7 @@
>  import org.apache.cassandra.db.marshal.IntegerType;
>  import org.apache.cassandra.db.marshal.TypeParser;
>  import org.apache.cassandra.thrift.*;
> -import org.apache.cassandra.utils.Hex;
> +import org.apache.cassandra.utils.FBUtilities;
>  import org.apache.commons.logging.Log;
>  import org.apache.commons.logging.LogFactory;
>  
> @@ -122,15 +122,15 @@
>              assert key != null && cf != null;
>              
>              // and wrap it in a tuple
> -             Tuple tuple = TupleFactory.getInstance().newTuple(2);
> +             Tuple tuple = TupleFactory.getInstance().newTuple(cf.size()+1);
>              ArrayList<Tuple> columns = new ArrayList<Tuple>();
> -            tuple.set(0, new DataByteArray(key.array(), 
> key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
> +            int tupleIndex = 0;
> +            tuple.set(tupleIndex++, new DataByteArray(key.array(), 
> key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));            
>              for (Map.Entry<ByteBuffer, IColumn> entry : cf.entrySet())
>              {
> -                columns.add(columnToTuple(entry.getKey(), entry.getValue(), 
> cfDef));
> +                tuple.set(tupleIndex++, columnToTuple(entry.getKey(), 
> entry.getValue(), cfDef));
>              }
>  
> -            tuple.set(1, new DefaultDataBag(columns));
>              return tuple;
>          }
>          catch (InterruptedException e)
> @@ -139,30 +139,22 @@
>          }
>      }
>  
> -    private Tuple columnToTuple(ByteBuffer name, IColumn col, CfDef cfDef) 
> throws IOException
> +    private Object columnToTuple(ByteBuffer name, IColumn col, CfDef cfDef) 
> throws IOException
>      {
> -        Tuple pair = TupleFactory.getInstance().newTuple(2);
>          List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
>          Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
>  
> -        setTupleValue(pair, 0, marshallers.get(0).compose(name));
>          if (col instanceof Column)
>          {
>              // standard
>              if (validators.get(name) == null)
> -                setTupleValue(pair, 1, 
> marshallers.get(1).compose(col.value()));
> +                return marshallers.get(1).compose(col.value());
>              else
> -                setTupleValue(pair, 1, 
> validators.get(name).compose(col.value()));
> -            return pair;
> +                return validators.get(name).compose(col.value());
>          }
>  
> -        // super
> -        ArrayList<Tuple> subcols = new ArrayList<Tuple>();
> -        for (IColumn subcol : col.getSubColumns())
> -            subcols.add(columnToTuple(subcol.name(), subcol, cfDef));
> -        
> -        pair.set(1, new DefaultDataBag(subcols));
> -        return pair;
> +        // super not currently handled
> +        return null;
>      }
>  
>      private void setTupleValue(Tuple pair, int position, Object value) 
> throws ExecException
> @@ -312,62 +304,32 @@
>          // top-level schema, no type
>          ResourceSchema schema = new ResourceSchema();
>  
> +        ResourceFieldSchema[] tupleFields = new 
> ResourceFieldSchema[cfDef.column_metadata.size()+1];
> +        int tupleIndex = 0;
> +        
>          // add key
>          ResourceFieldSchema keyFieldSchema = new ResourceFieldSchema();
> -        keyFieldSchema.setName("key");
> +        keyFieldSchema.setName("cassandra_key");
>          keyFieldSchema.setType(DataType.CHARARRAY); //TODO: get key type
> +        tupleFields[tupleIndex++] = keyFieldSchema;
>  
> -        // will become the bag of tuples
> -        ResourceFieldSchema bagFieldSchema = new ResourceFieldSchema();
> -        bagFieldSchema.setName("columns");
> -        bagFieldSchema.setType(DataType.BAG);
> -        ResourceSchema bagSchema = new ResourceSchema();
> -
> -
>          List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
>          Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
> -        List<ResourceFieldSchema> tupleFields = new 
> ArrayList<ResourceFieldSchema>();
>  
> -        // default comparator/validator
> -        ResourceSchema innerTupleSchema = new ResourceSchema();
> -        ResourceFieldSchema tupleField = new ResourceFieldSchema();
> -        tupleField.setType(DataType.TUPLE);
> -        tupleField.setSchema(innerTupleSchema);
> -
> -        ResourceFieldSchema colSchema = new ResourceFieldSchema();
> -        colSchema.setName("name");
> -        colSchema.setType(getPigType(marshallers.get(0)));
> -        tupleFields.add(colSchema);
> -
> -        ResourceFieldSchema valSchema = new ResourceFieldSchema();
> -        AbstractType validator = marshallers.get(1);
> -        valSchema.setName("value");
> -        valSchema.setType(getPigType(validator));
> -        tupleFields.add(valSchema);
> -
>          // defined validators/indexes
>          for (ColumnDef cdef : cfDef.column_metadata)
>          {
> -            colSchema = new ResourceFieldSchema();
> -            colSchema.setName(new String(cdef.getName()));
> -            colSchema.setType(getPigType(marshallers.get(0)));
> -            tupleFields.add(colSchema);
> -
> -            valSchema = new ResourceFieldSchema();
> -            validator = validators.get(cdef.getName());
> +            ResourceFieldSchema valSchema = new ResourceFieldSchema();
> +            AbstractType validator = validators.get(cdef.getName());
>              if (validator == null)
>                  validator = marshallers.get(1);
> -            valSchema.setName("value");
> +            valSchema.setName(new String(cdef.getName()));
>              valSchema.setType(getPigType(validator));
> -            tupleFields.add(valSchema);
> +            tupleFields[tupleIndex++] = valSchema;
>          }
> -        innerTupleSchema.setFields(tupleFields.toArray(new 
> ResourceFieldSchema[tupleFields.size()]));
>  
> -        // a bag can contain only one tuple, but that tuple can contain 
> anything
> -        bagSchema.setFields(new ResourceFieldSchema[] { tupleField });
> -        bagFieldSchema.setSchema(bagSchema);
>          // top level schema contains everything
> -        schema.setFields(new ResourceFieldSchema[] { keyFieldSchema, 
> bagFieldSchema });
> +        schema.setFields(tupleFields);
>          return schema;
>      }
>  
> @@ -601,7 +563,7 @@
>          TSerializer serializer = new TSerializer(new 
> TBinaryProtocol.Factory());
>          try
>          {
> -            return Hex.bytesToHex(serializer.serialize(cfDef));
> +            return FBUtilities.bytesToHex(serializer.serialize(cfDef));
>          }
>          catch (TException e)
>          {
> @@ -616,7 +578,7 @@
>          CfDef cfDef = new CfDef();
>          try
>          {
> -            deserializer.deserialize(cfDef, Hex.hexToBytes(st));
> +            deserializer.deserialize(cfDef, FBUtilities.hexToBytes(st));
>          }
>          catch (TException e)
>          {

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

Reply via email to