I was able to figure this out. Looks like avro source also needs port and bind.
On Mon, Apr 2, 2012 at 11:53 AM, Mohit Anchlia <mohitanch...@gmail.com>wrote: > I am getting some exception. It looks like FlumeConfiguration.java is able > to validate config but AbstractFileConfigurationProvider fails to load. > > ./bin/flume-ng node --conf ./conf --conf-file ./conf/flume-conf.properties > --name foo > > 2012-04-02 11:44:22,446 (conf-file-poller-0) [DEBUG - > org.apache.flume.conf.properties.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:225)] > Starting validation of configuration for agent: foo, initial-configuration: > AgentConfiguration[foo] > SOURCES: {avroSrc=ComponentConfiguration[avroSrc] > CONFIG: {channels=memoryChannel, type=avro} > RUNNER: ComponentConfiguration[runner] > CONFIG: {} > > > } > CHANNELS: {memoryChannel=ComponentConfiguration[memoryChannel] > CONFIG: {capacity=100, type=memory} > > } > SINKS: {hdfsSink=ComponentConfiguration[hdfsSink] > CONFIG: {path=hdfs://dsdb1:9000/flume, type=hdfs, channel=memoryChannel} > RUNNER: ComponentConfiguration[runner] > CONFIG: {} > > > } > 2012-04-02 11:44:22,446 (conf-file-poller-0) [INFO - > org.apache.flume.conf.properties.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:119)] > Post-validation flume configuration contains configuation for agents: [foo] > 2012-04-02 11:44:22,447 (conf-file-poller-0) [DEBUG - > org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:67)] > Creating instance of channel memoryChannel type memory > 2012-04-02 11:44:22,469 (conf-file-poller-0) [DEBUG - > org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:73)] > Creating instance of source avroSrc, type avro > 2012-04-02 11:44:22,480 (conf-file-poller-0) [ERROR - > org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:205)] > Failed to load configuration data. Exception follows. > java.lang.NumberFormatException: null > at java.lang.Integer.parseInt(Integer.java:417) > at java.lang.Integer.parseInt(Integer.java:499) > at > org.apache.flume.source.AvroSource.configure(AvroSource.java:114) > at > org.apache.flume.conf.Configurables.configure(Configurables.java:41) > at > org.apache.flume.conf.properties.PropertiesFileConfigurationProvider.loadSources(PropertiesFileConfigurationProvider.java:274) > at > org.apache.flume.conf.properties.PropertiesFileConfigurationProvider.load(PropertiesFileConfigurationProvider.java:214) > at > org.apache.flume.conf.file.AbstractFileConfigurationProvider.doLoad(AbstractFileConfigurationProvider.java:124) > at > org.apache.flume.conf.file.AbstractFileConfigurationProvider.access$300(AbstractFileConfigurationProvider.java:38) > at > org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:203) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441) > at > java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317) > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204) > at > java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) > at java.lang.Thread.run(Thread.java:662) > > > --- > > Configuration: > > foo.sources = avroSrc > foo.channels = memoryChannel > foo.sinks = hdfsSink > > # For each one of the sources, the type is defined > foo.sources.avroSrc.type = avro > > # The channel can be defined as follows. > foo.sources.avroSrc.channels = memoryChannel > > # Each sink's type must be defined > foo.sinks.hdfsSink.type = hdfs > foo.sinks.hdfsSink.path = hdfs://dsdb1:9000/flume > > #Specify the channel the sink should use > foo.sinks.hdfsSink.channel = memoryChannel > > # Each channel's type is defined. > foo.channels.memoryChannel.type = memory > > # Other config values specific to each type of channel(sink or source) > # can be defined as well > # In this case, it specifies the capacity of the memory channel > foo.channels.memoryChannel.capacity = 100 >