Hi All,

1) The documentation says FULL OUTER JOIN is supported, but the code below
just exits with status 1 and no error message.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.util.Properties;

public class Test {

    public static void main(String... args) throws Exception {

        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment bsTableEnv =
                StreamTableEnvironment.create(env, bsSettings);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");

        FlinkKafkaConsumer<String> consumer1 = new FlinkKafkaConsumer<>(
                java.util.regex.Pattern.compile("test-topic1"),
                new SimpleStringSchema(),
                properties);
        FlinkKafkaConsumer<String> consumer2 = new FlinkKafkaConsumer<>(
                java.util.regex.Pattern.compile("test-topic2"),
                new SimpleStringSchema(),
                properties);

        DataStream<String> stream1 = env.addSource(consumer1);
        DataStream<String> stream2 = env.addSource(consumer2);

        bsTableEnv.registerDataStream("sample1", stream1);
        bsTableEnv.registerDataStream("sample2", stream2);

        Table result = bsTableEnv.sqlQuery(
                "SELECT * FROM sample1 FULL OUTER JOIN sample2 ON sample1.f0 = sample2.f0");
        result.printSchema();

        bsTableEnv.toAppendStream(result, Row.class).print();
        bsTableEnv.execute("sample job");
    }
}
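
My guess is that toAppendStream rejects queries whose results can be
updated, and a FULL OUTER JOIN on a stream can retract rows it has already
emitted. A sketch of what I would try instead, replacing only the last two
lines of main() above with toRetractStream (each record then becomes a
(flag, row) pair, where the Boolean flag marks inserts vs. retractions):

```java
// Sketch: use toRetractStream instead of toAppendStream, since the
// outer join result is not append-only. The printed records are
// Tuple2<Boolean, Row>: true = add/update, false = retraction.
bsTableEnv.toRetractStream(result, Row.class).print();
bsTableEnv.execute("sample job");
```

Is that the intended way to consume such a join result, or should the
append-stream conversion have worked here?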


2) When using the Blink planner, should I use TableEnvironment or
StreamTableEnvironment?

3) Why does the current stable Flink documentation (1.9) recommend the old
planner? Is there a rough timeline for when the Blink planner will be
production-ready, perhaps 1.10 or 1.11?

Thanks!
