[
https://issues.apache.org/jira/browse/FLUME-2816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Bartek Dobija updated FLUME-2816:
---------------------------------
Attachment: flume-1.6-elastic-2.0.patch
Hi,
I would like to donate a patch for this issue.
I was able to successfully build Flume release-1.6.0 and use it with Elasticsearch 2.0 and 2.1, via both the REST and transport clients, using the configuration below:
{code}
agent.sources = r1
agent.sinks = k1
agent.channels = c1

# Describe/configure the source
agent.sources.r1.type = netcat
agent.sources.r1.bind = localhost
agent.sources.r1.port = 44444

# Use a channel which buffers events in memory
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100

# Describe the elasticsearch sink
agent.sinks.k1.type = elasticsearch
agent.sinks.k1.hostNames = 127.0.0.1:9300
#agent.sinks.k1.client = rest
#agent.sinks.k1.ttl = 2
agent.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer

# Bind the source and sink to the channel
agent.sources.r1.channels = c1
agent.sinks.k1.channel = c1
{code}
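With this configuration loaded, a quick end-to-end check is to feed a test event to the netcat source (an illustrative session; the agent replies OK per event and the sink should then index it into Elasticsearch):
{noformat}
$ telnet localhost 44444
hello elasticsearch
OK
{noformat}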
I am sorry to point out that the patch contains a couple of ignored tests, as Elasticsearch 2 client initialization is now more involved due to API changes. A decision will probably have to be made on whether the client should be mocked: the fake client that was previously assigned a null value now causes an NPE in one of the ES API object instances, and seems to require significant re-work.
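For illustration only (not part of the attached patch), a minimal sketch of what mocking the client could look like, assuming Mockito, which Flume already uses elsewhere in its tests; the class name here is made up:
{code}
import static org.mockito.Mockito.mock;

import org.elasticsearch.client.Client;

public class MockedClientSketch {
  public static void main(String[] args) {
    // Unlike the bare null previously assigned to the fake client, a mock can be
    // dereferenced by the ES API objects during initialization without an NPE.
    Client client = mock(Client.class);
    System.out.println("mocked client: " + client);
  }
}
{code}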
In order to use the elasticsearch sink, a user is still required to copy elasticsearch*.jar and lucene-core-*.jar from the Elasticsearch lib directory to the Flume lib directory. All other dependencies are packaged as part of Flume.
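For example (paths follow the reporter's environment below; the jar versions are illustrative and must match your Elasticsearch release):
{noformat}
cp /usr/share/elasticsearch/lib/elasticsearch-2.1.0.jar /opt/flume/lib/
cp /usr/share/elasticsearch/lib/lucene-core-5.3.1.jar /opt/flume/lib/
{noformat}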
New dependencies were added to pom.xml, and selected existing ones were updated. In addition, the TTL configuration option has been removed, as it is obsolete in Elasticsearch 2.
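For orientation only, the central pom.xml change is of this shape (the attached patch is authoritative for the exact version and the other dependency updates):
{code}
<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch</artifactId>
  <!-- illustrative; the patch pins the exact 2.x release -->
  <version>2.1.0</version>
</dependency>
{code}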
> Unable to get apache-flume-1.6.0 to work with ElasticSearch 2.0 beta 2 - Works with Elasticsearch 1.7.3
> -------------------------------------------------------------------------------------------------------
>
> Key: FLUME-2816
> URL: https://issues.apache.org/jira/browse/FLUME-2816
> Project: Flume
> Issue Type: Improvement
> Components: Sinks+Sources
> Affects Versions: v1.6.0
> Environment: Centos 6
> Reporter: David Waugh
> Attachments: flume-1.6-elastic-2.0.patch
>
>
> Hello
> I am unable to get apache-flume-1.6.0 to work with ElasticSearch 2.0 beta 2.
> The same configuration works fine with Elasticsearch 1.7.3
> My flume-env.sh contains:
> {noformat}
> # Give Flume more memory and pre-allocate, enable remote monitoring via JMX
> export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
> # Note that the Flume conf directory is always included in the classpath.
> FLUME_CLASSPATH="/usr/share/elasticsearch/lib/elasticsearch-2.0.0-beta2.jar"
> export JAVA_HOME=/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.85.x86_64/jre
> {noformat}
> I have copied the lucene-core-5.2.1.jar from /usr/share/elasticsearch/lib to
> /opt/flume/lib
> My elastic.conf file is:
> {noformat}
> #elastic.conf: A single-node Flume configuration
> #Reading avro files via spool dir and use the elastic sink
> #Name the components on this agent
> a1.sources = avrofld
> a1.sinks = k1
> a1.channels = c1
> #Describe/configure the folder with the avro files from Warehouse Connector
> a1.sources.avrofld.type = spooldir
> a1.sources.avrofld.channels = c1
> a1.sources.avrofld.spoolDir = /var/export
> a1.sources.avrofld.fileHeader = true
> a1.sources.avrofld.deserializer = avro
> # Describe the elasticsearch sink
> a1.sinks.k1.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
> a1.sinks.k1.hostNames = 127.0.0.1:9300
> a1.sinks.k1.indexName = sa
> a1.sinks.k1.indexType = data
> a1.sinks.k1.clusterName = elasticsearch
> a1.sinks.k1.batchSize = 1000
> a1.sinks.k1.ttl = 2d
> a1.sinks.k1.serializer = com.rsa.flume.serialization.FlumeAvroEventDeserializer
> #Use a channel which buffers events in memory
> a1.channels.c1.type = memory
> a1.channels.c1.capacity = 1000000
> a1.channels.c1.transactionCapacity = 100000
> #Bind the source and sink to the channel
> a1.sources.r1.channels = c1
> a1.sinks.k1.channel = c1
> {noformat}
> Elasticsearch 2.0 is successfully started and listening on the port
> {noformat}
> [root@CentosB lib]# netstat -na |grep 900
> tcp 0 0 127.0.0.1:9200 0.0.0.0:* LISTEN
> tcp 0 0 127.0.0.1:9300 0.0.0.0:* LISTEN
> {noformat}
> However when starting my flume I get the following error
> {noformat}
> [root@CentosB flume]# bin/flume-ng agent --conf conf --conf-file conf/elastic.conf --name a1 -Dflume.root.logger=DEBUG,console
> 2015-10-19 10:24:31,152 (lifecycleSupervisor-1-1) [ERROR - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:253)] Unable to start SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@5c5dc0a5 counterGroup:{ name:null counters:{} } } - Exception follows.
> java.lang.NoSuchMethodError: org.elasticsearch.common.transport.InetSocketTransportAddress.<init>(Ljava/lang/String;I)V
>     at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.configureHostnames(ElasticSearchTransportClient.java:143)
>     at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.<init>(ElasticSearchTransportClient.java:77)
>     at org.apache.flume.sink.elasticsearch.client.ElasticSearchClientFactory.getClient(ElasticSearchClientFactory.java:48)
>     at org.apache.flume.sink.elasticsearch.ElasticSearchSink.start(ElasticSearchSink.java:357)
>     at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:46)
>     at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
>     at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> {noformat}
> Full Log Below
> {noformat}
> Info: Sourcing environment configuration script /opt/flume/conf/flume-env.sh
> Info: Including Hive libraries found via () for Hive access
> exec /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.85.x86_64/jre/bin/java -Xms100m -Xmx2000m -Dcom.sun.management.jmxremote -Dflume.root.logger=DEBUG,console -cp '/opt/flume/conf:/opt/flume/lib/*:/usr/share/elasticsearch/lib/elasticsearch-2.0.0-beta2.jar:/lib/*' -Djava.library.path= org.apache.flume.node.Application --conf-file conf/elastic.conf --name a1
> 2015-10-19 10:24:31,017 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:61)] Configuration provider starting
> 2015-10-19 10:24:31,020 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:78)] Configuration provider started
> 2015-10-19 10:24:31,023 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:conf/elastic.conf for changes
> 2015-10-19 10:24:31,023 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:133)] Reloading configuration file:conf/elastic.conf
> 2015-10-19 10:24:31,028 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:k1
> 2015-10-19 10:24:31,028 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1021)] Created context for k1: serializer
> 2015-10-19 10:24:31,029 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:k1
> 2015-10-19 10:24:31,029 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:931)] Added sinks: k1 Agent: a1
> 2015-10-19 10:24:31,029 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:k1
> 2015-10-19 10:24:31,029 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:k1
> 2015-10-19 10:24:31,029 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:k1
> 2015-10-19 10:24:31,029 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:k1
> 2015-10-19 10:24:31,030 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:k1
> 2015-10-19 10:24:31,030 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:k1
> 2015-10-19 10:24:31,030 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:k1
> 2015-10-19 10:24:31,030 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:314)] Starting validation of configuration for agent: a1, initial-configuration: AgentConfiguration[a1] SOURCES: {avrofld={ parameters:{fileHeader=true, channels=c1, spoolDir=/var/export, type=spooldir, deserializer=avro} }, r1={ parameters:{channels=c1} }} CHANNELS: {c1={ parameters:{transactionCapacity=100000, capacity=1000000, type=memory} }} SINKS: {k1={ parameters:{clusterName=elasticsearch, indexType=data, serializer=com.rsa.flume.serialization.FlumeAvroEventDeserializer, indexName=sa, batchSize=1000, hostNames=127.0.0.1:9300, type=org.apache.flume.sink.elasticsearch.ElasticSearchSink, ttl=2d, channel=c1} }}
> 2015-10-19 10:24:31,039 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateChannels(FlumeConfiguration.java:469)] Created channel c1
> 2015-10-19 10:24:31,046 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks(FlumeConfiguration.java:675)] Creating sink: k1 using ELASTICSEARCH
> 2015-10-19 10:24:31,047 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:372)] Post validation configuration for a1 AgentConfiguration created without Configuration stubs for which only basic syntactical validation was performed[a1] SOURCES: {avrofld={ parameters:{fileHeader=true, channels=c1, spoolDir=/var/export, type=spooldir, deserializer=avro} }} CHANNELS: {c1={ parameters:{transactionCapacity=100000, capacity=1000000, type=memory} }} SINKS: {k1={ parameters:{clusterName=elasticsearch, indexType=data, serializer=com.rsa.flume.serialization.FlumeAvroEventDeserializer, indexName=sa, batchSize=1000, hostNames=127.0.0.1:9300, type=org.apache.flume.sink.elasticsearch.ElasticSearchSink, ttl=2d, channel=c1} }}
> 2015-10-19 10:24:31,047 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:136)] Channels:c1
> 2015-10-19 10:24:31,048 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:137)] Sinks k1
> 2015-10-19 10:24:31,048 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:138)] Sources avrofld
> 2015-10-19 10:24:31,048 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:141)] Post-validation flume configuration contains configuration for agents: [a1]
> 2015-10-19 10:24:31,048 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:145)] Creating channels
> 2015-10-19 10:24:31,056 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:42)] Creating instance of channel c1 type memory
> 2015-10-19 10:24:31,063 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:200)] Created channel c1
> 2015-10-19 10:24:31,063 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:41)] Creating instance of source avrofld, type spooldir
> 2015-10-19 10:24:31,086 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:42)] Creating instance of sink: k1, type: org.apache.flume.sink.elasticsearch.ElasticSearchSink
> 2015-10-19 10:24:31,086 (conf-file-poller-0) [DEBUG - org.apache.flume.sink.DefaultSinkFactory.getClass(DefaultSinkFactory.java:63)] Sink type org.apache.flume.sink.elasticsearch.ElasticSearchSink is a custom type
> 2015-10-19 10:24:31,115 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:114)] Channel c1 connected to [avrofld, k1]
> 2015-10-19 10:24:31,124 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:138)] Starting new configuration:{ sourceRunners:{avrofld=EventDrivenSourceRunner: { source:Spool Directory source avrofld: { spoolDir: /var/export } }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@5c5dc0a5 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
> 2015-10-19 10:24:31,134 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:145)] Starting Channel c1
> 2015-10-19 10:24:31,136 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
> 2015-10-19 10:24:31,136 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: CHANNEL, name: c1 started
> 2015-10-19 10:24:31,137 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:173)] Starting Sink k1
> 2015-10-19 10:24:31,138 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.sink.elasticsearch.ElasticSearchSink.start(ElasticSearchSink.java:350)] ElasticSearch sink {} started
> 2015-10-19 10:24:31,139 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
> 2015-10-19 10:24:31,139 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SINK, name: k1 started
> 2015-10-19 10:24:31,142 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Starting Source avrofld
> 2015-10-19 10:24:31,142 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.source.SpoolDirectorySource.start(SpoolDirectorySource.java:78)] SpoolDirectorySource source starting with directory: /var/export
> 2015-10-19 10:24:31,147 (lifecycleSupervisor-1-1) [WARN - org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.configureHostnames(ElasticSearchTransportClient.java:136)] [127.0.0.1:9300]
> 2015-10-19 10:24:31,152 (lifecycleSupervisor-1-1) [ERROR - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:253)] Unable to start SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@5c5dc0a5 counterGroup:{ name:null counters:{} } } - Exception follows.
> java.lang.NoSuchMethodError: org.elasticsearch.common.transport.InetSocketTransportAddress.<init>(Ljava/lang/String;I)V
>     at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.configureHostnames(ElasticSearchTransportClient.java:143)
>     at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.<init>(ElasticSearchTransportClient.java:77)
>     at org.apache.flume.sink.elasticsearch.client.ElasticSearchClientFactory.getClient(ElasticSearchClientFactory.java:48)
>     at org.apache.flume.sink.elasticsearch.ElasticSearchSink.start(ElasticSearchSink.java:357)
>     at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:46)
>     at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
>     at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> 2015-10-19 10:24:31,160 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.sink.elasticsearch.ElasticSearchSink.stop(ElasticSearchSink.java:376)] ElasticSearch sink {} stopping
> 2015-10-19 10:24:31,160 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:150)] Component type: SINK, name: k1 stopped
> 2015-10-19 10:24:31,160 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:156)] Shutdown Metric for type: SINK, name: k1. sink.start.time == 1445246671139
> 2015-10-19 10:24:31,160 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:162)] Shutdown Metric for type: SINK, name: k1. sink.stop.time == 1445246671160
> 2015-10-19 10:24:31,160 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:178)] Shutdown Metric for type: SINK, name: k1. sink.batch.complete == 0
> 2015-10-19 10:24:31,161 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:178)] Shutdown Metric for type: SINK, name: k1. sink.batch.empty == 0
> 2015-10-19 10:24:31,161 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:178)] Shutdown Metric for type: SINK, name: k1. sink.batch.underflow == 0
> 2015-10-19 10:24:31,161 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:178)] Shutdown Metric for type: SINK, name: k1. sink.connection.closed.count == 1
> 2015-10-19 10:24:31,161 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:178)] Shutdown Metric for type: SINK, name: k1. sink.connection.creation.count == 0
> 2015-10-19 10:24:31,161 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:178)] Shutdown Metric for type: SINK, name: k1. sink.connection.failed.count == 0
> 2015-10-19 10:24:31,161 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:178)] Shutdown Metric for type: SINK, name: k1. sink.event.drain.attempt == 0
> 2015-10-19 10:24:31,161 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:178)] Shutdown Metric for type: SINK, name: k1. sink.event.drain.sucess == 0
> 2015-10-19 10:24:31,161 (lifecycleSupervisor-1-1) [WARN - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:260)] Component SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@5c5dc0a5 counterGroup:{ name:null counters:{} } } stopped, since it could not besuccessfully started due to missing dependencies
> 2015-10-19 10:24:31,155 (lifecycleSupervisor-1-2) [DEBUG - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.<init>(ReliableSpoolingFileEventReader.java:138)] Initializing ReliableSpoolingFileEventReader with directory=/var/export, metaDir=.flumespool, deserializer=avro
> 2015-10-19 10:24:31,175 (lifecycleSupervisor-1-2) [DEBUG - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.<init>(ReliableSpoolingFileEventReader.java:160)] Successfully created and deleted canary file: /var/export/flume-spooldir-perm-check-8040653594475164987.canary
> 2015-10-19 10:24:31,176 (lifecycleSupervisor-1-2) [DEBUG - org.apache.flume.source.SpoolDirectorySource.start(SpoolDirectorySource.java:111)] SpoolDirectorySource source started
> 2015-10-19 10:24:31,177 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: SOURCE, name: avrofld: Successfully registered new MBean.
> 2015-10-19 10:24:31,177 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SOURCE, name: avrofld started
> {noformat}
> I opened a GitHub issue:
> https://github.com/elastic/elasticsearch/issues/14187
> Looks like it is due to this change
> https://www.elastic.co/guide/en/elasticsearch/reference/2.0/_java_api_changes.html#_inetsockettransportaddress_removed
> This appears to be because the Elasticsearch API has changed.
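> For illustration, a minimal sketch of the constructor change behind the NoSuchMethodError (based on the breaking-changes page linked above; the class name here is an example, not Flume code):
> {code}
> import java.net.InetAddress;
>
> import org.elasticsearch.common.transport.InetSocketTransportAddress;
>
> public class TransportAddressSketch {
>   public static void main(String[] args) throws Exception {
>     // ES 1.x accepted a host string; this constructor was removed in 2.0:
>     // new InetSocketTransportAddress("127.0.0.1", 9300);
>
>     // ES 2.x requires a resolved InetAddress instead:
>     InetSocketTransportAddress addr =
>         new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300);
>     System.out.println(addr);
>   }
> }
> {code}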
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)