Does Kafka support dynamic ACL rules?
Hi,

As I learned, Kafka can use '--resource-pattern-type prefixed' to add a rule for prefixed topics. For example, for a user 'kafkaclient', we could define a rule that lets that user access all topics whose names start with the user name, i.e. 'kafkaclient--', such as 'kafkaclient--topic1', 'kafkaclient--topic2', etc.:

/opt/kafka/bin/kafka-acls.sh \
  --authorizer-properties zookeeper.connect=zookeeper:2181 \
  --add \
  --allow-principal User:"kafkaclient" \
  --operation All \
  --resource-pattern-type prefixed \
  --topic "kafkaclient--"

But is it possible to define a dynamic user name? In the above case we know the user name is 'kafkaclient', and if there are many other users, we have to add a rule for each of them; these rules are all alike except for the user name. So I want to know whether it's possible to define just a single rule with a dynamic user name, so that each user can access the topics starting with its own user name. Something like:

/opt/kafka/bin/kafka-acls.sh \
  --authorizer-properties zookeeper.connect=zookeeper:2181 \
  --add \
  --allow-principal User:"**" \
  --operation All \
  --resource-pattern-type prefixed \
  --topic "**--"

Then whenever a user or topic is added later, we wouldn't need to add any rules.

Thanks,
Hui
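As far as I know, the stock authorizer has no placeholder that expands to the authenticated principal's name: '--allow-principal' accepts either an exact principal or the literal wildcard User:*, so one prefixed rule per user is still needed. A minimal sketch that scripts this, assuming a hypothetical file users.txt listing one principal name per line and the same ZooKeeper address as above:

#!/usr/bin/env bash
# Hedged sketch: add one prefixed ACL per user. users.txt is a hypothetical
# input file with one principal name per line; adjust paths for your install.
while read -r user; do
  /opt/kafka/bin/kafka-acls.sh \
    --authorizer-properties zookeeper.connect=zookeeper:2181 \
    --add \
    --allow-principal User:"$user" \
    --operation All \
    --resource-pattern-type prefixed \
    --topic "${user}--"
done < users.txt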
Re: Kafka tests on a remote cluster
Thanks, fair points. Probably best if I simplify the question: how does the Kafka community run tests besides using mocked local Kafka components? Surely there are tests that confirm different failure scenarios, such as losing a broker in a real clustered environment (a multi-node cluster with IPs, ports, hostnames, etc.). The answer would be a good starting point for me.

On Wed, Dec 26, 2018 at 6:11 PM Stephen Powis wrote:

> Without looking into how the integration tests work, my best guess is that
> within the context they were written to run in, it doesn't make sense to
> run them against a remote cluster. The "internal" cluster is running the
> same code, so why require having to coordinate with an external dependency?
>
> For the use case you gave - and I'm not sure whether tests exist that cover
> this behavior or not - running the brokers locally in the context of the
> tests means that those tests have control over the brokers (i.e. they can
> shut them off, restart them, etc. programmatically) and validate behavior.
> Coordinating these operations on a remote broker would be significantly
> more difficult.
>
> Not sure this helps... but perhaps you're either asking the wrong questions
> or trying to go about solving your problem using the wrong set of tools?
> My gut feeling says that if you want to do a full-scale multi-server
> load / HA test, Kafka's test suite is not the best place to start.
>
> Stephen
>
> On Thu, Dec 27, 2018 at 10:53 AM Parviz deyhim wrote:
>
> > Hi,
> >
> > I'm looking to see who has done this before and get some guidance. On a
> > frequent basis I'd like to run basic tests on a remote Kafka cluster
> > while some random chaos/faults are being performed. In other words, I'd
> > like to run chaos engineering tasks (network outage, disk outage, etc.)
> > and see how Kafka behaves. For example:
> >
> > 1) bring some random broker node down
> > 2) send 2000 messages
> > 3) consume the messages
> > 4) confirm there's no data loss
> >
> > My question: I'm pretty sure most of the scenarios I'm looking to test
> > have been covered by Kafka's integration, unit, and other existing tests.
> > What I cannot figure out is how to run those tests against a remote
> > cluster vs. the local one the tests seem to run on. For example, I'd like
> > to run the following command but have the tests execute on a remote
> > cluster of my choice:
> >
> > ./gradlew cleanTest integrationTest
> >
> > Any guidance/help would be appreciated.
> >
> > Thanks
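For what it's worth, the multi-node failure scenarios are, as far as I know, not covered by ./gradlew integrationTest but by the system tests under tests/ in the Kafka source tree, which are driven by the ducktape framework and bring up real multi-broker clusters that the tests can kill and restart. A hedged sketch of invoking them with the Docker harness that ships in the repo (the test path below is illustrative and may differ between versions; see tests/README in your checkout):

# Run one replication/failure system test suite inside Kafka's Docker harness.
git clone https://github.com/apache/kafka.git
cd kafka
TC_PATHS="tests/kafkatest/tests/core/replication_test.py" \
  bash tests/docker/run_tests.sh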
SIGSEGV (0xb) on TransactionCoordinator
Dear all,

We got a core dump with the following info last night. On this environment we have transactions enabled. Please kindly advise what the problem might be here.

> #
> # A fatal error has been detected by the Java Runtime Environment:
> #
> # SIGSEGV (0xb) at pc=0x7f546a857d0d, pid=13288, tid=0x7f53701f9700
> #
> # JRE version: Java(TM) SE Runtime Environment (8.0_92-b14) (build 1.8.0_92-b14)
> # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.92-b14 mixed mode linux-amd64 compressed oops)
> # Problematic frame:
> # J 9563 C1 kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(Lkafka/coordinator/transaction/TransactionCoordinator;Ljava/lang/String;JSLorg/apache/kafka/common/requests/TransactionResult;Lkafka/coordinator/transaction/TransactionMetadata;)Lscala/util/Either; (518 bytes) @ 0x7f546a857d0d [0x7f546a856b40+0x11cd]
> #
> # Failed to write core dump. Core dumps have been disabled. To enable core dumping, try "ulimit -c unlimited" before starting Java again
> #
> # If you would like to submit a bug report, please visit:
> # http://bugreport.java.com/bugreport/crash.jsp
> #
> --- T H R E A D ---
> Current thread (0x7f547a29e800): JavaThread "kafka-request-handler-5" daemon [_thread_in_Java, id=13722, stack(0x7f53700f9000,0x7f53701fa000)]
> siginfo: si_signo: 11 (SIGSEGV), si_code: 1 (SEGV_MAPERR), si_addr: 0xdd310c13
> Registers:
> RAX=0x0001, RBX=0x0006e9072fc8, RCX=0x0688, RDX=0x00075e026fc0
> RSP=0x7f53701f7f00, RBP=0x0006e98861f8, RSI=0x7f53771a4238, RDI=0x0006e9886098
> R8 =0x132d, R9 =0xdd310c13, R10=0x0007c010bbb0, R11=0xdd310c13
> R12=0x, R13=0xdd310b3d, R14=0xdd310c0c, R15=0x7f547a29e800
> RIP=0x7f546a857d0d, EFLAGS=0x00010202, CSGSFS=0x002b0033, ERR=0x0004
> TRAPNO=0x000e

Thanks,
Re: Knowing when to grow a Kafka cluster
I really recommend the book "Kafka: The Definitive Guide"; it's really useful for people running clusters, with lots of good advice on tuning, metrics, etc.

Basically, you scale your cluster when you're hitting the limits of the resources most important to Kafka on the broker nodes - CPU, network, or disk - as identified by various OS and Kafka metrics. One metric recommended in the book mentioned above is RequestHandlerAvgIdlePercent - the book states that if it drops below 20% it indicates potential problems, and below 10% definite performance problems.

In terms of whether to scale horizontally or vertically, that really depends on the costs involved in either option. Although if you're saturating the network interface on a node, you can't really scale that one vertically.

Kind regards,

Liam Clarke

On 27 Dec. 2018 1:24 pm, "Harper Henn" wrote:

Hi,

Many articles exist about running Kafka at scale, but there are fewer resources on learning when to grow your cluster (e.g. adding a new broker or upgrading the computer it's running on). At first the answer seems straightforward - you add a broker if you want to reduce the network I/O, CPU utilization, etc. that each broker experiences. But when and how do you know that brokers are taxed too heavily and it's time to add a new one?

Any thoughts about scaling by adding brokers vs. scaling with more powerful hardware?

Best,
Harper
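Not from the original reply, but a minimal sketch of watching that metric from the command line, assuming Kafka's bundled JmxTool and the standard MBean name for the request handler pool (broker host, JMX port, and install path are placeholders; JMX must be enabled on the broker, e.g. JMX_PORT=9999):

# Hedged sketch: poll RequestHandlerAvgIdlePercent every 10s via JmxTool.
# MBean name assumed per the standard broker metrics; adjust host/port/path.
/opt/kafka/bin/kafka-run-class.sh kafka.tools.JmxTool \
  --jmx-url service:jmx:rmi:///jndi/rmi://broker1:9999/jmxrmi \
  --object-name 'kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent' \
  --reporting-interval 10000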
Re: Kafka tests on a remote cluster
Without looking into how the integration tests work, my best guess is that within the context they were written to run in, it doesn't make sense to run them against a remote cluster. The "internal" cluster is running the same code, so why require having to coordinate with an external dependency?

For the use case you gave - and I'm not sure whether tests exist that cover this behavior or not - running the brokers locally in the context of the tests means that those tests have control over the brokers (i.e. they can shut them off, restart them, etc. programmatically) and validate behavior. Coordinating these operations on a remote broker would be significantly more difficult.

Not sure this helps... but perhaps you're either asking the wrong questions or trying to go about solving your problem using the wrong set of tools? My gut feeling says that if you want to do a full-scale multi-server load / HA test, Kafka's test suite is not the best place to start.

Stephen

On Thu, Dec 27, 2018 at 10:53 AM Parviz deyhim wrote:

> Hi,
>
> I'm looking to see who has done this before and get some guidance. On a
> frequent basis I'd like to run basic tests on a remote Kafka cluster while
> some random chaos/faults are being performed. In other words, I'd like to
> run chaos engineering tasks (network outage, disk outage, etc.) and see how
> Kafka behaves. For example:
>
> 1) bring some random broker node down
> 2) send 2000 messages
> 3) consume the messages
> 4) confirm there's no data loss
>
> My question: I'm pretty sure most of the scenarios I'm looking to test have
> been covered by Kafka's integration, unit, and other existing tests. What I
> cannot figure out is how to run those tests against a remote cluster vs.
> the local one the tests seem to run on. For example, I'd like to run the
> following command but have the tests execute on a remote cluster of my
> choice:
>
> ./gradlew cleanTest integrationTest
>
> Any guidance/help would be appreciated.
>
> Thanks
Kafka tests on a remote cluster
Hi,

I'm looking to see who has done this before and get some guidance. On a frequent basis I'd like to run basic tests on a remote Kafka cluster while some random chaos/faults are being performed. In other words, I'd like to run chaos engineering tasks (network outage, disk outage, etc.) and see how Kafka behaves. For example:

1) bring some random broker node down
2) send 2000 messages
3) consume the messages
4) confirm there's no data loss

My question: I'm pretty sure most of the scenarios I'm looking to test have been covered by Kafka's integration, unit, and other existing tests. What I cannot figure out is how to run those tests against a remote cluster vs. the local one the tests seem to run on. For example, I'd like to run the following command but have the tests execute on a remote cluster of my choice:

./gradlew cleanTest integrationTest

Any guidance/help would be appreciated.

Thanks
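For the specific produce/kill/consume loop above, a minimal manual sketch using only the stock console tools. Topic name, broker addresses, and install path are placeholders, and it assumes the topic was created with replication factor >= 2 so it survives losing one broker:

# Hedged sketch of a manual data-loss check against a remote cluster.
seq 1 2000 | /opt/kafka/bin/kafka-console-producer.sh \
  --broker-list broker1:9092,broker2:9092 \
  --topic chaos-test \
  --producer-property acks=all

# ...stop a random broker here, via your chaos tooling of choice...

/opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server broker1:9092,broker2:9092 \
  --topic chaos-test \
  --from-beginning \
  --timeout-ms 30000 | wc -l   # expect 2000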
Knowing when to grow a Kafka cluster
Hi,

Many articles exist about running Kafka at scale, but there are fewer resources on learning when to grow your cluster (e.g. adding a new broker or upgrading the computer it's running on). At first the answer seems straightforward - you add a broker if you want to reduce the network I/O, CPU utilization, etc. that each broker experiences. But when and how do you know that brokers are taxed too heavily and it's time to add a new one?

Any thoughts about scaling by adding brokers vs. scaling with more powerful hardware?

Best,
Harper
Re: Cannot read messages using a specific group
Hi all,

I set the log level to INFO and here are the logs. I'm not sure why those logs say that the group coordinator is unavailable or invalid, because the brokers are all up (including the ones with the IPs given in the log), and just changing the consumer group makes it work. If anybody has an idea, that would be very helpful. (Also, committing the offsets to ZooKeeper works, so I'm wondering if there is a bug with __consumer_offsets.)

Thank you for your help

kafka-console-consumer --bootstrap-server ip1:9092,ip2:9092 --topic ASSETS --group ASSETS_PROCESSOR_TOPOLOGY

[2018-12-26 15:47:15,243] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2018-12-26 15:47:15,387] INFO ConsumerConfig values:
	auto.commit.interval.ms = 5000
	auto.offset.reset = latest
	bootstrap.servers = [ip1:9092, ip2:9092]
	check.crcs = true
	client.id =
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = true
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = ASSETS_PROCESSOR_TOPOLOGY
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
 (org.apache.kafka.clients.consumer.ConsumerConfig)
[2018-12-26 15:47:15,579] INFO Kafka version : 2.0.1-cp1 (org.apache.kafka.common.utils.AppInfoParser)
[2018-12-26 15:47:15,579] INFO Kafka commitId : 3d167ab3fdad2e73 (org.apache.kafka.common.utils.AppInfoParser)
[2018-12-26 15:47:16,711] INFO Cluster ID: AkXF_gdZS5eVEEYbvUiyRg (org.apache.kafka.clients.Metadata)
[2018-12-26 15:47:16,712] INFO [Consumer clientId=consumer-1, groupId=ASSETS_PROCESSOR_TOPOLOGY] Discovered group coordinator ip_address1:9092 (id: 2147483643 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-12-26 15:47:16,713] INFO [Consumer clientId=consumer-1, groupId=ASSETS_PROCESSOR_TOPOLOGY] Revoking previously assigned partitions [] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-12-26 15:47:16,714] INFO [Consumer clientId=consumer-1, groupId=ASSETS_PROCESSOR_TOPOLOGY] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-12-26 15:47:17,351] INFO [Consumer clientId=consumer-1, groupId=ASSETS_PROCESSOR_TOPOLOGY] Group coordinator ip_address1:9092 (id: 2147483643 rack: null) is unavailable or invalid, will attempt rediscovery (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-12-26 15:47:18,189] INFO [Consumer clientId=consumer-1, groupId=ASSETS_PROCESSOR_TOPOLOGY] Discovered group coordinator ip_address2:9092 (id: 2147483645 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-12-26 15:47:18,190] INFO [Consumer clientId=consumer-1, groupId=ASSETS_PROCESSOR_TOPOLOGY] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-12-26 15:47:19,501] INFO [Consumer clientId=consumer-1, groupId=ASSETS_PROCESSOR_TOPOLOGY] Group coordi
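One hedged thing to check, given that ZooKeeper-based offset commits work while the broker-side group path flaps: describe the internal offsets topic and look at its replication factor and ISR lists, since an under-replicated or mis-created __consumer_offsets topic can make coordinator discovery fail like this. Sketch with a placeholder ZooKeeper address (kafka-topics in 2.0 still takes --zookeeper):

# Inspect the internal offsets topic that backs group coordination.
# zk1:2181 is a placeholder; look for a low ReplicationFactor or shrunken Isr.
kafka-topics --zookeeper zk1:2181 --describe --topic __consumer_offsets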
kafka-acls.sh --list failed when zookeeper SASL/PLAIN authentication is enabled
I have a Kafka/ZooKeeper (embedded ZooKeeper) cluster with SASL/PLAIN + ACL enabled. It worked fine with version kafka_2.12-1.0.0, but recently I needed to upgrade to kafka_2.12-2.1.0, and unfortunately the ACL function no longer works. The kafka-acls.sh command fails, for example:

> # echo $KAFKA_OPTS
> KAFKA_OPTS=-Djava.security.auth.login.config=/work/sasl/kafka_server_jaas.conf
> #
> # /kafka_2.12-2.1.0/bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=zookeeper.example.com:2181 --list --topic test-topic

The error message on screen is:

> Error while executing ACL command: KeeperErrorCode = InvalidACL for /kafka-acl
> org.apache.zookeeper.KeeperException$InvalidACLException: KeeperErrorCode = InvalidACL for /kafka-acl
> at org.apache.zookeeper.KeeperException.create(KeeperException.java:121)
> at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
> at kafka.zookeeper.AsyncResponse.maybeThrow(ZooKeeperClient.scala:494)
> at kafka.zk.KafkaZkClient.createRecursive(KafkaZkClient.scala:1416)
> at kafka.zk.KafkaZkClient.createAclPaths(KafkaZkClient.scala:931)
> at kafka.security.auth.SimpleAclAuthorizer.configure(SimpleAclAuthorizer.scala:96)
> at kafka.admin.AclCommand$.withAuthorizer(AclCommand.scala:78)
> at kafka.admin.AclCommand$.listAcl(AclCommand.scala:119)
> at kafka.admin.AclCommand$.main(AclCommand.scala:56)
> at kafka.admin.AclCommand.main(AclCommand.scala)

The ZooKeeper log is:

> zookeeper.example.com | [2018-12-26 09:46:09,622] ERROR Missing AuthenticationProvider for sasl (org.apache.zookeeper.server.PrepRequestProcessor)
> zookeeper.example.com | [2018-12-26 09:46:09,622] INFO Got user-level KeeperException when processing sessionid:0x167e9e2c60c0003 type:create cxid:0x3 zxid:0x1008a txntype:-1 reqpath:n/a Error Path:/kafka-acl Error:KeeperErrorCode = InvalidACL for /kafka-acl (org.apache.zookeeper.server.PrepRequestProcessor)
> zookeeper.example.com | [2018-12-26 09:46:09,704] INFO Processed session termination for sessionid: 0x167e9e2c60c0003 (org.apache.zookeeper.server.PrepRequestProcessor)

The Kafka SASL configuration file /work/sasl/kafka_server_jaas.conf content is:

> # cat /work/sasl/kafka_server_jaas.conf
> KafkaServer {
>     org.apache.kafka.common.security.plain.PlainLoginModule required
>     username="admin"
>     password="adminpwd"
>     user_admin="adminpwd"
>     user_alice="alicepwd";
> };
>
> KafkaClient {
>     org.apache.kafka.common.security.plain.PlainLoginModule required
>     username="alice"
>     password="alicepwd";
> };
>
> Client {
>     org.apache.kafka.common.security.plain.PlainLoginModule required
>     username="admin"
>     password="adminpwd";
> };

And the ZooKeeper SASL configuration file zookeeper_jaas.conf content is:

> # cat /work/sasl/zookeeper_jaas.conf
> Server {
>     org.apache.kafka.common.security.plain.PlainLoginModule required
>     username="admin"
>     password="adminpwd"
>     user_admin="adminpwd";
> };

Can anybody help with this? Thanks.

Hui
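One hedged guess, based on the "Missing AuthenticationProvider for sasl" line above: newer kafka-acls.sh versions create the /kafka-acl path with sasl-scheme znode ACLs, and the ZooKeeper server rejects them with InvalidACL unless a SASL authentication provider is registered on the server side. A minimal sketch of the ZooKeeper-side change, assuming the embedded ZooKeeper reads config/zookeeper.properties (the path is a placeholder for your setup):

# Register the SASL auth provider so the ZooKeeper server can accept
# sasl:-scheme ACLs on znodes such as /kafka-acl. Path is a placeholder;
# restart ZooKeeper after appending this line.
cat >> /kafka_2.12-2.1.0/config/zookeeper.properties <<'EOF'
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
EOF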
Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted
I'm not talking about ordering, but about the same partition being consumed by multiple consumers in the same consumer group. I used kafka-consumer-groups.sh and org.apache.kafka.clients.admin.AdminClient to validate the results. Within a consumer group subscribed to a topic, the same partition should not be consumed by multiple consumers at once. But that is what happened.

------------------ Original message ------------------
From: "Peter Levart"
Date: Dec 26, 2018, 5:21
To: "users"; "Guozhang Wang"
Subject: Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

On 12/21/18 3:16 PM, Peter Levart wrote:

> I also see some results that are actual non-final window aggregations
> that precede the final aggregations. These non-final results are never
> emitted out of order (for example, no such non-final result would ever
> come after the final result for a particular key/window).

Absence of proof is not the proof of absence... And I have later observed (using the DSL variant, not the custom Transformer) an occurrence of a non-final result that was emitted after a restart of the streams processor, while the final result for the same key/window had been emitted before the restart:

[pool-1-thread-4] APP Consumed: [a@154581526/1545815262000] -> [550, 81, 18, 393, 968, 847, 452, 0, 0, 0], sum: 444856

... restart ...

[pool-1-thread-4] APP Consumed: [a@154581526/1545815262000] -> [550] INSTEAD OF [550, 81, 18, 393, 968, 847, 452, 0, 0, 0], sum: 551648

The app logic cannot even rely on a guarantee that results are ordered, then. This is really not usable until the bug is fixed.

Regards, Peter
Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted
On 12/21/18 3:16 PM, Peter Levart wrote:

> I also see some results that are actual non-final window aggregations
> that precede the final aggregations. These non-final results are never
> emitted out of order (for example, no such non-final result would ever
> come after the final result for a particular key/window).

Absence of proof is not the proof of absence... And I have later observed (using the DSL variant, not the custom Transformer) an occurrence of a non-final result that was emitted after a restart of the streams processor, while the final result for the same key/window had been emitted before the restart:

[pool-1-thread-4] APP Consumed: [a@154581526/1545815262000] -> [550, 81, 18, 393, 968, 847, 452, 0, 0, 0], sum: 444856

... restart ...

[pool-1-thread-4] APP Consumed: [a@154581526/1545815262000] -> [550] INSTEAD OF [550, 81, 18, 393, 968, 847, 452, 0, 0, 0], sum: 551648

The app logic cannot even rely on a guarantee that results are ordered, then. This is really not usable until the bug is fixed.

Regards, Peter
A problem with Kafka consumer groups
Dear Kafka team,

I have encountered a problem using Kafka 2.0.0: in the same consumer group on a Kafka cluster, two consumers consume the same partition. Perhaps the picture in the attachment illustrates it better. Everything works well with the range assignor; this problem arises with the sticky assignor. Although the probability is not high, it affects our use of Kafka. I'd like to know whether this is normal or a bug. I hope you can answer it. Thank you.
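A hedged way to capture evidence when it happens, using the stock group tool (group name and broker address are placeholders): if the same topic-partition appears under two different CONSUMER-IDs in the output, that matches the symptom described above.

# Snapshot the group's current partition assignments while the duplicate
# consumption is being observed. Group name and broker address are placeholders.
kafka-consumer-groups.sh \
  --bootstrap-server broker1:9092 \
  --describe --group my-group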