Hello! I am trying to configure Flume v1.8.0 to move messages from a Kafka source to Oracle and HDFS sinks, and I have run into trouble configuring the secure Kafka connection.
As a config template I am using the connection properties from syslog-ng, which works well with Kafka:

syslog-ng.properties

    # Authentication data for kafka producer
    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="u1" password="u1";

So my starting Flume configuration file is:

cat conf/flume-kafka-mem-oracle.conf

    # Kafka to memory to Oracle Flume configuration
    # $Id$
    #
    # Name the components on this agent
    a1.sources = kafkaSrc
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.kafkaSrc.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.kafkaSrc.kafka.bootstrap.servers = kfs1:9092,kfs2:9092,kfs3:9092
    a1.sources.kafkaSrc.kafka.topics = acct0
    a1.sources.kafkaSrc.kafka.consumer.group.id = g0
    a1.sources.kafkaSrc.kafka.consumer.security.protocol = SASL_PLAINTEXT
    a1.sources.kafkaSrc.kafka.consumer.sasl.mechanism = PLAIN
    a1.sources.kafkaSrc.kafka.consumer.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="u0" password="u0"

    # Describe the sink
    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.kafkaSrc.channels = c1
    a1.sinks.k1.channel = c1

With this config Flume starts:

    bin/flume-ng agent --conf /opt/flume/conf --conf-file conf/flume-kafka-mem-oracle.conf --name a1 -Dflume.root.logger=INFO,console

then displays the Kafka consumer properties:

    ...
    2017-12-21 16:39:02,382 (lifecycleSupervisor-1-4) [INFO - org.apache.kafka.common.config.AbstractConfig.logAll(AbstractConfig.java:165)] ConsumerConfig values:
        metric.reporters = []
        metadata.max.age.ms = 300000
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        group.id = g0
        partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        max.partition.fetch.bytes = 1048576
        bootstrap.servers = [kfs1:9092, kfs2:9092, kfs3:9092]
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.keystore.type = JKS
        ssl.trustmanager.algorithm = PKIX
        enable.auto.commit = false
        ssl.key.password = null
        fetch.max.wait.ms = 500
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        ssl.truststore.password = null
        session.timeout.ms = 30000
        metrics.num.samples = 2
        client.id =
        ssl.endpoint.identification.algorithm = null
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        ssl.protocol = TLS
        check.crcs = true
        request.timeout.ms = 40000
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.keystore.location = null
        heartbeat.interval.ms = 3000
        auto.commit.interval.ms = 5000
        receive.buffer.bytes = 32768
        ssl.cipher.suites = null
        ssl.truststore.type = JKS
        security.protocol = SASL_PLAINTEXT
        ssl.truststore.location = null
        ssl.keystore.password = null
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        fetch.min.bytes = 1
        send.buffer.bytes = 131072
        auto.offset.reset = latest
    ...

and then fails to connect to Kafka with this error message:

    2017-12-21 16:39:02,414 (lifecycleSupervisor-1-4) [ERROR - org.apache.flume.source.BasicSourceSemantics.start(BasicSourceSemantics.java:86)] Unexpected error performing start: name = kafkaSrc
    org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    ...
    Caused by: org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: You must pass java.security.auth.login.config in secure mode.
    ...

It seems to me that Flume ignores the parameter

    a1.sources.kafkaSrc.kafka.consumer.sasl.mechanism = PLAIN

and always tries to use Kerberos security parameters, judging by these messages from the log:

    ...
    Caused by: java.lang.IllegalArgumentException: You must pass java.security.auth.login.config in secure mode.
        at org.apache.kafka.common.security.kerberos.Login.login(Login.java:289)
        at org.apache.kafka.common.security.kerberos.Login.<init>(Login.java:104)
        at org.apache.kafka.common.security.kerberos.LoginManager.<init>(LoginManager.java:44)
        at org.apache.kafka.common.security.kerberos.LoginManager.acquireLoginManager(LoginManager.java:85)
        at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:55)
        ... 16 more

So my question is: does Flume support the configuration option sasl.mechanism = PLAIN for Kafka connections?

---
Igor Storozhev.
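
A quick check that may help narrow this down: the ConsumerConfig dump above lists no sasl.mechanism key at all, so it could be worth confirming which kafka-clients jar this Flume installation actually loads. The following is only a sketch. The lib directory /opt/flume/lib and the FLUME_LIB variable are assumptions (only /opt/flume/conf appears in the command above), and kafka_client_jars is a hypothetical helper name, not part of Flume.

```shell
#!/bin/sh
# Print the file names of any kafka-clients jars found in a Flume lib directory.
# The default path is an assumption; pass the real lib directory as the first argument.
kafka_client_jars() {
  lib_dir="${1:-/opt/flume/lib}"
  for jar in "$lib_dir"/kafka-clients-*.jar; do
    # When nothing matches, the glob stays literal; skip that non-existent entry.
    [ -e "$jar" ] && basename "$jar"
  done
  return 0
}

# FLUME_LIB is a hypothetical override for the assumed default location.
kafka_client_jars "${FLUME_LIB:-/opt/flume/lib}"
```

The version embedded in the printed jar name can then be compared against the Kafka client documentation for that release to see whether sasl.mechanism and sasl.jaas.config are recognized consumer properties there.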
