Today, I retransferred all the data online and found the same data loss
as last time. So I looked into the suspicious file.
I found a weird character; I used a Java program to parse it, and it is a
Unicode two-char surrogate pair sequence whose code point is 0x1F4AB.
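To illustrate what I mean, here is the kind of standalone check I ran (the class name is just for the example): in Java, code point 0x1F4AB cannot fit in a single char and is split into a high/low surrogate pair.

```java
public class SurrogateCheck {
    public static void main(String[] args) {
        int cp = 0x1F4AB;
        // A supplementary code point (> 0xFFFF) needs two UTF-16 code units.
        char[] units = Character.toChars(cp);
        System.out.println(units.length);                        // 2
        System.out.println(Character.isHighSurrogate(units[0])); // true
        System.out.println(Character.isLowSurrogate(units[1]));  // true
        System.out.println(Integer.toHexString(units[0]));       // d83d
        System.out.println(Integer.toHexString(units[1]));       // dcab
    }
}
```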
Then, I looked into the source code:
1. Class: org.apache.flume.serialization.LineDeserializer
The LineDeserializer uses
"org.apache.flume.serialization.ResettableFileInputStream#readChar" to
read one char at a time; when it encounters the character 0x1F4AB, readChar
returns -1, and the rest of the file after that character is skipped.
2. Class: org.apache.flume.serialization.ResettableFileInputStream
A snippet from the method
org.apache.flume.serialization.ResettableFileInputStream#readChar:
CoderResult res = decoder.decode(buf, charBuf, isEndOfInput);
When the decoder decodes the character 0x1F4AB, the CoderResult is
OVERFLOW, which is expected, because 0x1F4AB must be represented as two chars.
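The OVERFLOW result is easy to reproduce in isolation. This is my own minimal sketch (not Flume's actual buffer setup): decoding the UTF-8 bytes of 0x1F4AB into a one-char output buffer cannot succeed, because the surrogate pair does not fit.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
import java.nio.charset.StandardCharsets;

public class OverflowDemo {
    public static void main(String[] args) {
        // UTF-8 bytes of U+1F4AB (written as its surrogate pair in source)
        byte[] bytes = "\uD83D\uDCAB".getBytes(StandardCharsets.UTF_8);
        CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
        ByteBuffer in = ByteBuffer.wrap(bytes);
        CharBuffer out = CharBuffer.allocate(1); // room for only one char
        CoderResult res = decoder.decode(in, out, false);
        System.out.println(res.isOverflow()); // true: the pair does not fit
    }
}
```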
To solve this problem, one solution is to implement a line
deserializer that uses
"org.apache.flume.serialization.ResettableFileInputStream#read()"
instead of
"org.apache.flume.serialization.ResettableFileInputStream#readChar". But
I am not sure it's a good solution.
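A rough sketch of the idea (hypothetical helper, not actual Flume code): read raw bytes up to '\n' with read() and decode the whole line at once, so a multi-unit code point is never split across readChar calls.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class ByteLineReader {
    // Accumulate raw bytes until '\n' (or EOF), then decode the line in one go.
    static String readLine(InputStream in) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        int b;
        while ((b = in.read()) != -1 && b != '\n') {
            buf.write(b);
        }
        if (b == -1 && buf.size() == 0) {
            return null; // end of stream, no pending line
        }
        return new String(buf.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "a \uD83D\uDCAB b\nnext".getBytes(StandardCharsets.UTF_8);
        InputStream in = new ByteArrayInputStream(data);
        // The surrogate-pair character survives, since decoding happens per line.
        System.out.println(readLine(in));
    }
}
```

This ignores the reset/retry semantics that ResettableFileInputStream has to preserve, which is exactly why I'm unsure it's a good solution.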
The attachment is a snippet of data with the weird character on the 2nd line.
Any suggestions?
Thanks,
Alex
On 1/22/2015 2:18 PM, Alex wrote:
1: In agent1, there is a "regex_extractor" interceptor for extracting
the header "dt":
#interceptors
agent1.sources.src_spooldir.interceptors.i1.type=regex_extractor
agent1.sources.src_spooldir.interceptors.i1.regex=(\\d{4}-\\d{2}-\\d{2}).*
agent1.sources.src_spooldir.interceptors.i1.serializers=s1
agent1.sources.src_spooldir.interceptors.i1.serializers.s1.name=dt
In agent2, the hdfs sink uses the header in the path; this is the
configuration:
agent2.sinks.sink1.hdfs.path = hdfs://hnd.hadoop.jsh:8020/data/%{dt}
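For reference, the interceptor's pattern can be tried in plain Java (my own test class; same regex as the config, with the config's double backslashes unescaped into Java string escapes). Group 1 becomes the value of the "dt" header that %{dt} resolves in the sink path.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DtExtractor {
    public static void main(String[] args) {
        // Same pattern as interceptors.i1.regex in the agent1 config
        Pattern p = Pattern.compile("(\\d{4}-\\d{2}-\\d{2}).*");
        String event = "2015-01-15 16:23:33,740 sync_data 510003500 0";
        Matcher m = p.matcher(event);
        if (m.matches()) {
            System.out.println(m.group(1)); // 2015-01-15 -> header "dt"
        }
    }
}
```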
2: I misunderstood this property, thank you for revision.
Thanks,
Alex
On 1/22/2015 12:51 PM, Hari Shreedharan wrote:
1: How do you guarantee that the data from the previous day has not
spilled over to the next day? Where are you inserting the timestamp
(if you are doing bucketing)?
2: Flume creates transactions for writes. Each batch defaults to 1000
events, which are written and flushed. There is still only one
transaction per sink, the pool size is for IO ops.
Thanks,
Hari
On Wed, Jan 21, 2015 at 7:32 PM, Jay Alexander <[email protected]> wrote:
First question: No, I checked that all the files in hdfs had been
closed; in fact, I counted the data one day later.
Second question: I hadn't configured anything about transactions. And I
saw this item in the hdfs sink
documentation: "hdfs.threadsPoolSize 10 Number of threads per HDFS
sink for HDFS IO ops (open, write, etc.)".
So there are 10 transactions per sink from the file channel?
Thanks.
2015-01-22 11:04 GMT+08:00 Hari Shreedharan <[email protected]>:
Are you accounting for the data still being written but not
yet hflushed at the time of the query? Basically one
transaction per sink ?
Thanks,
Hari
On Wed, Jan 21, 2015 at 6:42 PM, Jay Alexander <[email protected]> wrote:
I used *flume-ng 1.5* to collect logs.
There are two agents in the data flow, and they run on two
hosts, respectively.
The data is sent *from agent1 to agent2.*
The agents' components are as follows:
agent1: spooling dir source --> file channel --> avro sink
agent2: avro source --> file channel --> hdfs sink
But it seems to lose roughly 1 in 1000 events out of millions.
To troubleshoot, I tried these steps:
1. Looked through the agents' logs: cannot find any error or exception.
2. Looked at the agents' monitoring metrics: the number of events
put into and taken from the channel is always equal.
3. Counted the records by a hive query and over the hdfs files
with a shell script, respectively: the two numbers are equal, and
less than the online data count.
These are the two agents configuration:
#agent1
agent1.sources = src_spooldir
agent1.channels = chan_file
agent1.sinks = sink_avro
#source
agent1.sources.src_spooldir.type = spooldir
agent1.sources.src_spooldir.spoolDir = /data/logs/flume-spooldir
agent1.sources.src_spooldir.interceptors=i1
#interceptors
agent1.sources.src_spooldir.interceptors.i1.type=regex_extractor
agent1.sources.src_spooldir.interceptors.i1.regex=(\\d{4}-\\d{2}-\\d{2}).*
agent1.sources.src_spooldir.interceptors.i1.serializers=s1
agent1.sources.src_spooldir.interceptors.i1.serializers.s1.name=dt
#sink
agent1.sinks.sink_avro.type = avro
agent1.sinks.sink_avro.hostname = 10.235.2.212
agent1.sinks.sink_avro.port = 9910
#channel
agent1.channels.chan_file.type = file
agent1.channels.chan_file.checkpointDir = /data/flume/agent1/checkpoint
agent1.channels.chan_file.dataDirs = /data/flume/agent1/data
agent1.sources.src_spooldir.channels = chan_file
agent1.sinks.sink_avro.channel = chan_file
# agent2
agent2.sources = source1
agent2.channels = channel1
agent2.sinks = sink1
# source
agent2.sources.source1.type = avro
agent2.sources.source1.bind = 10.235.2.212
agent2.sources.source1.port = 9910
# sink
agent2.sinks.sink1.type= hdfs
agent2.sinks.sink1.hdfs.fileType = DataStream
agent2.sinks.sink1.hdfs.filePrefix = log
agent2.sinks.sink1.hdfs.path = hdfs://hnd.hadoop.jsh:8020/data/%{dt}
agent2.sinks.sink1.hdfs.rollInterval = 600
agent2.sinks.sink1.hdfs.rollSize = 0
agent2.sinks.sink1.hdfs.rollCount = 0
agent2.sinks.sink1.hdfs.idleTimeout = 300
agent2.sinks.sink1.hdfs.round = true
agent2.sinks.sink1.hdfs.roundValue = 10
agent2.sinks.sink1.hdfs.roundUnit = minute
# channel
agent2.channels.channel1.type = file
agent2.channels.channel1.checkpointDir = /data/flume/agent2/checkpoint
agent2.channels.channel1.dataDirs = /data/flume/agent2/data
agent2.sinks.sink1.channel = channel1
agent2.sources.source1.channels = channel1
Any suggestions are welcome!
2015-01-15 16:23:33,740 sync_data 510003500 0
2015-01-15 16:23:33,936 player_action 510000291 /player/rename
{"player_id":510000291"":57,"request_body":"{\"new_name\":\"?????????????\"}"}
2015-01-15 16:23:33,988 sync_data 510002742 0