I have found an issue for this already in JIRA: https://issues.apache.org/jira/browse/FLUME-2215
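
For anyone who wants to reproduce the decoder behaviour outside of Flume, here is a minimal standalone JDK sketch (no Flume classes involved; the single-char output buffer is only meant to mirror the one-char-at-a-time readChar() described in the analysis quoted below, not the actual Flume code):

    import java.nio.ByteBuffer;
    import java.nio.CharBuffer;
    import java.nio.charset.CharsetDecoder;
    import java.nio.charset.CoderResult;
    import java.nio.charset.StandardCharsets;

    public class SurrogateOverflowDemo {
        public static void main(String[] args) {
            // U+1F4AB needs two Java chars (a surrogate pair) after decoding.
            byte[] utf8 = new String(Character.toChars(0x1F4AB))
                    .getBytes(StandardCharsets.UTF_8);

            CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
            ByteBuffer in = ByteBuffer.wrap(utf8);
            CharBuffer out = CharBuffer.allocate(1); // room for only one char

            // The decoder cannot split the surrogate pair across calls,
            // so with a one-char output buffer it reports OVERFLOW.
            CoderResult res = decoder.decode(in, out, true);
            System.out.println(res);              // OVERFLOW
            System.out.println(res.isOverflow()); // true
        }
    }
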
2015-01-24 10:04 GMT+08:00 Hari Shreedharan <[email protected]>:

> Can you file a jira with this info? We can probably make the change.
>
> Thanks,
> Hari
>
>
> On Fri, Jan 23, 2015 at 2:07 AM, Alex <[email protected]> wrote:
>
>> Today I retransferred all of the online data, and I saw the same data
>> loss again as last time. So I looked into the suspicious file and found
>> a weird character. I parsed it with a Java program: it is a Unicode
>> character that decodes to a two-char surrogate pair, code point 0x1F4AB.
>>
>> Then I looked into the source code:
>> 1. Class: org.apache.flume.serialization.LineDeserializer
>> LineDeserializer uses
>> "org.apache.flume.serialization.ResettableFileInputStream#readChar" to
>> read one char at a time. When it encounters the character 0x1F4AB,
>> readChar returns -1, and the rest of the file after that character is
>> skipped.
>> 2. Class: org.apache.flume.serialization.ResettableFileInputStream
>> A snippet from
>> org.apache.flume.serialization.ResettableFileInputStream#readChar:
>>
>> CoderResult res = decoder.decode(buf, charBuf, isEndOfInput);
>>
>> When the decoder decodes the character 0x1F4AB, the CoderResult is
>> OVERFLOW. That is expected, because 0x1F4AB has to be represented as two
>> chars (a surrogate pair).
>>
>> One possible solution is to implement a line deserializer that uses
>> "org.apache.flume.serialization.ResettableFileInputStream#read()"
>> instead of
>> "org.apache.flume.serialization.ResettableFileInputStream#readChar"
>> (see the sketch at the end of this thread), but I am not sure it is a
>> good solution.
>>
>> The attachment is a snippet of the data, with the weird character on
>> the 2nd line.
>>
>> Any suggestions?
>>
>> Thanks,
>> Alex
>>
>> On 1/22/2015 2:18 PM, Alex wrote:
>>
>> 1: In agent1 there is a "regex_extractor" interceptor that extracts the
>> header "dt":
>>
>> #interceptors
>> agent1.sources.src_spooldir.interceptors.i1.type=regex_extractor
>> agent1.sources.src_spooldir.interceptors.i1.regex=(\\d{4}-\\d{2}-\\d{2}).*
>> agent1.sources.src_spooldir.interceptors.i1.serializers=s1
>> agent1.sources.src_spooldir.interceptors.i1.serializers.s1.name=dt
>>
>> In agent2, the hdfs sink uses that header in the path. This is the
>> configuration:
>>
>> agent2.sinks.sink1.hdfs.path = hdfs://hnd.hadoop.jsh:8020/data/%{dt}
>>
>> 2: I misunderstood this property, thank you for the correction.
>>
>> Thanks,
>> Alex
>>
>>
>> On 1/22/2015 12:51 PM, Hari Shreedharan wrote:
>>
>> 1: How do you guarantee that the data from the previous day has not
>> spilled over to the next day? Where are you inserting the timestamp (if
>> you are doing bucketing)?
>> 2: Flume creates transactions for writes. Each batch defaults to 1000
>> events, which are written and flushed. There is still only one
>> transaction per sink; the pool size is only for HDFS IO ops.
>>
>> Thanks,
>> Hari
>>
>>
>> On Wed, Jan 21, 2015 at 7:32 PM, Jay Alexander <[email protected]> wrote:
>>
>>> First question: No. I verified that all the files in HDFS had been
>>> closed; in fact I count the data one day later.
>>>
>>> Second question: I had not configured anything about transactions, but
>>> I saw this item in the hdfs sink documentation: "hdfs.threadsPoolSize
>>> 10 - Number of threads per HDFS sink for HDFS IO ops (open, write,
>>> etc.)". So I assumed there are 10 transactions per sink taking from
>>> the file channel.
>>>
>>> Thanks.
>>>
>>>
>>> 2015-01-22 11:04 GMT+08:00 Hari Shreedharan <[email protected]>:
>>>
>>>> Are you accounting for the data still being written but not yet
>>>> hflushed at the time of the query? Basically one transaction per
>>>> sink?
>>>>
>>>> Thanks,
>>>> Hari
>>>>
>>>>
>>>> On Wed, Jan 21, 2015 at 6:42 PM, Jay Alexander <[email protected]>
>>>> wrote:
>>>>
>>>>> I used *flume-ng 1.5* to collect logs.
>>>>>
>>>>> There are two agents in the data flow, each running on its own host,
>>>>> and the data is sent *from agent1 to agent2*.
>>>>>
>>>>> The agents' components are as follows:
>>>>>
>>>>> agent1: spooling dir source --> file channel --> avro sink
>>>>> agent2: avro source --> file channel --> hdfs sink
>>>>>
>>>>> But it seems to lose data, roughly 1/1000 of about a million records.
>>>>> To troubleshoot, I tried these steps:
>>>>>
>>>>> 1. Looked through the agents' logs: no errors or exceptions.
>>>>> 2. Looked at the agents' monitoring metrics: the numbers of events
>>>>> put into and taken from each channel are always equal.
>>>>> 3. Counted the records with a Hive query and with a shell script over
>>>>> the HDFS files, respectively: the two counts are equal, and both are
>>>>> less than the number of records online.
>>>>>
>>>>> These are the two agents' configurations:
>>>>>
>>>>> #agent1
>>>>> agent1.sources = src_spooldir
>>>>> agent1.channels = chan_file
>>>>> agent1.sinks = sink_avro
>>>>>
>>>>> #source
>>>>> agent1.sources.src_spooldir.type = spooldir
>>>>> agent1.sources.src_spooldir.spoolDir = /data/logs/flume-spooldir
>>>>> agent1.sources.src_spooldir.interceptors=i1
>>>>>
>>>>> #interceptors
>>>>> agent1.sources.src_spooldir.interceptors.i1.type=regex_extractor
>>>>> agent1.sources.src_spooldir.interceptors.i1.regex=(\\d{4}-\\d{2}-\\d{2}).*
>>>>> agent1.sources.src_spooldir.interceptors.i1.serializers=s1
>>>>> agent1.sources.src_spooldir.interceptors.i1.serializers.s1.name=dt
>>>>>
>>>>> #sink
>>>>> agent1.sinks.sink_avro.type = avro
>>>>> agent1.sinks.sink_avro.hostname = 10.235.2.212
>>>>> agent1.sinks.sink_avro.port = 9910
>>>>>
>>>>> #channel
>>>>> agent1.channels.chan_file.type = file
>>>>> agent1.channels.chan_file.checkpointDir = /data/flume/agent1/checkpoint
>>>>> agent1.channels.chan_file.dataDirs = /data/flume/agent1/data
>>>>>
>>>>> agent1.sources.src_spooldir.channels = chan_file
>>>>> agent1.sinks.sink_avro.channel = chan_file
>>>>>
>>>>>
>>>>> # agent2
>>>>> agent2.sources = source1
>>>>> agent2.channels = channel1
>>>>> agent2.sinks = sink1
>>>>>
>>>>> # source
>>>>> agent2.sources.source1.type = avro
>>>>> agent2.sources.source1.bind = 10.235.2.212
>>>>> agent2.sources.source1.port = 9910
>>>>>
>>>>> # sink
>>>>> agent2.sinks.sink1.type= hdfs
>>>>> agent2.sinks.sink1.hdfs.fileType = DataStream
>>>>> agent2.sinks.sink1.hdfs.filePrefix = log
>>>>> agent2.sinks.sink1.hdfs.path = hdfs://hnd.hadoop.jsh:8020/data/%{dt}
>>>>> agent2.sinks.sink1.hdfs.rollInterval = 600
>>>>> agent2.sinks.sink1.hdfs.rollSize = 0
>>>>> agent2.sinks.sink1.hdfs.rollCount = 0
>>>>> agent2.sinks.sink1.hdfs.idleTimeout = 300
>>>>> agent2.sinks.sink1.hdfs.round = true
>>>>> agent2.sinks.sink1.hdfs.roundValue = 10
>>>>> agent2.sinks.sink1.hdfs.roundUnit = minute
>>>>>
>>>>> # channel
>>>>> agent2.channels.channel1.type = file
>>>>> agent2.channels.channel1.checkpointDir = /data/flume/agent2/checkpoint
>>>>> agent2.channels.channel1.dataDirs = /data/flume/agent2/data
>>>>> agent2.sinks.sink1.channel = channel1
>>>>> agent2.sources.source1.channels = channel1
>>>>>
>>>>>
>>>>> Any suggestions are welcome!
>>>>
>>>
>>
>> <t4.log>
>
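
Regarding the read()-based deserializer idea mentioned above, here is a rough plain-java.io sketch of the general approach, not the real Flume deserializer interfaces (the file path argument is just a placeholder): accumulate the raw bytes of a line and decode the whole line at once, so the UTF-8 decoder never has to emit half of a surrogate pair.

    import java.io.BufferedInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.charset.StandardCharsets;

    public class ByteLineReaderSketch {
        /** Reads one line as raw bytes (split on '\n' only) and decodes it
         *  as a whole, so multi-byte characters stay intact. */
        static String readLine(InputStream in) throws IOException {
            ByteArrayOutputStream line = new ByteArrayOutputStream();
            int b;
            while ((b = in.read()) != -1) {
                if (b == '\n') {
                    return new String(line.toByteArray(), StandardCharsets.UTF_8);
                }
                line.write(b);
            }
            // Last line without a trailing newline, or null at EOF.
            return line.size() > 0
                    ? new String(line.toByteArray(), StandardCharsets.UTF_8)
                    : null;
        }

        public static void main(String[] args) throws IOException {
            // args[0] is a placeholder path to a spooled log file.
            try (InputStream in = new BufferedInputStream(new FileInputStream(args[0]))) {
                String line;
                while ((line = readLine(in)) != null) {
                    System.out.println(line);
                }
            }
        }
    }
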

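Since the %{dt} bucketing depends entirely on the regex_extractor pattern shown above, here is a quick standalone way to check whether a given log line would actually populate the dt header (the sample line below is made up):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class DtHeaderCheck {
        public static void main(String[] args) {
            // The pattern configured for the regex_extractor interceptor above,
            // written as a Java string literal.
            Pattern p = Pattern.compile("(\\d{4}-\\d{2}-\\d{2}).*");

            // Hypothetical sample line; replace with a real line from the spool dir.
            String line = "2015-01-21 19:32:10 some log body";

            Matcher m = p.matcher(line);
            if (m.matches()) {
                System.out.println("dt = " + m.group(1));   // dt = 2015-01-21
            } else {
                System.out.println("no dt header would be set for this line");
            }
        }
    }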