This is an automated email from the ASF dual-hosted git repository.

gates pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new b6f371a  HIVE-21869 Clean up the Kafka storage handler readme and examples (Kristopher Kane via Alan Gates)
b6f371a is described below

commit b6f371ad95e654f47f2a55233af2959182379eb9
Author: Alan Gates <ga...@hortonworks.com>
AuthorDate: Tue Jun 25 14:12:27 2019 -0700

    HIVE-21869 Clean up the Kafka storage handler readme and examples (Kristopher Kane via Alan Gates)
---
 kafka-handler/README.md | 437 ++++++++++++++++++++++++++++++++++++------------
 1 file changed, 328 insertions(+), 109 deletions(-)

diff --git a/kafka-handler/README.md b/kafka-handler/README.md
index c986d85..753e3e3 100644
--- a/kafka-handler/README.md
+++ b/kafka-handler/README.md
@@ -1,33 +1,59 @@
 # Kafka Storage Handler Module
-Storage Handler that allows user to Connect/Analyse/Transform Kafka topics.
-The workflow is as follow, first the user will create an external table that is a view over one Kafka topic,
-then the user will be able to run any SQL query including write back to the same table or different kafka backed table.
+Storage Handler that allows users to connect/analyze/transform Kafka topics.
+The workflow is as follows:
+- First, the user creates an external table that is a view over one Kafka topic
+- Second, the user can run any SQL query, including writing back to the same table or to a different Kafka backed table
+
+## Kafka Management
+
+Kafka Java client version: 2.x
+
+This handler does not commit offsets of topic partition reads, either using the intrinsic Kafka capability or in external
+storage. This means a query over a Kafka topic backed table will be a full topic read unless partitions are filtered
+manually, via SQL, by the methods described below. In the ETL section, a method for storing topic offsets in Hive tables
+is provided for tracking consumer position, but this is not a part of the handler itself.
 
 ## Usage
 
 ### Create Table
-Use following statement to create table:
+Use the following statement to create a table:
+
 ```sql
-CREATE EXTERNAL TABLE kafka_table
-(`timestamp` timestamp , `page` string, `newPage` boolean,
- added int, deleted bigint, delta double)
-STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
-TBLPROPERTIES
-("kafka.topic" = "test-topic", "kafka.bootstrap.servers"="localhost:9092");
+CREATE EXTERNAL TABLE
+  kafka_table (
+    `timestamp` TIMESTAMP,
+    `page` STRING,
+    `newPage` BOOLEAN,
+    `added` INT,
+    `deleted` BIGINT,
+    `delta` DOUBLE)
+STORED BY
+  'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
+TBLPROPERTIES (
+  "kafka.topic" = "test-topic",
+  "kafka.bootstrap.servers" = "localhost:9092");
 ```
-Table property `kafka.topic` is the Kafka Topic to connect to and `kafka.bootstrap.servers` is the Broker connection string.
+
+The table property `kafka.topic` is the Kafka topic to connect to and `kafka.bootstrap.servers` is the Kafka broker connection string.
 Both properties are mandatory.
-On the write path if such a topic does not exists the topic will be created if Kafka broker admin policy allow such operation.
+On the write path, if such a topic does not exist, the topic will be created, provided the Kafka broker admin policy allows
+auto topic creation.
+
+By default the serializer and deserializer is JSON, specifically `org.apache.hadoop.hive.serde2.JsonSerDe`.
+
+If you want to change the serializer/deserializer classes, you can update the TBLPROPERTIES with the SQL syntax `ALTER TABLE`.
-By default the serializer and deserializer is Json `org.apache.hadoop.hive.serde2.JsonSerDe`.
-If you want to switch serializer/deserializer classes you can use alter table.
 ```sql
-ALTER TABLE kafka_table SET TBLPROPERTIES ("kafka.serde.class"="org.apache.hadoop.hive.serde2.avro.AvroSerDe");
-```
-List of supported Serializer Deserializer:
+ALTER TABLE
+  kafka_table
+SET TBLPROPERTIES (
+  "kafka.serde.class" = "org.apache.hadoop.hive.serde2.avro.AvroSerDe");
+```
+
+List of supported serializers and deserializers:
 
-|Supported Serializer Deserializer|
+|Supported Serializers and Deserializers|
 |-----|
 |org.apache.hadoop.hive.serde2.JsonSerDe|
 |org.apache.hadoop.hive.serde2.OpenCSVSerde|
@@ -35,8 +61,9 @@ List of supported Serializer Deserializer:
 |org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe|
 |org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe|
 
-#### Table definition
-In addition to the user defined payload schema Kafka Storage Handler will append additional columns allowing user to query the Kafka metadata fields:
+#### Table Definitions
+In addition to the user defined column schema, this handler will append additional columns allowing
+the user to query the Kafka metadata fields:
 - `__key` Kafka record key (byte array)
 - `__partition` Kafka record partition identifier (int 32)
 - `__offset` Kafka record offset (int 64)
 - `__timestamp` Kafka record timestamp (int 64)
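+
+For example, a minimal sketch that reads these metadata columns next to the payload of the `kafka_table` defined above:
+
+```sql
+SELECT
+  `__partition`,
+  `__offset`,
+  `__timestamp`,
+  `page`,
+  `added`
+FROM
+  kafka_table
+LIMIT 5;
+```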
@@ -47,71 +74,139 @@ In addition to the user defined payload schema Kafka Storage Handler will append
 List the table properties and all the partition/offsets information for the topic.
 ```sql
-Describe extended kafka_table;
+DESCRIBE EXTENDED
+  kafka_table;
 ```
 
-Count the number of records with Kafka record timestamp within the last 10 minutes interval.
+Count the number of records where the record timestamp is within the last 10 minutes of query execution time.
 
 ```sql
-SELECT count(*) from kafka_table
-where `__timestamp` > 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '10' MINUTES);
+SELECT
+  COUNT(*)
+FROM
+  kafka_table
+WHERE
+  `__timestamp` > 1000 * TO_UNIX_TIMESTAMP(CURRENT_TIMESTAMP - INTERVAL '10' MINUTES);
 ```
-The storage handler allow filter push-down read optimization,
-for instance the query above will only read the records with timestamp satisfying the filter predicate.
-Please note that such time based seek is only viable if the Kafka broker allow time based lookup (Kafka 0.11 or later versions)
-In addition to **time based seek**, the storage handler reader is able to seek to a particular partition offset using the SQL WHERE clause.
-Currently only support OR/AND with (<, <=, >=, >)
+The storage handler pushes these metadata field filters down to Kafka as a read optimization.
+For instance, the query above will only read the records with a timestamp satisfying the filter predicate.
+Please note that such time based filtering (a Kafka consumer partition seek) is only viable if the Kafka broker
+version allows time based look up (Kafka 0.11 or later versions).
+
+In addition to **time based filtering**, the storage handler reader is able to filter based on a
+particular partition offset using the SQL WHERE clause.
+The currently supported operators are `OR` and `AND` with the comparison operators `<`, `<=`, `>=`, `>`.
+
+#### Metadata Query Examples
 
 ```sql
-SELECT count(*) from kafka_table
-where (`__offset` < 10 and `__offset`>3 and `__partition` = 0)
-or (`__partition` = 0 and `__offset` < 105 and `__offset` > 99)
-or (`__offset` = 109);
+SELECT
+  COUNT(*)
+FROM
+  kafka_table
+WHERE
+  (`__offset` < 10 AND `__offset` > 3 AND `__partition` = 0)
+  OR
+  (`__partition` = 0 AND `__offset` < 105 AND `__offset` > 99)
+  OR (`__offset` = 109);
 ```
 
-User can define a view to take of the last 15 minutes and mask what ever column as follow:
+Keep in mind that partitions can grow and shrink within the Kafka cluster without the consumer's knowledge. This
+partition and offset capability is good for replaying specific partitions when the consumer knows that something has
+gone wrong downstream or a replay is required. Apache Hive users may or may not understand the underlying architecture
+of Kafka; therefore, filtering on the record timestamp metadata column is arguably the best filter to use since it
+requires no partition knowledge.
+
+The user can define a view over the last 15 minutes and mask whatever columns as follows:
 
 ```sql
-CREATE VIEW last_15_minutes_of_kafka_table as select `timestamp`, `user`, delta, added from kafka_table
-where `__timestamp` > 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '15' MINUTES);
+CREATE VIEW
+  last_15_minutes_of_kafka_table
+AS
+SELECT
+  `timestamp`,
+  `user`,
+  `delta`,
+  `added`
+FROM
+  kafka_table
+WHERE
+  `__timestamp` > 1000 * TO_UNIX_TIMESTAMP(CURRENT_TIMESTAMP - INTERVAL '15' MINUTES);
 ```
 
-Join the Kafka Stream to Hive table. For instance assume you want to join the last 15 minutes of stream to dimension table like the following.
+Join the Kafka topic to a Hive table. For instance, assume you want to join the last 15 minutes of the topic to
+a dimension table, as in the following example:
+
 ```sql
-CREATE TABLE user_table (`user` string, `first_name` string , age int, gender string, comments string) STORED as ORC ;
+CREATE TABLE
+  user_table (
+    `user` STRING,
+    `first_name` STRING,
+    `age` INT,
+    `gender` STRING,
+    `comments` STRING )
+STORED AS ORC;
 ```
 
-Join the view of the last 15 minutes to `user_table`, group by user gender column and compute aggregates
-over metrics from fact table and dimension table.
+Join the view of the last 15 minutes to `user_table`, group by the `gender` column and compute aggregates
+over metrics from the fact and dimension tables.
 
 ```sql
-SELECT sum(added) as added, sum(deleted) as deleted, avg(delta) as delta, avg(age) as avg_age , gender
-FROM last_15_minutes_of_kafka_table join user_table on `last_15_minutes_of_kafka_table`.`user` = `user_table`.`user`
-GROUP BY gender limit 10;
+SELECT
+  SUM(`added`) AS `added`,
+  SUM(`deleted`) AS `deleted`,
+  AVG(`delta`) AS `delta`,
+  AVG(`age`) AS `avg_age`,
+  `gender`
+FROM
+  last_15_minutes_of_kafka_table
+JOIN
+  user_table ON
+  last_15_minutes_of_kafka_table.`user` = user_table.`user`
+GROUP BY
+  `gender`
+LIMIT 10;
 ```
 
+In cases where you want to perform some ad-hoc analysis over the last 15 minutes of topic data,
+you can join it on itself. In the following example, we show how you can perform classical
+user retention analysis over the Kafka topic.
-Join the Stream to the Stream it self. In cases where you want to perform some Ad-Hoc query over the last 15 minutes view.
-In the following example we show how you can perform classical user retention analysis over the Kafka Stream.
 ```sql
--- Steam join over the view it self
+-- Topic join over the view itself
 -- The example is adapted from https://www.periscopedata.com/blog/how-to-calculate-cohort-retention-in-sql
--- Assuming l15min_wiki is a view of the last 15 minutes
-select count( distinct activity.`user`) as active_users, count(distinct future_activity.`user`) as retained_users
-from l15min_wiki as activity
-left join l15min_wiki as future_activity on
+-- Assuming l15min_wiki is a view of the last 15 minutes based on the topic's timestamp record metadata
+
+SELECT
+  COUNT(DISTINCT `activity`.`user`) AS `active_users`,
+  COUNT(DISTINCT `future_activity`.`user`) AS `retained_users`
+FROM
+  l15min_wiki AS activity
+LEFT JOIN
+  l15min_wiki AS future_activity
+ON
   activity.`user` = future_activity.`user`
-  and activity.`timestamp` = future_activity.`timestamp` - interval '5' minutes ;
-
--- Stream to stream join
--- Assuming wiki_kafka_hive is the entire stream.
-select floor_hour(activity.`timestamp`), count( distinct activity.`user`) as active_users, count(distinct future_activity.`user`) as retained_users
-from wiki_kafka_hive as activity
-left join wiki_kafka_hive as future_activity on
+AND
+  activity.`timestamp` = future_activity.`timestamp` - INTERVAL '5' MINUTES;
+
+-- Topic to topic join
+-- Assuming wiki_kafka_hive is the entire topic
+
+SELECT
+  FLOOR_HOUR(activity.`timestamp`),
+  COUNT(DISTINCT activity.`user`) AS `active_users`,
+  COUNT(DISTINCT future_activity.`user`) AS `retained_users`
+FROM
+  wiki_kafka_hive AS activity
+LEFT JOIN
+  wiki_kafka_hive AS future_activity
+ON
   activity.`user` = future_activity.`user`
-  and activity.`timestamp` = future_activity.`timestamp` - interval '1' hour group by floor_hour(activity.`timestamp`);
-
+AND
+  activity.`timestamp` = future_activity.`timestamp` - INTERVAL '1' HOUR
+GROUP BY
+  FLOOR_HOUR(activity.`timestamp`);
 ```
 
 # Configuration
 
@@ -130,88 +225,212 @@ left join wiki_kafka_hive as future_activity on
 
 ### Setting Extra Consumer/Producer properties.
-User can inject custom Kafka consumer/producer properties via the Table properties.
-To do so user can add any key/value pair of Kafka config to the Hive table property
+The user can inject custom Kafka consumer/producer properties via the table properties.
+To do so, the user can add any key/value pair of Kafka config to the Hive table property
 by prefixing the key with `kafka.consumer` for consumer configs and `kafka.producer` for producer configs.
 For instance the following alter table query adds the table property `"kafka.consumer.max.poll.records" = "5000"`
 and will inject `max.poll.records=5000` to the Kafka Consumer.
+
 ```sql
-ALTER TABLE kafka_table SET TBLPROPERTIES ("kafka.consumer.max.poll.records"="5000");
+ALTER TABLE
+  kafka_table
+SET TBLPROPERTIES
+  ("kafka.consumer.max.poll.records" = "5000");
 ```
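+
+Producer properties can be injected the same way. As a minimal sketch (the property value here is only an example),
+the following adds the table property `"kafka.producer.acks" = "all"` and will inject `acks=all` into the Kafka
+producer used on the write path:
+
+```sql
+ALTER TABLE
+  kafka_table
+SET TBLPROPERTIES
+  ("kafka.producer.acks" = "all");
+```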
 
-# Kafka to Hive ETL PIPE LINE
+# Kafka to Hive ETL Pipeline Example
+
+In this example we will load topic data only once. The goal is to read the data and commit both the data and
+its offsets in a single transaction.
-load form Kafka every Record exactly once
-Goal is to read data and commit both data and its offsets in a single Transaction
+
+First, create the offset table.
-First create the offset table.
 
 ```sql
-Drop table kafka_table_offsets;
-create table kafka_table_offsets(partition_id int, max_offset bigint, insert_time timestamp);
+DROP TABLE
+  kafka_table_offsets;
+
+CREATE TABLE
+  kafka_table_offsets (
+    `partition_id` INT,
+    `max_offset` BIGINT,
+    `insert_time` TIMESTAMP);
 ```
 
 Initialize the table
+
 ```sql
-insert overwrite table kafka_table_offsets select `__partition`, min(`__offset`) - 1, CURRENT_TIMESTAMP
-from wiki_kafka_hive group by `__partition`, CURRENT_TIMESTAMP ;
+INSERT OVERWRITE TABLE
+  kafka_table_offsets
+SELECT
+  `__partition`,
+  MIN(`__offset`) - 1,
+  CURRENT_TIMESTAMP
+FROM
+  wiki_kafka_hive
+GROUP BY
+  `__partition`,
+  CURRENT_TIMESTAMP;
 ```
 
-Create the end target table on the Hive warehouse.
+
+Create the final Hive table for warehouse use.
+
 ```sql
-Drop table orc_kafka_table;
-Create table orc_kafka_table (partition_id int, koffset bigint, ktimestamp bigint,
- `timestamp` timestamp , `page` string, `user` string, `diffurl` string,
- `isrobot` boolean, added int, deleted int, delta bigint
-) stored as ORC;
+DROP TABLE
+  orc_kafka_table;
+
+CREATE TABLE
+  orc_kafka_table (
+    `partition_id` INT,
+    `koffset` BIGINT,
+    `ktimestamp` BIGINT,
+    `timestamp` TIMESTAMP,
+    `page` STRING,
+    `user` STRING,
+    `diffurl` STRING,
+    `isrobot` BOOLEAN,
+    `added` INT,
+    `deleted` INT,
+    `delta` BIGINT)
+STORED AS ORC;
 ```
 
-This an example tp insert up to offset = 2 only
+
+This is an example that inserts up to offset = 2 only.
 
 ```sql
-From wiki_kafka_hive ktable JOIN kafka_table_offsets offset_table
-on (ktable.`__partition` = offset_table.partition_id
-and ktable.`__offset` > offset_table.max_offset and ktable.`__offset` < 3 )
-insert into table orc_kafka_table select `__partition`, `__offset`, `__timestamp`,
-`timestamp`, `page`, `user`, `diffurl`, `isrobot`, added , deleted , delta
-Insert overwrite table kafka_table_offsets select
-`__partition`, max(`__offset`), CURRENT_TIMESTAMP group by `__partition`, CURRENT_TIMESTAMP;
+FROM
+  wiki_kafka_hive ktable
+JOIN
+  kafka_table_offsets offset_table
+ON (
+  ktable.`__partition` = offset_table.`partition_id`
+  AND
+  ktable.`__offset` > offset_table.`max_offset`
+  AND
+  ktable.`__offset` < 3 )
+
+INSERT INTO TABLE
+  orc_kafka_table
+SELECT
+  `__partition`,
+  `__offset`,
+  `__timestamp`,
+  `timestamp`,
+  `page`,
+  `user`,
+  `diffurl`,
+  `isrobot`,
+  `added`,
+  `deleted`,
+  `delta`
+
+INSERT OVERWRITE TABLE
+  kafka_table_offsets
+SELECT
+  `__partition`,
+  MAX(`__offset`),
+  CURRENT_TIMESTAMP
+GROUP BY
+  `__partition`,
+  CURRENT_TIMESTAMP;
 ```
 
-Double check the insert
+Double check the insert.
+
 ```sql
-select max(`koffset`) from orc_kafka_table limit 10;
-select count(*) as c from orc_kafka_table group by partition_id, koffset having c > 1;
+SELECT
+  MAX(`koffset`)
+FROM
+  orc_kafka_table
+LIMIT 10;
+
+SELECT
+  COUNT(*) AS `c`
+FROM
+  orc_kafka_table
+GROUP BY
+  `partition_id`,
+  `koffset`
+HAVING
+  `c` > 1;
 ```
 
-Repeat this periodically to insert all data.
+Repeat the following as new data becomes available on the topic.
 ```sql
-From wiki_kafka_hive ktable JOIN kafka_table_offsets offset_table
-on (ktable.`__partition` = offset_table.partition_id
-and ktable.`__offset` > offset_table.max_offset )
-insert into table orc_kafka_table select `__partition`, `__offset`, `__timestamp`,
-`timestamp`, `page`, `user`, `diffurl`, `isrobot`, added , deleted , delta
-Insert overwrite table kafka_table_offsets select
-`__partition`, max(`__offset`), CURRENT_TIMESTAMP group by `__partition`, CURRENT_TIMESTAMP;
+FROM
+  wiki_kafka_hive ktable
+JOIN
+  kafka_table_offsets offset_table
+ON (
+  ktable.`__partition` = offset_table.`partition_id`
+  AND
+  ktable.`__offset` > offset_table.`max_offset`)
+
+INSERT INTO TABLE
+  orc_kafka_table
+SELECT
+  `__partition`,
+  `__offset`,
+  `__timestamp`,
+  `timestamp`,
+  `page`,
+  `user`,
+  `diffurl`,
+  `isrobot`,
+  `added`,
+  `deleted`,
+  `delta`
+
+INSERT OVERWRITE TABLE
+  kafka_table_offsets
+SELECT
+  `__partition`,
+  MAX(`__offset`),
+  CURRENT_TIMESTAMP
+GROUP BY
+  `__partition`,
+  CURRENT_TIMESTAMP;
 ```
 
 # ETL from Hive to Kafka
 
-## INSERT INTO
-First create the table in have that will be the target table. Now all the inserts will go to the topic mapped by this Table.
+## Kafka topic append with INSERT
+First, create the table in Hive that will be the target table. Now all the inserts will go to the topic mapped by
+this table. Be aware that the Avro SerDe used below is regular Apache Avro (with schema) and not Confluent serialized
+Avro, which is popular with Kafka usage.
 
 ```sql
-CREATE EXTERNAL TABLE moving_avg_wiki_kafka_hive
-(`channel` string, `namespace` string,`page` string, `timestamp` timestamp , avg_delta double )
-STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
-TBLPROPERTIES
-("kafka.topic" = "moving_avg_wiki_kafka_hive_2",
-"kafka.bootstrap.servers"="cn105-10.l42scl.hortonworks.com:9092",
--- STORE AS AVRO IN KAFKA
-"kafka.serde.class"="org.apache.hadoop.hive.serde2.avro.AvroSerDe");
+CREATE EXTERNAL TABLE
+  moving_avg_wiki_kafka_hive (
+    `channel` STRING,
+    `namespace` STRING,
+    `page` STRING,
+    `timestamp` TIMESTAMP,
+    `avg_delta` DOUBLE)
+STORED BY
+  'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
+TBLPROPERTIES (
+  "kafka.topic" = "moving_avg_wiki_kafka_hive_2",
+  "kafka.bootstrap.servers" = "localhost:9092",
+  -- STORE AS AVRO IN KAFKA
+  "kafka.serde.class" = "org.apache.hadoop.hive.serde2.avro.AvroSerDe");
 ```
 
-Then insert data into the table. Keep in mind that Kafka is an append only, thus you can not use insert overwrite.
+Then, insert data into the table. Keep in mind that Kafka is append only, thus you cannot use INSERT OVERWRITE.
+
 ```sql
-insert into table moving_avg_wiki_kafka_hive select `channel`, `namespace`, `page`, `timestamp`,
-avg(delta) over (order by `timestamp` asc rows between 60 preceding and current row) as avg_delta,
-null as `__key`, null as `__partition`, -1, -1 from l15min_wiki;
+INSERT INTO TABLE
+  moving_avg_wiki_kafka_hive
+SELECT
+  `channel`,
+  `namespace`,
+  `page`,
+  `timestamp`,
+  AVG(`delta`) OVER (ORDER BY `timestamp` ASC ROWS BETWEEN 60 PRECEDING AND CURRENT ROW) AS `avg_delta`,
+  NULL AS `__key`,
+  NULL AS `__partition`,
+  -1,
+  -1
+FROM
+  l15min_wiki;
 ```
+
+The trailing `NULL` and `-1` values populate the Kafka metadata columns (`__key`, `__partition`, `__offset` and
+`__timestamp`) described above.
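+
+Optionally, mirroring the "double check" step from the ETL pipeline above, the freshly written records can be read
+back through the same Kafka backed table (a minimal sanity-check sketch):
+
+```sql
+SELECT
+  `channel`,
+  `page`,
+  `avg_delta`,
+  `__partition`,
+  `__offset`
+FROM
+  moving_avg_wiki_kafka_hive
+LIMIT 10;
+```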