Given the popularity of Flink SQL and of Kafka as a streaming source, I think we
can add some examples of using Kafka[XXX]TableSource to the
flink-examples/flink-examples-table module. What do you guys think?
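
As a rough idea of one thing such an example could make obvious, the mismatch Rong points out in the quoted thread below (two field names, one field type) can be reproduced without Flink at all. This is only a sketch under stated assumptions: RowSchemaCheck and describeRow are hypothetical names made up for illustration, plain strings stand in for TypeInformation, and nothing here is Flink API or a claim about how Types.ROW itself behaves.

```java
public class RowSchemaCheck {

    // Hypothetical helper, not a Flink API: pairs each field name with a type
    // name and rejects arrays of different length.
    static String describeRow(String[] fieldNames, String[] fieldTypes) {
        if (fieldNames.length != fieldTypes.length) {
            throw new IllegalArgumentException(
                    "Got " + fieldNames.length + " field names but "
                            + fieldTypes.length + " field types");
        }
        StringBuilder sb = new StringBuilder("Row(");
        for (int i = 0; i < fieldNames.length; i++) {
            if (i > 0) {
                sb.append(", ");
            }
            sb.append(fieldNames[i]).append(": ").append(fieldTypes[i]);
        }
        return sb.append(")").toString();
    }

    public static void main(String[] args) {
        // One name, one type: the shape Radhya later settled on.
        System.out.println(describeRow(
                new String[] {"sensorID"}, new String[] {"STRING"}));

        // Two names, one type: the shape from the original post.
        try {
            describeRow(new String[] {"iotdevice", "sensorID"},
                    new String[] {"STRING"});
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

Checking the two arrays up front keeps the failure next to the schema definition instead of surfacing later, when the source is constructed or the query runs.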

Cheers
Shuyi

On Mon, Jun 4, 2018 at 12:57 AM, Timo Walther <twal...@apache.org> wrote:

> Hi,
>
> As you can see in the code [1], Kafka09JsonTableSource takes a TableSchema. You
> can create a table schema from type information, see [2].
>
> Regards,
> Timo
>
> [1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
> [2] https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
>
> On 02.06.18 at 18:31, Radhya Sahal wrote:
>
>> Thanks Rong,
>>
>> I am using Flink 1.3.0. If I move to Flink 1.5, how do I define the jsonSchema?
>>
>> Yes, there were two names, but I have now kept only one, and I still want to
>> define the jsonSchema.
>>
>>
>>
>> Rong Rong wrote
>>
>>> Hi Radhya,
>>>
>>> Can you tell us which Flink version you are using? In the latest
>>> Flink 1.5 release, Kafka09JsonTableSource takes:
>>>
>>> /**
>>>   * Creates a Kafka 0.9 JSON {@link StreamTableSource}.
>>>   *
>>>   * @param topic       Kafka topic to consume.
>>>   * @param properties  Properties for the Kafka consumer.
>>>   * @param tableSchema The schema of the table.
>>>   * @param jsonSchema  The schema of the JSON messages to decode from Kafka.
>>>   */
>>>
>>> Also, in your type definition
>>>   TypeInformation<Row> typeInfo2 = Types.ROW(...
>>> the arguments seem to have different lengths for the schema names and types.
>>>
>>> Thanks,
>>> Rong
>>>
>>> On Fri, Jun 1, 2018 at 9:09 AM, Radhya Sahal <radhya.sahal@> wrote:
>>>
>>>> Hi,
>>>>
>>>> Could anyone help me solve this problem?
>>>>
>>>> Exception in thread "main" java.lang.Error: Unresolved compilation problem:
>>>>          The constructor Kafka09JsonTableSource(String, Properties, TypeInformation<Row>) is undefined
>>>>
>>>> *This is the code:*
>>>> public class FlinkKafkaSQL {
>>>>          public static void main(String[] args) throws Exception {
>>>>              // Read parameters from command line
>>>>              final ParameterTool params = ParameterTool.fromArgs(args);
>>>>
>>>>              if(params.getNumberOfParameters() < 5) {
>>>>                  System.out.println("\nUsage: FlinkReadKafka " +
>>>>                                     "--read-topic <topic> " +
>>>>                                     "--write-topic <topic> " +
>>>>                                     "--bootstrap.servers <kafka brokers> " +
>>>>                                     "zookeeper.connect" +
>>>>                                     "--group.id <groupid>");
>>>>                  return;
>>>>              }
>>>>
>>>>              // setup streaming environment
>>>>              StreamExecutionEnvironment env =
>>>>                      StreamExecutionEnvironment.getExecutionEnvironment();
>>>>
>>>>              env.getConfig().setRestartStrategy(
>>>>                      RestartStrategies.fixedDelayRestart(4, 10000));
>>>>              env.enableCheckpointing(300000); // 300 seconds
>>>>              env.getConfig().setGlobalJobParameters(params);
>>>>
>>>>              StreamTableEnvironment tableEnv =
>>>>                      TableEnvironment.getTableEnvironment(env);
>>>>
>>>>              // specify JSON field names and types
>>>>              TypeInformation<Row> typeInfo2 = Types.ROW(
>>>>                      new String[] { "iotdevice", "sensorID" },
>>>>                      new TypeInformation<?>[] { Types.STRING()}
>>>>              );
>>>>
>>>>              // create a new tablesource of JSON from kafka
>>>>              KafkaJsonTableSource kafkaTableSource = new Kafka09JsonTableSource(
>>>>                      params.getRequired("read-topic"),
>>>>                      params.getProperties(),
>>>>                      typeInfo2);
>>>>
>>>>              // run some SQL to filter results where a key is not null
>>>>              String sql = "SELECT sensorID " +
>>>>                           "FROM iotdevice ";
>>>>              tableEnv.registerTableSource("iotdevice", kafkaTableSource);
>>>>              Table result = tableEnv.sql(sql);
>>>>
>>>>              // create a partition for the data going into kafka
>>>>              FlinkFixedPartitioner partition = new FlinkFixedPartitioner();
>>>>
>>>>              // create new tablesink of JSON to kafka
>>>>              KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
>>>>                      params.getRequired("write-topic"),
>>>>                      params.getProperties(),
>>>>                      partition);
>>>>
>>>>              result.writeToSink(kafkaTableSink);
>>>>
>>>>              env.execute("FlinkReadWriteKafkaJSON");
>>>>          }
>>>> }
>>>>
>>>>
>>>> *These are the dependencies in pom.xml:*
>>>>
>>>> <dependencies>
>>>>     <dependency>
>>>>         <groupId>org.apache.flink</groupId>
>>>>         <artifactId>flink-java</artifactId>
>>>>         <version>1.3.0</version>
>>>>     </dependency>
>>>>     <dependency>
>>>>         <groupId>org.apache.flink</groupId>
>>>>         <artifactId>flink-streaming-java_2.11</artifactId>
>>>>         <version>1.3.0</version>
>>>>     </dependency>
>>>>     <dependency>
>>>>         <groupId>org.apache.flink</groupId>
>>>>         <artifactId>flink-clients_2.11</artifactId>
>>>>         <version>1.3.0</version>
>>>>     </dependency>
>>>>     <dependency>
>>>>         <groupId>org.apache.flink</groupId>
>>>>         <artifactId>flink-connector-kafka-0.9</artifactId>
>>>>         <version>1.3.0</version>
>>>>     </dependency>
>>>>     <dependency>
>>>>         <groupId>org.apache.flink</groupId>
>>>>         <artifactId>flink-table_2.11</artifactId>
>>>>         <version>1.3.0</version>
>>>>     </dependency>
>>>>     <dependency>
>>>>         <groupId>org.apache.flink</groupId>
>>>>         <artifactId>flink-core</artifactId>
>>>>         <version>1.3.0</version>
>>>>     </dependency>
>>>>     <dependency>
>>>>         <groupId>org.apache.flink</groupId>
>>>>         <artifactId>flink-streaming-scala_2.11</artifactId>
>>>>         <version>1.3.0</version>
>>>>     </dependency>
>>>> </dependencies>
>>>>
>>>> Regards.
>>>>
>>>>
>>>>
>>>> --
>>>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>
>>>>
>>
>>
>>
>>
>
>
>


-- 
"So you have to trust that the dots will somehow connect in your future."
