[jira] [Created] (KAFKA-6523) kafka server not starting
Sanjeevani Mehra created KAFKA-6523: --- Summary: kafka server not starting Key: KAFKA-6523 URL: https://issues.apache.org/jira/browse/KAFKA-6523 Project: Kafka Issue Type: Bug Reporter: Sanjeevani Mehra Attachments: scre.JPG Hi, I ran the command .\bin\windows\kafka-server-start.bat .\config\server.properties, but it does not start the server (ZooKeeper is started). I have also reinstalled Kafka, but still no luck. Can someone suggest a solution? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-6498) Add RocksDB statistics via Streams metrics
[ https://issues.apache.org/jira/browse/KAFKA-6498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16347963#comment-16347963 ] james chien edited comment on KAFKA-6498 at 2/2/18 8:47 AM: [~guozhang] I want to pick it up! My understanding is that we want to expose RocksDB stats via Streams metrics, so that users can simply use Streams metrics to monitor RocksDB stats rather than accessing RocksDB directly, which means `KStream` should be extended. was (Author: james.c): [~guozhang] I want to pick it up! My understanding is that we want to expose RocksDB stats via Streams metrics, so that users can simply use Streams metrics to monitor RocksDB stats rather than accessing RocksDB directly. > Add RocksDB statistics via Streams metrics > -- > > Key: KAFKA-6498 > URL: https://issues.apache.org/jira/browse/KAFKA-6498 > Project: Kafka > Issue Type: Improvement > Components: metrics, streams >Reporter: Guozhang Wang >Assignee: james chien >Priority: Major > Labels: needs-kip > > RocksDB's own stats can be programmatically exposed via > {{Options.statistics()}} and the JNI `Statistics` has indeed implemented many > useful settings already. However, these stats are not exposed directly via > Streams today, so any user who wants access to them has to > manually interact with the underlying RocksDB directly, not through > Streams. > We should expose such stats via Streams metrics programmatically for users to > investigate them without having to access RocksDB directly.
[jira] [Comment Edited] (KAFKA-6498) Add RocksDB statistics via Streams metrics
[ https://issues.apache.org/jira/browse/KAFKA-6498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16347963#comment-16347963 ] james chien edited comment on KAFKA-6498 at 2/2/18 9:01 AM: [~guozhang] I want to pick it up! My understanding is that we want to expose RocksDB stats via Streams metrics, so that users can simply use Streams metrics to monitor RocksDB stats rather than accessing RocksDB directly. was (Author: james.c): [~guozhang] I want to pick it up! My understanding is that we want to expose RocksDB stats via Streams metrics, so that users can simply use Streams metrics to monitor RocksDB stats rather than accessing RocksDB directly, which means `KStream` should be extended. > Add RocksDB statistics via Streams metrics > -- > > Key: KAFKA-6498 > URL: https://issues.apache.org/jira/browse/KAFKA-6498 > Project: Kafka > Issue Type: Improvement > Components: metrics, streams >Reporter: Guozhang Wang >Assignee: james chien >Priority: Major > Labels: needs-kip > > RocksDB's own stats can be programmatically exposed via > {{Options.statistics()}} and the JNI `Statistics` has indeed implemented many > useful settings already. However, these stats are not exposed directly via > Streams today, so any user who wants access to them has to > manually interact with the underlying RocksDB directly, not through > Streams. > We should expose such stats via Streams metrics programmatically for users to > investigate them without having to access RocksDB directly.
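A minimal sketch of the bridging pattern the ticket proposes — polling per-store statistics and republishing them under Streams-style metric names. This is a toy illustration only; `StoreMetricsBridge` and the stat suppliers are hypothetical names, not the actual RocksDB JNI `Statistics` API or the Streams metrics registry.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Toy sketch: a registry that polls per-store stat suppliers and exposes
// them under namespaced metric names, the way Streams metrics could wrap
// RocksDB statistics without users touching RocksDB directly.
class StoreMetricsBridge {
    private final Map<String, Supplier<Long>> gauges = new ConcurrentHashMap<>();

    // Register a stat supplier under "<storeName>.<statName>".
    void register(String storeName, String statName, Supplier<Long> stat) {
        gauges.put(storeName + "." + statName, stat);
    }

    // Snapshot all registered gauges, as a metrics reporter would on each poll.
    Map<String, Long> snapshot() {
        Map<String, Long> out = new ConcurrentHashMap<>();
        gauges.forEach((name, stat) -> out.put(name, stat.get()));
        return out;
    }
}
```

A real implementation would register the suppliers when the state store is opened, which is why the KIP discussion centers on where in Streams this hook should live.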
[jira] [Commented] (KAFKA-6522) Retrying leaderEpoch request for partition xxx as the leader reported an error: UNKNOWN_SERVER_ERROR
[ https://issues.apache.org/jira/browse/KAFKA-6522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16350061#comment-16350061 ] huxihx commented on KAFKA-6522: --- Is it possible that one broker is actually 0.10.2.x? > Retrying leaderEpoch request for partition xxx as the leader reported an > error: UNKNOWN_SERVER_ERROR > > > Key: KAFKA-6522 > URL: https://issues.apache.org/jira/browse/KAFKA-6522 > Project: Kafka > Issue Type: New Feature > Components: core >Affects Versions: 1.0.0 > Environment: Ubuntu 16.04 LTS 64bit-server >Reporter: Wang Shuxiao >Priority: Major > > we have 3 brokers in a kafka cluster(brokerid:401,402,403). The broker-403 > fails to fetch data from leader: > {code:java} > [2018-02-02 08:58:26,861] INFO [ReplicaFetcher replicaId=403, leaderId=401, > fetcherId=0] Retrying leaderEpoch request for partition sub_payone1hour-0 as > the leader reported an error: UNKNOWN_SERVER_ERROR > (kafka.server.ReplicaFetcherThread) > [2018-02-02 08:58:26,865] WARN [ReplicaFetcher replicaId=403, leaderId=401, > fetcherId=3] Error when sending leader epoch request for > Map(sub_myshardSinfo-3 -> -1, sub_myshardUinfo-1 -> -1, > sub_videoOnlineResourceType8Test-0 -> -1, pub_videoReportEevent-1 -> 9, > sub_StreamNofity-3 -> -1, pub_RsVideoInfo-1 -> -1, pub_lidaTopic3-15 -> -1, > pub_lidaTopic3-3 -> -1, sub_zwbtest-1 -> -1, sub_svAdminTagging-5 -> -1, > pub_channelinfoupdate-1 -> -1, pub_RsPlayInfo-4 -> -1, sub_tinyVideoWatch-4 > -> 14, __consumer_offsets-36 -> -1, pub_ybusAuditorChannel3-2 -> -1, > pub_vipPush-4 -> -1, sub_LivingNotifyOnline-3 -> -1, sub_baseonline-4 -> -1, > __consumer_offsets-24 -> -1, sub_lidaTopic-3 -> -1, > sub_mobileGuessGameReward-0 -> -1, pub_lidaTopic-6 -> -1, sub_NewUserAlgo-0 > -> -1, __consumer_offsets-48 -> -1, pub_RsUserBehavior-3 -> -1, > sub_channelinfoupdate-0 -> -1, pub_tinyVideoComment-1 -> -1, pub_bulletin-2 > -> -1, pub_RecordCompleteNotifition-6 -> -1, sub_lidaTopic2-3 -> -1, > smsgateway-10 
-> -1, __consumer_offsets-0 -> -1, pub_baseonlinetest-1 -> -1, > __consumer_offsets-12 -> -1, pub_myshardUinfo-0 -> -1, pub_baseonline-3 -> > -1, smsGatewayMarketDbInfo-6 -> -1, sub_tinyVideoComment-0 -> 14) > (kafka.server.ReplicaFetcherThread) > java.io.IOException: Connection to 401 was disconnected before the response > was read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:95) > at > kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:96) > at > kafka.server.ReplicaFetcherThread.fetchEpochsFromLeader(ReplicaFetcherThread.scala:312) > at > kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:130) > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:102) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64){code} > > on the leader(broker-401) side, the log shows: > {code:java} > [2018-02-02 08:58:26,859] ERROR Closing socket for > 192.168.100.101:9099-192.168.100.103:30476 because of error > (kafka.network.Processor) > org.apache.kafka.common.errors.InvalidRequestException: Error getting request > for apiKey: 23 and apiVersion: 0 > Caused by: java.lang.IllegalArgumentException: Unexpected ApiKeys id `23`, it > should be between `0` and `20` (inclusive) > at org.apache.kafka.common.protocol.ApiKeys.forId(ApiKeys.java:73) > at > org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:39) > at > kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:96) > at kafka.network.RequestChannel$Request.(RequestChannel.scala:91) > at > kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:492) > at > kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:487) > at scala.collection.Iterator$class.foreach(Iterator.scala:893) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) > at 
scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at kafka.network.Processor.processCompletedReceives(SocketServer.scala:487) > at kafka.network.Processor.run(SocketServer.scala:417) > at java.lang.Thread.run(Thread.java:745){code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
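The `Unexpected ApiKeys id `23`` error on the leader is consistent with huxihx's question below: API key 23 is the OffsetsForLeaderEpoch request that newer replicas send, while a broker whose protocol tops out at key 20 (0.10.2-era behavior, matching the error message) rejects it during request parsing. A toy sketch of that range check (constants are illustrative, not Kafka's `ApiKeys` class):

```java
// Minimal sketch of the kind of range check behind the
// "Unexpected ApiKeys id `23`, it should be between `0` and `20`" error:
// a broker that only knows API keys 0..20 rejects OffsetsForLeaderEpoch
// (id 23), which newer replica fetchers send.
class ApiKeyRangeCheck {
    static final int MIN_API_KEY = 0;
    static final int MAX_API_KEY = 20;              // last key the old broker knows
    static final int OFFSETS_FOR_LEADER_EPOCH = 23; // sent by newer followers

    static boolean isKnown(int id) {
        return id >= MIN_API_KEY && id <= MAX_API_KEY;
    }
}
```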
[jira] [Commented] (KAFKA-6524) kafka mirror can't producer internal topic
[ https://issues.apache.org/jira/browse/KAFKA-6524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16350186#comment-16350186 ] Manikumar commented on KAFKA-6524: -- exclude.internal.topics applies only to consumer clients: consumers can read from internal topics, but producers cannot write to them. > kafka mirror can't producer internal topic > --- > > Key: KAFKA-6524 > URL: https://issues.apache.org/jira/browse/KAFKA-6524 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 1.0.0 >Reporter: Ahmed Madkour >Priority: Minor > > We are using kafka-mirror-maker.sh to consume data from a 3-broker Kafka > cluster and produce the data to another single-broker Kafka cluster. > We want to include internal topics, so we added the following to the consumer > configuration: > exclude.internal.topics=false > We keep receiving the following errors: > {code:java} > org.apache.kafka.common.errors.InvalidTopicException: The request attempted > to perform an operation on an invalid topic. > ERROR Error when sending message to topic __consumer_offsets with key: 43 > bytes, value: 28 bytes with error: > (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) > {code} > It seems that the producer can't access the internal topic __consumer_offsets. > Any way to fix that?
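For reference, the consumer-side setting involved might look like the fragment below (an illustrative sketch; the bootstrap address and group id are placeholders). Note it only affects what MirrorMaker's consumer reads; the producer will still be rejected when it tries to write to `__consumer_offsets`.

```properties
# consumer.properties passed to kafka-mirror-maker.sh
bootstrap.servers=source-cluster:9092
group.id=mirror-maker-group
# Lets the MirrorMaker consumer read internal topics such as
# __consumer_offsets; it does NOT let the producer write to them.
exclude.internal.topics=false
```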
[jira] [Commented] (KAFKA-6522) Retrying leaderEpoch request for partition xxx as the leader reported an error: UNKNOWN_SERVER_ERROR
[ https://issues.apache.org/jira/browse/KAFKA-6522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16350123#comment-16350123 ] Wang Shuxiao commented on KAFKA-6522: - all brokers use the same binary file (kafka_2.11-1.0.0.tgz). And it recoveries after restarting the broker-403 > Retrying leaderEpoch request for partition xxx as the leader reported an > error: UNKNOWN_SERVER_ERROR > > > Key: KAFKA-6522 > URL: https://issues.apache.org/jira/browse/KAFKA-6522 > Project: Kafka > Issue Type: New Feature > Components: core >Affects Versions: 1.0.0 > Environment: Ubuntu 16.04 LTS 64bit-server >Reporter: Wang Shuxiao >Priority: Major > > we have 3 brokers in a kafka cluster(brokerid:401,402,403). The broker-403 > fails to fetch data from leader: > {code:java} > [2018-02-02 08:58:26,861] INFO [ReplicaFetcher replicaId=403, leaderId=401, > fetcherId=0] Retrying leaderEpoch request for partition sub_payone1hour-0 as > the leader reported an error: UNKNOWN_SERVER_ERROR > (kafka.server.ReplicaFetcherThread) > [2018-02-02 08:58:26,865] WARN [ReplicaFetcher replicaId=403, leaderId=401, > fetcherId=3] Error when sending leader epoch request for > Map(sub_myshardSinfo-3 -> -1, sub_myshardUinfo-1 -> -1, > sub_videoOnlineResourceType8Test-0 -> -1, pub_videoReportEevent-1 -> 9, > sub_StreamNofity-3 -> -1, pub_RsVideoInfo-1 -> -1, pub_lidaTopic3-15 -> -1, > pub_lidaTopic3-3 -> -1, sub_zwbtest-1 -> -1, sub_svAdminTagging-5 -> -1, > pub_channelinfoupdate-1 -> -1, pub_RsPlayInfo-4 -> -1, sub_tinyVideoWatch-4 > -> 14, __consumer_offsets-36 -> -1, pub_ybusAuditorChannel3-2 -> -1, > pub_vipPush-4 -> -1, sub_LivingNotifyOnline-3 -> -1, sub_baseonline-4 -> -1, > __consumer_offsets-24 -> -1, sub_lidaTopic-3 -> -1, > sub_mobileGuessGameReward-0 -> -1, pub_lidaTopic-6 -> -1, sub_NewUserAlgo-0 > -> -1, __consumer_offsets-48 -> -1, pub_RsUserBehavior-3 -> -1, > sub_channelinfoupdate-0 -> -1, pub_tinyVideoComment-1 -> -1, pub_bulletin-2 > -> -1, 
pub_RecordCompleteNotifition-6 -> -1, sub_lidaTopic2-3 -> -1, > smsgateway-10 -> -1, __consumer_offsets-0 -> -1, pub_baseonlinetest-1 -> -1, > __consumer_offsets-12 -> -1, pub_myshardUinfo-0 -> -1, pub_baseonline-3 -> > -1, smsGatewayMarketDbInfo-6 -> -1, sub_tinyVideoComment-0 -> 14) > (kafka.server.ReplicaFetcherThread) > java.io.IOException: Connection to 401 was disconnected before the response > was read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:95) > at > kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:96) > at > kafka.server.ReplicaFetcherThread.fetchEpochsFromLeader(ReplicaFetcherThread.scala:312) > at > kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:130) > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:102) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64){code} > > on the leader(broker-401) side, the log shows: > {code:java} > [2018-02-02 08:58:26,859] ERROR Closing socket for > 192.168.100.101:9099-192.168.100.103:30476 because of error > (kafka.network.Processor) > org.apache.kafka.common.errors.InvalidRequestException: Error getting request > for apiKey: 23 and apiVersion: 0 > Caused by: java.lang.IllegalArgumentException: Unexpected ApiKeys id `23`, it > should be between `0` and `20` (inclusive) > at org.apache.kafka.common.protocol.ApiKeys.forId(ApiKeys.java:73) > at > org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:39) > at > kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:96) > at kafka.network.RequestChannel$Request.(RequestChannel.scala:91) > at > kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:492) > at > kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:487) > at scala.collection.Iterator$class.foreach(Iterator.scala:893) > at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1336) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at kafka.network.Processor.processCompletedReceives(SocketServer.scala:487) > at kafka.network.Processor.run(SocketServer.scala:417) > at java.lang.Thread.run(Thread.java:745){code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6524) kafka mirror can't producer internal topic
Ahmed Madkour created KAFKA-6524: Summary: kafka mirror can't producer internal topic Key: KAFKA-6524 URL: https://issues.apache.org/jira/browse/KAFKA-6524 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 1.0.0 Reporter: Ahmed Madkour We are using kafka-mirror-maker.sh to consume data from a 3 brokers kafka cluster and producer the data to another single broker kafka cluster We want to include internal topics so we added the following in the consumer configuration exclude.internal.topics=false We keep receiving the following errors: {code:java} org.apache.kafka.common.errors.InvalidTopicException: The request attempted to perform an operation on an invalid topic. ERROR Error when sending message to topic __consumer_offsets with key: 43 bytes, value: 28 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) {code} It seems that the producer can't access the internal topic __consumer_offsets. Any way to fix that? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6476) Document dynamic config update
[ https://issues.apache.org/jira/browse/KAFKA-6476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-6476: -- Description: Add documentation for dynamic broker config update. Include: - Command line options for kafka-configs.sh with examples - Configs that can be updated along with constraints applied - Secret rotation for password encoder was: Add documentation for dynamic broker config update. Include: - Command line options for kafka-configs.sh with examples - Configs that can be updated along with constraints applied > Document dynamic config update > -- > > Key: KAFKA-6476 > URL: https://issues.apache.org/jira/browse/KAFKA-6476 > Project: Kafka > Issue Type: Sub-task > Components: core, documentation >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 1.2.0 > > > Add documentation for dynamic broker config update. > Include: > - Command line options for kafka-configs.sh with examples > - Configs that can be updated along with constraints applied > - Secret rotation for password encoder -- This message was sent by Atlassian JIRA (v7.6.3#76005)
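Examples of the kind of kafka-configs.sh invocations the new documentation would cover — per-broker updates, cluster-wide defaults, and describing current values. These commands assume a running cluster; the host, broker id, and config name are illustrative and should be checked against the released tool.

```sh
# Update a dynamic config on a single broker
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \
  --entity-type brokers --entity-name 0 \
  --add-config log.cleaner.threads=2

# Update the cluster-wide default for all brokers
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \
  --entity-type brokers --entity-default \
  --add-config log.cleaner.threads=2

# Describe the current dynamic configs for broker 0
bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe \
  --entity-type brokers --entity-name 0
```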
[jira] [Created] (KAFKA-6525) Connect should allow pluggable encryption for records
Randall Hauch created KAFKA-6525: Summary: Connect should allow pluggable encryption for records Key: KAFKA-6525 URL: https://issues.apache.org/jira/browse/KAFKA-6525 Project: Kafka Issue Type: New Feature Components: KafkaConnect Reporter: Randall Hauch The Connect framework does not easily support pluggable encryption and decryption mechanisms. It is possible to use custom Converters to encrypt/decrypt individual keys and values when the encryption metadata (keys, algorithm, etc.) can be specified in the Converter, or when the key and/or value are _wrapped_ to include the metadata. However, if the encryption metadata is to be stored as headers, then as of AK 1.1 Connect has support for using headers in connectors and SMTs, but not in Converters. We should make it easier to plug encryption and decryption mechanisms into Connect. Since we're moving to Java 8, one approach might be to change the Converter interface to add default methods that also supply the headers (and maybe the whole record). An alternative is to define a new plugin interface that can be used to filter/transform/map the entire source and sink records. Here we'd actually call this for source connectors before the Converter, and for sink connectors after the Converter is called.
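The "default methods" idea can be sketched with a header-aware overload that falls back to the old form, so existing implementations keep working unchanged. The interface and method names below are illustrative stand-ins, not Connect's actual `org.apache.kafka.connect.storage.Converter` API.

```java
import java.util.Map;

// Sketch of the default-method approach from the ticket: a
// Converter-style interface gains a header-aware variant with a
// default implementation delegating to the original method, so an
// encryption-aware converter can override it without breaking others.
interface SimpleConverter {
    byte[] fromData(String topic, Object value);

    // New header-aware overload; existing implementations inherit this
    // fallback and simply ignore the headers.
    default byte[] fromData(String topic, Map<String, byte[]> headers, Object value) {
        return fromData(topic, value);
    }
}
```

An encryption plugin would override the three-argument form to read key material from the headers before deserializing.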
[jira] [Assigned] (KAFKA-2925) NullPointerException if FileStreamSinkTask is stopped before initialization finishes
[ https://issues.apache.org/jira/browse/KAFKA-2925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Yokota reassigned KAFKA-2925: Assignee: Robert Yokota > NullPointerException if FileStreamSinkTask is stopped before initialization > finishes > > > Key: KAFKA-2925 > URL: https://issues.apache.org/jira/browse/KAFKA-2925 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.9.0.0 >Reporter: Ewen Cheslack-Postava >Assignee: Robert Yokota >Priority: Minor > > If a FileStreamSinkTask is stopped too quickly after a distributed herder > rebalances work, it can result in cleanup happening without start() ever > being called: > {quote} > Sink task org.apache.kafka.connect.runtime.WorkerSinkTask@f9ac651 was stopped > before completing join group. Task initialization and start is being skipped > (org.apache.kafka.connect.runtime.WorkerSinkTask:150) > {quote} > This is actually a bit weird, since stop() is still called so resources > allocated in the constructor can be cleaned up, but it is possibly unexpected that > stop() will be called without start() ever being called. > Because the code in FileStreamSinkTask's stop() method assumes start() has > been called, it can result in a NullPointerException because it assumes the > PrintStream is already initialized. > The easy fix is to check for nulls before closing. However, we should > probably also consider whether the current possible sequence of events is > confusing, whether we should not invoke stop() in that case, and whether to make it clear in the SinkTask > interface that you should only initialize things in the constructor that won't > need any manual cleanup later.
[jira] [Resolved] (KAFKA-2925) NullPointerException if FileStreamSinkTask is stopped before initialization finishes
[ https://issues.apache.org/jira/browse/KAFKA-2925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Yokota resolved KAFKA-2925. -- Resolution: Cannot Reproduce I wasn't able to reproduce the NPE, and from reviewing the code it no longer seems possible. Closing this as cannot reproduce. > NullPointerException if FileStreamSinkTask is stopped before initialization > finishes > > > Key: KAFKA-2925 > URL: https://issues.apache.org/jira/browse/KAFKA-2925 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.9.0.0 >Reporter: Ewen Cheslack-Postava >Assignee: Robert Yokota >Priority: Minor > > If a FileStreamSinkTask is stopped too quickly after a distributed herder > rebalances work, it can result in cleanup happening without start() ever > being called: > {quote} > Sink task org.apache.kafka.connect.runtime.WorkerSinkTask@f9ac651 was stopped > before completing join group. Task initialization and start is being skipped > (org.apache.kafka.connect.runtime.WorkerSinkTask:150) > {quote} > This is actually a bit weird, since stop() is still called so resources > allocated in the constructor can be cleaned up, but it is possibly unexpected that > stop() will be called without start() ever being called. > Because the code in FileStreamSinkTask's stop() method assumes start() has > been called, it can result in a NullPointerException because it assumes the > PrintStream is already initialized. > The easy fix is to check for nulls before closing. However, we should > probably also consider whether the current possible sequence of events is > confusing, whether we should not invoke stop() in that case, and whether to make it clear in the SinkTask > interface that you should only initialize things in the constructor that won't > need any manual cleanup later.
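The "easy fix" described in the ticket — guarding stop() against resources that start() never created — can be sketched as follows. The class name is illustrative, not the actual FileStreamSinkTask.

```java
import java.io.PrintStream;

// Sketch of the null-guard fix: stop() may run before start() after a
// herder rebalance, so close only the resources start() actually created.
class GuardedSinkTask {
    private PrintStream out;            // stays null until start() runs

    void start(PrintStream stream) {
        this.out = stream;
    }

    void stop() {
        if (out != null) {              // avoid the NPE when start() never ran
            out.close();
        }
    }
}
```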
[jira] [Commented] (KAFKA-6496) NAT and Kafka
[ https://issues.apache.org/jira/browse/KAFKA-6496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16350715#comment-16350715 ] Ronald van de Kuil commented on KAFKA-6496: --- Hey Manikumar! That was a very good tip! I have got it to work now. Thank you very much! Ronald > NAT and Kafka > - > > Key: KAFKA-6496 > URL: https://issues.apache.org/jira/browse/KAFKA-6496 > Project: Kafka > Issue Type: Improvement >Affects Versions: 1.0.0 >Reporter: Ronald van de Kuil >Priority: Critical > > Hi, > As far as I know Kafka itself does not support NAT based on a test that I did > with my physical router. > > I can imagine that a real use case exists where NAT is desirable. For > example, an OpenStack installation where Kafka hides behind floating ip > addresses. > > Are there any plans, to make Kafka NAT friendly? > > Best Regards, > Ronald -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6496) NAT and Kafka
[ https://issues.apache.org/jira/browse/KAFKA-6496?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ronald van de Kuil updated KAFKA-6496: -- Affects Version/s: (was: 1.0.0) 0.11.0.0 > NAT and Kafka > - > > Key: KAFKA-6496 > URL: https://issues.apache.org/jira/browse/KAFKA-6496 > Project: Kafka > Issue Type: Improvement >Affects Versions: 0.11.0.0 >Reporter: Ronald van de Kuil >Priority: Critical > > Hi, > As far as I know Kafka itself does not support NAT based on a test that I did > with my physical router. > > I can imagine that a real use case exists where NAT is desirable. For > example, an OpenStack installation where Kafka hides behind floating ip > addresses. > > Are there any plans, to make Kafka NAT friendly? > > Best Regards, > Ronald -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6496) NAT and Kafka
[ https://issues.apache.org/jira/browse/KAFKA-6496?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ronald van de Kuil resolved KAFKA-6496. --- Resolution: Not A Bug > NAT and Kafka > - > > Key: KAFKA-6496 > URL: https://issues.apache.org/jira/browse/KAFKA-6496 > Project: Kafka > Issue Type: Improvement >Affects Versions: 1.0.0 >Reporter: Ronald van de Kuil >Priority: Critical > > Hi, > As far as I know Kafka itself does not support NAT based on a test that I did > with my physical router. > > I can imagine that a real use case exists where NAT is desirable. For > example, an OpenStack installation where Kafka hides behind floating ip > addresses. > > Are there any plans, to make Kafka NAT friendly? > > Best Regards, > Ronald -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6526) Update controller to handle changes to unclean.leader.election.enable
Rajini Sivaram created KAFKA-6526: - Summary: Update controller to handle changes to unclean.leader.election.enable Key: KAFKA-6526 URL: https://issues.apache.org/jira/browse/KAFKA-6526 Project: Kafka Issue Type: Sub-task Reporter: Rajini Sivaram Assignee: Rajini Sivaram At the moment, updates to the default unclean.leader.election.enable use the same code path as updates to topic overrides. This requires a controller change for the new value to take effect. It would be good if we could update the controller to handle the change.
[jira] [Commented] (KAFKA-6511) Connect header parser incorrectly parses arrays
[ https://issues.apache.org/jira/browse/KAFKA-6511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16350764#comment-16350764 ] ASF GitHub Bot commented on KAFKA-6511: --- rhauch opened a new pull request #4516: KAFKA-6511: Corrected list parsing logic URL: https://github.com/apache/kafka/pull/4516 Corrected the parsing of invalid list values. A list can only be parsed if it contains elements that have a common type, and a map can only be parsed if it contains keys with a common type and values with a common type. This should only be merged to `trunk`. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Connect header parser incorrectly parses arrays > --- > > Key: KAFKA-6511 > URL: https://issues.apache.org/jira/browse/KAFKA-6511 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.1.0 >Reporter: Arjun Satish >Assignee: Randall Hauch >Priority: Blocker > Fix For: 1.1.0 > > > An incorrect input like "[1, 2, 3,,,]" is misinterpreted by the Values > parser. An example test can be found here: > https://github.com/apache/kafka/pull/4319#discussion_r165155768 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
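The fix described in the PR — a list can only be parsed if its elements share a common type, and malformed input like "[1, 2, 3,,,]" must be rejected rather than misinterpreted — can be illustrated with a strict toy parser. This is not Connect's actual `Values` class, just the validation idea.

```java
import java.util.ArrayList;
import java.util.List;

// Toy strict list parser: rejects empty elements (so "[1, 2, 3,,,]"
// fails loudly) and enforces one common element type (int here).
class StrictListParser {
    static List<Integer> parse(String s) {
        if (!s.startsWith("[") || !s.endsWith("]"))
            throw new IllegalArgumentException("not a list: " + s);
        String body = s.substring(1, s.length() - 1).trim();
        List<Integer> out = new ArrayList<>();
        if (body.isEmpty()) return out;
        for (String part : body.split(",", -1)) { // -1 keeps trailing empties
            String token = part.trim();
            if (token.isEmpty())
                throw new IllegalArgumentException("empty element in: " + s);
            out.add(Integer.parseInt(token));     // common element type
        }
        return out;
    }
}
```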
[jira] [Created] (KAFKA-6527) Transient failure in DynamicBrokerReconfigurationTest.testDefaultTopicConfig
Jason Gustafson created KAFKA-6527: -- Summary: Transient failure in DynamicBrokerReconfigurationTest.testDefaultTopicConfig Key: KAFKA-6527 URL: https://issues.apache.org/jira/browse/KAFKA-6527 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson {code:java} java.lang.AssertionError: Log segment size increase not applied at kafka.utils.TestUtils$.fail(TestUtils.scala:355) at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:865) at kafka.server.DynamicBrokerReconfigurationTest.testDefaultTopicConfig(DynamicBrokerReconfigurationTest.scala:348) {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6528) Transient failure in DynamicBrokerReconfigurationTest.testThreadPoolResize
Jason Gustafson created KAFKA-6528: -- Summary: Transient failure in DynamicBrokerReconfigurationTest.testThreadPoolResize Key: KAFKA-6528 URL: https://issues.apache.org/jira/browse/KAFKA-6528 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson {code:java} java.lang.AssertionError: expected:<108> but was:<123> at org.junit.Assert.fail(Assert.java:88) at org.junit.Assert.failNotEquals(Assert.java:834) at org.junit.Assert.assertEquals(Assert.java:645) at org.junit.Assert.assertEquals(Assert.java:631) at kafka.server.DynamicBrokerReconfigurationTest.stopAndVerifyProduceConsume(DynamicBrokerReconfigurationTest.scala:755) at kafka.server.DynamicBrokerReconfigurationTest.verifyThreadPoolResize$1(DynamicBrokerReconfigurationTest.scala:443) at kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize(DynamicBrokerReconfigurationTest.scala:451){code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-6519) Change log level from ERROR to WARN for not leader for this partition exception
[ https://issues.apache.org/jira/browse/KAFKA-6519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson reassigned KAFKA-6519: -- Assignee: Jason Gustafson > Change log level from ERROR to WARN for not leader for this partition > exception > --- > > Key: KAFKA-6519 > URL: https://issues.apache.org/jira/browse/KAFKA-6519 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 1.0.0 >Reporter: Antony Stubbs >Assignee: Jason Gustafson >Priority: Major > > Not the leader for this partition is not an error in operation and is in fact > expected and a apart of the partition discovery / movement system. This > confuses users because they think something is going wrong. I'd suggest at > least changing it to WARN, but perhaps is it even something users should be > warned about? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6354) Update KStream JavaDoc with regard to KIP-182
[ https://issues.apache.org/jira/browse/KAFKA-6354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16350836#comment-16350836 ] ASF GitHub Bot commented on KAFKA-6354: --- mjsax closed pull request #4456: KAFKA-6354 Update KStream JavaDoc using new State Store API URL: https://github.com/apache/kafka/pull/4456 As this is a foreign pull request (from a fork) and GitHub hides the original diff on merge, the diff is supplied below for the sake of provenance:

diff --git a/docs/streams/developer-guide/processor-api.html b/docs/streams/developer-guide/processor-api.html
index 5ed569afc93..f05292a0df7 100644
--- a/docs/streams/developer-guide/processor-api.html
+++ b/docs/streams/developer-guide/processor-api.html
@@ -209,22 +209,18 @@
 // Creating a persistent key-value store:
 // here, we create a `KeyValueStore<String, Long>` named "persistent-counts".
-import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
-// Note: The `Stores` factory returns a supplier for the state store,
-// because that's what you typically need to pass as API parameter.
-StateStoreSupplier countStoreSupplier =
-  Stores.create("persistent-counts")
-    .withKeys(Serdes.String())
-    .withValues(Serdes.Long())
-    .persistent()
-    .build();
+// Using a `KeyValueStoreBuilder` to build a `KeyValueStore`.
+StoreBuilder<KeyValueStore<String, Long>> countStoreSupplier =
+  Stores.keyValueStoreBuilder(
+    Stores.persistentKeyValueStore("persistent-counts"),
+    Serdes.String(),
+    Serdes.Long());
+KeyValueStore<String, Long> countStore = countStoreSupplier.build();
-See PersistentKeyValueFactory for detailed factory options.
@@ -242,22 +238,18 @@
 // Creating an in-memory key-value store:
 // here, we create a `KeyValueStore<String, Long>` named "inmemory-counts".
-import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
-// Note: The `Stores` factory returns a supplier for the state store,
-// because that's what you typically need to pass as API parameter.
-StateStoreSupplier countStoreSupplier =
-  Stores.create("inmemory-counts")
-    .withKeys(Serdes.String())
-    .withValues(Serdes.Long())
-    .inMemory()
-    .build();
+// Using a `KeyValueStoreBuilder` to build a `KeyValueStore`.
+StoreBuilder<KeyValueStore<String, Long>> countStoreSupplier =
+  Stores.keyValueStoreBuilder(
+    Stores.inMemoryKeyValueStore("inmemory-counts"),
+    Serdes.String(),
+    Serdes.Long());
+KeyValueStore<String, Long> countStore = countStoreSupplier.build();
-See InMemoryKeyValueFactory for detailed factory options.
@@ -332,8 +324,8 @@
 org.apache.kafka.streams.processor.StateStore. Kafka Streams also has a few extended interfaces such as KeyValueStore.
-You also need to provide a “factory” for the store by implementing the
-org.apache.kafka.streams.processor.StateStoreSupplier interface, which Kafka Streams uses to create instances of
+You also need to provide a “builder” for the store by implementing the
+org.apache.kafka.streams.state.StoreBuilder interface, which Kafka Streams uses to create instances of
 your store.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index 634796079ef..29de64c1e67 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -131,7 +131,7 @@
  * To query the local {@link KeyValueStore} it must be obtained via
  * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
- * Use {@link org.apache.kafka.streams.processor.StateStoreSupplier#name()} to get the store name:
+ * Use {@link org.apache.kafka.streams.processor.StateStore#name()} to get the store name:
  * {@code
  * KafkaStreams streams = ... // counting words
  * String queryableStoreName = storeSupplier.name();
@@ -154,7 +154,7 @@
  * Count the number of records in this st
[jira] [Updated] (KAFKA-6354) Update KStream JavaDoc with regard to KIP-182
[ https://issues.apache.org/jira/browse/KAFKA-6354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-6354: --- Fix Version/s: 1.0.1 1.1.0 > Update KStream JavaDoc with regard to KIP-182 > - > > Key: KAFKA-6354 > URL: https://issues.apache.org/jira/browse/KAFKA-6354 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0 >Reporter: Matthias J. Sax >Assignee: Yu LIU >Priority: Minor > Labels: easy-fix, javadocs, newbie > Fix For: 1.1.0, 1.0.1 > > > In {{KStream}} JavaDocs, we show a code example building a state store via > {{StateStoreSupplier}} -- however, {{StateStoreSupplier}} is deprecated and > we should update the example code accordingly. > This might also affect {{KTable}}, {{KGroupedStream}}, etc., and we should > double-check all those JavaDocs for outdated examples. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
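For readers following the KIP-182 change above: the deprecated fluent factory produced a supplier, while the new API hands Streams a builder that produces one fresh store instance per `build()` call (one per task). The shape of that contract can be sketched with a plain-Java stand-in (hypothetical names, not the real Kafka classes):

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Supplier;

// Hypothetical stand-in for the StoreBuilder contract: each build() call
// must return an independent store instance, which is what lets Streams
// create one store per task/partition.
public class StoreBuilderSketch {
    public static <K, V> Supplier<Map<K, V>> keyValueStoreBuilder(String name) {
        return TreeMap::new; // a new "store" per call; `name` is illustrative only
    }

    public static void main(String[] args) {
        Supplier<Map<String, Long>> countStoreBuilder = keyValueStoreBuilder("persistent-counts");
        Map<String, Long> storeA = countStoreBuilder.get();
        Map<String, Long> storeB = countStoreBuilder.get();
        storeA.put("kafka", 1L);
        if (!storeB.isEmpty()) throw new AssertionError("each build() must yield an independent store");
    }
}
```

The design point: because the builder (not a pre-built store) is passed to the topology, Streams controls how many instances exist and when they are created.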
[jira] [Updated] (KAFKA-6058) KIP-222: Add "describe consumer groups" and "list consumer groups" to KafkaAdminClient
[ https://issues.apache.org/jira/browse/KAFKA-6058?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-6058: --- Fix Version/s: 1.2.0 > KIP-222: Add "describe consumer groups" and "list consumer groups" to > KafkaAdminClient > -- > > Key: KAFKA-6058 > URL: https://issues.apache.org/jira/browse/KAFKA-6058 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Matthias J. Sax >Assignee: Jorge Quilcate >Priority: Major > Labels: kip-222 > Fix For: 1.2.0 > > > {{KafkaAdminClient}} does not allow getting information about consumer groups. > This feature is supported by the old {{kafka.admin.AdminClient}}, though. > We should add {{KafkaAdminClient#describeConsumerGroups()}} and > {{KafkaAdminClient#listConsumerGroup()}}. > Associated KIP: KIP-222 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5105) ReadOnlyKeyValueStore range scans are not ordered
[ https://issues.apache.org/jira/browse/KAFKA-5105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dmitry Minkovsky updated KAFKA-5105: Description: Following up with this thread https://www.mail-archive.com/users@kafka.apache.org/msg25373.html Although ReadOnlyKeyValueStore's #range() is documented not to return values in order, it would be great if it did for keys within a single partition. This would facilitate using interactive queries and local state as one would use HBase to index data by prefixed keys. If range returned keys in lexicographical order, I could use interactive queries for all my data needs except search. was: _emphasized text_Following up with this thread https://www.mail-archive.com/users@kafka.apache.org/msg25373.html Although ReadOnlyKeyValueStore's #range() is documented not to returns values in order, it would be great if it would for keys within a single partition. This would facilitate using interactive queries and local state as one would use HBase to index data by prefixed keys. If range returned keys in lexicographical order, I could use interactive queries for all my data needs except search. > ReadOnlyKeyValueStore range scans are not ordered > - > > Key: KAFKA-5105 > URL: https://issues.apache.org/jira/browse/KAFKA-5105 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Dmitry Minkovsky >Priority: Major > > Following up with this thread > https://www.mail-archive.com/users@kafka.apache.org/msg25373.html > Although ReadOnlyKeyValueStore's #range() is documented not to return values > in order, it would be great if it did for keys within a single partition. > This would facilitate using interactive queries and local state as one would > use HBase to index data by prefixed keys. If range returned keys in > lexicographical order, I could use interactive queries for all my data needs > except search. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-5105) ReadOnlyKeyValueStore range scans are not ordered
[ https://issues.apache.org/jira/browse/KAFKA-5105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16350952#comment-16350952 ] Dmitry Minkovsky commented on KAFKA-5105: - Wondering if anyone knows whether 1.0.0 and up are still affected by this. I still don't entirely understand Damien's explanation for this phenomenon (that it's related to partitions). > ReadOnlyKeyValueStore range scans are not ordered > - > > Key: KAFKA-5105 > URL: https://issues.apache.org/jira/browse/KAFKA-5105 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Dmitry Minkovsky >Priority: Major > > Following up with this thread > https://www.mail-archive.com/users@kafka.apache.org/msg25373.html > Although ReadOnlyKeyValueStore's #range() is documented not to returns values > in order, it would be great if it would for keys within a single partition. > This would facilitate using interactive queries and local state as one would > use HBase to index data by prefixed keys. If range returned keys in > lexicographical order, I could use interactive queries for all my data needs > except search. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-5105) ReadOnlyKeyValueStore range scans are not ordered
[ https://issues.apache.org/jira/browse/KAFKA-5105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16350970#comment-16350970 ] Dmitry Minkovsky commented on KAFKA-5105: - Specifically, from the mailing list archive, Damien writes: {quote}I think what you are seeing is that the order is not guaranteed across partitions. When you use Queryable State you are actually querying multiple underlying stores, i.e., one per partition. The implementation iterates over one store/partition at a time, so the ordering will appear random. This could be improved {quote} So this is only true when you're using Interactive Queries? Certainly this cannot be true when you've obtained a state store using {{ProcessorContext#getStateStore()}}. > ReadOnlyKeyValueStore range scans are not ordered > - > > Key: KAFKA-5105 > URL: https://issues.apache.org/jira/browse/KAFKA-5105 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Dmitry Minkovsky >Priority: Major > > Following up with this thread > https://www.mail-archive.com/users@kafka.apache.org/msg25373.html > Although ReadOnlyKeyValueStore's #range() is documented not to returns values > in order, it would be great if it would for keys within a single partition. > This would facilitate using interactive queries and local state as one would > use HBase to index data by prefixed keys. If range returned keys in > lexicographical order, I could use interactive queries for all my data needs > except search. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-5105) ReadOnlyKeyValueStore range scans are not ordered
[ https://issues.apache.org/jira/browse/KAFKA-5105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16350970#comment-16350970 ] Dmitry Minkovsky edited comment on KAFKA-5105 at 2/2/18 9:51 PM: - Specifically, from the mailing list archive, Damien writes: {quote}I think what you are seeing is that the order is not guaranteed across partitions. When you use Queryable State you are actually querying multiple underlying stores, i.e., one per partition. The implementation iterates over one store/partition at a time, so the ordering will appear random. This could be improved {quote} So this is only true when you're using Interactive Queries? Certainly this cannot be true when you've obtained a state store using {{ProcessorContext#getStateStore()}}. *EDIT*: Yeah I followed the code to {{CompositeReadOnlyKeyValueStore}}. Makes sense. Still, if my application only writes a certain key range to a certain store, such that the composite store returned by the interactive query may include stores for multiple partitions but my data is only on one partition, then it should still be ordered? was (Author: dminkovsky): Specifically, from the mailing list archive, Damien writes: {quote}I think what you are seeing is that the order is not guaranteed across partitions. When you use Queryable State you are actually querying multiple underlying stores, i.e., one per partition. The implementation iterates over one store/partition at a time, so the ordering will appear random. This could be improved {quote} So this is only true when you're using Interactive Queries? Certainly this cannot be true when you've obtained a state store using {{ProcessorContext#getStateStore()}}. 
> ReadOnlyKeyValueStore range scans are not ordered > - > > Key: KAFKA-5105 > URL: https://issues.apache.org/jira/browse/KAFKA-5105 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Dmitry Minkovsky >Priority: Major > > Following up with this thread > https://www.mail-archive.com/users@kafka.apache.org/msg25373.html > Although ReadOnlyKeyValueStore's #range() is documented not to returns values > in order, it would be great if it would for keys within a single partition. > This would facilitate using interactive queries and local state as one would > use HBase to index data by prefixed keys. If range returned keys in > lexicographical order, I could use interactive queries for all my data needs > except search. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-5105) ReadOnlyKeyValueStore range scans are not ordered
[ https://issues.apache.org/jira/browse/KAFKA-5105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16350970#comment-16350970 ] Dmitry Minkovsky edited comment on KAFKA-5105 at 2/2/18 9:52 PM: - Specifically, from the mailing list archive, Damien writes: {quote}I think what you are seeing is that the order is not guaranteed across partitions. When you use Queryable State you are actually querying multiple underlying stores, i.e., one per partition. The implementation iterates over one store/partition at a time, so the ordering will appear random. This could be improved {quote} So this is only true when you're using Interactive Queries? Certainly this cannot be true when you've obtained a state store using {{ProcessorContext#getStateStore()}}. *EDIT*: Yeah I followed the code to {{CompositeReadOnlyKeyValueStore}}. Makes sense. Still, if my application only writes a certain key range to a certain partition's store, such that the composite store returned by the interactive query may include stores for multiple partitions but my data is only on one partition, then the range should still be ordered? was (Author: dminkovsky): Specifically, from the mailing list archive, Damien writes: {quote}I think what you are seeing is that the order is not guaranteed across partitions. When you use Queryable State you are actually querying multiple underlying stores, i.e., one per partition. The implementation iterates over one store/partition at a time, so the ordering will appear random. This could be improved {quote} So this is only true when you're using Interactive Queries? Certainly this cannot be true when you've obtained a state store using {{ProcessorContext#getStateStore()}}. *EDIT*: Yeah I followed the code to {{CompositeReadOnlyKeyValueStore}}. Makes sense. 
Still, if my application only writes a certain key range to a certain store, such that the composite store returned by the interactive query may include stores for multiple partitions but my data is only on one partition, then it should still be ordered? > ReadOnlyKeyValueStore range scans are not ordered > - > > Key: KAFKA-5105 > URL: https://issues.apache.org/jira/browse/KAFKA-5105 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Dmitry Minkovsky >Priority: Major > > Following up with this thread > https://www.mail-archive.com/users@kafka.apache.org/msg25373.html > Although ReadOnlyKeyValueStore's #range() is documented not to returns values > in order, it would be great if it would for keys within a single partition. > This would facilitate using interactive queries and local state as one would > use HBase to index data by prefixed keys. If range returned keys in > lexicographical order, I could use interactive queries for all my data needs > except search. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
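Damien's explanation can be modeled in a few lines: each per-partition store is sorted, but a composite that walks one store after another is only globally ordered when all matching keys live in a single partition. A toy sketch (plain Java, hypothetical names, not the actual CompositeReadOnlyKeyValueStore):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Toy model of a composite read-only store: range() iterates the
// per-partition stores one after another, so results are sorted within
// each partition but not across partitions.
public class CompositeRangeSketch {
    public static List<String> range(List<TreeMap<String, Long>> partitionStores,
                                     String from, String to) {
        List<String> keys = new ArrayList<>();
        for (TreeMap<String, Long> store : partitionStores) {
            keys.addAll(store.subMap(from, true, to, true).keySet());
        }
        return keys;
    }

    public static void main(String[] args) {
        TreeMap<String, Long> p0 = new TreeMap<>();
        p0.put("b", 1L); p0.put("d", 1L);
        TreeMap<String, Long> p1 = new TreeMap<>();
        p1.put("a", 1L); p1.put("c", 1L);
        // Keys spread over two partitions: [b, d, a, c] -- appears unordered.
        List<String> spread = range(List.of(p0, p1), "a", "d");
        if (!spread.equals(List.of("b", "d", "a", "c"))) throw new AssertionError(spread.toString());
        // All matching keys in one partition: the result stays ordered.
        List<String> single = range(List.of(p0, new TreeMap<>()), "a", "d");
        if (!single.equals(List.of("b", "d"))) throw new AssertionError(single.toString());
    }
}
```

This also matches the follow-up question above: if the queried key range lives entirely in one partition's store, the empty stores contribute nothing and the ordering of that one store survives.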
[jira] [Created] (KAFKA-6529) Broker leaks memory and file descriptors after sudden client disconnects
Graham Campbell created KAFKA-6529: -- Summary: Broker leaks memory and file descriptors after sudden client disconnects Key: KAFKA-6529 URL: https://issues.apache.org/jira/browse/KAFKA-6529 Project: Kafka Issue Type: Bug Components: network Affects Versions: 0.11.0.2, 1.0.0 Reporter: Graham Campbell If a producer forcefully disconnects from a broker while it has staged receives, that connection enters a limbo state where it is no longer processed by the SocketServer.Processor, leaking the file descriptor for the socket and the memory used for the staged receive queue for that connection. We noticed this during an upgrade from 0.9.0.2 to 0.11.0.2. Immediately after the rolling restart to upgrade, open file descriptors on the brokers started climbing uncontrollably. In a few cases brokers reached our configured max open files limit of 100k and crashed before we rolled back. We tracked this down to a buildup of muted connections in the Selector.closingChannels list. If a client disconnects from the broker with multiple pending produce requests, when the broker attempts to send an ack to the client it receives an IOException because the TCP socket has been closed. This triggers the Selector to close the channel, but because it still has pending requests, it adds it to Selector.closingChannels to process those requests. However, because that exception was triggered by trying to send a response, the SocketServer.Processor has marked the channel as muted and will no longer process it at all. *Reproduced by:* Starting a Kafka broker/cluster Client produces several messages and then disconnects abruptly (e.g. _./rdkafka_performance -P -x 100 -b broker:9092 -t test_topic_) Broker then leaks the file descriptor previously used for the TCP socket and the memory for unprocessed messages *Proposed solution (which we've implemented internally)* Whenever an exception is encountered when writing to a socket in Selector.pollSelectionKeys(...), record that that connection failed a send by adding the KafkaChannel ID to Selector.failedSends. Then re-raise the exception to still trigger the socket disconnection logic. Since every exception raised in this function triggers a disconnect, we also treat any exception while writing to the socket as a failed send. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
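The proposed bookkeeping, reduced to its core, is: on any write failure, record the channel ID before re-raising, so later cleanup also covers muted channels with staged receives. A minimal sketch under those assumptions (hypothetical names, not the actual Selector code):

```java
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

// Sketch of the proposed fix: any exception while writing marks the channel
// as a failed send *before* the exception propagates, so the disconnect
// path can clean up the socket and staged-receive memory even though the
// channel has been muted.
public class FailedSendSketch {
    public final Set<String> failedSends = new HashSet<>();

    public interface Channel {
        void write(byte[] data) throws IOException;
        String id();
    }

    public void send(Channel channel, byte[] data) throws IOException {
        try {
            channel.write(data);
        } catch (IOException e) {
            failedSends.add(channel.id()); // record the failed send first...
            throw e;                       // ...then re-raise to trigger disconnect logic
        }
    }
}
```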
[jira] [Commented] (KAFKA-6529) Broker leaks memory and file descriptors after sudden client disconnects
[ https://issues.apache.org/jira/browse/KAFKA-6529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16351042#comment-16351042 ] ASF GitHub Bot commented on KAFKA-6529: --- parafiend opened a new pull request #4517: KAFKA-6529: Stop file descriptor leak when client disconnects with staged receives URL: https://github.com/apache/kafka/pull/4517 If an exception is encountered while sending data to a client connection, that connection is disconnected. If there are staged receives for that connection, it is tracked to process those records. However, if the exception was encountered during processing a `RequestChannel.Request`, the `KafkaChannel` for that connection is muted and won't be processed. Add the channel to failed sends so the connection is cleaned up on those exceptions. This stops the leak of the memory for pending requests and the file descriptor of the TCP socket. Only flag channel as failed send when an exception is encountered while actually attempting to send something. Other socket interactions don't count. Test that a channel is closed when an exception is raised while writing to a socket that has been closed by the client. Since sending a response requires acks != 0, allow specifying the required acks for test requests in SocketServerTest.scala. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Broker leaks memory and file descriptors after sudden client disconnects > > > Key: KAFKA-6529 > URL: https://issues.apache.org/jira/browse/KAFKA-6529 > Project: Kafka > Issue Type: Bug > Components: network >Affects Versions: 1.0.0, 0.11.0.2 >Reporter: Graham Campbell >Priority: Major > > If a producer forcefully disconnects from a broker while it has staged > receives, that connection enters a limbo state where it is no longer > processed by the SocketServer.Processor, leaking the file descriptor for the > socket and the memory used for the staged recieve queue for that connection. > We noticed this during an upgrade from 0.9.0.2 to 0.11.0.2. Immediately after > the rolling restart to upgrade, open file descriptors on the brokers started > climbing uncontrollably. In a few cases brokers reached our configured max > open files limit of 100k and crashed before we rolled back. > We tracked this down to a buildup of muted connections in the > Selector.closingChannels list. If a client disconnects from the broker with > multiple pending produce requests, when the broker attempts to send an ack to > the client it recieves an IOException because the TCP socket has been closed. > This triggers the Selector to close the channel, but because it still has > pending requests, it adds it to Selector.closingChannels to process those > requests. However, because that exception was triggered by trying to send a > response, the SocketServer.Processor has marked the channel as muted and will > no longer process it at all. > *Reproduced by:* > Starting a Kafka broker/cluster > Client produces several messages and then disconnects abruptly (eg. 
> _./rdkafka_performance -P -x 100 -b broker:9092 -t test_topic_) > Broker then leaks file descriptor previously used for TCP socket and memory > for unprocessed messages > *Proposed solution (which we've implemented internally)* > Whenever an exception is encountered when writing to a socket in > Selector.pollSelectionKeys(...) record that that connection failed a send by > adding the KafkaChannel ID to Selector.failedSends. Then re-raise the > exception to still trigger the socket disconnection logic. Since every > exception raised in this function triggers a disconnect, we also treat any > exception while writing to the socket as a failed send. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6529) Broker leaks memory and file descriptors after sudden client disconnects
[ https://issues.apache.org/jira/browse/KAFKA-6529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Graham Campbell updated KAFKA-6529: --- Flags: Patch > Broker leaks memory and file descriptors after sudden client disconnects > > > Key: KAFKA-6529 > URL: https://issues.apache.org/jira/browse/KAFKA-6529 > Project: Kafka > Issue Type: Bug > Components: network >Affects Versions: 1.0.0, 0.11.0.2 >Reporter: Graham Campbell >Priority: Major > > If a producer forcefully disconnects from a broker while it has staged > receives, that connection enters a limbo state where it is no longer > processed by the SocketServer.Processor, leaking the file descriptor for the > socket and the memory used for the staged recieve queue for that connection. > We noticed this during an upgrade from 0.9.0.2 to 0.11.0.2. Immediately after > the rolling restart to upgrade, open file descriptors on the brokers started > climbing uncontrollably. In a few cases brokers reached our configured max > open files limit of 100k and crashed before we rolled back. > We tracked this down to a buildup of muted connections in the > Selector.closingChannels list. If a client disconnects from the broker with > multiple pending produce requests, when the broker attempts to send an ack to > the client it recieves an IOException because the TCP socket has been closed. > This triggers the Selector to close the channel, but because it still has > pending requests, it adds it to Selector.closingChannels to process those > requests. However, because that exception was triggered by trying to send a > response, the SocketServer.Processor has marked the channel as muted and will > no longer process it at all. > *Reproduced by:* > Starting a Kafka broker/cluster > Client produces several messages and then disconnects abruptly (eg. 
> _./rdkafka_performance -P -x 100 -b broker:9092 -t test_topic_) > Broker then leaks file descriptor previously used for TCP socket and memory > for unprocessed messages > *Proposed solution (which we've implemented internally)* > Whenever an exception is encountered when writing to a socket in > Selector.pollSelectionKeys(...) record that that connection failed a send by > adding the KafkaChannel ID to Selector.failedSends. Then re-raise the > exception to still trigger the socket disconnection logic. Since every > exception raised in this function triggers a disconnect, we also treat any > exception while writing to the socket as a failed send. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6502) Kafka streams deserialization handler not committing offsets on error records
[ https://issues.apache.org/jira/browse/KAFKA-6502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16351051#comment-16351051 ] Guozhang Wang commented on KAFKA-6502: -- Hello Soby, thanks for reporting this issue. Your reasoning is basically right: a record is first deserialized, then put into the buffer, then processed. If deserialization failed, it will not be processed at all. So if there is a series of records with deserialization errors, none of them is processed, i.e. {{StreamTask.process()}} is never invoked. Before we jump to possible fixes, could I ask about your scenario: when you see the skipped-records metric increasing, why do you want to restart your application immediately in the middle of such a series of error messages? Would you expect no valid records ever coming next? My reasoning is: 1. If these poison pills are due to a bug in your application's own serde, then after restarting with the fixed serde these records will be processed correctly, so we are good here. 2. If these poison pills are bad in themselves and cannot be processed anyway, you would not bother restarting your application; you can just let it continue to run and skip these records. > Kafka streams deserialization handler not committing offsets on error records > - > > Key: KAFKA-6502 > URL: https://issues.apache.org/jira/browse/KAFKA-6502 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Soby Chacko >Priority: Minor > > See this StackOverflow issue: > [https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler] > and this comment: > [https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler#comment84018564_48470899] > I am trying to use the LogAndContinueExceptionHandler on deserialization. It > works fine when an error occurs by successfully logging and continuing. 
> However, on a continuous stream of errors, it seems like these messages are > not committed and on a restart of the application they reappear again. It is > more problematic if I try to send the messages in error to a DLQ. On a > restart, they are sent again to DLQ. As soon as I have a good record coming > in, it looks like the offset moves further and not seeing the already logged > messages again after a restart. > I reproduced this behavior by running the sample provided here: > [https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java] > I changed the incoming value Serde to > {{Serdes.Integer().getClass().getName()}} to force a deserialization error on > input and reduced the commit interval to just 1 second. Also added the > following to the config. > {{streamsConfiguration.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, > LogAndContinueExceptionHandler.class);}}. > It looks like when deserialization exceptions occur, this flag is never set > to be true here: > [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L228]. > It only becomes true once processing succeeds. That might be the reason why > commit is not happening even after I manually call processorContext#commit(). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
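The symptom described in the report is consistent with a commit flag that only successful processing sets: a deserialization failure skips the record without marking the task as needing a commit, so a run of poison pills leaves the offset where it was. A toy model of that gap (hypothetical names; the real logic lives in StreamTask.java):

```java
import java.util.List;
import java.util.function.Function;

// Toy model of the commit gap: the committed offset only advances when a
// record survives deserialization and is processed, so a continuous run of
// poison pills leaves the consumed offset uncommitted and the records
// reappear after a restart.
public class CommitGapSketch {
    public long committedOffset = -1;

    /** Consumes records at offsets 0..n-1; deserialize returns null on a poison pill. */
    public void poll(List<byte[]> records, Function<byte[], String> deserialize) {
        for (int offset = 0; offset < records.size(); offset++) {
            String value = deserialize.apply(records.get(offset));
            if (value == null) continue;   // log-and-continue: skipped, commit flag never set
            committedOffset = offset;      // processed successfully: offset becomes committable
        }
    }
}
```

With only poison pills, `committedOffset` never moves; one good record at the end advances it past the whole batch, which matches the "offset moves further once a good record comes in" observation above.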
[jira] [Commented] (KAFKA-6498) Add RocksDB statistics via Streams metrics
[ https://issues.apache.org/jira/browse/KAFKA-6498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16351063#comment-16351063 ] Guozhang Wang commented on KAFKA-6498: -- [~james.c] Thanks for your interest, and sorry for the late response. Let me summarize the scope of this ticket before you commit to contributing to it. RocksDB JNI's Statistics object contains multiple metrics, and moving forward it may add more. So to maintain compatibility, we need to do two things: 1) Select which metrics (a subset of all metrics from Statistics) to expose directly via Streams metrics as built-in streams metrics; note we cannot just enumerate all metrics and add them all as built-in metrics, since there will only be more and more of them. 2) More importantly, design a set of interfaces to allow users to manually add more RocksDB metrics in a custom way (in StreamMetrics we already have the addXXXSensor methods to let users build their own metrics; the question then is how to allow users to hook up more RocksDB metrics). So this ticket involves design as well as implementation, not merely a straightforward implementation, and the contributor is expected to write a KIP about the design. If you are interested, could you start by researching the above-mentioned interfaces and write a KIP proposing your design? > Add RocksDB statistics via Streams metrics > -- > > Key: KAFKA-6498 > URL: https://issues.apache.org/jira/browse/KAFKA-6498 > Project: Kafka > Issue Type: Improvement > Components: metrics, streams >Reporter: Guozhang Wang >Assignee: james chien >Priority: Major > Labels: needs-kip > > RocksDB's own stats can be programmatically exposed via > {{Options.statistics()}} and the JNI `Statistics` has indeed implemented many > useful settings already. 
However these stats are not exposed directly via > Streams today and hence for any users who wants to get access to them they > have to manually interact with the underlying RocksDB directly, not through > Streams. > We should expose such stats via Streams metrics programmatically for users to > investigate them without trying to access the rocksDB directly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
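The two requirements listed above, a curated built-in subset plus a user hook for registering additional stats, could be sketched like this (all names hypothetical; this is not a proposed Streams API, just the shape of the idea):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.ToLongFunction;

// Sketch of the two-part design: (1) a fixed, curated subset of RocksDB
// stats registered as built-in gauges, and (2) a hook letting users register
// extra stats by name rather than enumerating everything automatically.
public class RocksStatsSketch {
    private final Map<String, ToLongFunction<Map<String, Long>>> gauges = new LinkedHashMap<>();

    public RocksStatsSketch() {
        // (1) Built-in subset: enumerate a small, known-good selection only,
        // so new upstream stats don't silently change the metric surface.
        addStatGauge("bytes-written");
        addStatGauge("block-cache-hits");
    }

    // (2) User hook: expose an additional raw statistic on request.
    public void addStatGauge(String statName) {
        gauges.put(statName, stats -> stats.getOrDefault(statName, 0L));
    }

    // Read one registered gauge against a snapshot of raw stats.
    public long measure(String statName, Map<String, Long> rawStats) {
        return gauges.get(statName).applyAsLong(rawStats);
    }
}
```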
[jira] [Resolved] (KAFKA-6494) Extend ConfigCommand to update broker config using new AdminClient
[ https://issues.apache.org/jira/browse/KAFKA-6494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-6494. Resolution: Fixed Fix Version/s: (was: 1.2.0) 1.1.0 > Extend ConfigCommand to update broker config using new AdminClient > -- > > Key: KAFKA-6494 > URL: https://issues.apache.org/jira/browse/KAFKA-6494 > Project: Kafka > Issue Type: Sub-task >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 1.1.0 > > > Add --bootstrap-server and --command-config options for new AdminClient. > Update ConfigCommand to use new AdminClient for dynamic broker config updates > in KIP-226. Full conversion of ConfigCommand to new AdminClient will be done > later under KIP-248. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6454) Allow timestamp manipulation in Processor API
[ https://issues.apache.org/jira/browse/KAFKA-6454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16351121#comment-16351121 ] ASF GitHub Bot commented on KAFKA-6454: --- mjsax opened a new pull request #4519: KAFKA-6454: Allow timestamp manipulation in Processor API URL: https://github.com/apache/kafka/pull/4519 implements KIP-251 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Allow timestamp manipulation in Processor API > - > > Key: KAFKA-6454 > URL: https://issues.apache.org/jira/browse/KAFKA-6454 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.0.0 >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax >Priority: Major > Labels: kip > Fix For: 1.2.0 > > > Atm, Kafka Streams only has a defined "contract" about timestamp propagation > at the Processor API level: all processors within a sub-topology see the > timestamp from the input topic record, and this timestamp will be used for all > result records when writing them to a topic, too. > For the DSL and also for custom operators, it would be desirable to allow > timestamp manipulation at the Processor level for individual records that are > forwarded. > KIP-251: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-251%3A+Allow+timestamp+manipulation+in+Processor+API -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6476) Document dynamic config update
[ https://issues.apache.org/jira/browse/KAFKA-6476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-6476: -- Description: Add documentation for dynamic broker config update. Include: - Command line options for kafka-configs.sh with examples - Configs that can be updated along with constraints applied - Secret rotation for password encoder Also add a new column for broker configs to indicate which configs can be dynamically updated. was: Add documentation for dynamic broker config update. Include: - Command line options for kafka-configs.sh with examples - Configs that can be updated along with constraints applied - Secret rotation for password encoder > Document dynamic config update > -- > > Key: KAFKA-6476 > URL: https://issues.apache.org/jira/browse/KAFKA-6476 > Project: Kafka > Issue Type: Sub-task > Components: core, documentation >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 1.2.0 > > > Add documentation for dynamic broker config update. > Include: > - Command line options for kafka-configs.sh with examples > - Configs that can be updated along with constraints applied > - Secret rotation for password encoder > Also add a new column for broker configs to indicate which configs can be > dynamically updated. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5946) Give connector method parameter better name
[ https://issues.apache.org/jira/browse/KAFKA-5946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated KAFKA-5946: -- Labels: connector newbie (was: connector newbie usability) > Give connector method parameter better name > --- > > Key: KAFKA-5946 > URL: https://issues.apache.org/jira/browse/KAFKA-5946 > Project: Kafka > Issue Type: Improvement >Reporter: Ted Yu >Priority: Major > Labels: connector, newbie > > During the development of KAFKA-5657, there were several iterations where > the method call didn't match what the connector parameter actually represented. > [~ewencp] had used connType as equivalent to connClass because Type wasn't > used to differentiate source vs. sink. > [~ewencp] proposed the following: > {code} > It would help to convert all the uses of connType to connClass first, then > standardize on class == java class, type == source/sink, name == > user-specified name. > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
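The proposed convention (class == Java class, type == source/sink, name == user-specified name) could be captured roughly as follows. This is a standalone illustration, not code from the Connect framework; all names here are hypothetical:

```java
// Illustrative sketch of the proposed naming convention:
//   connClass -- the Java class of the connector
//   connType  -- whether it is a source or a sink
//   connName  -- the user-specified connector name
public class ConnectorNaming {
    enum ConnectorType { SOURCE, SINK }

    // A method signature following the convention makes each
    // parameter's role unambiguous at the call site.
    static String describe(String connClass, ConnectorType connType, String connName) {
        return connName + " (" + connType + " connector, class " + connClass + ")";
    }

    public static void main(String[] args) {
        System.out.println(describe("FileStreamSourceConnector",
                                    ConnectorType.SOURCE, "local-file-source"));
    }
}
```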
[jira] [Commented] (KAFKA-6498) Add RocksDB statistics via Streams metrics
[ https://issues.apache.org/jira/browse/KAFKA-6498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16351245#comment-16351245 ] james chien commented on KAFKA-6498: [~guozhang] Thanks for the summary. I will start the research, work out an approach, and also write a KIP proposal. > Add RocksDB statistics via Streams metrics > -- > > Key: KAFKA-6498 > URL: https://issues.apache.org/jira/browse/KAFKA-6498 > Project: Kafka > Issue Type: Improvement > Components: metrics, streams >Reporter: Guozhang Wang >Assignee: james chien >Priority: Major > Labels: needs-kip > > RocksDB's own stats can be programmatically exposed via > {{Options.statistics()}}, and the JNI `Statistics` already implements many > useful stats. However, these stats are not exposed via > Streams today, so any user who wants access to them has to > manually interact with the underlying RocksDB, not go through > Streams. > We should expose such stats via Streams metrics programmatically so that users > can investigate them without accessing RocksDB directly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
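The manual workaround the ticket describes looks roughly like this in RocksJava (a sketch assuming a RocksJava 5.x-style API; this is exactly the direct RocksDB access that exposing the stats through Streams metrics would make unnecessary):

```java
import org.rocksdb.Options;
import org.rocksdb.Statistics;
import org.rocksdb.TickerType;

public class ManualRocksDbStats {
    public static void main(String[] args) {
        // Attach a Statistics object to the Options used to open the store.
        try (Options options = new Options();
             Statistics stats = new Statistics()) {
            options.setStatistics(stats);
            // ... open the RocksDB instance with these options and run traffic ...

            // Read a ticker counter directly from RocksDB, bypassing Streams.
            long cacheMisses = stats.getTickerCount(TickerType.BLOCK_CACHE_MISS);
            System.out.println("block cache misses: " + cacheMisses);
        }
    }
}
```

In Streams the `Options` are created internally per state store, which is why users currently cannot attach `Statistics` without a custom `RocksDBConfigSetter`, and why surfacing these counters through the existing Streams metrics registry is the cleaner path.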
[jira] [Created] (KAFKA-6530) Use actual first offset of messages when rolling log segment for magic v2
Jason Gustafson created KAFKA-6530: -- Summary: Use actual first offset of messages when rolling log segment for magic v2 Key: KAFKA-6530 URL: https://issues.apache.org/jira/browse/KAFKA-6530 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson When rolling a log segment, we currently use a heuristic to choose the base offset of the next segment without decompressing the message set to find the actual first offset, in order to avoid offset overflow. With the v2 message format, we can find the first offset without needing decompression, so we can set the correct base offset exactly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
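The reason no decompression is needed with magic v2: the batch header begins with the base offset as a plain big-endian int64, before any (possibly compressed) record payload. A self-contained sketch of that header peek (the layout assumption is the v2 RecordBatch format; the helper name is illustrative):

```java
import java.nio.ByteBuffer;

public class BaseOffsetPeek {
    // In the v2 message format the batch header starts with the
    // baseOffset as a big-endian int64, so the first offset can be
    // read without decompressing the records that follow.
    static long readBaseOffset(ByteBuffer batch) {
        return batch.getLong(batch.position()); // first 8 bytes of the header
    }

    public static void main(String[] args) {
        // Fake header: only the leading baseOffset field is populated.
        ByteBuffer fake = ByteBuffer.allocate(16);
        fake.putLong(0, 42L);
        System.out.println(readBaseOffset(fake)); // prints 42
    }
}
```

With v0/v1, by contrast, the first offset of a compressed wrapper message is only known after decompressing the inner messages, which is why the heuristic existed.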