Repository: cassandra Updated Branches: refs/heads/trunk d62cd1bc9 -> 446e25378
http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java ---------------------------------------------------------------------- diff --git a/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java b/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java index 60d04d3..93ac5be 100644 --- a/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java +++ b/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java @@ -22,24 +22,21 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Iterator; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.thrift.Cassandra; -import org.apache.cassandra.thrift.ColumnOrSuperColumn; -import org.apache.cassandra.thrift.ColumnPath; -import org.apache.cassandra.thrift.ConsistencyLevel; -import org.apache.cassandra.thrift.NotFoundException; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.Tuple; -import org.apache.thrift.TException; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; public class ThriftColumnFamilyTest extends PigTestBase -{ +{ private static String[] statements = { "DROP KEYSPACE IF EXISTS thrift_ks", "CREATE KEYSPACE thrift_ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};", @@ -171,7 +168,7 @@ public class ThriftColumnFamilyTest extends PigTestBase }; @BeforeClass - public static void setup() throws IOException, ConfigurationException, TException + public static void setup() throws IOException, ConfigurationException { startCassandra(); executeCQLStatements(statements); @@ -300,11 +297,11 @@ public class ThriftColumnFamilyTest extends PigTestBase } @Test - public void testCassandraStorageSchema() throws IOException + public void testCqlNativeStorageSchema() throws IOException { //results: (qux,(atomic_weight,0.660161815846869),(created,1335890877),(name,User Qux),(percent,64.7), //(rating,2),(score,12000),(vote_type,dislike)) - pig.registerQuery("rows = LOAD 'cassandra://thrift_ks/some_app?" + defaultParameters + "' USING CassandraStorage();"); + pig.registerQuery("rows = LOAD 'cql://thrift_ks/some_app?" + defaultParameters + "' USING CqlNativeStorage();"); //schema: {key: chararray,atomic_weight: (name: chararray,value: double),created: (name: chararray,value: long), //name: (name: chararray,value: chararray),percent: (name: chararray,value: float), @@ -339,12 +336,13 @@ public class ThriftColumnFamilyTest extends PigTestBase } @Test - public void testCassandraStorageFullCopy() throws IOException, TException + public void testCqlNativeStorageFullCopy() throws IOException { pig.setBatchOn(); - pig.registerQuery("rows = LOAD 'cassandra://thrift_ks/some_app?" + defaultParameters + "' USING CassandraStorage();"); + pig.registerQuery("rows = LOAD 'cql://thrift_ks/some_app?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20some_app%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' USING CqlNativeStorage();"); + pig.registerQuery("records = FOREACH rows GENERATE TOTUPLE(TOTUPLE('key', key)),TOTUPLE(atomic_weight, created, name, percent, rating, score, vote_type);"); //full copy - pig.registerQuery("STORE rows INTO 'cassandra://thrift_ks/copy_of_some_app?" + defaultParameters + "' USING CassandraStorage();"); + pig.registerQuery("STORE records INTO 'cql://thrift_ks/copy_of_some_app?" + defaultParameters + nativeParameters + "&output_query=UPDATE+thrift_ks.copy_of_some_app+set+atomic_weight+%3D+%3F,+created+%3D+%3F,+name+%3D+%3F,+percent+%3D+%3F,+rating+%3D+%3F,+score+%3D+%3F,+vote_type+%3D+%3F' USING CqlNativeStorage();"); pig.executeBatch(); Assert.assertEquals("User Qux", getColumnValue("thrift_ks", "copy_of_some_app", "name", "qux", "UTF8Type")); Assert.assertEquals("dislike", getColumnValue("thrift_ks", "copy_of_some_app", "vote_type", "qux", "UTF8Type")); @@ -352,158 +350,94 @@ public class ThriftColumnFamilyTest extends PigTestBase } @Test - public void testCassandraStorageSingleTupleCopy() throws IOException, TException + public void testCqlNativeStorageSingleTupleCopy() throws IOException { executeCQLStatements(deleteCopyOfSomeAppTableData); pig.setBatchOn(); - pig.registerQuery("rows = LOAD 'cassandra://thrift_ks/some_app?" + defaultParameters + "' USING CassandraStorage();"); + pig.registerQuery("rows = LOAD 'cql://thrift_ks/some_app?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20some_app%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' USING CqlNativeStorage();"); //single tuple - pig.registerQuery("onecol = FOREACH rows GENERATE key, percent;"); - pig.registerQuery("STORE onecol INTO 'cassandra://thrift_ks/copy_of_some_app?" + defaultParameters + "' USING CassandraStorage();"); + pig.registerQuery("onecol = FOREACH rows GENERATE TOTUPLE(TOTUPLE('key', key)), TOTUPLE(percent);"); + pig.registerQuery("STORE onecol INTO 'cql://thrift_ks/copy_of_some_app?" + defaultParameters + nativeParameters + "&output_query=UPDATE+thrift_ks.copy_of_some_app+set+percent+%3D+%3F' USING CqlNativeStorage();"); pig.executeBatch(); - String value = null; - try - { - value = getColumnValue("thrift_ks", "copy_of_some_app", "name", "qux", "UTF8Type"); - } - catch (NotFoundException e) - { - Assert.assertTrue(true); - } + String value = getColumnValue("thrift_ks", "copy_of_some_app", "name", "qux", "UTF8Type"); if (value != null) Assert.fail(); - try - { - value = getColumnValue("thrift_ks", "copy_of_some_app", "vote_type", "qux", "UTF8Type"); - } - catch (NotFoundException e) - { - Assert.assertTrue(true); - } + value = getColumnValue("thrift_ks", "copy_of_some_app", "vote_type", "qux", "UTF8Type"); if (value != null) Assert.fail(); Assert.assertEquals("64.7", getColumnValue("thrift_ks", "copy_of_some_app", "percent", "qux", "FloatType")); } @Test - public void testCassandraStorageBagOnlyCopy() throws IOException, TException + public void testCqlNativeStorageBagOnlyCopy() throws IOException { executeCQLStatements(deleteCopyOfSomeAppTableData); pig.setBatchOn(); - pig.registerQuery("rows = LOAD 'cassandra://thrift_ks/some_app?" + defaultParameters + "' USING CassandraStorage();"); + pig.registerQuery("rows = LOAD 'cql://thrift_ks/some_app?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20some_app%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' USING CqlNativeStorage();"); //bag only - pig.registerQuery("other = FOREACH rows GENERATE key, columns;"); - pig.registerQuery("STORE other INTO 'cassandra://thrift_ks/copy_of_some_app?" + defaultParameters + "' USING CassandraStorage();"); + pig.registerQuery("other = FOREACH rows GENERATE TOTUPLE(TOTUPLE('key', key)), TOTUPLE();"); + pig.registerQuery("STORE other INTO 'cql://thrift_ks/copy_of_some_app?" + defaultParameters + nativeParameters + "' USING CqlNativeStorage();"); pig.executeBatch(); - String value = null; - try - { - value = getColumnValue("thrift_ks", "copy_of_some_app", "name", "qux", "UTF8Type"); - } - catch (NotFoundException e) - { - Assert.assertTrue(true); - } + String value = getColumnValue("thrift_ks", "copy_of_some_app", "name", "qux", "UTF8Type"); if (value != null) Assert.fail(); - try - { - value = getColumnValue("thrift_ks", "copy_of_some_app", "vote_type", "qux", "UTF8Type"); - } - catch (NotFoundException e) - { - Assert.assertTrue(true); - } + value = getColumnValue("thrift_ks", "copy_of_some_app", "vote_type", "qux", "UTF8Type"); if (value != null) Assert.fail(); - try - { - value = getColumnValue("thrift_ks", "copy_of_some_app", "percent", "qux", "FloatType"); - } - catch (NotFoundException e) - { - Assert.assertTrue(true); - } + value = getColumnValue("thrift_ks", "copy_of_some_app", "percent", "qux", "FloatType"); if (value != null) Assert.fail(); } @Test - public void testCassandraStorageFilter() throws IOException, TException + public void testCqlNativeStorageFilter() throws IOException { executeCQLStatements(deleteCopyOfSomeAppTableData); pig.setBatchOn(); - pig.registerQuery("rows = LOAD 'cassandra://thrift_ks/some_app?" + defaultParameters + "' USING CassandraStorage();"); + pig.registerQuery("rows = LOAD 'cql://thrift_ks/some_app?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20some_app%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' USING CqlNativeStorage();"); //filter - pig.registerQuery("likes = FILTER rows by vote_type.value eq 'like' and rating.value > 5;"); - pig.registerQuery("STORE likes INTO 'cassandra://thrift_ks/copy_of_some_app?" + defaultParameters + "' USING CassandraStorage();"); + pig.registerQuery("likes = FILTER rows by vote_type eq 'like' and rating > 5;"); + pig.registerQuery("records = FOREACH likes GENERATE TOTUPLE(TOTUPLE('key', key)),TOTUPLE(atomic_weight, created, name, percent, rating, score, vote_type);"); + pig.registerQuery("STORE records INTO 'cql://thrift_ks/copy_of_some_app?" + defaultParameters + nativeParameters + "&output_query=UPDATE+thrift_ks.copy_of_some_app+set+atomic_weight+%3D+%3F,+created+%3D+%3F,+name+%3D+%3F,+percent+%3D+%3F,+rating+%3D+%3F,+score+%3D+%3F,+vote_type+%3D+%3F' USING CqlNativeStorage();"); pig.executeBatch(); Assert.assertEquals("like", getColumnValue("thrift_ks", "copy_of_some_app", "vote_type", "bar", "UTF8Type")); Assert.assertEquals("like", getColumnValue("thrift_ks", "copy_of_some_app", "vote_type", "foo", "UTF8Type")); - String value = null; - try - { - value = getColumnValue("thrift_ks", "copy_of_some_app", "vote_type", "qux", "UTF8Type"); - } - catch (NotFoundException e) - { - Assert.assertTrue(true); - } + String value = getColumnValue("thrift_ks", "copy_of_some_app", "vote_type", "qux", "UTF8Type"); if (value != null) Assert.fail(); - try - { - value = getColumnValue("thrift_ks", "copy_of_some_app", "vote_type", "baz", "UTF8Type"); - } - catch (NotFoundException e) - { - Assert.assertTrue(true); - } + value = getColumnValue("thrift_ks", "copy_of_some_app", "vote_type", "baz", "UTF8Type"); + if (value != null) Assert.fail(); executeCQLStatements(deleteCopyOfSomeAppTableData); pig.setBatchOn(); - pig.registerQuery("rows = LOAD 'cassandra://thrift_ks/some_app?" + defaultParameters + "' USING CassandraStorage();"); - pig.registerQuery("dislikes_extras = FILTER rows by vote_type.value eq 'dislike';"); - pig.registerQuery("STORE dislikes_extras INTO 'cassandra://thrift_ks/copy_of_some_app?" + defaultParameters + "' USING CassandraStorage();"); + pig.registerQuery("rows = LOAD 'cql://thrift_ks/some_app?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20some_app%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' USING CqlNativeStorage();"); + pig.registerQuery("dislikes_extras = FILTER rows by vote_type eq 'dislike';"); + pig.registerQuery("dislikes_records = FOREACH dislikes_extras GENERATE TOTUPLE(TOTUPLE('key', key)),TOTUPLE(atomic_weight, created, name, percent, rating, score, vote_type);"); + pig.registerQuery("STORE dislikes_records INTO 'cql://thrift_ks/copy_of_some_app?" + defaultParameters + nativeParameters + "&output_query=UPDATE+thrift_ks.copy_of_some_app+set+atomic_weight+%3D+%3F,+created+%3D+%3F,+name+%3D+%3F,+percent+%3D+%3F,+rating+%3D+%3F,+score+%3D+%3F,+vote_type+%3D+%3F' USING CqlNativeStorage();"); pig.executeBatch(); Assert.assertEquals("dislike", getColumnValue("thrift_ks", "copy_of_some_app", "vote_type", "baz", "UTF8Type")); Assert.assertEquals("dislike", getColumnValue("thrift_ks", "copy_of_some_app", "vote_type", "qux", "UTF8Type")); - value = null; - try - { - value = getColumnValue("thrift_ks", "copy_of_some_app", "vote_type", "bar", "UTF8Type"); - } - catch (NotFoundException e) - { - Assert.assertTrue(true); - } + value = getColumnValue("thrift_ks", "copy_of_some_app", "vote_type", "bar", "UTF8Type"); if (value != null) Assert.fail(); - try - { - value = getColumnValue("thrift_ks", "copy_of_some_app", "vote_type", "foo", "UTF8Type"); - } - catch (NotFoundException e) - { - Assert.assertTrue(true); - } + value = getColumnValue("thrift_ks", "copy_of_some_app", "vote_type", "foo", "UTF8Type"); if (value != null) Assert.fail(); } @Test - public void testCassandraStorageJoin() throws IOException + public void testCqlNativeStorageJoin() throws IOException { //test key types with a join - pig.registerQuery("U8 = load 'cassandra://thrift_ks/u8?" + defaultParameters + "' using CassandraStorage();"); - pig.registerQuery("Bytes = load 'cassandra://thrift_ks/bytes?" + defaultParameters + "' using CassandraStorage();"); + pig.registerQuery("U8 = load 'cql://thrift_ks/u8?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20u8%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' using CqlNativeStorage();"); + pig.registerQuery("Bytes = load 'cql://thrift_ks/bytes?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20bytes%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' using CqlNativeStorage();"); //cast key to chararray - pig.registerQuery("b = foreach Bytes generate (chararray)key, columns;"); + pig.registerQuery("b = foreach Bytes generate (chararray)key, column1, value;"); //key in Bytes is a bytearray, U8 chararray //(foo,{(x,Z)},foo,{(x,Z)}) @@ -512,18 +446,11 @@ public class ThriftColumnFamilyTest extends PigTestBase if (it.hasNext()) { Tuple t = it.next(); Assert.assertEquals(t.get(0), new DataByteArray("foo".getBytes())); - DataBag columns = (DataBag) t.get(1); - Iterator<Tuple> iter = columns.iterator(); - Tuple t1 = iter.next(); - Assert.assertEquals(t1.get(0), "x"); - Assert.assertEquals(t1.get(1), new DataByteArray("Z".getBytes())); - String column = (String) t.get(2); - Assert.assertEquals(column, "foo"); - columns = (DataBag) t.get(3); - iter = columns.iterator(); - Tuple t2 = iter.next(); - Assert.assertEquals(t2.get(0), "x"); - Assert.assertEquals(t2.get(1), new DataByteArray("Z".getBytes())); + Assert.assertEquals(t.get(1), "x"); + Assert.assertEquals(t.get(2), new DataByteArray("Z".getBytes())); + Assert.assertEquals(t.get(3), "foo"); + Assert.assertEquals(t.get(4), "x"); + Assert.assertEquals(t.get(5), new DataByteArray("Z".getBytes())); } //key should now be cast into a chararray //(foo,{(x,Z)},foo,{(x,Z)}) @@ -532,27 +459,22 @@ public class ThriftColumnFamilyTest extends PigTestBase if (it.hasNext()) { Tuple t = it.next(); Assert.assertEquals(t.get(0), "foo"); - DataBag columns = (DataBag) t.get(1); - Iterator<Tuple> iter = columns.iterator(); - Tuple t1 = iter.next(); - Assert.assertEquals(t1.get(0), "x"); - Assert.assertEquals(t1.get(1), new DataByteArray("Z".getBytes())); - String column = (String) t.get(2); - Assert.assertEquals(column, "foo"); - columns = (DataBag) t.get(3); - iter = columns.iterator(); - Tuple t2 = iter.next(); - Assert.assertEquals(t2.get(0), "x"); - Assert.assertEquals(t2.get(1), new DataByteArray("Z".getBytes())); + Assert.assertEquals(t.get(1), "x"); + Assert.assertEquals(t.get(2), new DataByteArray("Z".getBytes())); + Assert.assertEquals(t.get(3), "foo"); + Assert.assertEquals(t.get(4), "x"); + Assert.assertEquals(t.get(5), new DataByteArray("Z".getBytes())); } } @Test - public void testCassandraStorageCounterCF() throws IOException + public void testCqlNativeStorageCounterCF() throws IOException { //Test counter column family support - pig.registerQuery("CC = load 'cassandra://thrift_ks/cc?" + defaultParameters + "' using CassandraStorage();"); - pig.registerQuery("total_hits = foreach CC generate key, SUM(columns.value);"); + pig.registerQuery("CC = load 'cql://thrift_ks/cc?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20cc%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' using CqlNativeStorage();"); + pig.registerQuery("A = foreach CC generate key, name, value;"); + pig.registerQuery("B = GROUP A BY key;"); + pig.registerQuery("total_hits = foreach B generate group, SUM(A.value);"); //(chuck,4) Tuple t = pig.openIterator("total_hits").next(); Assert.assertEquals(t.get(0), "chuck"); @@ -560,12 +482,11 @@ public class ThriftColumnFamilyTest extends PigTestBase } @Test - public void testCassandraStorageCompositeColumnCF() throws IOException + public void testCqlNativeStorageCompositeColumnCF() throws IOException { //Test CompositeType - pig.registerQuery("compo = load 'cassandra://thrift_ks/compo?" + defaultParameters + "' using CassandraStorage();"); - pig.registerQuery("compo = foreach compo generate key as method, flatten(columns);"); - pig.registerQuery("lee = filter compo by columns::name == ('bruce','lee');"); + pig.registerQuery("compo = load 'cql://thrift_ks/compo?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20compo%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' using CqlNativeStorage();"); + pig.registerQuery("lee = filter compo by column1 == 'bruce' AND column2 == 'lee';"); //(kick,(bruce,lee),oww) //(punch,(bruce,lee),ouch) @@ -574,18 +495,16 @@ public class ThriftColumnFamilyTest extends PigTestBase while (it.hasNext()) { count ++; Tuple t = it.next(); - Tuple t1 = (Tuple) t.get(1); - Assert.assertEquals(t1.get(0), "bruce"); - Assert.assertEquals(t1.get(1), "lee"); + Assert.assertEquals(t.get(1), "bruce"); + Assert.assertEquals(t.get(2), "lee"); if ("kick".equals(t.get(0))) - Assert.assertEquals(t.get(2), "oww"); - else if ("kick".equals(t.get(0))) - Assert.assertEquals(t.get(2), "ouch"); + Assert.assertEquals(t.get(3), "oww"); + else + Assert.assertEquals(t.get(3), "ouch"); } Assert.assertEquals(count, 2); - pig.registerQuery("night = load 'cassandra://thrift_ks/compo_int?" + defaultParameters + "' using CassandraStorage();"); - pig.registerQuery("night = foreach night generate flatten(columns);"); - pig.registerQuery("night = foreach night generate (int)columns::name.$0+(double)columns::name.$1/60 as hour, columns::value as noise;"); + pig.registerQuery("night = load 'cql://thrift_ks/compo_int?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20compo_int%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' using CqlNativeStorage();"); + pig.registerQuery("night = foreach night generate (int)column1+(double)column2/60 as hour, value as noise;"); //What happens at the darkest hour? pig.registerQuery("darkest = filter night by hour > 2 and hour < 5;"); @@ -598,10 +517,10 @@ public class ThriftColumnFamilyTest extends PigTestBase Assert.assertEquals(t.get(1), "daddy?"); } pig.setBatchOn(); - pig.registerQuery("compo_int_rows = LOAD 'cassandra://thrift_ks/compo_int?" + defaultParameters + "' using CassandraStorage();"); - pig.registerQuery("STORE compo_int_rows INTO 'cassandra://thrift_ks/compo_int_copy?" + defaultParameters + "' using CassandraStorage();"); + pig.registerQuery("compo_int_rows = LOAD 'cql://thrift_ks/compo_int?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20compo_int%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' using CqlNativeStorage();"); + pig.registerQuery("STORE compo_int_rows INTO 'cql://thrift_ks/compo_int_copy?" + defaultParameters + nativeParameters + "&output_query=UPDATE+thrift_ks.compo_int_copy+set+column1+%3D+%3F,+column2+%3D+%3F,+value+%3D+%3F' using CqlNativeStorage();"); pig.executeBatch(); - pig.registerQuery("compocopy_int_rows = LOAD 'cassandra://thrift_ks/compo_int_copy?" + defaultParameters + "' using CassandraStorage();"); + pig.registerQuery("compocopy_int_rows = LOAD 'cql://thrift_ks/compo_int_copy?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20compo_int_copy%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' using CqlNativeStorage();"); //(clock,{((1,0),z),((1,30),zzzz),((2,30),daddy?),((6,30),coffee...)}) it = pig.openIterator("compocopy_int_rows"); count = 0; @@ -627,32 +546,26 @@ public class ThriftColumnFamilyTest extends PigTestBase } @Test - public void testCassandraStorageCompositeKeyCF() throws IOException + public void testCqlNativeStorageCompositeKeyCF() throws IOException { //Test CompositeKey - pig.registerQuery("compokeys = load 'cassandra://thrift_ks/compo_key?" + defaultParameters + "' using CassandraStorage();"); - pig.registerQuery("compokeys = filter compokeys by key.$1 == 40;"); - //((clock,40),{(6,coffee...)}) + pig.registerQuery("compokeys = load 'cql://thrift_ks/compo_key?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20compo_key%20where%20token(key,column1)%20%3E%20%3F%20and%20token(key,column1)%20%3C%3D%20%3F' using CqlNativeStorage();"); + pig.registerQuery("compokeys = filter compokeys by column1 == 40;"); + //(clock,40,6,coffee...) Iterator<Tuple> it = pig.openIterator("compokeys"); if (it.hasNext()) { Tuple t = it.next(); - Tuple key = (Tuple) t.get(0); - Assert.assertEquals(key.get(0), "clock"); - Assert.assertEquals(key.get(1), 40L); - DataBag columns = (DataBag) t.get(1); - Iterator<Tuple> iter = columns.iterator(); - if (iter.hasNext()) - { - Tuple t1 = iter.next(); - Assert.assertEquals(t1.get(0), 6L); - Assert.assertEquals(t1.get(1), "coffee..."); - } + Assert.assertEquals(t.get(0), "clock"); + Assert.assertEquals(t.get(1), 40L); + Assert.assertEquals(t.get(2), 6L); + Assert.assertEquals(t.get(3), "coffee..."); } pig.setBatchOn(); - pig.registerQuery("compo_key_rows = LOAD 'cassandra://thrift_ks/compo_key?" + defaultParameters + "' using CassandraStorage();"); - pig.registerQuery("STORE compo_key_rows INTO 'cassandra://thrift_ks/compo_key_copy?" + defaultParameters + "' using CassandraStorage();"); + pig.registerQuery("compo_key_rows = LOAD 'cql://thrift_ks/compo_key?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20compo_key%20where%20token(key,column1)%20%3E%20%3F%20and%20token(key,column1)%20%3C%3D%20%3F' using CqlNativeStorage();"); + pig.registerQuery("compo_key_rows = FOREACH compo_key_rows GENERATE TOTUPLE(TOTUPLE('key',key),TOTUPLE('column1',column1),TOTUPLE('column2',column2)),TOTUPLE(value);"); + pig.registerQuery("STORE compo_key_rows INTO 'cql://thrift_ks/compo_key_copy?" + defaultParameters + nativeParameters + "&output_query=UPDATE+thrift_ks.compo_key_copy+set+value+%3D+%3F' using CqlNativeStorage();"); pig.executeBatch(); - pig.registerQuery("compo_key_copy_rows = LOAD 'cassandra://thrift_ks/compo_key_copy?" + defaultParameters + "' using CassandraStorage();"); + pig.registerQuery("compo_key_copy_rows = LOAD 'cql://thrift_ks/compo_key_copy?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20compo_key_copy%20where%20token(key,column1)%20%3E%20%3F%20and%20token(key,column1)%20%3C%3D%20%3F' using CqlNativeStorage();"); //((clock,10),{(1,z)}) //((clock,20),{(1,zzzz)}) //((clock,30),{(2,daddy?)}) @@ -662,66 +575,43 @@ public class ThriftColumnFamilyTest extends PigTestBase while (it.hasNext()) { Tuple t = it.next(); count ++; - Tuple key = (Tuple) t.get(0); - if ("clock".equals(key.get(0)) && (Long) key.get(1) == 10L) + if ("clock".equals(t.get(0)) && (Long) t.get(1) == 10L) { - DataBag columns = (DataBag) t.get(1); - Iterator<Tuple> iter = columns.iterator(); - if (iter.hasNext()) - { - Tuple t1 = iter.next(); - Assert.assertEquals(t1.get(0), 1L); - Assert.assertEquals(t1.get(1), "z"); - } + Assert.assertEquals(t.get(2), 1L); + Assert.assertEquals(t.get(3), "z"); } - else if ("clock".equals(key.get(0)) && (Long) key.get(1) == 40L) + else if ("clock".equals(t.get(0)) && (Long) t.get(1) == 40L) { - DataBag columns = (DataBag) t.get(1); - Iterator<Tuple> iter = columns.iterator(); - if (iter.hasNext()) - { - Tuple t1 = iter.next(); - Assert.assertEquals(t1.get(0), 6L); - Assert.assertEquals(t1.get(1), "coffee..."); - } + Assert.assertEquals(t.get(2), 6L); + Assert.assertEquals(t.get(3), "coffee..."); } - else if ("clock".equals(key.get(0)) && (Long) key.get(1) == 20L) + else if ("clock".equals(t.get(0)) && (Long) t.get(1) == 20L) { - DataBag columns = (DataBag) t.get(1); - Iterator<Tuple> iter = columns.iterator(); - if (iter.hasNext()) - { - Tuple t1 = iter.next(); - Assert.assertEquals(t1.get(0), 1L); - Assert.assertEquals(t1.get(1), "zzzz"); - } + Assert.assertEquals(t.get(2), 1L); + Assert.assertEquals(t.get(3), "zzzz"); } - else if ("clock".equals(key.get(0)) && (Long) key.get(1) == 30L) + else if ("clock".equals(t.get(0)) && (Long) t.get(1) == 30L) { - DataBag columns = (DataBag) t.get(1); - Iterator<Tuple> iter = columns.iterator(); - if (iter.hasNext()) - { - Tuple t1 = iter.next(); - Assert.assertEquals(t1.get(0), 2L); - Assert.assertEquals(t1.get(1), "daddy?"); - } + Assert.assertEquals(t.get(2), 2L); + Assert.assertEquals(t.get(3), "daddy?"); } } Assert.assertEquals(4, count); } - private String getColumnValue(String ks, String cf, String colName, String key, String validator) throws TException, IOException + private String getColumnValue(String ks, String cf, String colName, String key, String validator) throws IOException { - Cassandra.Client client = getClient(); - client.set_keyspace(ks); + Session client = getClient(); + client.execute("USE " + ks); + + String query = String.format("SELECT %s FROM %s WHERE key = '%s'", colName, cf, key); + + ResultSet rows = client.execute(query); + Row row = rows.one(); - ByteBuffer key_user_id = ByteBufferUtil.bytes(key); - ColumnPath cp = new ColumnPath(cf); - cp.column = ByteBufferUtil.bytes(colName); + if (row == null || row.isNull(0)) + return null; - // read - ColumnOrSuperColumn got = client.get(key_user_id, cp, ConsistencyLevel.ONE); - return parseType(validator).getString(got.getColumn().value); + return parseType(validator).getString(row.getBytesUnsafe(0)); } }