[ 
https://issues.apache.org/jira/browse/HIVE-20377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

slim bouguerra updated HIVE-20377:
----------------------------------
    Attachment: HIVE-20377.19.patch

> Hive Kafka Storage Handler
> --------------------------
>
>                 Key: HIVE-20377
>                 URL: https://issues.apache.org/jira/browse/HIVE-20377
>             Project: Hive
>          Issue Type: New Feature
>    Affects Versions: 4.0.0
>            Reporter: slim bouguerra
>            Assignee: slim bouguerra
>            Priority: Major
>         Attachments: HIVE-20377.10.patch, HIVE-20377.11.patch, 
> HIVE-20377.12.patch, HIVE-20377.15.patch, HIVE-20377.18.patch, 
> HIVE-20377.18.patch, HIVE-20377.19.patch, HIVE-20377.19.patch, 
> HIVE-20377.19.patch, HIVE-20377.4.patch, HIVE-20377.5.patch, 
> HIVE-20377.6.patch, HIVE-20377.8.patch, HIVE-20377.8.patch, HIVE-20377.patch
>
>
> h1. Goal
> * Read streaming data from a Kafka queue as an external table.
> * Allow streaming navigation by pushing down filters on Kafka record 
> partition id, offset and timestamp. 
> * Insert streaming data from Kafka into an actual Hive internal table, using 
> a CTAS statement.
> h1. Example
> h2. Create the external table
> {code} 
> CREATE EXTERNAL TABLE kafka_table (`timestamp` timestamp, page string,
> `user` string, language string, added int, deleted int, flags string,
> comment string, namespace string)
> STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
> TBLPROPERTIES 
> ("kafka.topic" = "wikipedia", 
> "kafka.bootstrap.servers"="brokeraddress:9092",
> "kafka.serde.class"="org.apache.hadoop.hive.serde2.JsonSerDe");
> {code}
> h2. Kafka Metadata
> To keep track of Kafka records, the storage handler will automatically add 
> the Kafka row metadata, e.g. partition id, record offset and record 
> timestamp.
> {code}
> DESCRIBE EXTENDED kafka_table
> timestamp                     timestamp               from deserializer   
> page                  string                  from deserializer   
> user                  string                  from deserializer   
> language              string                  from deserializer   
> country               string                  from deserializer   
> continent             string                  from deserializer   
> namespace             string                  from deserializer   
> newpage               boolean                 from deserializer   
> unpatrolled           boolean                 from deserializer   
> anonymous             boolean                 from deserializer   
> robot                 boolean                 from deserializer   
> added                 int                     from deserializer   
> deleted               int                     from deserializer   
> delta                 bigint                  from deserializer   
> __partition           int                     from deserializer   
> __offset              bigint                  from deserializer   
> __timestamp           bigint                  from deserializer   
> {code}
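> As a sketch of how these metadata columns might be used, the queries below 
> read them next to regular columns and report the highest offset seen per 
> partition. They assume the kafka_table defined above; the max_offset alias 
> is illustrative only.
> {code}
> -- Show where each record came from in the topic
> SELECT `__partition`, `__offset`, `__timestamp`, page, `user`
> FROM kafka_table
> LIMIT 10;
> -- Highest offset read for each Kafka partition
> SELECT `__partition`, max(`__offset`) AS max_offset
> FROM kafka_table
> GROUP BY `__partition`;
> {code}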
> h2. Filter push down.
> Newer Kafka consumers (0.11.0 and higher) allow seeking on the stream based 
> on a given offset. The proposed storage handler will be able to leverage 
> this API by pushing down filters over the metadata columns, namely 
> __partition (int), __offset (long) and __timestamp (long).
> For instance, a query like
> {code} 
> select `__offset` from kafka_table where (`__offset` < 10 and `__offset`>3 
> and `__partition` = 0) or (`__partition` = 0 and `__offset` < 105 and 
> `__offset` > 99) or (`__offset` = 109);
> {code}
> will result in a scan of partition 0 only, reading only records between 
> offsets 4 and 109.
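> A simpler case, sketched under the assumption that pushdown behaves as 
> described above: pinning one partition and one offset range should let the 
> handler seek directly to that slice of the topic. The partition id and 
> offsets are illustrative values only.
> {code}
> -- Read offsets 100 through 199 of partition 1 and nothing else
> SELECT `__partition`, `__offset`, page
> FROM kafka_table
> WHERE `__partition` = 1
>   AND `__offset` >= 100
>   AND `__offset` < 200;
> {code}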
> h2. With timestamp seeks 
> Seeking based on the internal timestamps allows the handler to run on 
> recently arrived data only, for example:
> {code}
> select count(*) from kafka_table where `__timestamp` >  1000 * 
> to_unix_timestamp(CURRENT_TIMESTAMP - interval '20' hours) ;
> {code}
> This allows implicit relationships between event timestamps and Kafka 
> timestamps to be expressed in queries (e.g. event_timestamp is always less 
> than the Kafka __timestamp, and the Kafka __timestamp is never more than 15 
> minutes after the event).
> h2. More examples with Avro 
> {code}
> CREATE EXTERNAL TABLE wiki_kafka_avro_table
> STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
> TBLPROPERTIES
> ("kafka.topic" = "wiki_kafka_avro_table",
> "kafka.bootstrap.servers"="localhost:9092",
> "kafka.serde.class"="org.apache.hadoop.hive.serde2.avro.AvroSerDe",
> 'avro.schema.literal'='{
>   "type" : "record",
>   "name" : "Wikipedia",
>   "namespace" : "org.apache.hive.kafka",
>   "version": "1",
>   "fields" : [ {
>     "name" : "isrobot",
>     "type" : "boolean"
>   }, {
>     "name" : "channel",
>     "type" : "string"
>   }, {
>     "name" : "timestamp",
>     "type" : "string"
>   }, {
>     "name" : "flags",
>     "type" : "string"
>   }, {
>     "name" : "isunpatrolled",
>     "type" : "boolean"
>   }, {
>     "name" : "page",
>     "type" : "string"
>   }, {
>     "name" : "diffurl",
>     "type" : "string"
>   }, {
>     "name" : "added",
>     "type" : "long"
>   }, {
>     "name" : "comment",
>     "type" : "string"
>   }, {
>     "name" : "commentlength",
>     "type" : "long"
>   }, {
>     "name" : "isnew",
>     "type" : "boolean"
>   }, {
>     "name" : "isminor",
>     "type" : "boolean"
>   }, {
>     "name" : "delta",
>     "type" : "long"
>   }, {
>     "name" : "isanonymous",
>     "type" : "boolean"
>   }, {
>     "name" : "user",
>     "type" : "string"
>   }, {
>     "name" : "deltabucket",
>     "type" : "double"
>   }, {
>     "name" : "deleted",
>     "type" : "long"
>   }, {
>     "name" : "namespace",
>     "type" : "string"
>   } ]
> }'
> );
> {code}
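> Once created, the Avro-backed table should be queryable like any other 
> table. The following is a sketch that assumes the fields declared in the 
> schema literal above; the edits and total_added aliases are illustrative.
> {code}
> -- Count human (non-robot) edits and added characters per channel
> SELECT channel, count(*) AS edits, sum(added) AS total_added
> FROM wiki_kafka_avro_table
> WHERE isrobot = false
> GROUP BY channel;
> {code}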



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
