Working out through individual messages in Flink

2018-07-29 Thread Mich Talebzadeh
Hi, I have a Kafka topic that transmits 100 security prices every 2 seconds. In Spark Streaming I go through the RDD, walk through the rows one by one, and check the prices. If the prices are valuable I store them in an HBase table. val dstream = KafkaUtils.createDirectStream[String, String, StringDeco
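The Spark Streaming pattern described above (direct Kafka stream, per-record check, conditional HBase write) might be sketched roughly as follows. This is an assumption-laden reconstruction, not the poster's code: `ssc`, `kafkaParams`, `topicName`, and `isValuable` are hypothetical names, and the truncated `StringDeco` is presumably `kafka.serializer.StringDecoder` from the old 0.8 Kafka connector.

```scala
// Hedged sketch of the Spark Streaming approach described above,
// assuming the spark-streaming-kafka-0-8 direct-stream connector.
// ssc, kafkaParams, topicName and isValuable are hypothetical names.
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set(topicName))

dstream.foreachRDD { rdd =>
  rdd.foreach { case (_, line) =>
    val cols = line.split(",").map(_.trim)
    if (isValuable(cols(3).toDouble)) {
      // write the row to HBase (connection handling omitted)
    }
  }
}
```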

Re: Working out through individual messages in Flink

2018-07-30 Thread Renjie Liu
Hi, Mich: You can write a sink function for that. On Mon, Jul 30, 2018 at 2:58 PM Mich Talebzadeh wrote: > > Hi, > > I have a Kafka topic that transmits 100 security prices ever 2 seconds. > > In Spark streaming I go through the RDD and walk through rows one by one > and check prices > In pr
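Renjie's suggestion of a custom sink might look like the minimal sketch below. The class name and internals are my assumptions, not code from the thread; using `RichSinkFunction` (rather than plain `SinkFunction`) gives `open()`/`close()` hooks, which is the usual place to manage an HBase connection.

```scala
// Hedged sketch of a custom Flink sink; HBaseSink and its internals are hypothetical.
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

class HBaseSink extends RichSinkFunction[String] {
  // open() runs once per parallel sink instance: create the HBase connection here
  override def open(parameters: Configuration): Unit = { /* connect to HBase */ }

  // invoke() is called once for every record flowing into the sink
  override def invoke(value: String): Unit = { /* turn value into a Put and write it */ }

  // close() runs on shutdown: release the connection here
  override def close(): Unit = { /* close the HBase connection */ }
}

// usage: stream.addSink(new HBaseSink)
```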

Re: Working out through individual messages in Flink

2018-07-30 Thread Mich Talebzadeh
Just to clarify, these are the individual prices separated by ','. The below shows three price lines in the topic:

UUID, Security, Time, Price
1230a9a9-dc57-40e4-a000-6ea6989c754a, MRW, 2018-07-28T20:38:43, 241.88
8143c6ca-109f-484f-be0e-9d0f5ca32f07, SAP, 201
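For reference, splitting one of those lines into its four fields takes only plain Scala; the `Price` case class and field names below are illustrative, not from the thread.

```scala
// Parse one comma-separated price line: UUID, Security, Time, Price.
case class Price(key: String, ticker: String, timeIssued: String, price: Double)

def parse(line: String): Price = {
  // split on ',' and trim the stray spaces visible in the sample lines
  val Array(key, ticker, time, price) = line.split(",").map(_.trim)
  Price(key, ticker, time, price.toDouble)
}

val p = parse("1230a9a9-dc57-40e4-a000-6ea6989c754a, MRW, 2018-07-28T20:38:43, 241.88")
// p.ticker is "MRW" and p.price is 241.88
```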

Re: Working out through individual messages in Flink

2018-07-30 Thread Fabian Hueske
Hi, Flink processes streams record by record instead of micro-batching records together. Since every record comes by itself, there is no for-each. Simple record-by-record transformations can be done with a MapFunction, and records can be filtered out with a FilterFunction. You can also implement a FlatMapF
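Fabian's record-by-record point can be illustrated with the DataStream API. In this sketch, `stream` (an assumed `DataStream[String]` of CSV price lines) and the 100.0 threshold are my assumptions:

```scala
// Map then filter, one record at a time; no micro-batch, no explicit for-each.
import org.apache.flink.streaming.api.scala._

val prices: DataStream[(String, Double)] = stream
  .map { line =>
    val cols = line.split(",").map(_.trim)
    (cols(1), cols(3).toDouble)                 // (ticker, price)
  }
  .filter { case (_, price) => price > 100.0 }  // keep only "valuable" prices; threshold assumed
```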

Re: Working out through individual messages in Flink

2018-07-30 Thread Mich Talebzadeh
Thanks Fabian. That was very useful. How about an operation like the one below?

// create builder
val KafkaTableSource = Kafka011JsonTableSource.builder()
  // set Kafka topic
  .forTopic(topicsValue)
  // set Kafka consumer properties
  .withKafkaPrope

Re: Working out through individual messages in Flink

2018-07-30 Thread Fabian Hueske
A *Table*Source [1], is a special input connector for Flink's relational APIs (Table API and SQL) [2]. You can transform and filter with these APIs as well (it's probably even easier). In SQL this would be the SELECT and WHERE clauses of a query. However, there is no *Table*Sink for HBase and you
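The SELECT/WHERE equivalent Fabian mentions might look like the sketch below. The table and column names follow the later messages in this thread; the threshold is an assumption.

```scala
// Filter and project on the registered table, then the same query in SQL.
import org.apache.flink.table.api.scala._

val result = tableEnv.scan("priceTable")
  .filter('price > 100.0)
  .select('ticker, 'timeissued, 'price)

// SQL form of the same query:
val sqlResult = tableEnv.sqlQuery(
  "SELECT ticker, timeissued, price FROM priceTable WHERE price > 100.0")
```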

Re: Working out through individual messages in Flink

2018-07-30 Thread Mich Talebzadeh
Thanks again. The HBase connector works fine in Flink:

// Start HBase table stuff
val tableName = "MARKETDATAHBASESPEEDFLINK"
val hbaseConf = HBaseConfiguration.create()
// Connecting to remote HBase
hbaseConf.set("hbase.master", hbaseHost)
hbaseConf.set("hbase.zookeeper.quoru
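A fuller sketch of that HBase write path, using the standard HBase client API: the column family `PRICE_INFO`, the `zookeeperHost` variable, and the row-key choice are assumptions on top of what the message shows.

```scala
// Hedged sketch: connect to HBase and write one price row.
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hbase.master", hbaseHost)               // hbaseHost assumed in scope
hbaseConf.set("hbase.zookeeper.quorum", zookeeperHost) // zookeeperHost assumed in scope

val connection = ConnectionFactory.createConnection(hbaseConf)
val htable = connection.getTable(TableName.valueOf("MARKETDATAHBASESPEEDFLINK"))

val put = new Put(Bytes.toBytes(rowKey))               // rowKey, e.g. the UUID, assumed
put.addColumn(Bytes.toBytes("PRICE_INFO"),             // column family is an assumption
  Bytes.toBytes("price"), Bytes.toBytes(price))
htable.put(put)
```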

Re: Working out through individual messages in Flink

2018-08-07 Thread Mich Talebzadeh
Hi Fabian, Reading your notes above I have converted the table back to a DataStream.

val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
tableEnv.registerDataStream("priceTable", splitStream, 'key, 'ticker, 'timeissued, 'price)
val key = tableEnv.scan("priceTable"
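Continuing that snippet, the Table-to-DataStream conversion is typically done with toAppendStream; the tuple type below matches the four registered columns and is my assumption:

```scala
// Scan the registered table and convert it back to a DataStream.
import org.apache.flink.table.api.scala._
import org.apache.flink.streaming.api.scala._

val table = tableEnv.scan("priceTable")
  .select('key, 'ticker, 'timeissued, 'price)

// toAppendStream works because the query only appends rows (no retractions)
val rows: DataStream[(String, String, String, Double)] =
  table.toAppendStream[(String, String, String, Double)]
```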

Re: Working out through individual messages in Flink

2018-08-07 Thread Jörn Franke
Hi Mich, Would it be possible to share the full source code? I am missing a call to streamExecEnvironment.execute. Best regards > On 8. Aug 2018, at 00:02, Mich Talebzadeh wrote: > > Hi Fabian, > > Reading your notes above I have converted the table back to DataStream. > > val tableEnv
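Jörn's point is that a Flink streaming program only builds a lazy job graph; nothing runs until execute() is invoked, usually as the last statement of main(). A minimal sketch, with `streamExecEnv` and the job name assumed:

```scala
// All sources, transformations and sinks defined above are only a plan;
// execute() submits the job and blocks while it runs.
streamExecEnv.execute("md_streaming")
```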

Re: Working out through individual messages in Flink

2018-08-07 Thread Jörn Franke
(At the end of your code) > On 8. Aug 2018, at 00:29, Jörn Franke wrote: > > Hi Mich, > > Would it be possible to share the full source code ? > I am missing a call to streamExecEnvironment.execute > > Best regards > >> On 8. Aug 2018, at 00:02, Mich Talebzadeh wrote: >> >> Hi Fabian, >>

Re: Working out through individual messages in Flink

2018-08-07 Thread Mich Talebzadeh
Hi Jörn, Thanks. I uploaded the Scala code to my GitHub --> md_streaming.scala https://github.com/michTalebzadeh/Flink Regards, Dr Mich Talebzadeh LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw