Hi to all,
I've tried to register an external catalog and use it with the Table API in
Flink 1.6.1.
The following (Java) test job cannot write to a sink using insertInto
because Flink cannot find the table by id (test.t2). Am I doing something
wrong or is this a bug?

This is my Java test class:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.catalog.ExternalCatalogTable;
import org.apache.flink.table.catalog.InMemoryExternalCatalog;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.FormatDescriptor;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.sinks.CsvTableSink;

public class CatalogExperiment {
  public static void main(String[] args) throws Exception {
    // create an external catalog
    final String outPath = "file:/tmp/file2.txt";
    InMemoryExternalCatalog catalog = new InMemoryExternalCatalog("test");
    FileSystem connDescIn = new
FileSystem().path("file:/tmp/file-test.txt");
    FileSystem connDescOut = new FileSystem().path(outPath);
    FormatDescriptor csvDesc = new Csv()//
        .field("a", "string")//
        .field("b", "string")//
        .field("c", "string")//
        .fieldDelimiter("\t");
    Schema schemaDesc = new Schema()//
        .field("a", "string")//
        .field("b", "string")//
        .field("c", "string");
    ExternalCatalogTable t1 = ExternalCatalogTable.builder(connDescIn)//
        .withFormat(csvDesc)//
        .withSchema(schemaDesc)//
        .asTableSource();
    ExternalCatalogTable t2 = ExternalCatalogTable.builder(connDescOut)//
        .withFormat(csvDesc)//
        .withSchema(schemaDesc)//
        .asTableSink();
    catalog.createTable("t1", t1, true);
    catalog.createTable("t2", t2, true);

    final  ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
    final BatchTableEnvironment btEnv =
TableEnvironment.getTableEnvironment(env);
    btEnv.registerExternalCatalog("test", catalog);
    // this does not work ---------------------------------------
    btEnv.scan("test", "t1").insertInto("test.t2"); //ERROR: No table was
registered under the name test.t2
    // this works ---------------------------------------
    btEnv.scan("test", "t1").writeToSink(new CsvTableSink(outPath, "\t", 1,
WriteMode.OVERWRITE));
    env.execute();
  }
}


Best,
Flavio

Reply via email to