Hi,

I am trying the Table API in Flink 1.11:

tEnv.executeSql("CREATE TABLE people (\n" +
        "    id  INT,\n" +
        "    name STRING\n" +
        ") WITH (\n" +
        "    'connector' = 'filesystem',\n" +
        "    'path'      = 'file:///data/test.parquet',\n" +
        "    'format'    = 'parquet'\n" +
        ")");
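
For context, the table environment is created roughly like this (a simplified sketch; the class name and the rest of the job are trimmed down, and the query at the end is just for testing):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class ParquetReadJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Blink planner in streaming mode, as recommended for 1.11
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // DDL as above
        tEnv.executeSql("CREATE TABLE people ( ... )");

        // Simple read to trigger the failure
        tEnv.executeSql("SELECT * FROM people").print();
    }
}
```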

It failed with:

jobmanager_1      | java.io.IOException: No FileSystem for scheme: file
jobmanager_1      |     at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2799) ~[?:?]
jobmanager_1      |     at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2810) ~[?:?]
jobmanager_1      |     at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:100) ~[?:?]
jobmanager_1      |     at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2849) ~[?:?]
jobmanager_1      |     at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2831) ~[?:?]
jobmanager_1      |     at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:389) ~[?:?]
jobmanager_1      |     at org.apache.hadoop.fs.Path.getFileSystem(Path.java:356) ~[?:?]
jobmanager_1      |     at org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(HadoopInputFile.java:38) ~[?:?]
jobmanager_1      |     at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:448) ~[?:?]
jobmanager_1      |     at org.apache.flink.formats.parquet.vector.ParquetColumnarRowSplitReader.<init>(ParquetColumnarRowSplitReader.java:125) ~[?:?]
jobmanager_1      |     at org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.genPartColumnarRowReader(ParquetSplitReaderUtil.java:131) ~[?:?]
jobmanager_1      |     at org.apache.flink.formats.parquet.ParquetFileSystemFormatFactory$ParquetInputFormat.open(ParquetFileSystemFormatFactory.java:173) ~[?:?]
jobmanager_1      |     at org.apache.flink.formats.parquet.ParquetFileSystemFormatFactory$ParquetInputFormat.open(ParquetFileSystemFormatFactory.java:128) ~[?:?]
jobmanager_1      |     at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:85) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.11-1.11.0.jar:1.11.0]


The pom.xml has the following dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-parquet_2.11</artifactId>
    <version>1.11.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.11.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-filesystem_2.12</artifactId>
    <version>1.11.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>2.4.4</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.8.4</version>
</dependency>

Any ideas? Thanks!


Regards

Leon
