Littlestar created SPARK-5829:
---------------------------------

             Summary: JavaStreamingContext.fileStream runs repeated empty tasks when there are no new files
                 Key: SPARK-5829
                 URL: https://issues.apache.org/jira/browse/SPARK-5829
             Project: Spark
          Issue Type: Bug
          Components: Streaming
    Affects Versions: 1.2.1
         Environment: spark master (1.3.0) with SPARK-5826 patch.
            Reporter: Littlestar



JavaStreamingContext.fileStream runs a new, empty task on every batch interval even when no new files have arrived.

reproduce:
  1. mkdir /testspark/watchdir on HDFS.
  2. run the app.
  3. put some text files into /testspark/watchdir.

Every 30 seconds the Spark log shows a new sub-task running, and /testspark/resultdir/ gets a new output directory every 30 seconds.

Even when no new files are added, a new task still runs on an empty RDD, so the output directories contain only empty files.

{noformat}
package my.test.hadoop.spark;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class TestStream {
    @SuppressWarnings({ "serial", "resource" })
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("TestStream");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(30));
        jssc.checkpoint("/testspark/checkpointdir");

        Configuration jobConf = new Configuration();
        jobConf.set("my.test.fields", "fields");

        // Watch the directory and emit one (1, 1) pair per input record.
        JavaPairDStream<Integer, Integer> is = jssc.fileStream("/testspark/watchdir",
                LongWritable.class, Text.class, TextInputFormat.class,
                new Function<Path, Boolean>() {
                    @Override
                    public Boolean call(Path v1) throws Exception {
                        return true; // accept every file
                    }
                }, true, jobConf)
                .mapToPair(new PairFunction<Tuple2<LongWritable, Text>, Integer, Integer>() {
                    @Override
                    public Tuple2<Integer, Integer> call(Tuple2<LongWritable, Text> arg0) throws Exception {
                        return new Tuple2<Integer, Integer>(1, 1);
                    }
                });

        // Sum the counts per key.
        JavaPairDStream<Integer, Integer> rs = is.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer arg0, Integer arg1) throws Exception {
                return arg0 + arg1;
            }
        });

        rs.checkpoint(Durations.seconds(60));
        rs.saveAsNewAPIHadoopFiles("/testspark/resultdir/output", "suffix",
                Integer.class, Integer.class, TextOutputFormat.class);

        jssc.start();
        jssc.awaitTermination();
    }
}

{noformat}
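As a possible application-level workaround (not a fix for the underlying scheduling behavior), the output step in the reproducer above could be guarded so that empty batches are skipped. This is only a sketch: it assumes JavaPairRDD.isEmpty() is available (added around Spark 1.3), replaces saveAsNewAPIHadoopFiles with a per-batch foreachRDD, and uses a hypothetical timestamp-based output path; it also requires importing org.apache.spark.api.java.JavaPairRDD.

{noformat}
// Workaround sketch: replaces rs.saveAsNewAPIHadoopFiles(...) above.
// Write a batch only when it actually contains data, so the empty RDDs
// generated when no new files arrive do not produce empty output dirs.
rs.foreachRDD(new Function<JavaPairRDD<Integer, Integer>, Void>() {
    @Override
    public Void call(JavaPairRDD<Integer, Integer> rdd) throws Exception {
        if (!rdd.isEmpty()) { // skip batches with no new input files
            rdd.saveAsNewAPIHadoopFile(
                    "/testspark/resultdir/output-" + System.currentTimeMillis(),
                    Integer.class, Integer.class, TextOutputFormat.class);
        }
        return null;
    }
});
{noformat}

This only hides the empty output; the streaming scheduler still launches a job per batch interval, which is the behavior this issue reports.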



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
