[ https://issues.apache.org/jira/browse/FLUME-3012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15594537#comment-15594537 ]
Ping Wang commented on FLUME-3012:
----------------------------------

I found that flume-ng-node/src/main/java/org/apache/flume/node/Application.java already has a shutdown hook to handle SIGTERM:

    final Application appReference = application;
    Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
      @Override
      public void run() {
        appReference.stop();
      }
    });

In this scenario, addShutdownHook was called, but the stop operation was never completed.


> Sending a TERM signal cannot shut down the Flume agent when KafkaChannel startup throws exceptions
> --------------------------------------------------------------------------------------------------
>
>                 Key: FLUME-3012
>                 URL: https://issues.apache.org/jira/browse/FLUME-3012
>             Project: Flume
>          Issue Type: Bug
>          Components: Kafka Channel
>    Affects Versions: v1.6.0
>         Environment: Flume 1.6.0 + Kafka 0.9
>            Reporter: Ping Wang
>             Fix For: v1.8.0
>
>         Attachments: threaddumps.log
>
>
> Use a Kafka Channel in the agent configuration:
>
> tier1.sources = source1
> tier1.channels = channel1
> tier1.sinks = sink1
> tier1.sources.source1.type = exec
> tier1.sources.source1.command = /usr/bin/vmstat 1
> tier1.sources.source1.channels = channel1
> tier1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
> tier1.channels.channel1.kafka.bootstrap.servers = a.b.c.d:6667
> tier1.sinks.sink1.type = hdfs
> tier1.sinks.sink1.hdfs.path = /tmp/kafka/channel
> tier1.sinks.sink1.hdfs.rollInterval = 5
> tier1.sinks.sink1.hdfs.rollSize = 0
> tier1.sinks.sink1.hdfs.rollCount = 0
> tier1.sinks.sink1.hdfs.fileType = DataStream
> tier1.sinks.sink1.channel = channel1
>
> If kafka.bootstrap.servers is accidentally incorrect, errors such as the following are thrown:
>
> ......
> )] Waiting for channel: channel1 to start. Sleeping for 500 ms
> 2016-10-21 01:15:50,739 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:161)] Waiting for channel: channel1 to start.
> Sleeping for 500 ms
> 2016-10-21 01:15:51,240 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:161)] Waiting for channel: channel1 to start. Sleeping for 500 ms
> 2016-10-21 01:15:51,735 (lifecycleSupervisor-1-6) [INFO - org.apache.flume.channel.kafka.KafkaChannel.start(KafkaChannel.java:115)] Starting Kafka Channel: channel1
> 2016-10-21 01:15:51,737 (lifecycleSupervisor-1-6) [INFO - org.apache.kafka.common.config.AbstractConfig.logAll(AbstractConfig.java:165)] ProducerConfig values:
> 	compression.type = none
> 	metric.reporters = []
> 	metadata.max.age.ms = 300000
> 	metadata.fetch.timeout.ms = 60000
> 	reconnect.backoff.ms = 50
> 	sasl.kerberos.ticket.renew.window.factor = 0.8
> 	bootstrap.servers = [a.b.c.d:6667]
> 	retry.backoff.ms = 100
> 	sasl.kerberos.kinit.cmd = /usr/bin/kinit
> 	buffer.memory = 33554432
> 	timeout.ms = 30000
> 	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
> 	sasl.kerberos.service.name = null
> 	sasl.kerberos.ticket.renew.jitter = 0.05
> 	ssl.keystore.type = JKS
> 	ssl.trustmanager.algorithm = PKIX
> 	block.on.buffer.full = false
> 	ssl.key.password = null
> 	max.block.ms = 60000
> 	sasl.kerberos.min.time.before.relogin = 60000
> 	connections.max.idle.ms = 540000
> 	ssl.truststore.password = null
> 	max.in.flight.requests.per.connection = 5
> 	metrics.num.samples = 2
> 	client.id =
> 	ssl.endpoint.identification.algorithm = null
> 	ssl.protocol = TLS
> 	request.timeout.ms = 30000
> 	ssl.provider = null
> 	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> 	acks = all
> 	batch.size = 16384
> 	ssl.keystore.location = null
> 	receive.buffer.bytes = 32768
> 	ssl.cipher.suites = null
> 	ssl.truststore.type = JKS
> 	security.protocol = PLAINTEXT
> 	retries = 0
> 	max.request.size = 1048576
> 	value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
> 	ssl.truststore.location = null
> 	ssl.keystore.password = null
> 	ssl.keymanager.algorithm = SunX509
> 	metrics.sample.window.ms = 30000
> 	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
> 	send.buffer.bytes = 131072
> 	linger.ms = 0
> 2016-10-21 01:15:51,742 (lifecycleSupervisor-1-6) [DEBUG - org.apache.kafka.common.metrics.Metrics.sensor(Metrics.java:201)] Added sensor with name bufferpool-wait-time
> 2016-10-21 01:15:51,743 (lifecycleSupervisor-1-6) [DEBUG - org.apache.kafka.common.metrics.Metrics.sensor(Metrics.java:201)] Added sensor with name buffer-exhausted-records
> 2016-10-21 01:15:51,744 (lifecycleSupervisor-1-6) [INFO - org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:613)] Closing the Kafka producer with timeoutMillis = 0 ms.
> 2016-10-21 01:15:51,744 (lifecycleSupervisor-1-6) [DEBUG - org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:654)] The Kafka producer has closed.
> 2016-10-21 01:15:51,744 (lifecycleSupervisor-1-6) [ERROR - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:253)] Unable to start org.apache.flume.channel.kafka.KafkaChannel{name: channel1} - Exception follows.
> org.apache.kafka.common.KafkaException: Failed to construct kafka producer
> 	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:321)
> 	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:181)
> 	at org.apache.flume.channel.kafka.KafkaChannel.start(KafkaChannel.java:116)
> 	at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.kafka.common.config.ConfigException: DNS resolution failed for url in bootstrap.servers: a.b.c.d:6667
> 	at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:48)
> 	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:269)
> 	... 10 more
> 2016-10-21 01:15:51,745 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:161)] Waiting for channel: channel1 to start. Sleeping for 500 ms
> ##################INTO-ShutdownHook-NOW########################
> 2016-10-21 01:15:52,246 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:161)] Waiting for channel: channel1 to start. Sleeping for 500 ms
> 2016-10-21 01:15:52,748 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:161)] Waiting for channel: channel1 to start.
> Sleeping for 500 ms
> 2016-10-21 01:15:53,248 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:161)] Waiting for channel: channel1 to start. Sleeping for 500 ms
> 2016-10-21 01:15:53,749 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:161)] Waiting for channel: channel1 to start. Sleeping for 500 ms
> 2016-10-21 01:15:54,250 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:161)] Waiting for channel: channel1 to start. Sleeping for 500 ms
> 2016-10-21 01:15:54,745 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.channel.kafka.KafkaChannel.start(KafkaChannel.java:115)] Starting Kafka Channel: channel1
> 2016-10-21 01:15:54,747 (lifecycleSupervisor-1-3) [INFO - org.apache.kafka.common.config.AbstractConfig.logAll(AbstractConfig.java:165)] ProducerConfig values:
> 	[same ProducerConfig values as in the first start attempt above]
> 2016-10-21 01:15:54,747 (lifecycleSupervisor-1-3) [DEBUG - org.apache.kafka.common.metrics.Metrics.sensor(Metrics.java:201)] Added sensor with name bufferpool-wait-time
> 2016-10-21 01:15:54,748 (lifecycleSupervisor-1-3) [DEBUG - org.apache.kafka.common.metrics.Metrics.sensor(Metrics.java:201)] Added sensor with name buffer-exhausted-records
> 2016-10-21 01:15:54,787 (lifecycleSupervisor-1-3) [INFO - org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:613)] Closing the Kafka producer with timeoutMillis = 0 ms.
> 2016-10-21 01:15:54,788 (lifecycleSupervisor-1-3) [DEBUG - org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:654)] The Kafka producer has closed.
> 2016-10-21 01:15:54,788 (lifecycleSupervisor-1-3) [ERROR - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:253)] Unable to start org.apache.flume.channel.kafka.KafkaChannel{name: channel1} - Exception follows.
> org.apache.kafka.common.KafkaException: Failed to construct kafka producer
> 	[stack trace identical to the first start attempt above]
> Caused by: org.apache.kafka.common.config.ConfigException: DNS resolution failed for url in bootstrap.servers: a.b.c.d:6667
> 2016-10-21 01:15:54,789 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:161)] Waiting for channel: channel1 to start. Sleeping for 500 ms
> ......
>
> Issuing "kill -15 <flume-pid>" to shut down the process does not work. From the log, the shutdown hook was invoked, but it did not perform the cleanup.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
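The symptom above (hook invoked, stop never completes, "Waiting for channel ... Sleeping for 500 ms" repeating forever) is consistent with the startup thread blocking the shutdown path. The following is a minimal, hypothetical sketch, not Flume's actual classes: it only illustrates how a hang of this shape can arise when the thread running the start loop holds a shared lifecycle lock that the shutdown hook's stop() also needs. The class and method names here are invented for illustration; a timed tryLock stands in for stop() so that the sketch itself terminates instead of hanging.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch (NOT Flume's code): a startup thread holds a shared
// lifecycle lock while looping "waiting for channel to start", so a shutdown
// hook whose stop() needs the same lock can never make progress.
public class ShutdownHookHangSketch {

    static final ReentrantLock lifecycleLock = new ReentrantLock();

    // Simulates the conf-file-poller's endless "Sleeping for 500 ms" start
    // loop, holding the lifecycle lock for its entire duration.
    static void startPollerHoldingLock() {
        CountDownLatch locked = new CountDownLatch(1);
        Thread poller = new Thread(() -> {
            lifecycleLock.lock();           // held for the whole start loop
            locked.countDown();             // signal that the lock is now held
            try {
                while (true) {
                    Thread.sleep(500);      // "Sleeping for 500 ms", forever
                }
            } catch (InterruptedException ignored) {
                // never reached in this sketch
            } finally {
                lifecycleLock.unlock();
            }
        }, "conf-file-poller-0");
        poller.setDaemon(true);
        poller.start();
        try {
            locked.await();                 // don't return until the lock is held
        } catch (InterruptedException ignored) {
        }
    }

    // Simulates what the shutdown hook's stop() would attempt. A timed tryLock
    // keeps the demonstration finite; a plain lock() here would hang forever.
    static boolean stopCanProceed() {
        try {
            return lifecycleLock.tryLock(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        startPollerHoldingLock();
        System.out.println(stopCanProceed() ? "stop proceeded" : "stop blocked");
    }
}
```

Running the sketch prints "stop blocked": the daemon thread never releases the lock, so the stand-in for stop() times out. Whether Flume's actual hang follows exactly this locking pattern would need to be confirmed against the attached threaddumps.log.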