Hi,

I have a Kafka stream that sends data to Flume in the following JSON format.

This is a record as sent via Kafka:

7d645a0f-0386-4405-8af1-7fca908fe928
{"rowkey":"7d645a0f-0386-4405-8af1-7fca908fe928","ticker":"IBM",
"timeissued":"2020-02-14T20:32:29", "price":140.11}

Note that "7d645a0f-0386-4405-8af1-7fca908fe928" is the Kafka message key, and
the value contains 4 columns, including the key itself as another column.

The Flume configuration file is as follows

# Describing/Configuring the sink
JsonAgent.channels.hdfs-channel-1.type = memory
JsonAgent.channels.hdfs-channel-1.capacity = 300
JsonAgent.channels.hdfs-channel-1.transactionCapacity = 100
JsonAgent.sinks.Hbase-sink.type = org.apache.flume.sink.hbase.HBaseSink
JsonAgent.sinks.Hbase-sink.channel =hdfs-channel-1
JsonAgent.sinks.Hbase-sink.table =trading:MARKETDATAHBASEBATCH
JsonAgent.sinks.Hbase-sink.columnFamily=PRICE_INFO
JsonAgent.sinks.Hbase-sink.serializer=org.apache.flume.sink.hbase.RegexHbaseEventSerializer
JsonAgent.sinks.Hbase-sink.serializer.regex = (.+),(.+),(.+),(.+)
JsonAgent.sinks.Hbase-sink.serializer.rowKeyIndex = 0
JsonAgent.sinks.Hbase-sink.serializer.colNames = ROW_KEY,ticker,timeissued,price
JsonAgent.sinks.Hbase-sink.serializer.regexIgnoreCase = true
JsonAgent.sinks.Hbase-sink.batchSize =100

This works and posts records to HBase as follows:

ROW                                                 COLUMN+CELL
 {"rowkey":"7d645a0f-0386-4405-8af1-7fca908fe928"   column=PRICE_INFO:price, timestamp=1581711715292, value= "price":140.11}
 {"rowkey":"7d645a0f-0386-4405-8af1-7fca908fe928"   column=PRICE_INFO:ticker, timestamp=1581711715292, value="ticker":"IBM"
 {"rowkey":"7d645a0f-0386-4405-8af1-7fca908fe928"   column=PRICE_INFO:timeissued, timestamp=1581711715292, value= "timeissued":"2020-02-14T20:32:29"
1 row(s) in 0.0050 seconds
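
The split seen in the shell output above can be reproduced outside Flume. The sketch below uses Python's re module purely for illustration (Flume itself uses Java regular expressions). It assumes the event body is the JSON value on a single line. The names body, original and groups are made up for this example.

```python
import re

# Assumed event body: the JSON value from the sample record, on one line.
body = ('{"rowkey":"7d645a0f-0386-4405-8af1-7fca908fe928","ticker":"IBM", '
        '"timeissued":"2020-02-14T20:32:29", "price":140.11}')

# Same pattern as serializer.regex; IGNORECASE mirrors regexIgnoreCase = true.
original = re.compile(r'(.+),(.+),(.+),(.+)', re.IGNORECASE)

# The body has exactly three commas, so each group takes one comma-separated chunk.
groups = original.fullmatch(body).groups()

for name, value in zip(['ROW_KEY', 'ticker', 'timeissued', 'price'], groups):
    print(name, '->', repr(value))
```

The first group, which rowKeyIndex = 0 selects as the row key, is everything up to the first comma, so it still carries the leading {"rowkey": text and the quotes.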

However, there is a problem: the rowkey value includes the redundant
characters {"rowkey": which prevent records from being looked up in HBase
by rowkey value. When I try to drop the redundant characters by tweaking
the regex, no rows are added to the HBase table at all. For example:

JsonAgent.sinks.Hbase-sink.serializer.regex = (?<=^.{9}).+,(.+),(.+),(.+)
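
A possible explanation, offered as an assumption rather than a confirmed diagnosis: if the serializer requires the regex to match the entire event body, and the number of capture groups to equal the number of colNames, then this pattern fails on both counts. The lookbehind cannot succeed at the start of the body, and the pattern has only three groups for four column names. The sketch below checks this in Python's re module (illustration only; Flume uses Java regex) and tries a hypothetical alternative pattern, named candidate here, which has not been tested against Flume.

```python
import re

# Assumed event body: the JSON value from the sample record, on one line.
body = ('{"rowkey":"7d645a0f-0386-4405-8af1-7fca908fe928","ticker":"IBM", '
        '"timeissued":"2020-02-14T20:32:29", "price":140.11}')

# The literal text in front of the key value is 10 characters long, not 9.
prefix = '{"rowkey":'

# The tweaked pattern: a whole-body match must start at position 0, where the
# lookbehind has no 9 preceding characters, so it cannot match.
tweaked = re.compile(r'(?<=^.{9}).+,(.+),(.+),(.+)', re.IGNORECASE)
whole_match = tweaked.fullmatch(body)
group_count = tweaked.groups  # number of capture groups in the pattern

# Hypothetical alternative: four groups capturing only the values. It avoids
# backslashes and braces so it needs no escaping in a properties file.
candidate = re.compile(
    r'.*"rowkey":"([^"]+)".*"ticker":"([^"]+)"'
    r'.*"timeissued":"([^"]+)".*"price":([0-9.]+).*',
    re.IGNORECASE,
)
fields = candidate.fullmatch(body).groups()

print('tweaked matches whole body:', whole_match is not None)
print('tweaked group count:', group_count)
print('candidate fields:', fields)
```

If Java's regex engine treats the candidate pattern the same way, it could be tried as serializer.regex with rowKeyIndex = 0 and the four colNames left unchanged. That would make the first group, the bare UUID, the row key.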

Appreciate any advice.

Thanks,

Mich
