[ 
https://issues.apache.org/jira/browse/HIVE-20377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gopal V updated HIVE-20377:
---------------------------
    Description: 
h1. Goal
* Read streaming data from a Kafka topic as an external table.
* Allow streaming navigation by pushing down filters on the Kafka record partition id, offset, and timestamp.
* Insert streaming data from Kafka into an actual Hive internal table, using a CTAS statement (see the sketch after this list).
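A minimal sketch of the CTAS flow, assuming the kafka_table defined in the example below; the target table name wiki_snapshot and the offset bound are hypothetical:
{code}
-- Hypothetical CTAS: materialize part of the Kafka stream into a managed ORC table.
-- kafka_table is the external table created below; wiki_snapshot is an illustrative name.
CREATE TABLE wiki_snapshot STORED AS ORC AS
SELECT `timestamp`, page, `user`, language, added, deleted
FROM kafka_table
WHERE `__offset` < 1000;
{code}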
h1. Example
h2. Create the external table
{code}
CREATE EXTERNAL TABLE kafka_table (
  `timestamp` timestamp,
  page string,
  `user` string,
  language string,
  added int,
  deleted int,
  flags string,
  comment string,
  namespace string)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES (
  "kafka.topic" = "wikipedia",
  "kafka.bootstrap.servers" = "brokeraddress:9092",
  "kafka.serde.class" = "org.apache.hadoop.hive.serde2.JsonSerDe");
{code}
h2. Kafka Metadata
To keep track of Kafka records, the storage handler will automatically add the Kafka row metadata, e.g. partition id, record offset, and record timestamp.
{code}
DESCRIBE EXTENDED kafka_table

timestamp               timestamp               from deserializer   
page                    string                  from deserializer   
user                    string                  from deserializer   
language                string                  from deserializer   
country                 string                  from deserializer   
continent               string                  from deserializer   
namespace               string                  from deserializer   
newpage                 boolean                 from deserializer   
unpatrolled             boolean                 from deserializer   
anonymous               boolean                 from deserializer   
robot                   boolean                 from deserializer   
added                   int                     from deserializer   
deleted                 int                     from deserializer   
delta                   bigint                  from deserializer   
__partition             int                     from deserializer   
__offset                bigint                  from deserializer   
__timestamp             bigint                  from deserializer   

{code}
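As an illustration (not part of the original description), these metadata columns can be queried like any other column, for example to inspect how far each partition has been consumed:
{code}
-- Sketch: per-partition record counts and the highest offset seen so far.
SELECT `__partition`, COUNT(*) AS record_count, MAX(`__offset`) AS max_offset
FROM kafka_table
GROUP BY `__partition`;
{code}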

h2. Filter push down
Newer Kafka consumers (0.11.0 and higher) allow seeking on the stream to a given offset. The proposed storage handler will be able to leverage this API by pushing down filters over the metadata columns, namely __partition (int), __offset (long), and __timestamp (long).
For instance, a query like
{code}
SELECT `__offset`
FROM kafka_table
WHERE (`__offset` < 10 AND `__offset` > 3 AND `__partition` = 0)
   OR (`__partition` = 0 AND `__offset` < 105 AND `__offset` > 99)
   OR (`__offset` = 109);
{code}
will result in a scan of partition 0 only, reading only records between offsets 4 and 109.
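A similar filter can be pushed down on __timestamp; a hedged sketch, assuming the column holds the record timestamp as epoch milliseconds (the literal below is only illustrative):
{code}
-- Sketch: time-based seek via the __timestamp metadata column (epoch millis assumed).
SELECT COUNT(*)
FROM kafka_table
WHERE `__timestamp` > 1533945600000;  -- records produced after an arbitrary instant
{code}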



> Hive Kafka Storage Handler
> --------------------------
>
>                 Key: HIVE-20377
>                 URL: https://issues.apache.org/jira/browse/HIVE-20377
>             Project: Hive
>          Issue Type: Bug
>    Affects Versions: 4.0.0
>            Reporter: slim bouguerra
>            Assignee: slim bouguerra
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
