Hey David,
My consumers are registered; here is the debug log. The problem is that the broker does not belong to me, so I can’t see what is going on there. But this is a new consumer group, so there is no state yet.
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 0 will start reading the following 40 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=22}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=21}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=20}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=19}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=26}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=25}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=24}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=23}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=30}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=29}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=28}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=27}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=34}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=33}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=32}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=31}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=38}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=37}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=36}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=35}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=39}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=2}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=1}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=0}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=6}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=5}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=4}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=3}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=10}, 
KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=9}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=8}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=7}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=14}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=13}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=12}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=11}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=18}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=17}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=16}, KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=15}]

On Thu, Aug 29, 2019 at 11:39 AM David Morin <morin.david....@gmail.com> wrote:

> Hello Vishwas,
>
> You can use a keytab if you prefer. You generate a keytab for your user
> and then you can reference it in the Flink configuration.
> This keytab will then be handled by Flink in a secure way, and a TGT will be
> created based on it.
> However, that seems to be working.
> Did you check the Kafka logs on the broker side?
> Or did you check the consumer offsets with the Kafka tools in order to validate
> that consumers are registered on the different partitions of your topic?
> You could try switching to a different group id for your consumer group in
> order to force parallel consumption.
>
> On Thu, Aug 29, 2019 at 09:57, Vishwas Siravara <vsirav...@gmail.com> wrote:
>
>> I see this log as well, but I can't see any messages. I know for a fact
>> that the topic I am subscribed to has messages, as I checked with a simple
>> Java consumer with a different group.
>>
>>
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase -
>> Consumer subtask 0 will start reading the following 40 partitions from the
>> committed group offsets in Kafka:
>> [KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=22},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=21},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=20},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=19},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=26},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=25},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=24},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=23},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=30},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=29},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=28},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=27},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=34},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=33},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=32},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=31},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=38},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=37},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=36},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=35},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=39},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=2},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=1},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=0},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=6},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=5},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=4},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=3},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=10},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=9},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=8},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=7},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=14},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=13},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=12},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=11},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=18},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=17},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=16},
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=15}]
>>
>>
>> On Thu, Aug 29, 2019 at 2:02 AM Vishwas Siravara <vsirav...@gmail.com> wrote:
>>
>>> Hi guys,
>>> I am using Kerberos for my Kafka source. I pass the JAAS config and
>>> krb5.conf in env.java.opts: -Dconfig.resource=qa.conf
>>> -Djava.library.path=/usr/mware/SimpleAPI/voltage-simple-api-java-05.12.0000-Linux-x86_64-64b-r234867/lib/
>>> -Djava.security.auth.login.config=/home/was/Jaas/kafka-jaas.conf
>>> -Djava.security.krb5.conf=/home/was/Jaas/krb5.conf
>>>
>>> When I look at the debug logs, I see that the consumer was created with the
>>> following properties.
>>>
>>> 2019-08-29 06:49:18,298 INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
>>> auto.commit.interval.ms = 5000
>>> auto.offset.reset = latest
>>> bootstrap.servers = [sl73oprdbd018.visa.com:9092]
>>> check.crcs = true
>>> client.id = consumer-2
>>> connections.max.idle.ms = 540000
>>> enable.auto.commit = true
>>> exclude.internal.topics = true
>>> fetch.max.bytes = 52428800
>>> fetch.max.wait.ms = 500
>>> fetch.min.bytes = 1
>>>
>>>
>>> group.id = flink-AIP-XX-druid-List(gbl_auth_raw_occ_c)
>>> heartbeat.interval.ms = 3000
>>> interceptor.classes = null
>>> key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>>> max.partition.fetch.bytes = 1048576
>>> max.poll.interval.ms = 300000
>>> max.poll.records = 500
>>> metadata.max.age.ms = 300000
>>> metric.reporters = []
>>> metrics.num.samples = 2
>>> metrics.recording.level = INFO
>>> metrics.sample.window.ms = 30000
>>> partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
>>> receive.buffer.bytes = 65536
>>> reconnect.backoff.ms = 50
>>> request.timeout.ms = 305000
>>> retry.backoff.ms = 100
>>> sasl.jaas.config = null
>>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>>> sasl.kerberos.min.time.before.relogin = 60000
>>> sasl.kerberos.service.name = null
>>> sasl.kerberos.ticket.renew.jitter = 0.05
>>> sasl.kerberos.ticket.renew.window.factor = 0.8
>>> sasl.mechanism = GSSAPI
>>> security.protocol = SASL_PLAINTEXT
>>> send.buffer.bytes = 131072
>>> session.timeout.ms = 10000
>>> ssl.cipher.suites = null
>>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>>> ssl.endpoint.identification.algorithm = null
>>> ssl.key.password = null
>>> ssl.keymanager.algorithm = SunX509
>>> ssl.keystore.location = null
>>> ssl.keystore.password = null
>>> ssl.keystore.type = JKS
>>> ssl.protocol = TLS
>>> ssl.provider = null
>>> ssl.secure.random.implementation = null
>>> ssl.trustmanager.algorithm = PKIX
>>> ssl.truststore.location = null
>>> ssl.truststore.password = null
>>> ssl.truststore.type = JKS
>>> value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>>>
>>>
>>> I can also see that the Kerberos login is working fine. Here is the log for
>>> it:
>>>
>>>
>>>
>>> 2019-08-29 06:49:18,312 INFO org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully logged in.
>>> 2019-08-29 06:49:18,313 INFO org.apache.kafka.common.security.kerberos.KerberosLogin - [Principal=kafka/sl73rspapd035.visa....@corpdev.visa.com]: TGT refresh thread started.
>>> 2019-08-29 06:49:18,314 INFO org.apache.kafka.common.security.kerberos.KerberosLogin - [Principal=kafka/sl73rspapd035.visa....@corpdev.visa.com]: TGT valid starting at: Thu Aug 29 06:49:18 GMT 2019
>>> 2019-08-29 06:49:18,314 INFO org.apache.kafka.common.security.kerberos.KerberosLogin - [Principal=kafka/sl73rspapd035.visa....@corpdev.visa.com]: TGT expires: Thu Aug 29 16:49:18 GMT 2019
>>> 2019-08-29 06:49:18,315 INFO org.apache.kafka.common.security.kerberos.KerberosLogin - [Principal=kafka/sl73rspapd035.visa....@corpdev.visa.com]: TGT refresh sleeping until: Thu Aug 29 15:00:10 GMT 2019
>>> 2019-08-29 06:49:18,316 WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'zookeeper.connect' was supplied but isn't a known config.
>>> 2019-08-29 06:49:18,316 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.0
>>> 2019-08-29 06:49:18,316 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 576d93a8dc0cf421
>>>
>>>
>>> I then see this log:
>>>
>>> INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking the coordinator sl73oprdbd017.visa.com:9092 (id: 2147482633 rack: null) dead for group flink-AIP-XX-druid-List(gbl_auth_raw_occ_c)
>>>
>>>
>>>
>>> *The problem is that I do not see any error log, but no data is being
>>> processed by the consumer, and it has been a nightmare to debug.*
>>>
>>>
>>> Thanks for all the help.
>>>
>>>
>>> Thanks,
>>> Vishwas
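
For anyone retracing this thread: the logged ConsumerConfig shows `auto.offset.reset = latest` and `sasl.kerberos.service.name = null`, and the suggestion above was to try a different group id. The sketch below only illustrates how those three settings could be put into the `Properties` object handed to the Kafka consumer. It is not a confirmed fix; the thread does not establish the cause. The class name `KerberosConsumerProps`, the helper `freshGroupId`, the service name `kafka`, and the choice of `earliest` are all assumptions made for illustration. The property keys themselves are standard Kafka consumer settings. The Kerberos service name can alternatively be given as `serviceName` in the JAAS file, which the log would not show.

```java
import java.util.Properties;
import java.util.UUID;

// Hypothetical helper, not part of Flink or Kafka: it only assembles
// standard Kafka consumer property keys into a java.util.Properties.
class KerberosConsumerProps {

    // Builds consumer properties for a SASL_PLAINTEXT/GSSAPI (Kerberos) broker.
    // serviceName is an assumption; it must match the broker's Kerberos principal name.
    public static Properties build(String bootstrapServers, String groupId, String serviceName) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "GSSAPI");
        // Logged as null in the thread; set explicitly here as an assumption.
        props.setProperty("sasl.kerberos.service.name", serviceName);
        // With no committed offsets for a new group, "latest" reads only records
        // produced after startup; "earliest" makes existing records visible.
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }

    // Returns a group id that has no committed offsets yet (random suffix).
    public static String freshGroupId(String base) {
        return base + "-" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        Properties props = build(
                "sl73oprdbd018.visa.com:9092",   // bootstrap server taken from the logged config
                freshGroupId("flink-debug"),     // hypothetical group id prefix
                "kafka");                        // assumed service name
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

The resulting `Properties` would be passed to the Flink Kafka consumer constructor in place of the existing ones. Comparing the printed values with the ConsumerConfig dump in the task manager log is a quick way to confirm that the settings actually reached the client.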