Pig support for hadoop CqlInputFormat

Patch by Alex Liu, reviewed by brandonwilliams for CASSANDRA-6454

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1e2a5b0e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1e2a5b0e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1e2a5b0e

Branch: refs/heads/trunk
Commit: 1e2a5b0ee334327f1ed62e181542328a0824c27c
Parents: ef894c2
Author: Brandon Williams <brandonwilli...@apache.org>
Authored: Wed Jul 23 10:26:50 2014 -0500
Committer: Brandon Williams <brandonwilli...@apache.org>
Committed: Wed Jul 23 10:26:50 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt | 1 +
 build.xml | 1 +
 .../cassandra/hadoop/cql3/CqlConfigHelper.java | 1 -
 .../cassandra/hadoop/cql3/CqlInputFormat.java | 2 -
 .../cassandra/hadoop/pig/CqlNativeStorage.java | 289 +++++++++++++++++++
 .../apache/cassandra/hadoop/pig/CqlStorage.java | 12 +-
 test/conf/cassandra.yaml | 2 +
 .../cassandra/pig/CqlTableDataTypeTest.java | 94 +++++-
 .../org/apache/cassandra/pig/CqlTableTest.java | 101 ++++++-
 .../org/apache/cassandra/pig/PigTestBase.java | 3 +
 .../cassandra/pig/ThriftColumnFamilyTest.java | 73 +++--
 11 files changed, 529 insertions(+), 50 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2a5b0e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9909760..6f67720 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.10
+ * Pig support for hadoop CqlInputFormat (CASSANDRA-6454)
  * Fix ReversedType(DateType) mapping to native protocol (CASSANDRA-7576)
  * (Windows) force range-based repair to non-sequential mode (CASSANDRA-7541)
  * Fix range merging when DES scores are zero (CASSANDRA-7535)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2a5b0e/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index ddf74e4..68907df 100644
--- a/build.xml
+++ b/build.xml
@@ -1275,6 +1275,7 @@
   <classpathentry kind="src" path="interface/thrift/gen-java"/>
   <classpathentry kind="src" path="test/unit"/>
   <classpathentry kind="src" path="test/long"/>
+  <classpathentry kind="src" path="test/pig"/>
   <classpathentry kind="src" path="tools/stress/src"/>
   <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
   <classpathentry kind="output" path="build/classes/main"/>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2a5b0e/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
index 63279d1..b375ce2 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.hadoop.cql3;
  *
  */
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.security.KeyManagementException;
 import java.security.KeyStore;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2a5b0e/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
