Re: Spark Streaming using File Stream in Java
The fileStream is not designed to work with a continuously updated file. One of the main design goals of Spark is immutability (to guarantee fault tolerance by recomputation), and files that are being appended to (mutated) defeat that. Rather, it is designed to pick up new files added atomically (using move) to a directory. So to make it work with your continuously updated file, you will probably have to write something that periodically rotates the log file into separate files, and then moves those files into the monitored directory.

TD

On Wed, Jul 9, 2014 at 9:34 AM, Aravind wrote:
> Hi Akil,
>
> It didn't work. Here is the code...
> [...]
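A minimal sketch of the rotation TD describes, in plain Java (java.nio.file) with no Spark dependency. The class and method names, the `chunk-N.log` naming, and the staging/watched directory layout are illustrative assumptions, not part of Spark. Each call copies only the complete lines appended since the previous call into a staging file, then moves that file into the watched directory with `StandardCopyOption.ATOMIC_MOVE`, so the stream should only ever see finished files. It assumes the staging and watched directories are on the same filesystem (otherwise the atomic move fails) and that the log is only ever appended to.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;

class LogRotator {
    /**
     * Hypothetical helper: copies the complete lines appended to liveLog since
     * byte `offset` into stagingDir/chunk-<seq>.log, then atomically moves that
     * file into watchedDir. Returns the offset to pass to the next call.
     */
    static long rotateOnce(Path liveLog, Path stagingDir, Path watchedDir,
                           long offset, long seq) throws IOException {
        byte[] fresh;
        int len;
        try (SeekableByteChannel in = Files.newByteChannel(liveLog, StandardOpenOption.READ)) {
            long size = in.size();
            if (size <= offset) {
                return offset; // nothing new since the last rotation
            }
            fresh = new byte[(int) (size - offset)]; // assumes each chunk fits in memory
            ByteBuffer buf = ByteBuffer.wrap(fresh);
            in.position(offset);
            while (buf.hasRemaining() && in.read(buf) >= 0) {
                // keep reading until the buffer is full or EOF is reached
            }
            len = buf.position();
        }
        // Only ship whole lines; a partially written last line waits for the next call.
        int cut = -1;
        for (int i = len - 1; i >= 0; i--) {
            if (fresh[i] == '\n') {
                cut = i;
                break;
            }
        }
        if (cut < 0) {
            return offset;
        }
        Path staged = stagingDir.resolve("chunk-" + seq + ".log");
        Files.write(staged, Arrays.copyOf(fresh, cut + 1));
        // The watched directory sees the file appear in one step, already complete.
        Files.move(staged, watchedDir.resolve(staged.getFileName()),
                   StandardCopyOption.ATOMIC_MOVE);
        return offset + cut + 1;
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("rotate-demo");
        Path live = root.resolve("alarms.log");
        Path staging = Files.createDirectories(root.resolve("staging"));
        Path watched = Files.createDirectories(root.resolve("watched"));

        Files.write(live, "disk error\nnet err".getBytes(StandardCharsets.UTF_8));
        long offset = rotateOnce(live, staging, watched, 0, 0); // ships "disk error\n"
        Files.write(live, "or\n".getBytes(StandardCharsets.UTF_8), StandardOpenOption.APPEND);
        offset = rotateOnce(live, staging, watched, offset, 1); // ships "net error\n"
        System.out.println("next offset = " + offset);
    }
}
```

In practice `rotateOnce` would be called on a timer (for example once per batch interval), persisting `offset` and `seq` somewhere so a restart does not re-ship old lines.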
Re: Spark Streaming using File Stream in Java
Hi Akil,

It didn't work. Here is the code:

package com.paypal;

import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;

import com.google.common.collect.Lists;

import org.apache.spark.streaming.receiver.Receiver;
import scala.Tuple2;

import java.net.ConnectException;
import java.net.Socket;
import java.util.Arrays;
import java.util.regex.Pattern;
import java.io.*;

/**
 * Hello world!
 *
 */
public class App3
{
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) {

        // Create the context with a 5 second batch size
        SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");

        // *** always give local[4] to execute and see the output
        JavaStreamingContext ssc = new JavaStreamingContext("local[4]",
            "JavaNetworkWordCount", new Duration(5000));

        // throws an error saying requires JavaPairDstream and not JavaDstream.
        JavaDStream lines = ssc.fileStream("/Users/../Desktop/alarms.log");
        JavaDStream words = lines.flatMap(
            new FlatMapFunction() {
                public Iterable call(String s) {
                    return Arrays.asList(s.split(" "));
                }
            }
        );

        JavaPairDStream ones = words.map(
            new Function() {
                public Tuple2 call(String s) {
                    return new Tuple2(s, 1);
                }
            }
        );

        JavaPairDStream counts = ones.reduceByKey(
            new Function2() {
                public Integer call(Integer i1, Integer i2) {
                    return i1 + i2;
                }
            }
        );

        System.out.println("Hello world");
        counts.print();

        ssc.start();
        ssc.awaitTermination();
    }
}

I am not able to figure out how to cast objects of type JavaPairDStream to JavaDStream. Can you provide me with working code for this?

Thanks in advance.
Regards
Aravindan

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-using-File-Stream-in-Java-tp9115p9204.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
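On the typing question: as far as I know, in the Spark 1.0 Java API `JavaStreamingContext.fileStream` is generic over a key type, a value type and a Hadoop `InputFormat`, and returns a `JavaPairInputDStream` of key/value records rather than a `JavaDStream<String>`, so no cast will turn it into a stream of lines. For plain text files, `textFileStream(directory)` returns a `JavaDStream<String>` of lines directly; alternatively, a `map` that keeps only the value of each pair gets you there. The plain-Java sketch below models that second route without Spark. `offsetPairs` and `valuesOnly` are hypothetical helpers, and the byte-offset key mirrors what Hadoop's `TextInputFormat` produces for each line.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class PairToLines {
    // Hypothetical plain-Java model of the records TextInputFormat yields for one
    // file: key = byte offset where the line starts, value = the line's text.
    static List<Map.Entry<Long, String>> offsetPairs(String contents) {
        List<Map.Entry<Long, String>> records = new ArrayList<>();
        String[] lines = contents.split("\n", -1);
        long offset = 0;
        for (int i = 0; i < lines.length; i++) {
            // A trailing '\n' leaves one empty element that is not a real line.
            if (i == lines.length - 1 && lines[i].isEmpty()) {
                break;
            }
            records.add(new AbstractMap.SimpleEntry<>(offset, lines[i]));
            offset += lines[i].getBytes(StandardCharsets.UTF_8).length + 1; // +1 for '\n'
        }
        return records;
    }

    // The "cast" is really a map that keeps only the value of each pair,
    // which is what a value-extracting map over the pair stream would do in Spark.
    static List<String> valuesOnly(List<Map.Entry<Long, String>> records) {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<Long, String> r : records) {
            lines.add(r.getValue());
        }
        return lines;
    }

    public static void main(String[] args) {
        List<Map.Entry<Long, String>> records = offsetPairs("disk error\nnet error\n");
        System.out.println(records);             // keys are byte offsets 0 and 11
        System.out.println(valuesOnly(records)); // just the lines
    }
}
```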
Re: Spark Streaming using File Stream in Java
Try this out:

JavaStreamingContext ctx = new JavaStreamingContext(...);
JavaDStream<String> lines = ctx.fileStream("whatever");
JavaDStream<String> words = lines.flatMap(
  new FlatMapFunction<String, String>() {
    public Iterable<String> call(String s) {
      return Arrays.asList(s.split(" "));
    }
  });
JavaPairDStream<String, Integer> ones = words.map(
  new PairFunction<String, String, Integer>() {
    public Tuple2<String, Integer> call(String s) {
      return new Tuple2<String, Integer>(s, 1);
    }
  });
JavaPairDStream<String, Integer> counts = ones.reduceByKey(
  new Function2<Integer, Integer, Integer>() {
    public Integer call(Integer i1, Integer i2) {
      return i1 + i2;
    }
  });

This is adapted from the example at https://spark.apache.org/docs/0.9.1/java-programming-guide.html#example

Thanks
Best Regards

On Wed, Jul 9, 2014 at 6:03 AM, Aravind wrote:
> Hi all,
>
> I am trying to run the NetworkWordCount.java file in the streaming examples.
> The example shows how to read from a network socket. But my use case is that
> I have a local log file which is a stream and continuously updated (say
> /Users/.../Desktop/mylog.log).
>
> I would like to write the same NetworkWordCount.java using this file stream:
>
> jssc.fileStream(dataDirectory);
>
> Questions:
> 1. How do I write a map-reduce function for the above to count words
> (in Java, not Scala)?
>
> 2. Also, does the streaming application stop if the file is not updated, or
> does it continuously poll for file updates?
>
> I am a new user of Apache Spark Streaming. Kindly help me, as I am totally
> stuck.
>
> Thanks in advance.
>
> Regards
> Aravind
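For reference, here is what the word-count pipeline computes for a single batch, written as plain Java collections code so it runs without Spark. The method names `flatMapWords`, `toOnes` and `reduceByKey` are hypothetical stand-ins for the DStream operations, not Spark APIs. One hedged note beyond the 0.9.1 guide: as far as I know, in the Spark 1.0 Java API the pairing step is spelled `mapToPair` rather than `map` with a `PairFunction`, and plain text lines come from `textFileStream(directory)` rather than `fileStream`.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class BatchWordCount {
    // flatMap: each line becomes zero or more words (same s.split(" ") as the thread).
    static List<String> flatMapWords(List<String> lines) {
        List<String> words = new ArrayList<>();
        for (String s : lines) {
            words.addAll(Arrays.asList(s.split(" ")));
        }
        return words;
    }

    // Pairing step (map with a PairFunction, or mapToPair): each word becomes (word, 1).
    static List<Map.Entry<String, Integer>> toOnes(List<String> words) {
        List<Map.Entry<String, Integer>> ones = new ArrayList<>();
        for (String w : words) {
            ones.add(new AbstractMap.SimpleEntry<>(w, 1));
        }
        return ones;
    }

    // reduceByKey: combine all values for the same key with (i1, i2) -> i1 + i2.
    static Map<String, Integer> reduceByKey(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted only for readable output
        for (Map.Entry<String, Integer> p : pairs) {
            counts.merge(p.getKey(), p.getValue(), Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("disk error", "net error");
        System.out.println(reduceByKey(toOnes(flatMapWords(batch))));
        // prints {disk=1, error=2, net=1}
    }
}
```

In the streaming job, Spark applies these same three steps to every batch interval's new input, so each batch yields its own set of counts.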