Hi Hunter,

What behavior do you see with HDFS? The local file system and HDFS should have the same behavior.
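One way to narrow it down: FileInputDStream decides whether a file is "old" by its modification time, so it is worth comparing the timestamps the two file systems report for the same test files. A minimal, untested sketch (both directory URIs are placeholders):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ModTimeCheck
    {
        public static void main(String[] args) throws Exception
        {
            // Placeholder URIs -- substitute the directories your stream watches.
            String[] dirs = {"file:///tmp/sparkTest", "hdfs://namenode:8020/tmp/sparkTest"};
            for (String dir : dirs)
            {
                FileSystem fs = FileSystem.get(URI.create(dir), new Configuration());
                for (FileStatus status : fs.listStatus(new Path(dir)))
                {
                    // Files whose modification time falls outside the remember
                    // window are skipped, so stale or skewed timestamps on one
                    // file system would explain the different behavior.
                    System.out.println(status.getPath() + " mtime=" + status.getModificationTime());
                }
            }
        }
    }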
Thanks!
- Terry

On Thu, Jul 16, 2015 at 2:04 AM, Hunter Morgan <hunter.mor...@rackspace.com> wrote:

> After moving the setting of the parameter to SparkConf initialization
> instead of after the context is already initialized, I have it operating
> reliably on the local file system, but not on HDFS. Are there any
> differences in behavior between these two cases I should be aware of?
>
> I don't usually use mailing lists or Exchange, so forgive me if this
> message goes horribly wrong due to formatting.
>
> I plan to port the following code to the Hadoop FS API to generalize the
> test, understand the actual behavior, and ensure the desired behavior.
>
> public static JavaDStream<String> textFileStreamIncludingExisting(
>         JavaStreamingContext context, String path)
> {
>     return context.fileStream(path, LongWritable.class, Text.class,
>             TextInputFormat.class, v1 -> true, false)
>             .map(v1 -> v1._2.toString());
> }
>
> @Test
> public void testTextFileStreamIncludingExistingReadsOldFiles() throws Exception
> {
>     final Path testDir = Files.createTempDirectory("sparkTest");
>     final ArrayList<Path> tempFiles = new ArrayList<>();
>
>     // create 20 "old" files, dated 0 to 19 days ago
>     final int testFileNumberLimit = 20;
>     for (int testFileNumber = 0; testFileNumber < testFileNumberLimit; testFileNumber++)
>     {
>         final Path testFile = Files.createTempFile(testDir, "testFile", "");
>         tempFiles.add(testFile);
>         final FileWriter fileWriter = new FileWriter(testFile.toFile());
>         fileWriter.write("asdf");
>         fileWriter.flush();
>         fileWriter.close();
>         for (String eachAttribute : new String[]{"basic:lastAccessTime",
>                 "basic:lastModifiedTime", "basic:creationTime"})
>         {
>             Files.setAttribute(testFile, eachAttribute,
>                     FileTime.from(Instant.now().minus(Duration.ofDays(testFileNumber))));
>         }
>     }
>
>     final SparkConf sparkConf = new SparkConf().setMaster("local[1]").setAppName("test");
>     sparkConf.set("spark.streaming.minRememberDuration", String.valueOf(Integer.MAX_VALUE));
>     final JavaStreamingContext context = new JavaStreamingContext(sparkConf, Durations.seconds(1));
>     final JavaDStream<String> input = SparkUtil.textFileStreamIncludingExisting(
>             context, String.valueOf(testDir.toUri()));
>
>     // count the files read
>     final Accumulator<Integer> accumulator = context.sparkContext().accumulator(0);
>
>     // set up an async wait: release once an empty batch arrives
>     Semaphore done = new Semaphore(1);
>     done.acquire();
>     input.foreachRDD(new Function<JavaRDD<String>, Void>()
>     {
>         @Override
>         public Void call(JavaRDD<String> v1) throws Exception
>         {
>             if (v1.count() == 0)
>             {
>                 done.release();
>             }
>             accumulator.add((int) v1.count());
>             return null;
>         }
>     });
>     context.start();
>     // wait for completion, or 20 seconds
>     done.tryAcquire(20, TimeUnit.SECONDS);
>     context.stop();
>
>     assertThat(accumulator.value(), is(testFileNumberLimit));
>
>     for (Path eachTempFile : tempFiles)
>     {
>         Files.deleteIfExists(eachTempFile);
>     }
>     Files.deleteIfExists(testDir);
> }
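For the Hadoop FS API port mentioned above, the Files.setAttribute step has a direct analogue in FileSystem.setTimes, which back-dates the modification time the streaming context compares against. An untested sketch, assuming the twenty test files were already written under a placeholder HDFS path:

    import java.net.URI;
    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class BackdateHdfsFiles
    {
        public static void main(String[] args) throws Exception
        {
            FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:8020"), new Configuration());
            for (int daysAgo = 0; daysAgo < 20; daysAgo++)
            {
                // Placeholder file names; assumes these files already exist.
                Path testFile = new Path("/tmp/sparkTest/testFile" + daysAgo);
                long mtime = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(daysAgo);
                // setTimes(path, mtime, atime); an atime of -1 leaves the
                // access time unchanged.
                fs.setTimes(testFile, mtime, -1L);
            }
        }
    }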
From: Tathagata Das [mailto:t...@databricks.com]
Sent: Wednesday, July 15, 2015 00:01
To: Terry Hole
Cc: Hunter Morgan; user@spark.apache.org
Subject: Re: fileStream with old files

It was added, but it's not documented publicly. I am planning to change the name of the conf to spark.streaming.fileStream.minRememberDuration to make it easier to understand.

On Mon, Jul 13, 2015 at 9:43 PM, Terry Hole <hujie.ea...@gmail.com> wrote:

> A new configuration named spark.streaming.minRememberDuration was added in
> 1.2.1 to control the file stream input. The default value is 60 seconds;
> you can raise it to include older files (older than one minute).
>
> You can find the details in this JIRA:
> https://issues.apache.org/jira/browse/SPARK-3276
>
> -Terry
>
> On Tue, Jul 14, 2015 at 4:44 AM, automaticgiant <hunter.mor...@rackspace.com> wrote:
>
> It's not as odd as it sounds. I want to ensure that long streaming job
> outages can recover all the files that went into a directory while the job
> was down.
> I've looked at
> http://apache-spark-user-list.1001560.n3.nabble.com/Generating-a-DStream-by-existing-textfiles-td20030.html#a20039
> and
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-td14306.html#a16435
> and
> https://stackoverflow.com/questions/29022379/spark-streaming-hdfs/29036469#29036469?newreg=e7e25469132d4fbc8350be8f876cf81e
> but all seem unhelpful.
> I've tested combinations of the following:
> * fileStreams created with dumb accept-all filters
> * newFilesOnly true and false
> * tweaking minRememberDuration to high and low values
> * on HDFS or a local directory
> The problem is that it will not read files in the directory from more than
> a minute ago.
>
> JavaPairInputDStream<LongWritable, Text> input = context.fileStream(indir,
>     LongWritable.class, Text.class, TextInputFormat.class, v -> true, false);
>
> I also tried setting
> context.sparkContext().getConf().set("spark.streaming.minRememberDuration", "1654564");
> to big and small values.
>
> Are there known limitations of newFilesOnly=false? Am I doing something
> wrong?
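To summarize the fix Hunter reported at the top of the thread: the property has to be set on the SparkConf before the streaming context is created. As far as I can tell, SparkContext.getConf returns a copy of the configuration, so a set() on it after the context exists never reaches the file stream. A minimal sketch of the working placement (master, directory, and duration value are placeholders; a plain number here is read as seconds):

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaPairInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public class OldFilesStream
    {
        public static void main(String[] args) throws Exception
        {
            // Set the property on the SparkConf *before* creating the context.
            SparkConf sparkConf = new SparkConf()
                    .setMaster("local[1]")            // placeholder master
                    .setAppName("old-files")
                    // 2592000 seconds = 30 days
                    .set("spark.streaming.minRememberDuration", "2592000");
            JavaStreamingContext context = new JavaStreamingContext(sparkConf, Durations.seconds(1));

            // newFilesOnly = false plus the enlarged remember window picks up old files.
            JavaPairInputDStream<LongWritable, Text> input = context.fileStream(
                    "hdfs://namenode:8020/data/in",   // placeholder directory
                    LongWritable.class, Text.class, TextInputFormat.class, v -> true, false);

            input.print();
            context.start();
            context.awaitTermination();
        }
    }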