Does Kafka support dynamic ACL rules

2018-12-26 Thread hui happy
Hi

As I've learned, Kafka can use '--resource-pattern-type prefixed' to add a
rule for prefixed topics.
For example, for a user 'kafkaclient', we could define a rule that lets the
user access all topics starting with that user name, i.e. 'kafkaclient--',
such as 'kafkaclient--topic1', 'kafkaclient--topic2', etc.

/opt/kafka/bin/kafka-acls.sh \
  --authorizer-properties zookeeper.connect=zookeeper:2181 \
  --add \
  --allow-principal User:"kafkaclient" \
  --operation All \
  --resource-pattern-type prefixed \
  --topic "kafkaclient--"


But is it possible to define a dynamic user name?
In the case above we know the username is 'kafkaclient'; if there are many
other users, we have to add a rule for each of them, and these rules are all
identical except for the user name.

So I want to know whether it's possible to define just a single rule, using a
dynamic user name, so that each user can access the topics starting with its
own username. Something like:

/opt/kafka/bin/kafka-acls.sh \
  --authorizer-properties zookeeper.connect=zookeeper:2181 \
  --add \
  --allow-principal User:"**" \
  --operation All \
  --resource-pattern-type prefixed \
  --topic "**--"


Then whenever we add a user or a topic later, we wouldn't need to add any new
rules.
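
For now the only workaround I can think of is scripting one prefixed rule per
user. A rough sketch, assuming the user names are listed one per line in a
file users.txt (the file name and the loop are my own illustration):

# Add one prefixed ACL per user so each user may only access
# topics named "<user>--*". Assumes users.txt lists one name per line.
while read -r user; do
  /opt/kafka/bin/kafka-acls.sh \
    --authorizer-properties zookeeper.connect=zookeeper:2181 \
    --add \
    --allow-principal User:"$user" \
    --operation All \
    --resource-pattern-type prefixed \
    --topic "${user}--"
done < users.txt

But that still has to be re-run for every new user, which is what I hope to
avoid.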

Thanks.
Hui


Re: Kafka tests on a remote cluster

2018-12-26 Thread Parviz deyhim
Thanks, fair points. Probably best if I simplify the question: how does the
Kafka community run tests besides using mocked local Kafka components?
Surely there are tests that confirm different failure scenarios, such as
losing a broker in a real clustered environment (a multi-node cluster with
IPs, ports, hostnames, etc.). The answer would be a good starting point for
me.

On Wed, Dec 26, 2018 at 6:11 PM Stephen Powis  wrote:

> Without looking into how the integration tests work, my best guess is that,
> within the context they were written to run in, it doesn't make sense to
> run them against a remote cluster.  The "internal" cluster is running the
> same code, so why require coordinating with an external dependency?
>
> For the use case you gave -- and I'm not sure whether tests exist that
> cover this behavior or not -- running the brokers locally in the context of
> the tests means that those tests have control over the brokers (i.e. shut
> them off, restart them, etc. programmatically) and can validate behavior.
> Coordinating these operations on a remote broker would be significantly
> more difficult.
>
> Not sure this helps... but perhaps you're either asking the wrong questions
> or trying to solve your problem using the wrong set of tools?  My gut
> feeling says that if you want to do a full-scale multi-server load / HA
> test, Kafka's test suite is not the best place to start.
>
> Stephen
>
>
>
> On Thu, Dec 27, 2018 at 10:53 AM Parviz deyhim  wrote:
>
> > Hi,
> >
> > I'm looking to see who has done this before and get some guidance. On a
> > frequent basis I'd like to run basic tests on a remote Kafka cluster
> > while some random chaos/faults are being performed. In other words, I'd
> > like to run chaos engineering tasks (network outage, disk outage, etc.)
> > and see how Kafka behaves. For example:
> >
> > 1) bring some random broker node down
> > 2) send 2000 messages
> > 3) consume messages
> > 4) confirm there's no data loss
> >
> > My question: I'm pretty sure most of the scenarios I'm looking to test
> > are covered by Kafka's integration, unit, and other existing tests. What
> > I cannot figure out is how to run those tests on a remote cluster vs. the
> > local one the tests seem to run on. For example, I'd like to run the
> > following command but have the tests execute on a remote cluster of my
> > choice:
> >
> > ./gradlew cleanTest integrationTest
> >
> > Any guidance/help would be appreciated.
> >
> > Thanks
> >
>


SIGSEGV (0xb) on TransactionCoordinator

2018-12-26 Thread wenxing zheng
Dear all,

We got a core dump with the following info last night. On this environment we
have transactions enabled. Please kindly advise what the problem might be
here.

#
> # A fatal error has been detected by the Java Runtime Environment:
> #
> #  SIGSEGV (0xb) at pc=0x7f546a857d0d, pid=13288,
> tid=0x7f53701f9700
> #
> # JRE version: Java(TM) SE Runtime Environment (8.0_92-b14) (build
> 1.8.0_92-b14)
> # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.92-b14 mixed mode
> linux-amd64 compressed oops)
> # Problematic frame:
> # J 9563 C1 
> *kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(Lkafka/coordinator/transaction/TransactionCoordinator;Ljava/lang/String;JSLorg/apache/kafka/common/requests/TransactionResult;Lkafka/coordinator/transaction/TransactionMetadata;)Lscala/util/Either;
> (518 bytes) @ 0x7f546a857d0d [0x7f546a856b40+0x11cd]*
> #
> # Failed to write core dump. Core dumps have been disabled. To enable core
> dumping, try "ulimit -c unlimited" before starting Java again
> #
> # If you would like to submit a bug report, please visit:
> #   http://bugreport.java.com/bugreport/crash.jsp
> #
> ---  T H R E A D  ---
> Current thread (0x7f547a29e800):  JavaThread "kafka-request-handler-5"
> daemon [_thread_in_Java, id=13722,
> stack(0x7f53700f9000,0x7f53701fa000)]
> siginfo: si_signo: 11 (SIGSEGV), si_code: 1 (SEGV_MAPERR), si_addr:
> 0xdd310c13
> Registers:
> RAX=0x0001, RBX=0x0006e9072fc8, RCX=0x0688,
> RDX=0x00075e026fc0
> RSP=0x7f53701f7f00, RBP=0x0006e98861f8, RSI=0x7f53771a4238,
> RDI=0x0006e9886098
> R8 =0x132d, R9 =0xdd310c13, R10=0x0007c010bbb0,
> R11=0xdd310c13
> R12=0x, R13=0xdd310b3d, R14=0xdd310c0c,
> R15=0x7f547a29e800
> RIP=0x7f546a857d0d, EFLAGS=0x00010202,
> CSGSFS=0x002b0033, ERR=0x0004
>   TRAPNO=0x000e
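
Also, as the dump notes, core dumps were disabled on this host, so nothing
was captured. Before the next run we plan to raise the limit so a crash
leaves a core file to analyze; roughly (paths are assumed):

# Allow unlimited-size core dumps in this shell, as the JRE message
# above suggests, then start the broker from the same shell.
ulimit -c unlimited
/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties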


Thanks,


Re: Knowing when to grow a Kafka cluster

2018-12-26 Thread Liam Clarke
I really recommend the book "Kafka: The Definitive Guide" - it's really
useful for people running clusters, with lots of good advice on tuning,
metrics, etc.

Basically, you scale your cluster when you're hitting the limits of the
resources most important to Kafka on the broker nodes - CPU, network or disk
- as identified by various OS and Kafka metrics. One metric recommended in
the book I mentioned above is RequestHandlerAvgIdlePercent - the book states
that if it drops below 20% it indicates potential problems, and below 10%
definite performance problems.
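
You can keep an eye on that metric with the JmxTool that ships with Kafka. A
minimal sketch, assuming the broker was started with JMX enabled on port 9999
(the host and port here are assumptions, adjust to your setup):

# Poll RequestHandlerAvgIdlePercent once per second over JMX.
# Requires the broker to have been started with e.g. JMX_PORT=9999.
bin/kafka-run-class.sh kafka.tools.JmxTool \
  --object-name 'kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent' \
  --jmx-url service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi \
  --reporting-interval 1000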

In terms of whether to scale horizontally or vertically, that really
depends on the costs involved in either option. Although if you're
saturating the network interface on a node, you can't really scale that one
vertically.

Kind regards,

Liam Clarke

On 27 Dec. 2018 1:24 pm, "Harper Henn"  wrote:

Hi,

Many articles exist about running Kafka at scale, but there are fewer
resources for learning when to grow your cluster (e.g. adding a new broker
or upgrading the computer it's running on).

At first, the answer seems straightforward - you add a broker if you want to
reduce the network I/O, CPU utilization, etc. that each broker experiences.
But when and how do you know the brokers are taxed too heavily and it's time
to add a new one? Any thoughts about scaling by adding brokers vs. scaling
with more powerful hardware?

Best,
Harper


Re: Kafka tests on a remote cluster

2018-12-26 Thread Stephen Powis
Without looking into how the integration tests work, my best guess is that,
within the context they were written to run in, it doesn't make sense to run
them against a remote cluster.  The "internal" cluster is running the same
code, so why require coordinating with an external dependency?

For the use case you gave -- and I'm not sure whether tests exist that cover
this behavior or not -- running the brokers locally in the context of the
tests means that those tests have control over the brokers (i.e. shut them
off, restart them, etc. programmatically) and can validate behavior.
Coordinating these operations on a remote broker would be significantly more
difficult.

Not sure this helps... but perhaps you're either asking the wrong questions
or trying to solve your problem using the wrong set of tools?  My gut feeling
says that if you want to do a full-scale multi-server load / HA test, Kafka's
test suite is not the best place to start.
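
If what you're after is failure testing against a real multi-node cluster,
Kafka's system tests (the ducktape suite under tests/ in the source tree) may
be closer to what you want than the JUnit integration tests. A rough sketch,
assuming Docker is installed (the test path is just one example):

# Bring up a multi-node test cluster in Docker containers and run one
# of the bundled system tests against it.
tests/docker/ducker-ak up
tests/docker/ducker-ak test tests/kafkatest/tests/core/replication_test.py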

Stephen



On Thu, Dec 27, 2018 at 10:53 AM Parviz deyhim  wrote:

> Hi,
>
> I'm looking to see who has done this before and get some guidance. On a
> frequent basis I'd like to run basic tests on a remote Kafka cluster while
> some random chaos/faults are being performed. In other words, I'd like to
> run chaos engineering tasks (network outage, disk outage, etc.) and see how
> Kafka behaves. For example:
>
> 1) bring some random broker node down
> 2) send 2000 messages
> 3) consume messages
> 4) confirm there's no data loss
>
> My question: I'm pretty sure most of the scenarios I'm looking to test are
> covered by Kafka's integration, unit, and other existing tests. What I
> cannot figure out is how to run those tests on a remote cluster vs. the
> local one the tests seem to run on. For example, I'd like to run the
> following command but have the tests execute on a remote cluster of my
> choice:
>
> ./gradlew cleanTest integrationTest
>
> Any guidance/help would be appreciated.
>
> Thanks
>


Kafka tests on a remote cluster

2018-12-26 Thread Parviz deyhim
Hi,

I'm looking to see who has done this before and get some guidance. On a
frequent basis I'd like to run basic tests on a remote Kafka cluster while
some random chaos/faults are being performed. In other words, I'd like to run
chaos engineering tasks (network outage, disk outage, etc.) and see how Kafka
behaves. For example:

1) bring some random broker node down
2) send 2000 messages
3) consume messages
4) confirm there's no data loss

My question: I'm pretty sure most of the scenarios I'm looking to test are
covered by Kafka's integration, unit, and other existing tests. What I cannot
figure out is how to run those tests on a remote cluster vs. the local one
the tests seem to run on. For example, I'd like to run the following command
but have the tests execute on a remote cluster of my choice:

./gradlew cleanTest integrationTest

Any guidance/help would be appreciated.

Thanks


Knowing when to grow a Kafka cluster

2018-12-26 Thread Harper Henn
Hi,

Many articles exist about running Kafka at scale, but there are fewer
resources for learning when to grow your cluster (e.g. adding a new broker
or upgrading the computer it's running on).

At first, the answer seems straightforward - you add a broker if you want to
reduce the network I/O, CPU utilization, etc. that each broker experiences.
But when and how do you know the brokers are taxed too heavily and it's time
to add a new one? Any thoughts about scaling by adding brokers vs. scaling
with more powerful hardware?

Best,
Harper


Re: Cannot read messages using a specific group

2018-12-26 Thread Karim Lamouri
Hi all,

I set the log level to INFO and here are the logs.

I'm not sure why these logs say that the group coordinator is unavailable or
invalid, because the brokers are all up (including the ones with the IPs
specified in the log) and just changing the consumer group makes it work.
If anybody has an idea, that would be very helpful. (Also, committing the
offsets to ZooKeeper works, so I'm wondering if there is a bug with
__consumer_offsets.)

Thank you for your help

kafka-console-consumer --bootstrap-server ip1:9092,ip2:9092 --topic
ASSETS --group ASSETS_PROCESSOR_TOPOLOGY

[2018-12-26 15:47:15,243] INFO Registered
kafka:type=kafka.Log4jController MBean
(kafka.utils.Log4jControllerRegistration$)
[2018-12-26 15:47:15,387] INFO ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [ip1:9092, ip2:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = ASSETS_PROCESSOR_TOPOLOGY
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class
org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer
 (org.apache.kafka.clients.consumer.ConsumerConfig)
[2018-12-26 15:47:15,579] INFO Kafka version : 2.0.1-cp1
(org.apache.kafka.common.utils.AppInfoParser)
[2018-12-26 15:47:15,579] INFO Kafka commitId : 3d167ab3fdad2e73
(org.apache.kafka.common.utils.AppInfoParser)
[2018-12-26 15:47:16,711] INFO Cluster ID: AkXF_gdZS5eVEEYbvUiyRg
(org.apache.kafka.clients.Metadata)
[2018-12-26 15:47:16,712] INFO [Consumer clientId=consumer-1,
groupId=ASSETS_PROCESSOR_TOPOLOGY] Discovered group coordinator
ip_address1:9092 (id: 2147483643 rack: null)
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-12-26 15:47:16,713] INFO [Consumer clientId=consumer-1,
groupId=ASSETS_PROCESSOR_TOPOLOGY] Revoking previously assigned
partitions [] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-12-26 15:47:16,714] INFO [Consumer clientId=consumer-1,
groupId=ASSETS_PROCESSOR_TOPOLOGY] (Re-)joining group
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-12-26 15:47:17,351] INFO [Consumer clientId=consumer-1,
groupId=ASSETS_PROCESSOR_TOPOLOGY] Group coordinator ip_address1:9092
(id: 2147483643 rack: null) is unavailable or invalid, will attempt
rediscovery (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-12-26 15:47:18,189] INFO [Consumer clientId=consumer-1,
groupId=ASSETS_PROCESSOR_TOPOLOGY] Discovered group coordinator
ip_address2:9092 (id: 2147483645 rack: null)
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-12-26 15:47:18,190] INFO [Consumer clientId=consumer-1,
groupId=ASSETS_PROCESSOR_TOPOLOGY] (Re-)joining group
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-12-26 15:47:19,501] INFO [Consumer clientId=consumer-1,
groupId=ASSETS_PROCESSOR_TOPOLOGY] Group coordi
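
One more thing I plan to check is whether the internal __consumer_offsets
topic itself is healthy (fully replicated, with leaders for all partitions),
since an unhealthy offsets topic could make the coordinator flap like this.
Roughly (the ZooKeeper address is an assumption):

# Look for under-replicated or leaderless partitions on the offsets topic.
kafka-topics --zookeeper zk1:2181 --describe --topic __consumer_offsets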

kafka-acls.sh --list failed when zookeeper SASL/PLAIN authentication is enabled

2018-12-26 Thread hui happy
I have a Kafka/ZooKeeper (embedded ZooKeeper) cluster with SASL/PLAIN + ACLs
enabled.
It worked fine with version kafka_2.12-1.0.0, but recently I needed to
upgrade to kafka_2.12-2.1.0. Unfortunately, the ACL function no longer works
normally.

The kafka-acls.sh command fails, for example:

> # echo $KAFKA_OPTS
> KAFKA_OPTS=-Djava.security.auth.login.config=/work/sasl/kafka_server_jaas.conf
> #
> # /kafka_2.12-2.1.0/bin/kafka-acls.sh --authorizer
> kafka.security.auth.SimpleAclAuthorizer --authorizer-properties
> zookeeper.connect=zookeeper.example.com:2181 --list --topic test-topic



The error message on the screen is:

> Error while executing ACL command: KeeperErrorCode = InvalidACL for
> /kafka-acl
> org.apache.zookeeper.KeeperException$InvalidACLException: KeeperErrorCode
> = InvalidACL for /kafka-acl
> at org.apache.zookeeper.KeeperException.create(KeeperException.java:121)
> at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
> at kafka.zookeeper.AsyncResponse.maybeThrow(ZooKeeperClient.scala:494)
> at kafka.zk.KafkaZkClient.createRecursive(KafkaZkClient.scala:1416)
> at kafka.zk.KafkaZkClient.createAclPaths(KafkaZkClient.scala:931)
> at
> kafka.security.auth.SimpleAclAuthorizer.configure(SimpleAclAuthorizer.scala:96)
> at kafka.admin.AclCommand$.withAuthorizer(AclCommand.scala:78)
> at kafka.admin.AclCommand$.listAcl(AclCommand.scala:119)
> at kafka.admin.AclCommand$.main(AclCommand.scala:56)
> at kafka.admin.AclCommand.main(AclCommand.scala)
>

The zookeeper log is:

> zookeeper.example.com| [2018-12-26 09:46:09,622] ERROR Missing
> AuthenticationProvider for sasl
> (org.apache.zookeeper.server.PrepRequestProcessor)
> zookeeper.example.com| [2018-12-26 09:46:09,622] INFO Got user-level
> KeeperException when processing sessionid:0x167e9e2c60c0003 type:create
> cxid:0x3 zxid:0x1008a txntype:-1 reqpath:n/a Error Path:/kafka-acl
> Error:KeeperErrorCode = InvalidACL for /kafka-acl
> (org.apache.zookeeper.server.PrepRequestProcessor)
> zookeeper.example.com| [2018-12-26 09:46:09,704] INFO Processed
> session termination for sessionid: 0x167e9e2c60c0003
> (org.apache.zookeeper.server.PrepRequestProcessor)
>

The Kafka SASL configuration file /work/sasl/kafka_server_jaas.conf contains:

> # cat /work/sasl/kafka_server_jaas.conf
> KafkaServer {
> org.apache.kafka.common.security.plain.PlainLoginModule required
> username="admin"
> password="adminpwd"
> user_admin="adminpwd"
> user_alice="alicepwd";
> };
>
> KafkaClient {
> org.apache.kafka.common.security.plain.PlainLoginModule required
> username="alice"
> password="alicepwd";
> };
>
> Client {
> org.apache.kafka.common.security.plain.PlainLoginModule required
> username="admin"
> password="adminpwd";
> };
>

And the ZooKeeper SASL configuration file zookeeper_jaas.conf contains:

> # cat /work/sasl/zookeeper_jaas.conf
> Server {
> org.apache.kafka.common.security.plain.PlainLoginModule required
> username="admin"
> password="adminpwd"
> user_admin="adminpwd";
> };
>
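
Judging from the "Missing AuthenticationProvider for sasl" line in the
ZooKeeper log, my guess (not verified) is that the ZooKeeper bundled with
2.1.0 needs the SASL authentication provider declared explicitly in
zookeeper.properties, something like:

# zookeeper.properties - declare the SASL auth provider explicitly
# (my guess at a fix, not a confirmed solution)
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl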


Can anybody help with this? Thanks.
Hui


Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

2018-12-26 Thread MCG
I'm not talking about ordering, but about the same partition, in the same
consumer group, being consumed by multiple consumers. I use
kafka-consumer-groups.sh and org.apache.kafka.clients.admin.AdminClient to
validate the results. Because the consumers in a group subscribe to a topic
as a group, the same partition should never be consumed by multiple consumers
at once. But that is what happened.
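
For reference, this is roughly how I check the assignment (the group name
here is a placeholder):

# List each group member and its assigned partitions; the same partition
# showing up under two live members at once would confirm the problem.
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group my-group --members --verbose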




------ Original Message ------
From: "Peter Levart";
Date: 2018-12-26 (Wed) 5:21
To: "users"; "Guozhang Wang";
Subject: Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress
some non-final results when the kafka streams process is restarted





On 12/21/18 3:16 PM, Peter Levart wrote:
> I also see some results that are actual non-final window aggregations 
> that precede the final aggregations. These non-final results are never 
> emitted out of order (for example, no such non-final result would ever 
> come after the final result for a particular key/window). 

Absence of proof is not the proof of absence... And I have later 
observed (using the DSL variant, not the custom Transformer) an 
occurrence of a non-final result that was emitted after a restart of the 
streams processor, while the final result for the same key/window had 
been emitted before the restart:

[pool-1-thread-4] APP Consumed: [a@154581526/1545815262000] -> [550, 
81, 18, 393, 968, 847, 452, 0, 0, 0], sum: 444856
...
... restart ...
...
[pool-1-thread-4] APP Consumed: [a@154581526/1545815262000] -> [550] 
INSTEAD OF [550, 81, 18, 393, 968, 847, 452, 0, 0, 0], sum: 551648


The app logic cannot even rely on a guarantee that results are ordered, 
then. This is really not usable until the bug is fixed.

Regards, Peter

Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

2018-12-26 Thread Peter Levart




On 12/21/18 3:16 PM, Peter Levart wrote:
I also see some results that are actual non-final window aggregations 
that precede the final aggregations. These non-final results are never 
emitted out of order (for example, no such non-final result would ever 
come after the final result for a particular key/window). 


Absence of proof is not the proof of absence... And I have later 
observed (using the DSL variant, not the custom Transformer) an 
occurrence of a non-final result that was emitted after a restart of the 
streams processor, while the final result for the same key/window had 
been emitted before the restart:


[pool-1-thread-4] APP Consumed: [a@154581526/1545815262000] -> [550, 
81, 18, 393, 968, 847, 452, 0, 0, 0], sum: 444856

...
... restart ...
...
[pool-1-thread-4] APP Consumed: [a@154581526/1545815262000] -> [550] 
INSTEAD OF [550, 81, 18, 393, 968, 847, 452, 0, 0, 0], sum: 551648



The app logic cannot even rely on a guarantee that results are ordered, 
then. This is really not usable until the bug is fixed.


Regards, Peter



The problem of kafka consumer group

2018-12-26 Thread MCG
Dear Kafka Team:

I encountered some problems using Kafka 2.0.0. In the same consumer group on
a Kafka cluster, two consumers consume the same partition; perhaps the
picture in the attachment is more illustrative. Everything works well with
the RangeAssignor, and the problem only arises with the StickyAssignor.
Although the probability is not high, it still affects our use of Kafka. I'd
like to know whether this is normal or a bug. I hope you can answer it.
Thank you.