Re: Ask about running Flink sql-client.sh
Thanks a lot.

On Thu, May 2, 2019, 9:59 AM David Anderson wrote:

> There are some step-by-step instructions for setting up the sql client in
> https://training.ververica.com/setup/sqlClient.html, plus some examples.
Re: Ask for SQL using kafka in Flink
Thanks Rong,

I used Flink 1.3.0. If I use Flink 1.5 instead, how can I define jsonSchema? Yes, there were two names, but now I have put only one name, and I want to define jsonSchema.

Rong Rong wrote
> Hi Radhya,
>
> Can you provide which Flink version you are using? Based on the latest
> FLINK 1.5 release, Kafka09JsonTableSource takes:
>
> /**
>  * Creates a Kafka 0.9 JSON {@link StreamTableSource}.
>  *
>  * @param topic       Kafka topic to consume.
>  * @param properties  Properties for the Kafka consumer.
>  * @param tableSchema The schema of the table.
>  * @param jsonSchema  The schema of the JSON messages to decode from Kafka.
>  */
>
> Also, your type definition: TypeInformation<Row> typeInfo2 = Types.ROW(...
> arguments seem to have different length for schema names and types.
>
> Thanks,
> Rong
>
> On Fri, Jun 1, 2018 at 9:09 AM, Radhya Sahal <radhya.sahal@> wrote:
>
>> Hi,
>>
>> Could anyone help me to solve this problem
>>
>> Exception in thread "main" java.lang.Error: Unresolved compilation problem:
>>     The constructor Kafka09JsonTableSource(String, Properties, TypeInformation<Row>) is undefined
>>
>> This is the code:
>>
>>     public class FlinkKafkaSQL {
>>         public static void main(String[] args) throws Exception {
>>             // Read parameters from command line
>>             final ParameterTool params = ParameterTool.fromArgs(args);
>>
>>             if(params.getNumberOfParameters() < 5) {
>>                 System.out.println("\nUsage: FlinkReadKafka " +
>>                     "--read-topic " +
>>                     "--write-topic " +
>>                     "--bootstrap.servers " +
>>                     "zookeeper.connect" +
>>                     "--group.id ");
>>                 return;
>>             }
>>
>>             // setup streaming environment
>>             StreamExecutionEnvironment env =
>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>             env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 1));
>>             env.enableCheckpointing(30); // 300 seconds
>>             env.getConfig().setGlobalJobParameters(params);
>>
>>             StreamTableEnvironment tableEnv =
>>                 TableEnvironment.getTableEnvironment(env);
>>
>>             // specify JSON field names and types
>>             TypeInformation<Row> typeInfo2 = Types.ROW(
>>                 new String[] { "iotdevice", "sensorID" },
>>                 new TypeInformation[] { Types.STRING()}
>>             );
>>
>>             // create a new tablesource of JSON from kafka
>>             KafkaJsonTableSource kafkaTableSource = new Kafka09JsonTableSource(
>>                 params.getRequired("read-topic"),
>>                 params.getProperties(),
>>                 typeInfo2);
>>
>>             // run some SQL to filter results where a key is not null
>>             String sql = "SELECT sensorID " +
>>                 "FROM iotdevice ";
>>             tableEnv.registerTableSource("iotdevice", kafkaTableSource);
>>             Table result = tableEnv.sql(sql);
>>
>>             // create a partition for the data going into kafka
>>             FlinkFixedPartitioner partition = new FlinkFixedPartitioner();
>>
>>             // create new tablesink of JSON to kafka
>>             KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
>>                 params.getRequired("write-topic"),
>>                 params.getProperties(),
>>                 partition);
>>
>>             result.writeToSink(kafkaTableSink);
>>
>>             env.execute("FlinkReadWriteKafkaJSON");
>>         }
>>     }
>>
>> This is the dependencies in pom.xml:
>>
>>     org.apache.flink:flink-java:1.3.0
>>     org.apache.flink
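Rong's remark about the names and types having different lengths can be checked before the schema is ever handed to Flink. The following is only a minimal sketch in plain Java: `RowSchemaCheck` and `requireSameLength` are hypothetical names, not part of Flink, and the type array is represented by plain strings as a stand-in for the `TypeInformation` entries.

```java
// Hypothetical helper, NOT part of Flink: verifies that a row schema
// declares exactly one type per field name before it is used.
final class RowSchemaCheck {

    // Throws IllegalArgumentException when the two arrays differ in length.
    static void requireSameLength(String[] fieldNames, Object[] fieldTypes) {
        if (fieldNames.length != fieldTypes.length) {
            throw new IllegalArgumentException(
                "Schema has " + fieldNames.length + " field names but "
                    + fieldTypes.length + " field types");
        }
    }

    public static void main(String[] args) {
        // Same shape as the arrays in the quoted code: two names, one type.
        String[] names = { "iotdevice", "sensorID" };
        Object[] types = { "STRING" }; // stand-in for Types.STRING()

        try {
            requireSameLength(names, types);
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

With one name and one type, or two names and two types, the check passes silently.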
Ask for SQL using kafka in Flink
Hi,

Could anyone help me to solve this problem

Exception in thread "main" java.lang.Error: Unresolved compilation problem:
    The constructor Kafka09JsonTableSource(String, Properties, TypeInformation<Row>) is undefined

This is the code:

    public class FlinkKafkaSQL {
        public static void main(String[] args) throws Exception {
            // Read parameters from command line
            final ParameterTool params = ParameterTool.fromArgs(args);

            if(params.getNumberOfParameters() < 5) {
                System.out.println("\nUsage: FlinkReadKafka " +
                    "--read-topic " +
                    "--write-topic " +
                    "--bootstrap.servers " +
                    "zookeeper.connect" +
                    "--group.id ");
                return;
            }

            // setup streaming environment
            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
            env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 1));
            env.enableCheckpointing(30); // 300 seconds
            env.getConfig().setGlobalJobParameters(params);

            StreamTableEnvironment tableEnv =
                TableEnvironment.getTableEnvironment(env);

            // specify JSON field names and types
            TypeInformation<Row> typeInfo2 = Types.ROW(
                new String[] { "iotdevice", "sensorID" },
                new TypeInformation[] { Types.STRING()}
            );

            // create a new tablesource of JSON from kafka
            KafkaJsonTableSource kafkaTableSource = new Kafka09JsonTableSource(
                params.getRequired("read-topic"),
                params.getProperties(),
                typeInfo2);

            // run some SQL to filter results where a key is not null
            String sql = "SELECT sensorID " +
                "FROM iotdevice ";
            tableEnv.registerTableSource("iotdevice", kafkaTableSource);
            Table result = tableEnv.sql(sql);

            // create a partition for the data going into kafka
            FlinkFixedPartitioner partition = new FlinkFixedPartitioner();

            // create new tablesink of JSON to kafka
            KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
                params.getRequired("write-topic"),
                params.getProperties(),
                partition);

            result.writeToSink(kafkaTableSink);

            env.execute("FlinkReadWriteKafkaJSON");
        }
    }

This is the dependencies in pom.xml:

    org.apache.flink:flink-java:1.3.0
    org.apache.flink:flink-streaming-java_2.11:1.3.0
    org.apache.flink:flink-clients_2.11:1.3.0
    org.apache.flink:flink-connector-kafka-0.9:1.3.0
    org.apache.flink:flink-table_2.11:1.3.0
    org.apache.flink:flink-core:1.3.0
    org.apache.flink:flink-streaming-scala_2.11:1.3.0

Regards.

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
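A side note on the code in this message, separate from the compilation error: `enableCheckpointing` takes its interval in milliseconds, so the value `30` means 30 ms, not the 300 seconds the comment mentions. A minimal sketch of deriving the millisecond value with `java.time.Duration` follows; the class name `CheckpointInterval` and the helper `millis` are hypothetical, and the Flink call itself appears only in a comment so the block runs without Flink on the classpath.

```java
import java.time.Duration;

// Hypothetical helper, NOT part of Flink: converts a checkpoint interval
// given in seconds into the milliseconds that the Flink call expects.
final class CheckpointInterval {

    static long millis(long seconds) {
        return Duration.ofSeconds(seconds).toMillis();
    }

    public static void main(String[] args) {
        long intervalMs = millis(300);
        System.out.println(intervalMs); // prints 300000

        // In the job, this value would replace the literal 30:
        // env.enableCheckpointing(intervalMs);
    }
}
```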
Ask about convert KafkaTableSource to table
Hi,

Could you tell me which versions of Java, Kafka, and Flink work together without errors? I have a problem when I read streamed data from a KafkaTableSource into a table. I use this call:

    tableEnv.registerDataStream("sensors", myKafkaTableSource)

But it does not run (i.e., there is a compilation error). These are the dependencies in pom.xml:

    org.apache.flink:flink-streaming-java_2.11:1.4.0
    org.apache.flink:flink-clients_2.11:1.4.0
    org.apache.flink:flink-connector-kafka-0.11_2.11:1.4.0
    org.apache.flink:flink-table_2.11:1.4.0
    org.apache.flink:flink-core:1.4.0
    org.apache.flink:flink-streaming-scala_2.11:1.4.0

Regards.
Radhya Sahal
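One possible cause, offered as a guess because the compiler message is not shown: `registerDataStream` expects a `DataStream`, while a `KafkaTableSource` is a table source, and the code in the earlier thread registers its source with `registerTableSource` instead. The sketch below uses stand-in types that only mimic the two method shapes. None of the nested types are the real Flink classes, and all names ending in `StandIn` are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: every nested type here is a stand-in, NOT a Flink class.
final class RegisterSketch {

    interface DataStream<T> {}
    interface TableSource<T> {}

    // Plays the role of a Kafka table source: a TableSource, not a DataStream.
    static final class KafkaTableSourceStandIn implements TableSource<String> {}

    // Plays the role of the table environment with its two register methods.
    static final class TableEnvStandIn {
        private final Map<String, Object> catalog = new LinkedHashMap<>();

        <T> void registerDataStream(String name, DataStream<T> stream) {
            catalog.put(name, stream);
        }

        void registerTableSource(String name, TableSource<?> source) {
            catalog.put(name, source);
        }

        boolean isRegistered(String name) {
            return catalog.containsKey(name);
        }
    }

    public static void main(String[] args) {
        TableEnvStandIn tableEnv = new TableEnvStandIn();
        KafkaTableSourceStandIn myKafkaTableSource = new KafkaTableSourceStandIn();

        // Would not compile, because the argument is not a DataStream:
        // tableEnv.registerDataStream("sensors", myKafkaTableSource);

        // Compiles, because the argument is a TableSource:
        tableEnv.registerTableSource("sensors", myKafkaTableSource);
        System.out.println(tableEnv.isRegistered("sensors")); // prints true
    }
}
```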