qihuagao created FLUME-3108:
-------------------------------

             Summary: I can not roll my log for hdfs sink based on timestamp of log content.
                 Key: FLUME-3108
                 URL: https://issues.apache.org/jira/browse/FLUME-3108
             Project: Flume
          Issue Type: Bug
          Components: Sinks+Sources
    Affects Versions: 1.7.0
            Reporter: qihuagao

I use the regex_extractor interceptor to extract the timestamp from my log lines, with a1.sinks.k1.serializer = header_and_text. I checked that the extracted timestamps are saved in the HDFS files, but HDFS rolling does not work as I expect: I expect the sink to roll logs by the timestamp in the log content instead of the current time.

Is this workable, or did I do something wrong? Thanks for the help.

The following is my configuration:

a1.sources = s1
a1.channels = c1
a1.sinks = k1

a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.s1.channels = c1
a1.sources.s1.batchSize = 50
a1.sources.s1.batchDurationMillis = 2000
a1.sources.s1.kafka.bootstrap.servers =*
a1.sources.s1.kafka.topics = LOG
a1.sources.s1.useFlumeEventFormat=true
a1.sources.s1.kafka.consumer.group.id = custom.g.id
a1.sources.s1.interceptors = i1
a1.sources.s1.interceptors.i1.type = regex_extractor
a1.sources.s1.interceptors.i1.regex = [(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d:\\d\\d)]
a1.sources.s1.interceptors.i1.serializers = s1
a1.sources.s1.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
a1.sources.s1.interceptors.i1.serializers.s1.name = timestamp
a1.sources.s1.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 128000000

#a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.1.247:9000/logs/%Y-%m-%d/%H
a1.sinks.k1.hdfs.filePrefix = logs
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = hour
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval=0
a1.sinks.k1.hdfs.batchSize = 120
a1.sinks.k1.hdfs.idleTimeout=120
a1.sinks.k1.serializer = header_and_text
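
For illustration, the expected behavior can be sketched outside Flume roughly as follows. This is a minimal Python sketch under stated assumptions, not Flume's implementation. The function names `event_millis` and `bucket_path`, the sample log line, the `/logs` base path, and the use of UTC are all made up for the example (Flume itself resolves the path escapes in its configured or local time zone). The idea is that the directory is derived from the time found in the log line, rounded down to the hour, rather than from the wall clock.

```python
import re
from datetime import datetime, timezone

# Capture group equivalent to the one in the interceptor regex above, written
# without the outer [ ] and without the doubled properties-file backslashes.
TS_REGEX = re.compile(r"(\d\d\d\d-\d\d-\d\d\s\d\d:\d\d:\d\d)")

HOUR_MS = 60 * 60 * 1000


def event_millis(line):
    """Return the timestamp found in a log line as epoch millis, or None.

    Assumption: the timestamp in the log is interpreted as UTC.
    """
    match = TS_REGEX.search(line)
    if match is None:
        return None
    parsed = datetime.strptime(match.group(1), "%Y-%m-%d %H:%M:%S")
    return int(parsed.replace(tzinfo=timezone.utc).timestamp() * 1000)


def bucket_path(millis, base="/logs"):
    """Round millis down to the hour and format it like %Y-%m-%d/%H."""
    rounded = millis - (millis % HOUR_MS)
    dt = datetime.fromtimestamp(rounded / 1000, tz=timezone.utc)
    return f"{base}/{dt:%Y-%m-%d}/{dt:%H}"


if __name__ == "__main__":
    # Hypothetical log line; the real log format is not shown in this report.
    sample = "[2017-06-01 13:45:10] INFO request handled"
    print(bucket_path(event_millis(sample)))
```

With a mapping like this, an event logged at 13:45 on 2017-06-01 would be written under the 2017-06-01/13 directory no matter when the agent processes it, which is the behavior expected from the hdfs.path escapes in the configuration above.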