Repository: cassandra
Updated Branches:
  refs/heads/trunk d62cd1bc9 -> 446e25378


http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java
----------------------------------------------------------------------
diff --git a/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java 
b/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java
index 60d04d3..93ac5be 100644
--- a/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java
+++ b/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java
@@ -22,24 +22,21 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
 
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.thrift.Cassandra;
-import org.apache.cassandra.thrift.ColumnOrSuperColumn;
-import org.apache.cassandra.thrift.ColumnPath;
-import org.apache.cassandra.thrift.ConsistencyLevel;
-import org.apache.cassandra.thrift.NotFoundException;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
-import org.apache.thrift.TException;
 
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class ThriftColumnFamilyTest extends PigTestBase
-{    
+{
     private static String[] statements = {
             "DROP KEYSPACE IF EXISTS thrift_ks",
             "CREATE KEYSPACE thrift_ks WITH replication = {'class': 
'SimpleStrategy', 'replication_factor': 1};",
@@ -171,7 +168,7 @@ public class ThriftColumnFamilyTest extends PigTestBase
     };
 
     @BeforeClass
-    public static void setup() throws IOException, ConfigurationException, 
TException
+    public static void setup() throws IOException, ConfigurationException
     {
         startCassandra();
         executeCQLStatements(statements);
@@ -300,11 +297,11 @@ public class ThriftColumnFamilyTest extends PigTestBase
     }
 
     @Test
-    public void testCassandraStorageSchema() throws IOException
+    public void testCqlNativeStorageSchema() throws IOException
     {
         //results: 
(qux,(atomic_weight,0.660161815846869),(created,1335890877),(name,User 
Qux),(percent,64.7),
         //(rating,2),(score,12000),(vote_type,dislike))
-        pig.registerQuery("rows = LOAD 'cassandra://thrift_ks/some_app?" + 
defaultParameters + "' USING CassandraStorage();");
+        pig.registerQuery("rows = LOAD 'cql://thrift_ks/some_app?" + 
defaultParameters + "' USING CqlNativeStorage();");
 
         //schema: {key: chararray,atomic_weight: (name: chararray,value: 
double),created: (name: chararray,value: long),
         //name: (name: chararray,value: chararray),percent: (name: 
chararray,value: float),
@@ -339,12 +336,13 @@ public class ThriftColumnFamilyTest extends PigTestBase
     }
 
     @Test
