import java.io.IOException;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.regex.Matcher;

import org.apache.cassandra.db.Column;
import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;
import scala.Tuple3;

public class CassandraSparkConnectionTest implements Serializable {
        
        public static void main(String[] args) throws IOException {
                new CassandraSparkConnectionTest().process();
        }

        @SuppressWarnings({ "unchecked", "serial" })
        public void process() throws IOException {
                String host = "localhost";
                String port = "9160";
                
                JavaSparkContext sparkContext = new JavaSparkContext("local",
                                "cassandraSparkConnectionTest", System.getenv("SPARK_HOME"),
                                JavaSparkContext.jarOfClass(CassandraSparkConnectionTest.class));
                Job job = new Job();
                job.setInputFormatClass(ColumnFamilyInputFormat.class);

                ConfigHelper.setInputInitialAddress(job.getConfiguration(), host);
                ConfigHelper.setInputRpcPort(job.getConfiguration(), port);
                ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host);
                ConfigHelper.setOutputRpcPort(job.getConfiguration(), port);
                ConfigHelper.setInputColumnFamily(job.getConfiguration(), "casDemo", "Words");
                //ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "casDemo", "WordCount");

                ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
                //ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");

                // Slice predicate: read the full column range, at most 20 columns per row.
                SlicePredicate predicate = new SlicePredicate();
                SliceRange sliceRange = new SliceRange(toByteBuffer(""), toByteBuffer(""), false, 20);
                predicate.setSlice_range(sliceRange);
                ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);

                // The input format's value type is a sorted map of column name to Column;
                // this instance exists only so its runtime class can be passed below.
                Map<ByteBuffer, Column> valueClass = new TreeMap<ByteBuffer, Column>();

                JavaPairRDD<ByteBuffer, TreeMap<ByteBuffer, Column>> rdd = sparkContext
                                .newAPIHadoopRDD(job.getConfiguration(),
                                                ColumnFamilyInputFormat.class.asSubclass(org.apache.hadoop.mapreduce.InputFormat.class),
                                                ByteBuffer.class, valueClass.getClass());
                
                JavaPairRDD<ByteBuffer, Column> pair = rdd.map(
                                new PairFunction<Tuple2<ByteBuffer, TreeMap<ByteBuffer, Column>>, ByteBuffer, Column>() {
                        @Override
                        public Tuple2<ByteBuffer, Column> call(
                                        Tuple2<ByteBuffer, TreeMap<ByteBuffer, Column>> paramT)
                                        throws Exception {
                                // Print the row key, then each column name and value in the row.
                                System.out.println(ByteBufferUtil.string(paramT._1()));
                                Set<ByteBuffer> keys = paramT._2().keySet();
                                for (ByteBuffer key : keys) {
                                        System.out.println("\t" + ByteBufferUtil.string(key));
                                        Column col = paramT._2().get(key);
                                        System.out.println("\t" + ByteBufferUtil.string(col.value()));
                                }
                                return null; // Add code
                        }
                });
                
                pair.collect();
                
                System.out.println("Done.");
        }

        // Parses an (ip, user, query) triple out of a log line. The Pattern that
        // produces the Matcher is not defined in this snippet, so the match is
        // guarded to avoid a NullPointerException.
        public static Tuple3<String, String, String> extractKey(String s) {
                Matcher m = null; // TODO: m = <some compiled Pattern>.matcher(s);
                List<String> key = Collections.emptyList();
                if (m != null && m.find()) {
                        String ip = m.group(1);
                        String user = m.group(3);
                        String query = m.group(5);
                        if (!user.equalsIgnoreCase("-")) {
                                return new Tuple3<String, String, String>(ip, user, query);
                        }
                }
                return new Tuple3<String, String, String>(null, null, null);
        }

        public static ByteBuffer toByteBuffer(String value)
                        throws UnsupportedEncodingException {
                if (value == null) {
                        value = "";
                }
                return ByteBuffer.wrap(value.getBytes("UTF-8"));
        }

        public static String toString(ByteBuffer buffer)
                        throws UnsupportedEncodingException {
                byte[] bytes = new byte[buffer.remaining()];
                buffer.get(bytes);
                return new String(bytes, "UTF-8");
        }

}
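
For anyone who wants call(...) above to return real pairs instead of null, here is a minimal
sketch of one way to fill in the "Add code" placeholder: pair the row key with the first column
of the row (or null when the row is empty). This is only an illustration of one possible
completion, not necessarily what the original code intended; emitting every column as its own
pair would need a flatMap-style transform instead.

        @Override
        public Tuple2<ByteBuffer, Column> call(
                        Tuple2<ByteBuffer, TreeMap<ByteBuffer, Column>> paramT) throws Exception {
                // TreeMap is a NavigableMap, so firstEntry() is available and
                // returns null when the row carries no columns.
                Map.Entry<ByteBuffer, Column> first = paramT._2().firstEntry();
                return new Tuple2<ByteBuffer, Column>(paramT._1(),
                                first == null ? null : first.getValue());
        }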



