First Question: No. I queried only after all the files in HDFS had been closed; in fact, I counted the data one day later.
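
To make that concrete, the kind of check I mean looks roughly like this (the date and the Hive table name are just examples; the path layout follows the hdfs.path = /data/%{dt} setting quoted below):

  # no in-progress files left for that day (the HDFS sink's in-use suffix is ".tmp" by default)
  hdfs dfs -ls /data/2015-01-20 | grep '\.tmp$'

  # raw record count for that day straight from the HDFS files
  hdfs dfs -cat '/data/2015-01-20/log.*' | wc -l

  # the same day counted through Hive (table name is hypothetical)
  hive -e "SELECT COUNT(*) FROM access_log WHERE dt = '2015-01-20'"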
Second Question: I haven't configured anything related to transactions. But I did notice this item in the HDFS sink configuration: "hdfs.threadsPoolSize  10  Number of threads per HDFS sink for HDFS IO ops (open, write, etc.)". So there are 10 transactions per sink from the file channel.

Thanks.

2015-01-22 11:04 GMT+08:00 Hari Shreedharan <[email protected]>:

> Are you accounting for the data still being written but not yet hflushed
> at the time of the query? Basically one transaction per sink?
>
> Thanks,
> Hari
>
>
> On Wed, Jan 21, 2015 at 6:42 PM, Jay Alexander <[email protected]> wrote:
>
>> I used *flume-ng 1.5* to collect logs.
>>
>> There are two agents in the data flow, and they run on two separate hosts.
>>
>> The data is sent *from agent1 to agent2*.
>>
>> The agents' components are as follows:
>>
>> agent1: spooling dir source --> file channel --> avro sink
>> agent2: avro source --> file channel --> hdfs sink
>>
>> But it seems to lose data, roughly 1 out of every 1,000 events among
>> millions. To troubleshoot, I tried these steps:
>>
>> 1. Looked through the agents' logs: no errors or exceptions found.
>> 2. Checked the agents' monitoring metrics: the numbers of events put into
>>    and taken from each channel are always equal.
>> 3. Counted the records with a Hive query and by reading the HDFS files
>>    with shell commands, respectively: the two counts are equal, but both
>>    are smaller than the number of events produced online.
>>
>> These are the two agents' configurations:
>>
>> #agent1
>> agent1.sources = src_spooldir
>> agent1.channels = chan_file
>> agent1.sinks = sink_avro
>>
>> #source
>> agent1.sources.src_spooldir.type = spooldir
>> agent1.sources.src_spooldir.spoolDir = /data/logs/flume-spooldir
>> agent1.sources.src_spooldir.interceptors=i1
>>
>> #interceptors
>> agent1.sources.src_spooldir.interceptors.i1.type=regex_extractor
>> agent1.sources.src_spooldir.interceptors.i1.regex=(\\d{4}-\\d{2}-\\d{2}).*
>> agent1.sources.src_spooldir.interceptors.i1.serializers=s1
>> agent1.sources.src_spooldir.interceptors.i1.serializers.s1.name=dt
>>
>> #sink
>> agent1.sinks.sink_avro.type = avro
>> agent1.sinks.sink_avro.hostname = 10.235.2.212
>> agent1.sinks.sink_avro.port = 9910
>>
>> #channel
>> agent1.channels.chan_file.type = file
>> agent1.channels.chan_file.checkpointDir = /data/flume/agent1/checkpoint
>> agent1.channels.chan_file.dataDirs = /data/flume/agent1/data
>>
>> agent1.sources.src_spooldir.channels = chan_file
>> agent1.sinks.sink_avro.channel = chan_file
>>
>>
>> # agent2
>> agent2.sources = source1
>> agent2.channels = channel1
>> agent2.sinks = sink1
>>
>> # source
>> agent2.sources.source1.type = avro
>> agent2.sources.source1.bind = 10.235.2.212
>> agent2.sources.source1.port = 9910
>>
>> # sink
>> agent2.sinks.sink1.type = hdfs
>> agent2.sinks.sink1.hdfs.fileType = DataStream
>> agent2.sinks.sink1.hdfs.filePrefix = log
>> agent2.sinks.sink1.hdfs.path = hdfs://hnd.hadoop.jsh:8020/data/%{dt}
>> agent2.sinks.sink1.hdfs.rollInterval = 600
>> agent2.sinks.sink1.hdfs.rollSize = 0
>> agent2.sinks.sink1.hdfs.rollCount = 0
>> agent2.sinks.sink1.hdfs.idleTimeout = 300
>> agent2.sinks.sink1.hdfs.round = true
>> agent2.sinks.sink1.hdfs.roundValue = 10
>> agent2.sinks.sink1.hdfs.roundUnit = minute
>>
>> # channel
>> agent2.channels.channel1.type = file
>> agent2.channels.channel1.checkpointDir = /data/flume/agent2/checkpoint
>> agent2.channels.channel1.dataDirs = /data/flume/agent2/data
>> agent2.sinks.sink1.channel = channel1
>> agent2.sources.source1.channels = channel1
>>
>>
>> Any suggestions are welcome!
>>
>
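
P.S. On the monitoring metrics mentioned in step 2 of my earlier mail: assuming the agents expose Flume's built-in HTTP/JSON monitoring (started with -Dflume.monitoring.type=http -Dflume.monitoring.port=34545; the port is just an example), the put/take counters can be pulled and compared with something like:

  # dump all component counters as JSON from agent2 (host and port are examples)
  curl -s http://10.235.2.212:34545/metrics
  # compare "EventPutSuccessCount" and "EventTakeSuccessCount"
  # under the "CHANNEL.channel1" entry (and "CHANNEL.chan_file" on agent1)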
