I'm using Flink 1.4.0.

I'm trying to save the results of a Table API query to a CSV file, but I'm
getting an error.
Here are the details:

My input file looks like this:
id,species,color,weight,name
311,canine,golden,75,dog1
312,canine,brown,22,dog2
313,feline,gray,8,cat1

I run a query on this that counts the rows per species and keeps only the
canines, and I want to save the result to a CSV file:

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

    String inputPath = "location-of-source-file";
    CsvTableSource petsTableSource = CsvTableSource.builder()
            .path(inputPath)
            .ignoreFirstLine()
            .fieldDelimiter(",")
            .field("id", Types.INT())
            .field("species", Types.STRING())
            .field("color", Types.STRING())
            .field("weight", Types.DOUBLE())
            .field("name", Types.STRING())
            .build();

    // Register our table source
    tableEnv.registerTableSource("pets", petsTableSource);
    Table pets = tableEnv.scan("pets");

    Table counts = pets
            .groupBy("species")
            .select("species, species.count as count")
            .filter("species === 'canine'");

    DataSet<Row> result = tableEnv.toDataSet(counts, Row.class);
    result.print();

    // Write Results to File
    TableSink<Row> sink = new CsvTableSink("/home/hadoop/output/pets", ",");
    counts.writeToSink(sink);

When I run this, the result.print() call prints the following to stdout:

canine,2

However, nothing is written to the output file, and I see the error below.
How can I save the results I'm seeing in stdout to a CSV file?
Thanks!



2018-05-27 12:49:44,411 INFO  [main] typeutils.TypeExtractor (TypeExtractor.java:1873) - class org.apache.flink.types.Row does not contain a getter for field fields
2018-05-27 12:49:44,411 INFO  [main] typeutils.TypeExtractor (TypeExtractor.java:1876) - class org.apache.flink.types.Row does not contain a setter for field fields
2018-05-27 12:49:44,411 INFO  [main] typeutils.TypeExtractor (TypeExtractor.java:1911) - class org.apache.flink.types.Row is not a valid POJO type because not all fields are valid POJO fields.
2018-05-27 12:49:44,411 INFO  [main] typeutils.TypeExtractor (TypeExtractor.java:1873) - class org.apache.flink.types.Row does not contain a getter for field fields
2018-05-27 12:49:44,412 INFO  [main] typeutils.TypeExtractor (TypeExtractor.java:1876) - class org.apache.flink.types.Row does not contain a setter for field fields
2018-05-27 12:49:44,412 INFO  [main] typeutils.TypeExtractor (TypeExtractor.java:1911) - class org.apache.flink.types.Row is not a valid POJO type because not all fields are valid POJO fields.
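
For reference, here is a minimal plain-Java sketch (no Flink involved) of the
file contents I expect for the sample rows shown earlier: count the rows per
species, keep only "canine", and write the result as a CSV line. The class
name PetsCsvSketch, its helper methods, and the temporary output file are
made up for illustration only; they are not part of my Flink job or of any
Flink API.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PetsCsvSketch {

    // Counts rows per species (second CSV column), skipping the header line.
    static Map<String, Long> countBySpecies(List<String> csvLines) {
        Map<String, Long> counts = new TreeMap<>();
        for (String line : csvLines.subList(1, csvLines.size())) {
            String species = line.split(",")[1];
            counts.merge(species, 1L, Long::sum);
        }
        return counts;
    }

    // Builds the CSV output lines for one species; empty if it never occurs.
    static List<String> toCsvLines(Map<String, Long> counts, String species) {
        List<String> out = new ArrayList<>();
        Long n = counts.get(species);
        if (n != null) {
            out.add(species + "," + n);
        }
        return out;
    }

    public static void main(String[] args) throws IOException {
        // Same rows as the sample input file, header included.
        List<String> input = Arrays.asList(
                "id,species,color,weight,name",
                "311,canine,golden,75,dog1",
                "312,canine,brown,22,dog2",
                "313,feline,gray,8,cat1");

        List<String> lines = toCsvLines(countBySpecies(input), "canine");

        // Write to a temporary file (hypothetical location) and read it back.
        Path out = Files.createTempFile("pets", ".csv");
        Files.write(out, lines);
        System.out.println(Files.readAllLines(out)); // prints [canine,2]
    }
}
```

This is only meant to pin down the expected content of the output file, which
is a single line matching what result.print() shows.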
