For #1, there are quite a few classes in Flink that extend FileInputFormat, e.g.:

flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java:public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultTypeQuer
flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java:public abstract class BinaryInputFormat<T> extends FileInputFormat<T>
flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java:public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> implements Checkpoi
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java:            extends FileInputFormat<String>

FYI
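To make the reachedEnd()/nextRecord() contract from Fabian's quoted reply concrete, here is a minimal, stdlib-only sketch. It does not use Flink: `RecordReader`, `WholeFileReader` and `drain` are hypothetical names, and `drain` is only a simplified model of the loop a Flink data source runs (check reachedEnd(), then call nextRecord(), emit non-null results). A reader whose reachedEnd() always returns true, like the PDFFileInputFormat in the original post, produces nothing; a flag that flips once the single whole-file record has been read produces exactly one record.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class WholeFileReadLoop {

    // Hypothetical stand-in for the two InputFormat methods that matter here (not Flink's API).
    interface RecordReader<T> {
        boolean reachedEnd() throws IOException;

        T nextRecord(T reuse) throws IOException;
    }

    // Emits the whole file as a single record, then reports the end of the input.
    static final class WholeFileReader implements RecordReader<String> {
        private final Path file;
        private boolean consumed = false; // becomes true once the one record has been produced

        WholeFileReader(Path file) {
            this.file = file;
        }

        @Override
        public boolean reachedEnd() {
            return consumed; // false until nextRecord() has run once
        }

        @Override
        public String nextRecord(String reuse) throws IOException {
            consumed = true;
            return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
        }
    }

    // Simplified model of the driver loop: reachedEnd() is checked before every nextRecord().
    static <T> List<T> drain(RecordReader<T> reader) throws IOException {
        List<T> out = new ArrayList<>();
        while (!reader.reachedEnd()) {
            T record = reader.nextRecord(null);
            if (record != null) {
                out.add(record);
            }
        }
        return out;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("doc", ".txt");
        Files.write(tmp, "hello".getBytes(StandardCharsets.UTF_8));

        // A reader that reports end-of-input immediately, as in the original post.
        RecordReader<String> alwaysEnded = new RecordReader<String>() {
            @Override
            public boolean reachedEnd() {
                return true;
            }

            @Override
            public String nextRecord(String reuse) {
                return "never read";
            }
        };

        System.out.println(drain(alwaysEnded));              // []
        System.out.println(drain(new WholeFileReader(tmp))); // [hello]
        Files.delete(tmp);
    }
}
```

In a real format the `consumed` flag would be reset in open(), because one format instance is opened once per split it is assigned.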

On Sun, Jul 30, 2017 at 12:26 PM, Mohit Anchlia <mohitanch...@gmail.com>
wrote:

> Thanks. A few more questions:
>
> - Is there an example of a FileInputFormat implementation?
> - How do I make it read all the files in a directory?
> - How do I turn an InputFormat into a streaming input instead of a batch
> one, e.g., read new files as they arrive in a directory?
>
> Thanks again.
>
> On Sun, Jul 30, 2017 at 12:53 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi,
>>
>> Flink calls the reachedEnd() method before it calls nextRecord() and
>> closes the InputFormat (IF) when reachedEnd() returns true.
>> So, it should not return true until nextRecord() has been called and the
>> first (and only) record has been emitted.
>>
>> You might also want to build your PDFFileInputFormat on FileInputFormat
>> and set unsplittable to true.
>> FileInputFormat comes with a lot of built-in functionality, such as
>> InputSplit generation.
>>
>> Cheers, Fabian
>>
>> 2017-07-30 3:41 GMT+02:00 Mohit Anchlia <mohitanch...@gmail.com>:
>>
>>> Hi,
>>>
>>> I created a custom input format. The idea behind it is to read all binary
>>> files from a directory and use each file as its own split. Each split is
>>> read as one whole record. When I run it in Flink I don't get any errors, but
>>> I am not seeing any output from .print(). Am I missing something?
>>>
>>> ----
>>>
>>> public class PDFFileInputFormat extends RichInputFormat<StringValue, InputSplit> {
>>>
>>>     private static final Logger logger = LoggerFactory.getLogger(PDFFileInputFormat.class.getName());
>>>
>>>     PDFFileInputSplit current = null;
>>>
>>>     public static void main(String... args) throws Exception {
>>>         PDFFileInputFormat pdfReader = new PDFFileInputFormat("c:\\proj\\test");
>>>         InputSplit[] splits = pdfReader.createInputSplits(1);
>>>         pdfReader.open(splits[0]);
>>>         pdfReader.nextRecord(null);
>>>
>>>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>
>>>         env.fromElements(1, 2, 3)
>>>                 // returns the squared i
>>>                 .print();
>>>
>>>         PDFFileInputFormat format = new PDFFileInputFormat("c:\\proj\\test");
>>>
>>>         InputFormatSourceFunction<StringValue> reader = new InputFormatSourceFunction<>(format,
>>>                 TypeInformation.of(StringValue.class));
>>>
>>>         env.createInput(format, TypeInformation.of(StringValue.class)).print();
>>>     }
>>>
>>>     String path = null;
>>>
>>>     public PDFFileInputFormat(String path) {
>>>         this.path = path;
>>>     }
>>>
>>>     public void configure(Configuration parameters) {
>>>         // TODO Auto-generated method stub
>>>     }
>>>
>>>     public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
>>>         // TODO Auto-generated method stub
>>>         return cachedStatistics;
>>>     }
>>>
>>>     public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
>>>         final List<PDFFileInputSplit> splits = new ArrayList<PDFFileInputSplit>();
>>>         Files.list(Paths.get(path)).forEach(f -> {
>>>             PDFFileInputSplit split = new PDFFileInputSplit(splits.size(), f);
>>>             splits.add(split);
>>>         });
>>>         PDFFileInputSplit[] inputSplitArray = new PDFFileInputSplit[splits.size()];
>>>         return splits.toArray(inputSplitArray);
>>>     }
>>>
>>>     public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
>>>         logger.info("Assigner");
>>>         // TODO Auto-generated method stub
>>>         return new DefaultInputSplitAssigner(inputSplits);
>>>     }
>>>
>>>     public void open(InputSplit split) throws IOException {
>>>         this.current = (PDFFileInputSplit) split;
>>>     }
>>>
>>>     public boolean reachedEnd() throws IOException {
>>>         // TODO Auto-generated method stub
>>>         return true;
>>>     }
>>>
>>>     public StringValue nextRecord(StringValue reuse) throws IOException {
>>>         String content = new String(Files.readAllBytes(this.current.getFile()));
>>>         logger.info("Content " + content);
>>>         return new StringValue(content);
>>>     }
>>>
>>>     public void close() throws IOException {
>>>         // TODO Auto-generated method stub
>>>     }
>>> }
>>>
>>> ---
>>>
>>>
>>> Thanks,
>>>
>>> Mohit
>>>
>>>
>>>
>>
>
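Regarding the quoted question about reading all the files in a directory: FileInputFormat can be pointed at a directory path and enumerates the files in it, and with unsplittable set to true each file becomes one split. The stdlib-only sketch below models that split generation under stated assumptions; `PerFileSplits`, `FileSplit`, `accept` and `createSplits` are hypothetical names, not Flink API, and the "."/"_" filter is an assumption meant to approximate FileInputFormat's default file filter.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PerFileSplits {

    // Hypothetical split type: one entry per whole file, like an unsplittable FileInputFormat.
    static final class FileSplit {
        final int splitNumber;
        final Path file;

        FileSplit(int splitNumber, Path file) {
            this.splitNumber = splitNumber;
            this.file = file;
        }
    }

    // Assumption: accept only regular files and skip names starting with "." or "_".
    static boolean accept(Path p) {
        String name = p.getFileName().toString();
        return Files.isRegularFile(p) && !name.startsWith(".") && !name.startsWith("_");
    }

    static List<FileSplit> createSplits(Path dir) throws IOException {
        List<Path> files;
        // Files.list holds an open directory handle, so close the stream when done.
        try (Stream<Path> entries = Files.list(dir)) {
            files = entries.filter(PerFileSplits::accept).sorted().collect(Collectors.toList());
        }
        List<FileSplit> splits = new ArrayList<>();
        for (Path f : files) {
            splits.add(new FileSplit(splits.size(), f)); // split number = position in sorted list
        }
        return splits;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("pdfs");
        Files.createFile(dir.resolve("a.pdf"));
        Files.createFile(dir.resolve("b.pdf"));
        Files.createFile(dir.resolve("_SUCCESS")); // filtered out by accept()
        for (FileSplit s : createSplits(dir)) {
            System.out.println(s.splitNumber + " " + s.file.getFileName());
        }
        // prints "0 a.pdf" then "1 b.pdf"
    }
}
```

Unlike the quoted createInputSplits, this sketch closes the stream returned by Files.list, skips subdirectories instead of turning them into splits, and sorts the entries so split numbers are deterministic.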
