I'm attempting to write a Java-only implementation that calls the Scala
StreamingContext.fileStream method, and am especially interested in setting
the boolean "newFilesOnly" to false. Unfortunately my code throws
exceptions:
Exception in thread "main" java.lang.InstantiationException
    at sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:48)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at java.lang.Class.newInstance(Class.java:374)
    at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:83)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
whenever the files are opened. The exceptions are thrown whether or not I
invoke the longer form of the fileStream method. I can use the
JavaStreamingContext version successfully, but that version doesn't expose
the boolean flag. If someone sees an issue with the code below, I would be
very grateful for a nudge in the right direction.
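For what it's worth, the top of the trace (Class.newInstance failing inside
NewHadoopRDD.getPartitions) can be reproduced with plain JDK reflection and no
Spark at all: Class.newInstance() throws InstantiationException whenever the
class it is asked to instantiate is abstract. The class names below are made-up
stand-ins, just to show the mechanism in isolation:

```java
// Pure-JDK illustration of the InstantiationException seen in the trace:
// reflective instantiation of an abstract class always fails this way.
public class AbstractNewInstance {
    // Hypothetical stand-in for an abstract input-format class.
    static abstract class AbstractFormat { }

    static String tryInstantiate() {
        try {
            AbstractFormat unused = AbstractFormat.class.newInstance();
            return "instantiated";
        } catch (InstantiationException e) {
            // Abstract classes have no instances, so newInstance() lands here.
            return "InstantiationException";
        } catch (IllegalAccessException e) {
            return "IllegalAccessException";
        }
    }

    public static void main(String[] args) {
        System.out.println(tryInstantiate()); // prints "InstantiationException"
    }
}
```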
SparkConf conf = new SparkConf();
conf.setMaster("local[2]");
conf.setAppName("SparkStreamingFileTest");
conf.set("spark.cores.max", "1");
conf.set("spark.executor.memory", "1g");

List<String> inputjarslist = new ArrayList<String>();
inputjarslist.add("/home/usr/target/lib/scala-library-2.10.1.jar");
inputjarslist.add("/home/usr/target/lib/spark-assembly-1.0.2-hadoop2.2.0.jar");
inputjarslist.add("/home/usr/target/lib/spark-streaming_2.10-1.0.2.jar");
//Seq<String> inputjars = asScalaBuffer(inputjarslist);
conf.setJars(inputjarslist.toArray(new String[3]));

StreamingContext scc = new StreamingContext(conf, new Duration(10000));

// Sanity check: print the jars registered with the context.
Seq<String> thejars = scc.sc().jars();
scala.collection.Iterator<String> iter = thejars.iterator();
if (!iter.hasNext()) System.out.println("no jars associated!!");
while (iter.hasNext()) {
    System.out.println("Jar in system: " + iter.next());
}

// File filter passed to fileStream: accept every path.
Function1<Path, Object> f = new AbstractFunction1<Path, Object>() {
    public Boolean apply(Path input) {
        return true;
    }
};

// ClassTags required by the Scala fileStream signature.
//scala.reflect.ClassTag$.MODULE$.apply(LongWritable.class);
ClassTag<LongWritable> k = scala.reflect.ClassTag$.MODULE$.apply(LongWritable.class);
ClassTag<Text> v = scala.reflect.ClassTag$.MODULE$.apply(Text.class);
ClassTag<InputFormat<LongWritable, Text>> t = scala.reflect.ClassTag$.MODULE$.apply(InputFormat.class);

InputDStream<Tuple2<LongWritable, Text>> ans =
    scc.fileStream("/home/usr/testDataDirectory", f, false, k, v, t);
//InputDStream<Tuple2<LongWritable,Text>> ans =
//    scc.fileStream("/home/usr/testDataDirectory", k, v, t);
ans.print();
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Java-Implementation-of-StreamingContext-fileStream-tp14863.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.