sanathkumar prasanna created FLUME-3071:
-------------------------------------------
Summary: Multiplex Channel Selector is not working properly at flume 1.7 version
Key: FLUME-3071
URL: https://issues.apache.org/jira/browse/FLUME-3071
Project: Flume
Issue Type: Request
Components: Configuration
Affects Versions: v1.7.0
Environment: Environment Details
Hadoop 2.5.2
Flume 1.7
Hive 0.14
Reporter: sanathkumar prasanna
Fix For: v1.7.0
Environment Details
I am using the multiplexing channel selector to load data into two different
Hive sinks, but the agent gets stuck once the source has started. No error is
reported after that point, and no data is loaded.
#Source
test_interceptors.sources = RTI
test_interceptors.channels = RTI_Channel1 RTI_Channel2
test_interceptors.sinks = RTI_to_hive1 RTI_to_hive1
test_interceptors.sources.RTI.channels = RTI_Channel1 RTI_Channel2
#test_interceptors.sinks.RTI_to_hive3.channel = RTI_Channel3
test_interceptors.sources.RTI.type = TAILDIR
test_interceptors.sources.RTI.positionFile=/home/retailteg/flume/taildir_position.json
test_interceptors.sources.RTI.filegroups=f1
test_interceptors.sources.RTI.filegroups.f1=/home/retailteg/flume/flumeSpool/initial_insert
test_interceptors.sources.RTI.filegroups.f1.headerkey1=value1
test_interceptors.sources.RTI.fileHeader = false
test_interceptors.sources.RTI.skipToEnd = false
test_interceptors.sources.RTI.selector.type = multiplexing
test_interceptors.sources.RTI.selector.header=table_name
test_interceptors.sources.RTI.selector.mapping.Table1=RTI_Channel1
test_interceptors.sources.RTI.selector.mapping.Table2= RTI_Channel2
test_interceptors.sources.RTI.selector.default= RTI_Channel2
#Channel
test_interceptors.channels.RTI_Channel1.type = memory
test_interceptors.channels.RTI_Channel1.capacity = 100000
test_interceptors.channels.RTI_Channel1.transactionCapacity = 100000
test_interceptors.channels.RTI_Channel2.type = memory
test_interceptors.channels.RTI_Channel2.capacity = 100000
test_interceptors.channels.RTI_Channel2.transactionCapacity = 100000
#Sinks
test_interceptors.sinks.RTI_to_hive1.type = hive
test_interceptors.sinks.RTI_to_hive1.hive.metastore = thrift://172.20.180.64:9083
test_interceptors.sinks.RTI_to_hive1.hive.database = default
test_interceptors.sinks.RTI_to_hive1.hive.table = RTI_Test_Feb2017_8
test_interceptors.sinks.RTI_to_hive1.hive.partition = %Y-%m-%d
test_interceptors.sinks.RTI_to_hive1.useLocalTimeStamp = true
test_interceptors.sinks.RTI_to_hive1.round = true
test_interceptors.sinks.RTI_to_hive1.roundValue = 10
test_interceptors.sinks.RTI_to_hive1.roundUnit = minute
test_interceptors.sinks.RTI_to_hive1.serializer = DELIMITED
test_interceptors.sinks.RTI_to_hive1.serializer.delimiter = ","
test_interceptors.sinks.RTI_to_hive1.serializer.serdeSeparator = ','
test_interceptors.sinks.RTI_to_hive1.serializer.fieldnames = table_name,rxclaim,seqno,carrier_id,account_id,group_id,member_id,drugcost,pharmacyname,membername,datofbirth,plan_id,landing_timestamp,journal_timestamp,transaction_flag
test_interceptors.sinks.RTI_to_hive2.type = hive
test_interceptors.sinks.RTI_to_hive2.hive.metastore = thrift://172.20.180.64:9083
test_interceptors.sinks.RTI_to_hive2.hive.database = default
test_interceptors.sinks.RTI_to_hive2.hive.table = RTI_Test_Feb2017_9
test_interceptors.sinks.RTI_to_hive2.hive.partition = %Y-%m-%d
test_interceptors.sinks.RTI_to_hive2.useLocalTimeStamp = true
test_interceptors.sinks.RTI_to_hive2.round = true
test_interceptors.sinks.RTI_to_hive2.roundValue = 10
test_interceptors.sinks.RTI_to_hive2.roundUnit = minute
test_interceptors.sinks.RTI_to_hive2.serializer = DELIMITED
test_interceptors.sinks.RTI_to_hive2.serializer.delimiter = ","
test_interceptors.sinks.RTI_to_hive2.serializer.serdeSeparator = ','
test_interceptors.sinks.RTI_to_hive2.serializer.fieldnames = table_name,rxclaim,seqno,carrier_id,account_id,group_id,member_id,drugcost,pharmacyname,membername,datofbirth,plan_id,landing_timestamp,journal_timestamp,transaction_flag
test_interceptors.sinks.RTI_to_hive1.channel = RTI_Channel1
test_interceptors.sinks.RTI_to_hive2.channel = RTI_Channel2
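One detail in the configuration above may be relevant: the sinks line lists RTI_to_hive1 twice and never names RTI_to_hive2, and the log reports only "Starting Sink RTI_to_hive1". A minimal sketch of the declaration with both sinks listed, assuming RTI_to_hive2 was meant to be active. Whether this alone resolves the stall has not been verified.

```properties
# Assumption: both Hive sinks are meant to run, one per channel.
test_interceptors.sinks = RTI_to_hive1 RTI_to_hive2
test_interceptors.sinks.RTI_to_hive1.channel = RTI_Channel1
test_interceptors.sinks.RTI_to_hive2.channel = RTI_Channel2
```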
Log file contents:
2017-03-10 19:44:18,770 (main) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:144)] Starting Channel RTI_Channel1
2017-03-10 19:44:18,773 (main) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:144)] Starting Channel RTI_Channel2
2017-03-10 19:44:18,775 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: RTI_Channel1: Successfully registered new MBean.
2017-03-10 19:44:18,775 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: RTI_Channel1 started
2017-03-10 19:44:18,776 (main) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:159)] Waiting for channel: RTI_Channel2 to start. Sleeping for 500 ms
2017-03-10 19:44:18,795 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: RTI_Channel2: Successfully registered new MBean.
2017-03-10 19:44:18,795 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: RTI_Channel2 started
2017-03-10 19:44:19,276 (main) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:171)] Starting Sink RTI_to_hive1
2017-03-10 19:44:19,277 (main) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:182)] Starting Source RTI
2017-03-10 19:44:19,277 (lifecycleSupervisor-1-5) [INFO - org.apache.flume.source.taildir.TaildirSource.start(TaildirSource.java:92)] RTI TaildirSource source starting with directory: {f1=/home/retailteg/flume/flumeSpool/initial_insert}
2017-03-10 19:44:19,281 (lifecycleSupervisor-1-5) [DEBUG - org.apache.flume.source.taildir.ReliableTaildirEventReader.<init>(ReliableTaildirEventReader.java:75)] Initializing ReliableTaildirEventReader with directory={f1=/home/retailteg/flume/flumeSpool/initial_insert}, metaDir={}
2017-03-10 19:44:19,283 (lifecycleSupervisor-1-5) [INFO - org.apache.flume.source.taildir.ReliableTaildirEventReader.<init>(ReliableTaildirEventReader.java:83)] taildirCache: [{filegroup='f1', filePattern='/home/retailteg/flume/flumeSpool/initial_insert', cached=true}]
2017-03-10 19:44:19,285 (lifecycleSupervisor-1-5) [INFO - org.apache.flume.source.taildir.ReliableTaildirEventReader.<init>(ReliableTaildirEventReader.java:84)] headerTable: {}
2017-03-10 19:44:19,289 (lifecycleSupervisor-1-5) [INFO - org.apache.flume.source.taildir.ReliableTaildirEventReader.openFile(ReliableTaildirEventReader.java:283)] Opening file: /home/retailteg/flume/flumeSpool/initial_insert, inode: 9970903, pos: 0
2017-03-10 19:44:19,290 (lifecycleSupervisor-1-5) [INFO - org.apache.flume.source.taildir.ReliableTaildirEventReader.<init>(ReliableTaildirEventReader.java:94)] Updating position from position file: /home/retailteg/flume/taildir_position.json
2017-03-10 19:44:19,309 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:141)] Polling sink runner starting
2017-03-10 19:44:19,362 (lifecycleSupervisor-1-5) [INFO - org.apache.flume.source.taildir.TailFile.updatePos(TailFile.java:126)] Updated position, file: /home/retailteg/flume/flumeSpool/initial_insert, inode: 9970903, pos: 884
2017-03-10 19:44:19,364 (lifecycleSupervisor-1-5) [DEBUG - org.apache.flume.source.taildir.TaildirSource.start(TaildirSource.java:118)] TaildirSource started
2017-03-10 19:44:19,364 (lifecycleSupervisor-1-5) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: RTI: Successfully registered new MBean.
2017-03-10 19:44:19,364 (lifecycleSupervisor-1-5) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: RTI started
2017-03-10 19:44:19,366 (PollableSourceRunner-TaildirSource-RTI) [DEBUG - org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:127)] Polling runner starting. Source:Taildir source: { positionFile: /home/retailteg/flume/taildir_position.json, skipToEnd: false, byteOffsetHeader: false, idleTimeout: 120000, writePosInterval: 3000 }
2017-03-10 19:46:19,459 (PollableSourceRunner-TaildirSource-RTI) [INFO - org.apache.flume.source.taildir.TaildirSource.closeTailFiles(TaildirSource.java:288)] Closed file: /home/retailteg/flume/flumeSpool/initial_insert, inode: 9970903, pos: 884
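The "headerTable: {}" line in the log above suggests that no headers are attached to the events. If so, the selector would find no table_name header and route every event to the default channel, RTI_Channel2. As far as I can tell from the Flume user guide, the Taildir source takes per-filegroup headers as headers.<filegroup>.<key> rather than filegroups.<filegroup>.<key>. A sketch under that assumption follows. The value Table1 is only illustrative, and a static header tags every line of the file the same way.

```properties
# Assumption: every event read from filegroup f1 belongs to Table1.
test_interceptors.sources.RTI.headers.f1.table_name = Table1

# Selector settings as reported above; they route on the table_name header.
test_interceptors.sources.RTI.selector.type = multiplexing
test_interceptors.sources.RTI.selector.header = table_name
test_interceptors.sources.RTI.selector.mapping.Table1 = RTI_Channel1
test_interceptors.sources.RTI.selector.mapping.Table2 = RTI_Channel2
test_interceptors.sources.RTI.selector.default = RTI_Channel2
```

If the table name is only available as the first field of each record, an interceptor that copies it into the header (for example a regex extractor interceptor) would be needed instead. That variant is not shown here.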