import java.io.IOException;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.cassandra.db.Column;
import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import scala.Tuple3;

public class CassandraSparkConnectionTest implements Serializable {

    // NOTE: the original post never initialized the Matcher in extractKey(),
    // which would throw a NullPointerException. This pattern is a hypothetical
    // placeholder (five whitespace-separated fields, with ip/user/query in
    // groups 1, 3 and 5) and must be replaced with the real log format before
    // extractKey() is used.
    private static final Pattern LOG_PATTERN =
            Pattern.compile("(\\S+) (\\S+) (\\S+) (\\S+) (\\S+)");

    public static void main(String[] args) throws IOException {
        new CassandraSparkConnectionTest().process();
    }

    @SuppressWarnings({ "unchecked", "serial" })
    public void process() throws IOException {
        String host = "localhost";
        String port = "9160";

        // The jar argument must reference this class (the original passed a
        // nonexistent CassandraSparkConnectivity.class) so Spark can ship the
        // right jar to the workers.
        JavaSparkContext sparkContext = new JavaSparkContext("local",
                "cassandraSparkConnectionTest", System.getenv("SPARK_HOME"),
                JavaSparkContext.jarOfClass(CassandraSparkConnectionTest.class));

        Job job = new Job();
        job.setInputFormatClass(ColumnFamilyInputFormat.class);
        ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
        ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
        ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host);
        ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
        ConfigHelper.setInputColumnFamily(job.getConfiguration(), "casDemo", "Words");
        // ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "casDemo", "WordCount");
        ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
        // ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");

        // Read up to 20 columns per row, over the full column range
        // (empty start/finish buffers mean "everything").
        SlicePredicate predicate = new SlicePredicate();
        SliceRange sliceRange = new SliceRange(toByteBuffer(""), toByteBuffer(""), false, 20);
        predicate.setSlice_range(sliceRange);
        ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);

        // Dummy instance, used only to obtain the runtime class of the value type.
        Map<ByteBuffer, Column> valueClass = new TreeMap<ByteBuffer, Column>();
        JavaPairRDD<ByteBuffer, TreeMap<ByteBuffer, Column>> rdd = sparkContext
                .newAPIHadoopRDD(job.getConfiguration(),
                        ColumnFamilyInputFormat.class.asSubclass(org.apache.hadoop.mapreduce.InputFormat.class),
                        ByteBuffer.class, valueClass.getClass());

        // Map each Cassandra row to (row key, first column value) as Strings.
        // The original returned null here ("//Add code"); Strings also keep the
        // collected result Java-serializable, which a raw ByteBuffer is not.
        JavaPairRDD<String, String> pair = rdd.map(
                new PairFunction<Tuple2<ByteBuffer, TreeMap<ByteBuffer, Column>>, String, String>() {
                    @Override
                    public Tuple2<String, String> call(
                            Tuple2<ByteBuffer, TreeMap<ByteBuffer, Column>> paramT) throws Exception {
                        System.out.println(ByteBufferUtil.string(paramT._1()));
                        Set<ByteBuffer> keys = paramT._2().keySet();
                        for (ByteBuffer key : keys) {
                            System.out.println("\t" + ByteBufferUtil.string(key));
                            Column col = paramT._2().get(key);
                            System.out.println("\t" + ByteBufferUtil.string(col.value()));
                        }
                        String firstValue = paramT._2().isEmpty() ? ""
                                : ByteBufferUtil.string(paramT._2().firstEntry().getValue().value());
                        return new Tuple2<String, String>(ByteBufferUtil.string(paramT._1()), firstValue);
                    }
                });
        pair.collect();
        System.out.println("Done.");
    }

    public static Tuple3<String, String, String> extractKey(String s) {
        Matcher m = LOG_PATTERN.matcher(s);
        if (m.find()) {
            String ip = m.group(1);
            String user = m.group(3);
            String query = m.group(5);
            if (!user.equalsIgnoreCase("-")) {
                return new Tuple3<String, String, String>(ip, user, query);
            }
        }
        return new Tuple3<String, String, String>(null, null, null);
    }

    public static ByteBuffer toByteBuffer(String value) throws UnsupportedEncodingException {
        if (value == null) {
            value = "";
        }
        return ByteBuffer.wrap(value.getBytes("UTF-8"));
    }

    public static String toString(ByteBuffer buffer) throws UnsupportedEncodingException {
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        return new String(bytes, "UTF-8");
    }
}
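The commented-out setOutputColumnFamily lines hint at a WordCount follow-up, so here is a minimal driver-side sketch of counting the values read above. It is my assumption, not part of the original post: it assumes the JavaPairRDD<String, String> named pair built in process(), uses the same pre-1.0 Java API style as the rest of the code (map(PairFunction); Spark 1.0 renamed this to mapToPair), and needs one extra import, org.apache.spark.api.java.function.Function2.

        // Hypothetical continuation: count how often each value occurs.
        JavaPairRDD<String, Integer> ones = pair.map(
                new PairFunction<Tuple2<String, String>, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(Tuple2<String, String> row) {
                        // Emit (value, 1) for each row read from Cassandra.
                        return new Tuple2<String, Integer>(row._2(), 1);
                    }
                });
        JavaPairRDD<String, Integer> counts = ones.reduceByKey(
                new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer a, Integer b) {
                        return a + b;
                    }
                });
        for (Tuple2<String, Integer> wc : counts.collect()) {
            System.out.println(wc._1() + ": " + wc._2());
        }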