[GitHub] [kafka] chia7712 commented on a change in pull request #9663: MINOR: Small cleanups in `AlterIsr` handling logic
chia7712 commented on a change in pull request #9663:
URL: https://github.com/apache/kafka/pull/9663#discussion_r533126745

## File path: core/src/main/scala/kafka/cluster/Partition.scala ##

@@ -1372,23 +1390,27 @@ class Partition(val topicPartition: TopicPartition,
    * Since our error was non-retryable we are okay staying in this state until we see new metadata from UpdateMetadata
    * or LeaderAndIsr
    */
-  private def handleAlterIsrResponse(proposedIsr: Set[Int], result: Either[Errors, LeaderAndIsr]): Unit = {
+  private def handleAlterIsrResponse(proposedIsrState: IsrState, result: Either[Errors, LeaderAndIsr]): Unit = {

Review comment:
Could we rewrite it by currying?

*Before*
```scala
val callbackPartial = handleAlterIsrResponse(proposedIsrState, _ : Either[Errors, LeaderAndIsr])
if (!alterIsrManager.enqueue(AlterIsrItem(topicPartition, newLeaderAndIsr, callbackPartial))) {
  throw new IllegalStateException(s"Failed to enqueue `AlterIsr` request with state " +
    s"$newLeaderAndIsr for partition $topicPartition")
}
```

*After*
```scala
if (!alterIsrManager.enqueue(AlterIsrItem(topicPartition, newLeaderAndIsr, handleAlterIsrResponse(proposedIsrState)))) {
  throw new IllegalStateException(s"Failed to enqueue `AlterIsr` request with state " +
    s"$newLeaderAndIsr for partition $topicPartition")
}
```

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dongjinleekr commented on pull request #8404: KAFKA-10787: Introduce an import order in Java sources
dongjinleekr commented on pull request #8404:
URL: https://github.com/apache/kafka/pull/8404#issuecomment-736272990

Here is the update.

- Adapted the three-group import ordering as discussed in [the mailing thread](https://lists.apache.org/thread.html/rf6f49c845a3d48efe8a91916c8fbaddb76da17742eef06798fc5b24d%40%3Cdev.kafka.apache.org%3E).
- Added documentation for the IDE plugins.
- Reordered the existing files and rebased onto the latest trunk.

cc/ @mjsax @cadonna @ableegoldman @vvcephei
[GitHub] [kafka] viktorsomogyi edited a comment on pull request #9519: KAFKA-10650: Use Murmur3 instead of MD5 in SkimpyOffsetMap
viktorsomogyi edited a comment on pull request #9519:
URL: https://github.com/apache/kafka/pull/9519#issuecomment-736027069

@lbradstreet It is hard to give an exact answer to this, because the collision rate is difficult to calculate mathematically: it depends heavily on the size and values of the test set. For non-cryptographic hashes it is possible to generate DDoS attacks where everything gets placed into the same bucket, which slows down lookups. On the theoretical side, though, Murmur3 passes the most often cited chi-square test and has a very good avalanche effect, so it generates hashes that are very close to the uniform distribution.

Because of the lack of available mathematical articles on this topic (Murmur vs. MD5), I started brute-force tests in which I generated a few billion unique keys and inserted them into a Bloom filter (which had a 1% false positive probability). These showed that Murmur3 is on the same level as MD5: it generates roughly the same number of collisions. I have two types of datasets: the first can use any UTF-8 characters and the second uses only printable ASCII characters. In fact MD5, Murmur3 128-bit, Murmur3 64-bit and xxHash 64-bit all generated around the same number of collisions, which was 0.016% out of 200 million unique keys. I added Murmur3 32-bit as a baseline, and it was significantly worse, at around 2% collisions. Maybe we need a much larger key set to show the difference; I'll try to do what I can. I'll publish my code in the following days. I also have to work on something else, so it's a bit slow, sorry :).

On the other hand, if we want to make sure that there will be no collisions, I don't think that is possible with any of these solutions; there is always a chance. To rule collisions out completely, we would either have to store the user key-hash maps similarly to the offset indexes and reject new, colliding keys, or use perfect hashes (but that would not work well, as it requires knowledge of the full key set, or rebuilding the cache on each insert or at least often).
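As a rough cross-check on the figures in the comment above, the standard birthday approximation can be computed directly. The sketch below is not the test code mentioned in the comment (which was not published in this thread); the class and method names are made up for illustration, and it assumes an ideal hash that spreads keys uniformly. Under that assumption, 200 million keys in a 32-bit space give an expected pair count of about 2.3% of the key count, in line with the reported figure of around 2%, while the 64-bit and 128-bit expectations are far below one collision.

```java
final class CollisionEstimate {
    /**
     * Expected number of colliding key pairs when n distinct keys are hashed
     * uniformly at random into 2^bits values: C(n, 2) / 2^bits.
     * Assumes an ideal hash function; real hashes only approximate this.
     */
    static double expectedCollidingPairs(long n, int bits) {
        double keys = (double) n;
        double space = Math.pow(2.0, bits);
        return keys * (keys - 1.0) / 2.0 / space;
    }

    public static void main(String[] args) {
        long n = 200_000_000L; // key count quoted in the comment
        for (int bits : new int[] {32, 64, 128}) {
            double pairs = expectedCollidingPairs(n, bits);
            System.out.printf("%d-bit: %.3e expected colliding pairs (%.3e%% of keys)%n",
                bits, pairs, 100.0 * pairs / n);
        }
    }
}
```

If this approximation applies, a measured rate well above the 64-bit expectation would have to come from something other than hash collisions alone, for example the Bloom filter's own false positives; that reading is an assumption and is not confirmed anywhere in the thread.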
[jira] [Commented] (KAFKA-10629) TopologyTestDriver should not require a Properties arg
[ https://issues.apache.org/jira/browse/KAFKA-10629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241301#comment-17241301 ] Rohit Deshpande commented on KAFKA-10629: - thanks [~mjsax] > TopologyTestDriver should not require a Properties arg > -- > > Key: KAFKA-10629 > URL: https://issues.apache.org/jira/browse/KAFKA-10629 > Project: Kafka > Issue Type: Improvement > Components: streams, streams-test-utils >Reporter: John Roesler >Assignee: Rohit Deshpande >Priority: Minor > Labels: kip, newbie > > As of [https://github.com/apache/kafka/pull/9477,] many TopologyTestDriver > usages will have no configurations at all to specify, so we should provide a > constructor that doesn't take a Properties argument. Right now, such > configuration-free usages have to provide an empty Properties object. > KIP-680: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-680%3A+TopologyTestDriver+should+not+require+a+Properties+argument] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] dengziming commented on pull request #9664: KAFKA-10780; Rewrite ControllerZNode struct with auto-generated protocol
dengziming commented on pull request #9664:
URL: https://github.com/apache/kafka/pull/9664#issuecomment-736255004

@ijuma Thank you, I had overlooked this fact. Some of the ZkNodes will be removed because we no longer need them after KIP-500, for example `IsrChangeNotificationZNode` and `ControllerZNode`. However, some ZkNodes will just be moved from ZkNode to RaftKafkaMetadataLog, for example `TopicZNode` and `FeatureZNode`. So can we still rewrite their structs using the auto-generated protocol, which can also be used after KIP-500? Also pinging @abbccdda, as this is a discussion about a major direction. I will close this PR and create another for `TopicZNode` if you think it's reasonable. Thank you.
[GitHub] [kafka] rohitrmd commented on a change in pull request #9660: Kafka 10629 - TopologyTestDriver should not require a Properties argument
rohitrmd commented on a change in pull request #9660: URL: https://github.com/apache/kafka/pull/9660#discussion_r533099003 ## File path: streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java ## @@ -254,6 +265,19 @@ public TopologyTestDriver(final Topology topology, this(topology, config, null); } +/** + * Create a new test diver instance. + * Initialized the internally mocked wall-clock time with {@link System#currentTimeMillis() current system time}. Review comment: fixed the comment. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-10787) Introduce an import order in Java sources
[ https://issues.apache.org/jira/browse/KAFKA-10787?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjin Lee updated KAFKA-10787: Affects Version/s: 2.8.0 > Introduce an import order in Java sources > - > > Key: KAFKA-10787 > URL: https://issues.apache.org/jira/browse/KAFKA-10787 > Project: Kafka > Issue Type: Improvement >Affects Versions: 2.8.0 >Reporter: Dongjin Lee >Assignee: Dongjin Lee >Priority: Major > > As of present, Kafka uses a relatively strict code style for Java code, > except import order. For this reason, the code formatting settings of every > local dev environment are different from person to person, resulting in > countless meaningless import order changes in the PR. > This issue aims to define and apply a 3-group import order, like the > following: > 1. Project packages: {{kafka.*}}, {{org.apache.kafka.*}} > 2. Third Party packages: {{com.*}}, {{net.*}}, {{org.*}} > 3. Java packages: {{java.*}}, {{javax.*}} > Discussion Thread: > https://lists.apache.org/thread.html/rf6f49c845a3d48efe8a91916c8fbaddb76da17742eef06798fc5b24d%40%3Cdev.kafka.apache.org%3E -- This message was sent by Atlassian Jira (v8.3.4#803005)
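The three groups in the issue description overlap syntactically, because `org.apache.kafka.*` also matches the third-party pattern `org.*`, so any tool applying the rule has to test the project prefixes first. The sketch below illustrates that ordering. It is not the checkstyle or IDE configuration from the PR; the class name is hypothetical, and sorting alphabetically within a group is an assumption made only for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ImportOrderSketch {
    /** Returns 1 (project), 2 (third party) or 3 (Java) for a fully qualified import name. */
    static int group(String name) {
        // Project packages are tested first: org.apache.kafka.* would otherwise fall into org.*.
        if (name.startsWith("kafka.") || name.startsWith("org.apache.kafka.")) {
            return 1;
        }
        if (name.startsWith("java.") || name.startsWith("javax.")) {
            return 3;
        }
        return 2; // com.*, net.*, org.* and anything else
    }

    /** Orders imports by group, then alphabetically inside each group (an assumption). */
    static List<String> sorted(List<String> imports) {
        List<String> result = new ArrayList<>(imports);
        result.sort((a, b) -> {
            int byGroup = Integer.compare(group(a), group(b));
            return byGroup != 0 ? byGroup : a.compareTo(b);
        });
        return result;
    }

    public static void main(String[] args) {
        List<String> imports = Arrays.asList(
            "java.util.Map",
            "org.slf4j.Logger",
            "org.apache.kafka.common.utils.Time",
            "kafka.utils.Logging");
        for (String name : sorted(imports)) {
            System.out.println(group(name) + "  import " + name + ";");
        }
    }
}
```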
[jira] [Commented] (KAFKA-10787) Introduce an import order in Java sources
[ https://issues.apache.org/jira/browse/KAFKA-10787?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241272#comment-17241272 ] Dongjin Lee commented on KAFKA-10787: - PR: https://github.com/apache/kafka/pull/8404 > Introduce an import order in Java sources > - > > Key: KAFKA-10787 > URL: https://issues.apache.org/jira/browse/KAFKA-10787 > Project: Kafka > Issue Type: Improvement >Reporter: Dongjin Lee >Assignee: Dongjin Lee >Priority: Major > > As of present, Kafka uses a relatively strict code style for Java code, > except import order. For this reason, the code formatting settings of every > local dev environment are different from person to person, resulting in > countless meaningless import order changes in the PR. > This issue aims to define and apply a 3-group import order, like the > following: > 1. Project packages: {{kafka.*}}, {{org.apache.kafka.*}} > 2. Third Party packages: {{com.*}}, {{net.*}}, {{org.*}} > 3. Java packages: {{java.*}}, {{javax.*}} > Discussion Thread: > https://lists.apache.org/thread.html/rf6f49c845a3d48efe8a91916c8fbaddb76da17742eef06798fc5b24d%40%3Cdev.kafka.apache.org%3E -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] chia7712 commented on a change in pull request #8826: KAFKA-10090 Misleading warnings: The configuration was supplied but i…
chia7712 commented on a change in pull request #8826: URL: https://github.com/apache/kafka/pull/8826#discussion_r533093802 ## File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java ## @@ -1271,27 +1270,48 @@ public void testProducerJmxPrefix() throws Exception { producer.close(); } -private ProducerMetadata newMetadata(long refreshBackoffMs, long expirationMs) { -return new ProducerMetadata(refreshBackoffMs, expirationMs, defaultMetadataIdleMs, +private static ProducerMetadata newMetadata(long refreshBackoffMs, long expirationMs) { +return new ProducerMetadata(refreshBackoffMs, expirationMs, DEFAULT_METADATA_IDLE_MS, new LogContext(), new ClusterResourceListeners(), Time.SYSTEM); } @Test -public void serializerShouldSeeGeneratedClientId() { +public void configurableObjectsShouldSeeGeneratedClientId() { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, SerializerForClientId.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SerializerForClientId.class.getName()); +props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, PartitionerForClientId.class.getName()); +props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptorForClientId.class.getName()); KafkaProducer producer = new KafkaProducer<>(props); -assertEquals(2, SerializerForClientId.CLIENT_IDS.size()); -assertEquals(SerializerForClientId.CLIENT_IDS.get(0), producer.getClientId()); -assertEquals(SerializerForClientId.CLIENT_IDS.get(1), producer.getClientId()); +assertNotNull(producer.getClientId()); +assertNotEquals(0, producer.getClientId().length()); +assertEquals(4, CLIENT_IDS.size()); +CLIENT_IDS.forEach(id -> assertEquals(id, producer.getClientId())); producer.close(); } +@Test +public void testUnusedConfigs() { +Map props = new HashMap<>(); +props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:"); 
+props.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLS"); +ProducerConfig config = new ProducerConfig(ProducerConfig.appendSerializerToConfig(props, +new StringSerializer(), new StringSerializer())); + +assertTrue(new ProducerConfig(config.originals(), false).unused().contains(SslConfigs.SSL_PROTOCOL_CONFIG)); Review comment: I'm going to remove this test ```assertTrue(new ProducerConfig(config.originals(), false).unused().co...``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a change in pull request #8826: KAFKA-10090 Misleading warnings: The configuration was supplied but i…
chia7712 commented on a change in pull request #8826:
URL: https://github.com/apache/kafka/pull/8826#discussion_r533093616

## File path: clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java ##

@@ -105,7 +105,9 @@ public AbstractConfig(ConfigDef definition, Map originals, Map
 throw new ConfigException(entry.getKey().toString(), entry.getValue(), "Key must be a string.");
 this.originals = resolveConfigVariables(configProviderProps, (Map) originals);
-this.values = definition.parse(this.originals);
+// pass a copy to definition.parse. Otherwise, the definition.parse adds all keys of definitions to "used" group
+// since definition.parse needs to call "RecordingMap#get" when checking all definitions.
+this.values = definition.parse(new HashMap<>(this.originals));

Review comment:
> But it seems this is still needed?

It is not necessary with the latest change. I kept it as a complete solution (in case someone passes a `RecordingMap` in the future). However, I'm going to remove it to make this PR simpler.
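The effect described in the diff comment can be reproduced with a small stand-in. The sketch below is not Kafka's `AbstractConfig`; `RecordingMap` here is a simplified, hypothetical version that marks a key as used whenever `get` is called, and `parse` is a made-up helper that looks up every defined key the way a definition-driven parser would. It shows why parsing the recording map directly leaves nothing in the unused set, while parsing a plain `HashMap` copy leaves the recording untouched.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class RecordingMapSketch {
    /** Simplified stand-in: remembers every key that was read through get(). */
    static final class RecordingMap extends HashMap<String, Object> {
        private final Set<String> used = new HashSet<>();

        RecordingMap(Map<String, Object> source) {
            super(source);
        }

        @Override
        public Object get(Object key) {
            if (key instanceof String) {
                used.add((String) key);
            }
            return super.get(key);
        }

        /** Keys that are present but were never read through get(). */
        Set<String> unused() {
            Set<String> keys = new HashSet<>(keySet());
            keys.removeAll(used);
            return keys;
        }
    }

    /** Hypothetical parse step: reads every defined key from the supplied map. */
    static Map<String, Object> parse(Set<String> definedKeys, Map<String, Object> props) {
        Map<String, Object> values = new HashMap<>();
        for (String key : definedKeys) {
            values.put(key, props.get(key));
        }
        return values;
    }

    /** Parses either the recording map itself or a plain copy, then reports the unused keys. */
    static Set<String> unusedAfterParse(boolean passCopy) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative values only
        props.put("ssl.protocol", "TLS");
        Set<String> definedKeys = new HashSet<>(Arrays.asList("bootstrap.servers", "ssl.protocol"));

        RecordingMap originals = new RecordingMap(props);
        Map<String, Object> input = originals;
        if (passCopy) {
            // The HashMap copy constructor iterates entries and does not call get().
            input = new HashMap<>(originals);
        }
        parse(definedKeys, input);
        return originals.unused();
    }

    public static void main(String[] args) {
        System.out.println("parsed directly: unused = " + unusedAfterParse(false));
        System.out.println("parsed a copy:   unused = " + unusedAfterParse(true));
    }
}
```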
[jira] [Created] (KAFKA-10787) Introduce an import order in Java sources
Dongjin Lee created KAFKA-10787: --- Summary: Introduce an import order in Java sources Key: KAFKA-10787 URL: https://issues.apache.org/jira/browse/KAFKA-10787 Project: Kafka Issue Type: Improvement Reporter: Dongjin Lee Assignee: Dongjin Lee As of present, Kafka uses a relatively strict code style for Java code, except import order. For this reason, the code formatting settings of every local dev environment are different from person to person, resulting in countless meaningless import order changes in the PR. This issue aims to define and apply a 3-group import order, like the following: 1. Project packages: {{kafka.*}}, {{org.apache.kafka.*}} 2. Third Party packages: {{com.*}}, {{net.*}}, {{org.*}} 3. Java packages: {{java.*}}, {{javax.*}} Discussion Thread: https://lists.apache.org/thread.html/rf6f49c845a3d48efe8a91916c8fbaddb76da17742eef06798fc5b24d%40%3Cdev.kafka.apache.org%3E -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] chia7712 commented on a change in pull request #8826: KAFKA-10090 Misleading warnings: The configuration was supplied but i…
chia7712 commented on a change in pull request #8826: URL: https://github.com/apache/kafka/pull/8826#discussion_r533092986 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ## @@ -314,27 +315,23 @@ public KafkaProducer(Properties properties) { * be called in the producer when the serializer is passed in directly. */ public KafkaProducer(Properties properties, Serializer keySerializer, Serializer valueSerializer) { -this(Utils.propsToMap(properties), keySerializer, valueSerializer, null, null, null, -Time.SYSTEM); +this(Utils.propsToMap(properties), keySerializer, valueSerializer); } // visible for testing @SuppressWarnings("unchecked") -KafkaProducer(Map configs, +KafkaProducer(ProducerConfig config, Serializer keySerializer, Serializer valueSerializer, ProducerMetadata metadata, KafkaClient kafkaClient, ProducerInterceptors interceptors, Time time) { -ProducerConfig config = new ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, keySerializer, -valueSerializer)); try { -Map userProvidedConfigs = config.originals(); this.producerConfig = config; this.time = time; -String transactionalId = (String) userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG); +String transactionalId = (String) config.originals().get(ProducerConfig.TRANSACTIONAL_ID_CONFIG); Review comment: good point. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a change in pull request #8826: KAFKA-10090 Misleading warnings: The configuration was supplied but i…
chia7712 commented on a change in pull request #8826: URL: https://github.com/apache/kafka/pull/8826#discussion_r533092957 ## File path: clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java ## @@ -79,25 +80,52 @@ public void testChannelBuilderConfigs() { // test configs with listener prefix Map configs = ChannelBuilders.channelBuilderConfigs(securityConfig, new ListenerName("listener1")); + assertNull(configs.get("listener.name.listener1.gssapi.sasl.kerberos.service.name")); + assertFalse(securityConfig.unused().contains("listener.name.listener1.gssapi.sasl.kerberos.service.name")); + assertEquals(configs.get("gssapi.sasl.kerberos.service.name"), "testkafka"); + assertFalse(securityConfig.unused().contains("gssapi.sasl.kerberos.service.name")); + assertEquals(configs.get("sasl.kerberos.service.name"), "testkafkaglobal"); + assertFalse(securityConfig.unused().contains("gssapi.sasl.kerberos.service.name")); Review comment: you are right. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests
abbccdda commented on a change in pull request #9564: URL: https://github.com/apache/kafka/pull/9564#discussion_r533091766 ## File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala ## @@ -125,15 +126,25 @@ class BrokerToControllerChannelManagerImpl(metadataCache: kafka.server.MetadataC } override def sendRequest(request: AbstractRequest.Builder[_ <: AbstractRequest], - callback: RequestCompletionHandler): Unit = { -requestQueue.put(BrokerToControllerQueueItem(request, callback)) + callback: BrokerToControllerRequestCompletionHandler, + requestTimeout: Long): Unit = { +requestQueue.put(BrokerToControllerQueueItem(request, callback, time.milliseconds() + requestTimeout)) Review comment: Good catch! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests
abbccdda commented on a change in pull request #9564: URL: https://github.com/apache/kafka/pull/9564#discussion_r533091229 ## File path: core/src/main/scala/kafka/server/AlterIsrManager.scala ## @@ -88,20 +88,33 @@ class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChanne private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = { val message = buildRequest(inflightAlterIsrItems) -def responseHandler(response: ClientResponse): Unit = { - try { -val body = response.responseBody().asInstanceOf[AlterIsrResponse] -handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems) - } finally { -// Be sure to clear the in-flight flag to allow future AlterIsr requests -if (!inflightRequest.compareAndSet(true, false)) { - throw new IllegalStateException("AlterIsr response callback called when no requests were in flight") + +def clearInflightRequests(): Unit = { + // Be sure to clear the in-flight flag to allow future AlterIsr requests + if (!inflightRequest.compareAndSet(true, false)) { +throw new IllegalStateException("AlterIsr response callback called when no requests were in flight") + } +} + +class AlterIsrResponseHandler extends BrokerToControllerRequestCompletionHandler { + override def onComplete(response: ClientResponse): Unit = { +try { + val body = response.responseBody().asInstanceOf[AlterIsrResponse] + handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems) +} finally { + clearInflightRequests() } } + + override def onTimeout(): Unit = { +warn(s"Encountered request when sending AlterIsr to the controller") Review comment: Not sure we could do the callback here, since the request failed already. Maybe just do nothing here? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #9664: KAFKA-10780; Rewrite ControllerZNode struct with auto-generated protocol
ijuma commented on pull request #9664: URL: https://github.com/apache/kafka/pull/9664#issuecomment-736240488 Thanks for the PR. I don't think we should do this since KIP-500 will replace all this code.
[GitHub] [kafka] chia7712 commented on a change in pull request #8826: KAFKA-10090 Misleading warnings: The configuration was supplied but i…
chia7712 commented on a change in pull request #8826: URL: https://github.com/apache/kafka/pull/8826#discussion_r533079695 ## File path: clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java ## @@ -79,25 +80,52 @@ public void testChannelBuilderConfigs() { // test configs with listener prefix Map configs = ChannelBuilders.channelBuilderConfigs(securityConfig, new ListenerName("listener1")); + assertNull(configs.get("listener.name.listener1.gssapi.sasl.kerberos.service.name")); + assertFalse(securityConfig.unused().contains("listener.name.listener1.gssapi.sasl.kerberos.service.name")); + assertEquals(configs.get("gssapi.sasl.kerberos.service.name"), "testkafka"); + assertFalse(securityConfig.unused().contains("gssapi.sasl.kerberos.service.name")); + assertEquals(configs.get("sasl.kerberos.service.name"), "testkafkaglobal"); + assertFalse(securityConfig.unused().contains("gssapi.sasl.kerberos.service.name")); + assertNull(configs.get("listener.name.listener1.sasl.kerberos.service.name")); + assertFalse(securityConfig.unused().contains("listener.name.listener1.sasl.kerberos.service.name")); assertNull(configs.get("plain.sasl.server.callback.handler.class")); + assertFalse(securityConfig.unused().contains("plain.sasl.server.callback.handler.class")); + assertEquals(configs.get("listener.name.listener1.gssapi.config1.key"), "custom.config1"); + assertFalse(securityConfig.unused().contains("listener.name.listener1.gssapi.config1.key")); + assertEquals(configs.get("custom.config2.key"), "custom.config2"); +assertFalse(securityConfig.unused().contains("custom.config2.key")); // test configs without listener prefix +securityConfig = new TestSecurityConfig(props); Review comment: We need a new ```RecordingMap``` to test different key without listener prefix. Otherwise, the key may be used by previous test. This is an automated message from the Apache Git Service. 
[GitHub] [kafka] chia7712 commented on pull request #9423: KAFKA-9263 The new hw is added to incorrect log when ReplicaAlterLogD…
chia7712 commented on pull request #9423: URL: https://github.com/apache/kafka/pull/9423#issuecomment-736222765 > Are test failures related to this PR? They are unrelated errors. I will rebase the PR to trigger QA again.
[GitHub] [kafka] chia7712 merged pull request #9659: KAFKA-10770: Remove duplicate defination of Metrics#getTags
chia7712 merged pull request #9659: URL: https://github.com/apache/kafka/pull/9659
[GitHub] [kafka] dengziming commented on pull request #9664: KAFKA-10780; Rewrite ControllerZNode struct with auto-generated protocol
dengziming commented on pull request #9664: URL: https://github.com/apache/kafka/pull/9664#issuecomment-736217128 Hi, @abbccdda PTAL.
[jira] [Updated] (KAFKA-10786) ReplicaAlterLogDirsThread gets stuck during the reassignment of Kafka partition
[ https://issues.apache.org/jira/browse/KAFKA-10786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

nick song updated KAFKA-10786:
Priority: Blocker (was: Major)

> ReplicaAlterLogDirsThread gets stuck during the reassignment of Kafka partition
>
> Key: KAFKA-10786
> URL: https://issues.apache.org/jira/browse/KAFKA-10786
> Project: Kafka
> Issue Type: Bug
> Components: log
> Affects Versions: 2.0.0
> Reporter: nick song
> Priority: Blocker
> Attachments: attachment 1.png, attachment 2.png, attachment 3.png
>
> Topic config: Configs for topic 'athena_8603' are leader.replication.throttled.replicas=9:7,9:6,10:8,10:7,8:6,8:5,11:9,11:8,follower.replication.throttled.replicas=9:13,10:0,8:15,11:14,retention.ms=8640,delete.retention.ms=6
>
> Reassignment of replica athena_8603-1-15 is still in progress
>
> While reassigning the topic partition, I found that some tasks had been in progress for more than ten hours. After investigation, I found that ReplicaAlterLogDirsThread was running the whole time and using a lot of CPU (Attachment 1).
> The thread information (Attachment 2) shows that the log data is being copied. The log directory (Attachment 3) shows that the index of the future directory is older than the original log. Is it because the configuration delete.retention.ms=6 caused the data to be deleted while it was being copied? This causes the replication thread to get stuck. Is there any solution?
[jira] [Created] (KAFKA-10786) ReplicaAlterLogDirsThread gets stuck during the reassignment of Kafka partition
nick song created KAFKA-10786:

Summary: ReplicaAlterLogDirsThread gets stuck during the reassignment of Kafka partition
Key: KAFKA-10786
URL: https://issues.apache.org/jira/browse/KAFKA-10786
Project: Kafka
Issue Type: Bug
Components: log
Affects Versions: 2.0.0
Reporter: nick song
Attachments: attachment 1.png, attachment 2.png, attachment 3.png

Topic config: Configs for topic 'athena_8603' are leader.replication.throttled.replicas=9:7,9:6,10:8,10:7,8:6,8:5,11:9,11:8,follower.replication.throttled.replicas=9:13,10:0,8:15,11:14,retention.ms=8640,delete.retention.ms=6

Reassignment of replica athena_8603-1-15 is still in progress

While reassigning the topic partition, I found that some tasks had been in progress for more than ten hours. After investigation, I found that ReplicaAlterLogDirsThread was running the whole time and using a lot of CPU (Attachment 1).
The thread information (Attachment 2) shows that the log data is being copied. The log directory (Attachment 3) shows that the index of the future directory is older than the original log. Is it because the configuration delete.retention.ms=6 caused the data to be deleted while it was being copied? This causes the replication thread to get stuck. Is there any solution?
[GitHub] [kafka] hachikuji commented on a change in pull request #9601: KAFKA-10729: Bump remaining RPC's to use tagged fields.
hachikuji commented on a change in pull request #9601: URL: https://github.com/apache/kafka/pull/9601#discussion_r533034552 ## File path: core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala ## @@ -84,19 +85,24 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness { } // Custom header serialization so that protocol assumptions are not forced - private def requestHeaderBytes(apiKey: Short, apiVersion: Short, clientId: String = "", correlationId: Int = -1): Array[Byte] = { + def requestHeaderBytes(apiKey: Short, apiVersion: Short, clientId: String = "", correlationId: Int = -1): Array[Byte] = { +// Check for flex versions, some tests here verify that an invalid apiKey is detected properly, so if -1 is used, +// assume the request is not using flex versions. +val flexVersion = if (apiKey >= 0) ApiKeys.forId(apiKey).requestHeaderVersion(apiVersion) >= 2 else false val size = { 2 /* apiKey */ + 2 /* version id */ + 4 /* correlation id */ + -Type.NULLABLE_STRING.sizeOf(clientId) /* client id */ +Type.NULLABLE_STRING.sizeOf(clientId) /* client id */ + +(if (flexVersion) ByteUtils.sizeOfUnsignedVarint(0) else 0) Review comment: nit: maybe add a comment that this field is for the number of tagged fields? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
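The extra size term in the diff is the length of an unsigned varint holding the value 0, that is, the count of tagged fields in an otherwise empty tagged-field section. The sketch below shows how such a length can be computed for a 32-bit value at 7 payload bits per byte. It follows the general varint scheme and is not copied from Kafka's `ByteUtils`; the class name is hypothetical.

```java
final class VarintSizeSketch {
    /** Number of bytes needed to encode value as an unsigned varint (7 payload bits per byte). */
    static int sizeOfUnsignedVarint(int value) {
        int bytes = 1;
        // Keep going while any bit above the low 7 bits is still set.
        while ((value & 0xffffff80) != 0) {
            bytes++;
            value >>>= 7;
        }
        return bytes;
    }

    public static void main(String[] args) {
        // A header with no tagged fields still spends one byte on the count.
        System.out.println("size of varint 0:   " + sizeOfUnsignedVarint(0));
        System.out.println("size of varint 128: " + sizeOfUnsignedVarint(128));
    }
}
```

This is why the test helper adds the term only for flexible versions: older header versions carry no tagged-field section at all, so they do not pay the extra byte.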
[GitHub] [kafka] dengziming opened a new pull request #9664: KAFKA-10780; Rewrite ControllerZNode struct with auto-generated protocol
dengziming opened a new pull request #9664:
URL: https://github.com/apache/kafka/pull/9664

*More detailed description of your change*
PR #9662 rewrites the FeatureZNode struct with the auto-generated protocol, but it is a non-trivial change, so we can review this simpler PR first.

*Summary of testing strategy (including rationale)*
Unit test.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] hachikuji commented on a change in pull request #9601: KAFKA-10729: Bump remaining RPC's to use tagged fields.
hachikuji commented on a change in pull request #9601: URL: https://github.com/apache/kafka/pull/9601#discussion_r533033927 ## File path: clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java ## @@ -105,8 +105,8 @@ public int hashCode() { public final WriteTxnMarkersRequestData data; -public Builder(final List markers) { -super(ApiKeys.WRITE_TXN_MARKERS); +public Builder(final List markers, short latestAllowedVersion) { +super(ApiKeys.WRITE_TXN_MARKERS, ApiKeys.WRITE_TXN_MARKERS.oldestVersion(), latestAllowedVersion); Review comment: I think this is probably ok, but it is a little inconsistent with how we handle the versions for other inter-broker RPCs. Since we rely on the IBP, we always set the version explicitly in the caller, which means there is exactly one allowable version for the builder to use. See for example `LeaderAndIsrRequest.Builder`. ## File path: core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala ## @@ -84,19 +85,24 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness { } // Custom header serialization so that protocol assumptions are not forced - private def requestHeaderBytes(apiKey: Short, apiVersion: Short, clientId: String = "", correlationId: Int = -1): Array[Byte] = { + def requestHeaderBytes(apiKey: Short, apiVersion: Short, clientId: String = "", correlationId: Int = -1): Array[Byte] = { +// Check for flex versions, some tests here verify that an invalid apiKey is detected properly, so if -1 is used, +// assume the request is not using flex versions. 
+val flexVersion = if (apiKey >= 0) ApiKeys.forId(apiKey).requestHeaderVersion(apiVersion) >= 2 else false val size = { 2 /* apiKey */ + 2 /* version id */ + 4 /* correlation id */ + -Type.NULLABLE_STRING.sizeOf(clientId) /* client id */ +Type.NULLABLE_STRING.sizeOf(clientId) /* client id */ + +(if (flexVersion) ByteUtils.sizeOfUnsignedVarint(0) else 0) Review comment: nit: maybe add a comment that this field is for the number of tagged fields? ## File path: clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java ## @@ -286,6 +286,7 @@ public static byte requiredMagicForVersion(short produceRequestVersion) { case 6: case 7: case 8: +case 9: Review comment: I wonder if we may as well make this the default case. Not sure we're getting much by forcing ourselves to update this logic after each bump. Maybe the range validation is still useful, but that could be done by using `oldestVersion` and `latestVersion`.
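The suggestion about `requiredMagicForVersion` could look roughly like the sketch below: validate the range once against the oldest and latest versions, then let the newest magic value be the `default` branch so a version bump needs no new `case`. The constants `OLDEST_VERSION` and `LATEST_VERSION` are hypothetical stand-ins for `oldestVersion` and `latestVersion`, and the mapping for versions 0 to 2 is an assumption, since the diff only shows cases 6 through 9.

```java
public class MagicForVersion {

    // Hypothetical stand-ins for ApiKeys.PRODUCE.oldestVersion() / latestVersion().
    static final short OLDEST_VERSION = 0;
    static final short LATEST_VERSION = 9;

    static byte requiredMagicForVersion(short produceRequestVersion) {
        // Range validation happens up front, so the switch itself no longer has to enumerate
        // every supported version.
        if (produceRequestVersion < OLDEST_VERSION || produceRequestVersion > LATEST_VERSION) {
            throw new IllegalArgumentException("Magic value to use for produce request version "
                + produceRequestVersion + " is not known");
        }
        switch (produceRequestVersion) {
            case 0:
            case 1:
                return 0; // assumed: oldest message format
            case 2:
                return 1; // assumed: intermediate message format
            default:
                return 2; // every later version uses the current format
        }
    }

    public static void main(String[] args) {
        System.out.println(requiredMagicForVersion((short) 9));
    }
}
```

With this shape, bumping the latest version only changes the range bound; the switch stays untouched unless a new message format is introduced.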
[GitHub] [kafka] mjsax commented on a change in pull request #9615: KAFKA-10500: Add thread option
mjsax commented on a change in pull request #9615: URL: https://github.com/apache/kafka/pull/9615#discussion_r533026232 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -398,6 +407,7 @@ public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler st final Consumer handler = exception -> handleStreamsUncaughtException(exception, streamsUncaughtExceptionHandler); synchronized (stateLock) { if (state == State.CREATED) { +this.streamsUncaughtExceptionHandler = handler; Review comment: nit. remove unnecessary `this.` ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -846,43 +856,24 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, time, globalThreadId, delegatingStateRestoreListener, -this::defaultStreamsUncaughtExceptionHandler +streamsUncaughtExceptionHandler ); globalThreadState = globalStreamThread.state(); } // use client id instead of thread client id since this admin client may be shared among threads adminClient = clientSupplier.getAdmin(config.getAdminConfigs(ClientUtils.getSharedAdminClientId(clientId))); -final Map threadState = new HashMap<>(numStreamThreads); -final ArrayList storeProviders = new ArrayList<>(); +threadState = new HashMap<>(numStreamThreads); +storeProviders = new ArrayList<>(); for (int i = 0; i < numStreamThreads; i++) { -final StreamThread streamThread = StreamThread.create( -internalTopologyBuilder, -config, -clientSupplier, -adminClient, -processId, -clientId, -streamsMetrics, -time, -streamsMetadataState, -cacheSizePerThread, -stateDirectory, -delegatingStateRestoreListener, -i + 1, -KafkaStreams.this::closeToError, -this::defaultStreamsUncaughtExceptionHandler -); -threads.add(streamThread); -threadState.put(streamThread.getId(), streamThread.state()); -storeProviders.add(new StreamThreadStateStoreProvider(streamThread)); +createStreamThread(cacheSizePerThread, i + 1); Review comment: Nit: can we change the loop to `int 
= 1; i <= numStreamThreads` and just pass in `i` here? ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -846,43 +856,24 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, time, globalThreadId, delegatingStateRestoreListener, -this::defaultStreamsUncaughtExceptionHandler +streamsUncaughtExceptionHandler ); globalThreadState = globalStreamThread.state(); } // use client id instead of thread client id since this admin client may be shared among threads adminClient = clientSupplier.getAdmin(config.getAdminConfigs(ClientUtils.getSharedAdminClientId(clientId))); -final Map threadState = new HashMap<>(numStreamThreads); -final ArrayList storeProviders = new ArrayList<>(); +threadState = new HashMap<>(numStreamThreads); +storeProviders = new ArrayList<>(); for (int i = 0; i < numStreamThreads; i++) { -final StreamThread streamThread = StreamThread.create( -internalTopologyBuilder, -config, -clientSupplier, -adminClient, -processId, -clientId, -streamsMetrics, -time, -streamsMetadataState, -cacheSizePerThread, -stateDirectory, -delegatingStateRestoreListener, -i + 1, -KafkaStreams.this::closeToError, -this::defaultStreamsUncaughtExceptionHandler -); -threads.add(streamThread); -threadState.put(streamThread.getId(), streamThread.state()); -storeProviders.add(new StreamThreadStateStoreProvider(streamThread)); +createStreamThread(cacheSizePerThread, i + 1); } ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) -> Math.toIntExact(threads.stream().filter(thread -> thread.state().isAlive()).count())); -final StreamStateListener streamStateListener = new StreamStateListener(threadState, globalThreadState); +streamStateListener = new StreamStateListener(threadState, globalThreadState); Review comment: Can we
[GitHub] [kafka] hachikuji commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests
hachikuji commented on a change in pull request #9564: URL: https://github.com/apache/kafka/pull/9564#discussion_r533024276 ## File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala ## @@ -35,7 +35,8 @@ import scala.jdk.CollectionConverters._ trait BrokerToControllerChannelManager { def sendRequest(request: AbstractRequest.Builder[_ <: AbstractRequest], - callback: RequestCompletionHandler): Unit + callback: BrokerToControllerRequestCompletionHandler, + requestTimeout: Long): Unit Review comment: Perhaps we could use a name like `retryTimeout` to distinguish this from the request timeout which only applies to individual requests. Alternatively we could let the caller provide the retry deadline explicitly. This would save the need for the extra `time.milliseconds` call. ## File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala ## @@ -178,6 +191,10 @@ class BrokerToControllerRequestThread(networkClient: KafkaClient, } } + private def isTimedOut(request: BrokerToControllerQueueItem): Boolean = { Review comment: nit: `hasTimedOut`? ## File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala ## @@ -125,15 +126,25 @@ class BrokerToControllerChannelManagerImpl(metadataCache: kafka.server.MetadataC } override def sendRequest(request: AbstractRequest.Builder[_ <: AbstractRequest], - callback: RequestCompletionHandler): Unit = { -requestQueue.put(BrokerToControllerQueueItem(request, callback)) + callback: BrokerToControllerRequestCompletionHandler, + requestTimeout: Long): Unit = { +requestQueue.put(BrokerToControllerQueueItem(request, callback, time.milliseconds() + requestTimeout)) Review comment: Won't this overflow with `requestTimeout` set to `Long.MaxValue`. Do we have any test cases? 
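To illustrate the overflow concern raised above: adding a timeout of `Long.MaxValue` to the current time wraps around to a negative deadline, so a check of the form `now > deadline` would report a timeout immediately. The sketch below is plain Java with hypothetical names (`RetryDeadline`, `deadlineMs`), assumes non-negative inputs, and clamps the deadline instead of letting it wrap; it is one possible guard, not the fix adopted in the PR.

```java
public class RetryDeadline {

    // Computes nowMs + timeoutMs, saturating at Long.MAX_VALUE instead of overflowing.
    // Assumes both arguments are non-negative.
    static long deadlineMs(long nowMs, long timeoutMs) {
        if (timeoutMs > Long.MAX_VALUE - nowMs) {
            return Long.MAX_VALUE; // effectively "never times out"
        }
        return nowMs + timeoutMs;
    }

    static boolean hasTimedOut(long nowMs, long deadlineMs) {
        return nowMs > deadlineMs;
    }

    public static void main(String[] args) {
        long now = 1_000L;
        long timeout = Long.MAX_VALUE;
        long naive = now + timeout;           // wraps to a negative value
        long safe = deadlineMs(now, timeout); // clamped

        System.out.println("naive deadline timed out: " + hasTimedOut(now, naive));
        System.out.println("safe deadline timed out:  " + hasTimedOut(now, safe));
    }
}
```

A unit test with `Long.MaxValue` as the timeout, as the reviewer asks for, would catch the wrapped deadline in the unguarded version.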
## File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala ## @@ -178,6 +191,10 @@ class BrokerToControllerRequestThread(networkClient: KafkaClient, } } + private def isTimedOut(request: BrokerToControllerQueueItem): Boolean = { +time.milliseconds() > request.deadlineMs Review comment: Maybe we can avoid this call to `time.milliseconds` and use `ClientResponse.receivedTimeMs`? ## File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala ## @@ -165,7 +176,9 @@ class BrokerToControllerRequestThread(networkClient: KafkaClient, } private[server] def handleResponse(request: BrokerToControllerQueueItem)(response: ClientResponse): Unit = { -if (response.wasDisconnected()) { +if (isTimedOut(request)) { Review comment: We check for timeouts only after receiving a response. I guess this means that in the worst case, the total timeout would be request.timeout*2. This is probably not a big deal, but maybe worth documenting in a comment somewhere. ## File path: core/src/main/scala/kafka/server/ForwardingManager.scala ## @@ -44,26 +46,34 @@ class ForwardingManager(channelManager: BrokerToControllerChannelManager) extend request.context.clientAddress.getAddress ) -def onClientResponse(clientResponse: ClientResponse): Unit = { - val envelopeResponse = clientResponse.responseBody.asInstanceOf[EnvelopeResponse] - val envelopeError = envelopeResponse.error() - val requestBody = request.body[AbstractRequest] +class ForwardingResponseHandler extends BrokerToControllerRequestCompletionHandler { + override def onComplete(clientResponse: ClientResponse): Unit = { +val envelopeResponse = clientResponse.responseBody.asInstanceOf[EnvelopeResponse] +val envelopeError = envelopeResponse.error() +val requestBody = request.body[AbstractRequest] - val response = if (envelopeError != Errors.NONE) { -// An envelope error indicates broker misconfiguration (e.g. the principal serde -// might not be defined on the receiving broker). 
In this case, we do not return -// the error directly to the client since it would not be expected. Instead we -// return `UNKNOWN_SERVER_ERROR` so that the user knows that there is a problem -// on the broker. -debug(s"Forwarded request $request failed with an error in the envelope response $envelopeError") -requestBody.getErrorResponse(Errors.UNKNOWN_SERVER_ERROR.exception) - } else { -parseResponse(envelopeResponse.responseData, requestBody, request.header) +val response = if (envelopeError != Errors.NONE) { + // An envelope error indicates broker misconfiguration (e.g. the principal serde + // might not be defined on the receiving broker). In this
[GitHub] [kafka] mjsax commented on a change in pull request #9614: KAFKA-10500: Add failed-stream-threads metric for adding + removing stream threads
mjsax commented on a change in pull request #9614: URL: https://github.com/apache/kafka/pull/9614#discussion_r533020449 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java ## @@ -214,6 +215,20 @@ public RocksDBMetricsRecordingTrigger rocksDBMetricsRecordingTrigger() { } } +public final Sensor clientLevelSensor(final String sensorName, + final RecordingLevel recordingLevel, + final Sensor... parents) { +synchronized (clientLevelSensors) { +final String fullSensorName = CLIENT_LEVEL_GROUP + SENSOR_NAME_DELIMITER + sensorName; +final Sensor sensor = metrics.getSensor(fullSensorName); Review comment: Should we rewrite this the same way `threadLevelSensor` is written (ie, using `orElseGet`) for consistency? ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java ## @@ -629,16 +657,18 @@ private void setupRemoveSensorsTest(final Metrics metrics, } @Test -public void shouldRemoveClientLevelMetrics() { +public void shouldRemoveClientLevelMetricsAndSensors() { final Metrics metrics = niceMock(Metrics.class); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time); -addSensorsOnAllLevels(metrics, streamsMetrics); +final Capture sensorKeys = addSensorsOnAllLevels(metrics, streamsMetrics); resetToDefault(metrics); -expect(metrics.removeMetric(metricName1)).andStubReturn(null); -expect(metrics.removeMetric(metricName2)).andStubReturn(null); -replay(metrics); -streamsMetrics.removeAllClientLevelMetrics(); +metrics.removeSensor(sensorKeys.getValues().get(0)); +metrics.removeSensor(sensorKeys.getValues().get(1)); + expect(metrics.removeMetric(metricName1)).andReturn(mock(KafkaMetric.class)); + expect(metrics.removeMetric(metricName2)).andReturn(mock(KafkaMetric.class)); Review comment: Why did we change this from `andStubReturn(null)` to `andReturn(mock(KafkaMetric.class))`? 
## File path: streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java ## @@ -125,4 +131,16 @@ public static void addNumAliveStreamThreadMetric(final StreamsMetricsImpl stream stateProvider ); } +public static Sensor failedStreamThreadSensor(final StreamsMetricsImpl streamsMetrics) { Review comment: nit: missing empty line This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
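For the `orElseGet` question on `clientLevelSensor`, a get-or-create lookup in that style might look like the following sketch. Everything here is a hypothetical stand-in: the `Sensor` class, the registry map, and the constant values are invented so the example runs on its own, and the real `threadLevelSensor` may differ in detail.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class SensorRegistrySketch {

    // Hypothetical stand-in for a metrics sensor.
    static final class Sensor {
        final String name;
        Sensor(String name) { this.name = name; }
    }

    // Hypothetical values; the real constants are not shown in the review.
    static final String CLIENT_LEVEL_GROUP = "client";
    static final String SENSOR_NAME_DELIMITER = ".s.";

    private final Map<String, Sensor> registry = new HashMap<>();
    final Deque<String> clientLevelSensors = new ArrayDeque<>();

    // Returns null when no sensor is registered under the given name.
    Sensor getSensor(String fullName) {
        return registry.get(fullName);
    }

    Sensor clientLevelSensor(String sensorName) {
        synchronized (clientLevelSensors) {
            String fullSensorName = CLIENT_LEVEL_GROUP + SENSOR_NAME_DELIMITER + sensorName;
            // Reuse the existing sensor if present; otherwise create and track a new one.
            return Optional.ofNullable(getSensor(fullSensorName)).orElseGet(() -> {
                Sensor created = new Sensor(fullSensorName);
                registry.put(fullSensorName, created);
                clientLevelSensors.push(fullSensorName);
                return created;
            });
        }
    }

    public static void main(String[] args) {
        SensorRegistrySketch metrics = new SensorRegistrySketch();
        Sensor first = metrics.clientLevelSensor("failed-stream-threads");
        Sensor second = metrics.clientLevelSensor("failed-stream-threads");
        System.out.println(first == second);
    }
}
```

The point of the pattern is that the creation and bookkeeping only run when the lookup misses, so repeated calls with the same name do not register the sensor twice.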
[GitHub] [kafka] junrao commented on a change in pull request #8826: KAFKA-10090 Misleading warnings: The configuration was supplied but i…
junrao commented on a change in pull request #8826: URL: https://github.com/apache/kafka/pull/8826#discussion_r533002272 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ## @@ -314,27 +315,23 @@ public KafkaProducer(Properties properties) { * be called in the producer when the serializer is passed in directly. */ public KafkaProducer(Properties properties, Serializer keySerializer, Serializer valueSerializer) { -this(Utils.propsToMap(properties), keySerializer, valueSerializer, null, null, null, -Time.SYSTEM); +this(Utils.propsToMap(properties), keySerializer, valueSerializer); } // visible for testing @SuppressWarnings("unchecked") -KafkaProducer(Map configs, +KafkaProducer(ProducerConfig config, Serializer keySerializer, Serializer valueSerializer, ProducerMetadata metadata, KafkaClient kafkaClient, ProducerInterceptors interceptors, Time time) { -ProducerConfig config = new ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, keySerializer, -valueSerializer)); try { -Map userProvidedConfigs = config.originals(); this.producerConfig = config; this.time = time; -String transactionalId = (String) userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG); +String transactionalId = (String) config.originals().get(ProducerConfig.TRANSACTIONAL_ID_CONFIG); Review comment: Could we just do `config.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG)` here? 
## File path: clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java ## @@ -79,25 +80,52 @@ public void testChannelBuilderConfigs() { // test configs with listener prefix Map configs = ChannelBuilders.channelBuilderConfigs(securityConfig, new ListenerName("listener1")); + assertNull(configs.get("listener.name.listener1.gssapi.sasl.kerberos.service.name")); + assertFalse(securityConfig.unused().contains("listener.name.listener1.gssapi.sasl.kerberos.service.name")); + assertEquals(configs.get("gssapi.sasl.kerberos.service.name"), "testkafka"); + assertFalse(securityConfig.unused().contains("gssapi.sasl.kerberos.service.name")); + assertEquals(configs.get("sasl.kerberos.service.name"), "testkafkaglobal"); + assertFalse(securityConfig.unused().contains("gssapi.sasl.kerberos.service.name")); Review comment: Should gssapi.sasl.kerberos.service.name be sasl.kerberos.service.name? ## File path: clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java ## @@ -79,25 +80,52 @@ public void testChannelBuilderConfigs() { // test configs with listener prefix Map configs = ChannelBuilders.channelBuilderConfigs(securityConfig, new ListenerName("listener1")); + assertNull(configs.get("listener.name.listener1.gssapi.sasl.kerberos.service.name")); + assertFalse(securityConfig.unused().contains("listener.name.listener1.gssapi.sasl.kerberos.service.name")); + assertEquals(configs.get("gssapi.sasl.kerberos.service.name"), "testkafka"); + assertFalse(securityConfig.unused().contains("gssapi.sasl.kerberos.service.name")); + assertEquals(configs.get("sasl.kerberos.service.name"), "testkafkaglobal"); + assertFalse(securityConfig.unused().contains("gssapi.sasl.kerberos.service.name")); + assertNull(configs.get("listener.name.listener1.sasl.kerberos.service.name")); + assertFalse(securityConfig.unused().contains("listener.name.listener1.sasl.kerberos.service.name")); assertNull(configs.get("plain.sasl.server.callback.handler.class")); + 
assertFalse(securityConfig.unused().contains("plain.sasl.server.callback.handler.class")); + assertEquals(configs.get("listener.name.listener1.gssapi.config1.key"), "custom.config1"); + assertFalse(securityConfig.unused().contains("listener.name.listener1.gssapi.config1.key")); + assertEquals(configs.get("custom.config2.key"), "custom.config2"); +assertFalse(securityConfig.unused().contains("custom.config2.key")); // test configs without listener prefix +securityConfig = new TestSecurityConfig(props); Review comment: Do we need to instantiate again? ## File path: clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java ## @@ -105,7 +105,9 @@ public AbstractConfig(ConfigDef definition, Map originals, Map throw new ConfigException(entry.getKey().toString(), entry.getValue(), "Key must be a string."); this.originals = resolveConfigVariables(configProviderProps, (Map)
[jira] [Created] (KAFKA-10785) Rewrite ConfigEntityChangeNotificationSequenceZNode struct with auto-generated protocol
dengziming created KAFKA-10785: -- Summary: Rewrite ConfigEntityChangeNotificationSequenceZNode struct with auto-generated protocol Key: KAFKA-10785 URL: https://issues.apache.org/jira/browse/KAFKA-10785 Project: Kafka Issue Type: Sub-task Components: protocol Reporter: dengziming Assignee: dengziming -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10784) Rewrite ConfigEntityZNode struct with auto-generated protocol
dengziming created KAFKA-10784: -- Summary: Rewrite ConfigEntityZNode struct with auto-generated protocol Key: KAFKA-10784 URL: https://issues.apache.org/jira/browse/KAFKA-10784 Project: Kafka Issue Type: Sub-task Components: protocol Reporter: dengziming Assignee: dengziming
[jira] [Created] (KAFKA-10783) Rewrite TopicPartitionStateZNode struct with auto-generated protocol
dengziming created KAFKA-10783: -- Summary: Rewrite TopicPartitionStateZNode struct with auto-generated protocol Key: KAFKA-10783 URL: https://issues.apache.org/jira/browse/KAFKA-10783 Project: Kafka Issue Type: Sub-task Reporter: dengziming Assignee: dengziming
[jira] [Created] (KAFKA-10782) Rewrite TopicZNode struct with auto-generated protocol
dengziming created KAFKA-10782: -- Summary: Rewrite TopicZNode struct with auto-generated protocol Key: KAFKA-10782 URL: https://issues.apache.org/jira/browse/KAFKA-10782 Project: Kafka Issue Type: Sub-task Components: protocol Reporter: dengziming Assignee: dengziming
[jira] [Created] (KAFKA-10781) Rewrite BrokerIdZNode struct with auto-generated protocol
dengziming created KAFKA-10781: -- Summary: Rewrite BrokerIdZNode struct with auto-generated protocol Key: KAFKA-10781 URL: https://issues.apache.org/jira/browse/KAFKA-10781 Project: Kafka Issue Type: Sub-task Components: protocol Reporter: dengziming Assignee: dengziming
[jira] [Created] (KAFKA-10780) Rewrite ControllerZNode struct with auto-generated protocol
dengziming created KAFKA-10780: -- Summary: Rewrite ControllerZNode struct with auto-generated protocol Key: KAFKA-10780 URL: https://issues.apache.org/jira/browse/KAFKA-10780 Project: Kafka Issue Type: Sub-task Components: protocol Reporter: dengziming Assignee: dengziming Use the auto-generated protocol to rewrite the ZK controller node.
[jira] [Created] (KAFKA-10779) Reassignment tool sets throttles incorrectly when overriding a reassignment
Jason Gustafson created KAFKA-10779: --- Summary: Reassignment tool sets throttles incorrectly when overriding a reassignment Key: KAFKA-10779 URL: https://issues.apache.org/jira/browse/KAFKA-10779 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson The logic in `ReassignPartitionsCommand.calculateProposedMoveMap` assumes that adding replicas are not included in the replica set returned from `Metadata` or `ListPartitionReassignments`. This is evident in the test case `ReassignPartitionsUnitTest.testMoveMap`. Because of this incorrect assumption, the move map is computed incorrectly which can result in the wrong throttles being applied. As far as I can tell, this is only an issue when overriding an existing reassignment. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mjsax commented on a change in pull request #9606: [KAFKA-10722] doc: Improve JavaDoc for KGroupedStream.aggregate
mjsax commented on a change in pull request #9606: URL: https://github.com/apache/kafka/pull/9606#discussion_r532996901 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java ## @@ -438,7 +439,8 @@ * query the value of the key on a parallel running instance of your Kafka Streams application. * * - * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. + * For failure and recovery the store (which always will be of type {@link TimestampedKeyValueStore} -- regardless of what + * is specified in the parameter {@materialized}) will be backed by an internal changelog topic that will be created in Kafka. Review comment: No need to squash commits. We squash them automatically when merging. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #9606: [KAFKA-10722] doc: Improve JavaDoc for KGroupedStream.aggregate
mjsax commented on a change in pull request #9606: URL: https://github.com/apache/kafka/pull/9606#discussion_r532996684 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java ## @@ -381,7 +381,8 @@ * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}. * * - * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. + * For failure and recovery the store (which always will be of type {@link TimestampedKeyValueStore}) will be backed by + * an internal changelog topic that will be created in Kafka. Review comment: `reduce()` and `count()` are similar to `aggregate()`: reduce only does not allow you to change the value type (ie, output type == input type) and count, well implements an aggregate() that counts :) And they all work the same for `KGroupedStream`, `CogroupedKStream`, `TimeWindowedKStream` and `TimeWindowedCoGroupedKStream`. `table()` and `globalTable()` just read a topic and upsert the data into a state store / table. So it would be great to do them all in one PR? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
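The relationship described above (reduce is an aggregate whose output type equals its input type, and count is an aggregate that counts) can be sketched with plain collections. This is an in-memory analogue with hypothetical names, not the Kafka Streams API: there is no state store or changelog here, and values are assumed to be non-null.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;

public class GroupedAggregates {

    // General form: fold each record's value into a per-key aggregate of type A.
    static <K, V, A> Map<K, A> aggregate(List<Map.Entry<K, V>> records,
                                         Supplier<A> initializer,
                                         BiFunction<V, A, A> aggregator) {
        Map<K, A> table = new LinkedHashMap<>();
        for (Map.Entry<K, V> record : records) {
            A current = table.containsKey(record.getKey()) ? table.get(record.getKey()) : initializer.get();
            table.put(record.getKey(), aggregator.apply(record.getValue(), current));
        }
        return table;
    }

    // count: an aggregate that ignores the value and increments a Long.
    static <K, V> Map<K, Long> count(List<Map.Entry<K, V>> records) {
        return GroupedAggregates.<K, V, Long>aggregate(records, () -> 0L, (value, total) -> total + 1);
    }

    // reduce: an aggregate whose result type is the value type; the first value seeds the result.
    static <K, V> Map<K, V> reduce(List<Map.Entry<K, V>> records, BinaryOperator<V> reducer) {
        return GroupedAggregates.<K, V, V>aggregate(records, () -> null,
            (value, acc) -> acc == null ? value : reducer.apply(acc, value));
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> records = new ArrayList<>();
        records.add(new AbstractMap.SimpleEntry<>("a", 1));
        records.add(new AbstractMap.SimpleEntry<>("b", 2));
        records.add(new AbstractMap.SimpleEntry<>("a", 3));

        System.out.println(count(records));
        System.out.println(reduce(records, Integer::sum));
    }
}
```

Since all three operations share one fold, it is plausible that their JavaDoc about the backing store type could be updated together, as the review comment proposes.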
[GitHub] [kafka] mjsax commented on pull request #9607: [KAFKA-10722] doc: Described the types of the stores used
mjsax commented on pull request #9607: URL: https://github.com/apache/kafka/pull/9607#issuecomment-736136722 Thanks for the PR @fml2! Sorry for the delay in reviewing. Sometimes it takes some time (good that you kept nagging :) )
[GitHub] [kafka] mjsax merged pull request #9607: [KAFKA-10722] doc: Described the types of the stores used
mjsax merged pull request #9607: URL: https://github.com/apache/kafka/pull/9607
[GitHub] [kafka] mjsax commented on a change in pull request #9660: Kafka 10629 - TopologyTestDriver should not require a Properties argument
mjsax commented on a change in pull request #9660: URL: https://github.com/apache/kafka/pull/9660#discussion_r532983342 ## File path: streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java ## @@ -254,6 +265,19 @@ public TopologyTestDriver(final Topology topology, this(topology, config, null); } +/** + * Create a new test diver instance. + * Initialized the internally mocked wall-clock time with {@link System#currentTimeMillis() current system time}. Review comment: This overload takes `initialWallClockTimeMs`, thus this sentence seems to be incorrect? Should go to `TopologyTestDriver(final Topology topology)` instead? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-10629) TopologyTestDriver should not require a Properties arg
[ https://issues.apache.org/jira/browse/KAFKA-10629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-10629: Labels: kip newbie (was: needs-kip newbie) > TopologyTestDriver should not require a Properties arg > -- > > Key: KAFKA-10629 > URL: https://issues.apache.org/jira/browse/KAFKA-10629 > Project: Kafka > Issue Type: Task > Components: streams, streams-test-utils >Reporter: John Roesler >Assignee: Rohit Deshpande >Priority: Minor > Labels: kip, newbie > > As of [https://github.com/apache/kafka/pull/9477,] many TopologyTestDriver > usages will have no configurations at all to specify, so we should provide a > constructor that doesn't take a Properties argument. Right now, such > configuration-free usages have to provide an empty Properties object. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10629) TopologyTestDriver should not require a Properties arg
[ https://issues.apache.org/jira/browse/KAFKA-10629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-10629: Issue Type: Improvement (was: Task) > TopologyTestDriver should not require a Properties arg > -- > > Key: KAFKA-10629 > URL: https://issues.apache.org/jira/browse/KAFKA-10629 > Project: Kafka > Issue Type: Improvement > Components: streams, streams-test-utils >Reporter: John Roesler >Assignee: Rohit Deshpande >Priority: Minor > Labels: kip, newbie > > As of [https://github.com/apache/kafka/pull/9477,] many TopologyTestDriver > usages will have no configurations at all to specify, so we should provide a > constructor that doesn't take a Properties argument. Right now, such > configuration-free usages have to provide an empty Properties object. > KIP-680: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-680%3A+TopologyTestDriver+should+not+require+a+Properties+argument] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10629) TopologyTestDriver should not require a Properties arg
[ https://issues.apache.org/jira/browse/KAFKA-10629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-10629: Description: As of [https://github.com/apache/kafka/pull/9477,] many TopologyTestDriver usages will have no configurations at all to specify, so we should provide a constructor that doesn't take a Properties argument. Right now, such configuration-free usages have to provide an empty Properties object. KIP-680: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-680%3A+TopologyTestDriver+should+not+require+a+Properties+argument] was:As of [https://github.com/apache/kafka/pull/9477,] many TopologyTestDriver usages will have no configurations at all to specify, so we should provide a constructor that doesn't take a Properties argument. Right now, such configuration-free usages have to provide an empty Properties object. > TopologyTestDriver should not require a Properties arg > -- > > Key: KAFKA-10629 > URL: https://issues.apache.org/jira/browse/KAFKA-10629 > Project: Kafka > Issue Type: Task > Components: streams, streams-test-utils >Reporter: John Roesler >Assignee: Rohit Deshpande >Priority: Minor > Labels: kip, newbie > > As of [https://github.com/apache/kafka/pull/9477,] many TopologyTestDriver > usages will have no configurations at all to specify, so we should provide a > constructor that doesn't take a Properties argument. Right now, such > configuration-free usages have to provide an empty Properties object. > KIP-680: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-680%3A+TopologyTestDriver+should+not+require+a+Properties+argument] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10629) TopologyTestDriver should not require a Properties arg
[ https://issues.apache.org/jira/browse/KAFKA-10629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241146#comment-17241146 ] Matthias J. Sax commented on KAFKA-10629: - [~rohitdeshaws] – you got 3 binding votes already, from John, Guozhang, and myself. Note that all committer votes are binding (even if not stated explicitly). You can find a list of all committers on the web page: [https://kafka.apache.org/committers] > TopologyTestDriver should not require a Properties arg > -- > > Key: KAFKA-10629 > URL: https://issues.apache.org/jira/browse/KAFKA-10629 > Project: Kafka > Issue Type: Task > Components: streams, streams-test-utils >Reporter: John Roesler >Assignee: Rohit Deshpande >Priority: Minor > Labels: needs-kip, newbie > > As of [https://github.com/apache/kafka/pull/9477], many TopologyTestDriver > usages will have no configurations at all to specify, so we should provide a > constructor that doesn't take a Properties argument. Right now, such > configuration-free usages have to provide an empty Properties object.
[jira] [Comment Edited] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters
[ https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241139#comment-17241139 ] Matthias J. Sax edited comment on KAFKA-7918 at 11/30/20, 11:51 PM: The existing caching layer also collapses writes into the changelog for the same key – but if you don't do the serialization, it's very hard to budget the used memory. It could make the system unstable and your JVM might crash with out-of-memory errors. We obviously want to improve the system, and if you have a good solution, feel free to propose a KIP ([https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals]). Getting rid of the expensive (de)serialization cost could be a huge win. My gut feeling, though, is that without good memory management it would be hard to convince people; stability might be more important than performance. was (Author: mjsax): The existing caching layer also collapses writes into the changelog for the same key – but if you don't do the serialization, it's very hard to budget the used memory. It could make the system unstable and your JVM might crash with ouf-of-memory errors. > Streams store cleanup: inline byte-store generic parameters > --- > > Key: KAFKA-7918 > URL: https://issues.apache.org/jira/browse/KAFKA-7918 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Assignee: A. Sophie Blee-Goldman >Priority: Major > Fix For: 2.3.0 > > > Currently, the fundamental layer of stores in Streams is the "bytes store". > The easiest way to identify this is in > `org.apache.kafka.streams.state.Stores`, all the `StoreBuilder`s require a > `XXBytesStoreSupplier`. > We provide several implementations of these bytes stores, typically an > in-memory one and a persistent one (aka RocksDB). > Inside these bytes stores, the key is always `Bytes` and the value is always > `byte[]` (serialization happens at a higher level). 
However, the store > implementations are generically typed, just `K` and `V`. > This is good for flexibility, but it makes the code a little harder to > understand. I think that we used to do serialization at a lower level, so the > generics are a hold-over from that. > It would simplify the code if we just inlined the actual k/v types and maybe > even renamed the classes from (e.g.) `InMemoryKeyValueStore` to > `InMemoryKeyValueBytesStore`, and so forth. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters
[ https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241139#comment-17241139 ] Matthias J. Sax commented on KAFKA-7918: The existing caching layer also collapses writes into the changelog for the same key – but if you don't do the serialization, it's very hard to budget the used memory. It could make the system unstable and your JVM might crash with out-of-memory errors. > Streams store cleanup: inline byte-store generic parameters > --- > > Key: KAFKA-7918 > URL: https://issues.apache.org/jira/browse/KAFKA-7918 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Assignee: A. Sophie Blee-Goldman >Priority: Major > Fix For: 2.3.0 > > > Currently, the fundamental layer of stores in Streams is the "bytes store". > The easiest way to identify this is in > `org.apache.kafka.streams.state.Stores`, all the `StoreBuilder`s require a > `XXBytesStoreSupplier`. > We provide several implementations of these bytes stores, typically an > in-memory one and a persistent one (aka RocksDB). > Inside these bytes stores, the key is always `Bytes` and the value is always > `byte[]` (serialization happens at a higher level). However, the store > implementations are generically typed, just `K` and `V`. > This is good for flexibility, but it makes the code a little harder to > understand. I think that we used to do serialization at a lower level, so the > generics are a hold-over from that. > It would simplify the code if we just inlined the actual k/v types and maybe > even renamed the classes from (e.g.) `InMemoryKeyValueStore` to > `InMemoryKeyValueBytesStore`, and so forth.
[GitHub] [kafka] junrao commented on a change in pull request #9631: KAFKA-9672: Leader with ISR as a superset of replicas
junrao commented on a change in pull request #9631: URL: https://github.com/apache/kafka/pull/9631#discussion_r532963241 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -947,9 +947,10 @@ class Partition(val topicPartition: TopicPartition, leaderEndOffset: Long, currentTimeMs: Long, maxLagMs: Long): Boolean = { -val followerReplica = getReplicaOrException(replicaId) -followerReplica.logEndOffset != leaderEndOffset && - (currentTimeMs - followerReplica.lastCaughtUpTimeMs) > maxLagMs +getReplica(replicaId).fold(true) { followerReplica => Review comment: @jsancio : Yes, we can keep the logic in the PR. On the leader, the logic for shrinking ISR is checked every 10 secs by default. So, in the common case when completing a reassignment, the reduced ISR by the controller will be propagated to the leader before the leader's ISR shrinking logic kicks in. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
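The out-of-sync check discussed in this review can be sketched outside of Kafka. The following is a minimal Java illustration, not the code in `Partition.scala`: the `Replica` class and the `isFollowerOutOfSync` signature are assumptions made for the example. It shows the effect of the `fold(true)` in the diff: a follower that the leader no longer tracks is treated as out of sync, while a known follower is out of sync only if it is behind the leader and has not caught up within `maxLagMs`.

```java
import java.util.Optional;

final class IsrLagCheck {
    /** Hypothetical stand-in for the follower state tracked by the leader. */
    static final class Replica {
        final long logEndOffset;
        final long lastCaughtUpTimeMs;

        Replica(long logEndOffset, long lastCaughtUpTimeMs) {
            this.logEndOffset = logEndOffset;
            this.lastCaughtUpTimeMs = lastCaughtUpTimeMs;
        }
    }

    /**
     * Mirrors the shape of the Scala expression in the diff:
     * an absent replica counts as out of sync (the "fold(true)" case).
     */
    static boolean isFollowerOutOfSync(Optional<Replica> follower,
                                       long leaderEndOffset,
                                       long currentTimeMs,
                                       long maxLagMs) {
        return follower
            .map(r -> r.logEndOffset != leaderEndOffset
                   && (currentTimeMs - r.lastCaughtUpTimeMs) > maxLagMs)
            .orElse(true);
    }

    public static void main(String[] args) {
        // Replica removed from the assignment, e.g. after a reassignment completes.
        System.out.println(isFollowerOutOfSync(Optional.empty(), 100L, 50_000L, 30_000L));
        // Replica fully caught up with the leader.
        System.out.println(isFollowerOutOfSync(
            Optional.of(new Replica(100L, 0L)), 100L, 50_000L, 30_000L));
    }
}
```

With `Optional.empty()` the method returns `true` regardless of the other arguments, which is the behavior the review agreed to keep.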
[GitHub] [kafka] hachikuji opened a new pull request #9663: MINOR: Small cleanups in `AlterIsr` handling logic
hachikuji opened a new pull request #9663: URL: https://github.com/apache/kafka/pull/9663 A few small cleanups in `Partition` handling of `AlterIsr`:
- Factor state update and log message into `sendAlterIsrRequest`
- Ensure illegal state error gets raised if a retry fails to be enqueued
- Always check the proposed state against the current state in `handleAlterIsrResponse`
- Add `toString` implementations to `IsrState` case classes
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Updated] (KAFKA-2967) Move Kafka documentation to ReStructuredText
[ https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Hamill updated KAFKA-2967: --- Labels: documentation (was: ) > Move Kafka documentation to ReStructuredText > > > Key: KAFKA-2967 > URL: https://issues.apache.org/jira/browse/KAFKA-2967 > Project: Kafka > Issue Type: Bug >Reporter: Gwen Shapira >Assignee: Gwen Shapira >Priority: Major > Labels: documentation > > Storing documentation as HTML is kind of BS :) > * Formatting is a pain, and making it look good is even worse > * It's just HTML, can't generate PDFs > * Reading and editing is painful > * Validating changes is hard because our formatting relies on all kinds of > Apache Server features. > I suggest: > * Move to RST > * Generate HTML and PDF during build using Sphinx plugin for Gradle. > Lots of Apache projects are doing this.
[jira] [Comment Edited] (KAFKA-2967) Move Kafka documentation to ReStructuredText
[ https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241113#comment-17241113 ] James Galasyn edited comment on KAFKA-2967 at 11/30/20, 10:49 PM: -- Since I inherited the Streams docs, I've come to appreciate the points made by [~gwenshap] and [~ewencp]. I'm proposing a somewhat different solution: migrate the HTML-based docs to markdown and host them on ReadTheDocs. We used this approach successfully for the [ksqlDB docs|https://docs.ksqldb.io/en/latest/], and there's no reason it won't work for the AK docs. Last week, I took a couple of hours and manually converted the entire AK doc set to markdown by using Pandoc: [ak-docs-proto repo|https://github.com/confluentinc/ak-docs-proto]. There's some more work to do, like fixing headings and updating links, but as a proof-of-concept, it works okay. Once the markdown is cleaned up, we would move it to the docs directory in the public AK GitHub repo and set up a ReadTheDocs project with a Basic account ($50/month). RTD will build from our GH branches and host the site for us. The ksdqlDB docs have used this model successfully since May 2020, so the execution risk is low. [~mjsax], [~vvcephei], and [~mdrogalis], what do you think? was (Author: jimgalasyn): Since I inherited the Streams docs, I've come to appreciate the points made by [~gwenshap] and [~ewencp]. I'm proposing a somewhat different solution: migrate the HTML-based docs to markdown and host them on ReadTheDocs. We used this approach successfully for the [ksqlDB docs|https://docs.ksqldb.io/en/latest/], and there's no reason it won't work for the AK docs. Last week, I took a couple of hours and manually converted the entire AK doc set to markdown by using Pandoc: [ak-docs-proto repo|https://github.com/confluentinc/ak-docs-proto]. There's some more work to do, like fixing headings and updating links, but as a proof-of-concept, it works okay. 
Once the markdown is cleaned up, we would move it to the docs directory in the public AK GitHub repo and set up a ReadTheDocs project with a Basic account ($50/month). RTD will build from our GH branches and host the site for us. Because it's time-consuming and error-prone to maintain the Kafka content both in the AK site and in the CP docs site, we would remove the duplicative content from the CP docs site, leaving only the CP-related content there, with links (and redirects) back to the AK docs. The ksdqlDB docs have used this model successfully since May 2020, so the execution risk is low. [~mjsax], [~vvcephei], and [~mdrogalis], what do you think? > Move Kafka documentation to ReStructuredText > > > Key: KAFKA-2967 > URL: https://issues.apache.org/jira/browse/KAFKA-2967 > Project: Kafka > Issue Type: Bug >Reporter: Gwen Shapira >Assignee: Gwen Shapira >Priority: Major > > Storing documentation as HTML is kind of BS :) > * Formatting is a pain, and making it look good is even worse > * Its just HTML, can't generate PDFs > * Reading and editting is painful > * Validating changes is hard because our formatting relies on all kinds of > Apache Server features. > I suggest: > * Move to RST > * Generate HTML and PDF during build using Sphinx plugin for Gradle. > Lots of Apache projects are doing this. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-10702) Slow replication of empty transactions
[ https://issues.apache.org/jira/browse/KAFKA-10702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-10702. - Fix Version/s: 2.8.0 Resolution: Fixed > Slow replication of empty transactions > -- > > Key: KAFKA-10702 > URL: https://issues.apache.org/jira/browse/KAFKA-10702 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > Fix For: 2.8.0 > > > We hit a case in which we had to re-replicate a compacted topic from the > beginning of the log. Some portions of the log consisted mostly of > transaction markers, which were extremely slow to replicate. The problem is > that `ProducerStateManager` adds all of these empty transactions to its > internal collection of `ongoingTxns` before immediately removing them. There > could be tens of thousands of empty transactions in the worst case from a > single `Fetch` response, so this can create a huge amount of pressure on the > broker. -- This message was sent by Atlassian Jira (v8.3.4#803005)
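The problem described in KAFKA-10702 is that each empty transaction is inserted into `ongoingTxns` and then removed again right away. A minimal Java sketch of the general idea of skipping that bookkeeping is shown below. It is not the actual patch from #9632: the `TxnBookkeeping` class, its `apply` method, and the representation of transactions as first-offset to producer-id entries are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class TxnBookkeeping {
    // Hypothetical model of "ongoingTxns": first offset of the transaction -> producer id.
    private final TreeMap<Long, Long> ongoingTxns = new TreeMap<>();

    // Counts real insertions so the saved work is observable.
    int insertions = 0;

    /**
     * Applies one append. A transaction that is both started and completed
     * within the same append would be inserted and removed immediately,
     * so it is never inserted at all.
     */
    void apply(Map<Long, Long> startedByFirstOffset, Set<Long> completedFirstOffsets) {
        for (Map.Entry<Long, Long> started : startedByFirstOffset.entrySet()) {
            if (completedFirstOffsets.contains(started.getKey())) {
                continue; // skip the bookkeeping for this transaction
            }
            ongoingTxns.put(started.getKey(), started.getValue());
            insertions++;
        }
        for (Long firstOffset : completedFirstOffsets) {
            ongoingTxns.remove(firstOffset);
        }
    }

    int ongoingCount() {
        return ongoingTxns.size();
    }
}
```

In this model, an append that carries tens of thousands of such transactions performs no insertions for them, which is the kind of pressure reduction the issue asks for.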
[jira] [Updated] (KAFKA-10702) Slow replication of empty transactions
[ https://issues.apache.org/jira/browse/KAFKA-10702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-10702: Issue Type: Improvement (was: Bug) > Slow replication of empty transactions > -- > > Key: KAFKA-10702 > URL: https://issues.apache.org/jira/browse/KAFKA-10702 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > > We hit a case in which we had to re-replicate a compacted topic from the > beginning of the log. Some portions of the log consisted mostly of > transaction markers, which were extremely slow to replicate. The problem is > that `ProducerStateManager` adds all of these empty transactions to its > internal collection of `ongoingTxns` before immediately removing them. There > could be tens of thousands of empty transactions in the worst case from a > single `Fetch` response, so this can create a huge amount of pressure on the > broker. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] hachikuji merged pull request #9632: KAFKA-10702; Skip bookkeeping of empty transactions
hachikuji merged pull request #9632: URL: https://github.com/apache/kafka/pull/9632
[jira] [Comment Edited] (KAFKA-2967) Move Kafka documentation to ReStructuredText
[ https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241113#comment-17241113 ] James Galasyn edited comment on KAFKA-2967 at 11/30/20, 10:31 PM: -- Since I inherited the Streams docs, I've come to appreciate the points made by [~gwenshap] and [~ewencp]. I'm proposing a somewhat different solution: migrate the HTML-based docs to markdown and host them on ReadTheDocs. We used this approach successfully for the [ksqlDB docs|https://docs.ksqldb.io/en/latest/], and there's no reason it won't work for the AK docs. Last week, I took a couple of hours and manually converted the entire AK doc set to markdown by using Pandoc: [ak-docs-proto repo|[https://github.com/confluentinc/ak-docs-proto].] There's some more work to do, like fixing headings and updating links, but as a proof-of-concept, it works okay. Once the markdown is cleaned up, we would move it to the docs directory in the public AK GitHub repo and set up a ReadTheDocs project with a Basic account ($50/month). RTD will build from our GH branches and host the site for us. Because it's time-consuming and error-prone to maintain the Kafka content both in the AK site and in the CP docs site, we would remove the duplicative content from the CP docs site, leaving only the CP-related content there, with links (and redirects) back to the AK docs. The ksdqlDB docs have used this model successfully since May 2020, so the execution risk is low. [~mjsax], [~vvcephei], and [~mdrogalis], what do you think? was (Author: jimgalasyn): Since I inherited the Streams docs, I've come to appreciate the points made by [~gwenshap] and [~ewencp]. I'm proposing a somewhat different solution: migrate the HTML-based docs to markdown and host them on ReadTheDocs. We used this approach successfully for the [ksqlDB docs|https://docs.ksqldb.io/en/latest/], and there's no reason it won't work for the AK docs. 
Last week, I took a couple of hours and manually converted the entire AK doc set to markdown by using Pandoc: [ak-docs-proto repo|[https://github.com/confluentinc/ak-docs-proto|https://github.com/confluentinc/ak-docs-proto].]]. There's some more work to do, like fixing headings and updating links, but as a proof-of-concept, it works okay. Once the markdown is cleaned up, we would move it to the docs directory in the public AK GitHub repo and set up a ReadTheDocs project with a Basic account ($50/month). RTD will build from our GH branches and host the site for us. Because it's time-consuming and error-prone to maintain the Kafka content both in the AK site and in the CP docs site, we would remove the duplicative content from the CP docs site, leaving only the CP-related content there, with links (and redirects) back to the AK docs. The ksdqlDB docs have used this model successfully since May 2020, so the execution risk is low. [~mjsax], [~vvcephei], and [~mdrogalis], what do you think? > Move Kafka documentation to ReStructuredText > > > Key: KAFKA-2967 > URL: https://issues.apache.org/jira/browse/KAFKA-2967 > Project: Kafka > Issue Type: Bug >Reporter: Gwen Shapira >Assignee: Gwen Shapira >Priority: Major > > Storing documentation as HTML is kind of BS :) > * Formatting is a pain, and making it look good is even worse > * Its just HTML, can't generate PDFs > * Reading and editting is painful > * Validating changes is hard because our formatting relies on all kinds of > Apache Server features. > I suggest: > * Move to RST > * Generate HTML and PDF during build using Sphinx plugin for Gradle. > Lots of Apache projects are doing this. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-2967) Move Kafka documentation to ReStructuredText
[ https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241113#comment-17241113 ] James Galasyn edited comment on KAFKA-2967 at 11/30/20, 10:31 PM: -- Since I inherited the Streams docs, I've come to appreciate the points made by [~gwenshap] and [~ewencp]. I'm proposing a somewhat different solution: migrate the HTML-based docs to markdown and host them on ReadTheDocs. We used this approach successfully for the [ksqlDB docs|https://docs.ksqldb.io/en/latest/], and there's no reason it won't work for the AK docs. Last week, I took a couple of hours and manually converted the entire AK doc set to markdown by using Pandoc: [ak-docs-proto repo|https://github.com/confluentinc/ak-docs-proto]. There's some more work to do, like fixing headings and updating links, but as a proof-of-concept, it works okay. Once the markdown is cleaned up, we would move it to the docs directory in the public AK GitHub repo and set up a ReadTheDocs project with a Basic account ($50/month). RTD will build from our GH branches and host the site for us. Because it's time-consuming and error-prone to maintain the Kafka content both in the AK site and in the CP docs site, we would remove the duplicative content from the CP docs site, leaving only the CP-related content there, with links (and redirects) back to the AK docs. The ksdqlDB docs have used this model successfully since May 2020, so the execution risk is low. [~mjsax], [~vvcephei], and [~mdrogalis], what do you think? was (Author: jimgalasyn): Since I inherited the Streams docs, I've come to appreciate the points made by [~gwenshap] and [~ewencp]. I'm proposing a somewhat different solution: migrate the HTML-based docs to markdown and host them on ReadTheDocs. We used this approach successfully for the [ksqlDB docs|https://docs.ksqldb.io/en/latest/], and there's no reason it won't work for the AK docs. 
Last week, I took a couple of hours and manually converted the entire AK doc set to markdown by using Pandoc: [ak-docs-proto repo|[https://github.com/confluentinc/ak-docs-proto].] There's some more work to do, like fixing headings and updating links, but as a proof-of-concept, it works okay. Once the markdown is cleaned up, we would move it to the docs directory in the public AK GitHub repo and set up a ReadTheDocs project with a Basic account ($50/month). RTD will build from our GH branches and host the site for us. Because it's time-consuming and error-prone to maintain the Kafka content both in the AK site and in the CP docs site, we would remove the duplicative content from the CP docs site, leaving only the CP-related content there, with links (and redirects) back to the AK docs. The ksdqlDB docs have used this model successfully since May 2020, so the execution risk is low. [~mjsax], [~vvcephei], and [~mdrogalis], what do you think? > Move Kafka documentation to ReStructuredText > > > Key: KAFKA-2967 > URL: https://issues.apache.org/jira/browse/KAFKA-2967 > Project: Kafka > Issue Type: Bug >Reporter: Gwen Shapira >Assignee: Gwen Shapira >Priority: Major > > Storing documentation as HTML is kind of BS :) > * Formatting is a pain, and making it look good is even worse > * Its just HTML, can't generate PDFs > * Reading and editting is painful > * Validating changes is hard because our formatting relies on all kinds of > Apache Server features. > I suggest: > * Move to RST > * Generate HTML and PDF during build using Sphinx plugin for Gradle. > Lots of Apache projects are doing this. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-2967) Move Kafka documentation to ReStructuredText
[ https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241113#comment-17241113 ] James Galasyn commented on KAFKA-2967: -- Since I inherited the Streams docs, I've come to appreciate the points made by [~gwenshap] and [~ewencp]. I'm proposing a somewhat different solution: migrate the HTML-based docs to markdown and host them on ReadTheDocs. We used this approach successfully for the [ksqlDB docs|https://docs.ksqldb.io/en/latest/], and there's no reason it won't work for the AK docs. Last week, I took a couple of hours and manually converted the entire AK doc set to markdown by using Pandoc: [ak-docs-proto repo|https://github.com/confluentinc/ak-docs-proto]. There's some more work to do, like fixing headings and updating links, but as a proof-of-concept, it works okay. Once the markdown is cleaned up, we would move it to the docs directory in the public AK GitHub repo and set up a ReadTheDocs project with a Basic account ($50/month). RTD will build from our GH branches and host the site for us. Because it's time-consuming and error-prone to maintain the Kafka content both in the AK site and in the CP docs site, we would remove the duplicative content from the CP docs site, leaving only the CP-related content there, with links (and redirects) back to the AK docs. The ksqlDB docs have used this model successfully since May 2020, so the execution risk is low. [~mjsax], [~vvcephei], and [~mdrogalis], what do you think? 
> Move Kafka documentation to ReStructuredText > > > Key: KAFKA-2967 > URL: https://issues.apache.org/jira/browse/KAFKA-2967 > Project: Kafka > Issue Type: Bug >Reporter: Gwen Shapira >Assignee: Gwen Shapira >Priority: Major > > Storing documentation as HTML is kind of BS :) > * Formatting is a pain, and making it look good is even worse > * It's just HTML, can't generate PDFs > * Reading and editing is painful > * Validating changes is hard because our formatting relies on all kinds of > Apache Server features. > I suggest: > * Move to RST > * Generate HTML and PDF during build using Sphinx plugin for Gradle. > Lots of Apache projects are doing this.
[GitHub] [kafka] twobeeb commented on pull request #9589: KAFKA-10710 - Mirror Maker 2 - Create herders only if source->target.enabled=true
twobeeb commented on pull request #9589: URL: https://github.com/apache/kafka/pull/9589#issuecomment-736079387 @ryannedolan I have a preference for your first suggestion (the one for which there is currently a code proposal) because it doesn't alter the behavior of MM2 in any way for existing users.
> IMO, we'd ideally skip creating the A->B herder whenever A->B.emit.heartbeats.enabled=false (defaults to true) and A->B.enabled=false (defaults to false). A top-level emit.heartbeats.enabled=false would then disable heartbeats altogether, which would trivially eliminate the extra herders. N.B. this would just be an optimization and wouldn't require a KIP, IMO.
That being said, I'm also fine with your latest proposal. It fits my personal use case perfectly, because topics will be replicated local to central as well as central to local in the near future; all instantiated herders will then transmit data (and not just heartbeats). Could you confirm and/or elaborate on the behavior you want to see when playing with `emit.heartbeats.enabled`? I guess these would be good test cases:
**Simple A to B**
A->B.enabled=true
Expected: 2 herders
**A to B and A to C**
clusters=A, B, C
A->B.enabled=true
A->C.enabled=true
Expected: 4 herders (AB BA AC CA)
**Disabled heartbeats**
clusters=A, B, C
A->B.enabled=true
A->B.emit.heartbeats.enabled=false
B->A.emit.heartbeats.enabled=false
A->C.enabled=true
A->C.emit.heartbeats.enabled=false
C->A.emit.heartbeats.enabled=false
Expected: 4 herders (2 of which are unused, but still much better than N*(N-1))
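The expected herder counts in these test cases can be reproduced with a small sketch. It assumes a rule inferred from the examples rather than stated in the PR: every enabled flow gets a herder, and so does its reverse direction. The `HerderPlan` class and its method names are hypothetical and do not correspond to MirrorMaker 2 code.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

final class HerderPlan {
    /**
     * Returns the herders implied by the enabled flows, written as "source->target".
     * Assumption: each enabled flow also needs a herder for the reverse direction.
     */
    static Set<String> herdersFor(List<String> enabledFlows) {
        Set<String> herders = new TreeSet<>();
        for (String flow : enabledFlows) {
            String[] parts = flow.split("->");
            herders.add(parts[0] + "->" + parts[1]);
            herders.add(parts[1] + "->" + parts[0]);
        }
        return herders;
    }

    /** Number of herders when one is created for every ordered pair of clusters. */
    static int allOrderedPairs(int clusterCount) {
        return clusterCount * (clusterCount - 1);
    }

    public static void main(String[] args) {
        System.out.println(herdersFor(List.of("A->B")));
        System.out.println(herdersFor(List.of("A->B", "A->C")));
        System.out.println(allOrderedPairs(3));
    }
}
```

For three clusters with two enabled flows this gives four herders instead of the six created for all ordered pairs, and the gap grows quadratically with the number of clusters.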
[GitHub] [kafka] bdbyrne commented on a change in pull request #9628: KAFKA-10747: Implement APIs for altering and describing IP connection rate quotas
bdbyrne commented on a change in pull request #9628: URL: https://github.com/apache/kafka/pull/9628#discussion_r532909153 ## File path: clients/src/main/java/org/apache/kafka/common/quota/ClientQuotaEntity.java ## @@ -32,6 +32,7 @@ */ public static final String USER = "user"; public static final String CLIENT_ID = "client-id"; +public static final String IP = "IP"; Review comment: Consider making lower-case for consistency, but otherwise not a big issue, as I see that's how it was proposed in the KIP. ## File path: core/src/main/scala/kafka/server/AdminManager.scala ## @@ -807,23 +807,25 @@ class AdminManager(val config: KafkaConfig, case name => Sanitizer.desanitize(name) } - private def entityToSanitizedUserClientId(entity: ClientQuotaEntity): (Option[String], Option[String]) = { + private def entityToSanitizedUserClientId(entity: ClientQuotaEntity): (Option[String], Option[String], Option[String]) = { Review comment: May want to rename (generalize) since this now includes IP ## File path: core/src/main/scala/kafka/server/AdminManager.scala ## @@ -833,45 +835,76 @@ class AdminManager(val config: KafkaConfig, def describeClientQuotas(filter: ClientQuotaFilter): Map[ClientQuotaEntity, Map[String, Double]] = { var userComponent: Option[ClientQuotaFilterComponent] = None var clientIdComponent: Option[ClientQuotaFilterComponent] = None +var ipComponent: Option[ClientQuotaFilterComponent] = None filter.components.forEach { component => component.entityType match { case ClientQuotaEntity.USER => if (userComponent.isDefined) -throw new InvalidRequestException(s"Duplicate user filter component entity type"); +throw new InvalidRequestException(s"Duplicate user filter component entity type") userComponent = Some(component) case ClientQuotaEntity.CLIENT_ID => if (clientIdComponent.isDefined) -throw new InvalidRequestException(s"Duplicate client filter component entity type"); +throw new InvalidRequestException(s"Duplicate client filter component entity type") 
clientIdComponent = Some(component) +case ClientQuotaEntity.IP => + if (ipComponent.isDefined) +throw new InvalidRequestException(s"Duplicate ip filter component entity type") + ipComponent = Some(component) case "" => throw new InvalidRequestException(s"Unexpected empty filter component entity type") case et => // Supplying other entity types is not yet supported. throw new UnsupportedVersionException(s"Custom entity type '${et}' not supported") } } -handleDescribeClientQuotas(userComponent, clientIdComponent, filter.strict) +if ((userComponent.isDefined || clientIdComponent.isDefined) && ipComponent.isDefined) + throw new InvalidRequestException(s"Invalid entity filter component combination") Review comment: May want to provide a little more detail in the response ## File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala ## @@ -1529,6 +1530,16 @@ object TestUtils extends Logging { adminClient.incrementalAlterConfigs(configs) } + def alterClientQuotas(adminClient: Admin, request: Map[ClientQuotaEntity, Map[String, Option[Double]]]): AlterClientQuotasResult = { +val entries = request.map { case (entity, alter) => + val ops = alter.map { case (key, value) => +new ClientQuotaAlteration.Op(key, value.map(Double.box).getOrElse(null)) + }.asJavaCollection +new ClientQuotaAlteration(entity, ops) Review comment: nit: indenting This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
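The filter validation in the `describeClientQuotas` diff can be illustrated with a self-contained Java sketch. It is not Kafka code: `QuotaFilterCheck` is a hypothetical class, standard Java exceptions stand in for `InvalidRequestException` and `UnsupportedVersionException`, the entity type is written in lower case as the review suggests (the diff itself uses `"IP"`), and the longer error message for the invalid combination is only one possible wording for the extra detail the reviewer asks for.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class QuotaFilterCheck {
    static final String USER = "user";
    static final String CLIENT_ID = "client-id";
    static final String IP = "ip"; // lower-case spelling is an assumption, see lead-in

    private static final Set<String> SUPPORTED = Set.of(USER, CLIENT_ID, IP);

    /** Validates the entity types of a filter and returns the distinct types seen. */
    static Set<String> validate(List<String> entityTypes) {
        Set<String> seen = new LinkedHashSet<>();
        for (String type : entityTypes) {
            if (type.isEmpty()) {
                throw new IllegalArgumentException("Unexpected empty filter component entity type");
            }
            if (!SUPPORTED.contains(type)) {
                throw new UnsupportedOperationException("Custom entity type '" + type + "' not supported");
            }
            if (!seen.add(type)) {
                throw new IllegalArgumentException("Duplicate " + type + " filter component entity type");
            }
        }
        // An IP filter cannot be combined with user or client-id filters.
        if (seen.contains(IP) && (seen.contains(USER) || seen.contains(CLIENT_ID))) {
            throw new IllegalArgumentException(
                "Invalid entity filter component combination: '" + IP
                    + "' cannot be combined with '" + USER + "' or '" + CLIENT_ID + "'");
        }
        return seen;
    }
}
```

Collecting the seen types in a set keeps the duplicate check and the combination check independent of how many entity types exist, which is one way to avoid a separate `Option` variable per type.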
[GitHub] [kafka] viktorsomogyi edited a comment on pull request #9519: KAFKA-10650: Use Murmur3 instead of MD5 in SkimpyOffsetMap
viktorsomogyi edited a comment on pull request #9519: URL: https://github.com/apache/kafka/pull/9519#issuecomment-736027069 @lbradstreet it is really hard to give an exact answer to this, as the collision rate is hard to calculate mathematically because it is very dependent on the size and values of the test set. For non-cryptographic hashes it is possible to generate DDoS attacks where everything gets placed into the same bucket, which slows down lookups. On the theoretical side, though, Murmur3 passes the most often cited Chi-Square test and has a very good avalanche effect, so it generates hashes that are very close to the uniform distribution. Because of the lack of available mathematical articles on this topic (Murmur vs MD5), I started brute-force tests where I generated a few billion unique keys and inserted them into a Bloom filter (which had a 1% false positive probability). That showed that Murmur3 is actually on the same level as MD5: it generates roughly the same number of collisions. I have two types of datasets: the first can use any UTF-8 characters and the second uses only printable ASCII characters. In fact MD5, Murmur3 128-bit, Murmur3 64-bit and xxHash 64-bit all generated around the same number of collisions, which was 0.016% out of 200 million unique keys. I added Murmur3 32-bit as a baseline, but it was significantly worse, at around 2% collisions. Maybe to show the difference we need a much larger keyset; I'll try to do what I can. I'll publish my code in the following days. I just have to work on something else too, so it's a bit slow, sorry :). On the other hand, if we want to make sure that there will be no collisions, I don't think it's possible with either of these solutions; there is always a chance. To completely rule this out we either have to store the user key-hash maps similarly to the offset indexes and reject new, colliding keys, or use perfect hashes (but that wouldn't work well, as it requires knowledge of the full keyset or has to rebuild the cache on each insert, or at least often).
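As a rough cross-check of the measured rates, the standard birthday approximation estimates the expected number of colliding key pairs among n keys hashed uniformly into 2^bits values as n(n-1)/2 divided by 2^bits. This formula is a textbook estimate and is not taken from the comment; the `CollisionEstimate` class below is a hypothetical helper that only evaluates it.

```java
final class CollisionEstimate {
    /**
     * Birthday approximation: expected number of colliding pairs among n keys
     * when hashes are uniformly distributed over 2^bits values.
     */
    static double expectedCollidingPairs(long n, int bits) {
        return (double) n * (n - 1) / 2.0 / Math.pow(2.0, bits);
    }

    public static void main(String[] args) {
        long n = 200_000_000L; // key count quoted in the comment
        for (int bits : new int[] {32, 64, 128}) {
            System.out.printf("%d-bit hash: about %.3g expected colliding pairs%n",
                bits, expectedCollidingPairs(n, bits));
        }
    }
}
```

For 200 million keys and a 32-bit hash the estimate is about 4.7 million colliding pairs, roughly 2.3% of the keys, which is in line with the "around 2%" reported for Murmur3 32-bit. For 64-bit and wider hashes the estimate is far below one pair, so a uniform hash of that width would be expected to show essentially no collisions at this key count; whether the reported 0.016% comes from the Bloom filter's false positives rather than from the hash functions is only a guess.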
[GitHub] [kafka] bbejeck commented on pull request #9657: MINOR: Remove erroneous extra in design doc
bbejeck commented on pull request #9657: URL: https://github.com/apache/kafka/pull/9657#issuecomment-736032726 cherry-picked to 2.7
[GitHub] [kafka] d1egoaz closed pull request #9088: sync docs for list of emitted metrics by MirrorMetrics
d1egoaz closed pull request #9088: URL: https://github.com/apache/kafka/pull/9088
[GitHub] [kafka] bbejeck commented on pull request #9655: MINOR: fix listeners doc to close properly
bbejeck commented on pull request #9655: URL: https://github.com/apache/kafka/pull/9655#issuecomment-736030783 cherry-picked to 2.7
[GitHub] [kafka] bbejeck merged pull request #9657: MINOR: Remove erroneous extra in design doc
bbejeck merged pull request #9657: URL: https://github.com/apache/kafka/pull/9657
[GitHub] [kafka] viktorsomogyi edited a comment on pull request #9519: KAFKA-10650: Use Murmur3 instead of MD5 in SkimpyOffsetMap
viktorsomogyi edited a comment on pull request #9519: URL: https://github.com/apache/kafka/pull/9519#issuecomment-736027069

@lbradstreet it is really hard to give an exact answer to this, because the collision rate is hard to calculate mathematically: it is very dependent on the size and values of the test set. For non-cryptographic hashes it is possible to mount DDoS attacks where everything gets placed into the same bucket, which slows down lookups. On the theoretical side, though, Murmur3 passes the most often cited chi-square test and has a very good avalanche effect, so it generates hashes that are very close to the uniform distribution.

Because of the lack of available mathematical articles on this topic (Murmur vs MD5) I started brute-force tests where I generated a few billion unique keys and inserted them into a Bloom filter (which had a 1% false positive probability). That showed that Murmur3 is on the same level as MD5: it generates roughly the same number of collisions. I have two types of datasets: the first can use any UTF-8 characters and the second uses only printable ASCII characters. In fact MD5, Murmur3 128-bit, Murmur3 64-bit and xxHash 64-bit all generated around the same number of collisions, which was 0.016% out of 200 million unique keys. I added Murmur3 32-bit as a baseline but it was significantly worse, at around 2% collisions. Maybe we need a much larger keyset to show the difference; I'll try to do what I can. I'll publish my code in the following days. I just have to work on something else too, so it's a bit slow, sorry :).

On the other hand, if we want to make sure that there will be no collisions, I don't think that is possible with any of these solutions; there is always a chance. To rule collisions out completely we would either have to store the user key-hash maps similarly to the offset indexes and then reject new, colliding keys or resolve the collision, or use perfect hashes (but that couldn't work well, as it requires knowledge of the full keyset, or the cache has to be rebuilt on each insert, or at least often).
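For intuition about why the 32-bit variant stands out, a birthday-bound estimate can be sketched as below. This is not the brute-force test described in the comment: it assumes an ideal, uniformly distributed hash, and the class and method names are hypothetical.

```java
/**
 * Birthday-bound estimate of hash collisions.
 * Assumption: the hash behaves like an ideal uniform function; this is a
 * back-of-the-envelope model, not the brute-force measurement from the comment.
 */
final class CollisionEstimate {

    /** Expected number of colliding key pairs when n keys are hashed into 2^bits values. */
    static double expectedCollidingPairs(double n, int bits) {
        return n * (n - 1) / 2.0 / Math.pow(2.0, bits);
    }

    /** Expected colliding pairs per key, a rough proxy for the observed collision rate. */
    static double collisionRate(double n, int bits) {
        return expectedCollidingPairs(n, bits) / n;
    }
}
```

For 200 million keys, `CollisionEstimate.collisionRate(2.0e8, 32)` comes out at a little over 2%, which is in line with the Murmur3 32-bit baseline reported above. For 64-bit and wider hashes the same estimate is many orders of magnitude smaller.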
[jira] [Updated] (KAFKA-10778) Stronger log fencing after write failure
[ https://issues.apache.org/jira/browse/KAFKA-10778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson updated KAFKA-10778:

Description:
If a log append operation fails with an IO error, the broker attempts to fail the log dir that it resides in. Currently this is done asynchronously, which means there is no guarantee that additional appends won't be attempted before the log is fenced. This can be a problem for EOS because of the need to maintain consistent producer state.

1. Iterate through batches to build producer state and collect completed transactions
2. Append the batches to the log
3. Update the offset/timestamp indexes
4. Update log end offset
5. Apply individual producer state to `ProducerStateManager`
6. Update the transaction index
7. Update completed transactions and advance LSO

One example of how this process can go wrong is if the index updates in step 3 fail. In this case, the log will contain updated producer state which has not been reflected in `ProducerStateManager`. If the append is retried before the log is fenced, then we can have duplicates. There are probably other potential failures that are possible as well.

I'm sure we can come up with some way to fix this specific case, but the general fencing approach is slippery enough that we'll have a hard time convincing ourselves that it handles all potential cases. It would be simpler to add synchronous fencing logic for the case when an append fails due to an IO error. For example, we can mark a flag to indicate that the log is closed for additional read/write operations.

was:
If a log operation fails with an IO error, the broker attempts to fail the log dir that it resides in. Currently this is done asynchronously, which means there is no guarantee that additional appends won't be attempted before the log is fenced. This can be a problem for EOS because of the need to maintain consistent producer state.

1. Iterate through batches to build producer state and collect completed transactions
2. Append the batches to the log
3. Update the offset/timestamp indexes
4. Update log end offset
5. Apply individual producer state to `ProducerStateManager`
6. Update the transaction index
7. Update completed transactions and advance LSO

One example of how this process can go wrong is if the index updates in step 3 fail. In this case, the log will contain updated producer state which has not been reflected in `ProducerStateManager`. If the append is retried before the log is fenced, then we can have duplicates. There are probably other potential failures that are possible as well.

I'm sure we can come up with some way to fix this specific case, but the general fencing approach is slippery enough that we'll have a hard time convincing ourselves that it handles all potential cases. It would be simpler to add synchronous fencing logic for the case when an append fails due to an IO error. For example, we can mark a flag to indicate that the log is closed for additional read/write operations.

> Stronger log fencing after write failure
>
> Key: KAFKA-10778
> URL: https://issues.apache.org/jira/browse/KAFKA-10778
> Project: Kafka
> Issue Type: Bug
> Reporter: Jason Gustafson
> Priority: Major

-- This message was sent by Atlassian Jira
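The "mark a flag" idea from the description could look roughly like the sketch below. It is a minimal illustration under stated assumptions, not Kafka's `Log` class: `FencedLogSketch`, its `append` signature and the use of `UncheckedIOException` to signal an IO error are all hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical, simplified stand-in for a log; it only illustrates synchronous fencing. */
final class FencedLogSketch {
    private final Object lock = new Object();
    private final List<String> batches = new ArrayList<>();
    private boolean fenced = false; // flipped under the lock on the first IO error

    /**
     * Appends a batch. The writer stands in for the disk write and may throw
     * UncheckedIOException to simulate an IO error (an assumption of this sketch).
     */
    void append(String batch, Consumer<String> writer) {
        synchronized (lock) {
            if (fenced) {
                throw new IllegalStateException("log is fenced after an earlier IO error");
            }
            try {
                writer.accept(batch);
                batches.add(batch);
            } catch (UncheckedIOException e) {
                // Fence before the lock is released, so no later append can slip through.
                fenced = true;
                throw e;
            }
        }
    }

    boolean isFenced() {
        synchronized (lock) {
            return fenced;
        }
    }

    int size() {
        synchronized (lock) {
            return batches.size();
        }
    }
}
```

Because the flag is set while the append lock is still held, a retried append observes the fenced state immediately instead of racing with an asynchronous failure handler.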
[GitHub] [kafka] hachikuji commented on pull request #9632: KAFKA-10702; Skip bookkeeping of empty transactions
hachikuji commented on pull request #9632: URL: https://github.com/apache/kafka/pull/9632#issuecomment-736004238 Note I filed https://issues.apache.org/jira/browse/KAFKA-10778 to introduce synchronous log fencing after IO errors.
[jira] [Created] (KAFKA-10778) Stronger log fencing after write failure
Jason Gustafson created KAFKA-10778:

Summary: Stronger log fencing after write failure
Key: KAFKA-10778
URL: https://issues.apache.org/jira/browse/KAFKA-10778
Project: Kafka
Issue Type: Bug
Reporter: Jason Gustafson

If a log operation fails with an IO error, the broker attempts to fail the log dir that it resides in. Currently this is done asynchronously, which means there is no guarantee that additional appends won't be attempted before the log is fenced. This can be a problem for EOS because of the need to maintain consistent producer state.

1. Iterate through batches to build producer state and collect completed transactions
2. Append the batches to the log
3. Update the offset/timestamp indexes
4. Update log end offset
5. Apply individual producer state to `ProducerStateManager`
6. Update the transaction index
7. Update completed transactions and advance LSO

One example of how this process can go wrong is if the index updates in step 3 fail. In this case, the log will contain updated producer state which has not been reflected in `ProducerStateManager`. If the append is retried before the log is fenced, then we can have duplicates. There are probably other potential failures that are possible as well.

I'm sure we can come up with some way to fix this specific case, but the general fencing approach is slippery enough that we'll have a hard time convincing ourselves that it handles all potential cases. It would be simpler to add synchronous fencing logic for the case when an append fails due to an IO error. For example, we can mark a flag to indicate that the log is closed for additional read/write operations.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] hachikuji commented on pull request #9632: KAFKA-10702; Skip bookkeeping of empty transactions
hachikuji commented on pull request #9632: URL: https://github.com/apache/kafka/pull/9632#issuecomment-735990319

For a little more background about the `LogTest.testAppendToTransactionIndexFailure` failure, it is due to an inconsistency in how we update state in `ProducerStateManager`. The current append flow is the following:

1. Build producer state in `ProducerAppendInfo` instances and collect completed transactions
2. Append the entry to the log
3. Update log end offset
4. Apply individual producer state to `ProducerStateManager`
5. Update the transaction index
6. Update completed transactions and advance LSO

The idea is that the LSO is stuck if an append to the transaction index fails. However, because we have already updated producer state before the index write, we are left with an inconsistency. The LSO will reflect an ongoing transaction which is not reflected in any of the producer states.

The test case that is failing is validating the behavior when the index write fails. It works like this:

1. First append some transactional data to the log
2. Append an ABORT marker, but let the write to the transaction index fail
3. Retry the append of the ABORT and verify that append still fails and the LSO is stuck

The test fails because the second append no longer attempts to write to the transaction index. I can change the test of course, but I was disturbed about the underlying assumption that the write of the transaction marker can be retried on the `Log` after a failure. In fact, the path to fencing the `Log` after a write failure is asynchronous today. We use `LogDirFailureChannel` to propagate log failures to a separate thread which is responsible for marking the log dir offline or shutting down the broker. So there is indeed a (small) window during which a `WriteTxnMarkers` request could be retried. My feeling is that EOS demands a stronger guarantee and we need to fence off the `Log` instance synchronously while still holding the lock.

So I think we need a separate jira to fix this issue. The question then is whether it should block this patch or not. I am thinking not at the moment. The test fails because there is no second append to the transaction index, but this is not required for correctness, and the LSO remains stuck as expected in any case. Basically I'd say we're no worse than before. I will add a commit which alters the test case so that it can pass and we can discuss tightening up the failure logic in a separate jira.
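The ordering problem described in this comment can be illustrated with a toy model. The sketch below is hypothetical throughout (`AppendOrderSketch` and its fields are not Kafka classes). It only mimics the order "apply producer state, then write the transaction index, then complete the transaction" to show why a retried ABORT no longer reaches the index while the LSO stays stuck.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical toy model of the append ordering described above; not Kafka's Log. */
final class AppendOrderSketch {
    // Producers with an open transaction according to the toy producer state.
    final Set<Long> ongoingInProducerState = new HashSet<>();
    // Producers whose open transaction still pins the last stable offset (LSO).
    final Set<Long> ongoingForLso = new HashSet<>();
    int indexWriteAttempts = 0;

    void beginTransaction(long producerId) {
        ongoingInProducerState.add(producerId);
        ongoingForLso.add(producerId);
    }

    /** Appends an ABORT marker; failIndexWrite simulates an IO error on the transaction index. */
    void appendAbortMarker(long producerId, boolean failIndexWrite) {
        // Step 1: the marker completes a transaction only if producer state shows one open.
        boolean completesTxn = ongoingInProducerState.contains(producerId);
        // Steps 2-4: append to the log, update the log end offset, apply producer state.
        ongoingInProducerState.remove(producerId);
        if (!completesTxn) {
            return; // nothing to index and nothing to complete
        }
        // Step 5: update the transaction index.
        indexWriteAttempts++;
        if (failIndexWrite) {
            throw new IllegalStateException("simulated transaction index write failure");
        }
        // Step 6: complete the transaction so that the LSO can advance.
        ongoingForLso.remove(producerId);
    }

    boolean lsoStuck() {
        return !ongoingForLso.isEmpty();
    }
}
```

In this model, a failed first ABORT leaves `ongoingInProducerState` empty while `lsoStuck()` stays true. A retry then returns early without touching the index, which mirrors the behaviour the failing test ran into.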
[jira] [Closed] (KAFKA-10758) Kafka Streams consuming from a pattern goes to PENDING_SHUTDOWN when adding a new topic
[ https://issues.apache.org/jira/browse/KAFKA-10758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler closed KAFKA-10758. > Kafka Streams consuming from a pattern goes to PENDING_SHUTDOWN when adding a > new topic > --- > > Key: KAFKA-10758 > URL: https://issues.apache.org/jira/browse/KAFKA-10758 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Davide Icardi >Assignee: A. Sophie Blee-Goldman >Priority: Blocker > Fix For: 2.7.0, 2.6.1 > > > I have a simple Kafka Stream app that consumes from multiple input topics > using the _stream_ function that accepts a Pattern > ([link|https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/StreamsBuilder.html#stream-java.util.regex.Pattern-]). > > Whenever I add a new topic that matches the pattern the kafka stream state > goes to REBALANCING -> ERROR -> PENDING_SHUTDOWN . > If I restart the app it correctly starts reading again without problems. > It is by design? Should I handle this and simply restart the app? > > Kafka Stream version is 2.6.0. 
> The error is the following: > {code:java} > ERROR o.a.k.s.p.i.ProcessorTopology - Set of source nodes do not match: > sourceNodesByName = [KSTREAM-SOURCE-03, KSTREAM-SOURCE-02] > sourceTopicsByName = [KSTREAM-SOURCE-00, KSTREAM-SOURCE-14, > KSTREAM-SOURCE-03, KSTREAM-SOURCE-02] > org.apache.kafka.common.KafkaException: User rebalance callback throws an > error > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:436) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) > at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:766) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:624) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510) > Caused by: java.lang.IllegalStateException: Tried to update source topics > but source nodes did not match > at > org.apache.kafka.streams.processor.internals.ProcessorTopology.updateSourceTopics(ProcessorTopology.java:151) > at > org.apache.kafka.streams.processor.internals.AbstractTask.update(AbstractTask.java:109) > at > org.apache.kafka.streams.processor.internals.StreamTask.update(StreamTask.java:514) > at > 
org.apache.kafka.streams.processor.internals.TaskManager.updateInputPartitionsAndResume(TaskManager.java:397) > at > org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:261) > at > org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1428) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:279) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:421) > ... 10 common frames omitted > KafkaStream state is ERROR > 17:28:53.200 [datalake-StreamThread-1] ERROR > o.apache.kafka.streams.KafkaStreams - stream-client [datalake] All stream > threads have died. The instance will be in error state and should be closed. > > User rebalance callback throws an error > KafkaStream state is PENDING_SHUTDOWN > {code} > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Closed] (KAFKA-10754) Fix flaky shouldShutdownSingleThreadApplication test
[ https://issues.apache.org/jira/browse/KAFKA-10754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler closed KAFKA-10754. > Fix flaky shouldShutdownSingleThreadApplication test > > > Key: KAFKA-10754 > URL: https://issues.apache.org/jira/browse/KAFKA-10754 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > Fix For: 2.8.0 > > > org.apache.kafka.streams.integration.StreamsUncaughtExceptionHandlerIntegrationTest.shouldShutdownSingleThreadApplication > failed, log available in > /home/jenkins/jenkins-agent/workspace/Kafka/kafka-trunk-jdk11/streams/build/reports/testOutput/org.apache.kafka.streams.integration.StreamsUncaughtExceptionHandlerIntegrationTest.shouldShutdownSingleThreadApplication.test.stdout > org.apache.kafka.streams.integration.StreamsUncaughtExceptionHandlerIntegrationTest > > shouldShutdownSingleThreadApplication FAILED > java.lang.AssertionError: Expected all streams instances in > [org.apache.kafka.streams.KafkaStreams@36c1250, > org.apache.kafka.streams.KafkaStreams@124268b5] to be ERROR within 3 ms, > but the following were not: > \{org.apache.kafka.streams.KafkaStreams@124268b5=RUNNING, > org.apache.kafka.streams.KafkaStreams@36c1250=RUNNING} > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitForApplicationState$12(IntegrationTestUtils.java:933) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:450) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:418) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState(IntegrationTestUtils.java:916) > at > org.apache.kafka.streams.integration.StreamsUncaughtExceptionHandlerIntegrationTest.shouldShutdownSingleThreadApplication(StreamsUncaughtExceptionHandlerIntegrationTest.java:186) > > > 
[https://ci-builds.apache.org/blue/rest/organizations/jenkins/pipelines/Kafka/pipelines/kafka-trunk-jdk15/runs/267/log/?start=0] > [https://ci-builds.apache.org/blue/rest/organizations/jenkins/pipelines/Kafka/pipelines/kafka-trunk-jdk11/runs/241/log/?start=0] > [https://ci-builds.apache.org/blue/rest/organizations/jenkins/pipelines/Kafka/pipelines/kafka-trunk-jdk15/runs/270/log/?start=0] > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10292) fix flaky streams/streams_broker_bounce_test.py
[ https://issues.apache.org/jira/browse/KAFKA-10292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10292: - Priority: Blocker (was: Major) > fix flaky streams/streams_broker_bounce_test.py > --- > > Key: KAFKA-10292 > URL: https://issues.apache.org/jira/browse/KAFKA-10292 > Project: Kafka > Issue Type: Bug > Components: streams, system tests >Reporter: Chia-Ping Tsai >Assignee: Bruno Cadonna >Priority: Blocker > Fix For: 2.8.0 > > > {quote} > Module: kafkatest.tests.streams.streams_broker_bounce_test > Class: StreamsBrokerBounceTest > Method: test_broker_type_bounce > Arguments: > { > "broker_type": "leader", > "failure_mode": "clean_bounce", > "num_threads": 1, > "sleep_time_secs": 120 > } > {quote} > {quote} > Module: kafkatest.tests.streams.streams_broker_bounce_test > Class: StreamsBrokerBounceTest > Method: test_broker_type_bounce > Arguments: > { > "broker_type": "controller", > "failure_mode": "hard_shutdown", > "num_threads": 3, > "sleep_time_secs": 120 > } > {quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10292) fix flaky streams/streams_broker_bounce_test.py
[ https://issues.apache.org/jira/browse/KAFKA-10292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10292: - Fix Version/s: 2.8.0 > fix flaky streams/streams_broker_bounce_test.py > --- > > Key: KAFKA-10292 > URL: https://issues.apache.org/jira/browse/KAFKA-10292 > Project: Kafka > Issue Type: Bug > Components: streams, system tests >Reporter: Chia-Ping Tsai >Assignee: Bruno Cadonna >Priority: Major > Fix For: 2.8.0 > > > {quote} > Module: kafkatest.tests.streams.streams_broker_bounce_test > Class: StreamsBrokerBounceTest > Method: test_broker_type_bounce > Arguments: > { > "broker_type": "leader", > "failure_mode": "clean_bounce", > "num_threads": 1, > "sleep_time_secs": 120 > } > {quote} > {quote} > Module: kafkatest.tests.streams.streams_broker_bounce_test > Class: StreamsBrokerBounceTest > Method: test_broker_type_bounce > Arguments: > { > "broker_type": "controller", > "failure_mode": "hard_shutdown", > "num_threads": 3, > "sleep_time_secs": 120 > } > {quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10017) Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta
[ https://issues.apache.org/jira/browse/KAFKA-10017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10017: - Fix Version/s: (was: 2.6.0) 2.8.0 > Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta > --- > > Key: KAFKA-10017 > URL: https://issues.apache.org/jira/browse/KAFKA-10017 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: A. Sophie Blee-Goldman >Assignee: A. Sophie Blee-Goldman >Priority: Blocker > Labels: flaky-test, unit-test > Fix For: 2.8.0 > > > Creating a new ticket for this since the root cause is different than > https://issues.apache.org/jira/browse/KAFKA-9966 > With injectError = true: > h3. Stacktrace > java.lang.AssertionError: Did not receive all 20 records from topic > multiPartitionOutputTopic within 6 ms Expected: is a value equal to or > greater than <20> but: <15> was less than <20> at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530) > at > org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973) > at > org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961) > at > org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427) 
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10777) Add additional configuration to control MM2 internal topics naming convention
Omnia Ibrahim created KAFKA-10777:

Summary: Add additional configuration to control MM2 internal topics naming convention
Key: KAFKA-10777
URL: https://issues.apache.org/jira/browse/KAFKA-10777
Project: Kafka
Issue Type: Improvement
Components: mirrormaker
Affects Versions: 2.6.0
Reporter: Omnia Ibrahim

MM2 internal topic names (heartbeats, checkpoints and offset-syncs) are hardcoded in the source code, which makes it hard to run MM2 against any Kafka cluster that enforces a topic naming convention and does not allow topic auto-creation. In that case developers have to create these internal topics manually up front and make sure they follow the cluster's rules and guidance for topic creation. MM2 should therefore be flexible enough to let users override the names of its internal topics so that they follow the cluster's topic naming convention.
[jira] [Commented] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters
[ https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17240908#comment-17240908 ]

Ming Liu commented on KAFKA-7918:

We also had code to optimize the changelog that supports it. We only push to the changelog at a certain interval (and collapse the data with the same key to save the serialization cost).

> Streams store cleanup: inline byte-store generic parameters
>
> Key: KAFKA-7918
> URL: https://issues.apache.org/jira/browse/KAFKA-7918
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: John Roesler
> Assignee: A. Sophie Blee-Goldman
> Priority: Major
> Fix For: 2.3.0
>
> Currently, the fundamental layer of stores in Streams is the "bytes store".
> The easiest way to identify this is in `org.apache.kafka.streams.state.Stores`: all the `StoreBuilder`s require a `XXBytesStoreSupplier`.
> We provide several implementations of these bytes stores, typically an in-memory one and a persistent one (aka RocksDB).
> Inside these bytes stores, the key is always `Bytes` and the value is always `byte[]` (serialization happens at a higher level). However, the store implementations are generically typed, just `K` and `V`.
> This is good for flexibility, but it makes the code a little harder to understand. I think that we used to do serialization at a lower level, so the generics are a hold-over from that.
> It would simplify the code if we just inlined the actual k/v types and maybe even renamed the classes from (e.g.) `InMemoryKeyValueStore` to `InMemoryKeyValueBytesStore`, and so forth.
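The interval-based, key-collapsing changelog write mentioned in the comment above might be sketched as follows. This is only an illustration of the idea. `CollapsingChangelogBuffer`, its constructor parameters and the caller-supplied clock values are hypothetical and are not part of Kafka Streams.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical buffer that collapses updates per key and flushes them at an interval. */
final class CollapsingChangelogBuffer<K, V> {
    private final Map<K, V> pending = new LinkedHashMap<>();
    private final long flushIntervalMs;
    private final BiConsumer<K, V> changelogWriter; // stands in for serialize-and-send
    private long lastFlushMs;

    CollapsingChangelogBuffer(long flushIntervalMs, long nowMs, BiConsumer<K, V> changelogWriter) {
        this.flushIntervalMs = flushIntervalMs;
        this.lastFlushMs = nowMs;
        this.changelogWriter = changelogWriter;
    }

    /** Records an update; a later value for the same key replaces the earlier one. */
    void put(K key, V value, long nowMs) {
        pending.put(key, value);
        if (nowMs - lastFlushMs >= flushIntervalMs) {
            flush(nowMs);
        }
    }

    /** Writes one record per distinct key, so collapsed updates are never serialized. */
    void flush(long nowMs) {
        pending.forEach(changelogWriter);
        pending.clear();
        lastFlushMs = nowMs;
    }
}
```

With a 100 ms interval, three updates to the same key inside one interval lead to a single changelog write that carries the latest value. The trade-off is that updates buffered since the last flush are lost if the process dies before the next one.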
[jira] [Resolved] (KAFKA-8266) Improve `testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup`
[ https://issues.apache.org/jira/browse/KAFKA-8266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-8266. Resolution: Fixed > Improve > `testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup` > > > Key: KAFKA-8266 > URL: https://issues.apache.org/jira/browse/KAFKA-8266 > Project: Kafka > Issue Type: Test >Reporter: Jason Gustafson >Assignee: David Jacot >Priority: Major > > Some additional validation could be done after the member gets kicked out. > The main thing is showing that the group can continue to consume data and > commit offsets. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-8266) Improve `testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup`
[ https://issues.apache.org/jira/browse/KAFKA-8266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17240842#comment-17240842 ] David Jacot commented on KAFKA-8266: It seems that we haven't seen this one for a while now. I will close it. Please, re-open if necessary. > Improve > `testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup` > > > Key: KAFKA-8266 > URL: https://issues.apache.org/jira/browse/KAFKA-8266 > Project: Kafka > Issue Type: Test >Reporter: Jason Gustafson >Assignee: David Jacot >Priority: Major > > Some additional validation could be done after the member gets kicked out. > The main thing is showing that the group can continue to consume data and > commit offsets. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] dengziming commented on pull request #9662: KAFKA-10130; Rewrite FeatureZNode struct with auto-generated protocol
dengziming commented on pull request #9662: URL: https://github.com/apache/kafka/pull/9662#issuecomment-735765304 @abbccdda @kowshik hi, PTAL. also ping @cmccabe @hachikuji to have a look.
[GitHub] [kafka] ankit-kumar-25 commented on pull request #9326: KAFKA-10460: ReplicaListValidator format checking is incomplete
ankit-kumar-25 commented on pull request #9326: URL: https://github.com/apache/kafka/pull/9326#issuecomment-735753885 Hey @mimaison, Thank you for your input. I have added a couple of test cases to validate this condition. For some reason the `Build Failed`. Can you please review the change and re-run the test/build if feasible?
[jira] [Commented] (KAFKA-9376) Plugin class loader not found using MM2
[ https://issues.apache.org/jira/browse/KAFKA-9376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17240706#comment-17240706 ]

shezm commented on KAFKA-9376:

Hi, I had the same problem too (release 2.4.0 with Scala 2.12). I think the problem is here:

{code:java}
public class DelegatingClassLoader extends URLClassLoader {

    // ... other code

    /**
     * Retrieve the PluginClassLoader associated with a plugin class
     * @param name The fully qualified class name of the plugin
     * @return the PluginClassLoader that should be used to load this, or null if the plugin is not isolated.
     */
    public PluginClassLoader pluginClassLoader(String name) {
        if (!PluginUtils.shouldLoadInIsolation(name)) {
            return null;
        }
        SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(name);
        if (inner == null) {
            return null;
        }
        //--- is here ---
        // I found `pluginLoader` type was `AppClassLoader`
        ClassLoader pluginLoader = inner.get(inner.lastKey());
        return pluginLoader instanceof PluginClassLoader
            ? (PluginClassLoader) pluginLoader
            : null;
    }

    public ClassLoader connectorLoader(Connector connector) {
        return connectorLoader(connector.getClass().getName());
    }

    public ClassLoader connectorLoader(String connectorClassOrAlias) {
        log.debug("Getting plugin class loader for connector: '{}'", connectorClassOrAlias);
        String fullName = aliases.containsKey(connectorClassOrAlias)
            ? aliases.get(connectorClassOrAlias)
            : connectorClassOrAlias;
        PluginClassLoader classLoader = pluginClassLoader(fullName);
        if (classLoader == null) {
            log.error(
                "Plugin class loader for connector: '{}' was not found. Returning: {}",
                connectorClassOrAlias,
                this
            );
            return this;
        }
        return classLoader;
    }
}
{code}

Looking at the code, I found that in `DelegatingClassLoader#pluginClassLoader()` the variable `pluginLoader` is an `AppClassLoader`, so the method returns null and `connectorLoader()` then writes the `log.error(...)` message, but no exception is thrown. It doesn't seem to make MM2 unavailable, so I think it might just be misleading log output.
> Plugin class loader not found using MM2
> ---
>
> Key: KAFKA-9376
> URL: https://issues.apache.org/jira/browse/KAFKA-9376
> Project: Kafka
> Issue Type: Bug
> Components: mirrormaker
> Affects Versions: 2.4.0
> Reporter: Sinóros-Szabó Péter
> Priority: Minor
>
> I am using MM2 (release 2.4.0 with Scala 2.12) and I get a bunch of classloader
> errors. MM2 seems to be working, but I do not know if all of its components
> are working as expected, as this is the first time I have used MM2.
> I run MM2 with the following command:
> {code:java}
> ./bin/connect-mirror-maker.sh config/connect-mirror-maker.properties
> {code}
> Errors are:
> {code:java}
> [2020-01-07 15:06:17,892] ERROR Plugin class loader for connector: 'org.apache.kafka.connect.mirror.MirrorHeartbeatConnector' was not found. Returning: org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader@6ebf0f36 (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:165)
> [2020-01-07 15:06:17,889] ERROR Plugin class loader for connector: 'org.apache.kafka.connect.mirror.MirrorHeartbeatConnector' was not found. Returning: org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader@6ebf0f36 (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:165)
> [2020-01-07 15:06:17,904] INFO ConnectorConfig values:
>     config.action.reload = restart
>     connector.class = org.apache.kafka.connect.mirror.MirrorHeartbeatConnector
>     errors.log.enable = false
>     errors.log.include.messages = false
>     errors.retry.delay.max.ms = 6
>     errors.retry.timeout = 0
>     errors.tolerance = none
>     header.converter = null
>     key.converter = null
>     name = MirrorHeartbeatConnector
>     tasks.max = 1
>     transforms = []
>     value.converter = null
>  (org.apache.kafka.connect.runtime.ConnectorConfig:347)
> [2020-01-07 15:06:17,904] INFO EnrichedConnectorConfig values:
>     config.action.reload = restart
>     connector.class = org.apache.kafka.connect.mirror.MirrorHeartbeatConnector
>     errors.log.enable = false
>     errors.log.include.messages = false
>     errors.retry.delay.max.ms = 6
>     errors.retry.timeout = 0
>     errors.tolerance = none
>     header.converter = null
>     key.converter = null
>     name = MirrorHeartbeatConnector
>     tasks.max = 1
>     transforms = []
>     value.converter = null
>
>  (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:347)
> [2020-01-07 15:06:17,905] INFO TaskConfig values:
>     task.class = class org.apache.kafka.connect.mirror.MirrorHeartbeatTask
>  (org.apache.kafka.connect.runtime.TaskConfig:347)
>
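The lookup-and-fallback behaviour described in shezm's comment can be reproduced in isolation. The following is only a minimal sketch: `IsolatedLoader` and `LoaderLookup` are hypothetical stand-ins, not Kafka classes, and the registry is simplified to a plain map. It shows how a registered loader of the "wrong" type makes the typed lookup return null, which in turn triggers the fallback path (the point at which Kafka logs the error) without any exception.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Kafka's PluginClassLoader; not the real class.
class IsolatedLoader extends URLClassLoader {
    IsolatedLoader(final ClassLoader parent) {
        super(new URL[0], parent);
    }
}

// Hypothetical, simplified registry that mirrors the lookup-and-fallback pattern.
class LoaderLookup {
    private final Map<String, ClassLoader> loaders = new HashMap<>();
    private final ClassLoader fallback;

    LoaderLookup(final ClassLoader fallback) {
        this.fallback = fallback;
    }

    void register(final String className, final ClassLoader loader) {
        loaders.put(className, loader);
    }

    // Returns the isolated loader, or null when the class is unknown
    // or its registered loader is of some other type.
    IsolatedLoader isolatedLoader(final String className) {
        final ClassLoader loader = loaders.get(className);
        return loader instanceof IsolatedLoader ? (IsolatedLoader) loader : null;
    }

    // Falls back silently when no isolated loader exists; the real code
    // logs at ERROR level at this point instead of throwing.
    ClassLoader connectorLoader(final String className) {
        final IsolatedLoader loader = isolatedLoader(className);
        return loader != null ? loader : fallback;
    }
}
```

Registering a class under the application class loader and then calling `connectorLoader` returns the fallback, which matches the "error logged, but nothing breaks" observation above.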
[GitHub] [kafka] dengziming opened a new pull request #9662: Kafka-10130; Rewrite FeatureZNode struct with auto-generated protocol
dengziming opened a new pull request #9662: URL: https://github.com/apache/kafka/pull/9662

*More detailed description of your change*

1. Remove FeatureZNode and replace it with FeatureZNode.json.
2. Change the code where FeatureZNode is used.
3. Copy some code from `org.apache.kafka.raft.FileBasedStateStore` to convert FeatureZNodeData to and from JSON.

*Summary of testing strategy (including rationale)*

Replace FeatureZNode in the unit tests and integration tests.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Resolved] (KAFKA-10736) Convert transaction coordinator metadata schemas to use generated protocol
[ https://issues.apache.org/jira/browse/KAFKA-10736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-10736. Fix Version/s: 2.8.0 Resolution: Fixed > Convert transaction coordinator metadata schemas to use generated protocol > -- > > Key: KAFKA-10736 > URL: https://issues.apache.org/jira/browse/KAFKA-10736 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > Fix For: 2.8.0 > > > We need to convert the internal schemas used for representing transaction > metadata to the generated protocol. This opens the door for flexible version > support on the next bump. > similar to https://issues.apache.org/jira/browse/KAFKA-10497 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] chia7712 merged pull request #9611: KAFKA-10736 Convert transaction coordinator metadata schemas to use g…
chia7712 merged pull request #9611: URL: https://github.com/apache/kafka/pull/9611
[GitHub] [kafka] cadonna commented on pull request #9508: KAFKA-10648: Add Prefix Scan support to State Stores
cadonna commented on pull request #9508: URL: https://github.com/apache/kafka/pull/9508#issuecomment-735665881 @vamossagar12 I replied to your comments. Let me know when the PR is ready for review.
[jira] [Updated] (KAFKA-10767) Add Unit Test cases for missing methods in ThreadCacheTest
[ https://issues.apache.org/jira/browse/KAFKA-10767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna updated KAFKA-10767: -- Labels: newbie (was: ) > Add Unit Test cases for missing methods in ThreadCacheTest > -- > > Key: KAFKA-10767 > URL: https://issues.apache.org/jira/browse/KAFKA-10767 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Reporter: Sagar Rao >Assignee: Sagar Rao >Priority: Major > Labels: newbie > > During the code review for KIP-614, it was noticed that some methods in > ThreadCache don't have unit tests. Need to identify them and add unit test > cases for them. >
[jira] [Updated] (KAFKA-10767) Add Unit Test cases for missing methods in ThreadCacheTest
[ https://issues.apache.org/jira/browse/KAFKA-10767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna updated KAFKA-10767: -- Component/s: unit tests streams > Add Unit Test cases for missing methods in ThreadCacheTest > -- > > Key: KAFKA-10767 > URL: https://issues.apache.org/jira/browse/KAFKA-10767 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Reporter: Sagar Rao >Assignee: Sagar Rao >Priority: Major > > During the code review for KIP-614, it was noticed that some methods in > ThreadCache don't have unit tests. Need to identify them and add unit test > cases for them. >
[GitHub] [kafka] cadonna commented on a change in pull request #9508: KAFKA-10648: Add Prefix Scan support to State Stores
cadonna commented on a change in pull request #9508: URL: https://github.com/apache/kafka/pull/9508#discussion_r532451217

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBPrefixIterator.java ##
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksIterator;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+class RocksDBPrefixIterator extends RocksDbIterator {
+    private final byte[] rawPrefix;
+
+    RocksDBPrefixIterator(final String name,
+                          final RocksIterator newIterator,
+                          final Set<KeyValueIterator<Bytes, byte[]>> openIterators,
+                          final Bytes prefix) {
+        super(name, newIterator, openIterators, true);
+        this.rawPrefix = prefix.get();
+        newIterator.seek(rawPrefix);
+    }
+
+    private boolean prefixEquals(final byte[] prefix1, final byte[] prefix2) {
+        final int min = Math.min(prefix1.length, prefix2.length);
+        final ByteBuffer prefix1Slice = ByteBuffer.wrap(prefix1, 0, min);
+        final ByteBuffer prefix2Slice = ByteBuffer.wrap(prefix2, 0, min);
+        return prefix1Slice.equals(prefix2Slice);

Review comment: I see what you mean. But why do you bother then to find the minimum length of the two prefixes? You could just use the length of the first prefix.
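The review question can be illustrated with a standalone prefix check that slices only by the prefix length. This is a sketch under assumptions, not the code that was eventually merged: `PrefixMatch` and `startsWith` are hypothetical names, and the explicit length guard is this sketch's own choice for handling a key shorter than the prefix.

```java
import java.nio.ByteBuffer;

class PrefixMatch {
    // Returns true when `key` begins with `prefix`.
    // A key shorter than the prefix can never match, so it is rejected up
    // front; that also keeps ByteBuffer.wrap(key, 0, prefix.length) in bounds.
    static boolean startsWith(final byte[] key, final byte[] prefix) {
        if (key.length < prefix.length) {
            return false;
        }
        final ByteBuffer keySlice = ByteBuffer.wrap(key, 0, prefix.length);
        final ByteBuffer prefixSlice = ByteBuffer.wrap(prefix);
        // ByteBuffer.equals compares the remaining bytes of both buffers.
        return keySlice.equals(prefixSlice);
    }
}
```

Taken in isolation, comparing only the first `min(len1, len2)` bytes would report a match when the key is itself a proper prefix of the search prefix (for example key `p` against prefix `p1`), which is why this sketch uses a length guard instead. Whether that case can arise after `seek(rawPrefix)` depends on the iterator's ordering and is not something this sketch settles.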
[GitHub] [kafka] dajac commented on a change in pull request #9630: KAFKA-10739; Replace EpochEndOffset with automated protocol
dajac commented on a change in pull request #9630: URL: https://github.com/apache/kafka/pull/9630#discussion_r53221

## File path: core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala ##
@@ -78,7 +80,19 @@ class ReplicaFetcherMockBlockingSend(offsets: java.util.Map[TopicPartition, Epoc
         callback.foreach(_.apply())
         epochFetchCount += 1
         lastUsedOffsetForLeaderEpochVersion = requestBuilder.latestAllowedVersion()
-        new OffsetsForLeaderEpochResponse(currentOffsets)
+
+        val data = new OffsetForLeaderEpochResponseData()
+        currentOffsets.forEach((tp, offsetForLeaderPartition) => {
+          var topic = data.topics.find(tp.topic)
+          if (topic == null) {
+            topic = new OffsetForLeaderTopicResult()
+              .setTopic(tp.topic)
+            data.topics.add(topic)
+          }
+          topic.partitions.add(offsetForLeaderPartition.setPartition(tp.partition))

Review comment: Good point. Actually, `setPartition(tp.partition)` is not needed. Let me remove it.
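The find-or-create loop in the diff above groups per-partition results under their topic. A rough Java sketch of the same grouping pattern follows; `TopicPartitionRef` and `TopicGrouping` are hypothetical stand-ins for illustration only and do not use the generated `OffsetForLeaderEpochResponseData` classes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a (topic, partition) pair.
class TopicPartitionRef {
    final String topic;
    final int partition;

    TopicPartitionRef(final String topic, final int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

class TopicGrouping {
    // Groups partition ids by topic, creating each topic entry the first
    // time it is seen and preserving first-seen topic order.
    static Map<String, List<Integer>> partitionsByTopic(final List<TopicPartitionRef> refs) {
        final Map<String, List<Integer>> byTopic = new LinkedHashMap<>();
        for (final TopicPartitionRef ref : refs) {
            byTopic.computeIfAbsent(ref.topic, t -> new ArrayList<>()).add(ref.partition);
        }
        return byTopic;
    }
}
```

`computeIfAbsent` folds the "look up, create if null, then add" steps of the Scala snippet into one call.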
[jira] [Updated] (KAFKA-10766) Add Unit Test cases for RocksDbRangeIterator
[ https://issues.apache.org/jira/browse/KAFKA-10766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna updated KAFKA-10766: -- Component/s: unit tests streams > Add Unit Test cases for RocksDbRangeIterator > > > Key: KAFKA-10766 > URL: https://issues.apache.org/jira/browse/KAFKA-10766 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Reporter: Sagar Rao >Assignee: Sagar Rao >Priority: Major > > During the code review for KIP-614, it was noticed that RocksDbRangeIterator > does not have any unit test cases. Here is the GitHub comment for reference: > [https://github.com/apache/kafka/pull/9508#discussion_r527612942] >
[jira] [Updated] (KAFKA-10766) Add Unit Test cases for RocksDbRangeIterator
[ https://issues.apache.org/jira/browse/KAFKA-10766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna updated KAFKA-10766: -- Labels: newbie (was: ) > Add Unit Test cases for RocksDbRangeIterator > > > Key: KAFKA-10766 > URL: https://issues.apache.org/jira/browse/KAFKA-10766 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Reporter: Sagar Rao >Assignee: Sagar Rao >Priority: Major > Labels: newbie > > During the code review for KIP-614, it was noticed that RocksDbRangeIterator > does not have any unit test cases. Here is the GitHub comment for reference: > [https://github.com/apache/kafka/pull/9508#discussion_r527612942] >
[GitHub] [kafka] cadonna commented on a change in pull request #9508: KAFKA-10648: Add Prefix Scan support to State Stores
cadonna commented on a change in pull request #9508: URL: https://github.com/apache/kafka/pull/9508#discussion_r532440942

## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java ##
@@ -359,6 +361,31 @@ public void shouldReverseIterateOverRange() {
         ), results);
     }
 
+    @Test
+    public void shouldGetRecordsWithPrefixKey() {
+        final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(bytesKey("k1"), bytesValue("1")));
+        entries.add(new KeyValue<>(bytesKey("k2"), bytesValue("2")));
+        entries.add(new KeyValue<>(bytesKey("p2"), bytesValue("2")));
+        entries.add(new KeyValue<>(bytesKey("p1"), bytesValue("2")));
+        entries.add(new KeyValue<>(bytesKey("p0"), bytesValue("2")));
+        store.putAll(entries);
+        final KeyValueIterator<Bytes, byte[]> keysWithPrefix = store.prefixScan("p", new StringSerializer());
+        final List<String> keys = new ArrayList<>();
+        final List<String> values = new ArrayList<>();
+        int numberOfKeysReturned = 0;
+
+        while (keysWithPrefix.hasNext()) {
+            final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
+            keys.add(next.key.toString());
+            values.add(new String(next.value));
+            numberOfKeysReturned++;
+        }
+        assertThat(numberOfKeysReturned, is(3));
+        assertThat(keys, is(Arrays.asList("p0", "p1", "p2")));
+        assertThat(values, is(Arrays.asList("2", "2", "2")));
+    }

Review comment: Creating a ticket and taking care that it gets resolved sounds good to me. As I wrote, my request is not a requirement for approval.
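The behaviour asserted by the test above (keys come back in sorted order and only those starting with the prefix are returned) can be sketched against a plain sorted map. This is an illustration only: `SortedPrefixScan` is a hypothetical name, and a `TreeMap` of strings stands in for the byte-ordered state store.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

class SortedPrefixScan {
    // Seeks to the first key >= prefix, then walks forward until a key
    // no longer starts with the prefix. Keys sharing a prefix are
    // contiguous in a lexicographically sorted map.
    static List<String> keysWithPrefix(final NavigableMap<String, String> store, final String prefix) {
        final List<String> keys = new ArrayList<>();
        for (final String key : store.tailMap(prefix, true).keySet()) {
            if (!key.startsWith(prefix)) {
                break;
            }
            keys.add(key);
        }
        return keys;
    }

    public static void main(final String[] args) {
        final NavigableMap<String, String> store = new TreeMap<>();
        store.put("k1", "1");
        store.put("k2", "2");
        store.put("p2", "2");
        store.put("p1", "2");
        store.put("p0", "2");
        System.out.println(keysWithPrefix(store, "p"));
    }
}
```

The seek-then-stop structure is the same idea as `newIterator.seek(rawPrefix)` followed by a per-key prefix check, just expressed with standard collections.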
[GitHub] [kafka] cadonna commented on a change in pull request #9508: KAFKA-10648: Add Prefix Scan support to State Stores
cadonna commented on a change in pull request #9508: URL: https://github.com/apache/kafka/pull/9508#discussion_r532438482

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ##
@@ -229,6 +230,15 @@ public V delete(final K key) {
         }
     }
 
+    @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(final P prefix, final PS prefixKeySerializer) {
+
+        return new MeteredKeyValueIterator(
+            wrapped().prefixScan(prefix, prefixKeySerializer),
+            rangeSensor

Review comment: I think that, to be consistent, we should add a new sensor for the prefix scan. We have a sensor for each operation on a key-value store. The only exception is `reverseRange()`, which is a variant of range and does not necessarily qualify for its own sensor.
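The "one sensor per operation" convention from the review comment can be sketched with plain counters. Everything here is hypothetical: `OperationCounters` and `MeteredStoreSketch` are illustrative names, the operation labels are made up, and simple `LongAdder` counters stand in for Kafka's `Sensor` metrics.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for a metrics registry; not Kafka's Sensor API.
class OperationCounters {
    private final Map<String, LongAdder> counters = new ConcurrentHashMap<>();

    void record(final String operation) {
        counters.computeIfAbsent(operation, op -> new LongAdder()).increment();
    }

    long count(final String operation) {
        final LongAdder adder = counters.get(operation);
        return adder == null ? 0L : adder.sum();
    }
}

// Hypothetical wrapper store: each operation records under its own name,
// so prefix scans are not folded into the range counter.
class MeteredStoreSketch {
    private final NavigableMap<String, String> inner = new TreeMap<>();
    final OperationCounters counters = new OperationCounters();

    void put(final String key, final String value) {
        counters.record("put");
        inner.put(key, value);
    }

    NavigableMap<String, String> range(final String from, final String to) {
        counters.record("range");
        return inner.subMap(from, true, to, true);
    }

    List<String> prefixScan(final String prefix) {
        counters.record("prefix-scan");
        final List<String> keys = new ArrayList<>();
        for (final String key : inner.tailMap(prefix, true).keySet()) {
            if (!key.startsWith(prefix)) {
                break;
            }
            keys.add(key);
        }
        return keys;
    }
}
```

With a dedicated counter, prefix-scan activity stays distinguishable from range queries, which is the consistency argument made in the review.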