Here's a working version from our code:
> DStream<Tuple2<Text, Tuple>> hadoopDStream =
>     streamingContext.fileStream("/akhld/lookhere/",
>         new Function<Path, Object>() {
>             @Override
>             public Object call(Path path) throws Exception {
>                 // Accept only files whose names do not start with a dot.
>                 return !path.getName().startsWith(".");
>             }
>         },
>         true,  // newFilesOnly
>         SparkUtil.getManifest(Text.class),
>         SparkUtil.getManifest(Tuple.class),
>         SparkUtil.getManifest(PigInputFormat.class));
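
A note on why the exception likely appears in the quoted code: the stack trace shows NewHadoopRDD.getPartitions calling Class.newInstance on the input format class, and the ClassTag there is built from InputFormat.class, which is abstract. Reflection cannot instantiate an abstract class and throws java.lang.InstantiationException. The version above passes a concrete class (PigInputFormat); for LongWritable/Text input, org.apache.hadoop.mapreduce.lib.input.TextInputFormat would be the analogous concrete choice. The following is only a minimal sketch of the reflection behavior in plain Java. AbstractFormat, ConcreteFormat, and canInstantiate are hypothetical stand-ins for illustration, not Hadoop or Spark classes.

```java
public class InstantiationDemo {

    // Hypothetical stand-in for an abstract base class such as Hadoop's InputFormat.
    public static abstract class AbstractFormat {
        public abstract String describe();
    }

    // Hypothetical stand-in for a concrete implementation such as TextInputFormat.
    public static class ConcreteFormat extends AbstractFormat {
        public ConcreteFormat() {}

        @Override
        public String describe() {
            return "concrete";
        }
    }

    // Returns true if the class can be created reflectively through its
    // no-argument constructor, false if it fails with InstantiationException.
    public static boolean canInstantiate(Class<?> cls) {
        try {
            cls.getDeclaredConstructor().newInstance();
            return true;
        } catch (InstantiationException e) {
            // Thrown when the class is abstract.
            return false;
        } catch (ReflectiveOperationException e) {
            // Any other reflection failure is unexpected in this sketch.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("abstract: " + canInstantiate(AbstractFormat.class));
        System.out.println("concrete: " + canInstantiate(ConcreteFormat.class));
    }
}
```

If that is the cause here, building the third ClassTag from a concrete input format class instead of InputFormat.class should avoid the exception.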
Thanks
Best Regards
On Tue, Sep 23, 2014 at 9:47 AM, Michael Quinlan wrote:
> I'm attempting to write a Java-only implementation that calls the
> StreamingContext.fileStream method, and I'm especially interested in setting
> the boolean "newFilesOnly" to false. Unfortunately, my code throws
> exceptions:
>
> Exception in thread "main" java.lang.InstantiationException
>     at sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:48)
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>     at java.lang.Class.newInstance(Class.java:374)
>     at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:83)
>     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
>     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
>
> whenever the files are opened. The exceptions are thrown whether or not I
> invoke the longer form of the fileStream method. I can use the
> JavaStreamingContext version successfully, but it doesn't expose the
> boolean flag. If someone sees an issue with the code below, I would be
> very grateful for a nudge in the right direction.
>
> SparkConf conf = new SparkConf();
> conf.setMaster("local[2]");
> conf.setAppName("SparkStreamingFileTest");
> conf.set("spark.cores.max", "1");
> conf.set("spark.executor.memory","1g");
>
> List<String> inputjarslist = new ArrayList<String>();
> inputjarslist.add("/home/usr/target/lib/scala-library-2.10.1.jar");
> inputjarslist.add("/home/usr/target/lib/spark-assembly-1.0.2-hadoop2.2.0.jar");
> inputjarslist.add("/home/usr/target/lib/spark-streaming_2.10-1.0.2.jar");
>
> //Seq<String> inputjars = asScalaBuffer(inputjarslist);
> conf.setJars(inputjarslist.toArray(new String[3]));
>
> StreamingContext scc = new StreamingContext(conf, new Duration(10000));
>
> Seq<String> thejars = scc.sc().jars();
> scala.collection.Iterator iter = thejars.iterator();
> if (!iter.hasNext()) System.out.println("no jars associated!!");
> while (iter.hasNext()) {
>     System.out.println("Jar in system: " + iter.next());
> }
>
> Function1<Path,Object> f = new AbstractFunction1<Path,Object>() {
>     public Boolean apply(Path input) {
>         return true;
>     }
> };
>
>
> //scala.reflect.ClassTag$.MODULE$.apply(LongWritable.class);
>
> ClassTag<LongWritable> k = scala.reflect.ClassTag$.MODULE$.apply(LongWritable.class);
> ClassTag<Text> v = scala.reflect.ClassTag$.MODULE$.apply(Text.class);
> ClassTag<InputFormat<LongWritable,Text>> t = scala.reflect.ClassTag$.MODULE$.apply(InputFormat.class);
>
> InputDStream<Tuple2<LongWritable,Text>> ans = scc.fileStream("/home/usr/testDataDirectory", f, false, k, v, t);
> //InputDStream<Tuple2<LongWritable,Text>> ans = scc.fileStream("/home/usr/testDataDirectory",k,v,t);
>
> ans.print();