Repository: cassandra Updated Branches: refs/heads/trunk e90489fdf -> 48bf2b131
Revert "Add CqlRecordReader to take advantage of native CQL pagination" This reverts commit 3b708f9989274cbe9e0e2a5fda6f1d0a3d96ebee. Conflicts: CHANGES.txt Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b0841d8f Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b0841d8f Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b0841d8f Branch: refs/heads/trunk Commit: b0841d8fee4c89647894f056b1106b9ac67be8b9 Parents: e6c8034 Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Tue Mar 25 16:49:10 2014 +0100 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Tue Mar 25 16:49:10 2014 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 - build.xml | 4 +- examples/hadoop_cql3_word_count/bin/word_count | 3 +- .../bin/word_count_counters | 4 +- .../hadoop_cql3_word_count/src/WordCount.java | 77 +-- .../src/WordCountCounters.java | 54 +- .../cassandra/hadoop/cql3/CqlConfigHelper.java | 541 +------------------ .../cassandra/hadoop/cql3/CqlInputFormat.java | 80 --- .../hadoop/cql3/CqlPagingRecordReader.java | 2 +- .../cassandra/hadoop/cql3/CqlRecordReader.java | 260 --------- 10 files changed, 30 insertions(+), 996 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0841d8f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index ed202cc..226855c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -27,7 +27,6 @@ * Improve PerRowSecondaryIndex performance (CASSANDRA-6876) * Extend triggers to support CAS updates (CASSANDRA-6882) * Static columns with IF NOT EXISTS don't always work as expected (CASSANDRA-6873) - * Add CqlRecordReader to take advantage of native CQL pagination (CASSANDRA-6311) * Fix paging with SELECT DISTINCT (CASSANDRA-6857) Merged from 1.2: * Add UNLOGGED, COUNTER options to BATCH documentation (CASSANDRA-6816) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0841d8f/build.xml ---------------------------------------------------------------------- diff --git a/build.xml b/build.xml index a15415b..464dece 100644 --- a/build.xml +++ b/build.xml @@ -380,7 +380,6 @@ <dependency groupId="edu.stanford.ppl" artifactId="snaptree" version="0.1" /> <dependency groupId="org.mindrot" artifactId="jbcrypt" version="0.3m" /> <dependency groupId="io.netty" artifactId="netty" version="3.6.6.Final" /> - <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" version="2.0.1" /> <dependency groupId="net.sf.supercsv" artifactId="super-csv" version="2.1.0" /> </dependencyManagement> <developer id="alakshman" name="Avinash Lakshman"/> @@ -411,7 +410,7 @@ <dependency groupId="org.apache.hadoop" artifactId="hadoop-core"/> <dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster"/> <dependency groupId="org.apache.pig" artifactId="pig"/> - <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core"/> + <dependency groupId="net.java.dev.jna" artifactId="jna"/> </artifact:pom> @@ -474,7 +473,6 @@ <dependency groupId="org.apache.hadoop" artifactId="hadoop-core" optional="true"/> <dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster" optional="true"/> <dependency groupId="org.apache.pig" artifactId="pig" optional="true"/> - <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" optional="true"/> <!-- don't need jna to run, but nice to have --> <dependency groupId="net.java.dev.jna" artifactId="jna" optional="true"/> http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0841d8f/examples/hadoop_cql3_word_count/bin/word_count ---------------------------------------------------------------------- diff --git a/examples/hadoop_cql3_word_count/bin/word_count b/examples/hadoop_cql3_word_count/bin/word_count index 974a39a..a0c5aa0 100644 --- a/examples/hadoop_cql3_word_count/bin/word_count +++ b/examples/hadoop_cql3_word_count/bin/word_count @@ -56,7 +56,6 @@ if [ "x$JAVA" = "x" ]; then fi OUTPUT_REDUCER=cassandra -INPUT_MAPPER=native #echo $CLASSPATH -$JAVA -Xmx1G -ea -cp $CLASSPATH WordCount output_reducer=$OUTPUT_REDUCER input_mapper=$INPUT_MAPPER +$JAVA -Xmx1G -ea -cp $CLASSPATH WordCount output_reducer=$OUTPUT_REDUCER http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0841d8f/examples/hadoop_cql3_word_count/bin/word_count_counters ---------------------------------------------------------------------- diff --git a/examples/hadoop_cql3_word_count/bin/word_count_counters b/examples/hadoop_cql3_word_count/bin/word_count_counters index 0b69b40..7793477 100644 --- a/examples/hadoop_cql3_word_count/bin/word_count_counters +++ b/examples/hadoop_cql3_word_count/bin/word_count_counters @@ -54,7 +54,5 @@ if [ "x$JAVA" = "x" ]; then exit 1 fi -INPUT_MAPPER=native - #echo $CLASSPATH -$JAVA -Xmx1G -ea -cp $CLASSPATH WordCountCounters input_mapper=$INPUT_MAPPER +$JAVA -Xmx1G -ea -cp $CLASSPATH WordCountCounters http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0841d8f/examples/hadoop_cql3_word_count/src/WordCount.java ---------------------------------------------------------------------- diff --git a/examples/hadoop_cql3_word_count/src/WordCount.java b/examples/hadoop_cql3_word_count/src/WordCount.java index 519a98f..bc81a53 100644 --- a/examples/hadoop_cql3_word_count/src/WordCount.java +++ b/examples/hadoop_cql3_word_count/src/WordCount.java @@ -27,7 +27,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat; -import org.apache.cassandra.hadoop.cql3.CqlInputFormat; import org.apache.cassandra.hadoop.ConfigHelper; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.hadoop.conf.Configuration; @@ -38,11 +37,10 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mapreduce.Mapper.Context; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; -import com.datastax.driver.core.Row; + import java.nio.charset.CharacterCodingException; /** @@ -62,7 +60,7 @@ import java.nio.charset.CharacterCodingException; public class WordCount extends Configured implements Tool { private static final Logger logger = LoggerFactory.getLogger(WordCount.class); - static final String INPUT_MAPPER_VAR = "input_mapper"; + static final String KEYSPACE = "cql3_worldcount"; static final String COLUMN_FAMILY = "inputs"; @@ -70,6 +68,7 @@ public class WordCount extends Configured implements Tool static final String OUTPUT_COLUMN_FAMILY = "output_words"; private static final String OUTPUT_PATH_PREFIX = "/tmp/word_count"; + private static final String PRIMARY_KEY = "row_key"; public static void main(String[] args) throws Exception @@ -109,30 +108,6 @@ public class WordCount extends Configured implements Tool } } - public static class NativeTokenizerMapper extends Mapper<Long, Row, Text, IntWritable> - { - private final static IntWritable one = new IntWritable(1); - private Text word = new Text(); - private ByteBuffer sourceColumn; - - protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context) - throws IOException, InterruptedException - { - } - - public void map(Long key, Row row, Context context) throws IOException, InterruptedException - { - String value = row.getString("line"); - logger.debug("read {}:{}={} from {}", new Object[] {key, "line", value, context.getInputSplit()}); - StringTokenizer itr = new StringTokenizer(value); - while (itr.hasMoreTokens()) - { - word.set(itr.nextToken()); - context.write(word, one); - } - } - } - public static class ReducerToFilesystem extends Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException @@ -174,41 +149,17 @@ public class WordCount extends Configured implements Tool public int run(String[] args) throws Exception { String outputReducerType = "filesystem"; - String inputMapperType = "native"; - String outputReducer = null; - String inputMapper = null; - - if (args != null) + if (args != null && args[0].startsWith(OUTPUT_REDUCER_VAR)) { - if(args[0].startsWith(OUTPUT_REDUCER_VAR)) - outputReducer = args[0]; - if(args[0].startsWith(INPUT_MAPPER_VAR)) - inputMapper = args[0]; - - if (args.length == 2) - { - if(args[1].startsWith(OUTPUT_REDUCER_VAR)) - outputReducer = args[1]; - if(args[1].startsWith(INPUT_MAPPER_VAR)) - inputMapper = args[1]; - } - } - - if (outputReducer != null) - { - String[] s = outputReducer.split("="); + String[] s = args[0].split("="); if (s != null && s.length == 2) outputReducerType = s[1]; } logger.info("output reducer type: " + outputReducerType); - if (inputMapper != null) - { - String[] s = inputMapper.split("="); - if (s != null && s.length == 2) - inputMapperType = s[1]; - } + Job job = new Job(getConf(), "wordcount"); job.setJarByClass(WordCount.class); + job.setMapperClass(TokenizerMapper.class); if (outputReducerType.equalsIgnoreCase("filesystem")) { @@ -238,19 +189,9 @@ public class WordCount extends Configured implements Tool ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner"); } - if (inputMapperType.equalsIgnoreCase("native")) - { - job.setMapperClass(NativeTokenizerMapper.class); - job.setInputFormatClass(CqlInputFormat.class); - CqlConfigHelper.setInputCql(job.getConfiguration(), "select * from " + COLUMN_FAMILY + " where token(id) > ? and token(id) <= ? allow filtering"); - } - else - { - job.setMapperClass(TokenizerMapper.class); - job.setInputFormatClass(CqlPagingInputFormat.class); - ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160"); - } + job.setInputFormatClass(CqlPagingInputFormat.class); + ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160"); ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost"); ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY); ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0841d8f/examples/hadoop_cql3_word_count/src/WordCountCounters.java ---------------------------------------------------------------------- diff --git a/examples/hadoop_cql3_word_count/src/WordCountCounters.java b/examples/hadoop_cql3_word_count/src/WordCountCounters.java index 74de9ab..542a473 100644 --- a/examples/hadoop_cql3_word_count/src/WordCountCounters.java +++ b/examples/hadoop_cql3_word_count/src/WordCountCounters.java @@ -26,7 +26,6 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.hadoop.cql3.CqlConfigHelper; import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat; -import org.apache.cassandra.hadoop.cql3.CqlInputFormat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; @@ -38,7 +37,7 @@ import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; -import com.datastax.driver.core.Row; + import org.apache.cassandra.hadoop.ConfigHelper; import org.apache.cassandra.utils.ByteBufferUtil; @@ -52,7 +51,6 @@ public class WordCountCounters extends Configured implements Tool { private static final Logger logger = LoggerFactory.getLogger(WordCountCounters.class); - static final String INPUT_MAPPER_VAR = "input_mapper"; static final String COUNTER_COLUMN_FAMILY = "input_words_count"; private static final String OUTPUT_PATH_PREFIX = "/tmp/word_count_counters"; @@ -63,24 +61,6 @@ public class WordCountCounters extends Configured implements Tool System.exit(0); } - public static class SumNativeMapper extends Mapper<Long, Row, Text, LongWritable> - { - long sum = -1; - public void map(Long key, Row row, Context context) throws IOException, InterruptedException - { - if (sum < 0) - sum = 0; - - logger.debug("read " + key + ":count_num from " + context.getInputSplit()); - sum += Long.valueOf(row.getString("count_num")); - } - - protected void cleanup(Context context) throws IOException, InterruptedException { - if (sum > 0) - context.write(new Text("total_count"), new LongWritable(sum)); - } - } - public static class SumMapper extends Mapper<Map<String, ByteBuffer>, Map<String, ByteBuffer>, Text, LongWritable> { long sum = -1; @@ -115,6 +95,7 @@ public class WordCountCounters extends Configured implements Tool } } + public static class ReducerToFilesystem extends Reducer<Text, LongWritable, Text, LongWritable> { long sum = 0; @@ -129,40 +110,25 @@ public class WordCountCounters extends Configured implements Tool public int run(String[] args) throws Exception { - String inputMapperType = "native"; - if (args != null && args[0].startsWith(INPUT_MAPPER_VAR)) - { - String[] arg0 = args[0].split("="); - if (arg0 != null && arg0.length == 2) - inputMapperType = arg0[1]; - } Job job = new Job(getConf(), "wordcountcounters"); + job.setJarByClass(WordCountCounters.class); + job.setMapperClass(SumMapper.class); job.setCombinerClass(ReducerToFilesystem.class); job.setReducerClass(ReducerToFilesystem.class); - job.setJarByClass(WordCountCounters.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(LongWritable.class); + FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX)); + + job.setInputFormatClass(CqlPagingInputFormat.class); + ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160"); ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost"); ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner"); ConfigHelper.setInputColumnFamily(job.getConfiguration(), WordCount.KEYSPACE, WordCount.OUTPUT_COLUMN_FAMILY); CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3"); - if ("native".equals(inputMapperType)) - { - job.setMapperClass(SumNativeMapper.class); - job.setInputFormatClass(CqlInputFormat.class); - CqlConfigHelper.setInputCql(job.getConfiguration(), "select * from " + WordCount.OUTPUT_COLUMN_FAMILY + " where token(word) > ? and token(word) <= ? allow filtering"); - } - else - { - job.setMapperClass(SumMapper.class); - job.setInputFormatClass(CqlPagingInputFormat.class); - ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160"); - } - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(LongWritable.class); - FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX)); job.waitForCompletion(true); return 0; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0841d8f/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java index 3672c84..cb61d05 100644 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java @@ -19,69 +19,13 @@ package org.apache.cassandra.hadoop.cql3; * under the License. * */ -import java.io.FileInputStream; -import java.io.IOException; -import java.security.KeyManagementException; -import java.security.KeyStore; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; -import java.security.SecureRandom; -import java.security.UnrecoverableKeyException; -import java.security.cert.CertificateException; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.Set; - -import javax.net.ssl.KeyManagerFactory; -import javax.net.ssl.SSLContext; -import javax.net.ssl.TrustManagerFactory; - -import org.apache.cassandra.hadoop.ConfigHelper; import org.apache.hadoop.conf.Configuration; -import com.datastax.driver.core.AuthProvider; -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.Host; -import com.datastax.driver.core.HostDistance; -import com.datastax.driver.core.PoolingOptions; -import com.datastax.driver.core.ProtocolOptions; -import com.datastax.driver.core.QueryOptions; -import com.datastax.driver.core.SSLOptions; -import com.datastax.driver.core.SocketOptions; -import com.datastax.driver.core.Statement; -import com.datastax.driver.core.policies.LoadBalancingPolicy; -import com.google.common.base.Optional; -import com.google.common.collect.Iterators; -import com.google.common.collect.Sets; - public class CqlConfigHelper { private static final String INPUT_CQL_COLUMNS_CONFIG = "cassandra.input.columnfamily.columns"; // separate by colon , private static final String INPUT_CQL_PAGE_ROW_SIZE_CONFIG = "cassandra.input.page.row.size"; private static final String INPUT_CQL_WHERE_CLAUSE_CONFIG = "cassandra.input.where.clause"; - private static final String INPUT_CQL = "cassandra.input.cql"; - - private static final String INPUT_NATIVE_PORT = "cassandra.input.native.port"; - private static final String INPUT_NATIVE_CORE_CONNECTIONS_PER_HOST = "cassandra.input.native.core.connections.per.host"; - private static final String INPUT_NATIVE_MAX_CONNECTIONS_PER_HOST = "cassandra.input.native.max.connections.per.host"; - private static final String INPUT_NATIVE_MIN_SIMULT_REQ_PER_CONNECTION = "cassandra.input.native.min.simult.reqs.per.connection"; - private static final String INPUT_NATIVE_MAX_SIMULT_REQ_PER_CONNECTION = "cassandra.input.native.max.simult.reqs.per.connection"; - private static final String INPUT_NATIVE_CONNECTION_TIMEOUT = "cassandra.input.native.connection.timeout"; - private static final String INPUT_NATIVE_READ_CONNECTION_TIMEOUT = "cassandra.input.native.read.connection.timeout"; - private static final String INPUT_NATIVE_RECEIVE_BUFFER_SIZE = "cassandra.input.native.receive.buffer.size"; - private static final String INPUT_NATIVE_SEND_BUFFER_SIZE = "cassandra.input.native.send.buffer.size"; - private static final String INPUT_NATIVE_SOLINGER = "cassandra.input.native.solinger"; - private static final String INPUT_NATIVE_TCP_NODELAY = "cassandra.input.native.tcp.nodelay"; - private static final String INPUT_NATIVE_REUSE_ADDRESS = "cassandra.input.native.reuse.address"; - private static final String INPUT_NATIVE_KEEP_ALIVE = "cassandra.input.native.keep.alive"; - private static final String INPUT_NATIVE_AUTH_PROVIDER = "cassandra.input.native.auth.provider"; - private static final String INPUT_NATIVE_SSL_TRUST_STORE_PATH = "cassandra.input.native.ssl.trust.store.path"; - private static final String INPUT_NATIVE_SSL_KEY_STORE_PATH = "cassandra.input.native.ssl.key.store.path"; - private static final String INPUT_NATIVE_SSL_TRUST_STORE_PASSWARD = "cassandra.input.native.ssl.trust.store.password"; - private static final String INPUT_NATIVE_SSL_KEY_STORE_PASSWARD = "cassandra.input.native.ssl.key.store.password"; - private static final String INPUT_NATIVE_SSL_CIPHER_SUITES = "cassandra.input.native.ssl.cipher.suites"; - private static final String OUTPUT_CQL = "cassandra.output.cql"; /** @@ -141,496 +85,25 @@ public class CqlConfigHelper conf.set(OUTPUT_CQL, cql); } - - public static void setInputCql(Configuration conf, String cql) - { - if (cql == null || cql.isEmpty()) - return; - - conf.set(INPUT_CQL, cql); - } - - public static Optional<Integer> getInputCoreConnections(Configuration conf) - { - return getIntSetting(INPUT_NATIVE_CORE_CONNECTIONS_PER_HOST, conf); - } - - public static Optional<Integer> getInputMaxConnections(Configuration conf) - { - return getIntSetting(INPUT_NATIVE_MAX_CONNECTIONS_PER_HOST, conf); - } - - public static int getInputNativePort(Configuration conf) - { - return Integer.parseInt(conf.get(INPUT_NATIVE_PORT, "9042")); - } - - public static Optional<Integer> getInputMinSimultReqPerConnections(Configuration conf) - { - return getIntSetting(INPUT_NATIVE_MIN_SIMULT_REQ_PER_CONNECTION, conf); - } - - public static Optional<Integer> getInputMaxSimultReqPerConnections(Configuration conf) - { - return getIntSetting(INPUT_NATIVE_MAX_SIMULT_REQ_PER_CONNECTION, conf); - } - - public static Optional<Integer> getInputNativeConnectionTimeout(Configuration conf) - { - return getIntSetting(INPUT_NATIVE_CONNECTION_TIMEOUT, conf); - } - - public static Optional<Integer> getInputNativeReadConnectionTimeout(Configuration conf) - { - return getIntSetting(INPUT_NATIVE_READ_CONNECTION_TIMEOUT, conf); - } - - public static Optional<Integer> getInputNativeReceiveBufferSize(Configuration conf) - { - return getIntSetting(INPUT_NATIVE_RECEIVE_BUFFER_SIZE, conf); - } - - public static Optional<Integer> getInputNativeSendBufferSize(Configuration conf) - { - return getIntSetting(INPUT_NATIVE_SEND_BUFFER_SIZE, conf); - } - - public static Optional<Integer> getInputNativeSolinger(Configuration conf) - { - return getIntSetting(INPUT_NATIVE_SOLINGER, conf); - } - - public static Optional<Boolean> getInputNativeTcpNodelay(Configuration conf) - { - return getBooleanSetting(INPUT_NATIVE_TCP_NODELAY, conf); - } - - public static Optional<Boolean> getInputNativeReuseAddress(Configuration conf) - { - return getBooleanSetting(INPUT_NATIVE_REUSE_ADDRESS, conf); - } - - public static Optional<String> getInputNativeAuthProvider(Configuration conf) - { - return getStringSetting(INPUT_NATIVE_AUTH_PROVIDER, conf); - } - - public static Optional<String> getInputNativeSSLTruststorePath(Configuration conf) - { - return getStringSetting(INPUT_NATIVE_SSL_TRUST_STORE_PATH, conf); - } - - public static Optional<String> getInputNativeSSLKeystorePath(Configuration conf) - { - return getStringSetting(INPUT_NATIVE_SSL_KEY_STORE_PATH, conf); - } - - public static Optional<String> getInputNativeSSLKeystorePassword(Configuration conf) - { - return getStringSetting(INPUT_NATIVE_SSL_KEY_STORE_PASSWARD, conf); - } - - public static Optional<String> getInputNativeSSLTruststorePassword(Configuration conf) - { - return getStringSetting(INPUT_NATIVE_SSL_TRUST_STORE_PASSWARD, conf); - } - - public static Optional<String> getInputNativeSSLCipherSuites(Configuration conf) - { - return getStringSetting(INPUT_NATIVE_SSL_CIPHER_SUITES, conf); - } - - public static Optional<Boolean> getInputNativeKeepAlive(Configuration conf) - { - return getBooleanSetting(INPUT_NATIVE_KEEP_ALIVE, conf); - } - + + public static String getInputcolumns(Configuration conf) { return conf.get(INPUT_CQL_COLUMNS_CONFIG); } - - public static Optional<Integer> getInputPageRowSize(Configuration conf) + + public static String getInputPageRowSize(Configuration conf) { - return getIntSetting(INPUT_CQL_PAGE_ROW_SIZE_CONFIG, conf); + return conf.get(INPUT_CQL_PAGE_ROW_SIZE_CONFIG); } - + public static String getInputWhereClauses(Configuration conf) { return conf.get(INPUT_CQL_WHERE_CLAUSE_CONFIG); } - - public static String getInputCql(Configuration conf) - { - return conf.get(INPUT_CQL); - } - + public static String getOutputCql(Configuration conf) { return conf.get(OUTPUT_CQL); } - - public static Cluster getInputCluster(String host, Configuration conf) - { - int port = getInputNativePort(conf); - Optional<AuthProvider> authProvider = getAuthProvider(conf); - Optional<SSLOptions> sslOptions = getSSLOptions(conf); - LoadBalancingPolicy loadBalancingPolicy = getReadLoadBalancingPolicy(conf, host); - SocketOptions socketOptions = getReadSocketOptions(conf); - QueryOptions queryOptions = getReadQueryOptions(conf); - PoolingOptions poolingOptions = getReadPoolingOptions(conf); - - Cluster.Builder builder = Cluster.builder() - .addContactPoint(host) - .withPort(port) - .withCompression(ProtocolOptions.Compression.NONE); - - if (authProvider.isPresent()) - builder.withAuthProvider(authProvider.get()); - if (sslOptions.isPresent()) - builder.withSSL(sslOptions.get()); - - builder.withLoadBalancingPolicy(loadBalancingPolicy) - .withSocketOptions(socketOptions) - .withQueryOptions(queryOptions) - .withPoolingOptions(poolingOptions); - - return builder.build(); - } - - public static void setInputCoreConnections(Configuration conf, String connections) - { - conf.set(INPUT_NATIVE_CORE_CONNECTIONS_PER_HOST, connections); - } - - public static void setInputMaxConnections(Configuration conf, String connections) - { - conf.set(INPUT_NATIVE_MAX_CONNECTIONS_PER_HOST, connections); - } - - public static void setInputMinSimultReqPerConnections(Configuration conf, String reqs) - { - conf.set(INPUT_NATIVE_MIN_SIMULT_REQ_PER_CONNECTION, reqs); - } - - public static void setInputMaxSimultReqPerConnections(Configuration conf, String reqs) - { - conf.set(INPUT_NATIVE_MAX_SIMULT_REQ_PER_CONNECTION, reqs); - } - - public static void setInputNativeConnectionTimeout(Configuration conf, String timeout) - { - conf.set(INPUT_NATIVE_CONNECTION_TIMEOUT, timeout); - } - - public static void setInputNativeReadConnectionTimeout(Configuration conf, String timeout) - { - conf.set(INPUT_NATIVE_READ_CONNECTION_TIMEOUT, timeout); - } - - public static void setInputNativeReceiveBufferSize(Configuration conf, String size) - { - conf.set(INPUT_NATIVE_RECEIVE_BUFFER_SIZE, size); - } - - public static void setInputNativeSendBufferSize(Configuration conf, String size) - { - conf.set(INPUT_NATIVE_SEND_BUFFER_SIZE, size); - } - - public static void setInputNativeSolinger(Configuration conf, String solinger) - { - conf.set(INPUT_NATIVE_SOLINGER, solinger); - } - - public static void setInputNativeTcpNodelay(Configuration conf, String tcpNodelay) - { - conf.set(INPUT_NATIVE_TCP_NODELAY, tcpNodelay); - } - - public static void setInputNativeAuthProvider(Configuration conf, String authProvider) - { - conf.set(INPUT_NATIVE_AUTH_PROVIDER, authProvider); - } - - public static void setInputNativeSSLTruststorePath(Configuration conf, String authProvider) - { - conf.set(INPUT_NATIVE_SSL_TRUST_STORE_PATH, authProvider); - } - - public static void setInputNativeSSLKeystorePath(Configuration conf, String authProvider) - { - conf.set(INPUT_NATIVE_SSL_KEY_STORE_PATH, authProvider); - } - - public static void setInputNativeSSLKeystorePassword(Configuration conf, String authProvider) - { - conf.set(INPUT_NATIVE_SSL_KEY_STORE_PASSWARD, authProvider); - } - - public static void setInputNativeSSLTruststorePassword(Configuration conf, String authProvider) - { - conf.set(INPUT_NATIVE_SSL_TRUST_STORE_PASSWARD, authProvider); - } - - public static void setInputNativeSSLCipherSuites(Configuration conf, String authProvider) - { - conf.set(INPUT_NATIVE_SSL_CIPHER_SUITES, authProvider); - } - - public static void setInputNativeReuseAddress(Configuration conf, String reuseAddress) - { - conf.set(INPUT_NATIVE_REUSE_ADDRESS, reuseAddress); - } - - public static void setInputNativeKeepAlive(Configuration conf, String keepAlive) - { - conf.set(INPUT_NATIVE_KEEP_ALIVE, keepAlive); - } - - public static void setInputNativePort(Configuration conf, String port) - { - conf.set(INPUT_NATIVE_PORT, port); - } - - private static PoolingOptions getReadPoolingOptions(Configuration conf) - { - Optional<Integer> coreConnections = getInputCoreConnections(conf); - Optional<Integer> maxConnections = getInputMaxConnections(conf); - Optional<Integer> maxSimultaneousRequests = getInputMaxSimultReqPerConnections(conf); - Optional<Integer> minSimultaneousRequests = getInputMinSimultReqPerConnections(conf); - - PoolingOptions poolingOptions = new PoolingOptions(); - - if (coreConnections.isPresent()) - poolingOptions.setCoreConnectionsPerHost(HostDistance.LOCAL, coreConnections.get()); - if (maxConnections.isPresent()) - poolingOptions.setMaxConnectionsPerHost(HostDistance.LOCAL, maxConnections.get()); - if (maxSimultaneousRequests.isPresent()) - poolingOptions.setMaxSimultaneousRequestsPerConnectionThreshold(HostDistance.LOCAL, maxSimultaneousRequests.get()); - if (minSimultaneousRequests.isPresent()) - poolingOptions.setMinSimultaneousRequestsPerConnectionThreshold(HostDistance.LOCAL, minSimultaneousRequests.get()); - - poolingOptions.setCoreConnectionsPerHost(HostDistance.REMOTE, 0) - .setMaxConnectionsPerHost(HostDistance.REMOTE, 0) - .setMaxSimultaneousRequestsPerConnectionThreshold(HostDistance.REMOTE, 0) - .setMinSimultaneousRequestsPerConnectionThreshold(HostDistance.REMOTE, 0); - - return poolingOptions; - } - - private static QueryOptions getReadQueryOptions(Configuration conf) - { - String CL = ConfigHelper.getReadConsistencyLevel(conf); - Optional<Integer> fetchSize = getInputPageRowSize(conf); - QueryOptions queryOptions = new QueryOptions(); - if (CL != null && !CL.isEmpty()) - queryOptions.setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.valueOf(CL)); - - if (fetchSize.isPresent()) - queryOptions.setFetchSize(fetchSize.get()); - return queryOptions; - } - - private static SocketOptions getReadSocketOptions(Configuration conf) - { - SocketOptions socketOptions = new SocketOptions(); - Optional<Integer> connectTimeoutMillis = getInputNativeConnectionTimeout(conf); - Optional<Integer> readTimeoutMillis = getInputNativeReadConnectionTimeout(conf); - Optional<Integer> receiveBufferSize = getInputNativeReceiveBufferSize(conf); - Optional<Integer> sendBufferSize = getInputNativeSendBufferSize(conf); - Optional<Integer> soLinger = getInputNativeSolinger(conf); - Optional<Boolean> tcpNoDelay = getInputNativeTcpNodelay(conf); - Optional<Boolean> reuseAddress = getInputNativeReuseAddress(conf); - Optional<Boolean> keepAlive = getInputNativeKeepAlive(conf); - - if (connectTimeoutMillis.isPresent()) - socketOptions.setConnectTimeoutMillis(connectTimeoutMillis.get()); - if (readTimeoutMillis.isPresent()) - socketOptions.setReadTimeoutMillis(readTimeoutMillis.get()); - if (receiveBufferSize.isPresent()) - socketOptions.setReceiveBufferSize(receiveBufferSize.get()); - if (sendBufferSize.isPresent()) - socketOptions.setSendBufferSize(sendBufferSize.get()); - if (soLinger.isPresent()) - socketOptions.setSoLinger(soLinger.get()); - if (tcpNoDelay.isPresent()) - socketOptions.setTcpNoDelay(tcpNoDelay.get()); - if (reuseAddress.isPresent()) - socketOptions.setReuseAddress(reuseAddress.get()); - if (keepAlive.isPresent()) - socketOptions.setKeepAlive(keepAlive.get()); - - return socketOptions; - } - - private static LoadBalancingPolicy getReadLoadBalancingPolicy(Configuration conf, final String stickHost) - { - return new LoadBalancingPolicy() - { - private Host origHost; - private Set<Host> liveRemoteHosts = Sets.newHashSet(); - - @Override - public void onAdd(Host host) - { - if (host.getAddress().getHostName().equals(stickHost)) - origHost = host; - } - - @Override - public void onDown(Host host) - { - if (host.getAddress().getHostName().equals(stickHost)) - origHost = null; - liveRemoteHosts.remove(host); - } - - @Override - public void onRemove(Host host) - { - if (host.getAddress().getHostName().equals(stickHost)) - origHost = null; - liveRemoteHosts.remove(host); - } - - @Override - public void onUp(Host host) - { - if (host.getAddress().getHostName().equals(stickHost)) - origHost = host; - liveRemoteHosts.add(host); - } - - @Override - public HostDistance distance(Host host) - { - if (host.getAddress().getHostName().equals(stickHost)) - return HostDistance.LOCAL; - else - return HostDistance.REMOTE; - } - - @Override - public void init(Cluster cluster, Collection<Host> hosts) - { - for (Host host : hosts) - { - if (host.getAddress().getHostName().equals(stickHost)) - { - origHost = host; - break; - } - } - } - - @Override - public Iterator<Host> newQueryPlan(String loggedKeyspace, Statement statement) - { - if (origHost != null) - { - return Iterators.concat(Collections.singletonList(origHost).iterator(), liveRemoteHosts.iterator()); - } - else - { - return liveRemoteHosts.iterator(); - } - } - }; - } - - private static Optional<AuthProvider> getAuthProvider(Configuration conf) - { - Optional<String> authProvider = getInputNativeAuthProvider(conf); - if (!authProvider.isPresent()) - return Optional.absent(); - - return Optional.of(getClientAuthProvider(authProvider.get())); - } - - private static Optional<SSLOptions> getSSLOptions(Configuration conf) - { - Optional<String> truststorePath = getInputNativeSSLTruststorePath(conf); - Optional<String> keystorePath = getInputNativeSSLKeystorePath(conf); - Optional<String> truststorePassword = getInputNativeSSLTruststorePassword(conf); - Optional<String> keystorePassword = getInputNativeSSLKeystorePassword(conf); - Optional<String> cipherSuites = getInputNativeSSLCipherSuites(conf); - - if (truststorePath.isPresent() && keystorePath.isPresent() && truststorePassword.isPresent() && keystorePassword.isPresent()) - { - SSLContext context; - try - { - context = getSSLContext(truststorePath.get(), truststorePassword.get(), keystorePath.get(), keystorePassword.get()); - } - catch (UnrecoverableKeyException | KeyManagementException | - NoSuchAlgorithmException | KeyStoreException | CertificateException | IOException e) - { - throw new RuntimeException(e); - } - String[] css = SSLOptions.DEFAULT_SSL_CIPHER_SUITES; - if (cipherSuites.isPresent()) - css = cipherSuites.get().split(","); - return Optional.of(new SSLOptions(context,css)); - } - return Optional.absent(); - } - - private static Optional<Integer> getIntSetting(String parameter, Configuration conf) - { - String setting = conf.get(parameter); - if (setting == null) - return Optional.absent(); - return Optional.of(Integer.parseInt(setting)); - } - - private static Optional<Boolean> getBooleanSetting(String parameter, Configuration conf) - { - String setting = conf.get(parameter); - if (setting == null) - return Optional.absent(); - return Optional.of(Boolean.parseBoolean(setting)); - } - - private static Optional<String> getStringSetting(String parameter, Configuration conf) - { - String setting = conf.get(parameter); - if (setting == null) - return Optional.absent(); - return Optional.of(setting); - } - - private static AuthProvider getClientAuthProvider(String factoryClassName) - { - try - { - return (AuthProvider) Class.forName(factoryClassName).newInstance(); - } - catch (Exception e) - { - throw new RuntimeException("Failed to instantiate auth provider:" + factoryClassName, e); - } - } - - private static SSLContext getSSLContext(String truststorePath, String truststorePassword, String keystorePath, String keystorePassword) - throws NoSuchAlgorithmException, KeyStoreException, CertificateException, IOException, UnrecoverableKeyException, KeyManagementException - { - - FileInputStream tsf = new FileInputStream(truststorePath); - FileInputStream ksf = new FileInputStream(keystorePath); - SSLContext ctx = SSLContext.getInstance("SSL"); - - KeyStore ts = KeyStore.getInstance("JKS"); - ts.load(tsf, truststorePassword.toCharArray()); - TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); - tmf.init(ts); - - KeyStore ks = KeyStore.getInstance("JKS"); - ks.load(ksf, keystorePassword.toCharArray()); - KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); - kmf.init(ks, keystorePassword.toCharArray()); - - ctx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new SecureRandom()); - return ctx; - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0841d8f/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java deleted file mode 100644 index e1cdf32..0000000 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.hadoop.cql3; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Map; - -import org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; - -import com.datastax.driver.core.Row; - -/** - * Hadoop InputFormat allowing map/reduce against Cassandra rows within one ColumnFamily. - * - * At minimum, you need to set the KS and CF in your Hadoop job Configuration. - * The ConfigHelper class is provided to make this - * simple: - * ConfigHelper.setInputColumnFamily - * - * You can also configure the number of rows per InputSplit with - * ConfigHelper.setInputSplitSize. The default split size is 64k rows. - * - * the number of CQL rows per page - * CQLConfigHelper.setInputCQLPageRowSize. The default page row size is 1000. You - * should set it to "as big as possible, but no bigger." It set the LIMIT for the CQL - * query, so you need set it big enough to minimize the network overhead, and also - * not too big to avoid out of memory issue. - * - * other native protocol connection parameters in CqlConfigHelper - */ -public class CqlInputFormat extends AbstractColumnFamilyInputFormat<Long, Row> -{ - public RecordReader<Long, Row> getRecordReader(InputSplit split, JobConf jobConf, final Reporter reporter) - throws IOException - { - TaskAttemptContext tac = new TaskAttemptContext(jobConf, TaskAttemptID.forName(jobConf.get(MAPRED_TASK_ID))) - { - @Override - public void progress() - { - reporter.progress(); - } - }; - - CqlRecordReader recordReader = new CqlRecordReader(); - recordReader.initialize((org.apache.hadoop.mapreduce.InputSplit)split, tac); - return recordReader; - } - - @Override - public org.apache.hadoop.mapreduce.RecordReader<Long, Row> createRecordReader( - org.apache.hadoop.mapreduce.InputSplit arg0, TaskAttemptContext arg1) throws IOException, - InterruptedException - { - return new CqlRecordReader(); - } - -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0841d8f/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java index b692280..cee4b4b 100644 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlPagingRecordReader.java @@ -117,7 +117,7 @@ public class CqlPagingRecordReader extends RecordReader<Map<String, ByteBuffer>, try { - pageRowSize = CqlConfigHelper.getInputPageRowSize(conf).get(); + pageRowSize = Integer.parseInt(CqlConfigHelper.getInputPageRowSize(conf)); } catch (NumberFormatException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0841d8f/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java deleted file mode 100644 index a19cf70..0000000 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java +++ /dev/null @@ -1,260 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.hadoop.cql3; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.*; - -import com.google.common.collect.AbstractIterator; -import com.google.common.collect.Maps; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.db.marshal.BytesType; -import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.hadoop.ColumnFamilySplit; -import org.apache.cassandra.hadoop.ConfigHelper; -import org.apache.cassandra.utils.Pair; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.ColumnMetadata; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.Row; -import com.datastax.driver.core.Session; -/** - * CqlRecordReader reads the rows return from the CQL query - * It uses CQL auto-paging. - * <p/> - * Return a Long as a local CQL row key starts from 0; - * <p/> - * Row as C* java driver CQL result set row - * 1) select clause must include partition key columns (to calculate the progress based on the actual CF row processed) - * 2) where clause must include token(partition_key1, ... , partition_keyn) > ? and - * token(partition_key1, ... , partition_keyn) <= ? (in the right order) - */ -public class CqlRecordReader extends RecordReader<Long, Row> - implements org.apache.hadoop.mapred.RecordReader<Long, Row> -{ - private static final Logger logger = LoggerFactory.getLogger(CqlRecordReader.class); - - private ColumnFamilySplit split; - private RowIterator rowIterator; - - private Pair<Long, Row> currentRow; - private int totalRowCount; // total number of rows to fetch - private String keyspace; - private String cfName; - private String cqlQuery; - private Cluster cluster; - private Session session; - private IPartitioner partitioner; - - // partition keys -- key aliases - private LinkedHashMap<String, Boolean> partitionBoundColumns = Maps.newLinkedHashMap(); - - public CqlRecordReader() - { - super(); - } - - public void initialize(InputSplit split, TaskAttemptContext context) throws IOException - { - this.split = (ColumnFamilySplit) split; - Configuration conf = context.getConfiguration(); - totalRowCount = (this.split.getLength() < Long.MAX_VALUE) - ? (int) this.split.getLength() - : ConfigHelper.getInputSplitSize(conf); - cfName = ConfigHelper.getInputColumnFamily(conf); - keyspace = ConfigHelper.getInputKeyspace(conf); - cqlQuery = CqlConfigHelper.getInputCql(conf); - partitioner = ConfigHelper.getInputPartitioner(context.getConfiguration()); - try - { - if (cluster != null) - return; - - // create connection using thrift - String[] locations = split.getLocations(); - Exception lastException = null; - for (String location : locations) - { - try - { - cluster = CqlConfigHelper.getInputCluster(location, conf); - break; - } - catch (Exception e) - { - lastException = e; - logger.warn("Failed to create authenticated client to {}", location); - } - } - if (cluster == null && lastException != null) - throw lastException; - } - catch (Exception e) - { - throw new RuntimeException(e); - } - - session = cluster.connect(keyspace); - rowIterator = new RowIterator(); - logger.debug("created {}", rowIterator); - } - - public void close() - { - if (session != null) - session.close(); - } - - public Long getCurrentKey() - { - return currentRow.left; - } - - public Row getCurrentValue() - { - return currentRow.right; - } - - public float getProgress() - { - if (!rowIterator.hasNext()) - return 1.0F; - - // the progress is likely to be reported slightly off the actual but close enough - float progress = ((float) rowIterator.totalRead / totalRowCount); - return progress > 1.0F ? 1.0F : progress; - } - - public boolean nextKeyValue() throws IOException - { - if (!rowIterator.hasNext()) - { - logger.debug("Finished scanning {} rows (estimate was: {})", rowIterator.totalRead, totalRowCount); - return false; - } - - try - { - currentRow = rowIterator.next(); - } - catch (Exception e) - { - // throw it as IOException, so client can catch it and handle it at client side - IOException ioe = new IOException(e.getMessage()); - ioe.initCause(ioe.getCause()); - throw ioe; - } - return true; - } - - // Because the old Hadoop API wants us to write to the key and value - // and the new asks for them, we need to copy the output of the new API - // to the old. Thus, expect a small performance hit. - // And obviously this wouldn't work for wide rows. But since ColumnFamilyInputFormat - // and ColumnFamilyRecordReader don't support them, it should be fine for now. - public boolean next(Long key, Row value) throws IOException - { - if (nextKeyValue()) - { - key = getCurrentKey(); - value = getCurrentValue(); - return true; - } - return false; - } - - public long getPos() throws IOException - { - return (long) rowIterator.totalRead; - } - - public Long createKey() - { - return null; - } - - public Row createValue() - { - return null; - } - - /** CQL row iterator - * Input cql query - * 1) select clause must include key columns (if we use partition key based row count) - * 2) where clause must include token(partition_key1 ... partition_keyn) > ? and - * token(partition_key1 ... partition_keyn) <= ? - */ - private class RowIterator extends AbstractIterator<Pair<Long, Row>> - { - private long keyId = 0L; - protected int totalRead = 0; // total number of cf rows read - protected Iterator<Row> rows; - private Map<String, ByteBuffer> previousRowKey = new HashMap<String, ByteBuffer>(); // previous CF row key - - public RowIterator() - { - AbstractType type = partitioner.getTokenValidator(); - ResultSet rs = session.execute(cqlQuery, type.compose(type.fromString(split.getStartToken())), type.compose(type.fromString(split.getEndToken())) ); - for (ColumnMetadata meta : cluster.getMetadata().getKeyspace(keyspace).getTable(cfName).getPartitionKey()) - partitionBoundColumns.put(meta.getName(), Boolean.TRUE); - rows = rs.iterator(); - } - - protected Pair<Long, Row> computeNext() - { - if (rows == null || !rows.hasNext()) - return endOfData(); - - Row row = rows.next(); - Map<String, ByteBuffer> keyColumns = new HashMap<String, ByteBuffer>(); - for (String column : partitionBoundColumns.keySet()) - keyColumns.put(column, row.getBytesUnsafe(column)); - - // increase total CF row read - if (previousRowKey.isEmpty() && !keyColumns.isEmpty()) - { - previousRowKey = keyColumns; - totalRead++; - } - else - { - for (String column : partitionBoundColumns.keySet()) - { - if (BytesType.bytesCompare(keyColumns.get(column), previousRowKey.get(column)) != 0) - { - previousRowKey = keyColumns; - totalRead++; - break; - } - } - } - keyId ++; - return Pair.create(keyId, row); - } - } -}