Thanks! When I give the path to a directory, Flink only reads 2 files. It seems to pick these 2 files randomly.
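One plausible cause worth ruling out, assuming the current version of the format keeps an "end reached" flag (the thread does not show the updated code): Flink reuses a single InputFormat instance for every split handed to one parallel subtask. For each split it calls open(split), then reachedEnd()/nextRecord() until reachedEnd() returns true, then close(). If nextRecord() sets the flag but open() never clears it, each subtask reads only its first file. With parallelism 2 that gives exactly 2 files, and which 2 depends on the order in which splits happen to be handed out. The sketch below is a plain-Java model of that loop, not Flink code: SimpleInputFormat, WholeFileFormat, runSubtask and runJob are hypothetical names, file contents sit in an in-memory map, and splits are dealt round-robin, whereas Flink assigns them to subtasks on request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java model of Flink's InputFormat calling order; all names are hypothetical.
public class SplitReuseDemo {

    // Stripped-down stand-in for the open/reachedEnd/nextRecord/close lifecycle.
    interface SimpleInputFormat {
        void open(String split);
        boolean reachedEnd();
        String nextRecord();
        void close();
    }

    // One parallel subtask: the SAME format instance handles every split it gets,
    // and reachedEnd() is checked before each nextRecord() call.
    static List<String> runSubtask(SimpleInputFormat format, List<String> splits) {
        List<String> out = new ArrayList<>();
        for (String split : splits) {
            format.open(split);
            try {
                while (!format.reachedEnd()) {
                    String record = format.nextRecord();
                    if (record != null) {
                        out.add(record);
                    }
                }
            } finally {
                format.close();
            }
        }
        return out;
    }

    // Whole-file format: each split (file) yields exactly one record.
    static class WholeFileFormat implements SimpleInputFormat {
        private final Map<String, String> files; // file name -> content, in memory for the demo
        private final boolean resetOnOpen;        // false reproduces the suspected bug
        private String current;
        private boolean end;

        WholeFileFormat(Map<String, String> files, boolean resetOnOpen) {
            this.files = files;
            this.resetOnOpen = resetOnOpen;
        }

        @Override public void open(String split) {
            current = split;
            if (resetOnOpen) {
                end = false; // the fix: each new split starts unread
            }
        }

        @Override public boolean reachedEnd() { return end; }

        @Override public String nextRecord() {
            end = true; // one record per file, so the split is finished
            return files.get(current);
        }

        @Override public void close() { current = null; }
    }

    // Simplified job: splits are dealt round-robin to `parallelism` subtasks,
    // each with its own format instance (Flink assigns splits on request instead).
    static List<String> runJob(int parallelism, boolean resetOnOpen,
                               Map<String, String> files, List<String> splits) {
        List<String> out = new ArrayList<>();
        for (int task = 0; task < parallelism; task++) {
            List<String> assigned = new ArrayList<>();
            for (int i = task; i < splits.size(); i += parallelism) {
                assigned.add(splits.get(i));
            }
            out.addAll(runSubtask(new WholeFileFormat(files, resetOnOpen), assigned));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> files = Map.of("a", "A", "b", "B", "c", "C", "d", "D", "e", "E");
        List<String> splits = List.of("a", "b", "c", "d", "e");
        System.out.println(runJob(2, false, files, splits)); // [A, B]
        System.out.println(runJob(2, true, files, splits));  // [A, C, E, B, D]
    }
}
```

In a real FileInputFormat subclass the same rule would apply: reset any per-split state in open(split) (typically after calling super.open(split)), not in the constructor.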
On Mon, Jul 31, 2017 at 12:05 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Mohit,
>
> as Ted said, there are plenty of InputFormats that are based on
> FileInputFormat.
> FileInputFormat also supports reading all files in a directory. Simply
> specify the path of the directory.
>
> Check StreamExecutionEnvironment.createFileInput(), which takes several
> parameters, such as a FileInputFormat and a time interval at which the
> directory is periodically checked.
>
> Best, Fabian
>
> 2017-07-30 21:31 GMT+02:00 Ted Yu <yuzhih...@gmail.com>:
>
>> For #1, you can find quite a few classes that extend FileInputFormat,
>> e.g.:
>>
>> flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java:public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultTypeQuer
>> flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java:public abstract class BinaryInputFormat<T> extends FileInputFormat<T>
>> flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java:public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> implements Checkpoi
>> flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java: extends FileInputFormat<String>
>>
>> FYI
>>
>> On Sun, Jul 30, 2017 at 12:26 PM, Mohit Anchlia <mohitanch...@gmail.com>
>> wrote:
>>
>>> Thanks. A few more questions:
>>>
>>> - Is there an example for FileInputFormat?
>>> - How do I make it read all the files in a directory?
>>> - How do I make an InputFormat a streaming input instead of batch? E.g.,
>>>   read new files as they arrive in a directory.
>>>
>>> Thanks again.
>>>
>>> On Sun, Jul 30, 2017 at 12:53 AM, Fabian Hueske <fhue...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Flink calls the reachedEnd() method before it calls nextRecord() and
>>>> closes the InputFormat when reachedEnd() returns true.
>>>> So, it should not return true until nextRecord() has been called and
>>>> the first (and only) record has been emitted.
>>>>
>>>> You might also want to build your PDFFileInputFormat on FileInputFormat
>>>> and set unsplittable to true.
>>>> FileInputFormat comes with lots of built-in functionality, such as
>>>> InputSplit generation.
>>>>
>>>> Cheers, Fabian
>>>>
>>>> 2017-07-30 3:41 GMT+02:00 Mohit Anchlia <mohitanch...@gmail.com>:
>>>>
>>>>> Hi,
>>>>>
>>>>> I created a custom input format. The idea behind it is to read all
>>>>> binary files from a directory and use each file as its own split. Each
>>>>> split is read as one whole record. When I run it in Flink I don't get
>>>>> any error, but I am not seeing any output from .print(). Am I missing
>>>>> something?
>>>>>
>>>>> ----
>>>>>
>>>>> public class PDFFileInputFormat extends RichInputFormat<StringValue, InputSplit> {
>>>>>
>>>>>     private static final Logger logger = LoggerFactory.getLogger(PDFFileInputFormat.class.getName());
>>>>>
>>>>>     PDFFileInputSplit current = null;
>>>>>
>>>>>     public static void main(String... args) throws Exception {
>>>>>         PDFFileInputFormat pdfReader = new PDFFileInputFormat("c:\\proj\\test");
>>>>>         InputSplit[] splits = pdfReader.createInputSplits(1);
>>>>>         pdfReader.open(splits[0]);
>>>>>         pdfReader.nextRecord(null);
>>>>>
>>>>>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>
>>>>>         env.fromElements(1, 2, 3)
>>>>>             // returns the squared i
>>>>>             .print();
>>>>>
>>>>>         PDFFileInputFormat format = new PDFFileInputFormat("c:\\proj\\test");
>>>>>         InputFormatSourceFunction<StringValue> reader = new InputFormatSourceFunction<>(format,
>>>>>             TypeInformation.of(StringValue.class));
>>>>>         env.createInput(format, TypeInformation.of(StringValue.class)).print();
>>>>>     }
>>>>>
>>>>>     String path = null;
>>>>>
>>>>>     public PDFFileInputFormat(String path) {
>>>>>         this.path = path;
>>>>>     }
>>>>>
>>>>>     public void configure(Configuration parameters) {
>>>>>         // TODO Auto-generated method stub
>>>>>     }
>>>>>
>>>>>     public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
>>>>>         // TODO Auto-generated method stub
>>>>>         return cachedStatistics;
>>>>>     }
>>>>>
>>>>>     public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
>>>>>         final List<PDFFileInputSplit> splits = new ArrayList<PDFFileInputSplit>();
>>>>>         Files.list(Paths.get(path)).forEach(f -> {
>>>>>             PDFFileInputSplit split = new PDFFileInputSplit(splits.size(), f);
>>>>>             splits.add(split);
>>>>>         });
>>>>>         PDFFileInputSplit[] inputSplitArray = new PDFFileInputSplit[splits.size()];
>>>>>         return splits.toArray(inputSplitArray);
>>>>>     }
>>>>>
>>>>>     public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
>>>>>         logger.info("Assigner");
>>>>>         // TODO Auto-generated method stub
>>>>>         return new DefaultInputSplitAssigner(inputSplits);
>>>>>     }
>>>>>
>>>>>     public void open(InputSplit split) throws IOException {
>>>>>         this.current = (PDFFileInputSplit) split;
>>>>>     }
>>>>>
>>>>>     public boolean reachedEnd() throws IOException {
>>>>>         // TODO Auto-generated method stub
>>>>>         return true;
>>>>>     }
>>>>>
>>>>>     public StringValue nextRecord(StringValue reuse) throws IOException {
>>>>>         String content = new String(Files.readAllBytes(this.current.getFile()));
>>>>>         logger.info("Content " + content);
>>>>>         return new StringValue(content);
>>>>>     }
>>>>>
>>>>>     public void close() throws IOException {
>>>>>         // TODO Auto-generated method stub
>>>>>     }
>>>>>
>>>>> }
>>>>>
>>>>> ---
>>>>>
>>>>> Thanks,
>>>>> Mohit
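Following Fabian's first reply, the format above never emits anything because reachedEnd() returns true before nextRecord() is ever called. Below is a plain-Java sketch of the same one-record-per-file design with the corrected reachedEnd()/nextRecord() pair. It models Flink's calling order but does not use Flink classes; WholeFileReaderSketch, createSplits, WholeFileReader and readDirectory are hypothetical names. It also closes the stream returned by Files.list(), skips subdirectories (Files.readAllBytes() throws an IOException on a directory), and keeps each file as raw bytes, since decoding a binary PDF with new String(bytes) is lossy.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Plain-Java sketch of a one-record-per-file reader; not Flink's API, names are hypothetical.
public class WholeFileReaderSketch {

    // One split per regular file in `dir`; subdirectories are skipped and the
    // result is sorted so split order is deterministic. try-with-resources closes
    // the directory stream opened by Files.list().
    static List<Path> createSplits(Path dir) throws IOException {
        try (Stream<Path> entries = Files.list(dir)) {
            return entries.filter(Files::isRegularFile)
                          .sorted()
                          .collect(Collectors.toList());
        }
    }

    // Reads a split as a single record. reachedEnd() stays false until
    // nextRecord() has produced that record, because the caller asks
    // reachedEnd() first (returning true up front means nothing is ever emitted).
    static class WholeFileReader {
        private Path current;
        private boolean emitted;

        void open(Path split) {
            current = split;
            emitted = false; // reset per split: one instance is reused for many files
        }

        boolean reachedEnd() {
            return emitted;
        }

        byte[] nextRecord() throws IOException {
            emitted = true;
            return Files.readAllBytes(current); // raw bytes: PDFs are binary
        }
    }

    // Caller loop in the order described in the thread: reachedEnd() before nextRecord().
    static List<byte[]> readDirectory(Path dir) throws IOException {
        WholeFileReader reader = new WholeFileReader();
        List<byte[]> records = new ArrayList<>();
        for (Path split : createSplits(dir)) {
            reader.open(split);
            while (!reader.reachedEnd()) {
                records.add(reader.nextRecord());
            }
        }
        return records;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("pdf-demo");
        Files.write(dir.resolve("a.pdf"), new byte[] {1, 2, 3});
        Files.write(dir.resolve("b.pdf"), new byte[] {4, 5});
        Files.createDirectory(dir.resolve("nested")); // skipped by createSplits
        System.out.println(readDirectory(dir).size()); // 2
    }
}
```

In Flink itself, the equivalent would presumably be a FileInputFormat subclass with unsplittable set to true, as Fabian suggested, whose open(split) resets the flag and whose reachedEnd()/nextRecord() behave like WholeFileReader above.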