[ https://issues.apache.org/jira/browse/HIVE-20377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
slim bouguerra updated HIVE-20377:
----------------------------------
    Attachment: HIVE-20377.19.patch

> Hive Kafka Storage Handler
> --------------------------
>
>                 Key: HIVE-20377
>                 URL: https://issues.apache.org/jira/browse/HIVE-20377
>             Project: Hive
>          Issue Type: New Feature
>    Affects Versions: 4.0.0
>            Reporter: slim bouguerra
>            Assignee: slim bouguerra
>            Priority: Major
>         Attachments: HIVE-20377.10.patch, HIVE-20377.11.patch, HIVE-20377.12.patch, HIVE-20377.15.patch, HIVE-20377.18.patch, HIVE-20377.18.patch, HIVE-20377.19.patch, HIVE-20377.19.patch, HIVE-20377.4.patch, HIVE-20377.5.patch, HIVE-20377.6.patch, HIVE-20377.8.patch, HIVE-20377.8.patch, HIVE-20377.patch
>
> h1. Goal
> * Read streaming data from a Kafka queue as an external table.
> * Allow stream navigation by pushing down filters on the Kafka record partition id, offset, and timestamp.
> * Insert streaming data from Kafka into an actual Hive internal table, using a CTAS statement.
> h1. Example
> h2. Create the external table
> {code}
> CREATE EXTERNAL TABLE kafka_table (
>   `timestamp` timestamp, page string, `user` string, language string,
>   added int, deleted int, flags string, comment string, namespace string)
> STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
> TBLPROPERTIES (
>   "kafka.topic" = "wikipedia",
>   "kafka.bootstrap.servers" = "brokeraddress:9092",
>   "kafka.serde.class" = "org.apache.hadoop.hive.serde2.JsonSerDe");
> {code}
> h2. Kafka Metadata
> In order to keep track of Kafka records, the storage handler automatically adds the Kafka record metadata columns, e.g. partition id, record offset, and record timestamp.
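> As a sketch of the CTAS goal above, ingestion of the stream into an internal Hive table might look like the following (the target table name and storage format are illustrative, not part of this patch):
> {code}
> CREATE TABLE wiki_hive_table STORED AS ORC AS
> SELECT `timestamp`, page, `user`, language, added, deleted
> FROM kafka_table;
> {code}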
> {code}
> DESCRIBE EXTENDED kafka_table
> timestamp    timestamp  from deserializer
> page         string     from deserializer
> user         string     from deserializer
> language     string     from deserializer
> country      string     from deserializer
> continent    string     from deserializer
> namespace    string     from deserializer
> newpage      boolean    from deserializer
> unpatrolled  boolean    from deserializer
> anonymous    boolean    from deserializer
> robot        boolean    from deserializer
> added        int        from deserializer
> deleted      int        from deserializer
> delta        bigint     from deserializer
> __partition  int        from deserializer
> __offset     bigint     from deserializer
> __timestamp  bigint     from deserializer
> {code}
> h2. Filter push down
> Newer Kafka consumers (0.11.0 and higher) allow seeking to a given offset in the stream. The proposed storage handler leverages this API by pushing down filters over the metadata columns, namely __partition (int), __offset (bigint), and __timestamp (bigint).
> For instance, a query like
> {code}
> select `__offset` from kafka_table
> where (`__offset` < 10 and `__offset` > 3 and `__partition` = 0)
>    or (`__partition` = 0 and `__offset` < 105 and `__offset` > 99)
>    or (`__offset` = 109);
> {code}
> will result in a scan of partition 0 only, reading only the records between offsets 4 and 109.
> h2. With timestamp seeks
> Seeking based on the internal timestamps allows the handler to run over recently arrived data, for example:
> {code}
> select count(*) from kafka_table
> where `__timestamp` > 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '20' hours);
> {code}
> This allows implicit relationships between event timestamps and Kafka timestamps to be expressed in queries (i.e. the event timestamp is always less than the Kafka __timestamp, and the Kafka __timestamp is never more than 15 minutes after the event, etc.).
> h2. More examples with Avro
> {code}
> CREATE EXTERNAL TABLE wiki_kafka_avro_table
> STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
> TBLPROPERTIES (
>   "kafka.topic" = "wiki_kafka_avro_table",
>   "kafka.bootstrap.servers" = "localhost:9092",
>   "kafka.serde.class" = "org.apache.hadoop.hive.serde2.avro.AvroSerDe",
>   'avro.schema.literal' = '{
>     "type" : "record",
>     "name" : "Wikipedia",
>     "namespace" : "org.apache.hive.kafka",
>     "version": "1",
>     "fields" : [
>       { "name" : "isrobot", "type" : "boolean" },
>       { "name" : "channel", "type" : "string" },
>       { "name" : "timestamp", "type" : "string" },
>       { "name" : "flags", "type" : "string" },
>       { "name" : "isunpatrolled", "type" : "boolean" },
>       { "name" : "page", "type" : "string" },
>       { "name" : "diffurl", "type" : "string" },
>       { "name" : "added", "type" : "long" },
>       { "name" : "comment", "type" : "string" },
>       { "name" : "commentlength", "type" : "long" },
>       { "name" : "isnew", "type" : "boolean" },
>       { "name" : "isminor", "type" : "boolean" },
>       { "name" : "delta", "type" : "long" },
>       { "name" : "isanonymous", "type" : "boolean" },
>       { "name" : "user", "type" : "string" },
>       { "name" : "deltabucket", "type" : "double" },
>       { "name" : "deleted", "type" : "long" },
>       { "name" : "namespace", "type" : "string" }
>     ]
>   }'
> );
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)