Hi All,

I am trying a simple example which looks like this:
StreamExecutionEnvironment see =
StreamExecutionEnvironment.createLocalEnvironment();
PojoCsvInputFormat<TestPojo> pcif = new PojoCsvInputFormat<>(inPath,
PojoTypeInfo<TestPojo>) TypeExtractor.createTypeInfo(TestPojo.class));
DataStreamSource<TestPojo> stream = see.readFile(pcif, inPath,
FileProcessingMode.PROCESS_CONTINUOUSLY, 5000);
stream.writeAsText(outPath);
see.execute();

TestPojo looks like this:
public class TestPojo {
public int a;
public TestPojo() {}
}

But I am getting the following exception:
Exception in thread "main"
org.apache.flink.api.common.InvalidProgramException: The type returned
by the input format could not be automatically determined. Please
specify the TypeInformation of the produced type explicitly by using
the 'createInput(InputFormat, TypeInformation)' method instead.
    at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.readFile

Is reading csv data into pojos not supported with streaming readFile
or am I doing something wrong here?

Thank you

Reply via email to