[jira] [Commented] (KAFKA-13638) Slow KTable update when forwarding multiple values from transformer
[ https://issues.apache.org/jira/browse/KAFKA-13638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17486257#comment-17486257 ]

Ulrik commented on KAFKA-13638:
---

I tried the following combination of versions:
* streams 2.8.1 & rocksdb 6.0.1 : fast
* streams 2.8.1 & rocksdb 6.1.1 : fast
* streams 2.8.1 & rocksdb 6.2.2 : fast
* streams 2.8.1 & rocksdb 6.3.6 : fast
* streams 2.8.1 & rocksdb 6.4.6 : fast
* streams 2.8.1 & rocksdb 6.5.2 : fast
* streams 2.8.1 & rocksdb 6.6.4 : fast
* streams 2.8.1 & rocksdb 6.7.3 : fast
* streams 2.8.1 & rocksdb 6.11.6 : fast
* streams 2.8.1 & rocksdb 6.12.7 : fast
* streams 2.8.1 & rocksdb >= 6.13.3 : Runtime error:

{code:java}
org.apache.kafka.streams.errors.ProcessorStateException: Error opening store table at location /var/folders/_4/ks5l_9vj6zbfbpf5w6180t3j8kb4pd/T/kafka-streams/dummy-a36d66ed-ba41-4e6d-884f-3c2a652d245d/0_0/rocksdb/table
    at org.apache.kafka.streams.state.internals.RocksDBTimestampedStore.openRocksDB(RocksDBTimestampedStore.java:87)
    at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:186)
    at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:254)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:55)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:75)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$1(MeteredKeyValueStore.java:122)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:122)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:201)
    at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:103)
    at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:216)
    at org.apache.kafka.streams.TopologyTestDriver.setupTask(TopologyTestDriver.java:537)
    at org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:385)
    at org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:313)
    at org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:272)
    at kafkatest.KafkaTest.validateTopologyCanProcessData(KafkaTest.java:79)
    at kafkatest.KafkaTest.multipleForwardsFromTransformer(KafkaTest.java:69)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
    at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
    at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
    at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
    at
[jira] [Created] (KAFKA-13641) Kafka Streams Scala: Add `Option` to `ValueJoiner` parameters
Mohammad Yousuf Minhaj Zia created KAFKA-13641:
--

Summary: Kafka Streams Scala: Add `Option` to `ValueJoiner` parameters
Key: KAFKA-13641
URL: https://issues.apache.org/jira/browse/KAFKA-13641
Project: Kafka
Issue Type: Improvement
Components: streams
Reporter: Mohammad Yousuf Minhaj Zia

Since the right-hand parameter of `ValueJoiner` in `leftJoin` and `outerJoin` can be null, I am wondering whether we can wrap it in a Scala `Option`. However, there is also the concern that the left-hand value can be null in the case of tombstone messages, in which case the `Option` semantics could be misleading. I still feel this could be a useful feature for reducing the number of `NullPointerException`s.

-- This message was sent by Atlassian Jira (v8.20.1#820001)
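
The wrapping idea from this ticket can be sketched in plain Java, purely as an illustration. This is not the Kafka Streams API: `OptionalJoiners`, `wrapRight`, and `describe` are hypothetical names, and `BiFunction` stands in for `ValueJoiner`. The sketch adapts a joiner that expects an `Optional` right-hand value into one that accepts the nullable value a left join would pass.

```java
import java.util.Optional;
import java.util.function.BiFunction;

// Hypothetical helper; BiFunction stands in for Kafka Streams' ValueJoiner.
final class OptionalJoiners {
    private OptionalJoiners() { }

    // Adapts a joiner taking Optional<R> into one that accepts a nullable R.
    static <L, R, O> BiFunction<L, R, O> wrapRight(BiFunction<L, Optional<R>, O> joiner) {
        return (left, right) -> joiner.apply(left, Optional.ofNullable(right));
    }

    // Example joiner: the missing right side is handled explicitly, not via a null check.
    static String describe(String order, Optional<String> customer) {
        return order + " -> " + customer.orElse("<no match>");
    }

    // Joiner with the nullable-right shape that a left join would call.
    static final BiFunction<String, String, String> LEFT_JOINER =
        wrapRight(OptionalJoiners::describe);
}
```

As the ticket notes, this only covers the right-hand side; a null left-hand value from a tombstone would still reach the joiner unwrapped.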
[GitHub] [kafka] Indupa edited a comment on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2
Indupa edited a comment on pull request #7898:
URL: https://github.com/apache/kafka/pull/7898#issuecomment-1028142767

Hi @dongjinleekr, I was able to build the latest patch, and I need one input from you. Are all dependencies on log4j 1.x completely removed in this patch? I can still see a dependency on log4j_1.2.17 in build.gradle and dependency.gradle. Some files also still reference log4j.properties and tools-log4j.properties instead of log4j2.properties and tools-log4j2.properties. Are these still required, or can we remove those dependencies as well?

Here is what I tried on my end:

1. I updated build.gradle and dependency.gradle to remove the log4j dependency.
2. I also updated the files where your patch adds an echo statement about switching from log4j.properties to log4j2.properties, and removed the log4j.properties, connect-log4j.properties and tools-log4j.properties files.
3. After that, I compiled the code, took the extracted folder under "C:\kafka_2.8.1\core\build\distributions\kafka_2.13-2.8.1\kafka_2.13-2.8.1", packaged it as kafka.zip, and installed it in our component to run as a Kafka service.
4. But when I tried running Kafka, I got the following exception:

2022-02-02 05:57:17.158 [INF] [Kafka] Connecting to localhost:2181
2022-02-02 05:57:27.571 [INF] [Kafka] WATCHER::
2022-02-02 05:57:27.571 [INF] [Kafka] WatchedEvent state:SyncConnected type:None path:null
2022-02-02 05:57:27.574 [INF] [Kafka] []
2022-02-02 05:58:17.227 [ERR] [Kafka] ERROR StatusLogger Reconfiguration failed: No configuration found for '764c12b6' at 'null' in 'null'
2022-02-02 05:58:17.684 [INF] [Kafka] DEPRECATED: using log4j 1.x configuration. To use log4j 2.x configuration, run with: 'set KAFKA_LOG4J_OPTS=-Dlog4j.configurationFile=file:C:\kafka/config/tools-log4j2.properties'

To summarize my requirement: the Kafka package we currently use contains some patches that we added on top of the kafka_2.8.1 source code. One of our custom changes is that we use apache-log4j-extras 1.2.17 with a time-based triggering policy for rolling log files, as this is not available in log4j 1.2.17. Since this version has a vulnerability, we want to use the log4j2 API for this rolling policy logic, which is working in your patch. Can you please help me with this?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dengziming commented on a change in pull request #11726: KAFKA-13637: Use default.api.timeout.ms as default timeout value for KafkaConsumer.endOffsets
dengziming commented on a change in pull request #11726:
URL: https://github.com/apache/kafka/pull/11726#discussion_r798205426

## File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##
@@ -2948,6 +2948,64 @@ public void testAssignorNameConflict() {
             () -> new KafkaConsumer<>(configs, new StringDeserializer(), new StringDeserializer()));
     }
 
+    @Test
+    public void testOffsetsForTimesTimeout() {
+        final KafkaConsumer consumer = consumerForCheckingTimeoutException();
+        assertEquals(
+            "Failed to get offsets by times in 6ms",
+            assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> consumer.offsetsForTimes(singletonMap(tp0, 0L))).getMessage()
+        );
+    }
+
+    @Test
+    public void testBeginningOffsetsTimeout() {
+        final KafkaConsumer consumer = consumerForCheckingTimeoutException();
+        assertEquals(
+            "Failed to get offsets by times in 6ms",
+            assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> consumer.beginningOffsets(singletonList(tp0))).getMessage()
+        );
+    }
+
+    @Test
+    public void testEndOffsetsTimeout() {
+        final KafkaConsumer consumer = consumerForCheckingTimeoutException();
+        assertEquals(
+            "Failed to get offsets by times in 6ms",
+            assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> consumer.endOffsets(singletonList(tp0))).getMessage()
+        );
+    }
+
+    private KafkaConsumer consumerForCheckingTimeoutException() {
+        final Time time = new MockTime();
+        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
+        ConsumerMetadata metadata = createMetadata(subscription);
+        MockClient client = new MockClient(time, metadata);
+
+        initMetadata(client, singletonMap(topic, 1));
+        Node node = metadata.fetch().nodes().get(0);
+
+        ConsumerPartitionAssignor assignor = new RangeAssignor();
+
+        final KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
+
+        final ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
+        for (int i = 0; i < 10; i++) {
+            // Prepare a retriable error periodically for the client to retry connection
+            exec.schedule(
+                () -> client.prepareResponseFrom(
+                    listOffsetsResponse(
+                        Collections.emptyMap(),
+                        Collections.singletonMap(tp0, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+                    ),
+                    node), 50L, TimeUnit.MILLISECONDS);
+            // Sleep periodically to make loop retry timeout
+            exec.schedule(() -> time.sleep(defaultApiTimeoutMs / 10), 50L, TimeUnit.MILLISECONDS);
+
+        }

Review comment:
Yes, this is a good idea, and it makes the test more deterministic.
[GitHub] [kafka] dengziming commented on a change in pull request #11726: KAFKA-13637: Use default.api.timeout.ms as default timeout value for KafkaConsumer.endOffsets
dengziming commented on a change in pull request #11726:
URL: https://github.com/apache/kafka/pull/11726#discussion_r798204734

## File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##
@@ -2948,6 +2948,64 @@ public void testAssignorNameConflict() {
             () -> new KafkaConsumer<>(configs, new StringDeserializer(), new StringDeserializer()));
     }
 
+    @Test
+    public void testOffsetsForTimesTimeout() {
+        final KafkaConsumer consumer = consumerForCheckingTimeoutException();
+        assertEquals(
+            "Failed to get offsets by times in 6ms",
+            assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> consumer.offsetsForTimes(singletonMap(tp0, 0L))).getMessage()

Review comment:
We already import java.util.concurrent.TimeoutException, so we can't import org.apache.kafka.common.errors.TimeoutException here.
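
The import clash described in this review comment is a general Java rule: only one class with a given simple name can be imported per file, so the second one has to be written with its fully qualified name. A minimal sketch using two JDK classes that share a simple name (an analogy chosen for self-containment, not the Kafka classes; `SameSimpleName` is a hypothetical name):

```java
import java.util.Date;

// Illustration only: java.util.Date is imported, so java.sql.Date
// cannot also be imported and must be fully qualified at each use.
final class SameSimpleName {
    private SameSimpleName() { }

    // Resolves to the imported java.util.Date.
    static Date utilDate() {
        return new Date(0L);
    }

    // Fully qualified, in the same way the test above spells out
    // org.apache.kafka.common.errors.TimeoutException.
    static java.sql.Date sqlDate() {
        return new java.sql.Date(0L);
    }
}
```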
[GitHub] [kafka] stan-confluent commented on pull request #11730: Use ducktape version 0.7.17
stan-confluent commented on pull request #11730: URL: https://github.com/apache/kafka/pull/11730#issuecomment-102874 @cmccabe @ijuma @ewencp can you folks please take a look and/or suggest who else should be on the approvers list?
[GitHub] [kafka] stan-confluent opened a new pull request #11730: Use ducktape version 0.7.17
stan-confluent opened a new pull request #11730:
URL: https://github.com/apache/kafka/pull/11730

Ducktape 0.7.17 locked a couple of dependencies to their last py2 compatible versions - this ensures more or less stable python 2 builds for kafkatest (until some other package breaks it). Since 2.7 and above uses ducktape 0.8.x and python 3, this change does not apply there.

See these PRs for details on ducktape:
https://github.com/confluentinc/ducktape/pull/291
https://github.com/confluentinc/ducktape/pull/292

Tested:
- Ran `python setup.py develop` in a clean python 2.7.18 virtualenv to ensure all dependencies are installed without conflicts
- Ran a small system test with `ducker-ak` just to ensure it still works ok - had to hack the dockerfile since openjdk:8 image does not include py2 anymore it seems? Hacked something together to have some test run (it failed due to some java deps it seems, but ducktape part worked ok)
- Ran a system test job on jenkins - https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4776 - used a custom test that simply brings up zk and kafka

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] mjsax commented on pull request #11424: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
mjsax commented on pull request #11424: URL: https://github.com/apache/kafka/pull/11424#issuecomment-1028497889 I did not dig into the details myself. Anyway, might be better to discuss on the new PR?
[jira] [Commented] (KAFKA-13638) Slow KTable update when forwarding multiple values from transformer
[ https://issues.apache.org/jira/browse/KAFKA-13638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17486177#comment-17486177 ]

Matthias J. Sax commented on KAFKA-13638:
-

Seems this does not explain much yet, as 5.18.4 did not work... Can you try using a 6.x version in 2.8? If it slows down, it would indicate that the RocksDB change is the root cause (which is what I suspect; I cannot think of any other change right now that could explain it).

> Slow KTable update when forwarding multiple values from transformer
> ---
>
> Key: KAFKA-13638
> URL: https://issues.apache.org/jira/browse/KAFKA-13638
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.1.0, 3.0.0
> Reporter: Ulrik
> Priority: Major
> Attachments: KafkaTest.java
>
>
> I have a topology where I stream messages from an input topic, transform the
> message to multiple messages (via context.forward), and then store those
> messages in a KTable.
> Since upgrading from kafka-streams 2.8.1 to 3.1.0 I have noticed that my
> tests take significantly longer to run.
>
> I have attached a test class to demonstrate my scenario. When running this
> test with kafka-streams versions 2.8.1 and 3.1.0 I came up with the following
> numbers:
>
> *Version 2.8.1*
> * one input message and one output message: 541 ms
> * 8 input messages and 30 output messages per input message (240 output
> messages in total): 919 ms
>
> *Version 3.1.0*
> * one input message and one output message: 908 ms
> * 8 input messages and 30 output messages per input message (240 output
> messages in total): 6 sec 94 ms
>
> Even when the transformer just transforms and forwards one input message to
> one output message, the test takes approx. 400 ms longer to run.
> When transforming 8 input messages to 240 output messages it takes approx. 5
> seconds longer.
[GitHub] [kafka] cmccabe merged pull request #11689: Fixed documentation and handles null topicPartition for KAFKA-12841
cmccabe merged pull request #11689: URL: https://github.com/apache/kafka/pull/11689
[GitHub] [kafka] cmccabe commented on pull request #11689: Fixed documentation and handles null topicPartition for KAFKA-12841
cmccabe commented on pull request #11689: URL: https://github.com/apache/kafka/pull/11689#issuecomment-1028467009 test failures are not related.
[GitHub] [kafka] artemlivshits commented on a change in pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm
artemlivshits commented on a change in pull request #11721:
URL: https://github.com/apache/kafka/pull/11721#discussion_r798094178

## File path: clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
##
@@ -386,18 +386,39 @@ public static void writeDouble(double value, ByteBuffer buffer) {
         buffer.putDouble(value);
     }
 
+    final static int[] LEADING_ZEROS_TO_U_VARINT_SIZE = new int[] {
+        // 32 bits, and each 7-bits adds one byte to the output
+        5, 5, 5, 5, // 32
+        4, 4, 4, 4, 4, 4, 4, // 28
+        3, 3, 3, 3, 3, 3, 3, // 21
+        2, 2, 2, 2, 2, 2, 2, // 14
+        1, 1, 1, 1, 1, 1, 1, // 7
+        1 // 0
+    };
+
+    final static int[] LEADING_ZEROS_TO_U_VARLONG_SIZE = new int[] {
+        // 64 bits, and each 7-bits adds one byte to the output
+        10, // 64
+        9, 9, 9, 9, 9, 9, 9, // 63
+        8, 8, 8, 8, 8, 8, 8, // 56
+        7, 7, 7, 7, 7, 7, 7, // 49
+        6, 6, 6, 6, 6, 6, 6, // 42
+        5, 5, 5, 5, 5, 5, 5, // 35
+        4, 4, 4, 4, 4, 4, 4, // 28
+        3, 3, 3, 3, 3, 3, 3, // 21
+        2, 2, 2, 2, 2, 2, 2, // 14
+        1, 1, 1, 1, 1, 1, 1, // 7
+        1 // 0
+    };
+
     /**
      * Number of bytes needed to encode an integer in unsigned variable-length format.
      *
      * @param value The signed value
      */
     public static int sizeOfUnsignedVarint(int value) {
-        int bytes = 1;
-        while ((value & 0xffffff80) != 0L) {
-            bytes += 1;
-            value >>>= 7;
-        }
-        return bytes;
+        int leadingZeros = Integer.numberOfLeadingZeros(value);
+        return LEADING_ZEROS_TO_U_VARINT_SIZE[leadingZeros];

Review comment:
That does seem like a lot of instructions, some of them don't seem to be needed in this case where we deal with small unsigned integers (e.g. movsxd %r8d,%r8 is extending the sign to 64 bits, but we don't need it). Java doesn't seem to support unsigned, so not sure if there is a way to hint the compiler that we don't need instructions that only matter for signed arithmetic. Maybe using long would help at least to eliminate the need to do movsxd.
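
To make the change under review easier to follow, here is a small self-contained sketch that puts the removed loop next to the lookup-table version and checks that they agree at every 7-bit boundary. It is not the Kafka source: the class and method names are hypothetical, and the loop mask is assumed to be `0xffffff80` (everything above the low 7 bits).

```java
// Hypothetical sketch; mirrors the shape of the diff, not the exact Kafka code.
final class VarintSizeSketch {
    private VarintSizeSketch() { }

    // Index = Integer.numberOfLeadingZeros(value), which ranges from 0 to 32.
    private static final int[] LEADING_ZEROS_TO_SIZE = {
        5, 5, 5, 5,             // 29..32 significant bits
        4, 4, 4, 4, 4, 4, 4,    // 22..28 significant bits
        3, 3, 3, 3, 3, 3, 3,    // 15..21 significant bits
        2, 2, 2, 2, 2, 2, 2,    // 8..14 significant bits
        1, 1, 1, 1, 1, 1, 1,    // 1..7 significant bits
        1                       // value == 0 still takes one byte
    };

    // Loop version: one extra byte for every 7 bits beyond the first 7.
    static int sizeByLoop(int value) {
        int bytes = 1;
        while ((value & 0xffffff80) != 0) { // assumed mask, see lead-in
            bytes += 1;
            value >>>= 7;
        }
        return bytes;
    }

    // Table version: a single intrinsic call plus an array load, no loop.
    static int sizeByTable(int value) {
        return LEADING_ZEROS_TO_SIZE[Integer.numberOfLeadingZeros(value)];
    }

    // True if both versions agree on each side of every 7-bit boundary.
    static boolean agreeOnBoundaries() {
        int[] samples = {0, 1, 127, 128, 16383, 16384, 2097151, 2097152,
                         268435455, 268435456, Integer.MAX_VALUE, -1};
        for (int v : samples) {
            if (sizeByLoop(v) != sizeByTable(v)) {
                return false;
            }
        }
        return true;
    }
}
```

Negative inputs are treated as unsigned 32-bit values, which is why `-1` maps to 5 bytes in both versions.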
[GitHub] [kafka] jasonk000 commented on a change in pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm
jasonk000 commented on a change in pull request #11721:
URL: https://github.com/apache/kafka/pull/11721#discussion_r798077165

## File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java
##
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.util;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.common.utils.ByteUtils;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+@State(Scope.Thread)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ByteUtilsBenchmark {
+    private static final int INPUT_COUNT = 10_000;
+    private static final int MAX_INT = 2 * 1024 * 1024;
+
+    private final int[] sizeOfInputs = new int[INPUT_COUNT];
+
+    @Setup(Level.Trial)
+    public void setUp() {
+        for (int i = 0; i < INPUT_COUNT; ++i) {
+            sizeOfInputs[i] = ThreadLocalRandom.current().nextInt(MAX_INT);
+        }
+    }
+
+    @Benchmark
+    public long testSizeOfUnsignedVarint() {
+        long result = 0;
+        for (final int input : sizeOfInputs) {
+            result += ByteUtils.sizeOfUnsignedVarint(input);
+        }
+        return result;
+    }
+
+    @Benchmark
+    public long testSizeOfUnsignedVarintOne() {
+        return ByteUtils.sizeOfUnsignedVarint(sizeOfInputs[0]);
+    }
+
+    @Benchmark
+    public long testSizeOfUnsignedVarintMath() {
+        long result = 0;
+        for (final int input : sizeOfInputs) {
+            int leadingZeros = Integer.numberOfLeadingZeros(input);
+            result += (38 - leadingZeros) / 7 + leadingZeros / 32;
+        }

Review comment:
fixed in [d6aeeb1](https://github.com/apache/kafka/commit/d6aeeb1f034f8b75e3f608a940aa158e09497dcf)
[GitHub] [kafka] ijuma merged pull request #11656: KAFKA-13579: upgrade netty/jetty/jackson to avoid vulnerability
ijuma merged pull request #11656: URL: https://github.com/apache/kafka/pull/11656
[jira] [Created] (KAFKA-13640) Implement final broker heartbeat in kraft
Colin McCabe created KAFKA-13640:

Summary: Implement final broker heartbeat in kraft
Key: KAFKA-13640
URL: https://issues.apache.org/jira/browse/KAFKA-13640
Project: Kafka
Issue Type: Improvement
Reporter: Colin McCabe

We should implement sending a final heartbeat from the broker when we're about to shut down. This would speed up the process of fencing the broker.

As a note, this isn't a major concern when controlled shutdown is in use, since controlled shutdown should move leaders off the broker anyway. But not everyone uses controlled shutdown in all cases and this would be a nice improvement in the cases where it's not used.

Actually, even in the controlled shutdown case this provides some limited benefit as a final "ack" that the broker is going down. It will remove the broker from the metadata provided to clients slightly quicker.

We do need to avoid blocking too long on final heartbeat, though (we should not wait more than a second or two at most).
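
The last point, not blocking shutdown on the final heartbeat, can be sketched as a bounded wait on a future. This is only an illustration of the constraint, not KRaft code: `FinalHeartbeatSketch`, `awaitFinalHeartbeat`, and the `ack` future are hypothetical names, and how the heartbeat is actually sent is left out.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of waiting for a final-heartbeat acknowledgement
// for a bounded time before continuing with shutdown.
final class FinalHeartbeatSketch {
    private FinalHeartbeatSketch() { }

    // Returns true only if the acknowledgement arrived within maxWaitMs.
    // In every other case the caller is expected to proceed with shutdown.
    static boolean awaitFinalHeartbeat(CompletableFuture<Void> ack, long maxWaitMs) {
        try {
            ack.get(maxWaitMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException | ExecutionException e) {
            // Controller too slow or the request failed: do not block shutdown.
            return false;
        } catch (InterruptedException e) {
            // Preserve the interrupt status for the shutdown thread.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A caller would pass a limit in the range the ticket suggests (a second or two) and continue shutting down regardless of the result.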
[GitHub] [kafka] ijuma commented on a change in pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm
ijuma commented on a change in pull request #11721:
URL: https://github.com/apache/kafka/pull/11721#discussion_r798019651

## File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java
##
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.util;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.common.utils.ByteUtils;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+@State(Scope.Thread)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ByteUtilsBenchmark {
+    private static final int INPUT_COUNT = 10_000;
+    private static final int MAX_INT = 2 * 1024 * 1024;
+
+    private final int[] sizeOfInputs = new int[INPUT_COUNT];
+
+    @Setup(Level.Trial)
+    public void setUp() {
+        for (int i = 0; i < INPUT_COUNT; ++i) {
+            sizeOfInputs[i] = ThreadLocalRandom.current().nextInt(MAX_INT);
+        }
+    }
+
+    @Benchmark
+    public long testSizeOfUnsignedVarint() {
+        long result = 0;
+        for (final int input : sizeOfInputs) {
+            result += ByteUtils.sizeOfUnsignedVarint(input);
+        }
+        return result;
+    }
+
+    @Benchmark
+    public long testSizeOfUnsignedVarintOne() {
+        return ByteUtils.sizeOfUnsignedVarint(sizeOfInputs[0]);
+    }
+
+    @Benchmark
+    public long testSizeOfUnsignedVarintMath() {
+        long result = 0;
+        for (final int input : sizeOfInputs) {
+            int leadingZeros = Integer.numberOfLeadingZeros(input);
+            result += (38 - leadingZeros) / 7 + leadingZeros / 32;
+        }

Review comment:
Do we want to remove the benchmark methods that include loops and rely on the ones without loops?
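
The arithmetic variant benchmarked above, `(38 - leadingZeros) / 7 + leadingZeros / 32`, can be checked against the straightforward loop. The sketch below is a standalone check with hypothetical names (`VarintSizeMath`, `mismatches`); the loop mask `0xffffff80` is an assumption about the reference implementation. The `leadingZeros / 32` term is 1 only when the value is 0, where the first term would otherwise give 0 bytes.

```java
// Hypothetical standalone check of the arithmetic size formula.
final class VarintSizeMath {
    private VarintSizeMath() { }

    // Arithmetic version taken from the benchmark body.
    static int sizeByMath(int value) {
        int leadingZeros = Integer.numberOfLeadingZeros(value);
        return (38 - leadingZeros) / 7 + leadingZeros / 32;
    }

    // Reference loop; the mask is assumed to cover all bits above the low 7.
    static int sizeByLoop(int value) {
        int bytes = 1;
        while ((value & 0xffffff80) != 0) {
            bytes += 1;
            value >>>= 7;
        }
        return bytes;
    }

    // Compares both versions for 0 and for the smallest and largest value
    // of every bit width from 1 to 32; returns the number of disagreements.
    static int mismatches() {
        int count = sizeByMath(0) == sizeByLoop(0) ? 0 : 1;
        for (int bits = 1; bits <= 32; bits++) {
            int low = 1 << (bits - 1);     // smallest value with this bit width
            int high = low | (low - 1);    // largest value with this bit width
            if (sizeByMath(low) != sizeByLoop(low)) {
                count++;
            }
            if (sizeByMath(high) != sizeByLoop(high)) {
                count++;
            }
        }
        return count;
    }
}
```

Because the result depends only on the number of leading zeros, checking one value per bit width is enough to cover every possible input.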
[jira] [Commented] (KAFKA-13638) Slow KTable update when forwarding multiple values from transformer
[ https://issues.apache.org/jira/browse/KAFKA-13638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17486102#comment-17486102 ]

Ulrik commented on KAFKA-13638:
---

[~mjsax] I can confirm that using an in-memory store worked and the test ran very quickly.

I also tried the following rocksdb-versions:
* kafka-streams 3.1.0 with rocksdb 6.27.3
* kafka-streams 3.1.0 with rocksdb 6.22.1.1
* kafka-streams 3.1.0 with rocksdb 6.19.3
* kafka-streams 3.1.0 with rocksdb 5.18.4

All of the above rocksdb versions resulted in the same slow running test, except for rocksdb 5.18.4 which resulted in the following stacktrace:

{code:java}
java.lang.NoClassDefFoundError: org/rocksdb/MutableDBOptionsInterface
    at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:128)
    at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:250)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:56)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:75)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$1(MeteredKeyValueStore.java:125)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:809)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:125)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:205)
    at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:97)
    at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:231)
    at org.apache.kafka.streams.TopologyTestDriver.setupTask(TopologyTestDriver.java:521)
    at org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:370)
    at org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:297)
    at org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:273)
    at kafkatest.KafkaTest.validateTopologyCanProcessData(KafkaTest.java:79)
    at kafkatest.KafkaTest.multipleForwardsFromTransformer(KafkaTest.java:69)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
    at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
    at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
    at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210)
    at
[GitHub] [kafka] jasonk000 commented on a change in pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm
jasonk000 commented on a change in pull request #11721: URL: https://github.com/apache/kafka/pull/11721#discussion_r798002944 ## File path: clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java ## @@ -386,18 +386,39 @@ public static void writeDouble(double value, ByteBuffer buffer) { buffer.putDouble(value); } +final static int[] LEADING_ZEROS_TO_U_VARINT_SIZE = new int[] { +// 32 bits, and each 7-bits adds one byte to the output +5, 5, 5, 5, // 32 +4, 4, 4, 4, 4, 4, 4, // 28 +3, 3, 3, 3, 3, 3, 3, // 21 +2, 2, 2, 2, 2, 2, 2, // 14 +1, 1, 1, 1, 1, 1, 1, // 7 +1 // 0 +}; + +final static int[] LEADING_ZEROS_TO_U_VARLONG_SIZE = new int[] { +// 64 bits, and each 7-bits adds one byte to the output +10, // 64 +9, 9, 9, 9, 9, 9, 9, // 63 +8, 8, 8, 8, 8, 8, 8, // 56 +7, 7, 7, 7, 7, 7, 7, // 49 +6, 6, 6, 6, 6, 6, 6, // 42 +5, 5, 5, 5, 5, 5, 5, // 35 +4, 4, 4, 4, 4, 4, 4, // 28 +3, 3, 3, 3, 3, 3, 3, // 21 +2, 2, 2, 2, 2, 2, 2, // 14 +1, 1, 1, 1, 1, 1, 1, // 7 +1 // 0 +}; + /** * Number of bytes needed to encode an integer in unsigned variable-length format. * * @param value The signed value */ public static int sizeOfUnsignedVarint(int value) { -int bytes = 1; -while ((value & 0xff80) != 0L) { -bytes += 1; -value >>>= 7; -} -return bytes; +int leadingZeros = Integer.numberOfLeadingZeros(value); +return LEADING_ZEROS_TO_U_VARINT_SIZE[leadingZeros]; Review comment: Yes, I agree - the arithmetic / bitshift looks intuitively quicker since it requires no loads. Looking at the compiler output from JDK11, it does get compiled down to a sequence of operations that are reasonably clean (no DIV), just that there are a lot of instructions. Perhaps there is a slight advantage in different use-cases, but, in my case this fix takes the function completely out of the profiler results so it solves the problem. Out of interest sake, here is the sequence of operations according to perfasm on JDK11. 
``` ││ 0x7f26e04797b7: mov 0x10(%r12,%r10,8),%r10d 0.05% ││ 0x7f26e04797bc: lzcnt %r10d,%r11d ;*invokestatic numberOfLeadingZeros {reexecute=0 rethrow=0 return_oop=0} ││; - org.apache.kafka.jmh.util.ByteUtilsBenchmark::testSizeOfUnsignedVarintMathOne@6 (line 76) ││; - org.apache.kafka.jmh.util.jmh_generated.ByteUtilsBenchmark_testSizeOfUnsignedVarintMathOne_jmhTest::testSizeOfUnsignedVarintMathOne_thrpt_jmhStub@17 (line 119) 0.03% ││ 0x7f26e04797c1: mov $0x26,%r8d 0.01% ││ 0x7f26e04797c7: sub %r11d,%r8d;*isub {reexecute=0 rethrow=0 return_oop=0} ││; - org.apache.kafka.jmh.util.ByteUtilsBenchmark::testSizeOfUnsignedVarintMathOne@13 (line 77) ││; - org.apache.kafka.jmh.util.jmh_generated.ByteUtilsBenchmark_testSizeOfUnsignedVarintMathOne_jmhTest::testSizeOfUnsignedVarintMathOne_thrpt_jmhStub@17 (line 119) 0.49% ││ 0x7f26e04797ca: mov %r11d,%r9d 0.36% ││ 0x7f26e04797cd: sar $0x1f,%r9d 0.02% ││ 0x7f26e04797d1: movsxd %r8d,%r10 0.77% ││ 0x7f26e04797d4: shr $0x1b,%r9d 7.24% ││ 0x7f26e04797d8: add %r11d,%r9d 0.78% ││ 0x7f26e04797db: imulq $0x92492493,%r10,%r10 0.05% ││ 0x7f26e04797e2: sar $0x5,%r9d 2.24% ││ 0x7f26e04797e6: sar $0x20,%r10 9.93% ││ 0x7f26e04797ea: movsxd %r9d,%r11 0.12% ││ 0x7f26e04797ed: mov %r10d,%r10d ││ 0x7f26e04797f0: add %r8d,%r10d 6.30% ││ 0x7f26e04797f3: sar $0x1f,%r8d 2.53% ││ 0x7f26e04797f7: sar $0x2,%r10d 6.21% ││ 0x7f26e04797fb: movsxd %r8d,%r8 ││ 0x7f26e04797fe: movsxd %r10d,%rdx 7.25% ││ 0x7f26e0479801: sub %r8,%rdx 7.01% ││ 0x7f26e0479804: add %r11,%rdx ;*i2l {reexecute=0 rethrow=0 return_oop=0} ││; - org.apache.kafka.jmh.util.ByteUtilsBenchmark::testSizeOfUnsignedVarintMathOne@22 (line 77) ││; - org.apache.kafka.jmh.util.jmh_generated.ByteUtilsBenchmark_testSizeOfUnsignedVarintMathOne_jmhTest::testSizeOfUnsignedVarintMathOne_thrpt_jmhStub@17 (line 119) 8.11% ││ 0x7f26e0479807: mov (%rsp),%rsi ``` Lookup table: ``` │ 0x7fde904792a7: mov 0x10(%r12,%r10,8),%r10d 1.42% │ 0x7fde904792ac: lzcnt %r10d,%r10d
[GitHub] [kafka] artemlivshits commented on a change in pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm
artemlivshits commented on a change in pull request #11721: URL: https://github.com/apache/kafka/pull/11721#discussion_r797932888 ## File path: clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java ## @@ -386,18 +386,39 @@ public static void writeDouble(double value, ByteBuffer buffer) { buffer.putDouble(value); } +final static int[] LEADING_ZEROS_TO_U_VARINT_SIZE = new int[] { +// 32 bits, and each 7-bits adds one byte to the output +5, 5, 5, 5, // 32 +4, 4, 4, 4, 4, 4, 4, // 28 +3, 3, 3, 3, 3, 3, 3, // 21 +2, 2, 2, 2, 2, 2, 2, // 14 +1, 1, 1, 1, 1, 1, 1, // 7 +1 // 0 +}; + +final static int[] LEADING_ZEROS_TO_U_VARLONG_SIZE = new int[] { +// 64 bits, and each 7-bits adds one byte to the output +10, // 64 +9, 9, 9, 9, 9, 9, 9, // 63 +8, 8, 8, 8, 8, 8, 8, // 56 +7, 7, 7, 7, 7, 7, 7, // 49 +6, 6, 6, 6, 6, 6, 6, // 42 +5, 5, 5, 5, 5, 5, 5, // 35 +4, 4, 4, 4, 4, 4, 4, // 28 +3, 3, 3, 3, 3, 3, 3, // 21 +2, 2, 2, 2, 2, 2, 2, // 14 +1, 1, 1, 1, 1, 1, 1, // 7 +1 // 0 +}; + /** * Number of bytes needed to encode an integer in unsigned variable-length format. * * @param value The signed value */ public static int sizeOfUnsignedVarint(int value) { -int bytes = 1; -while ((value & 0xff80) != 0L) { -bytes += 1; -value >>>= 7; -} -return bytes; +int leadingZeros = Integer.numberOfLeadingZeros(value); +return LEADING_ZEROS_TO_U_VARINT_SIZE[leadingZeros]; Review comment: Interesting. It's unfortunate that the second operation is a div in Java (I'm new to Java, so maybe there is a way to express it differently, but I couldn't think of one), in C++ I would've just done: ``` return (38 - leadingZeros) / 7 + (leadingZeros == 32); ``` which would just do cmp and add carryover bit, which is cheaper than div. Maybe the compiler would be smart enough to translate ((leadingZeros == 32) ? 1 : 0) into math expression rather than do a branch? 
In these benchmarks, the lookup table is likely cached in L1 cache (hot loop that hits the same small amount of data), so memory access in the benchmark is likely cheaper than on average. It's probably hard to do a proper model and I'm not sure if it's worth it. In any case, thanks for doing this comprehensive research, good stuff! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
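The three approaches discussed in this review thread can be compared side by side. The following is a minimal sketch, not the code that was merged: the class name `VarintSizeSketch` and the method names are hypothetical, the loop mask is written as `0xffffff80` on the assumption that the `0xff80` shown in the quoted diff lost digits in the archive, and `leadingZeros >>> 5` is used here as one branch-free way to express `(leadingZeros == 32)` in Java.

```java
final class VarintSizeSketch {
    // Same 33-entry table as in the quoted diff, indexed by
    // Integer.numberOfLeadingZeros(value), which ranges from 0 to 32.
    private static final int[] LEADING_ZEROS_TO_U_VARINT_SIZE = {
        5, 5, 5, 5,             // 29..32 significant bits
        4, 4, 4, 4, 4, 4, 4,    // 22..28
        3, 3, 3, 3, 3, 3, 3,    // 15..21
        2, 2, 2, 2, 2, 2, 2,    // 8..14
        1, 1, 1, 1, 1, 1, 1,    // 1..7
        1                       // value == 0
    };

    // Baseline loop: one extra byte for every 7 bits beyond the first 7.
    // The full 32-bit mask is an assumption (see the lead-in).
    static int sizeByLoop(int value) {
        int bytes = 1;
        while ((value & 0xffffff80) != 0) {
            bytes += 1;
            value >>>= 7;
        }
        return bytes;
    }

    // Lookup-table variant from the diff.
    static int sizeByLookup(int value) {
        return LEADING_ZEROS_TO_U_VARINT_SIZE[Integer.numberOfLeadingZeros(value)];
    }

    // Arithmetic variant from the review comment. leadingZeros >>> 5 is 1
    // only when leadingZeros == 32 (value == 0), otherwise 0.
    static int sizeByMath(int value) {
        int leadingZeros = Integer.numberOfLeadingZeros(value);
        return (38 - leadingZeros) / 7 + (leadingZeros >>> 5);
    }

    public static void main(String[] args) {
        // Boundary values around each 7-bit step, plus zero and all-ones.
        int[] samples = {0, 1, 127, 128, 16383, 16384, 2097151, 2097152,
                         268435455, 268435456, Integer.MAX_VALUE, -1};
        for (int v : samples) {
            int expected = sizeByLoop(v);
            if (sizeByLookup(v) != expected || sizeByMath(v) != expected) {
                throw new AssertionError("mismatch for " + v);
            }
            System.out.println(v + " -> " + expected);
        }
    }
}
```

Checking the boundary values of each 7-bit step is usually enough to catch an off-by-one in either the table or the `38 - leadingZeros` constant; which variant is faster still depends on the JIT output and cache behaviour discussed above.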
[jira] [Resolved] (KAFKA-13221) Add metric for `PartitionsWithLateTransactionsCount`
[ https://issues.apache.org/jira/browse/KAFKA-13221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-13221. - Fix Version/s: 3.2.0 Resolution: Fixed > Add metric for `PartitionsWithLateTransactionsCount` > > > Key: KAFKA-13221 > URL: https://issues.apache.org/jira/browse/KAFKA-13221 > Project: Kafka > Issue Type: Sub-task >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > Fix For: 3.2.0 > > > The metric `PartitionsWithLateTransactionsCount` was introduced in KIP-664: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-664%3A+Provide+tooling+to+detect+and+abort+hanging+transactions#KIP664:Providetoolingtodetectandaborthangingtransactions-Metrics. > This metric will record the number of partitions which have open > transactions with durations exceeding `transaction.max.timeout.ms`. -- This message was sent by Atlassian Jira (v8.20.1#820001)
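As an illustration of what the metric description above says, the count could be computed roughly as follows. This is only a sketch under stated assumptions: `LateTransactionSketch`, `countLatePartitions`, and the map of earliest open transaction start times per partition are hypothetical names and data structures, not the broker's actual implementation, and the threshold is applied exactly as described in this ticket.

```java
import java.util.Map;

final class LateTransactionSketch {
    /**
     * Counts partitions whose oldest open transaction has been open for
     * longer than maxTimeoutMs (the value of transaction.max.timeout.ms).
     *
     * earliestOpenTxnStartMs is a hypothetical view: partition name mapped to
     * the start timestamp of its oldest open transaction. Partitions without
     * an open transaction are simply absent from the map.
     */
    static long countLatePartitions(Map<String, Long> earliestOpenTxnStartMs,
                                    long nowMs,
                                    long maxTimeoutMs) {
        return earliestOpenTxnStartMs.values().stream()
            .filter(startMs -> nowMs - startMs > maxTimeoutMs)
            .count();
    }

    public static void main(String[] args) {
        // Example threshold of 900000 ms (15 minutes).
        long maxTimeoutMs = 900_000L;
        long nowMs = 1_000_000L;
        Map<String, Long> openTxns = Map.of(
            "orders-0", 0L,         // open for 1000000 ms, counted as late
            "orders-1", 900_000L);  // open for 100000 ms, not late
        System.out.println(countLatePartitions(openTxns, nowMs, maxTimeoutMs));
    }
}
```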
[GitHub] [kafka] hachikuji merged pull request #11725: KAFKA-13221; Implement `PartitionsWithLateTransactionsCount` metric
hachikuji merged pull request #11725: URL: https://github.com/apache/kafka/pull/11725
[GitHub] [kafka] kirktrue commented on pull request #11627: KAFKA-13565: add consumer exponential backoff for KIP-580
kirktrue commented on pull request #11627: URL: https://github.com/apache/kafka/pull/11627#issuecomment-1028296460 This looks pretty straightforward to me, though I'm still learning the consumer client piece.
[GitHub] [kafka] cmccabe opened a new pull request #11729: MINOR: fix control plane listener + kraft error message
cmccabe opened a new pull request #11729: URL: https://github.com/apache/kafka/pull/11729 The current error message suggests that controller.listener.names is a replacement for control.plane.listener.name. This is incorrect since these configurations have very different functions. This PR deletes the incorrect message.
[jira] [Comment Edited] (KAFKA-13639) NotEnoughReplicasException for __consumer_offsets topic due to out of order offset
[ https://issues.apache.org/jira/browse/KAFKA-13639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17486005#comment-17486005 ] Sergey Ivanov edited comment on KAFKA-13639 at 2/2/22, 7:04 PM: We found a mailing-list thread that describes a similar issue: [https://lists.apache.org/thread/2l1snn1jrzd5p2d0n55vs3dg5jr7f35v] and the same in [https://stackoverflow.com/questions/64514851/apache-kafka-kafka-common-offsetsoutoforderexception-when-reassigning-consume], where a partition of the __consumer_offsets topic was corrupted by shutting down the Kafka cluster (with working clients). They said the issue was resolved once Kafka compacted the topic's segments (our retention is 1 GB or 7 days). We couldn't wait that long, so we sped up the process by setting the following property on the topic: {code:java} ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic __consumer_offsets --config retention.ms=1 {code} Then we restarted the Kafka cluster and the issue was gone; partition 36 has been replicated. (This looks like a workaround, but a difficult and strange one.) We have faced this issue at least one more time, and it doesn't go away without our manual retention changes. So it looks like this is not a random +bug.+ Can anyone help with the investigation? We can provide more logs if necessary. was (Author: mrmigles): We found a mailing-list thread that describes a similar issue: [https://lists.apache.org/thread/2l1snn1jrzd5p2d0n55vs3dg5jr7f35v] and the same in [https://stackoverflow.com/questions/64514851/apache-kafka-kafka-common-offsetsoutoforderexception-when-reassigning-consume], where a partition of the __consumer_offsets topic was corrupted by shutting down the Kafka cluster (with working clients). They said the issue was resolved once Kafka compacted the topic's segments (our retention is 1 GB or 7 days).
We couldn't wait that long, so we sped up the process by setting the following property on the topic: {code:java} ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic __consumer_offsets --config retention.ms=1 {code} Then we restarted the Kafka cluster and the issue was gone; partition 36 has been replicated. We have faced this issue at least one more time, and it doesn't go away without our manual retention changes. So it looks like this is not a random +bug.+ Can anyone help with the investigation? We can provide more logs if necessary. > NotEnoughReplicasException for __consumer_offsets topic due to out of order > offset > -- > > Key: KAFKA-13639 > URL: https://issues.apache.org/jira/browse/KAFKA-13639 > Project: Kafka > Issue Type: Bug > Components: core, log >Affects Versions: 2.6.2 >Reporter: Sergey Ivanov >Priority: Major > > Hello, > We faced a strange issue with Kafka while testing failover scenarios: this > involves forcibly shutting down the nodes where Kafka pods are placed (Kafka is deployed > to Kubernetes), and then bringing these nodes back. > After this, the Kafka pods start normally, but +some+ consumers cannot > connect to them and report errors: > > {code:java} > [2022-01-27T14:35:09.051][level=DEBUG][class=kafka_client:utils.go:120]: > Failed to sync group mae_processor: [15] Group Coordinator Not Available: the > broker returns this error code for group coordinator requests, offset > commits, and most group management requests if the offsets topic has not yet > been created, or if the group coordinator is not active{code} > > > It looked like there were issues with the ___consumer_offsets_ topic.
In logs of > brokers we found this error: > {code:java} > [2022-01-27T14:56:00,233][INFO][category=kafka.coordinator.group.GroupCoordinator] > [GroupCoordinator 1]: Group mae_processor with generation 329 is now empty > (__consumer_offsets-36) > [2022-01-27T14:56:00,233][ERROR][category=kafka.server.ReplicaManager] > [ReplicaManager broker=1] Error processing append operation on partition > __consumer_offsets-36 > org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the > current ISR Set(1) is insufficient to satisfy the min.isr requirement of 2 > for partition __consumer_offsets-36 > [2022-01-27T14:56:00,233][WARN][category=kafka.coordinator.group.GroupCoordinator] > [GroupCoordinator 1]: Failed to write empty metadata for group > mae_processor: The coordinator is not available. > {code} > If we check partitions of __consumer_offsets it really has one partition with > insufficient ISR: > {code:java} > topic "__consumer_offsets" with 50 partitions: > partition 0, leader 1, replicas: 1,3,2, isrs: 1,2,3 > ... > partition 35, leader 3, replicas: 3,1,2, isrs: 1,2,3 > partition 36, leader 1, replicas: 1,3,2, isrs: 1 > partition 37, leader 2, replicas: 2,1,3, isrs: 1,2,3 > > partition 49, leader 2, replicas: 2,1,3,
[GitHub] [kafka] guozhangwang commented on pull request #11424: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
guozhangwang commented on pull request #11424: URL: https://github.com/apache/kafka/pull/11424#issuecomment-1028264508 @ableegoldman @mjsax My read on the code is that we only need to change the TopologyTestDriver, while the first place seems fine to me. Did I miss anything?
[GitHub] [kafka] azhur closed pull request #11728: Cherrypick KAFKA-4090: Validate SSL connection in client to 3.1.x
azhur closed pull request #11728: URL: https://github.com/apache/kafka/pull/11728
[jira] [Commented] (KAFKA-12494) Broker raise InternalError after disk sector medium error without marking dir to offline
[ https://issues.apache.org/jira/browse/KAFKA-12494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17486011#comment-17486011 ] Sergey Ivanov commented on KAFKA-12494: --- Hi all, We faced a similar issue in our environment when testing failover scenarios: we force-shut down Kafka brokers while they were running, and after restarting and applying some load they started to raise a lot of errors with "{_}java.lang.InternalError: a fault occurred in a recent unsafe memory access operation in compiled Java code"{_} For example: {code:java} java.lang.InternalError: a fault occurred in a recent unsafe memory access operation in compiled Java code at kafka.server.FullFetchContext.$anonfun$updateAndGenerateResponseData$3(FetchSession.scala:373) at java.base/java.util.LinkedHashMap.forEach(Unknown Source) at kafka.server.FullFetchContext.createNewSession$1(FetchSession.scala:372) {code} {code:java} java.lang.InternalError: a fault occurred in a recent unsafe memory access operation in compiled Java code at java.base/java.util.LinkedHashMap.newNode(Unknown Source) at java.base/java.util.HashMap.putVal(Unknown Source) at java.base/java.util.HashMap.put(Unknown Source) {code} {code:java} java.lang.InternalError: a fault occurred in a recent unsafe memory access operation in compiled Java code at kafka.server.LogReadResult.error(ReplicaManager.scala:104) at kafka.server.ReplicaManager.$anonfun$updateFollowerFetchState$1(ReplicaManager.scala:1621) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:273) {code} And many other stack traces. It looks like java.lang.InternalError is not the root cause of the issue, but we couldn't find any other errors in the logs. After the Kafka broker started to raise this error it was no longer fully operational: it couldn't handle requests from clients or from other brokers. The issue went away after a restart (which looks like a workaround), but we can't imagine what the +root cause+ of this issue is. Please correct me if this is not the right ticket for this issue.
> Broker raise InternalError after disk sector medium error without marking dir > to offline > > > Key: KAFKA-12494 > URL: https://issues.apache.org/jira/browse/KAFKA-12494 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.4.0, 2.6.0, 2.5.1, 2.7.0 > Environment: Kafka Version: 1.1.0 > Jdk Version: jdk1.8 >Reporter: iBlackeyes >Priority: Major > Original Estimate: 168h > Remaining Estimate: 168h > > In my produce env, we encounter a case that kafka broker only raise errors > like > `_*2021-02-16 23:24:24,965 | ERROR | [data-plane-kafka-request-handler-19] | > [ReplicaManager broker=7] Error processing append operation on partition > xxx-0 | kafka.server.ReplicaManager (Logging.scala:76)*_ > _*java.lang.InternalError: a fault occurred in a recent unsafe memory access > operation in compiled Java code*_` > when broker append to a error disk sector and doesn't mark the dir on this > disk to offline. > This result in many partitions which assign replicas on this disk in > under-replicated state . > Here is the logs: > *os messages log:* > {code:java} > Feb 16 23:24:24 hd-node109 kernel: blk_update_request: critical medium error, > dev sds, sector 2308010408 > Feb 16 23:24:24 hd-node109 kernel: sd 14:1:0:18: [sds] FAILED Result: > hostbyte=DID_OK driverbyte=DRIVER_SENSE > Feb 16 23:24:24 hd-node109 kernel: sd 14:1:0:18: [sds] Sense Key : Medium > Error [current] > Feb 16 23:24:24 hd-node109 kernel: sd 14:1:0:18: [sds] Add. Sense: > Unrecovered read error > Feb 16 23:24:24 hd-node109 kernel: sd 14:1:0:18: [sds] CDB: Read(10) 28 00 89 > 91 71 a8 00 00 08 00 > Feb 16 23:24:24 hd-node109 kernel: blk_update_request: critical medium error, > dev sds, sector 2308010408 > Feb 16 23:24:24 hd-node109 kernel: sd 14:1:0:18: [sds] FAILED Result: > hostbyte=DID_OK driverbyte=DRIVER_SENSE > Feb 16 23:24:24 hd-node109 kernel: sd 14:1:0:18: [sds] Sense Key : Medium > Error [current] > Feb 16 23:24:24 hd-node109 kernel: sd 14:1:0:18: [sds] Add. 
Sense: > Unrecovered read error > Feb 16 23:24:24 hd-node109 kernel: sd 14:1:0:18: [sds] CDB: Read(10) 28 00 89 > 91 71 a8 00 00 08 00 > Feb 16 23:24:24 hd-node109 kernel: blk_update_request: critical medium error, > dev sds, sector 2308010408 > Feb 16 23:24:24 hd-node109 kernel: sd 14:1:0:18: [sds] FAILED Result: > hostbyte=DID_OK driverbyte=DRIVER_SENSE > Feb 16 23:24:24 hd-node109 kernel: sd 14:1:0:18: [sds] Sense Key : Medium > Error [current] > Feb 16 23:24:24 hd-node109 kernel: sd 14:1:0:18: [sds] Add. Sense: > Unrecovered read error > Feb 16 23:24:24 hd-node109 kernel: sd 14:1:0:18: [sds] CDB: Read(10) 28 00 89 > 91 71 a8 00 00 08 00 > Feb 16
[jira] [Comment Edited] (KAFKA-13639) NotEnoughReplicasException for __consumer_offsets topic due to out of order offset
[ https://issues.apache.org/jira/browse/KAFKA-13639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17486005#comment-17486005 ] Sergey Ivanov edited comment on KAFKA-13639 at 2/2/22, 6:31 PM: We found a mailing-list thread that describes a similar issue: [https://lists.apache.org/thread/2l1snn1jrzd5p2d0n55vs3dg5jr7f35v] and the same in [https://stackoverflow.com/questions/64514851/apache-kafka-kafka-common-offsetsoutoforderexception-when-reassigning-consume], where a partition of the __consumer_offsets topic was corrupted by shutting down the Kafka cluster (with working clients). They said the issue was resolved once Kafka compacted the topic's segments (our retention is 1 GB or 7 days). We couldn't wait that long, so we sped up the process by setting the following property on the topic: {code:java} ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic __consumer_offsets --config retention.ms=1 {code} Then we restarted the Kafka cluster and the issue was gone; partition 36 has been replicated. We have faced this issue at least one more time, and it doesn't go away without our manual retention changes. So it looks like this is not a random +bug.+ Can anyone help with the investigation? We can provide more logs if necessary. was (Author: mrmigles): We found a mailing-list thread that describes a similar issue: [https://lists.apache.org/thread/2l1snn1jrzd5p2d0n55vs3dg5jr7f35v] and the same in [https://stackoverflow.com/questions/64514851/apache-kafka-kafka-common-offsetsoutoforderexception-when-reassigning-consume], where a partition of the __consumer_offsets topic was corrupted by shutting down the Kafka cluster (with working clients). They said the issue was resolved once Kafka compacted the topic's segments (our retention is 1 GB or 7 days).
We couldn't wait that long, so we sped up the process by setting the following property on the topic: {code:java} ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic __consumer_offsets --config retention.ms=1 {code} Then we restarted the Kafka cluster and {*}the issue was gone{*}; partition 36 has been replicated. We have faced this issue at least one more time, and it doesn't go away without our manual retention changes. So it looks like this is not a random bug. Can anyone help with the investigation? We can provide more logs if necessary. > NotEnoughReplicasException for __consumer_offsets topic due to out of order > offset > -- > > Key: KAFKA-13639 > URL: https://issues.apache.org/jira/browse/KAFKA-13639 > Project: Kafka > Issue Type: Bug > Components: core, log >Affects Versions: 2.6.2 >Reporter: Sergey Ivanov >Priority: Major > > Hello, > We faced a strange issue with Kafka while testing failover scenarios: this > involves forcibly shutting down the nodes where Kafka pods are placed (Kafka is deployed > to Kubernetes), and then bringing these nodes back. > After this, the Kafka pods start normally, but +some+ consumers cannot > connect to them and report errors: > > {code:java} > [2022-01-27T14:35:09.051][level=DEBUG][class=kafka_client:utils.go:120]: > Failed to sync group mae_processor: [15] Group Coordinator Not Available: the > broker returns this error code for group coordinator requests, offset > commits, and most group management requests if the offsets topic has not yet > been created, or if the group coordinator is not active{code} > > > It looked like there were issues with the ___consumer_offsets_ topic.
In logs of > brokers we found this error: > {code:java} > [2022-01-27T14:56:00,233][INFO][category=kafka.coordinator.group.GroupCoordinator] > [GroupCoordinator 1]: Group mae_processor with generation 329 is now empty > (__consumer_offsets-36) > [2022-01-27T14:56:00,233][ERROR][category=kafka.server.ReplicaManager] > [ReplicaManager broker=1] Error processing append operation on partition > __consumer_offsets-36 > org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the > current ISR Set(1) is insufficient to satisfy the min.isr requirement of 2 > for partition __consumer_offsets-36 > [2022-01-27T14:56:00,233][WARN][category=kafka.coordinator.group.GroupCoordinator] > [GroupCoordinator 1]: Failed to write empty metadata for group > mae_processor: The coordinator is not available. > {code} > If we check partitions of __consumer_offsets it really has one partition with > insufficient ISR: > {code:java} > topic "__consumer_offsets" with 50 partitions: > partition 0, leader 1, replicas: 1,3,2, isrs: 1,2,3 > ... > partition 35, leader 3, replicas: 3,1,2, isrs: 1,2,3 > partition 36, leader 1, replicas: 1,3,2, isrs: 1 > partition 37, leader 2, replicas: 2,1,3, isrs: 1,2,3 > > partition 49, leader 2, replicas: 2,1,3, isrs: 1,2,3{code} > We wait some time
[GitHub] [kafka] jasonk000 edited a comment on pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm
jasonk000 edited a comment on pull request #11721:
URL: https://github.com/apache/kafka/pull/11721#issuecomment-1028229086

Good spot on jmh 1.34, TIL! Here's a result from a re-run

```
jkoch@jkoch:~/code/kafka$ java -version
openjdk version "17.0.1" 2021-10-19
OpenJDK Runtime Environment (build 17.0.1+12-Ubuntu-121.10)
OpenJDK 64-Bit Server VM (build 17.0.1+12-Ubuntu-121.10, mixed mode, sharing)
```

```
jkoch@jkoch:~/code/kafka$ cat gradle/dependencies.gradle | grep jmh
  jmh: "1.34",
  jmhCore: "org.openjdk.jmh:jmh-core:$versions.jmh",
  jmhCoreBenchmarks: "org.openjdk.jmh:jmh-core-benchmarks:$versions.jmh",
```

With `-perfnorm`, the interesting results seem to be:
- IPC, cache miss rates are similarly good across all
- The math version has 2 branches, the lookup table has 3 branches, and the baseline loop has 6 branches. All well predicted.
- It seems to come simply down to instruction counts: math has 31, lookup has 15, and baseline has 27.

```
Benchmark                                                              Mode  Cnt        Score       Error      Units
ByteUtilsBenchmark.testSizeOfUnsignedVarintMathOne                    thrpt    5   520507.594 ±  3323.803     ops/ms
ByteUtilsBenchmark.testSizeOfUnsignedVarintMathOne:IPC                thrpt             4.759              insns/clk
ByteUtilsBenchmark.testSizeOfUnsignedVarintMathOne:branches           thrpt             2.002                   #/op
ByteUtilsBenchmark.testSizeOfUnsignedVarintMathOne:cycles             thrpt             6.518                   #/op
ByteUtilsBenchmark.testSizeOfUnsignedVarintMathOne:instructions       thrpt            31.016                   #/op
ByteUtilsBenchmark.testSizeOfUnsignedVarintOne                        thrpt    5  1035024.683 ± 25061.922     ops/ms
ByteUtilsBenchmark.testSizeOfUnsignedVarintOne:IPC                    thrpt             4.576              insns/clk
ByteUtilsBenchmark.testSizeOfUnsignedVarintOne:branches               thrpt             3.000                   #/op
ByteUtilsBenchmark.testSizeOfUnsignedVarintOne:cycles                 thrpt             3.278                   #/op
ByteUtilsBenchmark.testSizeOfUnsignedVarintOne:instructions           thrpt            15.000                   #/op
ByteUtilsBenchmark.testSizeOfUnsignedVarintOriginalOne                thrpt    5   676262.808 ±  4065.238     ops/ms
ByteUtilsBenchmark.testSizeOfUnsignedVarintOriginalOne:IPC            thrpt             5.385              insns/clk
ByteUtilsBenchmark.testSizeOfUnsignedVarintOriginalOne:branches       thrpt             6.001                   #/op
ByteUtilsBenchmark.testSizeOfUnsignedVarintOriginalOne:cycles         thrpt             5.016                   #/op
ByteUtilsBenchmark.testSizeOfUnsignedVarintOriginalOne:instructions   thrpt            27.013                   #/op
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[GitHub] [kafka] jasonk000 commented on pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm
jasonk000 commented on pull request #11721:
URL: https://github.com/apache/kafka/pull/11721#issuecomment-1028229086

Good spot on jmh 1.34, TIL! Here's a result from a re-run

```
jkoch@jkoch:~/code/kafka$ java -version
openjdk version "17.0.1" 2021-10-19
OpenJDK Runtime Environment (build 17.0.1+12-Ubuntu-121.10)
OpenJDK 64-Bit Server VM (build 17.0.1+12-Ubuntu-121.10, mixed mode, sharing)
```

```
jkoch@jkoch:~/code/kafka$ cat gradle/dependencies.gradle | grep jmh
  jmh: "1.34",
  jmhCore: "org.openjdk.jmh:jmh-core:$versions.jmh",
  jmhCoreBenchmarks: "org.openjdk.jmh:jmh-core-benchmarks:$versions.jmh",
```

Benchmarks

```
Benchmark                                                Mode  Cnt        Score       Error   Units
ByteUtilsBenchmark.testSizeOfUnsignedVarintMathOne      thrpt    5   520507.594 ±  3323.803  ops/ms
ByteUtilsBenchmark.testSizeOfUnsignedVarintOne          thrpt    5  1035024.683 ± 25061.922  ops/ms
ByteUtilsBenchmark.testSizeOfUnsignedVarintOriginalOne  thrpt    5   676262.808 ±  4065.238  ops/ms
```

With `-perfnorm`, the interesting results seem to be:
- IPC, cache miss rates are similarly good across all
- The math version has 2 branches, the lookup table has 3 branches, and the baseline loop has 6 branches. All well predicted.
- It seems to come simply down to instruction counts: math has 31, lookup has 15, and baseline has 27.

```
Benchmark                                                              Mode  Cnt        Score       Error      Units
ByteUtilsBenchmark.testSizeOfUnsignedVarintMathOne                    thrpt    5   520507.594 ±  3323.803     ops/ms
ByteUtilsBenchmark.testSizeOfUnsignedVarintMathOne:IPC                thrpt             4.759              insns/clk
ByteUtilsBenchmark.testSizeOfUnsignedVarintMathOne:branches           thrpt             2.002                   #/op
ByteUtilsBenchmark.testSizeOfUnsignedVarintMathOne:cycles             thrpt             6.518                   #/op
ByteUtilsBenchmark.testSizeOfUnsignedVarintMathOne:instructions       thrpt            31.016                   #/op
ByteUtilsBenchmark.testSizeOfUnsignedVarintOne                        thrpt    5  1035024.683 ± 25061.922     ops/ms
ByteUtilsBenchmark.testSizeOfUnsignedVarintOne:IPC                    thrpt             4.576              insns/clk
ByteUtilsBenchmark.testSizeOfUnsignedVarintOne:branches               thrpt             3.000                   #/op
ByteUtilsBenchmark.testSizeOfUnsignedVarintOne:cycles                 thrpt             3.278                   #/op
ByteUtilsBenchmark.testSizeOfUnsignedVarintOne:instructions           thrpt            15.000                   #/op
ByteUtilsBenchmark.testSizeOfUnsignedVarintOriginalOne                thrpt    5   676262.808 ±  4065.238     ops/ms
ByteUtilsBenchmark.testSizeOfUnsignedVarintOriginalOne:IPC            thrpt             5.385              insns/clk
ByteUtilsBenchmark.testSizeOfUnsignedVarintOriginalOne:branches       thrpt             6.001                   #/op
ByteUtilsBenchmark.testSizeOfUnsignedVarintOriginalOne:cycles         thrpt             5.016                   #/op
ByteUtilsBenchmark.testSizeOfUnsignedVarintOriginalOne:instructions   thrpt            27.013                   #/op
```
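For readers following the benchmark names above, here is a rough sketch of what the three strategies could look like for computing the encoded size of an unsigned varint (7 payload bits per byte). This is not the code from the pull request: the class name, method names, and the exact arithmetic in `sizeMath` are assumptions made for illustration, and only `Integer.numberOfLeadingZeros` is a real JDK call.

```java
// Illustrative sketch only; names are hypothetical and not taken from ByteUtils.
final class VarintSizeSketch {

    // Baseline: loop, consuming 7 bits per iteration until no high bits remain.
    static int sizeLoop(int value) {
        int bytes = 1;
        while ((value & 0xffffff80) != 0) {
            bytes++;
            value >>>= 7;
        }
        return bytes;
    }

    // Lookup table indexed by the number of leading zero bits (0..32).
    private static final byte[] SIZE_BY_LEADING_ZEROS = new byte[33];

    static {
        for (int lz = 0; lz <= 32; lz++) {
            int significantBits = 32 - lz;
            // Zero still needs one byte; otherwise round up to whole 7-bit groups.
            SIZE_BY_LEADING_ZEROS[lz] = (byte) Math.max(1, (significantBits + 6) / 7);
        }
    }

    static int sizeLookup(int value) {
        return SIZE_BY_LEADING_ZEROS[Integer.numberOfLeadingZeros(value)];
    }

    // Arithmetic variant: derive the size from the bit length without a loop.
    // (value | 1) makes zero behave like a one-bit value.
    static int sizeMath(int value) {
        int significantBits = 32 - Integer.numberOfLeadingZeros(value | 1);
        return (significantBits + 6) / 7;
    }
}
```

All three variants should agree on every input; the difference the benchmark measures is only how many instructions each one needs to get there.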
[jira] [Commented] (KAFKA-13639) NotEnoughReplicasException for __consumer_offsets topic due to out of order offset
[ https://issues.apache.org/jira/browse/KAFKA-13639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17486005#comment-17486005 ] Sergey Ivanov commented on KAFKA-13639: --- We found mail thread which describe the similar issue: [https://lists.apache.org/thread/2l1snn1jrzd5p2d0n55vs3dg5jr7f35v] and the same in [https://stackoverflow.com/questions/64514851/apache-kafka-kafka-common-offsetsoutoforderexception-when-reassigning-consume] when __consumer_offsets topic's partition was corrupted due to shutdown Kafka cluster (with working clients). They said that issue was resolved when Kafka compacts topics segments (with our retention is 1Gb or 7 day). We couldn't wait that time so we speeded up this process and set the following properties for the topic: {code:java} ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic __consumer_offsets --config retention.ms=1 {code} Then we restarted Kafka cluster, and {*}the issue gone{*}, partition 36 has been replicated. We faced this issue at lease one more time and it doesn't go without our manually retention changes. So looks like this is not random bug. Can anyone help with investigation? We can provide more logs if necessary. > NotEnoughReplicasException for __consumer_offsets topic due to out of order > offset > -- > > Key: KAFKA-13639 > URL: https://issues.apache.org/jira/browse/KAFKA-13639 > Project: Kafka > Issue Type: Bug > Components: core, log >Affects Versions: 2.6.2 >Reporter: Sergey Ivanov >Priority: Major > > Hello, > We faced a strange issue with Kafka during testing failover scenarios: this > assumes forces shutdown nodes where Kafka pods are placed (Kafka is deployed > to Kubernetes), and then return these nodes. 
> After this Kafka pods are started normally but +some+ consumers could not > connect to it with errors: > > {code:java} > [2022-01-27T14:35:09.051][level=DEBUG][class=kafka_client:utils.go:120]: > Failed to sync group mae_processor: [15] Group Coordinator Not Available: the > broker returns this error code for group coordinator requests, offset > commits, and most group management requests if the offsets topic has not yet > been created, or if the group coordinator is not active{code} > > > It looked like there were issues with ___consumer_offsets_ topic. In logs of > brokers we found this error: > {code:java} > [2022-01-27T14:56:00,233][INFO][category=kafka.coordinator.group.GroupCoordinator] > [GroupCoordinator 1]: Group mae_processor with generation 329 is now empty > (__consumer_offsets-36) > [2022-01-27T14:56:00,233][ERROR][category=kafka.server.ReplicaManager] > [ReplicaManager broker=1] Error processing append operation on partition > __consumer_offsets-36 > org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the > current ISR Set(1) is insufficient to satisfy the min.isr requirement of 2 > for partition __consumer_offsets-36 > [2022-01-27T14:56:00,233][WARN][category=kafka.coordinator.group.GroupCoordinator] > [GroupCoordinator 1]: Failed to write empty metadata for group > mae_processor: The coordinator is not available. > {code} > If we check partitions of __consumer_offsets it really has one partition with > insufficient ISR: > {code:java} > topic "__consumer_offsets" with 50 partitions: > partition 0, leader 1, replicas: 1,3,2, isrs: 1,2,3 > ... > partition 35, leader 3, replicas: 3,1,2, isrs: 1,2,3 > partition 36, leader 1, replicas: 1,3,2, isrs: 1 > partition 37, leader 2, replicas: 2,1,3, isrs: 1,2,3 > > partition 49, leader 2, replicas: 2,1,3, isrs: 1,2,3{code} > We wait some time but the issue didn't go, we still had one partition with > insufficient ISR. 
> First of all we [thought > |https://stackoverflow.com/questions/51491152/fixing-under-replicated-partitions-in-kafka/53540963#53540963]this > is issue with Kafka-ZooKeeper coordinations, so we restarted ZooKeeper > cluster and brokers 2 and 3, which didn't have ISR. +But it didn't help.+ > We also tried to manually ellect leader for this partition with > kafka-leader-election.sh (in hope it will help). +But it didn't help too.+ > In logs we also found an issue: > {code:java} > [2022-01-27T16:17:29,531][ERROR][category=kafka.server.ReplicaFetcherThread] > [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Unexpected error > occurred while processing data for partition __consumer_offsets-36 at offset > 19536 > kafka.common.OffsetsOutOfOrderException: Out of order offsets found in append > to __consumer_offsets-36: List(19536, 19536, 19537, 19538, 19539, 19540, > 19541, 19542, 19543, 19544, 19545, 19546, 19547, 19548, 19549, 19550, 19551, > 19552, 19553, 19554, 19555, 19556, 19557, 19558, 19559, 19560, 19561) > at
[jira] [Commented] (KAFKA-13633) Merging multiple KStreams in one operation
[ https://issues.apache.org/jira/browse/KAFKA-13633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17486002#comment-17486002 ]

Matthias J. Sax commented on KAFKA-13633:
-----------------------------------------

Thanks! Will try to review the KIP soon.

> Merging multiple KStreams in one operation
> ------------------------------------------
>
> Key: KAFKA-13633
> URL: https://issues.apache.org/jira/browse/KAFKA-13633
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 3.1.0
> Reporter: Nicholas Telford
> Assignee: Nicholas Telford
> Priority: Major
> Labels: kip
>
> The {{KStream}} API provides {{merge(KStream)}} to merge another {{KStream}}
> with {{this}}. Sometimes, it may be useful to merge more than 2
> {{KStream}}s together. Currently, the best way to do this is using Java's
> {{Stream.reduce}}:
> {noformat}
> List<KStream<K, V>> streams ...;
> streams.stream().reduce((left, right) -> left.merge(right));{noformat}
> This creates a {{merge}} node in the process graph for every {{KStream}} in
> the collection being merged.
> Complex process graphs can make understanding an application and debugging
> more difficult, therefore, we propose a new API that creates a single
> {{merge}} node in the process graph, irrespective of the number of
> {{KStream}}s being merged:
> {noformat}
> KStream<K, V> merge(Collection<KStream<K, V>> streams);
> KStream<K, V> merge(Collection<KStream<K, V>> streams, Named named);{noformat}

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
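To make the graph-size argument in the quoted issue concrete, the following toy model may help. It does not use Kafka Streams at all: `Node`, `mergeAll`, and `countMergeNodes` are hypothetical stand-ins invented for this sketch, and it assumes that each pairwise `merge` call adds exactly one node to the graph.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Toy model of a processing graph; NOT the Kafka Streams KStream type.
final class MergeGraphSketch {

    static final class Node {
        final String kind;
        final List<Node> parents;

        Node(String kind, List<Node> parents) {
            this.kind = kind;
            this.parents = parents;
        }

        static Node source() {
            return new Node("source", List.of());
        }

        // Pairwise merge: every call creates one new merge node.
        Node merge(Node other) {
            return new Node("merge", List.of(this, other));
        }

        // Collection merge (the shape of the proposed API): one merge node for any number of inputs.
        static Node mergeAll(Collection<Node> nodes) {
            return new Node("merge", new ArrayList<>(nodes));
        }

        // Counts merge nodes reachable from this node (the model has no shared parents).
        int countMergeNodes() {
            int count = kind.equals("merge") ? 1 : 0;
            for (Node parent : parents) {
                count += parent.countMergeNodes();
            }
            return count;
        }
    }

    static List<Node> sources(int n) {
        List<Node> result = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            result.add(Node.source());
        }
        return result;
    }
}
```

In this model, reducing four sources with `sources(4).stream().reduce(Node::merge)` yields a chain of three merge nodes, while `Node.mergeAll(sources(4))` yields a single one, which is the simplification the issue asks for.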
[jira] [Updated] (KAFKA-13633) Merging multiple KStreams in one operation
[ https://issues.apache.org/jira/browse/KAFKA-13633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-13633:
Labels: kip (was: )

> Merging multiple KStreams in one operation
> ------------------------------------------
>
> Key: KAFKA-13633
> URL: https://issues.apache.org/jira/browse/KAFKA-13633
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 3.1.0
> Reporter: Nicholas Telford
> Assignee: Nicholas Telford
> Priority: Major
> Labels: kip
>
> The {{KStream}} API provides {{merge(KStream)}} to merge another {{KStream}}
> with {{this}}. Sometimes, it may be useful to merge more than 2
> {{KStream}}s together. Currently, the best way to do this is using Java's
> {{Stream.reduce}}:
> {noformat}
> List<KStream<K, V>> streams ...;
> streams.stream().reduce((left, right) -> left.merge(right));{noformat}
> This creates a {{merge}} node in the process graph for every {{KStream}} in
> the collection being merged.
> Complex process graphs can make understanding an application and debugging
> more difficult, therefore, we propose a new API that creates a single
> {{merge}} node in the process graph, irrespective of the number of
> {{KStream}}s being merged:
> {noformat}
> KStream<K, V> merge(Collection<KStream<K, V>> streams);
> KStream<K, V> merge(Collection<KStream<K, V>> streams, Named named);{noformat}
[GitHub] [kafka] philipnee commented on a change in pull request #11689: Fixed documentation and handles null topicPartition for KAFKA-12841
philipnee commented on a change in pull request #11689:
URL: https://github.com/apache/kafka/pull/11689#discussion_r797884312

## File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java ##

@@ -1506,6 +1508,46 @@ public void testNullTopicName() {
                 "key".getBytes(StandardCharsets.UTF_8), "value".getBytes(StandardCharsets.UTF_8)));
     }
 
+    @Test
+    public void testCallbackHandlesError() throws Exception {
+        Map configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000");
+
+        Time time = new MockTime();
+        ProducerMetadata producerMetadata = newMetadata(0, Long.MAX_VALUE);
+        MockClient client = new MockClient(time, producerMetadata);
+
+        String invalidTopicName = "topic abc"; // Invalid topic name due to space
+
+        try (Producer producer = kafkaProducer(configs, new StringSerializer(), new StringSerializer(),
+                producerMetadata, client, null, time)) {
+            ProducerRecord record = new ProducerRecord<>(invalidTopicName, "HelloKafka");
+
+            // Here's the important piece of the test. Let's make sure that the RecordMetadata we get
+            // is non-null and adheres to the onCompletion contract.
+            Callback callBack = (recordMetadata, exception) -> {
+                assertNotNull(exception);
+                assertNotNull(recordMetadata);
+
+                assertNotNull(recordMetadata.topic(), "Topic name should be valid even on send failure");

Review comment:
Done - see line 1545 in the test.
[jira] [Created] (KAFKA-13639) NotEnoughReplicasException for __consumer_offsets topic due to out of order offset
Sergey Ivanov created KAFKA-13639: - Summary: NotEnoughReplicasException for __consumer_offsets topic due to out of order offset Key: KAFKA-13639 URL: https://issues.apache.org/jira/browse/KAFKA-13639 Project: Kafka Issue Type: Bug Components: core, log Affects Versions: 2.6.2 Reporter: Sergey Ivanov Hello, We faced a strange issue with Kafka during testing failover scenarios: this assumes forces shutdown nodes where Kafka pods are placed (Kafka is deployed to Kubernetes), and then return these nodes. After this Kafka pods are started normally but +some+ consumers could not connect to it with errors: {code:java} [2022-01-27T14:35:09.051][level=DEBUG][class=kafka_client:utils.go:120]: Failed to sync group mae_processor: [15] Group Coordinator Not Available: the broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active{code} It looked like there were issues with ___consumer_offsets_ topic. In logs of brokers we found this error: {code:java} [2022-01-27T14:56:00,233][INFO][category=kafka.coordinator.group.GroupCoordinator] [GroupCoordinator 1]: Group mae_processor with generation 329 is now empty (__consumer_offsets-36) [2022-01-27T14:56:00,233][ERROR][category=kafka.server.ReplicaManager] [ReplicaManager broker=1] Error processing append operation on partition __consumer_offsets-36 org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the current ISR Set(1) is insufficient to satisfy the min.isr requirement of 2 for partition __consumer_offsets-36 [2022-01-27T14:56:00,233][WARN][category=kafka.coordinator.group.GroupCoordinator] [GroupCoordinator 1]: Failed to write empty metadata for group mae_processor: The coordinator is not available. 
{code} If we check partitions of __consumer_offsets it really has one partition with insufficient ISR: {code:java} topic "__consumer_offsets" with 50 partitions: partition 0, leader 1, replicas: 1,3,2, isrs: 1,2,3 ... partition 35, leader 3, replicas: 3,1,2, isrs: 1,2,3 partition 36, leader 1, replicas: 1,3,2, isrs: 1 partition 37, leader 2, replicas: 2,1,3, isrs: 1,2,3 partition 49, leader 2, replicas: 2,1,3, isrs: 1,2,3{code} We wait some time but the issue didn't go, we still had one partition with insufficient ISR. First of all we [thought |https://stackoverflow.com/questions/51491152/fixing-under-replicated-partitions-in-kafka/53540963#53540963]this is issue with Kafka-ZooKeeper coordinations, so we restarted ZooKeeper cluster and brokers 2 and 3, which didn't have ISR. +But it didn't help.+ We also tried to manually ellect leader for this partition with kafka-leader-election.sh (in hope it will help). +But it didn't help too.+ In logs we also found an issue: {code:java} [2022-01-27T16:17:29,531][ERROR][category=kafka.server.ReplicaFetcherThread] [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Unexpected error occurred while processing data for partition __consumer_offsets-36 at offset 19536 kafka.common.OffsetsOutOfOrderException: Out of order offsets found in append to __consumer_offsets-36: List(19536, 19536, 19537, 19538, 19539, 19540, 19541, 19542, 19543, 19544, 19545, 19546, 19547, 19548, 19549, 19550, 19551, 19552, 19553, 19554, 19555, 19556, 19557, 19558, 19559, 19560, 19561) at kafka.log.Log.$anonfun$append$2(Log.scala:1126) at kafka.log.Log.append(Log.scala:2349) at kafka.log.Log.appendAsFollower(Log.scala:1036) at [2022-01-27T16:17:29,531][WARN][category=kafka.server.ReplicaFetcherThread] [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Partition __consumer_offsets-36 marked as failed {code} This looks like root cause, right? Can force shutdown Kafka process lead to this issue? 
This looks like a bug. Moreover, shouldn't Kafka handle the case of corrupted data (if that is the root cause of the issue above)?
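As a side note on the `OffsetsOutOfOrderException` log above: the reported list starts with 19536 twice, so the batch is not strictly increasing. The sketch below shows one way such a condition could be detected. It is not Kafka's implementation (that lives in `kafka.log.Log`, per the stack trace); the class and method names are hypothetical, and it assumes that "out of order" means an offset that is not strictly greater than its predecessor.

```java
import java.util.List;
import java.util.OptionalInt;

// Illustrative check only; not the broker's actual validation code.
final class OffsetOrderSketch {

    // Returns the index of the first offset that is not strictly greater
    // than the one before it, or empty if the list is strictly increasing.
    static OptionalInt firstOutOfOrderIndex(List<Long> offsets) {
        for (int i = 1; i < offsets.size(); i++) {
            if (offsets.get(i) <= offsets.get(i - 1)) {
                return OptionalInt.of(i);
            }
        }
        return OptionalInt.empty();
    }
}
```

Applied to the first few offsets from the log, `firstOutOfOrderIndex(List.of(19536L, 19536L, 19537L))` points at index 1, the repeated 19536.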
[jira] [Commented] (KAFKA-13638) Slow KTable update when forwarding multiple values from transformer
[ https://issues.apache.org/jira/browse/KAFKA-13638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17486001#comment-17486001 ] Matthias J. Sax commented on KAFKA-13638: - We did update the RocksDB version in 3.0.0 release: https://issues.apache.org/jira/browse/KAFKA-8897 Could you repeat the test with in-memory store or try to switch back to previous RocksDB version in 3.0/3.1 release and retest? Btw: it seems we are dumping RocksDB for 3.2.0 again: https://issues.apache.org/jira/browse/KAFKA-13599 – so you could also try `trunk` or 3.0/3.1 with the new RocksDB version to see if it changes the performance. > Slow KTable update when forwarding multiple values from transformer > --- > > Key: KAFKA-13638 > URL: https://issues.apache.org/jira/browse/KAFKA-13638 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.1.0, 3.0.0 >Reporter: Ulrik >Priority: Major > Attachments: KafkaTest.java > > > I have a topology where I stream messages from an input topic, transform the > message to multiple messages (via context.forward), and then store those > messages in a KTable. > Since upgrading from kafka-streams 2.8.1 to 3.1.0 I have noticed that my > tests take significantly longer time to run. > > I have attached a test class to demonstrate my scenario. When running this > test with kafka-streams versions 2.8.1 and 3.1.0 I came up with the following > numbers: > > *Version 2.8.1* > * one input message and one output message: 541 ms > * 8 input message and 30 output message per input message (240 output > messages in total): 919 ms > > *Version 3.1.0* > * one input message and one output message: 908 ms > * 8 input message and 30 output message per input message (240 output > messages in total): 6 sec 94 ms > > Even when the transformer just transforms and forwards one input message to > one output message, the test takes approx. 400 ms longer to run. 
> When transforming 8 input messages to 240 output messages it takes approx 5 > seconds longer.
[GitHub] [kafka] cmccabe commented on a change in pull request #11689: Fixed documentation and handles null topicPartition for KAFKA-12841
cmccabe commented on a change in pull request #11689:
URL: https://github.com/apache/kafka/pull/11689#discussion_r797873026

## File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java ##

@@ -1506,6 +1508,46 @@ public void testNullTopicName() {
                 "key".getBytes(StandardCharsets.UTF_8), "value".getBytes(StandardCharsets.UTF_8)));
     }
 
+    @Test
+    public void testCallbackHandlesError() throws Exception {
+        Map configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000");
+
+        Time time = new MockTime();
+        ProducerMetadata producerMetadata = newMetadata(0, Long.MAX_VALUE);
+        MockClient client = new MockClient(time, producerMetadata);
+
+        String invalidTopicName = "topic abc"; // Invalid topic name due to space
+
+        try (Producer producer = kafkaProducer(configs, new StringSerializer(), new StringSerializer(),
+                producerMetadata, client, null, time)) {
+            ProducerRecord record = new ProducerRecord<>(invalidTopicName, "HelloKafka");
+
+            // Here's the important piece of the test. Let's make sure that the RecordMetadata we get
+            // is non-null and adheres to the onCompletion contract.
+            Callback callBack = (recordMetadata, exception) -> {
+                assertNotNull(exception);
+                assertNotNull(recordMetadata);
+
+                assertNotNull(recordMetadata.topic(), "Topic name should be valid even on send failure");

Review comment:
can you please also check that the partition id gets set to -1
[GitHub] [kafka] vamossagar12 commented on a change in pull request #11424: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
vamossagar12 commented on a change in pull request #11424:
URL: https://github.com/apache/kafka/pull/11424#discussion_r797848030

## File path: streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java ##

@@ -329,7 +330,7 @@ private TopologyTestDriver(final InternalTopologyBuilder builder,
 
         final ThreadCache cache = new ThreadCache(
             logContext,
-            Math.max(0, streamsConfig.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)),
+            Math.max(0, streamsConfig.getLong(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG)),

Review comment:
Oh, here I thought that since it's a test case it shouldn't really matter. Isn't that the case?
[GitHub] [kafka] vamossagar12 commented on a change in pull request #11424: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
vamossagar12 commented on a change in pull request #11424:
URL: https://github.com/apache/kafka/pull/11424#discussion_r797845874

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java ##

@@ -116,14 +123,30 @@ public TopologyConfig(final String topologyName, final StreamsConfig globalAppCo
             maxBufferedSize = getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
             log.info("Topology {} is overriding {} to {}", topologyName, BUFFERED_RECORDS_PER_PARTITION_CONFIG, maxBufferedSize);
         } else {
-            maxBufferedSize = globalAppConfigs.getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
+            maxBufferedSize = globalAppConfigs.originals().containsKey(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG)
+                ? globalAppConfigs.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG) : -1;
         }
 
-        if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
-            cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
-            log.info("Topology {} is overriding {} to {}", topologyName, CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize);
+        if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides) ||
+            isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
+
+            if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides) && isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
+                cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
+                log.info("Topology {} is using both {} and deprecated config {}. overriding {} to {}",
+                    topologyName,
+                    STATESTORE_CACHE_MAX_BYTES_CONFIG,
+                    CACHE_MAX_BYTES_BUFFERING_CONFIG,
+                    STATESTORE_CACHE_MAX_BYTES_CONFIG,
+                    cacheSize);
+            } else if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
+                cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
+                log.info("Topology {} is using deprecated config {}. overriding {} to {}", topologyName, CACHE_MAX_BYTES_BUFFERING_CONFIG, CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize);
+            } else {
+                cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
+                log.info("Topology {} is overriding {} to {}", topologyName, STATESTORE_CACHE_MAX_BYTES_CONFIG, cacheSize);
+            }
         } else {
-            cacheSize = globalAppConfigs.getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
+            cacheSize = globalAppConfigs.getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);

Review comment:
@ableegoldman , I have added that check. The line above is in the else block when neither of the 2 configs are set. Here's the complete block of code =>

```
if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides) ||
    isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {

    if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides) && isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
        cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
        log.info("Topology {} is using both {} and deprecated config {}. overriding {} to {}",
            topologyName,
            STATESTORE_CACHE_MAX_BYTES_CONFIG,
            CACHE_MAX_BYTES_BUFFERING_CONFIG,
            STATESTORE_CACHE_MAX_BYTES_CONFIG,
            cacheSize);
    } else if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
        cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
        log.info("Topology {} is using deprecated config {}. overriding {} to {}", topologyName, CACHE_MAX_BYTES_BUFFERING_CONFIG, CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize);
    } else {
        cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
        log.info("Topology {} is overriding {} to {}", topologyName, STATESTORE_CACHE_MAX_BYTES_CONFIG, cacheSize);
    }
} else {
    cacheSize = globalAppConfigs.getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
}
```

Am I missing something here?
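The precedence in the block above (new config wins over the deprecated one, topology override wins over the global value) can be restated as a small standalone function, which may make the four cases easier to check. This is only a sketch under stated assumptions: it uses plain maps instead of `StreamsConfig`, the class and method names are hypothetical, and the two key strings are assumed values for the constants named in the diff.

```java
import java.util.Map;

// Standalone restatement of the precedence rules; not the TopologyConfig code.
final class CacheSizeResolutionSketch {

    // Assumed key strings, used only for this illustration.
    static final String NEW_KEY = "statestore.cache.max.bytes";
    static final String DEPRECATED_KEY = "cache.max.bytes.buffering";

    static long resolveCacheSize(Map<String, Long> topologyOverrides,
                                 Map<String, Long> globalConfigs,
                                 long defaultSize) {
        // Both set, or only the new key set: the new key wins.
        if (topologyOverrides.containsKey(NEW_KEY)) {
            return topologyOverrides.get(NEW_KEY);
        }
        // Only the deprecated key is overridden: honor it.
        if (topologyOverrides.containsKey(DEPRECATED_KEY)) {
            return topologyOverrides.get(DEPRECATED_KEY);
        }
        // Neither is overridden: fall back to the global value of the new key.
        return globalConfigs.getOrDefault(NEW_KEY, defaultSize);
    }
}
```

Collapsing the nested `if` into three ordered checks gives the same result for all four combinations, since the "both set" and "only new set" branches read the same key.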
[GitHub] [kafka] Indupa edited a comment on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2
Indupa edited a comment on pull request #7898:
URL: https://github.com/apache/kafka/pull/7898#issuecomment-1028142767

Hi @dongjinleekr, I was able to build the latest patch, and I need one input from you.

Are all dependencies on log4j 1.x completely removed in this patch? I can still see a dependency on log4j_1.2.17 in build.gradle and dependency.gradle. There are also dependencies on log4j.properties and tools-log4j.properties, instead of log4j2.properties and tools-log4j2.properties, in some of the files. Are these still required, or can we remove those dependencies as well?

What I tried on my end is as follows:

1. I updated build.gradle and dependency.gradle by removing the log4j dependency.
2. I also updated some of the files where you added an echo statement, changing log4j.properties to log4j2.properties in the places mentioned in the patch file.
3. After that, I compiled the code, extracted the folder under "C:\kafka_2.8.1\core\build\distributions\kafka_2.13-2.8.1\kafka_2.13-2.8.1", packaged it as kafka.zip, and used it in our component by installing it and running it as the Kafka service.
4. But when I tried running Kafka, I got the following exception:

2022-02-02 05:57:17.158 [INF] [Kafka] Connecting to localhost:2181
2022-02-02 05:57:27.571 [INF] [Kafka] WATCHER::
2022-02-02 05:57:27.571 [INF] [Kafka] WatchedEvent state:SyncConnected type:None path:null
2022-02-02 05:57:27.574 [INF] [Kafka] []
2022-02-02 05:58:17.227 [ERR] [Kafka] ERROR StatusLogger Reconfiguration failed: No configuration found for '764c12b6' at 'null' in 'null'
2022-02-02 05:58:17.684 [INF] [Kafka] DEPRECATED: using log4j 1.x configuration. To use log4j 2.x configuration, run with: 'set KAFKA_LOG4J_OPTS=-Dlog4j.configurationFile=file:C:\kafka/config/tools-log4j2.properties'

To summarize my requirement: the Kafka package we currently use contains some patches that we added on top of the kafka_2.8.1 source code. One of our custom changes is that we use apache-log4j-extras 1.2.17 with a time-based triggering policy for rolling log files, as that is not available in log4j 1.2.17. Since this version has a vulnerability, we want to use the log4j2 API for this rolling policy logic, which works in your patch.

Can you please help me with this?
[GitHub] [kafka] Indupa edited a comment on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2
Indupa edited a comment on pull request #7898:
URL: https://github.com/apache/kafka/pull/7898#issuecomment-1028142767

Hi @dongjinleekr, I was able to build the latest patch, and I need one input from you. Are all dependencies on log4j 1.x completely removed in this patch? I can still see a dependency on log4j_1.2.17 in build.gradle and dependency.gradle. Some of the files also still depend on log4j.properties and tools-log4j.properties instead of log4j2.properties and tools-log4j2.properties. Are these still required, or can we remove those dependencies as well?

Can you please help me with this?
[GitHub] [kafka] Indupa commented on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2
Indupa commented on pull request #7898:
URL: https://github.com/apache/kafka/pull/7898#issuecomment-1028142767

Hi @dongjinleekr, I need one input from you. Are all dependencies on log4j 1.x completely removed in this patch? I can still see a dependency on log4j_1.2.17 in build.gradle and dependency.gradle. Some of the files also still depend on log4j.properties and tools-log4j.properties instead of log4j2.properties and tools-log4j2.properties. Are these still required, or can we remove those dependencies as well?
[GitHub] [kafka] wcarlson5 commented on a change in pull request #11712: WIP: Put failed tasks to end of processing list
wcarlson5 commented on a change in pull request #11712:
URL: https://github.com/apache/kafka/pull/11712#discussion_r797772309

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
##
@@ -270,6 +278,23 @@ Task task(final TaskId taskId) {
         return readOnlyActiveTasks;
     }

+    List orderedActiveTasks() {
+        return Collections.unmodifiableList(orderedActiveTasks);
+    }
+
+    void moveActiveTasksToTailFor(final String topologyName) {
+        final List tasksToMove = new LinkedList<>();
+        final Iterator iterator = orderedActiveTasks.iterator();
+        while (iterator.hasNext()) {
+            final Task task = iterator.next();
+            if (task.id().topologyName().equals(topologyName)) {
+                iterator.remove();
+                tasksToMove.add(task);
+            }
+        }
+        orderedActiveTasks.addAll(tasksToMove);

Review comment:
It would be much simpler, but unfortunately it's not as simple as we first thought. The producer has only one transaction, so the records of the good tasks are mixed in with the records of the failed task, and there is no way to separate them. So we need to take the tasks that we know will fail and process all the other tasks without them. That way we continue making progress. We can take the failing tasks, back off, and retry later.
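To make the reordering concrete, here is a minimal standalone sketch of the idea in the diff above. It is not the pull request's code: `TaskOrderingSketch` and `SimpleTask` are hypothetical stand-ins for `Tasks` and `Task`, and a task is reduced to a topology name plus a partition number.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

class TaskOrderingSketch {
    // Hypothetical stand-in for a Streams task: just a topology name and a partition.
    static final class SimpleTask {
        final String topologyName;
        final int partition;

        SimpleTask(final String topologyName, final int partition) {
            this.topologyName = topologyName;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return topologyName + "_" + partition;
        }
    }

    private final List<SimpleTask> orderedActiveTasks = new LinkedList<>();

    void add(final String topologyName, final int partition) {
        orderedActiveTasks.add(new SimpleTask(topologyName, partition));
    }

    // Remove every task of the given topology and re-append it at the tail,
    // so that tasks of other topologies are processed first.
    void moveActiveTasksToTailFor(final String topologyName) {
        final List<SimpleTask> tasksToMove = new LinkedList<>();
        final Iterator<SimpleTask> iterator = orderedActiveTasks.iterator();
        while (iterator.hasNext()) {
            final SimpleTask task = iterator.next();
            if (task.topologyName.equals(topologyName)) {
                iterator.remove();
                tasksToMove.add(task);
            }
        }
        orderedActiveTasks.addAll(tasksToMove);
    }

    // Current processing order, rendered as strings for easy inspection.
    List<String> order() {
        final List<String> names = new ArrayList<>();
        for (final SimpleTask task : orderedActiveTasks) {
            names.add(task.toString());
        }
        return names;
    }

    public static void main(final String[] args) {
        final TaskOrderingSketch tasks = new TaskOrderingSketch();
        tasks.add("A", 0);
        tasks.add("B", 0);
        tasks.add("A", 1);
        tasks.add("C", 0);
        tasks.moveActiveTasksToTailFor("A");
        System.out.println(tasks.order()); // prints [B_0, C_0, A_0, A_1]
    }
}
```

The moved tasks keep their relative order at the tail, which is what lets the healthy tasks make progress before the failing topology is retried.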
[jira] [Comment Edited] (KAFKA-9366) Upgrade log4j to log4j2
[ https://issues.apache.org/jira/browse/KAFKA-9366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17485878#comment-17485878 ]

Fabrice Bauzac-Stehly edited comment on KAFKA-9366 at 2/2/22, 3:17 PM:
---

https://kafka.apache.org/cve-list claims to list all security vulnerabilities that are fixed in released versions of Apache Kafka. I understand this as "all other vulnerabilities are potentially exploitable unless explicitly told otherwise". Here is the list as of 2022-02-02 Wednesday:

- CVE-2017-12610 Authenticated Kafka clients may impersonate other users
- CVE-2018-1288 Authenticated Kafka clients may interfere with data replication
- CVE-2018-17196 Authenticated clients with Write permission may bypass transaction/idempotent ACL validation
- CVE-2019-12399 Apache Kafka Connect REST API may expose plaintext secrets in tasks endpoint
- CVE-2021-4104 Flaw in Apache Log4j logging library in versions 1.x
- CVE-2021-38153 Timing Attack Vulnerability for Apache Kafka Connect and Clients
- CVE-2021-44228 Flaw in Apache Log4j logging library in versions from 2.0.0 and before 2.15.0
- CVE-2021-45046 Flaw in Apache Log4j logging library in versions from 2.0-beta9 through 2.12.1 and from 2.13.0 through 2.15.0
- CVE-2022-23307 Deserialization of Untrusted Data Flaw in Apache Log4j logging library in versions 1.x

https://logging.apache.org/log4j/1.2/ lists several vulnerabilities that affect log4j 1.x. As of 2022-02-02 Wednesday:

- CVE-2019-17571 is a high severity issue targeting the SocketServer. Log4j includes a SocketServer that accepts serialized log events and deserializes them without verifying whether the objects are allowed or not. This can provide an attack vector that can be exploited. => NOT FIXED IN KAFKA?
- CVE-2020-9488 is a moderate severity issue with the SMTPAppender. Improper validation of certificate with host mismatch in Apache Log4j SMTP appender. This could allow an SMTPS connection to be intercepted by a man-in-the-middle attack which could leak any log messages sent through that appender. => NOT FIXED IN KAFKA?
- CVE-2021-4104 is a high severity deserialization vulnerability in JMSAppender. JMSAppender uses JNDI in an unprotected manner allowing any application using the JMSAppender to be vulnerable if it is configured to reference an untrusted site or if the site referenced can be accessed by the attacker. For example, the attacker can cause remote code execution by manipulating the data in the LDAP store. => mitigated: one can remove JMSAppender from the log4j-1.2.17.jar artifact.
- CVE-2022-23302 is a high severity deserialization vulnerability in JMSSink. JMSSink uses JNDI in an unprotected manner allowing any application using the JMSSink to be vulnerable if it is configured to reference an untrusted site or if the site referenced can be accessed by the attacker. For example, the attacker can cause remote code execution by manipulating the data in the LDAP store. => NOT FIXED IN KAFKA?
- CVE-2022-23305 is a high severity SQL injection flaw in JDBCAppender that allows the data being logged to modify the behavior of the component. By design, the JDBCAppender in Log4j 1.2.x accepts an SQL statement as a configuration parameter where the values to be inserted are converters from PatternLayout. The message converter, %m, is likely to always be included. This allows attackers to manipulate the SQL by entering crafted strings into input fields or headers of an application that are logged allowing unintended SQL queries to be executed. => NOT FIXED IN KAFKA?
- CVE-2022-23307 is a critical severity against the chainsaw component in Log4j 1.x. This is the same issue corrected in CVE-2020-9493 fixed in Chainsaw 2.1.0 but Chainsaw was included as part of Log4j 1.2.x. => mitigated: one can remove Chainsaw from the log4j-1.2.17.jar artifact.
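Since two of the mitigations above consist of removing classes from the log4j-1.2.17.jar artifact, a quick way to check a given jar is to look for the corresponding entries. The following is only a sketch under stated assumptions: the class name `Log4j1ClassScan` is made up for illustration, and the entry paths reflect my understanding of the log4j 1.2.x package layout, so verify them against your own jar. Finding none of these entries does not prove a deployment is safe.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.zip.ZipFile;

class Log4j1ClassScan {
    // Assumed entry paths of the classes named in the CVE list above.
    static final List<String> RISKY_ENTRIES = Arrays.asList(
        "org/apache/log4j/net/SocketServer.class",  // CVE-2019-17571
        "org/apache/log4j/net/SMTPAppender.class",  // CVE-2020-9488
        "org/apache/log4j/net/JMSAppender.class",   // CVE-2021-4104
        "org/apache/log4j/net/JMSSink.class",       // CVE-2022-23302
        "org/apache/log4j/jdbc/JDBCAppender.class"  // CVE-2022-23305
    );

    // Chainsaw spans a whole package, so it is matched by prefix (CVE-2022-23307).
    static final String CHAINSAW_PREFIX = "org/apache/log4j/chainsaw/";

    // Returns the risky entries (and the Chainsaw prefix, if any file lives under it)
    // that are still present in the given jar.
    static List<String> findRiskyEntries(final String jarPath) throws IOException {
        final List<String> found = new ArrayList<>();
        try (ZipFile jar = new ZipFile(jarPath)) {
            for (final String name : RISKY_ENTRIES) {
                if (jar.getEntry(name) != null) {
                    found.add(name);
                }
            }
            final boolean hasChainsaw = jar.stream()
                .anyMatch(e -> !e.isDirectory() && e.getName().startsWith(CHAINSAW_PREFIX));
            if (hasChainsaw) {
                found.add(CHAINSAW_PREFIX);
            }
        }
        return found;
    }

    public static void main(final String[] args) throws IOException {
        if (args.length != 1) {
            System.out.println("usage: java Log4j1ClassScan <path-to-log4j-jar>");
            return;
        }
        for (final String hit : findRiskyEntries(args[0])) {
            System.out.println("still present: " + hit);
        }
    }
}
```

Running it against a jar from which JMSAppender and Chainsaw were stripped should no longer report those two entries; the remaining hits correspond to the items marked "NOT FIXED IN KAFKA?" above.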
>From all that, it looks like there is a number of still-open vulnerabilities in kafka induced by the use of log4j? Can somebody confirm? was (Author: noonbs): https://kafka.apache.org/cve-list claims to list all security vulnerabilities that are fixed in released versions of Apache Kafka. I understand this as "all other vulnerabilities are potentially exploitable unless explicitly told otherwise". Here is the list as of 2022-02-02 Wednesday: - CVE-2017-12610 Authenticated Kafka clients may impersonate other users - CVE-2018-1288 Authenticated Kafka clients may interfere with data replication - CVE-2018-17196 Authenticated clients with Write permission may bypass transaction/idempotent ACL validation - CVE-2019-12399 Apache Kafka Connect REST API may expose plaintext secrets in tasks endpoint - CVE-2021-4104 Flaw in Apache Log4j logging library in versions 1.x - CVE-2021-38153 Timing Attack Vulnerability for Apache Kafka Connect and
[jira] [Commented] (KAFKA-13626) NullPointerException in Selector.pollSelectionKeys: channel is null
[ https://issues.apache.org/jira/browse/KAFKA-13626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17485879#comment-17485879 ]

Kvicii.Yu commented on KAFKA-13626:
---

[~haeuserd] Thanks for your report, but what exactly do you mean by networking issues? I don't think this should be counted as an issue.

> NullPointerException in Selector.pollSelectionKeys: channel is null
> ---
>
> Key: KAFKA-13626
> URL: https://issues.apache.org/jira/browse/KAFKA-13626
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 2.7.1
> Reporter: Daniel Häuser
> Priority: Minor
>
> This NullPointerException occurred while we were having networking issues.
> Unfortunately I cannot provide much more information than this stack trace
> because this is all I got from our operations team.
> {code:java}
> java.lang.IllegalStateException: This error handler cannot process 'java.lang.NullPointerException's; no record information is available
>     at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:200)
>     at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112)
>     at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1599)
>     at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1210)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at java.base/java.lang.Thread.run(Thread.java:831)
> Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.kafka.common.network.KafkaChannel.id()" because "channel" is null
>     at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:516)
>     at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:563)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:227)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:257)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:480)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1257)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1226)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
>     at jdk.internal.reflect.GeneratedMethodAccessor128.invoke(Unknown Source)
>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:567)
>     at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
>     at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:208)
>     at jdk.proxy2/jdk.proxy2.$Proxy137.poll(Unknown Source)
>     at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1410)
>     at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1249)
>     at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161)
>     ... 3 common frames omitted
> {code}

-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (KAFKA-9366) Upgrade log4j to log4j2
[ https://issues.apache.org/jira/browse/KAFKA-9366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17485878#comment-17485878 ]

Fabrice Bauzac-Stehly commented on KAFKA-9366:
--

https://kafka.apache.org/cve-list claims to list all security vulnerabilities that are fixed in released versions of Apache Kafka. I understand this as "all other vulnerabilities are potentially exploitable unless explicitly told otherwise". Here is the list as of 2022-02-02 Wednesday:

- CVE-2017-12610 Authenticated Kafka clients may impersonate other users
- CVE-2018-1288 Authenticated Kafka clients may interfere with data replication
- CVE-2018-17196 Authenticated clients with Write permission may bypass transaction/idempotent ACL validation
- CVE-2019-12399 Apache Kafka Connect REST API may expose plaintext secrets in tasks endpoint
- CVE-2021-4104 Flaw in Apache Log4j logging library in versions 1.x
- CVE-2021-38153 Timing Attack Vulnerability for Apache Kafka Connect and Clients
- CVE-2021-44228 Flaw in Apache Log4j logging library in versions from 2.0.0 and before 2.15.0
- CVE-2021-45046 Flaw in Apache Log4j logging library in versions from 2.0-beta9 through 2.12.1 and from 2.13.0 through 2.15.0
- CVE-2022-23307 Deserialization of Untrusted Data Flaw in Apache Log4j logging library in versions 1.x

https://logging.apache.org/log4j/1.2/ lists several vulnerabilities that affect log4j 1.x. As of 2022-02-02 Wednesday:

- CVE-2019-17571 is a high severity issue targeting the SocketServer. Log4j includes a SocketServer that accepts serialized log events and deserializes them without verifying whether the objects are allowed or not. This can provide an attack vector that can be exploited. => NOT FIXED IN KAFKA?
- CVE-2020-9488 is a moderate severity issue with the SMTPAppender. Improper validation of certificate with host mismatch in Apache Log4j SMTP appender. This could allow an SMTPS connection to be intercepted by a man-in-the-middle attack which could leak any log messages sent through that appender. => NOT FIXED IN KAFKA?
- CVE-2021-4104 is a high severity deserialization vulnerability in JMSAppender. JMSAppender uses JNDI in an unprotected manner allowing any application using the JMSAppender to be vulnerable if it is configured to reference an untrusted site or if the site referenced can be accessed by the attacker. For example, the attacker can cause remote code execution by manipulating the data in the LDAP store. => mitigated: one can remove JMSAppender from the log4j-1.2.17.jar artifact.
- CVE-2022-23302 is a high severity deserialization vulnerability in JMSSink. JMSSink uses JNDI in an unprotected manner allowing any application using the JMSSink to be vulnerable if it is configured to reference an untrusted site or if the site referenced can be accessed by the attacker. For example, the attacker can cause remote code execution by manipulating the data in the LDAP store. => NOT FIXED IN KAFKA?
- CVE-2022-23305 is a high severity SQL injection flaw in JDBCAppender that allows the data being logged to modify the behavior of the component. By design, the JDBCAppender in Log4j 1.2.x accepts an SQL statement as a configuration parameter where the values to be inserted are converters from PatternLayout. The message converter, %m, is likely to always be included. This allows attackers to manipulate the SQL by entering crafted strings into input fields or headers of an application that are logged allowing unintended SQL queries to be executed. => NOT FIXED IN KAFKA?
- CVE-2022-23307 is a critical severity against the chainsaw component in Log4j 1.x. This is the same issue corrected in CVE-2020-9493 fixed in Chainsaw 2.1.0 but Chainsaw was included as part of Log4j 1.2.x. => mitigated: one can remove Chainsaw from the log4j-1.2.17.jar artifact.

From all that, it looks like there are a number of still-open vulnerabilities in Kafka induced by the use of log4j. Can somebody confirm?

> Upgrade log4j to log4j2
> ---
>
> Key: KAFKA-9366
> URL: https://issues.apache.org/jira/browse/KAFKA-9366
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 2.2.0, 2.1.1, 2.3.0, 2.4.0
> Reporter: leibo
> Assignee: Dongjin Lee
> Priority: Critical
> Labels: needs-kip
> Fix For: 3.2.0
>
>
> h2. CVE-2019-17571 Detail
> Included in Log4j 1.2 is a SocketServer class that is vulnerable to
> deserialization of untrusted data which can be exploited to remotely execute
> arbitrary code when combined with a deserialization gadget when listening to
> untrusted network traffic for log data. This affects Log4j versions up to 1.2
> up to 1.2.17.
>
>
[jira] [Commented] (KAFKA-13534) Upgrade Log4j to 2.15.0 - CVE-2021-44228
[ https://issues.apache.org/jira/browse/KAFKA-13534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17485868#comment-17485868 ]

Fabrice Bauzac-Stehly commented on KAFKA-13534:
---

Shouldn't this ticket be closed as duplicate of KAFKA-9366?

> Upgrade Log4j to 2.15.0 - CVE-2021-44228
> ---
>
> Key: KAFKA-13534
> URL: https://issues.apache.org/jira/browse/KAFKA-13534
> Project: Kafka
> Issue Type: Task
> Affects Versions: 2.7.0, 2.8.0, 3.0.0
> Reporter: Sai Kiran Vudutala
> Priority: Major
>
> Log4j has an RCE vulnerability, see
> [https://www.lunasec.io/docs/blog/log4j-zero-day/]
> References.
> [https://github.com/advisories/GHSA-jfh8-c2jp-5v3q]
> [https://github.com/apache/logging-log4j2/pull/608#issuecomment-990494126]
>
[jira] [Updated] (KAFKA-13638) Slow KTable update when forwarding multiple values from transformer
[ https://issues.apache.org/jira/browse/KAFKA-13638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ulrik updated KAFKA-13638:
--
Description:

I have a topology where I stream messages from an input topic, transform the message to multiple messages (via context.forward), and then store those messages in a KTable.

Since upgrading from kafka-streams 2.8.1 to 3.1.0 I have noticed that my tests take significantly longer time to run.

I have attached a test class to demonstrate my scenario. When running this test with kafka-streams versions 2.8.1 and 3.1.0 I came up with the following numbers:

*Version 2.8.1*
* one input message and one output message: 541 ms
* 8 input message and 30 output message per input message (240 output messages in total): 919 ms

*Version 3.1.0*
* one input message and one output message: 908 ms
* 8 input message and 30 output message per input message (240 output messages in total): 6 sec 94 ms

Even when the transformer just transforms and forwards one input message to one output message, the test takes approx. 400 ms longer to run.

When transforming 8 input messages to 240 output messages it takes approx 5 seconds longer.

was:

I have a topology where I stream messages from an input topic, transform the message to multiple messages (via context.forward), and then store those messages in a KTable.

Since upgrading from kafka-streams 2.8.1 to 3.1.0 I have noticed that my tests takes significantly longer time to run.

I have attached a test class to demonstrate my scenario. When running this test with kafka-streams versions 2.8.1 and 3.1.0 I came up with the following numbers:

*Version 2.8.1*
* one input message and one output message: 541 ms
* 8 input message and 30 output message per input message (240 output messages in total): 919 ms

*Version 3.1.0*
* one input message and one output message: 908 ms
* 8 input message and 30 output message per input message (240 output messages in total): 6 sec 94 ms

Even when the transformer just transforms and forwards one input message to one output message, the test takes approx. 400 ms longer to run.

When transforming 8 input messages to 240 output messages it takes approx 5 seconds longer.

> Slow KTable update when forwarding multiple values from transformer
> ---
>
> Key: KAFKA-13638
> URL: https://issues.apache.org/jira/browse/KAFKA-13638
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.1.0, 3.0.0
> Reporter: Ulrik
> Priority: Major
> Attachments: KafkaTest.java
>
>
> I have a topology where I stream messages from an input topic, transform the
> message to multiple messages (via context.forward), and then store those
> messages in a KTable.
> Since upgrading from kafka-streams 2.8.1 to 3.1.0 I have noticed that my
> tests take significantly longer time to run.
>
> I have attached a test class to demonstrate my scenario. When running this
> test with kafka-streams versions 2.8.1 and 3.1.0 I came up with the following
> numbers:
>
> *Version 2.8.1*
> * one input message and one output message: 541 ms
> * 8 input message and 30 output message per input message (240 output
> messages in total): 919 ms
>
> *Version 3.1.0*
> * one input message and one output message: 908 ms
> * 8 input message and 30 output message per input message (240 output
> messages in total): 6 sec 94 ms
>
> Even when the transformer just transforms and forwards one input message to
> one output message, the test takes approx. 400 ms longer to run.
> When transforming 8 input messages to 240 output messages it takes approx 5
> seconds longer.
[GitHub] [kafka] ijuma commented on pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm
ijuma commented on pull request #11721: URL: https://github.com/apache/kafka/pull/11721#issuecomment-1027997296 Thanks for the detailed analysis! Note that JMH 1.34 (https://mail.openjdk.java.net/pipermail/jmh-dev/2021-December/003406.html) has support for much cheaper blackholes if executed with Java 17 or later. On that note, which Java version are you using in your experiments?
[GitHub] [kafka] jonathan-albrecht-ibm commented on pull request #11690: KAFKA-13599: Upgrade RocksDB to 6.27.3
jonathan-albrecht-ibm commented on pull request #11690: URL: https://github.com/apache/kafka/pull/11690#issuecomment-1027990865 Thanks @cadonna!
[GitHub] [kafka] ijuma merged pull request #11618: KAFKA-13558: NioEchoServer fails to close resources
ijuma merged pull request #11618: URL: https://github.com/apache/kafka/pull/11618
[GitHub] [kafka] ijuma merged pull request #11619: MINOR: allocate 2MB to offset map in connect EmbeddedKafkaCluster
ijuma merged pull request #11619: URL: https://github.com/apache/kafka/pull/11619
[GitHub] [kafka] jlprat commented on pull request #11518: MINOR: Upgrade to Gradle 7.3.3
jlprat commented on pull request #11518: URL: https://github.com/apache/kafka/pull/11518#issuecomment-1027974346 Thanks @ijuma
[GitHub] [kafka] ijuma merged pull request #11669: MINOR: Replace if/else with match in KafkaZkClient#getPartitionAssignmentForTopics
ijuma merged pull request #11669: URL: https://github.com/apache/kafka/pull/11669
[GitHub] [kafka] ijuma merged pull request #11717: KAFKA-13619: zookeeper.sync.time.ms is no longer used
ijuma merged pull request #11717: URL: https://github.com/apache/kafka/pull/11717
[GitHub] [kafka] ijuma merged pull request #11518: MINOR: Upgrade to Gradle 7.3.3
ijuma merged pull request #11518: URL: https://github.com/apache/kafka/pull/11518
[GitHub] [kafka] cadonna commented on a change in pull request #11712: WIP: Put failed tasks to end of processing list
cadonna commented on a change in pull request #11712:
URL: https://github.com/apache/kafka/pull/11712#discussion_r797611602

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
##
@@ -270,6 +278,23 @@ Task task(final TaskId taskId) {
         return readOnlyActiveTasks;
     }

+    List orderedActiveTasks() {
+        return Collections.unmodifiableList(orderedActiveTasks);
+    }
+
+    void moveActiveTasksToTailFor(final String topologyName) {
+        final List tasksToMove = new LinkedList<>();
+        final Iterator iterator = orderedActiveTasks.iterator();
+        while (iterator.hasNext()) {
+            final Task task = iterator.next();
+            if (task.id().topologyName().equals(topologyName)) {
+                iterator.remove();
+                tasksToMove.add(task);
+            }
+        }
+        orderedActiveTasks.addAll(tasksToMove);

Review comment:
Are you proposing to commit tasks each time an exception occurs, irrespective of whether it is time to commit or not? Wouldn't it be simpler to commit when an exception happens instead of when the processing after an exception restarts?
[jira] [Created] (KAFKA-13638) Slow KTable update when forwarding multiple values from transformer
Ulrik created KAFKA-13638:
-
Summary: Slow KTable update when forwarding multiple values from transformer
Key: KAFKA-13638
URL: https://issues.apache.org/jira/browse/KAFKA-13638
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 3.0.0, 3.1.0
Reporter: Ulrik
Attachments: KafkaTest.java

I have a topology where I stream messages from an input topic, transform the message to multiple messages (via context.forward), and then store those messages in a KTable.

Since upgrading from kafka-streams 2.8.1 to 3.1.0 I have noticed that my tests takes significantly longer time to run.

I have attached a test class to demonstrate my scenario. When running this test with kafka-streams versions 2.8.1 and 3.1.0 I came up with the following numbers:

*Version 2.8.1*
* one input message and one output message: 541 ms
* 8 input message and 30 output message per input message (240 output messages in total): 919 ms

*Version 3.1.0*
* one input message and one output message: 908 ms
* 8 input message and 30 output message per input message (240 output messages in total): 6 sec 94 ms

Even when the transformer just transforms and forwards one input message to one output message, the test takes approx. 400 ms longer to run.

When transforming 8 input messages to 240 output messages it takes approx 5 seconds longer.
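For reference, the "approx. 400 ms" and "approx 5 seconds" figures follow directly from the reported timings. The small sketch below just redoes that arithmetic and adds a per-output-message figure; the class and method names are made up for illustration, and since the timings are whole-test wall-clock numbers (including startup), the per-message value is only a rough estimate.

```java
class TimingComparison {
    // Extra wall-clock time of the newer version over the older one, in ms.
    static long extraMs(final long beforeMs, final long afterMs) {
        return afterMs - beforeMs;
    }

    // Extra time spread evenly over the forwarded output messages, in ms.
    static double extraMsPerMessage(final long beforeMs, final long afterMs, final int outputMessages) {
        return (double) extraMs(beforeMs, afterMs) / outputMessages;
    }

    public static void main(final String[] args) {
        // One input message, one output message: 541 ms (2.8.1) vs 908 ms (3.1.0).
        System.out.println(extraMs(541, 908));                 // prints 367
        // 8 input messages, 240 output messages: 919 ms vs 6 sec 94 ms (6094 ms).
        System.out.println(extraMs(919, 6094));                // prints 5175
        System.out.println(extraMsPerMessage(919, 6094, 240)); // prints 21.5625
    }
}
```

So the larger case costs roughly 21 ms more per forwarded message, which suggests the slowdown scales with the number of forwarded records rather than being a fixed startup cost.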
[GitHub] [kafka] dajac commented on a change in pull request #11726: KAFKA-13637: Use default.api.timeout.ms as default timeout value for KafkaConsumer.endOffsets
dajac commented on a change in pull request #11726:
URL: https://github.com/apache/kafka/pull/11726#discussion_r797572591

## File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##
@@ -2948,6 +2948,64 @@ public void testAssignorNameConflict() {
             () -> new KafkaConsumer<>(configs, new StringDeserializer(), new StringDeserializer()));
     }

+    @Test
+    public void testOffsetsForTimesTimeout() {
+        final KafkaConsumer consumer = consumerForCheckingTimeoutException();
+        assertEquals(
+            "Failed to get offsets by times in 6ms",
+            assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> consumer.offsetsForTimes(singletonMap(tp0, 0L))).getMessage()

Review comment:
Could we import `TimeoutException` instead of specifying the fully qualified name every time?

## File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##
@@ -2948,6 +2948,64 @@ public void testAssignorNameConflict() {
             () -> new KafkaConsumer<>(configs, new StringDeserializer(), new StringDeserializer()));
     }

+    @Test
+    public void testOffsetsForTimesTimeout() {
+        final KafkaConsumer consumer = consumerForCheckingTimeoutException();
+        assertEquals(
+            "Failed to get offsets by times in 6ms",
+            assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> consumer.offsetsForTimes(singletonMap(tp0, 0L))).getMessage()
+        );
+    }
+
+    @Test
+    public void testBeginningOffsetsTimeout() {
+        final KafkaConsumer consumer = consumerForCheckingTimeoutException();
+        assertEquals(
+            "Failed to get offsets by times in 6ms",
+            assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> consumer.beginningOffsets(singletonList(tp0))).getMessage()
+        );
+    }
+
+    @Test
+    public void testEndOffsetsTimeout() {
+        final KafkaConsumer consumer = consumerForCheckingTimeoutException();
+        assertEquals(
+            "Failed to get offsets by times in 6ms",
+            assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> consumer.endOffsets(singletonList(tp0))).getMessage()
+        );
+    }
+
+    private KafkaConsumer consumerForCheckingTimeoutException() {
+        final Time time = new MockTime();
+        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
+        ConsumerMetadata metadata = createMetadata(subscription);
+        MockClient client = new MockClient(time, metadata);
+
+        initMetadata(client, singletonMap(topic, 1));
+        Node node = metadata.fetch().nodes().get(0);
+
+        ConsumerPartitionAssignor assignor = new RangeAssignor();
+
+        final KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
+
+        final ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
+        for (int i = 0; i < 10; i++) {
+            // Prepare a retriable error periodically for the client to retry connection
+            exec.schedule(
+                () -> client.prepareResponseFrom(
+                    listOffsetsResponse(
+                        Collections.emptyMap(),
+                        Collections.singletonMap(tp0, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+                    ),
+                    node), 50L, TimeUnit.MILLISECONDS);
+            // Sleep periodically to make loop retry timeout
+            exec.schedule(() -> time.sleep(defaultApiTimeoutMs / 10), 50L, TimeUnit.MILLISECONDS);
+
+        }

Review comment:
I think that we could simplify this code and avoid using an executor by doing as follows:

```
for (int i = 0; i < 10; i++) {
    client.prepareResponse(
        request -> {
            time.sleep(defaultApiTimeoutMs / 10);
            return request instanceof ListOffsetsRequest;
        },
        listOffsetsResponse(
            Collections.emptyMap(),
            Collections.singletonMap(tp0, Errors.UNKNOWN_TOPIC_OR_PARTITION)
        ));
}
```

What do you think?
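The idea behind the suggestion, advancing the mock clock inside the response matcher instead of scheduling sleeps on an executor, can be shown without any Kafka classes. The sketch below is an illustration only: `ManualClock` and `retryUntilTimeout` are hypothetical names, not Kafka's `MockTime` or the consumer's retry loop, and the 60000 ms timeout is an assumed value.

```java
import java.util.function.BooleanSupplier;

class MockClockTimeoutSketch {
    // Hypothetical manually advanced clock; "sleeping" just moves time forward.
    static final class ManualClock {
        private long nowMs;

        long milliseconds() {
            return nowMs;
        }

        void sleep(final long ms) {
            nowMs += ms;
        }
    }

    // Retries the attempt until it succeeds or the clock passes the deadline.
    // Returns the number of attempts on success, throws on timeout.
    static int retryUntilTimeout(final ManualClock clock, final long timeoutMs, final BooleanSupplier attempt) {
        final long deadlineMs = clock.milliseconds() + timeoutMs;
        int attempts = 0;
        while (clock.milliseconds() < deadlineMs) {
            attempts++;
            if (attempt.getAsBoolean()) {
                return attempts;
            }
        }
        throw new IllegalStateException("Timed out after " + attempts + " attempts in " + timeoutMs + "ms");
    }

    public static void main(final String[] args) {
        final ManualClock clock = new ManualClock();
        final long timeoutMs = 60000L; // assumed timeout for this sketch
        try {
            // Each failed attempt advances the clock by a tenth of the timeout,
            // so the loop deterministically gives up after ten attempts.
            retryUntilTimeout(clock, timeoutMs, () -> {
                clock.sleep(timeoutMs / 10);
                return false;
            });
        } catch (final IllegalStateException e) {
            System.out.println(e.getMessage()); // prints Timed out after 10 attempts in 60000ms
        }
    }
}
```

Because time only moves when an attempt is made, the test needs no real waiting and no extra thread, which is the simplification the review comment is after.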
[jira] [Updated] (KAFKA-13638) Slow KTable update when forwarding multiple values from transformer
[ https://issues.apache.org/jira/browse/KAFKA-13638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ulrik updated KAFKA-13638:
--
Description:
I have a topology where I stream messages from an input topic, transform each message into multiple messages (via context.forward), and then store those messages in a KTable.

Since upgrading from kafka-streams 2.8.1 to 3.1.0 I have noticed that my tests take significantly longer to run.

I have attached a test class to demonstrate my scenario. When running this test with kafka-streams versions 2.8.1 and 3.1.0 I came up with the following numbers:

*Version 2.8.1*
* one input message and one output message: 541 ms
* 8 input messages and 30 output messages per input message (240 output messages in total): 919 ms

*Version 3.1.0*
* one input message and one output message: 908 ms
* 8 input messages and 30 output messages per input message (240 output messages in total): 6 sec 94 ms

Even when the transformer just transforms and forwards one input message to one output message, the test takes approx. 400 ms longer to run. When transforming 8 input messages to 240 output messages it takes approx. 5 seconds longer.

> Slow KTable update when forwarding multiple values from transformer
> ---
>
> Key: KAFKA-13638
> URL: https://issues.apache.org/jira/browse/KAFKA-13638
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.1.0, 3.0.0
> Reporter: Ulrik
> Priority: Major
> Attachments: KafkaTest.java

--
This message was sent by Atlassian Jira (v8.20.1#820001)
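To make the size of the reported regression explicit, the arithmetic on the four timings can be worked through as below. This is a small sketch that only restates the reporter's numbers; it assumes "6 sec 94 ms" means 6094 ms, and the class and method names are made up for illustration.

```java
public class TimingDeltaSketch {

    /** Absolute slowdown in milliseconds between two runs. */
    static long slowdownMs(long beforeMs, long afterMs) {
        return afterMs - beforeMs;
    }

    /** Slowdown factor (after / before). */
    static double slowdownFactor(long beforeMs, long afterMs) {
        return (double) afterMs / beforeMs;
    }

    public static void main(String[] args) {
        // Timings taken from the report; 6094 assumes "6 sec 94 ms" = 6094 ms.
        long single281 = 541L, single310 = 908L;
        long multi281 = 919L, multi310 = 6094L;

        System.out.printf("1 -> 1 message:    +%d ms (x%.2f)%n",
                slowdownMs(single281, single310), slowdownFactor(single281, single310));
        System.out.printf("8 -> 240 messages: +%d ms (x%.2f)%n",
                slowdownMs(multi281, multi310), slowdownFactor(multi281, multi310));
    }
}
```

The differences come out to 367 ms and 5175 ms, which matches the "approx. 400 ms" and "approx. 5 seconds" in the description; in relative terms that is roughly 1.7x for the single-message case and roughly 6.6x for the 240-message case.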
[GitHub] [kafka] dengziming commented on pull request #11726: KAFKA-13637: Use default.api.timeout.ms as default timeout value for KafkaConsumer.endOffsets
dengziming commented on pull request #11726: URL: https://github.com/apache/kafka/pull/11726#issuecomment-1027842174 @dajac, yes, I added a unit test for this.
[GitHub] [kafka] cadonna commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
cadonna commented on pull request #10798: URL: https://github.com/apache/kafka/pull/10798#issuecomment-1027802506 @vamossagar12 Can we close this PR and the corresponding ticket since it seems we decided to look for a more complete solution?
[jira] [Resolved] (KAFKA-13599) Upgrade RocksDB to 6.27.3
[ https://issues.apache.org/jira/browse/KAFKA-13599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bruno Cadonna resolved KAFKA-13599.
---
Resolution: Resolved

> Upgrade RocksDB to 6.27.3
> -
>
> Key: KAFKA-13599
> URL: https://issues.apache.org/jira/browse/KAFKA-13599
> Project: Kafka
> Issue Type: Task
> Components: streams
> Reporter: Jonathan Albrecht
> Assignee: Jonathan Albrecht
> Priority: Major
> Attachments: compat_report.html
>
>
> RocksDB v6.27.3 has been released and it is the first release to support s390x. RocksDB is currently the only dependency in gradle/dependencies.gradle without s390x support.
> RocksDB v6.27.3 has added some new options that require an update to streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java but no other changes are needed to upgrade.
> A compatibility report is attached for the current version 6.22.1.1 -> 6.27.3
[GitHub] [kafka] HyunSangHan commented on pull request #11418: MINOR: Write log differently according to the size of missingListenerPartitions
HyunSangHan commented on pull request #11418: URL: https://github.com/apache/kafka/pull/11418#issuecomment-1027773412 Can someone look at this PR? :)
[GitHub] [kafka] cadonna merged pull request #11690: KAFKA-13599: Upgrade RocksDB to 6.27.3
cadonna merged pull request #11690: URL: https://github.com/apache/kafka/pull/11690
[GitHub] [kafka] cadonna commented on pull request #11690: KAFKA-13599: Upgrade RocksDB to 6.27.3
cadonna commented on pull request #11690:
URL: https://github.com/apache/kafka/pull/11690#issuecomment-1027763171

Failing tests are unrelated:
```
Build / JDK 11 and Scala 2.13 / kafka.admin.LeaderElectionCommandTest.[1] Type=Raft, Name=testTopicPartition, Security=PLAINTEXT
Build / JDK 11 and Scala 2.13 / kafka.api.TransactionsExpirationTest.[2] quorum=kraft
Build / JDK 17 and Scala 2.13 / kafka.admin.LeaderElectionCommandTest.[1] Type=Raft, Name=testAllTopicPartition, Security=PLAINTEXT
Build / JDK 17 and Scala 2.13 / kafka.admin.LeaderElectionCommandTest.[1] Type=Raft, Name=testTopicPartition, Security=PLAINTEXT
Build / JDK 17 and Scala 2.13 / kafka.api.TransactionsTest.testAbortTransactionTimeout()
Build / JDK 17 and Scala 2.13 / kafka.server.KRaftClusterTest.testCreateClusterAndCreateListDeleteTopic()
Build / JDK 17 and Scala 2.13 / kafka.network.SocketServerTest.remoteCloseWithIncompleteBufferedReceive()
Build / JDK 8 and Scala 2.12 / kafka.network.SocketServerTest.remoteCloseWithIncompleteBufferedReceive()
```
[GitHub] [kafka] Indupa commented on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2
Indupa commented on pull request #7898: URL: https://github.com/apache/kafka/pull/7898#issuecomment-1027741356 Thank you so much @dongjinleekr. Let me try to apply the patch and build. Will update you.
[GitHub] [kafka] dongjinleekr commented on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2
dongjinleekr commented on pull request #7898: URL: https://github.com/apache/kafka/pull/7898#issuecomment-1027702595 @Indupa I'm sorry. There was a mistake rebasing onto 2.8.1. You can see the updated patch with built tarball [here](https://github.com/dongjinleekr/kafka/releases/tag/2.8.1%2Blog4j2).
[GitHub] [kafka] dajac commented on pull request #11726: KAFKA-13637: Use default.api.timeout.ms as default timeout value for KafkaConsumer.endOffsets
dajac commented on pull request #11726: URL: https://github.com/apache/kafka/pull/11726#issuecomment-1027677422 Good catch! It seems that we forgot it in https://github.com/apache/kafka/commit/53ca52f855e903907378188d29224b3f9cefa6cb. Have you tried to add a unit test for the bug?