Looping in Kostas who recently worked on the continuous file inputs.

@Kostas: do you have an idea what's happening here?

– Ufuk
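
For reference, one quick way to confirm what a recursive enumeration of the test directory should return, independent of Flink, is a plain java.nio walk. This is only a sketch: the class and method names (NestedCsvLister, listCsvFiles) are made up for illustration, and it is not how FileInputFormat creates its splits internally.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, not part of Flink: lists the files a nested
// enumeration of the input directory is expected to pick up.
final class NestedCsvLister {

    /** Returns every regular *.csv file below root, nested directories included, sorted. */
    static List<Path> listCsvFiles(Path root) throws IOException {
        // Files.walk descends into subdirectories; the stream must be closed.
        try (Stream<Path> paths = Files.walk(root)) {
            return paths
                    .filter(Files::isRegularFile)
                    .filter(p -> p.getFileName().toString().endsWith(".csv"))
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        // Defaults to the "tmp" directory used by the SSCCE quoted below.
        Path root = Paths.get(args.length > 0 ? args[0] : "tmp");
        listCsvFiles(root).forEach(System.out::println);
    }
}
```

Run against the tmp directory created by the SSCCE, this should list one .csv file per directory, which is the set of files the job would be expected to print.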

On 8 December 2016 at 08:43:32, Lukas Kircher (lukas.kirc...@uni-konstanz.de) wrote:
> Hi Stefan,
> thanks for your answer.
> > I think there is a field in FileInputFormat (which TextInputFormat is
> > subclassing) that could serve your purpose if you override the default:
> That was my first guess as well. I use the basic setup from
> org.apache.flink.api.java.io.TextInputFormatTest.java and call
> setNestedFileEnumeration(true), but once the stream is processed, only the
> content of the .csv file in the top-most folder is printed. The example is
> just a few lines of self-contained code, see below. Does anybody have an idea?
> Cheers,
> Lukas
> import org.apache.flink.api.java.io.TextInputFormat;
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> import java.io.BufferedWriter;
> import java.io.File;
> import java.io.FileWriter;
>
> public class ReadDirectorySSCCE {
>     public static void main(String[] args) throws Exception {
>         // create given dirs and add a .csv file to each one
>         String[] dirs = new String[] {"tmp", "tmp/first/", "tmp/second/"};
>         for (String dir : dirs) {
>             // create input file
>             File tmpDir = new File(dir);
>             if (!tmpDir.exists()) {
>                 tmpDir.mkdirs();
>             }
>             File tempFile = File.createTempFile("file", ".csv", tmpDir);
>             BufferedWriter w = new BufferedWriter(new FileWriter(tempFile));
>             w.write("content of " + dir + "/file.csv");
>             w.close();
>             tempFile.deleteOnExit();
>         }
>         File root = new File("tmp");
>         TextInputFormat inputFormat =
>                 new TextInputFormat(new Path(root.toURI().toString()));
>         inputFormat.setNestedFileEnumeration(true);
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         env.createInput(inputFormat).print();
>         env.execute();
>     }
> }
>
> > On 7 Dec 2016, at 17:44, Stefan Richter wrote:
> >
> > Hi,
> >
> > I think there is a field in FileInputFormat (which TextInputFormat is
> > subclassing) that could serve your purpose if you override the default:
> >
> > /**
> > * The flag to specify whether recursive traversal of the input directory
> > * structure is enabled.
> > */
> > protected boolean enumerateNestedFiles = false;
> >
> > As for compression, I think this class also provides an
> > InflaterInputStreamFactory to read compressed data.
> >
> > Best,
> > Stefan
> >
> >> On 07.12.2016 at 12:10, Lukas Kircher wrote:
> >>
> >> Hi all,
> >>
> >> I am trying to read nested .csv files from a directory and want to switch
> >> from a custom SourceFunction I implemented to the TextInputFormat. I have
> >> two questions:
> >>
> >> 1) Somehow only the file in the root directory is processed; nested files
> >> are skipped. What am I missing? See the attachment for an SSCCE. I get the
> >> same result with Flink 1.1.3 no matter whether I run it via the IDE or
> >> submit the job to the standalone binary. The file input splits are all
> >> there, yet they don't seem to be processed.
> >>
> >> 2) What is the easiest way to read compressed .csv files (.zip)?
> >>
> >> Thanks for your help, cheers
> >> Lukas
> >>
> >>  
> >
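
On question 2 above: a .zip file is an archive container with per-entry headers, so reading it generally needs a zip-aware reader rather than a bare inflater stream. Whether FileInputFormat in 1.1.3 handles .zip out of the box is not confirmed in this thread. The following is a minimal sketch using only java.util.zip, for example inside a custom SourceFunction; the class and method names (ZipCsvReader, readCsvLines) are hypothetical and not part of Flink.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

// Hypothetical helper, not part of Flink: collects the lines of every
// .csv entry found in a zip archive, assuming UTF-8 encoded content.
final class ZipCsvReader {

    static List<String> readCsvLines(InputStream in) throws IOException {
        List<String> lines = new ArrayList<>();
        try (ZipInputStream zip = new ZipInputStream(in, StandardCharsets.UTF_8)) {
            ZipEntry entry;
            // getNextEntry() closes the previous entry and positions the stream on the next one.
            while ((entry = zip.getNextEntry()) != null) {
                if (entry.isDirectory() || !entry.getName().endsWith(".csv")) {
                    continue;
                }
                // The reader is deliberately not closed here: closing it
                // would close the whole archive stream, not just this entry.
                BufferedReader reader =
                        new BufferedReader(new InputStreamReader(zip, StandardCharsets.UTF_8));
                String line;
                while ((line = reader.readLine()) != null) {
                    lines.add(line);
                }
            }
        }
        return lines;
    }
}
```

Passing a FileInputStream for each archive to readCsvLines returns the lines of all contained .csv entries in archive order; entries with other extensions are skipped.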