index e1cdf32..09bd80c 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
@@ -18,8 +18,6 @@
 package org.apache.cassandra.hadoop.cql3;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Map;
 
 import org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat;
 import org.apache.hadoop.mapred.InputSplit;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2a5b0e/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java new file mode 100644 index 0000000..948d21c --- /dev/null +++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java @@ -0,0 +1,289 @@ +package org.apache.cassandra.hadoop.pig; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.Map; + +import org.apache.cassandra.db.Column; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.hadoop.ConfigHelper; +import org.apache.cassandra.hadoop.cql3.CqlConfigHelper; +import org.apache.cassandra.thrift.CfDef; +import org.apache.cassandra.thrift.ColumnDef; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; + +import com.datastax.driver.core.Row; + +public class CqlNativeStorage extends CqlStorage +{ + private RecordReader<Long, Row> reader; + private String nativePort; + private String nativeCoreConnections; + private String nativeMaxConnections; + private String nativeMinSimultReqs; + private String nativeMaxSimultReqs; + private String nativeConnectionTimeout; + private String nativeReadConnectionTimeout; + private String nativeReceiveBufferSize; + private String nativeSendBufferSize; + private String nativeSolinger; + private String nativeTcpNodelay; + private String nativeReuseAddress; + private String nativeKeepAlive; + private String nativeAuthProvider; + private String nativeSSLTruststorePath; + private String nativeSSLKeystorePath; + private String 
nativeSSLTruststorePassword; + private String nativeSSLKeystorePassword; + private String nativeSSLCipherSuites; + private String inputCql; + + public CqlNativeStorage() + { + this(1000); + } + + /** @param pageSize limit number of CQL rows to fetch in a thrift request */ + public CqlNativeStorage(int pageSize) + { + super(pageSize); + DEFAULT_INPUT_FORMAT = "org.apache.cassandra.hadoop.cql3.CqlInputFormat"; + } + + public void prepareToRead(RecordReader reader, PigSplit split) + { + this.reader = reader; + } + + /** get next row */ + public Tuple getNext() throws IOException + { + try + { + // load the next pair + if (!reader.nextKeyValue()) + return null; + + CfInfo cfInfo = getCfInfo(loadSignature); + CfDef cfDef = cfInfo.cfDef; + Row row = reader.getCurrentValue(); + Tuple tuple = TupleFactory.getInstance().newTuple(cfDef.column_metadata.size()); + Iterator<ColumnDef> itera = cfDef.column_metadata.iterator(); + int i = 0; + while (itera.hasNext()) + { + ColumnDef cdef = itera.next(); + ByteBuffer columnValue = row.getBytesUnsafe(ByteBufferUtil.string(cdef.name.duplicate())); + if (columnValue != null) + { + Column column = new Column(cdef.name, columnValue); + AbstractType<?> validator = getValidatorMap(cfDef).get(column.name()); + setTupleValue(tuple, i, cqlColumnToObj(column, cfDef), validator); + } + else + tuple.set(i, null); + i++; + } + return tuple; + } + catch (InterruptedException e) + { + throw new IOException(e.getMessage()); + } + } + + /** set read configuration settings */ + public void setLocation(String location, Job job) throws IOException + { + conf = job.getConfiguration(); + setLocationFromUri(location); + + if (username != null && password != null) + { + ConfigHelper.setInputKeyspaceUserNameAndPassword(conf, username, password); + CqlConfigHelper.setUserNameAndPassword(conf, username, password); + } + if (splitSize > 0) + ConfigHelper.setInputSplitSize(conf, splitSize); + if (partitionerClass!= null) + ConfigHelper.setInputPartitioner(conf, 
partitionerClass); + if (initHostAddress != null) + ConfigHelper.setInputInitialAddress(conf, initHostAddress); + if (rpcPort != null) + ConfigHelper.setInputRpcPort(conf, rpcPort); + if (nativePort != null) + CqlConfigHelper.setInputNativePort(conf, nativePort); + if (nativeCoreConnections != null) + CqlConfigHelper.setInputCoreConnections(conf, nativeCoreConnections); + if (nativeMaxConnections != null) + CqlConfigHelper.setInputMaxConnections(conf, nativeMaxConnections); + if (nativeMinSimultReqs != null) + CqlConfigHelper.setInputMinSimultReqPerConnections(conf, nativeMinSimultReqs); + if (nativeMinSimultReqs != null) + CqlConfigHelper.setInputMaxSimultReqPerConnections(conf, nativeMaxSimultReqs); + if (nativeConnectionTimeout != null) + CqlConfigHelper.setInputNativeConnectionTimeout(conf, nativeConnectionTimeout); + if (nativeReadConnectionTimeout != null) + CqlConfigHelper.setInputNativeReadConnectionTimeout(conf, nativeReadConnectionTimeout); + if (nativeReceiveBufferSize != null) + CqlConfigHelper.setInputNativeReceiveBufferSize(conf, nativeReceiveBufferSize); + if (nativeSendBufferSize != null) + CqlConfigHelper.setInputNativeSendBufferSize(conf, nativeSendBufferSize); + if (nativeSolinger != null) + CqlConfigHelper.setInputNativeSolinger(conf, nativeSolinger); + if (nativeTcpNodelay != null) + CqlConfigHelper.setInputNativeTcpNodelay(conf, nativeTcpNodelay); + if (nativeReuseAddress != null) + CqlConfigHelper.setInputNativeReuseAddress(conf, nativeReuseAddress); + if (nativeKeepAlive != null) + CqlConfigHelper.setInputNativeKeepAlive(conf, nativeKeepAlive); + if (nativeAuthProvider != null) + CqlConfigHelper.setInputNativeAuthProvider(conf, nativeAuthProvider); + if (nativeSSLTruststorePath != null) + CqlConfigHelper.setInputNativeSSLTruststorePath(conf, nativeSSLTruststorePath); + if (nativeSSLKeystorePath != null) + CqlConfigHelper.setInputNativeSSLKeystorePath(conf, nativeSSLKeystorePath); + if (nativeSSLTruststorePassword != null) + 
CqlConfigHelper.setInputNativeSSLTruststorePassword(conf, nativeSSLTruststorePassword); + if (nativeSSLKeystorePassword != null) + CqlConfigHelper.setInputNativeSSLKeystorePassword(conf, nativeSSLKeystorePassword); + if (nativeSSLCipherSuites != null) + CqlConfigHelper.setInputNativeSSLCipherSuites(conf, nativeSSLCipherSuites); + + ConfigHelper.setInputColumnFamily(conf, keyspace, column_family); + setConnectionInformation(); + + CqlConfigHelper.setInputCQLPageRowSize(conf, String.valueOf(pageSize)); + CqlConfigHelper.setInputCql(conf, inputCql); + if (System.getenv(PIG_INPUT_SPLIT_SIZE) != null) + { + try + { + ConfigHelper.setInputSplitSize(conf, Integer.parseInt(System.getenv(PIG_INPUT_SPLIT_SIZE))); + } + catch (NumberFormatException e) + { + throw new IOException("PIG_INPUT_SPLIT_SIZE is not a number", e); + } + } + + if (ConfigHelper.getInputInitialAddress(conf) == null) + throw new IOException("PIG_INPUT_INITIAL_ADDRESS or PIG_INITIAL_ADDRESS environment variable not set"); + if (ConfigHelper.getInputPartitioner(conf) == null) + throw new IOException("PIG_INPUT_PARTITIONER or PIG_PARTITIONER environment variable not set"); + if (loadSignature == null) + loadSignature = location; + + initSchema(loadSignature); + } + + private void setLocationFromUri(String location) throws IOException + { + try + { + if (!location.startsWith("cql://")) + throw new Exception("Bad scheme: " + location); + + String[] urlParts = location.split("\\?"); + if (urlParts.length > 1) + { + Map<String, String> urlQuery = getQueryMap(urlParts[1]); + + // each page row size + if (urlQuery.containsKey("page_size")) + pageSize = Integer.parseInt(urlQuery.get("page_size")); + + // output prepared statement + if (urlQuery.containsKey("output_query")) + outputQuery = urlQuery.get("output_query"); + + //split size + if (urlQuery.containsKey("split_size")) + splitSize = Integer.parseInt(urlQuery.get("split_size")); + if (urlQuery.containsKey("partitioner")) + partitionerClass = 
urlQuery.get("partitioner"); + if (urlQuery.containsKey("use_secondary")) + usePartitionFilter = Boolean.parseBoolean(urlQuery.get("use_secondary")); + if (urlQuery.containsKey("init_address")) + initHostAddress = urlQuery.get("init_address"); + + if (urlQuery.containsKey("native_port")) + nativePort = urlQuery.get("native_port"); + if (urlQuery.containsKey("core_conns")) + nativeCoreConnections = urlQuery.get("core_conns"); + if (urlQuery.containsKey("max_conns")) + nativeMaxConnections = urlQuery.get("max_conns"); + if (urlQuery.containsKey("min_simult_reqs")) + nativeMinSimultReqs = urlQuery.get("min_simult_reqs"); + if (urlQuery.containsKey("max_simult_reqs")) + nativeMaxSimultReqs = urlQuery.get("max_simult_reqs"); + if (urlQuery.containsKey("native_timeout")) + nativeConnectionTimeout = urlQuery.get("native_timeout"); + if (urlQuery.containsKey("native_read_timeout")) + nativeReadConnectionTimeout = urlQuery.get("native_read_timeout"); + if (urlQuery.containsKey("rec_buff_size")) + nativeReceiveBufferSize = urlQuery.get("rec_buff_size"); + if (urlQuery.containsKey("send_buff_size")) + nativeSendBufferSize = urlQuery.get("send_buff_size"); + if (urlQuery.containsKey("solinger")) + nativeSolinger = urlQuery.get("solinger"); + if (urlQuery.containsKey("tcp_nodelay")) + nativeTcpNodelay = urlQuery.get("tcp_nodelay"); + if (urlQuery.containsKey("reuse_address")) + nativeReuseAddress = urlQuery.get("reuse_address"); + if (urlQuery.containsKey("keep_alive")) + nativeKeepAlive = urlQuery.get("keep_alive"); + if (urlQuery.containsKey("auth_provider")) + nativeAuthProvider = urlQuery.get("auth_provider"); + if (urlQuery.containsKey("trust_store_path")) + nativeSSLTruststorePath = urlQuery.get("trust_store_path"); + if (urlQuery.containsKey("key_store_path")) + nativeSSLKeystorePath = urlQuery.get("key_store_path"); + if (urlQuery.containsKey("trust_store_password")) + nativeSSLTruststorePassword = urlQuery.get("trust_store_password"); + if 
(urlQuery.containsKey("key_store_password")) + nativeSSLKeystorePassword = urlQuery.get("key_store_password"); + if (urlQuery.containsKey("cipher_suites")) + nativeSSLCipherSuites = urlQuery.get("cipher_suites"); + if (urlQuery.containsKey("input_cql")) + inputCql = urlQuery.get("input_cql"); + if (urlQuery.containsKey("rpc_port")) + rpcPort = urlQuery.get("rpc_port"); + } + String[] parts = urlParts[0].split("/+"); + String[] credentialsAndKeyspace = parts[1].split("@"); + if (credentialsAndKeyspace.length > 1) + { + String[] credentials = credentialsAndKeyspace[0].split(":"); + username = credentials[0]; + password = credentials[1]; + keyspace = credentialsAndKeyspace[1]; + } + else + { + keyspace = parts[1]; + } + column_family = parts[2]; + } + catch (Exception e) + { + throw new IOException("Expected 'cql://[username:password@]<keyspace>/<columnfamily>" + + "[?[page_size=<size>][&columns=<col1,col2>][&output_query=<prepared_statement>]" + + "[&where_clause=<clause>][&split_size=<size>][&partitioner=<partitioner>][&use_secondary=true|false]" + + "[&init_address=<host>][&native_port=<native_port>][&core_conns=<core_conns>]" + + "[&max_conns=<max_conns>][&min_simult_reqs=<min_simult_reqs>][&max_simult_reqs=<max_simult_reqs>]" + + "[&native_timeout=<native_timeout>][&native_read_timeout=<native_read_timeout>][&rec_buff_size=<rec_buff_size>]" + + "[&send_buff_size=<send_buff_size>][&solinger=<solinger>][&tcp_nodelay=<tcp_nodelay>][&reuse_address=<reuse_address>]" + + "[&keep_alive=<keep_alive>][&auth_provider=<auth_provider>][&trust_store_path=<trust_store_path>]" + + "[&key_store_path=<key_store_path>][&trust_store_password=<trust_store_password>]" + + "[&key_store_password=<key_store_password>][&cipher_suites=<cipher_suites>][&input_cql=<input_cql>]]': " + e.getMessage()); + } + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2a5b0e/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java index 284b72a..02a6d98 100644 --- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java @@ -22,7 +22,6 @@ import java.nio.ByteBuffer; import java.nio.charset.CharacterCodingException; import java.util.*; - import org.apache.cassandra.hadoop.HadoopCompat; import org.apache.cassandra.cql3.CFDefinition; import org.apache.cassandra.db.Column; @@ -33,7 +32,6 @@ import org.apache.cassandra.hadoop.cql3.CqlConfigHelper; import org.apache.cassandra.thrift.*; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; - import org.apache.hadoop.mapreduce.*; import org.apache.pig.Expression; import org.apache.pig.Expression.OpType; @@ -58,11 +56,11 @@ public class CqlStorage extends AbstractCassandraStorage private static final Logger logger = LoggerFactory.getLogger(CqlStorage.class); private RecordReader<Map<String, ByteBuffer>, Map<String, ByteBuffer>> reader; - private RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>> writer; + protected RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>> writer; - private int pageSize = 1000; + protected int pageSize = 1000; private String columns; - private String outputQuery; + protected String outputQuery; private String whereClause; private boolean hasCompactValueAlias = false; @@ -130,7 +128,7 @@ public class CqlStorage extends AbstractCassandraStorage } /** set the value to the position of the tuple */ - private void setTupleValue(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException + protected void setTupleValue(Tuple tuple, int position, Object value, AbstractType<?> validator) throws ExecException { if (validator instanceof CollectionType) setCollectionTupleValues(tuple, position, 
value, validator); @@ -184,7 +182,7 @@ public class CqlStorage extends AbstractCassandraStorage } /** convert a cql column to an object */ - private Object cqlColumnToObj(Column col, CfDef cfDef) throws IOException + protected Object cqlColumnToObj(Column col, CfDef cfDef) throws IOException { // standard Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef); http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2a5b0e/test/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml index a207bc6..d92eba6 100644 --- a/test/conf/cassandra.yaml +++ b/test/conf/cassandra.yaml @@ -32,3 +32,5 @@ server_encryption_options: truststore_password: cassandra incremental_backups: true compaction_throughput_mb_per_sec: 0 +start_native_transport: true +native_transport_port: 9052 http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2a5b0e/test/pig/org/apache/cassandra/pig/CqlTableDataTypeTest.java ---------------------------------------------------------------------- diff --git a/test/pig/org/apache/cassandra/pig/CqlTableDataTypeTest.java b/test/pig/org/apache/cassandra/pig/CqlTableDataTypeTest.java index 2020b0a..1819c61 100644 --- a/test/pig/org/apache/cassandra/pig/CqlTableDataTypeTest.java +++ b/test/pig/org/apache/cassandra/pig/CqlTableDataTypeTest.java @@ -220,7 +220,24 @@ public class CqlTableDataTypeTest extends PigTestBase public void testCqlStorageRegularType() throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException { - pig.registerQuery("rows = LOAD 'cql://cql3ks/cqltable?" + defaultParameters + "' USING CqlStorage();"); + cqlTableTest("rows = LOAD 'cql://cql3ks/cqltable?" + defaultParameters + "' USING CqlStorage();"); + counterTableTest("cc_rows = LOAD 'cql://cql3ks/countertable?" 
+ defaultParameters + "' USING CqlStorage();"); + } + + @Test + public void testCqlNativeStorageRegularType() + throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException + { + //input_cql=select * from cqltable where token(key) > ? and token(key) <= ? + cqlTableTest("rows = LOAD 'cql://cql3ks/cqltable?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20cqltable%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' USING CqlStorage();"); + + //input_cql=select * from countertable where token(key) > ? and token(key) <= ? + counterTableTest("cc_rows = LOAD 'cql://cql3ks/countertable?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20countertable%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' USING CqlStorage();"); + } + + private void cqlTableTest(String initialQuery) throws IOException + { + pig.registerQuery(initialQuery); Iterator<Tuple> it = pig.openIterator("rows"); //{key: int, //col_ascii: chararray, @@ -257,21 +274,45 @@ public class CqlTableDataTypeTest extends PigTestBase Assert.assertEquals(t.get(14), "varchar"); Assert.assertEquals(t.get(15), 123); } - - pig.registerQuery("cc_rows = LOAD 'cql://cql3ks/countertable?" 
+ defaultParameters + "' USING CqlStorage();"); - it = pig.openIterator("cc_rows"); + else + { + Assert.fail("Failed to get data for query " + initialQuery); + } + } + + private void counterTableTest(String initialQuery) throws IOException + { + pig.registerQuery(initialQuery); + Iterator<Tuple> it = pig.openIterator("cc_rows"); if (it.hasNext()) { Tuple t = it.next(); Assert.assertEquals(t.get(0), 1); Assert.assertEquals(t.get(1), 3L); } + else + { + Assert.fail("Failed to get data for query " + initialQuery); + } } @Test public void testCqlStorageSetType() throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException { - pig.registerQuery("set_rows = LOAD 'cql://cql3ks/settable?" + defaultParameters + "' USING CqlStorage();"); + settableTest("set_rows = LOAD 'cql://cql3ks/settable?" + defaultParameters + "' USING CqlStorage();"); + } + + @Test + public void testCqlNativeStorageSetType() + throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException + { + //input_cql=select * from settable where token(key) > ? and token(key) <= ? + settableTest("set_rows = LOAD 'cql://cql3ks/settable?" 
+ defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20settable%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' USING CqlStorage();"); + } + + private void settableTest(String initialQuery) throws IOException + { + pig.registerQuery(initialQuery); Iterator<Tuple> it = pig.openIterator("set_rows"); if (it.hasNext()) { Tuple t = it.next(); @@ -322,13 +363,30 @@ public class CqlTableDataTypeTest extends PigTestBase Assert.assertEquals(innerTuple.get(0), 123); Assert.assertEquals(innerTuple.get(1), 124); } + else + { + Assert.fail("Failed to get data for query " + initialQuery); + } } @Test public void testCqlStorageListType() throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException { - pig.registerQuery("list_rows = LOAD 'cql://cql3ks/listtable?" + defaultParameters + "' USING CqlStorage();"); + listtableTest("list_rows = LOAD 'cql://cql3ks/listtable?" + defaultParameters + "' USING CqlStorage();"); + } + + @Test + public void testCqlNativeStorageListType() + throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException + { + //input_cql=select * from listtable where token(key) > ? and token(key) <= ? + listtableTest("list_rows = LOAD 'cql://cql3ks/listtable?" 
+ defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20listtable%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' USING CqlStorage();"); + } + + private void listtableTest(String initialQuery) throws IOException + { + pig.registerQuery(initialQuery); Iterator<Tuple> it = pig.openIterator("list_rows"); if (it.hasNext()) { Tuple t = it.next(); @@ -379,13 +437,30 @@ public class CqlTableDataTypeTest extends PigTestBase Assert.assertEquals(innerTuple.get(1), 123); Assert.assertEquals(innerTuple.get(0), 124); } + else + { + Assert.fail("Failed to get data for query " + initialQuery); + } } @Test public void testCqlStorageMapType() throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException { - pig.registerQuery("map_rows = LOAD 'cql://cql3ks/maptable?" + defaultParameters + "' USING CqlStorage();"); + maptableTest("map_rows = LOAD 'cql://cql3ks/maptable?" + defaultParameters + "' USING CqlStorage();"); + } + + @Test + public void testCqlNativeStorageMapType() + throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException + { + //input_cql=select * from maptable where token(key) > ? and token(key) <= ? + maptableTest("map_rows = LOAD 'cql://cql3ks/maptable?" 
+ defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20maptable%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' USING CqlStorage();"); + } + + private void maptableTest(String initialQuery) throws IOException + { + pig.registerQuery(initialQuery); Iterator<Tuple> it = pig.openIterator("map_rows"); if (it.hasNext()) { Tuple t = it.next(); @@ -436,5 +511,10 @@ public class CqlTableDataTypeTest extends PigTestBase Assert.assertEquals(innerTuple.get(0), 123); Assert.assertEquals(innerTuple.get(1), 124); } + else + { + Assert.fail("Failed to get data for query " + initialQuery); + } } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2a5b0e/test/pig/org/apache/cassandra/pig/CqlTableTest.java ---------------------------------------------------------------------- diff --git a/test/pig/org/apache/cassandra/pig/CqlTableTest.java b/test/pig/org/apache/cassandra/pig/CqlTableTest.java index 15d49f2..f5adef8 100644 --- a/test/pig/org/apache/cassandra/pig/CqlTableTest.java +++ b/test/pig/org/apache/cassandra/pig/CqlTableTest.java @@ -93,7 +93,41 @@ public class CqlTableTest extends PigTestBase public void testCqlStorageSchema() throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException { - pig.registerQuery("rows = LOAD 'cql://cql3ks/cqltable?" + defaultParameters + "' USING CqlStorage();"); + cqlTableSchemaTest("rows = LOAD 'cql://cql3ks/cqltable?" + defaultParameters + "' USING CqlStorage();"); + compactCqlTableSchemaTest("rows = LOAD 'cql://cql3ks/compactcqltable?" 
+ defaultParameters + "' USING CqlStorage();"); + } + + @Test + public void testCqlNativeStorageSchema() + throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException + { + //input_cql=select * from cqltable where token(key1) > ? and token(key1) <= ? + cqlTableSchemaTest("rows = LOAD 'cql://cql3ks/cqltable?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20cqltable%20where%20token(key1)%20%3E%20%3F%20and%20token(key1)%20%3C%3D%20%3F' USING CqlNativeStorage();"); + + //input_cql=select * from compactcqltable where token(key1) > ? and token(key1) <= ? + compactCqlTableSchemaTest("rows = LOAD 'cql://cql3ks/compactcqltable?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20compactcqltable%20where%20token(key1)%20%3E%20%3F%20and%20token(key1)%20%3C%3D%20%3F' USING CqlNativeStorage();"); + } + + private void compactCqlTableSchemaTest(String initialQuery) throws IOException + { + pig.registerQuery(initialQuery); + Iterator<Tuple> it = pig.openIterator("rows"); + if (it.hasNext()) { + Tuple t = it.next(); + Assert.assertEquals(t.get(0).toString(), "key1"); + Assert.assertEquals(t.get(1), 100); + Assert.assertEquals(t.get(2), 10.1f); + Assert.assertEquals(3, t.size()); + } + else + { + Assert.fail("Failed to get data for query " + initialQuery); + } + } + + private void cqlTableSchemaTest(String initialQuery) throws IOException + { + pig.registerQuery(initialQuery); Iterator<Tuple> it = pig.openIterator("rows"); if (it.hasNext()) { Tuple t = it.next(); @@ -103,15 +137,9 @@ public class CqlTableTest extends PigTestBase Assert.assertEquals(t.get(3), 10.1f); Assert.assertEquals(4, t.size()); } - - pig.registerQuery("rows = LOAD 'cql://cql3ks/compactcqltable?" 
+ defaultParameters + "' USING CqlStorage();"); - it = pig.openIterator("rows"); - if (it.hasNext()) { - Tuple t = it.next(); - Assert.assertEquals(t.get(0).toString(), "key1"); - Assert.assertEquals(t.get(1), 100); - Assert.assertEquals(t.get(2), 10.1f); - Assert.assertEquals(3, t.size()); + else + { + Assert.fail("Failed to get data for query " + initialQuery); } } @@ -119,8 +147,23 @@ public class CqlTableTest extends PigTestBase public void testCqlStorageSingleKeyTable() throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException { + SingleKeyTableTest("moretestvalues= LOAD 'cql://cql3ks/moredata?" + defaultParameters + "' USING CqlStorage();"); + + } + + @Test + public void testCqlNativeStorageSingleKeyTable() + throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException + { + //input_cql=select * from moredata where token(x) > ? and token(x) <= ? + SingleKeyTableTest("moretestvalues= LOAD 'cql://cql3ks/moredata?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20moredata%20where%20token(x)%20%3E%20%3F%20and%20token(x)%20%3C%3D%20%3F' USING CqlNativeStorage();"); + } + + private void SingleKeyTableTest(String initialQuery) + throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException + { pig.setBatchOn(); - pig.registerQuery("moretestvalues= LOAD 'cql://cql3ks/moredata?" + defaultParameters + "' USING CqlStorage();"); + pig.registerQuery(initialQuery); pig.registerQuery("insertformat= FOREACH moretestvalues GENERATE TOTUPLE(TOTUPLE('a',x)),TOTUPLE(y);"); pig.registerQuery("STORE insertformat INTO 'cql://cql3ks/test?" 
+ defaultParameters + "&output_query=UPDATE+cql3ks.test+set+b+%3D+%3F' USING CqlStorage();"); pig.executeBatch(); @@ -132,7 +175,7 @@ public class CqlTableTest extends PigTestBase //(1,1) pig.registerQuery("result= LOAD 'cql://cql3ks/test?" + defaultParameters + "' USING CqlStorage();"); Iterator<Tuple> it = pig.openIterator("result"); - if (it.hasNext()) { + while (it.hasNext()) { Tuple t = it.next(); Assert.assertEquals(t.get(0), t.get(1)); } @@ -142,8 +185,22 @@ public class CqlTableTest extends PigTestBase public void testCqlStorageCompositeKeyTable() throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException { + CompositeKeyTableTest("moredata= LOAD 'cql://cql3ks/compmore?" + defaultParameters + "' USING CqlStorage();"); + } + + @Test + public void testCqlNativeStorageCompositeKeyTable() + throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException + { + //input_cql=select * from compmore where token(id) > ? and token(id) <= ? + CompositeKeyTableTest("moredata= LOAD 'cql://cql3ks/compmore?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20compmore%20where%20token(id)%20%3E%20%3F%20and%20token(id)%20%3C%3D%20%3F' USING CqlNativeStorage();"); + } + + private void CompositeKeyTableTest(String initialQuery) + throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException + { pig.setBatchOn(); - pig.registerQuery("moredata= LOAD 'cql://cql3ks/compmore?" 
+ defaultParameters + "' USING CqlStorage();"); + pig.registerQuery(initialQuery); pig.registerQuery("insertformat = FOREACH moredata GENERATE TOTUPLE (TOTUPLE('a',x),TOTUPLE('b',y), TOTUPLE('c',z)),TOTUPLE(data);"); pig.registerQuery("STORE insertformat INTO 'cql://cql3ks/compotable?" + defaultParameters + "&output_query=UPDATE%20cql3ks.compotable%20SET%20d%20%3D%20%3F' USING CqlStorage();"); pig.executeBatch(); @@ -171,8 +228,22 @@ public class CqlTableTest extends PigTestBase public void testCqlStorageCollectionColumnTable() throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException { + CollectionColumnTableTest("collectiontable= LOAD 'cql://cql3ks/collectiontable?" + defaultParameters + "' USING CqlStorage();"); + } + + @Test + public void testCqlNativeStorageCollectionColumnTable() + throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException + { + //input_cql=select * from collectiontable where token(m) > ? and token(m) <= ? + CollectionColumnTableTest("collectiontable= LOAD 'cql://cql3ks/collectiontable?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20collectiontable%20where%20token(m)%20%3E%20%3F%20and%20token(m)%20%3C%3D%20%3F' USING CqlNativeStorage();"); + } + + private void CollectionColumnTableTest(String initialQuery) + throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException + { pig.setBatchOn(); - pig.registerQuery("collectiontable= LOAD 'cql://cql3ks/collectiontable?" 
+ defaultParameters + "' USING CqlStorage();"); + pig.registerQuery(initialQuery); pig.registerQuery("recs= FOREACH collectiontable GENERATE TOTUPLE(TOTUPLE('m', m) ), TOTUPLE(TOTUPLE('map', TOTUPLE('m', 'mm'), TOTUPLE('n', 'nn')));"); pig.registerQuery("STORE recs INTO 'cql://cql3ks/collectiontable?" + defaultParameters + "&output_query=update+cql3ks.collectiontable+set+n+%3D+%3F' USING CqlStorage();"); pig.executeBatch(); @@ -183,7 +254,7 @@ public class CqlTableTest extends PigTestBase //(book1,((m,mm),(n,nn))) pig.registerQuery("result= LOAD 'cql://cql3ks/collectiontable?" + defaultParameters + "' USING CqlStorage();"); Iterator<Tuple> it = pig.openIterator("result"); - while (it.hasNext()) { + if (it.hasNext()) { Tuple t = it.next(); Tuple t1 = (Tuple) t.get(1); Assert.assertEquals(t1.size(), 2); http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2a5b0e/test/pig/org/apache/cassandra/pig/PigTestBase.java ---------------------------------------------------------------------- diff --git a/test/pig/org/apache/cassandra/pig/PigTestBase.java b/test/pig/org/apache/cassandra/pig/PigTestBase.java index ea06b8c..ed307f4 100644 --- a/test/pig/org/apache/cassandra/pig/PigTestBase.java +++ b/test/pig/org/apache/cassandra/pig/PigTestBase.java @@ -66,6 +66,9 @@ public class PigTestBase extends SchemaLoader protected static MiniCluster cluster; protected static PigServer pig; protected static String defaultParameters= "init_address=localhost&rpc_port=9170&partitioner=org.apache.cassandra.dht.ByteOrderedPartitioner"; + protected static String nativeParameters = "&core_conns=2&max_conns=10&min_simult_reqs=3&max_simult_reqs=10&native_timeout=10000000" + + "&native_read_timeout=10000000&send_buff_size=4096&receive_buff_size=4096&solinger=3" + + "&tcp_nodelay=true&reuse_address=true&keep_alive=true&native_port=9052"; @AfterClass public static void oneTimeTearDown() throws Exception { 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2a5b0e/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java ---------------------------------------------------------------------- diff --git a/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java b/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java index e114d37..3f1d5a1 100644 --- a/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java +++ b/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java @@ -183,6 +183,13 @@ public class ThriftColumnFamilyTest extends PigTestBase "and comparator = LongType;" }; + private static String[] deleteCopyOfSomeAppTableData = { "use thriftKs;", + "DEL CopyOfSomeApp ['foo']", + "DEL CopyOfSomeApp ['bar']", + "DEL CopyOfSomeApp ['baz']", + "DEL CopyOfSomeApp ['qux']" + }; + @BeforeClass public static void setup() throws TTransportException, IOException, InterruptedException, ConfigurationException, AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, CharacterCodingException, ClassNotFoundException, NoSuchFieldException, IllegalAccessException, InstantiationException @@ -196,7 +203,35 @@ public class ThriftColumnFamilyTest extends PigTestBase public void testCqlStorage() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException, AuthenticationException, AuthorizationException { //regular thrift column families - pig.registerQuery("data = load 'cql://thriftKs/SomeApp?" + defaultParameters + "' using CqlStorage();"); + cqlStorageTest("data = load 'cql://thriftKs/SomeApp?" + defaultParameters + "' using CqlStorage();"); + + //Test counter colun family + // This test fails for CASSANDRA-7059 + //cqlStorageCounterTableTest("cc_data = load 'cql://thriftKs/CC?" 
+ defaultParameters + "' using CqlStorage();"); + + //Test composite column family + cqlStorageCompositeTableTest("compo_data = load 'cql://thriftKs/Compo?" + defaultParameters + "' using CqlStorage();"); + } + + @Test + public void testCqlNativeStorage() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException, AuthenticationException, AuthorizationException + { + //regular thrift column families + //input_cql=select * from "SomeApp" where token(key) > ? and token(key) <= ? + cqlStorageTest("data = load 'cql://thriftKs/SomeApp?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20%22SomeApp%22%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' using CqlNativeStorage();"); + + //Test counter colun family + //input_cql=select * from "CC" where token(key) > ? and token(key) <= ? + cqlStorageCounterTableTest("cc_data = load 'cql://thriftKs/CC?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20%22CC%22%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' using CqlNativeStorage();"); + + //Test composite column family + //input_cql=select * from "Compo" where token(key) > ? and token(key) <= ? + cqlStorageCompositeTableTest("compo_data = load 'cql://thriftKs/Compo?" 
+ defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20%22Compo%22%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' using CqlNativeStorage();"); + } + + private void cqlStorageTest(String initialQuery) throws IOException + { + pig.registerQuery(initialQuery); //(bar,3.141592653589793,1335890877,User Bar,35.0,9,15000,like) //(baz,1.61803399,1335890877,User Baz,95.3,3,512000,dislike) @@ -249,16 +284,18 @@ public class ThriftColumnFamilyTest extends PigTestBase } } Assert.assertEquals(count, 4); + } - //Test counter colun family - pig.registerQuery("cc_data = load 'cql://thriftKs/CC?" + defaultParameters + "' using CqlStorage();"); + private void cqlStorageCounterTableTest(String initialQuery) throws IOException + { + pig.registerQuery(initialQuery); //(chuck,fist,1) //(chuck,kick,3) // {key: chararray,column1: chararray,value: long} - it = pig.openIterator("cc_data"); - count = 0; + Iterator<Tuple> it = pig.openIterator("cc_data"); + int count = 0; while (it.hasNext()) { count ++; Tuple t = it.next(); @@ -268,9 +305,11 @@ public class ThriftColumnFamilyTest extends PigTestBase Assert.assertEquals(t.get(2), 3L); } Assert.assertEquals(count, 2); + } - //Test composite column family - pig.registerQuery("compo_data = load 'cql://thriftKs/Compo?" 
+ defaultParameters + "' using CqlStorage();"); + private void cqlStorageCompositeTableTest(String initialQuery) throws IOException + { + pig.registerQuery(initialQuery); //(kick,bruce,bruce,watch it, mate) //(kick,bruce,lee,oww) @@ -278,8 +317,8 @@ public class ThriftColumnFamilyTest extends PigTestBase //(punch,bruce,lee,ouch) //{key: chararray,column1: chararray,column2: chararray,value: chararray} - it = pig.openIterator("compo_data"); - count = 0; + Iterator<Tuple> it = pig.openIterator("compo_data"); + int count = 0; while (it.hasNext()) { count ++; Tuple t = it.next(); @@ -351,7 +390,6 @@ public class ThriftColumnFamilyTest extends PigTestBase @Test public void testCassandraStorageFullCopy() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException, AuthenticationException, AuthorizationException { - createColumnFamily("thriftKs", "CopyOfSomeApp", statements[3]); pig.setBatchOn(); pig.registerQuery("rows = LOAD 'cassandra://thriftKs/SomeApp?" + defaultParameters + "' USING CassandraStorage();"); //full copy @@ -365,7 +403,7 @@ public class ThriftColumnFamilyTest extends PigTestBase @Test public void testCassandraStorageSigleTupleCopy() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException, AuthenticationException, AuthorizationException { - createColumnFamily("thriftKs", "CopyOfSomeApp", statements[3]); + executeCliStatements(deleteCopyOfSomeAppTableData); pig.setBatchOn(); pig.registerQuery("rows = LOAD 'cassandra://thriftKs/SomeApp?" 
+ defaultParameters + "' USING CassandraStorage();"); //sigle tuple @@ -399,7 +437,7 @@ public class ThriftColumnFamilyTest extends PigTestBase @Test public void testCassandraStorageBagOnlyCopy() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException, AuthenticationException, AuthorizationException { - createColumnFamily("thriftKs", "CopyOfSomeApp", statements[3]); + executeCliStatements(deleteCopyOfSomeAppTableData); pig.setBatchOn(); pig.registerQuery("rows = LOAD 'cassandra://thriftKs/SomeApp?" + defaultParameters + "' USING CassandraStorage();"); //bag only @@ -443,7 +481,7 @@ public class ThriftColumnFamilyTest extends PigTestBase @Test public void testCassandraStorageFilter() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException, AuthenticationException, AuthorizationException { - createColumnFamily("thriftKs", "CopyOfSomeApp", statements[3]); + executeCliStatements(deleteCopyOfSomeAppTableData); pig.setBatchOn(); pig.registerQuery("rows = LOAD 'cassandra://thriftKs/SomeApp?" + defaultParameters + "' USING CassandraStorage();"); @@ -476,7 +514,7 @@ public class ThriftColumnFamilyTest extends PigTestBase if (value != null) Assert.fail(); - createColumnFamily("thriftKs", "CopyOfSomeApp", statements[3]); + executeCliStatements(deleteCopyOfSomeAppTableData); pig.setBatchOn(); pig.registerQuery("rows = LOAD 'cassandra://thriftKs/SomeApp?" 
+ defaultParameters + "' USING CassandraStorage();"); pig.registerQuery("dislikes_extras = FILTER rows by vote_type.value eq 'dislike' AND COUNT(columns) > 0;"); @@ -746,17 +784,16 @@ public class ThriftColumnFamilyTest extends PigTestBase return parseType(validator).getString(got.getColumn().value); } - private void createColumnFamily(String ks, String cf, String statement) throws CharacterCodingException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException + private void executeCliStatements(String[] statements) throws CharacterCodingException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException { CliMain.connect("127.0.0.1", 9170); try { - CliMain.processStatement("use " + ks + ";"); - CliMain.processStatement("drop column family " + cf + ";"); + for (String stmt : statements) + CliMain.processStatement(stmt); } catch (Exception e) { } - CliMain.processStatement(statement); } }