-    public void testCassandraStorageFullCopy() throws IOException, TException
+    public void testCqlNativeStorageFullCopy() throws IOException
     {
         pig.setBatchOn();
-        pig.registerQuery("rows = LOAD 'cassandra://thrift_ks/some_app?" + 
defaultParameters + "' USING CassandraStorage();");
+        pig.registerQuery("rows = LOAD 'cql://thrift_ks/some_app?" + 
defaultParameters + nativeParameters + 
"&input_cql=select%20*%20from%20some_app%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F'
 USING CqlNativeStorage();");
+        pig.registerQuery("records = FOREACH rows GENERATE 
TOTUPLE(TOTUPLE('key', key)),TOTUPLE(atomic_weight, created, name, percent, 
rating, score, vote_type);");
         //full copy
-        pig.registerQuery("STORE rows INTO 
'cassandra://thrift_ks/copy_of_some_app?" + defaultParameters + "' USING 
CassandraStorage();");
+        pig.registerQuery("STORE records INTO 
'cql://thrift_ks/copy_of_some_app?" + defaultParameters + nativeParameters + 
"&output_query=UPDATE+thrift_ks.copy_of_some_app+set+atomic_weight+%3D+%3F,+created+%3D+%3F,+name+%3D+%3F,+percent+%3D+%3F,+rating+%3D+%3F,+score+%3D+%3F,+vote_type+%3D+%3F'
 USING CqlNativeStorage();");
         pig.executeBatch();
         Assert.assertEquals("User Qux", getColumnValue("thrift_ks", 
"copy_of_some_app", "name", "qux", "UTF8Type"));
         Assert.assertEquals("dislike", getColumnValue("thrift_ks", 
"copy_of_some_app", "vote_type", "qux", "UTF8Type"));
@@ -352,158 +350,94 @@ public class ThriftColumnFamilyTest extends PigTestBase
     }
 
     @Test
-    public void testCassandraStorageSingleTupleCopy() throws IOException, 
TException
+    public void testCqlNativeStorageSingleTupleCopy() throws IOException
     {
         executeCQLStatements(deleteCopyOfSomeAppTableData);
         pig.setBatchOn();
-        pig.registerQuery("rows = LOAD 'cassandra://thrift_ks/some_app?" + 
defaultParameters + "' USING CassandraStorage();");
+        pig.registerQuery("rows = LOAD 'cql://thrift_ks/some_app?" + 
defaultParameters + nativeParameters + 
"&input_cql=select%20*%20from%20some_app%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F'
 USING CqlNativeStorage();");
         //single tuple
-        pig.registerQuery("onecol = FOREACH rows GENERATE key, percent;");
-        pig.registerQuery("STORE onecol INTO 
'cassandra://thrift_ks/copy_of_some_app?" + defaultParameters + "' USING 
CassandraStorage();");
+        pig.registerQuery("onecol = FOREACH rows GENERATE 
TOTUPLE(TOTUPLE('key', key)), TOTUPLE(percent);");
+        pig.registerQuery("STORE onecol INTO 
'cql://thrift_ks/copy_of_some_app?" + defaultParameters + nativeParameters + 
"&output_query=UPDATE+thrift_ks.copy_of_some_app+set+percent+%3D+%3F' USING 
CqlNativeStorage();");
         pig.executeBatch();
-        String value = null;
-        try
-        {
-            value = getColumnValue("thrift_ks", "copy_of_some_app", "name", 
"qux", "UTF8Type");
-        }
-        catch (NotFoundException e)
-        {
-            Assert.assertTrue(true);
-        }
+        String value = getColumnValue("thrift_ks", "copy_of_some_app", "name", 
"qux", "UTF8Type");
         if (value != null)
             Assert.fail();
-        try
-        {
-            value = getColumnValue("thrift_ks", "copy_of_some_app", 
"vote_type", "qux", "UTF8Type");
-        }
-        catch (NotFoundException e)
-        {
-            Assert.assertTrue(true);
-        }
+        value = getColumnValue("thrift_ks", "copy_of_some_app", "vote_type", 
"qux", "UTF8Type");
         if (value != null)
             Assert.fail();
         Assert.assertEquals("64.7", getColumnValue("thrift_ks", 
"copy_of_some_app", "percent", "qux", "FloatType"));
     }
 
     @Test
-    public void testCassandraStorageBagOnlyCopy() throws IOException, 
TException
+    public void testCqlNativeStorageBagOnlyCopy() throws IOException
     {
         executeCQLStatements(deleteCopyOfSomeAppTableData);
         pig.setBatchOn();
-        pig.registerQuery("rows = LOAD 'cassandra://thrift_ks/some_app?" + 
defaultParameters + "' USING CassandraStorage();");
+        pig.registerQuery("rows = LOAD 'cql://thrift_ks/some_app?" + 
defaultParameters + nativeParameters + 
"&input_cql=select%20*%20from%20some_app%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F'
 USING CqlNativeStorage();");
         //bag only
-        pig.registerQuery("other = FOREACH rows GENERATE key, columns;");
-        pig.registerQuery("STORE other INTO 
'cassandra://thrift_ks/copy_of_some_app?" + defaultParameters + "' USING 
CassandraStorage();");
+        pig.registerQuery("other = FOREACH rows GENERATE 
TOTUPLE(TOTUPLE('key', key)), TOTUPLE();");
+        pig.registerQuery("STORE other INTO 
'cql://thrift_ks/copy_of_some_app?" + defaultParameters + nativeParameters + "' 
USING CqlNativeStorage();");
         pig.executeBatch();
-        String value = null;
-        try
-        {
-            value = getColumnValue("thrift_ks", "copy_of_some_app", "name", 
"qux", "UTF8Type");
-        }
-        catch (NotFoundException e)
-        {
-            Assert.assertTrue(true);
-        }
+        String value = getColumnValue("thrift_ks", "copy_of_some_app", "name", 
"qux", "UTF8Type");
         if (value != null)
             Assert.fail();
-        try
-        {
-            value = getColumnValue("thrift_ks", "copy_of_some_app", 
"vote_type", "qux", "UTF8Type");
-        }
-        catch (NotFoundException e)
-        {
-            Assert.assertTrue(true);
-        }
+        value = getColumnValue("thrift_ks", "copy_of_some_app", "vote_type", 
"qux", "UTF8Type");
         if (value != null)
             Assert.fail();
-        try
-        {
-            value = getColumnValue("thrift_ks", "copy_of_some_app", "percent", 
"qux", "FloatType");
-        }
-        catch (NotFoundException e)
-        {
-            Assert.assertTrue(true);
-        }
+        value = getColumnValue("thrift_ks", "copy_of_some_app", "percent", 
"qux", "FloatType");
         if (value != null)
             Assert.fail();
     }
 
     @Test
-    public void testCassandraStorageFilter() throws IOException, TException
+    public void testCqlNativeStorageFilter() throws IOException
     {
         executeCQLStatements(deleteCopyOfSomeAppTableData);
         pig.setBatchOn();
-        pig.registerQuery("rows = LOAD 'cassandra://thrift_ks/some_app?" + 
defaultParameters + "' USING CassandraStorage();");
+        pig.registerQuery("rows = LOAD 'cql://thrift_ks/some_app?" + 
defaultParameters + nativeParameters + 
"&input_cql=select%20*%20from%20some_app%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F'
 USING CqlNativeStorage();");
 
         //filter
-        pig.registerQuery("likes = FILTER rows by vote_type.value eq 'like' 
and rating.value > 5;");
-        pig.registerQuery("STORE likes INTO 
'cassandra://thrift_ks/copy_of_some_app?" + defaultParameters + "' USING 
CassandraStorage();");
+        pig.registerQuery("likes = FILTER rows by vote_type eq 'like' and 
rating > 5;");
+        pig.registerQuery("records = FOREACH likes GENERATE 
TOTUPLE(TOTUPLE('key', key)),TOTUPLE(atomic_weight, created, name, percent, 
rating, score, vote_type);");
+        pig.registerQuery("STORE records INTO 
'cql://thrift_ks/copy_of_some_app?" + defaultParameters + nativeParameters + 
"&output_query=UPDATE+thrift_ks.copy_of_some_app+set+atomic_weight+%3D+%3F,+created+%3D+%3F,+name+%3D+%3F,+percent+%3D+%3F,+rating+%3D+%3F,+score+%3D+%3F,+vote_type+%3D+%3F'
 USING CqlNativeStorage();");
         pig.executeBatch();
 
         Assert.assertEquals("like", getColumnValue("thrift_ks", 
"copy_of_some_app", "vote_type", "bar", "UTF8Type"));
         Assert.assertEquals("like", getColumnValue("thrift_ks", 
"copy_of_some_app", "vote_type", "foo", "UTF8Type"));
-        String value = null;
-        try
-        {
-            value = getColumnValue("thrift_ks", "copy_of_some_app", 
"vote_type", "qux", "UTF8Type");
-        }
-        catch (NotFoundException e)
-        {
-            Assert.assertTrue(true);
-        }
+        String value = getColumnValue("thrift_ks", "copy_of_some_app", 
"vote_type", "qux", "UTF8Type");
         if (value != null)
             Assert.fail();
-        try
-        {
-            value = getColumnValue("thrift_ks", "copy_of_some_app", 
"vote_type", "baz", "UTF8Type");
-        }
-        catch (NotFoundException e)
-        {
-            Assert.assertTrue(true);
-        }
+        value = getColumnValue("thrift_ks", "copy_of_some_app", "vote_type", 
"baz", "UTF8Type");
+
         if (value != null)
             Assert.fail();
 
         executeCQLStatements(deleteCopyOfSomeAppTableData);
         pig.setBatchOn();
-        pig.registerQuery("rows = LOAD 'cassandra://thrift_ks/some_app?" + 
defaultParameters + "' USING CassandraStorage();");
-        pig.registerQuery("dislikes_extras = FILTER rows by vote_type.value eq 
'dislike';");
-        pig.registerQuery("STORE dislikes_extras INTO 
'cassandra://thrift_ks/copy_of_some_app?" + defaultParameters + "' USING 
CassandraStorage();");
+        pig.registerQuery("rows = LOAD 'cql://thrift_ks/some_app?" + 
defaultParameters + nativeParameters + 
"&input_cql=select%20*%20from%20some_app%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F'
 USING CqlNativeStorage();");
+        pig.registerQuery("dislikes_extras = FILTER rows by vote_type eq 
'dislike';");
+        pig.registerQuery("dislikes_records = FOREACH dislikes_extras GENERATE 
TOTUPLE(TOTUPLE('key', key)),TOTUPLE(atomic_weight, created, name, percent, 
rating, score, vote_type);");
+        pig.registerQuery("STORE dislikes_records INTO 
'cql://thrift_ks/copy_of_some_app?" + defaultParameters + nativeParameters + 
"&output_query=UPDATE+thrift_ks.copy_of_some_app+set+atomic_weight+%3D+%3F,+created+%3D+%3F,+name+%3D+%3F,+percent+%3D+%3F,+rating+%3D+%3F,+score+%3D+%3F,+vote_type+%3D+%3F'
 USING CqlNativeStorage();");
         pig.executeBatch();
         Assert.assertEquals("dislike", getColumnValue("thrift_ks", 
"copy_of_some_app", "vote_type", "baz", "UTF8Type"));
         Assert.assertEquals("dislike", getColumnValue("thrift_ks", 
"copy_of_some_app", "vote_type", "qux", "UTF8Type"));
-        value = null;
-        try
-        {
-            value = getColumnValue("thrift_ks", "copy_of_some_app", 
"vote_type", "bar", "UTF8Type");
-        }
-        catch (NotFoundException e)
-        {
-            Assert.assertTrue(true);
-        }
+        value = getColumnValue("thrift_ks", "copy_of_some_app", "vote_type", 
"bar", "UTF8Type");
         if (value != null)
             Assert.fail();
-        try
-        {
-            value = getColumnValue("thrift_ks", "copy_of_some_app", 
"vote_type", "foo", "UTF8Type");
-        }
-        catch (NotFoundException e)
-        {
-            Assert.assertTrue(true);
-        }
+        value = getColumnValue("thrift_ks", "copy_of_some_app", "vote_type", 
"foo", "UTF8Type");
         if (value != null)
             Assert.fail();
     }
 
     @Test
-    public void testCassandraStorageJoin() throws IOException
+    public void testCqlNativeStorageJoin() throws IOException
     {
         //test key types with a join
-        pig.registerQuery("U8 = load 'cassandra://thrift_ks/u8?" + 
defaultParameters + "' using CassandraStorage();");
-        pig.registerQuery("Bytes = load 'cassandra://thrift_ks/bytes?" + 
defaultParameters + "' using CassandraStorage();");
+        pig.registerQuery("U8 = load 'cql://thrift_ks/u8?" + defaultParameters 
+ nativeParameters + 
"&input_cql=select%20*%20from%20u8%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F'
 using CqlNativeStorage();");
+        pig.registerQuery("Bytes = load 'cql://thrift_ks/bytes?" + 
defaultParameters + nativeParameters + 
"&input_cql=select%20*%20from%20bytes%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F'
 using CqlNativeStorage();");
 
         //cast key to chararray
-        pig.registerQuery("b = foreach Bytes generate (chararray)key, 
columns;");
+        pig.registerQuery("b = foreach Bytes generate (chararray)key, column1, 
value;");
 
         //key in Bytes is a bytearray, U8 chararray
         //(foo,{(x,Z)},foo,{(x,Z)})
@@ -512,18 +446,11 @@ public class ThriftColumnFamilyTest extends PigTestBase
         if (it.hasNext()) {
             Tuple t = it.next();
             Assert.assertEquals(t.get(0), new DataByteArray("foo".getBytes()));
-            DataBag columns = (DataBag) t.get(1);
-            Iterator<Tuple> iter = columns.iterator();
-            Tuple t1 = iter.next();
-            Assert.assertEquals(t1.get(0), "x");
-            Assert.assertEquals(t1.get(1), new DataByteArray("Z".getBytes()));
-            String column = (String) t.get(2);
-            Assert.assertEquals(column, "foo");
-            columns = (DataBag) t.get(3);
-            iter = columns.iterator();
-            Tuple t2 = iter.next();
-            Assert.assertEquals(t2.get(0), "x");
-            Assert.assertEquals(t2.get(1), new DataByteArray("Z".getBytes()));
+            Assert.assertEquals(t.get(1), "x");
+            Assert.assertEquals(t.get(2), new DataByteArray("Z".getBytes()));
+            Assert.assertEquals(t.get(3), "foo");
+            Assert.assertEquals(t.get(4), "x");
+            Assert.assertEquals(t.get(5), new DataByteArray("Z".getBytes()));
         }
         //key should now be cast into a chararray
         //(foo,{(x,Z)},foo,{(x,Z)})
@@ -532,27 +459,22 @@ public class ThriftColumnFamilyTest extends PigTestBase
         if (it.hasNext()) {
             Tuple t = it.next();
             Assert.assertEquals(t.get(0), "foo");
-            DataBag columns = (DataBag) t.get(1);
-            Iterator<Tuple> iter = columns.iterator();
-            Tuple t1 = iter.next();
-            Assert.assertEquals(t1.get(0), "x");
-            Assert.assertEquals(t1.get(1), new DataByteArray("Z".getBytes()));
-            String column = (String) t.get(2);
-            Assert.assertEquals(column, "foo");
-            columns = (DataBag) t.get(3);
-            iter = columns.iterator();
-            Tuple t2 = iter.next();
-            Assert.assertEquals(t2.get(0), "x");
-            Assert.assertEquals(t2.get(1), new DataByteArray("Z".getBytes()));
+            Assert.assertEquals(t.get(1), "x");
+            Assert.assertEquals(t.get(2), new DataByteArray("Z".getBytes()));
+            Assert.assertEquals(t.get(3), "foo");
+            Assert.assertEquals(t.get(4), "x");
+            Assert.assertEquals(t.get(5), new DataByteArray("Z".getBytes()));
         }
     }
 
     @Test
-    public void testCassandraStorageCounterCF() throws IOException
+    public void testCqlNativeStorageCounterCF() throws IOException
     {
         //Test counter column family support
-        pig.registerQuery("CC = load 'cassandra://thrift_ks/cc?" + 
defaultParameters + "' using CassandraStorage();");
-        pig.registerQuery("total_hits = foreach CC generate key, 
SUM(columns.value);");
+        pig.registerQuery("CC = load 'cql://thrift_ks/cc?" + defaultParameters 
+ nativeParameters + 
"&input_cql=select%20*%20from%20cc%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F'
 using CqlNativeStorage();");
+        pig.registerQuery("A = foreach CC generate key, name, value;");
+        pig.registerQuery("B = GROUP A BY key;");
+        pig.registerQuery("total_hits = foreach B generate group, 
SUM(A.value);");
         //(chuck,4)
         Tuple t = pig.openIterator("total_hits").next();
         Assert.assertEquals(t.get(0), "chuck");
@@ -560,12 +482,11 @@ public class ThriftColumnFamilyTest extends PigTestBase
     }
 
     @Test
-    public void testCassandraStorageCompositeColumnCF() throws IOException
+    public void testCqlNativeStorageCompositeColumnCF() throws IOException
     {
         //Test CompositeType
-        pig.registerQuery("compo = load 'cassandra://thrift_ks/compo?" + 
defaultParameters + "' using CassandraStorage();");
-        pig.registerQuery("compo = foreach compo generate key as method, 
flatten(columns);");
-        pig.registerQuery("lee = filter compo by columns::name == 
('bruce','lee');");
+        pig.registerQuery("compo = load 'cql://thrift_ks/compo?" + 
defaultParameters + nativeParameters + 
"&input_cql=select%20*%20from%20compo%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F'
 using CqlNativeStorage();");
+        pig.registerQuery("lee = filter compo by column1 == 'bruce' AND 
column2 == 'lee';");
 
         //(kick,(bruce,lee),oww)
         //(punch,(bruce,lee),ouch)
@@ -574,18 +495,16 @@ public class ThriftColumnFamilyTest extends PigTestBase
         while (it.hasNext()) {
             count ++;
             Tuple t = it.next();
-            Tuple t1 = (Tuple) t.get(1);
-            Assert.assertEquals(t1.get(0), "bruce");
-            Assert.assertEquals(t1.get(1), "lee");
+            Assert.assertEquals(t.get(1), "bruce");
+            Assert.assertEquals(t.get(2), "lee");
             if ("kick".equals(t.get(0)))
-                Assert.assertEquals(t.get(2), "oww");
-            else if ("kick".equals(t.get(0)))
-                Assert.assertEquals(t.get(2), "ouch");
+                Assert.assertEquals(t.get(3), "oww");
+            else
+                Assert.assertEquals(t.get(3), "ouch");
         }
         Assert.assertEquals(count, 2);
-        pig.registerQuery("night = load 'cassandra://thrift_ks/compo_int?" + 
defaultParameters + "' using CassandraStorage();");
-        pig.registerQuery("night = foreach night generate flatten(columns);");
-        pig.registerQuery("night = foreach night generate 
(int)columns::name.$0+(double)columns::name.$1/60 as hour, columns::value as 
noise;");
+        pig.registerQuery("night = load 'cql://thrift_ks/compo_int?" + 
defaultParameters + nativeParameters + 
"&input_cql=select%20*%20from%20compo_int%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F'
 using CqlNativeStorage();");
+        pig.registerQuery("night = foreach night generate 
(int)column1+(double)column2/60 as hour, value as noise;");
 
         //What happens at the darkest hour?
         pig.registerQuery("darkest = filter night by hour > 2 and hour < 5;");
@@ -598,10 +517,10 @@ public class ThriftColumnFamilyTest extends PigTestBase
             Assert.assertEquals(t.get(1), "daddy?");
         }
         pig.setBatchOn();
-        pig.registerQuery("compo_int_rows = LOAD 
'cassandra://thrift_ks/compo_int?" + defaultParameters + "' using 
CassandraStorage();");
-        pig.registerQuery("STORE compo_int_rows INTO 
'cassandra://thrift_ks/compo_int_copy?" + defaultParameters + "' using 
CassandraStorage();");
+        pig.registerQuery("compo_int_rows = LOAD 'cql://thrift_ks/compo_int?" 
+ defaultParameters + nativeParameters + 
"&input_cql=select%20*%20from%20compo_int%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F'
 using CqlNativeStorage();");
+        pig.registerQuery("STORE compo_int_rows INTO 
'cql://thrift_ks/compo_int_copy?" + defaultParameters + nativeParameters + 
"&output_query=UPDATE+thrift_ks.compo_int_copy+set+column1+%3D+%3F,+column2+%3D+%3F,+value+%3D+%3F'
 using CqlNativeStorage();");
         pig.executeBatch();
-        pig.registerQuery("compocopy_int_rows = LOAD 
'cassandra://thrift_ks/compo_int_copy?" + defaultParameters + "' using 
CassandraStorage();");
+        pig.registerQuery("compocopy_int_rows = LOAD 
'cql://thrift_ks/compo_int_copy?" + defaultParameters + nativeParameters + 
"&input_cql=select%20*%20from%20compo_int_copy%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F'
 using CqlNativeStorage();");
         //(clock,{((1,0),z),((1,30),zzzz),((2,30),daddy?),((6,30),coffee...)})
         it = pig.openIterator("compocopy_int_rows");
         count = 0;
@@ -627,32 +546,26 @@ public class ThriftColumnFamilyTest extends PigTestBase
     }
 
     @Test
-    public void testCassandraStorageCompositeKeyCF() throws IOException
+    public void testCqlNativeStorageCompositeKeyCF() throws IOException
     {
         //Test CompositeKey
-        pig.registerQuery("compokeys = load 'cassandra://thrift_ks/compo_key?" 
+ defaultParameters + "' using CassandraStorage();");
-        pig.registerQuery("compokeys = filter compokeys by key.$1 == 40;");
-        //((clock,40),{(6,coffee...)})
+        pig.registerQuery("compokeys = load 'cql://thrift_ks/compo_key?" + 
defaultParameters + nativeParameters + 
"&input_cql=select%20*%20from%20compo_key%20where%20token(key,column1)%20%3E%20%3F%20and%20token(key,column1)%20%3C%3D%20%3F'
 using CqlNativeStorage();");
+        pig.registerQuery("compokeys = filter compokeys by column1 == 40;");
+        //(clock,40,6,coffee...)
         Iterator<Tuple> it = pig.openIterator("compokeys");
         if (it.hasNext()) {
             Tuple t = it.next();
-            Tuple key = (Tuple) t.get(0); 
-            Assert.assertEquals(key.get(0), "clock");
-            Assert.assertEquals(key.get(1), 40L);
-            DataBag columns = (DataBag) t.get(1);
-            Iterator<Tuple> iter = columns.iterator();
-            if (iter.hasNext())
-            {
-                Tuple t1 = iter.next();
-                Assert.assertEquals(t1.get(0), 6L);
-                Assert.assertEquals(t1.get(1), "coffee...");
-            }
+            Assert.assertEquals(t.get(0), "clock");
+            Assert.assertEquals(t.get(1), 40L);
+            Assert.assertEquals(t.get(2), 6L);
+            Assert.assertEquals(t.get(3), "coffee...");
         }
         pig.setBatchOn();
-        pig.registerQuery("compo_key_rows = LOAD 
'cassandra://thrift_ks/compo_key?" + defaultParameters + "' using 
CassandraStorage();");
-        pig.registerQuery("STORE compo_key_rows INTO 
'cassandra://thrift_ks/compo_key_copy?" + defaultParameters + "' using 
CassandraStorage();");
+        pig.registerQuery("compo_key_rows = LOAD 'cql://thrift_ks/compo_key?" 
+ defaultParameters + nativeParameters + 
"&input_cql=select%20*%20from%20compo_key%20where%20token(key,column1)%20%3E%20%3F%20and%20token(key,column1)%20%3C%3D%20%3F'
 using CqlNativeStorage();");
+        pig.registerQuery("compo_key_rows = FOREACH compo_key_rows GENERATE 
TOTUPLE(TOTUPLE('key',key),TOTUPLE('column1',column1),TOTUPLE('column2',column2)),TOTUPLE(value);");
+        pig.registerQuery("STORE compo_key_rows INTO 
'cql://thrift_ks/compo_key_copy?" + defaultParameters + nativeParameters + 
"&output_query=UPDATE+thrift_ks.compo_key_copy+set+value+%3D+%3F' using 
CqlNativeStorage();");
         pig.executeBatch();
-        pig.registerQuery("compo_key_copy_rows = LOAD 
'cassandra://thrift_ks/compo_key_copy?" + defaultParameters + "' using 
CassandraStorage();");
+        pig.registerQuery("compo_key_copy_rows = LOAD 
'cql://thrift_ks/compo_key_copy?" + defaultParameters + nativeParameters + 
"&input_cql=select%20*%20from%20compo_key_copy%20where%20token(key,column1)%20%3E%20%3F%20and%20token(key,column1)%20%3C%3D%20%3F'
 using CqlNativeStorage();");
         //((clock,10),{(1,z)})
         //((clock,20),{(1,zzzz)})
         //((clock,30),{(2,daddy?)})
@@ -662,66 +575,43 @@ public class ThriftColumnFamilyTest extends PigTestBase
         while (it.hasNext()) {
             Tuple t = it.next();
             count ++;
-            Tuple key = (Tuple) t.get(0); 
-            if ("clock".equals(key.get(0)) && (Long) key.get(1) == 10L)
+            if ("clock".equals(t.get(0)) && (Long) t.get(1) == 10L)
             {
-                DataBag columns = (DataBag) t.get(1);
-                Iterator<Tuple> iter = columns.iterator();
-                if (iter.hasNext())
-                {
-                    Tuple t1 = iter.next();
-                    Assert.assertEquals(t1.get(0), 1L);
-                    Assert.assertEquals(t1.get(1), "z");
-                }
+                Assert.assertEquals(t.get(2), 1L);
+                Assert.assertEquals(t.get(3), "z");
             }
-            else if ("clock".equals(key.get(0)) && (Long) key.get(1) == 40L)
+            else if ("clock".equals(t.get(0)) && (Long) t.get(1) == 40L)
             {
-                DataBag columns = (DataBag) t.get(1);
-                Iterator<Tuple> iter = columns.iterator();
-                if (iter.hasNext())
-                {
-                    Tuple t1 = iter.next();
-                    Assert.assertEquals(t1.get(0), 6L);
-                    Assert.assertEquals(t1.get(1), "coffee...");
-                }
+                Assert.assertEquals(t.get(2), 6L);
+                Assert.assertEquals(t.get(3), "coffee...");
             }
-            else if ("clock".equals(key.get(0)) && (Long) key.get(1) == 20L)
+            else if ("clock".equals(t.get(0)) && (Long) t.get(1) == 20L)
             {
-                DataBag columns = (DataBag) t.get(1);
-                Iterator<Tuple> iter = columns.iterator();
-                if (iter.hasNext())
-                {
-                    Tuple t1 = iter.next();
-                    Assert.assertEquals(t1.get(0), 1L);
-                    Assert.assertEquals(t1.get(1), "zzzz");
-                }
+                Assert.assertEquals(t.get(2), 1L);
+                Assert.assertEquals(t.get(3), "zzzz");
             }
-            else if ("clock".equals(key.get(0)) && (Long) key.get(1) == 30L)
+            else if ("clock".equals(t.get(0)) && (Long) t.get(1) == 30L)
             {
-                DataBag columns = (DataBag) t.get(1);
-                Iterator<Tuple> iter = columns.iterator();
-                if (iter.hasNext())
-                {
-                    Tuple t1 = iter.next();
-                    Assert.assertEquals(t1.get(0), 2L);
-                    Assert.assertEquals(t1.get(1), "daddy?");
-                }
+                Assert.assertEquals(t.get(2), 2L);
+                Assert.assertEquals(t.get(3), "daddy?");
             }
         }
         Assert.assertEquals(4, count);
     }
 
-    private String getColumnValue(String ks, String cf, String colName, String 
key, String validator) throws TException, IOException
+    private String getColumnValue(String ks, String cf, String colName, String 
key, String validator) throws IOException
     {
-        Cassandra.Client client = getClient();
-        client.set_keyspace(ks);
+        Session client = getClient();
+        client.execute("USE " + ks);
+
+        String query = String.format("SELECT %s FROM %s WHERE key = '%s'", 
colName, cf, key);
+
+        ResultSet rows = client.execute(query);
+        Row row = rows.one();
 
-        ByteBuffer key_user_id = ByteBufferUtil.bytes(key);
-        ColumnPath cp = new ColumnPath(cf);
-        cp.column = ByteBufferUtil.bytes(colName);
+        if (row == null || row.isNull(0))
+            return null;
 
-        // read
-        ColumnOrSuperColumn got = client.get(key_user_id, cp, 
ConsistencyLevel.ONE);
-        return parseType(validator).getString(got.getColumn().value);
+        return parseType(validator).getString(row.getBytesUnsafe(0));
     }
 }

Reply via email to