Hi,

Is the CSV format supported for the Kafka connector in Flink 1.10? The error
below says connector.type must be 'filesystem', but the documentation says
CSV is supported for Kafka.

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

public class Test {

    public static void main(String... args) throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamExecutionEnvironment streamExecutionEnvironment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecutionEnvironment.setStateBackend(
                (StateBackend) new RocksDBStateBackend("file:///tmp/rocksdb"));
        StreamTableEnvironment tableEnvironment =
                StreamTableEnvironment.create(streamExecutionEnvironment, settings);

        tableEnvironment
                .connect(
                    new Kafka()
                        .version("0.11")
                        .topic("test-topic1")
                )
                .withFormat(new Csv())
                .withSchema(new Schema().field("f0", DataTypes.STRING()))
                .inAppendMode()
                .createTemporaryTable("kafka_source");

        Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source");
        tableEnvironment.toAppendStream(resultTable, Row.class).print();

        tableEnvironment.execute("Sample Job");
    }
}


This code generates the following error:

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
Could not find a suitable table factory for
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: Required context properties mismatch.

The matching candidates:
org.apache.flink.table.sources.CsvAppendTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'

The following properties are requested:
connector.property-version=1
connector.topic=test-topic1
connector.type=kafka
connector.version=0.11
format.property-version=1
format.type=csv
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=f0
update-mode=append

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
    at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
    at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
    at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
    at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52)
    ... 34 more
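
To check that I am reading the "Mismatched properties" part correctly, here is a minimal sketch of how I understand the context check. It is a simplified model, not Flink's actual TableFactoryService code: the class FactoryMatchSketch and the method mismatches are hypothetical names, and the required context I give for the filesystem CSV factory is an assumption based only on the error message. The requested properties are a subset of the ones listed in the error.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the context check; NOT Flink's TableFactoryService.
class FactoryMatchSketch {

    // Returns one message per required-context key whose requested value differs.
    static List<String> mismatches(Map<String, String> requiredContext,
                                   Map<String, String> requested) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, String> e : requiredContext.entrySet()) {
            String actual = requested.get(e.getKey());
            if (!e.getValue().equals(actual)) {
                result.add("'" + e.getKey() + "' expects '" + e.getValue()
                        + "', but is '" + actual + "'");
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed required context of the filesystem CSV source factory.
        Map<String, String> csvFilesystem = new LinkedHashMap<>();
        csvFilesystem.put("connector.type", "filesystem");
        csvFilesystem.put("format.type", "csv");

        // Subset of the requested properties from the error message.
        Map<String, String> requested = new LinkedHashMap<>();
        requested.put("connector.type", "kafka");
        requested.put("connector.version", "0.11");
        requested.put("format.type", "csv");

        mismatches(csvFilesystem, requested).forEach(System.out::println);
    }
}
```

If this model is right, the only candidate that matched on format.type=csv was the filesystem CSV factory, which is why the message complains about connector.type. The list of considered factories in the error contains only the two Csv*TableSourceFactory classes.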
