[ 
https://issues.apache.org/jira/browse/FLUME-3012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15594537#comment-15594537
 ] 

Ping Wang commented on FLUME-3012:
----------------------------------

I found that flume-ng-node/src/main/java/org/apache/flume/node/Application.java 
already registers a shutdown hook to handle SIGTERM:

final Application appReference = application;
Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
  @Override
  public void run() {
    appReference.stop();
  }
});

In this scenario, the shutdown hook was invoked, but the stop operation never completed.
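A simplified, hypothetical model of why the hook's stop() request has no effect (the class, method, and state names below are illustrative, not Flume's actual code): the supervisor keeps retrying the failing channel start, and nothing in that retry loop ever consults the stop request, so the hook blocks behind a component that never leaves its failed-start state.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the hang: a supervisor retry loop that never
// checks whether a stop was requested, so the shutdown hook's stop()
// call cannot break it. Bounded here so the sketch terminates; in the
// reported scenario the retries continue indefinitely.
public class ShutdownHookHang {
    enum State { IDLE, START_FAILED, STOPPED }

    static State state = State.IDLE;
    static final AtomicBoolean stopRequested = new AtomicBoolean(false);

    // Stands in for KafkaChannel.start() with a bad bootstrap server:
    // producer construction fails every time.
    static void startChannel() {
        throw new RuntimeException("Failed to construct kafka producer");
    }

    // Stands in for the supervisor's monitor loop.
    static State superviseLoop(int maxRetries) {
        for (int i = 0; i < maxRetries; i++) {
            try {
                startChannel();
                return State.STOPPED; // never reached here
            } catch (RuntimeException e) {
                state = State.START_FAILED;
                // Bug being modeled: stopRequested is never consulted,
                // so SIGTERM cannot end the retries.
            }
        }
        return state;
    }

    public static void main(String[] args) {
        stopRequested.set(true); // simulate SIGTERM -> shutdown hook
        System.out.println("stop requested: " + stopRequested.get()
                + ", final state: " + superviseLoop(5));
    }
}
```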


> Sending a term signal can not shutdown Flume agent when KafkaChannel starting 
> has exceptions
> --------------------------------------------------------------------------------------------
>
>                 Key: FLUME-3012
>                 URL: https://issues.apache.org/jira/browse/FLUME-3012
>             Project: Flume
>          Issue Type: Bug
>          Components: Kafka Channel
>    Affects Versions: v1.6.0
>         Environment: Flume 1.6.0+Kafka 0.9 
>            Reporter: Ping Wang
>             Fix For: v1.8.0
>
>         Attachments: threaddumps.log
>
>
> Use Kafka Channel in the agent configuration:
> tier1.sources = source1
> tier1.channels = channel1
> tier1.sinks = sink1
> tier1.sources.source1.type = exec
> tier1.sources.source1.command = /usr/bin/vmstat 1
> tier1.sources.source1.channels = channel1
> tier1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
> tier1.channels.channel1.kafka.bootstrap.servers = a.b.c.d:6667
> tier1.sinks.sink1.type = hdfs
> tier1.sinks.sink1.hdfs.path = /tmp/kafka/channel
> tier1.sinks.sink1.hdfs.rollInterval = 5
> tier1.sinks.sink1.hdfs.rollSize = 0
> tier1.sinks.sink1.hdfs.rollCount = 0
> tier1.sinks.sink1.hdfs.fileType = DataStream
> tier1.sinks.sink1.channel = channel1
> If kafka.bootstrap.servers is accidentally incorrect, errors will be thrown 
> out:
> ......
> )] Waiting for channel: channel1 to start. Sleeping for 500 ms
> 2016-10-21 01:15:50,739 (conf-file-poller-0) [INFO - 
> org.apache.flume.node.Application.startAllComponents(Application.java:161)] 
> Waiting for channel: channel1 to start. Sleeping for 500 ms
> 2016-10-21 01:15:51,240 (conf-file-poller-0) [INFO - 
> org.apache.flume.node.Application.startAllComponents(Application.java:161)] 
> Waiting for channel: channel1 to start. Sleeping for 500 ms
> 2016-10-21 01:15:51,735 (lifecycleSupervisor-1-6) [INFO - 
> org.apache.flume.channel.kafka.KafkaChannel.start(KafkaChannel.java:115)] 
> Starting Kafka Channel: channel1
> 2016-10-21 01:15:51,737 (lifecycleSupervisor-1-6) [INFO - 
> org.apache.kafka.common.config.AbstractConfig.logAll(AbstractConfig.java:165)]
>  ProducerConfig values: 
>       compression.type = none
>       metric.reporters = []
>       metadata.max.age.ms = 300000
>       metadata.fetch.timeout.ms = 60000
>       reconnect.backoff.ms = 50
>       sasl.kerberos.ticket.renew.window.factor = 0.8
>       bootstrap.servers = [a.b.c.d:6667]
>       retry.backoff.ms = 100
>       sasl.kerberos.kinit.cmd = /usr/bin/kinit
>       buffer.memory = 33554432
>       timeout.ms = 30000
>       key.serializer = class 
> org.apache.kafka.common.serialization.StringSerializer
>       sasl.kerberos.service.name = null
>       sasl.kerberos.ticket.renew.jitter = 0.05
>       ssl.keystore.type = JKS
>       ssl.trustmanager.algorithm = PKIX
>       block.on.buffer.full = false
>       ssl.key.password = null
>       max.block.ms = 60000
>       sasl.kerberos.min.time.before.relogin = 60000
>       connections.max.idle.ms = 540000
>       ssl.truststore.password = null
>       max.in.flight.requests.per.connection = 5
>       metrics.num.samples = 2
>       client.id = 
>       ssl.endpoint.identification.algorithm = null
>       ssl.protocol = TLS
>       request.timeout.ms = 30000
>       ssl.provider = null
>       ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>       acks = all
>       batch.size = 16384
>       ssl.keystore.location = null
>       receive.buffer.bytes = 32768
>       ssl.cipher.suites = null
>       ssl.truststore.type = JKS
>       security.protocol = PLAINTEXT
>       retries = 0
>       max.request.size = 1048576
>       value.serializer = class 
> org.apache.kafka.common.serialization.ByteArraySerializer
>       ssl.truststore.location = null
>       ssl.keystore.password = null
>       ssl.keymanager.algorithm = SunX509
>       metrics.sample.window.ms = 30000
>       partitioner.class = class 
> org.apache.kafka.clients.producer.internals.DefaultPartitioner
>       send.buffer.bytes = 131072
>       linger.ms = 0
> 2016-10-21 01:15:51,742 (lifecycleSupervisor-1-6) [DEBUG - 
> org.apache.kafka.common.metrics.Metrics.sensor(Metrics.java:201)] Added 
> sensor with name bufferpool-wait-time
> 2016-10-21 01:15:51,743 (lifecycleSupervisor-1-6) [DEBUG - 
> org.apache.kafka.common.metrics.Metrics.sensor(Metrics.java:201)] Added 
> sensor with name buffer-exhausted-records
> 2016-10-21 01:15:51,744 (lifecycleSupervisor-1-6) [INFO - 
> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:613)]
>  Closing the Kafka producer with timeoutMillis = 0 ms.
> 2016-10-21 01:15:51,744 (lifecycleSupervisor-1-6) [DEBUG - 
> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:654)]
>  The Kafka producer has closed.
> 2016-10-21 01:15:51,744 (lifecycleSupervisor-1-6) [ERROR - 
> org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:253)]
>  Unable to start org.apache.flume.channel.kafka.KafkaChannel{name: channel1} 
> - Exception follows.
> org.apache.kafka.common.KafkaException: Failed to construct kafka producer
>       at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:321)
>       at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:181)
>       at 
> org.apache.flume.channel.kafka.KafkaChannel.start(KafkaChannel.java:116)
>       at 
> org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.kafka.common.config.ConfigException: DNS resolution 
> failed for url in bootstrap.servers: a.b.c.d:6667
>       at 
> org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:48)
>       at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:269)
>       ... 10 more
> 2016-10-21 01:15:51,745 (conf-file-poller-0) [INFO - 
> org.apache.flume.node.Application.startAllComponents(Application.java:161)] 
> Waiting for channel: channel1 to start. Sleeping for 500 ms
> ##################INTO-ShutdownHook-NOW########################
> 2016-10-21 01:15:52,246 (conf-file-poller-0) [INFO - 
> org.apache.flume.node.Application.startAllComponents(Application.java:161)] 
> Waiting for channel: channel1 to start. Sleeping for 500 ms
> 2016-10-21 01:15:52,748 (conf-file-poller-0) [INFO - 
> org.apache.flume.node.Application.startAllComponents(Application.java:161)] 
> Waiting for channel: channel1 to start. Sleeping for 500 ms
> 2016-10-21 01:15:53,248 (conf-file-poller-0) [INFO - 
> org.apache.flume.node.Application.startAllComponents(Application.java:161)] 
> Waiting for channel: channel1 to start. Sleeping for 500 ms
> 2016-10-21 01:15:53,749 (conf-file-poller-0) [INFO - 
> org.apache.flume.node.Application.startAllComponents(Application.java:161)] 
> Waiting for channel: channel1 to start. Sleeping for 500 ms
> 2016-10-21 01:15:54,250 (conf-file-poller-0) [INFO - 
> org.apache.flume.node.Application.startAllComponents(Application.java:161)] 
> Waiting for channel: channel1 to start. Sleeping for 500 ms
> 2016-10-21 01:15:54,745 (lifecycleSupervisor-1-3) [INFO - 
> org.apache.flume.channel.kafka.KafkaChannel.start(KafkaChannel.java:115)] 
> Starting Kafka Channel: channel1
> 2016-10-21 01:15:54,747 (lifecycleSupervisor-1-3) [INFO - 
> org.apache.kafka.common.config.AbstractConfig.logAll(AbstractConfig.java:165)]
>  ProducerConfig values: 
>       compression.type = none
>       metric.reporters = []
>       metadata.max.age.ms = 300000
>       metadata.fetch.timeout.ms = 60000
>       reconnect.backoff.ms = 50
>       sasl.kerberos.ticket.renew.window.factor = 0.8
>       bootstrap.servers = [a.b.c.d:6667]
>       retry.backoff.ms = 100
>       sasl.kerberos.kinit.cmd = /usr/bin/kinit
>       buffer.memory = 33554432
>       timeout.ms = 30000
>       key.serializer = class 
> org.apache.kafka.common.serialization.StringSerializer
>       sasl.kerberos.service.name = null
>       sasl.kerberos.ticket.renew.jitter = 0.05
>       ssl.keystore.type = JKS
>       ssl.trustmanager.algorithm = PKIX
>       block.on.buffer.full = false
>       ssl.key.password = null
>       max.block.ms = 60000
>       sasl.kerberos.min.time.before.relogin = 60000
>       connections.max.idle.ms = 540000
>       ssl.truststore.password = null
>       max.in.flight.requests.per.connection = 5
>       metrics.num.samples = 2
>       client.id = 
>       ssl.endpoint.identification.algorithm = null
>       ssl.protocol = TLS
>       request.timeout.ms = 30000
>       ssl.provider = null
>       ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>       acks = all
>       batch.size = 16384
>       ssl.keystore.location = null
>       receive.buffer.bytes = 32768
>       ssl.cipher.suites = null
>       ssl.truststore.type = JKS
>       security.protocol = PLAINTEXT
>       retries = 0
>       max.request.size = 1048576
>       value.serializer = class 
> org.apache.kafka.common.serialization.ByteArraySerializer
>       ssl.truststore.location = null
>       ssl.keystore.password = null
>       ssl.keymanager.algorithm = SunX509
>       metrics.sample.window.ms = 30000
>       partitioner.class = class 
> org.apache.kafka.clients.producer.internals.DefaultPartitioner
>       send.buffer.bytes = 131072
>       linger.ms = 0
> 2016-10-21 01:15:54,747 (lifecycleSupervisor-1-3) [DEBUG - 
> org.apache.kafka.common.metrics.Metrics.sensor(Metrics.java:201)] Added 
> sensor with name bufferpool-wait-time
> 2016-10-21 01:15:54,748 (lifecycleSupervisor-1-3) [DEBUG - 
> org.apache.kafka.common.metrics.Metrics.sensor(Metrics.java:201)] Added 
> sensor with name buffer-exhausted-records
> 2016-10-21 01:15:54,787 (lifecycleSupervisor-1-3) [INFO - 
> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:613)]
>  Closing the Kafka producer with timeoutMillis = 0 ms.
> 2016-10-21 01:15:54,788 (lifecycleSupervisor-1-3) [DEBUG - 
> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:654)]
>  The Kafka producer has closed.
> 2016-10-21 01:15:54,788 (lifecycleSupervisor-1-3) [ERROR - 
> org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:253)]
>  Unable to start org.apache.flume.channel.kafka.KafkaChannel{name: channel1} 
> - Exception follows.
> org.apache.kafka.common.KafkaException: Failed to construct kafka producer
>       at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:321)
>       at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:181)
>       at 
> org.apache.flume.channel.kafka.KafkaChannel.start(KafkaChannel.java:116)
>       at 
> org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.kafka.common.config.ConfigException: DNS resolution 
> failed for url in bootstrap.servers: a.b.c.d:6667
>       at 
> org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:48)
>       at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:269)
>       ... 10 more
> 2016-10-21 01:15:54,789 (conf-file-poller-0) [INFO - 
> org.apache.flume.node.Application.startAllComponents(Application.java:161)] 
> Waiting for channel: channel1 to start. Sleeping for 500 ms
> ......
> Issuing "kill -15 <flume-pid>" to shut down the process does not work. From 
> the log, the shutdown hook was invoked, but it never performed the cleanup. 
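One way the start-wait loop could stay responsive to shutdown is to also watch a shutdown flag between sleeps. The sketch below is a hypothetical illustration of that idea (method and flag names are mine, and this is not the actual Flume patch):

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: a "wait for channel to start" loop that also
// watches a shutdown flag, so a SIGTERM-triggered stop() can break it
// instead of sleeping 500 ms forever.
public class InterruptibleStartWait {
    // Returns true if the component started, false if shutdown was
    // requested (or the thread was interrupted) before it did.
    static boolean waitForStart(AtomicBoolean started,
                                AtomicBoolean shuttingDown) {
        while (!started.get()) {
            if (shuttingDown.get()) {
                return false; // abandon startup instead of spinning forever
            }
            try {
                Thread.sleep(500); // same 500 ms cadence as the log shows
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        AtomicBoolean started = new AtomicBoolean(false);
        AtomicBoolean shuttingDown = new AtomicBoolean(true);
        System.out.println("started: " + waitForStart(started, shuttingDown));
    }
}
```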



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)