Hi Akhil, Thank you very much for your help and support.
Regards, Rajesh On Fri, Aug 1, 2014 at 7:57 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote: > Here's a piece of code. In your case, you are missing the call() method > inside the map function. > > > import java.util.Iterator; > > import java.util.List; > > import org.apache.commons.configuration.Configuration; > > import org.apache.hadoop.hbase.HBaseConfiguration; > > import org.apache.hadoop.hbase.KeyValue; > > import org.apache.hadoop.hbase.client.Get; > > import org.apache.hadoop.hbase.client.HTable; > > import org.apache.hadoop.hbase.client.Result; > > import org.apache.hadoop.hbase.util.Bytes; > > import org.apache.spark.SparkConf; > > import org.apache.spark.SparkContext; > > import org.apache.spark.api.java.JavaRDD; > > import org.apache.spark.api.java.JavaSparkContext; > > import org.apache.spark.api.java.function.Function; > > import org.apache.spark.rdd.NewHadoopRDD; > > import org.apache.spark.streaming.Duration; > > import org.apache.spark.streaming.api.java.JavaStreamingContext; > > import org.apache.hadoop.hbase.io.ImmutableBytesWritable; > > import org.apache.hadoop.hbase.mapreduce.TableInputFormat; > > import com.google.common.collect.Lists; > > import scala.Function1; > > import scala.Tuple2; > > import scala.collection.JavaConversions; > > import scala.collection.Seq; > > import scala.collection.JavaConverters.*; > > import scala.reflect.ClassTag; > > public class SparkHBaseMain { > > @SuppressWarnings("deprecation") > > public static void main(String[] arg){ > > try{ > > List<String> jars = >> Lists.newArrayList("/home/akhld/Desktop/tools/spark-9/jars/spark-assembly-0.9.0-incubating-hadoop2.3.0-mr1-cdh5.0.0.jar", > > "/home/akhld/Downloads/sparkhbasecode/hbase-server-0.96.0-hadoop2.jar", > > "/home/akhld/Downloads/sparkhbasecode/hbase-protocol-0.96.0-hadoop2.jar", > > >> "/home/akhld/Downloads/sparkhbasecode/hbase-hadoop2-compat-0.96.0-hadoop2.jar", > > "/home/akhld/Downloads/sparkhbasecode/hbase-common-0.96.0-hadoop2.jar", > > "/home/akhld/Downloads/sparkhbasecode/hbase-client-0.96.0-hadoop2.jar", > > "/home/akhld/Downloads/sparkhbasecode/htrace-core-2.02.jar"); > > SparkConf spconf = new SparkConf(); > > spconf.setMaster("local"); > > spconf.setAppName("SparkHBase"); > > spconf.setSparkHome("/home/akhld/Desktop/tools/spark-9"); > > spconf.setJars(jars.toArray(new String[jars.size()])); > > spconf.set("spark.executor.memory", "1g"); > > final JavaSparkContext sc = new JavaSparkContext(spconf); > > org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create(); > > conf.addResource("/home/akhld/Downloads/sparkhbasecode/hbase-site.xml"); > > conf.set(TableInputFormat.INPUT_TABLE, "blogposts"); > > NewHadoopRDD<ImmutableBytesWritable, Result> rdd = new >> NewHadoopRDD<ImmutableBytesWritable, >> Result>(JavaSparkContext.toSparkContext(sc), TableInputFormat.class, >> ImmutableBytesWritable.class, Result.class, conf); > > JavaRDD<Tuple2<ImmutableBytesWritable, Result>> jrdd = rdd.toJavaRDD(); > > *ForEachFunction f = new ForEachFunction();* > > * JavaRDD<Iterator<String>> retrdd = jrdd.map(f);* > > > >> System.out.println("Count =>" + retrdd.count()); > > }catch(Exception e){ > > e.printStackTrace(); > > System.out.println("Craaaashed : " + e); > > } > > } > > @SuppressWarnings("serial") > > private static class ForEachFunction extends >> Function<Tuple2<ImmutableBytesWritable, Result>, Iterator<String>>{ > > *public Iterator<String> call(Tuple2<ImmutableBytesWritable, >> Result> test) {* > > * Result tmp = (Result) test._2;* > > * List<KeyValue> kvl = tmp.getColumn("post".getBytes(), >> "title".getBytes());* > > * for(KeyValue kl:kvl){* > > * String sb = new String(kl.getValue());* > > * System.out.println("Value :" + sb);* > > * }* > > * return null;* > > * }* > > } > > >> } > > > Hope it helps. > > > Thanks > Best Regards > > > On Fri, Aug 1, 2014 at 4:44 PM, Madabhattula Rajesh Kumar < > mrajaf...@gmail.com> wrote: > >> Hi Akhil, >> >> Thank you for your response. I'm facing below issues. >> >> I'm not able to print the values. Am I missing any thing. Could you >> please look into this issue. >> >> JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = >> sc.newAPIHadoopRDD( >> conf, >> TableInputFormat.class, >> ImmutableBytesWritable.class, >> Result.class); >> >> System.out.println(" ROWS COUNT = "+ hBaseRDD.count()); >> >> JavaRDD R = hBaseRDD.map(new Function<Tuple2<ImmutableBytesWritable, >> Result>, Iterator<String>>(){ >> >> public Iterator<String> call(Tuple2<ImmutableBytesWritable, >> Result> test) >> { >> Result tmp = (Result) test._2; >> >> System.out.println("Inside "); >> >> // List<KeyValue> kvl = tmp.getColumn("post".getBytes(), >> "title".getBytes()); >> for(KeyValue kl:tmp.raw()) >> >> { >> String sb = new String(kl.getValue()); >> System.out.println(sb); >> } >> return null; >> } >> } >> ); >> >> *Output :* >> >> ROWS COUNT = 8 >> >> It is not printing "Inside" statement also. I think it is not going into >> this function. >> >> Could you please help me on this issue. >> >> Thank you for your support and help >> >> Regards, >> Rajesh >> >> >> >> On Fri, Aug 1, 2014 at 12:17 PM, Akhil Das <ak...@sigmoidanalytics.com> >> wrote: >> >>> You can use a map function like the following and do whatever you want >>> with the Result. >>> >>> Function<Tuple2<ImmutableBytesWritable, Result>, Iterator<String>>{ >>>> public Iterator<String> >>>> call(Tuple2<ImmutableBytesWritable, Result> test) { >>>> Result tmp = (Result) test._2; >>>> List<KeyValue> kvl = *tmp.getColumn("post".getBytes(), >>>> "title".getBytes());* >>>> for(KeyValue kl:kvl){ >>>> String sb = new String(kl.getValue()); >>>> System.out.println(sb); >>>> } >>> >>> >>> >>> >>> Thanks >>> Best Regards >>> >>> >>> On Thu, Jul 31, 2014 at 10:19 PM, Madabhattula Rajesh Kumar < >>> mrajaf...@gmail.com> wrote: >>> >>>> Hi Team, >>>> >>>> I'm using below code to read table from hbase >>>> >>>> Configuration conf = HBaseConfiguration.create(); >>>> conf.set(TableInputFormat.INPUT_TABLE, "table1"); >>>> >>>> JavaPairRDD hBaseRDD = sc.newAPIHadoopRDD( >>>> conf, >>>> TableInputFormat.class, >>>> ImmutableBytesWritable.class, >>>> Result.class); >>>> >>>> I got hBaseRDD. I'm not able to read the column values from hBaseRDD. >>>> >>>> *Could you please let me know, how to read the column values from >>>> hBaseRDD?* >>>> >>>> Thank you for your help. >>>> >>>> Regards, >>>> Rajesh >>>> >>>> >>> >> >