Hi, Hunter,

*What **behavior do you see with the HDFS? The local file system and HDFS
should have the same ** behavior.*

*Thanks!*
*- Terry*

Hunter Morgan <hunter.mor...@rackspace.com>于2015年7月16日周四 上午2:04写道:

>  After moving the setting of the parameter to SparkConf initialization
> instead of after the context is already initialized, I have it operating
> reliably on local filesystem, but not on hdfs. Are there any differences in
> behavior between these two cases I should be aware of?
>
>
>
> I don’t usually mailinglist or exchange, so forgive me for my ignorance of
> whether this message will go horribly wrong due to formatting.
>
>
>
> I plan to port the following code to Hadoop FS API to generalize testing
> to understand actual behavior and ensure desired behavior.
>
> public static JavaDStream<String> 
> textFileStreamIncludingExisting(JavaStreamingContext context, String path)
> {
>     return context.fileStream(path, LongWritable
>             .class, Text.class, TextInputFormat.class, v1 -> true, 
> false).map(v1 -> v1._2.toString());
> }
>
>
>
> @Test
> public void testTextFileStreamIncludingExistingReadsOldFiles() throws
> Exception
> {
>     final Path testDir = Files.createTempDirectory("sparkTest");
>     final ArrayList<Path> tempFiles = new ArrayList();
>
>     // create 20 "old" files
>     final int testFileNumberLimit = 20;
>     for (int testFileNumber = 0; testFileNumber < testFileNumberLimit;
> testFileNumber++)
>     {
>         final Path testFile = Files.createTempFile(testDir, "testFile", ""
> );
>         tempFiles.add(testFile);
>         final FileWriter fileWriter = new FileWriter(testFile.toFile());
>         fileWriter.write("asdf");
>         fileWriter.flush();
>         fileWriter.close();
>         for (String eachAttribute : new String[]{"basic:lastAccessTime",
> "basic:lastModifiedTime",
>                 "basic:creationTime"})
>         { // set file dates 0 to 20 days ago
>             Files.setAttribute(testFile, eachAttribute, FileTime.from(
> Instant.now().minus(Duration.ofDays
>                     (testFileNumber))));
>         }
>     }
>
>     final SparkConf sparkConf = new SparkConf().setMaster("local[1]").
> setAppName("test");
>     sparkConf.set("spark.streaming.minRememberDuration", String.valueOf(
> Integer.MAX_VALUE));
>     final JavaStreamingContext context = new JavaStreamingContext(
> sparkConf, Durations.seconds(1));
>     final JavaDStream<String> input = SparkUtil.
> textFileStreamIncludingExisting(context, String.valueOf(testDir
>             .toUri()));
>     // count files read
>     final Accumulator<Integer> accumulator = context.sparkContext().
> accumulator(0);
>
>     // setup async wait
>     Semaphore done = new Semaphore(1);
>     done.acquire();
>     input.foreachRDD(new Function<JavaRDD<String>, Void>()
>     {
>         @Override
>         public Void call(JavaRDD<String> v1) throws Exception
>         {
>             if (v1.count() == 0)
>             {
>                 done.release();
>             }
>             accumulator.add((int) v1.count());
>             return null;
>         }
>     });
>     context.start();
>     // wait for completion or 20 sec
>     done.tryAcquire(20, TimeUnit.SECONDS);
>     context.stop();
>
>     assertThat(accumulator.value(), is(testFileNumberLimit));
>
>     for (Path eachTempFile : tempFiles)
>     {
>         Files.deleteIfExists(eachTempFile);
>     }
>     Files.deleteIfExists(testDir);
> }
>
>
>
>
>
> *From:* Tathagata Das [mailto:t...@databricks.com]
> *Sent:* Wednesday, July 15, 2015 00:01
> *To:* Terry Hole
> *Cc:* Hunter Morgan; user@spark.apache.org
>
>
> *Subject:* Re: fileStream with old files
>
>
>
> It was added, but its not documented publicly. I am planning to change the
> name of the conf to spark.streaming.fileStream.minRememberDuration to make
> it easier to understand
>
>
>
> On Mon, Jul 13, 2015 at 9:43 PM, Terry Hole <hujie.ea...@gmail.com> wrote:
>
>  A new configuration named *spark.streaming.minRememberDuration* was
> added since 1.2.1 to control the file stream input, the default value is *60
> seconds*, you can change this value to a large value to include older
> files (older than 1 minute)
>
>
>
> You can get the detail from this jira:
> https://issues.apache.org/jira/browse/SPARK-3276
>
>
>
> -Terry
>
>
>
> On Tue, Jul 14, 2015 at 4:44 AM, automaticgiant <
> hunter.mor...@rackspace.com> wrote:
>
> It's not as odd as it sounds. I want to ensure that long streaming job
> outages can recover all the files that went into a directory while the job
> was down.
> I've looked at
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Generating-a-DStream-by-existing-textfiles-td20030.html#a20039
> and
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-td14306.html#a16435
> and
>
> https://stackoverflow.com/questions/29022379/spark-streaming-hdfs/29036469#29036469?newreg=e7e25469132d4fbc8350be8f876cf81e
> , but all seem unhelpful.
> I've tested combinations of the following:
>  * fileStreams created with dumb accept-all filters
>  * newFilesOnly true and false,
>  * tweaking minRememberDuration to high and low values,
>  * on hdfs or local directory.
> The problem is that it will not read files in the directory from more than
> a
> minute ago.
> JavaPairInputDStream<LongWritable, Text> input = context.fileStream(indir,
> LongWritable.class, Text.class, TextInputFormat.class, v -> true, false);
> Also tried with having set:
> context.sparkContext().getConf().set("spark.streaming.minRememberDuration",
> "1654564"); to big/small.
>
> Are there known limitations of the onlyNewFiles=false? Am I doing something
> wrong?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/fileStream-with-old-files-tp23802.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>
>
>

Reply via email to