Hello! I am trying to configure Flume v1.8.0 to move messages from a Kafka
source to Oracle and HDFS sinks, and I have run into trouble configuring the
Kafka security connection.

As a config template, I am using the connection properties from syslog-ng,
which works well with Kafka:

syslog-ng.properties
# Authentication data for the Kafka producer
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="u1" password="u1";

So my starting flume configuration file is:

cat conf/flume-kafka-mem-oracle.conf
# Kafka to memory to Oracle Flume configuration
# $Id$
#
# Name the components on this agent
a1.sources = kafkaSrc
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.kafkaSrc.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.kafkaSrc.kafka.bootstrap.servers = kfs1:9092,kfs2:9092,kfs3:9092
a1.sources.kafkaSrc.kafka.topics = acct0
a1.sources.kafkaSrc.kafka.consumer.group.id = g0
a1.sources.kafkaSrc.kafka.consumer.security.protocol = SASL_PLAINTEXT
a1.sources.kafkaSrc.kafka.consumer.sasl.mechanism = PLAIN
a1.sources.kafkaSrc.kafka.consumer.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="u0" password="u0";

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.kafkaSrc.channels = c1
a1.sinks.k1.channel = c1

With this config, Flume starts:

bin/flume-ng agent --conf /opt/flume/conf --conf-file
conf/flume-kafka-mem-oracle.conf --name a1 -Dflume.root.logger=INFO,console

then displays the Kafka consumer properties:
...
2017-12-21 16:39:02,382 (lifecycleSupervisor-1-4) [INFO -
org.apache.kafka.common.config.AbstractConfig.logAll(AbstractConfig.java:165)]
ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
group.id = g0
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [kfs1:9092, kfs2:9092, kfs3:9092]
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
enable.auto.commit = false
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
ssl.truststore.password = null
session.timeout.ms = 30000
metrics.num.samples = 2
client.id =
ssl.endpoint.identification.algorithm = null
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
ssl.protocol = TLS
check.crcs = true
request.timeout.ms = 40000
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 32768
ssl.cipher.suites = null
ssl.truststore.type = JKS
security.protocol = SASL_PLAINTEXT
ssl.truststore.location = null
ssl.keystore.password = null
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
fetch.min.bytes = 1
send.buffer.bytes = 131072
auto.offset.reset = latest
...
and then fails to connect to Kafka with this error message:

2017-12-21 16:39:02,414 (lifecycleSupervisor-1-4) [ERROR -
org.apache.flume.source.BasicSourceSemantics.start(BasicSourceSemantics.java:86)]
Unexpected error performing start: name = kafkaSrc
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
...
Caused by: org.apache.kafka.common.KafkaException:
java.lang.IllegalArgumentException:
You must pass java.security.auth.login.config in secure mode.
...

It seems to me that Flume ignores the parameter:

a1.sources.kafkaSrc.kafka.consumer.sasl.mechanism = PLAIN

and always tries to use Kerberos security, judging from these log
messages:
...
Caused by: java.lang.IllegalArgumentException: You must pass
java.security.auth.login.config in secure mode.
at org.apache.kafka.common.security.kerberos.Login.login(Login.java:289)
at org.apache.kafka.common.security.kerberos.Login.<init>(Login.java:104)
at org.apache.kafka.common.security.kerberos.LoginManager.<init>(LoginManager.java:44)
at org.apache.kafka.common.security.kerberos.LoginManager.acquireLoginManager(LoginManager.java:85)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:55)
... 16 more

So my question: does Flume support the configuration option
sasl.mechanism = PLAIN for the Kafka connection?
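
For completeness, the workaround I am considering (untested) is to bypass
sasl.jaas.config and pass the credentials in a separate JAAS file through the
standard JVM system property instead. The file name and path below are just
placeholders:

cat /opt/flume/conf/kafka_client_jaas.conf
KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="u0"
  password="u0";
};

and then start the agent with the extra -D option:

bin/flume-ng agent --conf /opt/flume/conf --conf-file
conf/flume-kafka-mem-oracle.conf --name a1 -Dflume.root.logger=INFO,console
-Djava.security.auth.login.config=/opt/flume/conf/kafka_client_jaas.conf

But I would prefer to keep everything in the Flume properties file if
sasl.jaas.config is supposed to work.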
---
Igor Storozhev.
