Looping in Kostas, who recently worked on the continuous file inputs. @Kostas: do you have an idea what's happening here?
– Ufuk

On 8 December 2016 at 08:43:32, Lukas Kircher (lukas.kirc...@uni-konstanz.de) wrote:

> Hi Stefan,
>
> thanks for your answer.
>
> > I think there is a field in FileInputFormat (which TextInputFormat is
> > subclassing) that could serve your purpose if you override the default:
>
> That was my first guess as well. I use the basic setup from
> org.apache.flink.api.java.io.TextInputFormatTest.java and call
> setNestedFileEnumeration(true), but once the stream is processed only the
> content of the .csv file in the top-most folder is printed. The example is
> just a few lines of self-contained code, see below. Does anybody have an
> idea?
>
> Cheers,
> Lukas
>
> import org.apache.flink.api.java.io.TextInputFormat;
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> import java.io.BufferedWriter;
> import java.io.File;
> import java.io.FileWriter;
>
> public class ReadDirectorySSCCE {
>     public static void main(String[] args) throws Exception {
>         // create the given dirs and add a .csv file to each one
>         String[] dirs = new String[] {"tmp", "tmp/first/", "tmp/second/"};
>         for (String dir : dirs) {
>             // create input file
>             File tmpDir = new File(dir);
>             if (!tmpDir.exists()) {
>                 tmpDir.mkdirs();
>             }
>             File tempFile = File.createTempFile("file", ".csv", tmpDir);
>             BufferedWriter w = new BufferedWriter(new FileWriter(tempFile));
>             w.write("content of " + dir + "/file.csv");
>             w.close();
>             tempFile.deleteOnExit();
>         }
>         File root = new File("tmp");
>
>         TextInputFormat inputFormat = new TextInputFormat(new Path(root.toURI().toString()));
>         inputFormat.setNestedFileEnumeration(true);
>
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.createInput(inputFormat).print();
>         env.execute();
>     }
> }
>
> > On 7 Dec 2016, at 17:44, Stefan Richter wrote:
> >
> > Hi,
> >
> > I think there is a field in FileInputFormat (which TextInputFormat is
> > subclassing) that could serve your purpose if you override the default:
> >
> > /**
> >  * The flag to specify whether recursive traversal of the input directory
> >  * structure is enabled.
> >  */
> > protected boolean enumerateNestedFiles = false;
> >
> > As for compression, I think this class also provides an
> > InflaterInputStreamFactory to read compressed data.
> >
> > Best,
> > Stefan
> >
> >> On 7 Dec 2016, at 12:10, Lukas Kircher wrote:
> >>
> >> Hi all,
> >>
> >> I am trying to read nested .csv files from a directory and want to switch
> >> from a custom SourceFunction I implemented to the TextInputFormat. I have
> >> two questions:
> >>
> >> 1) Somehow only the file in the root directory is processed, nested files
> >> are skipped. What am I missing? See the attachment for an SSCCE. I get the
> >> same result with Flink 1.1.3 no matter if I run it via the IDE or submit
> >> the job to the standalone binary. The file input splits are all there, yet
> >> they don't seem to be processed.
> >>
> >> 2) What is the easiest way to read compressed .csv files (.zip)?
> >>
> >> Thanks for your help, cheers
> >> Lukas
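For anyone reproducing this: a quick way to rule out problems with the test setup itself is to list the nested .csv files with plain Java NIO, independent of Flink. The sketch below is an editor's addition, not from the thread; the class name ListNestedCsv is made up, and it writes a fixed file.csv per directory instead of using File.createTempFile. If all three files are listed here but the Flink job prints only one, the files are on disk and the problem lies in the directory enumeration.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Flink-free sanity check: recreate the SSCCE's directory layout and walk
// it recursively with java.nio. All three .csv files should be printed.
public class ListNestedCsv {
    public static void main(String[] args) throws IOException {
        // Same layout as the SSCCE: a root dir with two nested dirs.
        String[] dirs = {"tmp", "tmp/first", "tmp/second"};
        for (String dir : dirs) {
            Path d = Paths.get(dir);
            Files.createDirectories(d);
            // One file.csv per directory (fixed name, unlike createTempFile).
            Files.write(d.resolve("file.csv"), ("content of " + dir).getBytes());
        }
        // Recursive traversal: Files.walk visits the whole subtree.
        try (Stream<Path> walk = Files.walk(Paths.get("tmp"))) {
            List<String> csvs = walk
                    .map(Path::toString)
                    .filter(p -> p.endsWith(".csv"))
                    .sorted()
                    .collect(Collectors.toList());
            csvs.forEach(System.out::println);
            System.out.println("found " + csvs.size() + " csv files");
        }
    }
}
```

Running this prints the three paths and `found 3 csv files`, which matches what the Flink job with setNestedFileEnumeration(true) would be expected to read.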