A *Table*Source [1] is a special input connector for Flink's relational
APIs (Table API and SQL) [2].
You can transform and filter with these APIs as well (it's probably even
easier). In SQL this would be the SELECT and WHERE clauses of a query.

However, there is no *Table*Sink for HBase, so you would need to convert
the Table back to a DataStream [3].
That's not very difficult since the APIs are integrated with each other.
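
As a minimal sketch of that round trip, assuming the Flink 1.5 Scala APIs
linked below: the snippet filters with SQL and converts the result back to a
DataStream. The Price case class, the table name "prices", the in-memory
sample records (standing in for the Kafka source), and print() (standing in
for a sink function that writes to HBase) are illustrative assumptions, not
part of the original setup.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// Hypothetical record type mirroring the four comma-separated fields in the thread.
case class Price(uuid: String, ticker: String, timeissued: String, price: Float)

object TableToStreamSketch {

  // Parse one "uuid,ticker,time,price" line; assumes well-formed input.
  def parse(line: String): Price = {
    val f = line.split(',').map(_.trim)
    Price(f(0), f(1), f(2), f(3).toFloat)
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // In-memory stand-in for the Kafka source used in the thread.
    val prices: DataStream[Price] = env
      .fromElements(
        "1230a9a9-dc57-40e4-a000-6ea6989c754a,MRW,2018-07-28T20:38:43,241.88",
        "8143c6ca-109f-484f-be0e-9d0f5ca32f07,SAP,2018-07-28T20:38:44,56.94")
      .map(line => parse(line))

    // Register the stream as a table; field names come from the case class.
    tableEnv.registerDataStream("prices", prices)

    // SELECT does the transformation, WHERE does the filtering.
    val valuable = tableEnv.sqlQuery(
      "SELECT ticker, timeissued, price FROM prices WHERE price > 90")

    // Convert the Table back to a DataStream; an append stream suffices
    // because this query only ever adds rows.
    val out: DataStream[Row] = tableEnv.toAppendStream[Row](valuable)

    // print() stands in for a sink function that writes to HBase.
    out.print()

    env.execute("table-to-stream-sketch")
  }
}
```

In a real job, the in-memory source would be replaced by the Kafka source and
print() by a sink function of your own that writes each Row to HBase.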

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sourceSinks.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset

2018-07-30 10:47 GMT+02:00 Mich Talebzadeh <mich.talebza...@gmail.com>:

> Thanks Fabian. That was very useful.
>
> How about an operation like below?
>
>     // create builder
>     val KafkaTableSource = Kafka011JsonTableSource.builder()
>       // set Kafka topic
>       .forTopic(topicsValue)
>       // set Kafka consumer properties
>       .withKafkaProperties(properties)
>       // set Table schema
>       .withSchema(TableSchema.builder()
>         .field("key", Types.STRING)
>         .field("ticker", Types.STRING)
>         .field("timeissued", Types.STRING)
>         .field("price", Types.FLOAT)
>         .build())
>
> Will that be OK?
>
>
> Dr Mich Talebzadeh
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
> http://talebzadehmich.wordpress.com
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 30 Jul 2018 at 09:19, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi,
>>
>> Flink processes streams record by record, instead of micro-batching
>> records together. Since every record comes by itself, there is no for-each.
>> Simple record-by-record transformations can be done with a MapFunction,
>> filtering out records with a FilterFunction. You can also implement a
>> FlatMapFunction to do both in one step.
>>
>> Once the stream is transformed and filtered, you can write it to HBase
>> with a sink function.
>>
>>
>> 2018-07-30 10:03 GMT+02:00 Mich Talebzadeh <mich.talebza...@gmail.com>:
>>
>>> Just to clarify, these are the individual prices, separated by ','. The
>>> following shows three price lines in the topic:
>>>
>>> UUID,Security,Time,Price
>>> 1230a9a9-dc57-40e4-a000-6ea6989c754a,MRW,2018-07-28T20:38:43,241.88
>>> 8143c6ca-109f-484f-be0e-9d0f5ca32f07,SAP,2018-07-28T20:38:44,56.94
>>> 81a54ff8-6ac8-470a-a522-51737d685264,VOD,2018-07-28T20:38:44,219.33
>>>
>>> On Mon, 30 Jul 2018 at 07:58, Mich Talebzadeh <mich.talebza...@gmail.com>
>>> wrote:
>>>
>>>>
>>>> Hi,
>>>>
>>>> I have a Kafka topic that transmits 100 security prices every 2 seconds.
>>>>
>>>> In Spark Streaming, I go through the RDD, walk through the rows one by
>>>> one, and check the prices.
>>>> If the prices are valuable, I store them in an HBase table:
>>>>
>>>>     import kafka.serializer.StringDecoder
>>>>     import org.apache.spark.streaming.kafka.KafkaUtils
>>>>
>>>>     val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
>>>>       streamingContext, kafkaParams, topics)
>>>>     dstream.cache()
>>>>     dstream.foreachRDD { pricesRDD =>
>>>>       // Work on individual messages
>>>>       for (line <- pricesRDD.collect.toArray) {
>>>>         // Split the message value once: UUID, ticker, time, price
>>>>         val fields = line._2.split(',')
>>>>         val key = fields(0)
>>>>         val ticker = fields(1)
>>>>         val timeissued = fields(2)
>>>>         val priceToString = fields(3)
>>>>         val price = priceToString.toFloat
>>>>         if (price > 90.0) {
>>>>           // save to HBase table
>>>>         }
>>>>       }
>>>>     }
>>>>
>>>> That works fine.
>>>>
>>>> In Flink I define my source as below
>>>>
>>>>     val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>>>     streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>>     val stream = streamExecEnv
>>>>       .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))
>>>>
>>>> Is there any way I can perform a similar operation in Flink? I need to go
>>>> through every topic load sent and look at the individual rows. For example,
>>>> what is the equivalent of
>>>>
>>>>     for(line <- pricesRDD.collect.toArray)
>>>>
>>>> in Flink?
>>>>
>>>> Thanks
>>>
>>
