liang ding created FLINK-18724:
----------------------------------

             Summary: Integration with DataStream and DataSet API report error 
                 Key: FLINK-18724
                 URL: https://issues.apache.org/jira/browse/FLINK-18724
             Project: Flink
          Issue Type: Bug
          Components: API / Core, Connectors / Kafka, Table SQL / API
    Affects Versions: 1.11.1
            Reporter: liang ding


I want to create a table from a DataStream(kafka) : there is two reason I need 
to use DataStream:
 # I need decode msg to columns by custom format, in sql mode I don't known how 
to do it.
 # I have realize DeserializationSchema or FlatMapFunction both. when use 
datastream I can do many things before it become a suitable table, that is my 
prefer way in any other apply.
 so I do it like that:

{code:java}
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings tSet= 
EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv=StreamTableEnvironment.create(env,tSet);
DataStream<MyRow> stream = env
                .addSource(new FlinkKafkaConsumer<>("test-log", new 
SimpleStringSchema(), properties))
                .flatMap(new LogParser());
//stream.printToErr();
        tEnv.fromDataStream(stream).select("userId,city").execute().print();
        tEnv.execute("test-sql");
        //env.execute("test");
{code}
then I got message:
{noformat}
 [Kafka Fetcher for Source: Custom Source -> Flat Map ->* -> select: 
(userId,city) -> to: Row (3/3)] INFO 
org.apache.kafka.clients.FetchSessionHandler - [Consumer 
clientId=consumer-flink-3-5, groupId=flink-3] Node 0 sent an invalid full fetch 
response with extra=(test-log-0, response=(
 [Kafka Fetcher for Source: Custom Source -> Flat Map ->* -> select: 
(userId,city) -> to: Row (3/3)] INFO 
org.apache.kafka.clients.FetchSessionHandler - [Consumer 
clientId=consumer-flink-3-5, groupId=flink-3] Node 0 sent an invalid full fetch 
response with extra=(test-log-1, response=({noformat}
it seen like both StreamExecutionEnvironment and StreamTableEnvironment start 
the fetcher and make no one successed.

and there is no guide Integration which made me confused: should I do 
env.execute or 
 tableEnv.execute or both(it's seen not) ? and the blink planner way



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to