[jira] [Commented] (KAFKA-13638) Slow KTable update when forwarding multiple values from transformer

2022-02-02 Thread Ulrik (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17486257#comment-17486257
 ] 

Ulrik commented on KAFKA-13638:
---

I tried the following combinations of versions:


 * streams 2.8.1 & rocksdb 6.0.1 : fast
 * streams 2.8.1 & rocksdb 6.1.1 : fast
 * streams 2.8.1 & rocksdb 6.2.2 : fast
 * streams 2.8.1 & rocksdb 6.3.6 : fast
 * streams 2.8.1 & rocksdb 6.4.6 : fast
 * streams 2.8.1 & rocksdb 6.5.2 : fast
 * streams 2.8.1 & rocksdb 6.6.4 : fast
 * streams 2.8.1 & rocksdb 6.7.3 : fast
 * streams 2.8.1 & rocksdb 6.11.6 : fast
 * streams 2.8.1 & rocksdb 6.12.7 : fast
 * streams 2.8.1 & rocksdb >= 6.13.3 : Runtime error:

 
{code:java}
org.apache.kafka.streams.errors.ProcessorStateException: Error opening store table at location /var/folders/_4/ks5l_9vj6zbfbpf5w6180t3j8kb4pd/T/kafka-streams/dummy-a36d66ed-ba41-4e6d-884f-3c2a652d245d/0_0/rocksdb/table
    at org.apache.kafka.streams.state.internals.RocksDBTimestampedStore.openRocksDB(RocksDBTimestampedStore.java:87)
    at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:186)
    at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:254)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:55)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:75)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$1(MeteredKeyValueStore.java:122)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:122)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:201)
    at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:103)
    at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:216)
    at org.apache.kafka.streams.TopologyTestDriver.setupTask(TopologyTestDriver.java:537)
    at org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:385)
    at org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:313)
    at org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:272)
    at kafkatest.KafkaTest.validateTopologyCanProcessData(KafkaTest.java:79)
    at kafkatest.KafkaTest.multipleForwardsFromTransformer(KafkaTest.java:69)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
    at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
    at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
    at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
    at 

[jira] [Created] (KAFKA-13641) Kafka Streams Scala: Add `Option` to `ValueJoiner` parameters

2022-02-02 Thread Mohammad Yousuf Minhaj Zia (Jira)
Mohammad Yousuf Minhaj Zia created KAFKA-13641:
--

 Summary: Kafka Streams Scala: Add `Option` to `ValueJoiner` 
parameters
 Key: KAFKA-13641
 URL: https://issues.apache.org/jira/browse/KAFKA-13641
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Mohammad Yousuf Minhaj Zia


Since the right-hand parameter of `ValueJoiner` in `leftJoins` and `outerJoins` can be nullable, I am wondering if we can wrap it in a Scala `Option`.

However, there is also the concern that the left-hand value can be null in the case of tombstone messages, in which case the `Option` semantics could be misleading. I still feel this could be a useful feature for reducing the number of `NullPointerExceptions`.
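
For reference, a minimal Java sketch (topic names are made up) of the null that the proposed `Option` wrapper would make explicit: in a KStream-KTable left join, the right-hand value handed to the `ValueJoiner` is null whenever there is no matching record.
{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

// Today the joiner has to null-check the right-hand side by hand; an Option-based
// Scala signature would surface this case in the type instead.
public class LeftJoinNullSketch {
    static void buildTopology(StreamsBuilder builder) {
        KStream<String, String> orders = builder.stream("orders");
        KTable<String, String> customers = builder.table("customers");

        orders.leftJoin(
                customers,
                (order, customer) -> customer == null
                    ? order + " (no customer on file)"   // right side absent -> null today
                    : order + " for " + customer)
              .to("enriched-orders");
    }
}
{code}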



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] Indupa edited a comment on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2

2022-02-02 Thread GitBox


Indupa edited a comment on pull request #7898:
URL: https://github.com/apache/kafka/pull/7898#issuecomment-1028142767


   Hi @dongjinleekr, I was able to build the latest patch, and I also need one input from you.
   
   Are all log4j 1.x dependencies completely removed in this patch? I can still see a dependency on log4j 1.2.17 in build.gradle and dependency.gradle. There are also references to log4j.properties and tools-log4j.properties instead of log4j2.properties and tools-log4j2.properties in some of the files. Are they still required, or can we remove those dependencies as well?
   
   Here is what I tried from my end:
   
   1. I updated build.gradle and dependency.gradle to remove the log4j dependency.
   2. I also updated some of the files where the patch adds echo statements that switch log4j.properties to log4j2.properties, removing the log4j.properties, connect-log4j.properties and tools-log4j.properties files.
   3. After that, I compiled the code, extracted the folder under "C:\kafka_2.8.1\core\build\distributions\kafka_2.13-2.8.1\kafka_2.13-2.8.1", packaged it as kafka.zip, and used it in our component by installing it and running it as a Kafka service.
   4. But when I tried running Kafka, I got the following exception.
   
2022-02-02 05:57:17.158  [INF] [Kafka] Connecting to localhost:2181
   2022-02-02 05:57:27.571  [INF] [Kafka] WATCHER::
   2022-02-02 05:57:27.571  [INF] [Kafka] WatchedEvent state:SyncConnected 
type:None path:null
   2022-02-02 05:57:27.574  [INF] [Kafka] []
   2022-02-02 05:58:17.227  [ERR] [Kafka] ERROR StatusLogger Reconfiguration 
failed: No configuration found for '764c12b6' at 'null' in 'null'
   2022-02-02 05:58:17.684  [INF] [Kafka] DEPRECATED: using log4j 1.x 
configuration. To use log4j 2.x configuration, run with: 'set 
KAFKA_LOG4J_OPTS=-Dlog4j.configurationFile=file:C:\kafka/config/tools-log4j2.properties'
   
   To briefly describe my requirement: the Kafka package we use contains some patches that we have added on top of the kafka_2.8.1 source code. One of those custom changes is that we use apache-log4j-extras 1.2.17 with a time-based triggering policy for rolling log files, since that policy is not available in log4j 1.2.17. Because this version has a vulnerability, we want to use the log4j2 API for this rolling policy logic, which is working in your patch.
   
   Can you please help me with this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on a change in pull request #11726: KAFKA-13637: Use default.api.timeout.ms as default timeout value for KafkaConsumer.endOffsets

2022-02-02 Thread GitBox


dengziming commented on a change in pull request #11726:
URL: https://github.com/apache/kafka/pull/11726#discussion_r798205426



##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##
@@ -2948,6 +2948,64 @@ public void testAssignorNameConflict() {
 () -> new KafkaConsumer<>(configs, new StringDeserializer(), new 
StringDeserializer()));
 }
 
+@Test
+public void testOffsetsForTimesTimeout() {
+final KafkaConsumer consumer = 
consumerForCheckingTimeoutException();
+assertEquals(
+"Failed to get offsets by times in 6ms",
+
assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> 
consumer.offsetsForTimes(singletonMap(tp0, 0L))).getMessage()
+);
+}
+
+@Test
+public void testBeginningOffsetsTimeout() {
+final KafkaConsumer consumer = 
consumerForCheckingTimeoutException();
+assertEquals(
+"Failed to get offsets by times in 6ms",
+
assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> 
consumer.beginningOffsets(singletonList(tp0))).getMessage()
+);
+}
+
+@Test
+public void testEndOffsetsTimeout() {
+final KafkaConsumer consumer = 
consumerForCheckingTimeoutException();
+assertEquals(
+"Failed to get offsets by times in 6ms",
+
assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> 
consumer.endOffsets(singletonList(tp0))).getMessage()
+);
+}
+
+private KafkaConsumer 
consumerForCheckingTimeoutException() {
+final Time time = new MockTime();
+SubscriptionState subscription = new SubscriptionState(new 
LogContext(), OffsetResetStrategy.EARLIEST);
+ConsumerMetadata metadata = createMetadata(subscription);
+MockClient client = new MockClient(time, metadata);
+
+initMetadata(client, singletonMap(topic, 1));
+Node node = metadata.fetch().nodes().get(0);
+
+ConsumerPartitionAssignor assignor = new RangeAssignor();
+
+final KafkaConsumer consumer = newConsumer(time, 
client, subscription, metadata, assignor, false, groupInstanceId);
+
+final ScheduledExecutorService exec = 
Executors.newSingleThreadScheduledExecutor();
+for (int i = 0; i < 10; i++) {
+// Prepare a retriable error periodically for the client to retry 
connection
+exec.schedule(
+() -> client.prepareResponseFrom(
+listOffsetsResponse(
+Collections.emptyMap(),
+Collections.singletonMap(tp0, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+),
+node), 50L, TimeUnit.MILLISECONDS);
+// Sleep periodically to make loop retry timeout
+exec.schedule(() -> time.sleep(defaultApiTimeoutMs / 10), 50L, 
TimeUnit.MILLISECONDS);
+
+}

Review comment:
   Yes, this is a good idea, and this makes the test more deterministic.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on a change in pull request #11726: KAFKA-13637: Use default.api.timeout.ms as default timeout value for KafkaConsumer.endOffsets

2022-02-02 Thread GitBox


dengziming commented on a change in pull request #11726:
URL: https://github.com/apache/kafka/pull/11726#discussion_r798204734



##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##
@@ -2948,6 +2948,64 @@ public void testAssignorNameConflict() {
 () -> new KafkaConsumer<>(configs, new StringDeserializer(), new 
StringDeserializer()));
 }
 
+@Test
+public void testOffsetsForTimesTimeout() {
+final KafkaConsumer consumer = 
consumerForCheckingTimeoutException();
+assertEquals(
+"Failed to get offsets by times in 6ms",
+
assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> 
consumer.offsetsForTimes(singletonMap(tp0, 0L))).getMessage()

Review comment:
   We already import java.util.concurrent.TimeoutException, so we can't import org.apache.kafka.common.errors.TimeoutException here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] stan-confluent commented on pull request #11730: Use ducktape version 0.7.17

2022-02-02 Thread GitBox


stan-confluent commented on pull request #11730:
URL: https://github.com/apache/kafka/pull/11730#issuecomment-102874


   @cmccabe @ijuma @ewencp can you folks please take a look and/or suggest who 
else should be on the approvers list?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] stan-confluent opened a new pull request #11730: Use ducktape version 0.7.17

2022-02-02 Thread GitBox


stan-confluent opened a new pull request #11730:
URL: https://github.com/apache/kafka/pull/11730


   Ducktape 0.7.17 locked a couple of dependencies to their last py2-compatible versions - this ensures more or less stable Python 2 builds for kafkatest (until some other package breaks it).
   Since 2.7 and above use ducktape 0.8.x and Python 3, this change does not apply there.
   
   See these PRs for details on ducktape:
   https://github.com/confluentinc/ducktape/pull/291
   https://github.com/confluentinc/ducktape/pull/292
   
   Tested:
   - Ran `python setup.py develop` in a clean python 2.7.18 virtualenv to 
ensure all dependencies are installed without conflicts
   - Ran a small system test with `ducker-ak` just to ensure it still works ok
   - had to hack the dockerfile since openjdk:8 image does not include py2 
anymore it seems? Hacked something together to have some test run (it failed 
due to some java deps it seems, but ducktape part worked ok)
   - Ran a system test job on jenkins - 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4776 
   - used a custom test that simply brings up zk and kafka
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #11424: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2022-02-02 Thread GitBox


mjsax commented on pull request #11424:
URL: https://github.com/apache/kafka/pull/11424#issuecomment-1028497889


   I did not dig into the details myself. Anyway, it might be better to discuss this on the new PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13638) Slow KTable update when forwarding multiple values from transformer

2022-02-02 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17486177#comment-17486177
 ] 

Matthias J. Sax commented on KAFKA-13638:
-

Seems this does not explain much yet, as 5.18.4 did not work...

Can you try using a 6.x version in 2.8? If it slows down, it would indicate that the RocksDB change is the root cause (which is what I suspect – I cannot think of any other change right now that could explain it).

> Slow KTable update when forwarding multiple values from transformer
> ---
>
> Key: KAFKA-13638
> URL: https://issues.apache.org/jira/browse/KAFKA-13638
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0, 3.0.0
>Reporter: Ulrik
>Priority: Major
> Attachments: KafkaTest.java
>
>
> I have a topology where I stream messages from an input topic, transform each 
> message into multiple messages (via context.forward), and then store those 
> messages in a KTable.
> Since upgrading from kafka-streams 2.8.1 to 3.1.0 I have noticed that my 
> tests take significantly longer to run. 
>  
> I have attached a test class to demonstrate my scenario. When running this 
> test with kafka-streams versions 2.8.1 and 3.1.0 I came up with the following 
> numbers:
>  
> *Version 2.8.1*
>  * one input message and one output message: 541 ms
>  * 8 input messages and 30 output messages per input message (240 output 
> messages in total): 919 ms
>  
> *Version 3.1.0*
>  * one input message and one output message: 908 ms
>  * 8 input messages and 30 output messages per input message (240 output 
> messages in total): 6 sec 94 ms
>  
> Even when the transformer just transforms and forwards one input message to 
> one output message, the test takes approx. 400 ms longer to run.
> When transforming 8 input messages into 240 output messages it takes approx. 5 
> seconds longer.
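
For context, the topology shape described above can be sketched roughly as follows. The topic and store names are illustrative assumptions (they are not taken from the attached KafkaTest.java); only the 30-way fan-out follows the numbers in the description.
{code:java}
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

// Rough sketch only: one input record is forwarded as several records via
// context.forward, and the result is materialized as a KTable.
public class FanOutToTableSketch {

    static void buildTopology(StreamsBuilder builder) {
        builder.<String, String>stream("input")
            .transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
                private ProcessorContext context;

                @Override
                public void init(ProcessorContext context) {
                    this.context = context;
                }

                @Override
                public KeyValue<String, String> transform(String key, String value) {
                    for (int i = 0; i < 30; i++) {
                        context.forward(key + "-" + i, value);  // fan out one record into many
                    }
                    return null;                                // everything was already forwarded
                }

                @Override
                public void close() { }
            })
            .toTable(Materialized.as("table"));                 // backed by RocksDB by default
    }
}
{code}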



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] cmccabe merged pull request #11689: Fixed documentation and handles null topicPartition for KAFKA-12841

2022-02-02 Thread GitBox


cmccabe merged pull request #11689:
URL: https://github.com/apache/kafka/pull/11689


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #11689: Fixed documentation and handles null topicPartition for KAFKA-12841

2022-02-02 Thread GitBox


cmccabe commented on pull request #11689:
URL: https://github.com/apache/kafka/pull/11689#issuecomment-1028467009


   test failures are not related.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] artemlivshits commented on a change in pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm

2022-02-02 Thread GitBox


artemlivshits commented on a change in pull request #11721:
URL: https://github.com/apache/kafka/pull/11721#discussion_r798094178



##
File path: clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
##
@@ -386,18 +386,39 @@ public static void writeDouble(double value, ByteBuffer 
buffer) {
 buffer.putDouble(value);
 }
 
+final static int[] LEADING_ZEROS_TO_U_VARINT_SIZE = new int[] {
+// 32 bits, and each 7-bits adds one byte to the output
+5, 5, 5, 5, // 32
+4, 4, 4, 4, 4, 4, 4, // 28
+3, 3, 3, 3, 3, 3, 3, // 21
+2, 2, 2, 2, 2, 2, 2, // 14
+1, 1, 1, 1, 1, 1, 1, // 7
+1 // 0
+};
+
+final static int[] LEADING_ZEROS_TO_U_VARLONG_SIZE = new int[] {
+// 64 bits, and each 7-bits adds one byte to the output
+10, // 64
+9, 9, 9, 9, 9, 9, 9, // 63
+8, 8, 8, 8, 8, 8, 8, // 56
+7, 7, 7, 7, 7, 7, 7, // 49
+6, 6, 6, 6, 6, 6, 6, // 42
+5, 5, 5, 5, 5, 5, 5, // 35
+4, 4, 4, 4, 4, 4, 4, // 28
+3, 3, 3, 3, 3, 3, 3, // 21
+2, 2, 2, 2, 2, 2, 2, // 14
+1, 1, 1, 1, 1, 1, 1, // 7
+1 // 0
+};
+
 /**
  * Number of bytes needed to encode an integer in unsigned variable-length 
format.
  *
  * @param value The signed value
  */
 public static int sizeOfUnsignedVarint(int value) {
-int bytes = 1;
-while ((value & 0xffffff80) != 0L) {
-bytes += 1;
-value >>>= 7;
-}
-return bytes;
+int leadingZeros = Integer.numberOfLeadingZeros(value);
+return LEADING_ZEROS_TO_U_VARINT_SIZE[leadingZeros];

Review comment:
   That does seem like a lot of instructions, some of them don't seem to be 
needed in this case where we deal with small unsigned integers (e.g. movsxd 
%r8d,%r8 is extending the sign to 64 bits, but we don't need it).  Java doesn't 
seem to support unsigned, so not sure if there is a way to hint the compiler 
that we don't need instructions that only matter for signed arithmetic.  Maybe 
using long would help at least to eliminate the need to do movsxd.
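
   For reference, here is a self-contained sketch (mine, not part of the PR) of the three variants discussed here – the original loop, the lookup table, and the arithmetic form – with a quick check that they agree on the boundary values:
   ```java
// Standalone sketch; names do not mirror the PR exactly.
public class VarintSizeSketch {
    static final int[] LEADING_ZEROS_TO_SIZE = {
        5, 5, 5, 5,
        4, 4, 4, 4, 4, 4, 4,
        3, 3, 3, 3, 3, 3, 3,
        2, 2, 2, 2, 2, 2, 2,
        1, 1, 1, 1, 1, 1, 1,
        1
    };

    // Original loop-based implementation.
    static int sizeLoop(int value) {
        int bytes = 1;
        while ((value & 0xffffff80) != 0) {
            bytes += 1;
            value >>>= 7;
        }
        return bytes;
    }

    // Lookup-table variant.
    static int sizeLookup(int value) {
        return LEADING_ZEROS_TO_SIZE[Integer.numberOfLeadingZeros(value)];
    }

    // Arithmetic variant from the benchmark: (38 - lz) / 7 + lz / 32.
    static int sizeMath(int value) {
        int lz = Integer.numberOfLeadingZeros(value);
        return (38 - lz) / 7 + lz / 32;
    }

    public static void main(String[] args) {
        int[] samples = {0, 1, 127, 128, 16383, 16384, 2097151, 2097152, 268435455, 268435456, -1};
        for (int v : samples) {
            int a = sizeLoop(v);
            int b = sizeLookup(v);
            int c = sizeMath(v);
            if (a != b || a != c) {
                throw new AssertionError("mismatch for value " + v);
            }
            System.out.println(v + " -> " + a + " byte(s)");
        }
    }
}
   ```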




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jasonk000 commented on a change in pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm

2022-02-02 Thread GitBox


jasonk000 commented on a change in pull request #11721:
URL: https://github.com/apache/kafka/pull/11721#discussion_r798077165



##
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java
##
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.util;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.common.utils.ByteUtils;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+@State(Scope.Thread)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ByteUtilsBenchmark {
+private static final int INPUT_COUNT = 10_000;
+private static final int MAX_INT = 2 * 1024 * 1024;
+
+private final int[] sizeOfInputs = new int[INPUT_COUNT];
+
+@Setup(Level.Trial)
+public void setUp() {
+for (int i = 0; i < INPUT_COUNT; ++i) {
+sizeOfInputs[i] = ThreadLocalRandom.current().nextInt(MAX_INT);
+}
+}
+
+@Benchmark
+public long testSizeOfUnsignedVarint() {
+long result = 0;
+for (final int input : sizeOfInputs) {
+result += ByteUtils.sizeOfUnsignedVarint(input);
+}
+return result;
+}
+
+@Benchmark
+public long testSizeOfUnsignedVarintOne() {
+return ByteUtils.sizeOfUnsignedVarint(sizeOfInputs[0]);
+}
+
+@Benchmark
+public long testSizeOfUnsignedVarintMath() {
+long result = 0;
+for (final int input : sizeOfInputs) {
+ int leadingZeros = Integer.numberOfLeadingZeros(input);
+ result += (38 - leadingZeros) / 7 + leadingZeros / 32;
+}

Review comment:
   fixed in 
[d6aeeb1](https://github.com/apache/kafka/commit/d6aeeb1f034f8b75e3f608a940aa158e09497dcf)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma merged pull request #11656: KAFKA-13579: upgrade netty/jetty/jackson to avoid vulnerability

2022-02-02 Thread GitBox


ijuma merged pull request #11656:
URL: https://github.com/apache/kafka/pull/11656


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13640) Implement final broker heartbeat in kraft

2022-02-02 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-13640:


 Summary: Implement final broker heartbeat in kraft
 Key: KAFKA-13640
 URL: https://issues.apache.org/jira/browse/KAFKA-13640
 Project: Kafka
  Issue Type: Improvement
Reporter: Colin McCabe


We should implement sending a final heartbeat from the broker when we're about 
to shut down. This would speed up the process of fencing the broker.

As a note, this isn't a major concern when controlled shutdown is in use, since 
controlled shutdown should move leaders off the broker anyway. But not everyone 
uses controlled shutdown in all cases and this would be a nice improvement in 
the cases where it's not used.

Actually, even in the controlled shutdown case this provides some limited 
benefit as a final "ack" that the broker is going down. It will remove the 
broker from the metadata provided to clients slightly quicker.

We do need to avoid blocking too long on the final heartbeat, though (we should not wait more than a second or two at most).
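
To illustrate only the bounded-wait point (this is not Kafka's internal API; the heartbeat call here is a hypothetical placeholder):
{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustrative sketch of a bounded wait on a final "shutting down" heartbeat.
class FinalHeartbeatSketch {
    static void shutDown(Runnable sendFinalHeartbeat) {
        CompletableFuture<Void> ack = CompletableFuture.runAsync(sendFinalHeartbeat);
        try {
            ack.get(2, TimeUnit.SECONDS);  // wait at most ~2 seconds for the ack
        } catch (TimeoutException | InterruptedException | ExecutionException e) {
            // proceed with shutdown anyway; the controller will fence the broker later
        }
    }
}
{code}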



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] ijuma commented on a change in pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm

2022-02-02 Thread GitBox


ijuma commented on a change in pull request #11721:
URL: https://github.com/apache/kafka/pull/11721#discussion_r798019651



##
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java
##
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.util;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.common.utils.ByteUtils;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+@State(Scope.Thread)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ByteUtilsBenchmark {
+private static final int INPUT_COUNT = 10_000;
+private static final int MAX_INT = 2 * 1024 * 1024;
+
+private final int[] sizeOfInputs = new int[INPUT_COUNT];
+
+@Setup(Level.Trial)
+public void setUp() {
+for (int i = 0; i < INPUT_COUNT; ++i) {
+sizeOfInputs[i] = ThreadLocalRandom.current().nextInt(MAX_INT);
+}
+}
+
+@Benchmark
+public long testSizeOfUnsignedVarint() {
+long result = 0;
+for (final int input : sizeOfInputs) {
+result += ByteUtils.sizeOfUnsignedVarint(input);
+}
+return result;
+}
+
+@Benchmark
+public long testSizeOfUnsignedVarintOne() {
+return ByteUtils.sizeOfUnsignedVarint(sizeOfInputs[0]);
+}
+
+@Benchmark
+public long testSizeOfUnsignedVarintMath() {
+long result = 0;
+for (final int input : sizeOfInputs) {
+ int leadingZeros = Integer.numberOfLeadingZeros(input);
+ result += (38 - leadingZeros) / 7 + leadingZeros / 32;
+}

Review comment:
   Do we want to remove the benchmark methods that include loops and rely 
on the ones without loops?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13638) Slow KTable update when forwarding multiple values from transformer

2022-02-02 Thread Ulrik (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17486102#comment-17486102
 ] 

Ulrik commented on KAFKA-13638:
---

[~mjsax] I can confirm that using an in-memory store worked and the test ran 
very quickly.
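
For anyone reproducing this, switching the materialization to the in-memory store is a small change; a sketch (the store name "table" is assumed from the stack traces, serdes omitted):
{code:java}
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.Stores;

// Sketch: materialize the table with the in-memory state store instead of the
// default RocksDB-backed store.
class InMemoryStoreSketch {
    static KTable<String, String> toInMemoryTable(KStream<String, String> stream) {
        return stream.toTable(
            Materialized.<String, String>as(Stores.inMemoryKeyValueStore("table")));
    }
}
{code}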

I also tried the following rocksdb-versions:
 * kafka-streams 3.1.0 with rocksdb 6.27.3
 * kafka-streams 3.1.0 with rocksdb 6.22.1.1
 * kafka-streams 3.1.0 with rocksdb 6.19.3
 * kafka-streams 3.1.0 with rocksdb 5.18.4

All of the above rocksdb versions resulted in the same slow-running test, except for rocksdb 5.18.4, which produced the following stacktrace:


{code:java}
java.lang.NoClassDefFoundError: org/rocksdb/MutableDBOptionsInterface
    at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:128)
    at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:250)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:56)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:75)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$1(MeteredKeyValueStore.java:125)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:809)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:125)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:205)
    at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:97)
    at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:231)
    at org.apache.kafka.streams.TopologyTestDriver.setupTask(TopologyTestDriver.java:521)
    at org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:370)
    at org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:297)
    at org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:273)
    at kafkatest.KafkaTest.validateTopologyCanProcessData(KafkaTest.java:79)
    at kafkatest.KafkaTest.multipleForwardsFromTransformer(KafkaTest.java:69)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
    at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
    at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
    at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210)
    at 

[GitHub] [kafka] jasonk000 commented on a change in pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm

2022-02-02 Thread GitBox


jasonk000 commented on a change in pull request #11721:
URL: https://github.com/apache/kafka/pull/11721#discussion_r798002944



##
File path: clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
##
@@ -386,18 +386,39 @@ public static void writeDouble(double value, ByteBuffer 
buffer) {
 buffer.putDouble(value);
 }
 
+final static int[] LEADING_ZEROS_TO_U_VARINT_SIZE = new int[] {
+// 32 bits, and each 7-bits adds one byte to the output
+5, 5, 5, 5, // 32
+4, 4, 4, 4, 4, 4, 4, // 28
+3, 3, 3, 3, 3, 3, 3, // 21
+2, 2, 2, 2, 2, 2, 2, // 14
+1, 1, 1, 1, 1, 1, 1, // 7
+1 // 0
+};
+
+final static int[] LEADING_ZEROS_TO_U_VARLONG_SIZE = new int[] {
+// 64 bits, and each 7-bits adds one byte to the output
+10, // 64
+9, 9, 9, 9, 9, 9, 9, // 63
+8, 8, 8, 8, 8, 8, 8, // 56
+7, 7, 7, 7, 7, 7, 7, // 49
+6, 6, 6, 6, 6, 6, 6, // 42
+5, 5, 5, 5, 5, 5, 5, // 35
+4, 4, 4, 4, 4, 4, 4, // 28
+3, 3, 3, 3, 3, 3, 3, // 21
+2, 2, 2, 2, 2, 2, 2, // 14
+1, 1, 1, 1, 1, 1, 1, // 7
+1 // 0
+};
+
 /**
  * Number of bytes needed to encode an integer in unsigned variable-length 
format.
  *
  * @param value The signed value
  */
 public static int sizeOfUnsignedVarint(int value) {
-int bytes = 1;
-while ((value & 0xffffff80) != 0L) {
-bytes += 1;
-value >>>= 7;
-}
-return bytes;
+int leadingZeros = Integer.numberOfLeadingZeros(value);
+return LEADING_ZEROS_TO_U_VARINT_SIZE[leadingZeros];

Review comment:
   Yes, I agree - the arithmetic / bitshift looks intuitively quicker since 
it requires no loads. Looking at the compiler output from JDK11, it does get 
compiled down to a sequence of operations that are reasonably clean (no DIV), 
just that there are a lot of instructions. Perhaps there is a slight advantage 
in different use-cases, but, in my case this fix takes the function completely 
out of the profiler results so it solves the problem.
   
   Out of interest sake, here is the sequence of operations according to 
perfasm on JDK11.
   ```
││  0x7f26e04797b7: mov 0x10(%r12,%r10,8),%r10d
 0.05%  ││  0x7f26e04797bc: lzcnt   %r10d,%r11d   ;*invokestatic 
numberOfLeadingZeros {reexecute=0 rethrow=0 return_oop=0}
││; - 
org.apache.kafka.jmh.util.ByteUtilsBenchmark::testSizeOfUnsignedVarintMathOne@6 
(line 76)
││; - 
org.apache.kafka.jmh.util.jmh_generated.ByteUtilsBenchmark_testSizeOfUnsignedVarintMathOne_jmhTest::testSizeOfUnsignedVarintMathOne_thrpt_jmhStub@17
 (line 119)
 0.03%  ││  0x7f26e04797c1: mov $0x26,%r8d
 0.01%  ││  0x7f26e04797c7: sub %r11d,%r8d;*isub 
{reexecute=0 rethrow=0 return_oop=0}
││; - 
org.apache.kafka.jmh.util.ByteUtilsBenchmark::testSizeOfUnsignedVarintMathOne@13
 (line 77)
││; - 
org.apache.kafka.jmh.util.jmh_generated.ByteUtilsBenchmark_testSizeOfUnsignedVarintMathOne_jmhTest::testSizeOfUnsignedVarintMathOne_thrpt_jmhStub@17
 (line 119)
 0.49%  ││  0x7f26e04797ca: mov %r11d,%r9d
 0.36%  ││  0x7f26e04797cd: sar $0x1f,%r9d
 0.02%  ││  0x7f26e04797d1: movsxd  %r8d,%r10
 0.77%  ││  0x7f26e04797d4: shr $0x1b,%r9d
 7.24%  ││  0x7f26e04797d8: add %r11d,%r9d
 0.78%  ││  0x7f26e04797db: imulq   $0x92492493,%r10,%r10
 0.05%  ││  0x7f26e04797e2: sar $0x5,%r9d
 2.24%  ││  0x7f26e04797e6: sar $0x20,%r10
 9.93%  ││  0x7f26e04797ea: movsxd  %r9d,%r11
 0.12%  ││  0x7f26e04797ed: mov %r10d,%r10d
││  0x7f26e04797f0: add %r8d,%r10d
 6.30%  ││  0x7f26e04797f3: sar $0x1f,%r8d
 2.53%  ││  0x7f26e04797f7: sar $0x2,%r10d
 6.21%  ││  0x7f26e04797fb: movsxd  %r8d,%r8
││  0x7f26e04797fe: movsxd  %r10d,%rdx
 7.25%  ││  0x7f26e0479801: sub %r8,%rdx
 7.01%  ││  0x7f26e0479804: add %r11,%rdx ;*i2l 
{reexecute=0 rethrow=0 return_oop=0}
││; - 
org.apache.kafka.jmh.util.ByteUtilsBenchmark::testSizeOfUnsignedVarintMathOne@22
 (line 77)
││; - 
org.apache.kafka.jmh.util.jmh_generated.ByteUtilsBenchmark_testSizeOfUnsignedVarintMathOne_jmhTest::testSizeOfUnsignedVarintMathOne_thrpt_jmhStub@17
 (line 119)
 8.11%  ││  0x7f26e0479807: mov (%rsp),%rsi
   ```
   
   Lookup table:
   ```
│  0x7fde904792a7: mov 0x10(%r12,%r10,8),%r10d
 1.42%  │  0x7fde904792ac: lzcnt   %r10d,%r10d   

[GitHub] [kafka] jasonk000 commented on a change in pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm

2022-02-02 Thread GitBox


jasonk000 commented on a change in pull request #11721:
URL: https://github.com/apache/kafka/pull/11721#discussion_r798002944



##
File path: clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
##
@@ -386,18 +386,39 @@ public static void writeDouble(double value, ByteBuffer 
buffer) {
 buffer.putDouble(value);
 }
 
+final static int[] LEADING_ZEROS_TO_U_VARINT_SIZE = new int[] {
+// 32 bits, and each 7-bits adds one byte to the output
+5, 5, 5, 5, // 32
+4, 4, 4, 4, 4, 4, 4, // 28
+3, 3, 3, 3, 3, 3, 3, // 21
+2, 2, 2, 2, 2, 2, 2, // 14
+1, 1, 1, 1, 1, 1, 1, // 7
+1 // 0
+};
+
+final static int[] LEADING_ZEROS_TO_U_VARLONG_SIZE = new int[] {
+// 64 bits, and each 7-bits adds one byte to the output
+10, // 64
+9, 9, 9, 9, 9, 9, 9, // 63
+8, 8, 8, 8, 8, 8, 8, // 56
+7, 7, 7, 7, 7, 7, 7, // 49
+6, 6, 6, 6, 6, 6, 6, // 42
+5, 5, 5, 5, 5, 5, 5, // 35
+4, 4, 4, 4, 4, 4, 4, // 28
+3, 3, 3, 3, 3, 3, 3, // 21
+2, 2, 2, 2, 2, 2, 2, // 14
+1, 1, 1, 1, 1, 1, 1, // 7
+1 // 0
+};
+
 /**
  * Number of bytes needed to encode an integer in unsigned variable-length 
format.
  *
  * @param value The signed value
  */
 public static int sizeOfUnsignedVarint(int value) {
-int bytes = 1;
-while ((value & 0xffffff80) != 0L) {
-bytes += 1;
-value >>>= 7;
-}
-return bytes;
+int leadingZeros = Integer.numberOfLeadingZeros(value);
+return LEADING_ZEROS_TO_U_VARINT_SIZE[leadingZeros];

Review comment:
   Yes, I agree - the arithmetic / bitshift looks intuitively quicker since 
it requires no loads. Looking at the compiler output from JDK11, it does get 
compiled down to a sequence of operations that are reasonably clean. Perhaps 
there is a slight advantage in different use-cases, but, in my case this fix 
takes the function completely out of the profiler results so it solves the 
problem.
   
   Out of interest sake, here is the sequence of operations according to 
perfasm on JDK11.
   ```
││  0x7f26e04797b7: mov 0x10(%r12,%r10,8),%r10d
 0.05%  ││  0x7f26e04797bc: lzcnt   %r10d,%r11d   ;*invokestatic 
numberOfLeadingZeros {reexecute=0 rethrow=0 return_oop=0}
││; - 
org.apache.kafka.jmh.util.ByteUtilsBenchmark::testSizeOfUnsignedVarintMathOne@6 
(line 76)
││; - 
org.apache.kafka.jmh.util.jmh_generated.ByteUtilsBenchmark_testSizeOfUnsignedVarintMathOne_jmhTest::testSizeOfUnsignedVarintMathOne_thrpt_jmhStub@17
 (line 119)
 0.03%  ││  0x7f26e04797c1: mov $0x26,%r8d
 0.01%  ││  0x7f26e04797c7: sub %r11d,%r8d;*isub 
{reexecute=0 rethrow=0 return_oop=0}
││; - 
org.apache.kafka.jmh.util.ByteUtilsBenchmark::testSizeOfUnsignedVarintMathOne@13
 (line 77)
││; - 
org.apache.kafka.jmh.util.jmh_generated.ByteUtilsBenchmark_testSizeOfUnsignedVarintMathOne_jmhTest::testSizeOfUnsignedVarintMathOne_thrpt_jmhStub@17
 (line 119)
 0.49%  ││  0x7f26e04797ca: mov %r11d,%r9d
 0.36%  ││  0x7f26e04797cd: sar $0x1f,%r9d
 0.02%  ││  0x7f26e04797d1: movsxd  %r8d,%r10
 0.77%  ││  0x7f26e04797d4: shr $0x1b,%r9d
 7.24%  ││  0x7f26e04797d8: add %r11d,%r9d
 0.78%  ││  0x7f26e04797db: imulq   $0x92492493,%r10,%r10
 0.05%  ││  0x7f26e04797e2: sar $0x5,%r9d
 2.24%  ││  0x7f26e04797e6: sar $0x20,%r10
 9.93%  ││  0x7f26e04797ea: movsxd  %r9d,%r11
 0.12%  ││  0x7f26e04797ed: mov %r10d,%r10d
││  0x7f26e04797f0: add %r8d,%r10d
 6.30%  ││  0x7f26e04797f3: sar $0x1f,%r8d
 2.53%  ││  0x7f26e04797f7: sar $0x2,%r10d
 6.21%  ││  0x7f26e04797fb: movsxd  %r8d,%r8
││  0x7f26e04797fe: movsxd  %r10d,%rdx
 7.25%  ││  0x7f26e0479801: sub %r8,%rdx
 7.01%  ││  0x7f26e0479804: add %r11,%rdx ;*i2l 
{reexecute=0 rethrow=0 return_oop=0}
││; - 
org.apache.kafka.jmh.util.ByteUtilsBenchmark::testSizeOfUnsignedVarintMathOne@22
 (line 77)
││; - 
org.apache.kafka.jmh.util.jmh_generated.ByteUtilsBenchmark_testSizeOfUnsignedVarintMathOne_jmhTest::testSizeOfUnsignedVarintMathOne_thrpt_jmhStub@17
 (line 119)
 8.11%  ││  0x7f26e0479807: mov (%rsp),%rsi
   ```
   
   Lookup table:
   ```
│  0x7fde904792a7: mov 0x10(%r12,%r10,8),%r10d
 1.42%  │  0x7fde904792ac: lzcnt   %r10d,%r10d   ;*invokestatic 
numberOfLeadingZeros {reexecute=0 

[GitHub] [kafka] artemlivshits commented on a change in pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm

2022-02-02 Thread GitBox


artemlivshits commented on a change in pull request #11721:
URL: https://github.com/apache/kafka/pull/11721#discussion_r797932888



##
File path: clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
##
@@ -386,18 +386,39 @@ public static void writeDouble(double value, ByteBuffer 
buffer) {
 buffer.putDouble(value);
 }
 
+final static int[] LEADING_ZEROS_TO_U_VARINT_SIZE = new int[] {
+// 32 bits, and each 7-bits adds one byte to the output
+5, 5, 5, 5, // 32
+4, 4, 4, 4, 4, 4, 4, // 28
+3, 3, 3, 3, 3, 3, 3, // 21
+2, 2, 2, 2, 2, 2, 2, // 14
+1, 1, 1, 1, 1, 1, 1, // 7
+1 // 0
+};
+
+final static int[] LEADING_ZEROS_TO_U_VARLONG_SIZE = new int[] {
+// 64 bits, and each 7-bits adds one byte to the output
+10, // 64
+9, 9, 9, 9, 9, 9, 9, // 63
+8, 8, 8, 8, 8, 8, 8, // 56
+7, 7, 7, 7, 7, 7, 7, // 49
+6, 6, 6, 6, 6, 6, 6, // 42
+5, 5, 5, 5, 5, 5, 5, // 35
+4, 4, 4, 4, 4, 4, 4, // 28
+3, 3, 3, 3, 3, 3, 3, // 21
+2, 2, 2, 2, 2, 2, 2, // 14
+1, 1, 1, 1, 1, 1, 1, // 7
+1 // 0
+};
+
 /**
  * Number of bytes needed to encode an integer in unsigned variable-length 
format.
  *
  * @param value The signed value
  */
 public static int sizeOfUnsignedVarint(int value) {
-int bytes = 1;
-while ((value & 0xffffff80) != 0L) {
-bytes += 1;
-value >>>= 7;
-}
-return bytes;
+int leadingZeros = Integer.numberOfLeadingZeros(value);
+return LEADING_ZEROS_TO_U_VARINT_SIZE[leadingZeros];

Review comment:
   Interesting.  It's unfortunate that the second operation is a div in 
Java (I'm new to Java, so maybe there is a way to express it differently, but I 
couldn't think of one), in C++ I would've just done:
   ```
  return (38 - leadingZeros) / 7 + (leadingZeros == 32);
   ```
   which would just do cmp and add carryover bit, which is cheaper than div.  
Maybe the compiler would be smart enough to translate ((leadingZeros == 32) ? 1 
: 0) into math expression rather than do a branch?
   In these benchmarks, the lookup table is likely cached in L1 cache (hot loop 
that hits the same small amount of data), so memory access in the benchmark is 
likely cheaper than on average.  It's probably hard to do a proper model and 
I'm not sure if it's worth it.
   In any case, thanks for doing this comprehensive research, good stuff!
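
   As a side note, in Java the `leadingZeros / 32` correction can also be written as a shift, since `leadingZeros` is always in [0, 32]; a small sketch (my own, not something proposed in this PR):
   ```java
// The lz >>> 5 term is 1 only for value == 0 (lz == 32); the / 7 division remains.
static int sizeOfUnsignedVarintBranchless(int value) {
    int leadingZeros = Integer.numberOfLeadingZeros(value);
    return (38 - leadingZeros) / 7 + (leadingZeros >>> 5);
}
   ```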




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-13221) Add metric for `PartitionsWithLateTransactionsCount`

2022-02-02 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-13221.
-
Fix Version/s: 3.2.0
   Resolution: Fixed

> Add metric for `PartitionsWithLateTransactionsCount`
> 
>
> Key: KAFKA-13221
> URL: https://issues.apache.org/jira/browse/KAFKA-13221
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 3.2.0
>
>
> The metric `PartitionsWithLateTransactionsCount` was introduced in KIP-664: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-664%3A+Provide+tooling+to+detect+and+abort+hanging+transactions#KIP664:Providetoolingtodetectandaborthangingtransactions-Metrics.
>  This metric will record the number of partitions which have open 
> transactions with durations exceeding `transaction.max.timeout.ms`.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] hachikuji merged pull request #11725: KAFKA-13221; Implement `PartitionsWithLateTransactionsCount` metric

2022-02-02 Thread GitBox


hachikuji merged pull request #11725:
URL: https://github.com/apache/kafka/pull/11725


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kirktrue commented on pull request #11627: KAFKA-13565: add consumer exponential backoff for KIP-580

2022-02-02 Thread GitBox


kirktrue commented on pull request #11627:
URL: https://github.com/apache/kafka/pull/11627#issuecomment-1028296460


   This looks pretty straightforward to me, though I'm still learning the 
consumer client piece.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe opened a new pull request #11729: MINOR: fix control plane listener + kraft error message

2022-02-02 Thread GitBox


cmccabe opened a new pull request #11729:
URL: https://github.com/apache/kafka/pull/11729


   The current error message suggests that controller.listener.names is a 
replacement for
   control.plane.listener.name. This is incorrect since these configurations 
have very different
   functions. This PR deletes the incorrect message.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-13639) NotEnoughReplicasException for __consumer_offsets topic due to out of order offset

2022-02-02 Thread Sergey Ivanov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17486005#comment-17486005
 ] 

Sergey Ivanov edited comment on KAFKA-13639 at 2/2/22, 7:04 PM:


We found a mailing list thread that describes a similar issue:

[https://lists.apache.org/thread/2l1snn1jrzd5p2d0n55vs3dg5jr7f35v] 

and the same in

[https://stackoverflow.com/questions/64514851/apache-kafka-kafka-common-offsetsoutoforderexception-when-reassigning-consume]
 

where a partition of the __consumer_offsets topic was corrupted due to shutting down the Kafka cluster (with clients still working).

They said the issue was resolved once Kafka compacted the topic's segments (our retention is 1Gb or 7 days). We couldn't wait that long, so we sped up this process and set the following property for the topic:
{code:java}
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic __consumer_offsets --config retention.ms=1
{code}
Then we restarted the Kafka cluster and the issue was gone; partition 36 was replicated again. (This looks like a workaround, but a difficult and strange one.)

 

We faced this issue at least one more time, and it does not go away without our manual retention changes, so it looks like this is not a random +bug.+ 

Can anyone help with the investigation? We can provide more logs if necessary. 
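
For completeness, a small sketch using the public Admin API to list __consumer_offsets partitions whose ISR is smaller than the replica set (the bootstrap address is an assumption):
{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;

// Prints every __consumer_offsets partition whose ISR has fewer members than
// its replica set (e.g. partition 36 in this issue).
public class OffsetsTopicIsrCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            TopicDescription description = admin
                .describeTopics(Collections.singletonList("__consumer_offsets"))
                .all().get().get("__consumer_offsets");
            for (TopicPartitionInfo p : description.partitions()) {
                if (p.isr().size() < p.replicas().size()) {
                    System.out.printf("partition %d: leader=%s isr=%s replicas=%s%n",
                        p.partition(), p.leader(), p.isr(), p.replicas());
                }
            }
        }
    }
}
{code}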


was (Author: mrmigles):
We found a mailing list thread that describes a similar issue:

[https://lists.apache.org/thread/2l1snn1jrzd5p2d0n55vs3dg5jr7f35v] 

and the same in

[https://stackoverflow.com/questions/64514851/apache-kafka-kafka-common-offsetsoutoforderexception-when-reassigning-consume]
 

where a partition of the __consumer_offsets topic was corrupted due to shutting down the Kafka cluster (with clients still working).

They said the issue was resolved once Kafka compacted the topic's segments (our retention is 1Gb or 7 days). We couldn't wait that long, so we sped up this process and set the following property for the topic:
{code:java}
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic __consumer_offsets --config retention.ms=1
{code}
Then we restarted the Kafka cluster and the issue was gone; partition 36 was replicated again. 

 

We faced this issue at least one more time, and it does not go away without our manual retention changes, so it looks like this is not a random +bug.+ 

Can anyone help with the investigation? We can provide more logs if necessary. 

> NotEnoughReplicasException for __consumer_offsets topic due to out of order 
> offset
> --
>
> Key: KAFKA-13639
> URL: https://issues.apache.org/jira/browse/KAFKA-13639
> Project: Kafka
>  Issue Type: Bug
>  Components: core, log
>Affects Versions: 2.6.2
>Reporter: Sergey Ivanov
>Priority: Major
>
> Hello,
> We faced a strange issue with Kafka while testing failover scenarios: this 
> involves force-shutting down the nodes where the Kafka pods are placed (Kafka 
> is deployed to Kubernetes) and then bringing those nodes back. 
> After this the Kafka pods start normally, but +some+ consumers could not 
> connect, with errors:
>  
> {code:java}
> [2022-01-27T14:35:09.051][level=DEBUG][class=kafka_client:utils.go:120]: 
> Failed to sync group mae_processor: [15] Group Coordinator Not Available: the 
> broker returns this error code for group coordinator requests, offset 
> commits, and most group management requests if the offsets topic has not yet 
> been created, or if the group coordinator is not active{code}
>  
>  
> It looked like there were issues with ___consumer_offsets_ topic. In logs of 
> brokers we found this error:
> {code:java}
> [2022-01-27T14:56:00,233][INFO][category=kafka.coordinator.group.GroupCoordinator]
>  [GroupCoordinator 1]: Group mae_processor with generation 329 is now empty 
> (__consumer_offsets-36)
> [2022-01-27T14:56:00,233][ERROR][category=kafka.server.ReplicaManager] 
> [ReplicaManager broker=1] Error processing append operation on partition 
> __consumer_offsets-36
> org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the 
> current ISR Set(1) is insufficient to satisfy the min.isr requirement of 2 
> for partition __consumer_offsets-36
> [2022-01-27T14:56:00,233][WARN][category=kafka.coordinator.group.GroupCoordinator]
>  [GroupCoordinator 1]: Failed to write empty metadata for group 
> mae_processor: The coordinator is not available.
> {code}
> If we check partitions of __consumer_offsets it really has one partition with 
> insufficient ISR:
> {code:java}
> topic "__consumer_offsets" with 50 partitions:
>     partition 0, leader 1, replicas: 1,3,2, isrs: 1,2,3
> ...
>     partition 35, leader 3, replicas: 3,1,2, isrs: 1,2,3
>     partition 36, leader 1, replicas: 1,3,2, isrs: 1
>     partition 37, leader 2, replicas: 2,1,3, isrs: 1,2,3
> 
>     partition 49, leader 2, replicas: 2,1,3, 

[GitHub] [kafka] guozhangwang commented on pull request #11424: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2022-02-02 Thread GitBox


guozhangwang commented on pull request #11424:
URL: https://github.com/apache/kafka/pull/11424#issuecomment-1028264508


   @ableegoldman @mjsax My read on the code is that we only need to change the 
TopologyTestDriver, while the first place seems fine to me. Did I miss anything?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] azhur closed pull request #11728: Cherrypick KAFKA-4090: Validate SSL connection in client to 3.1.x

2022-02-02 Thread GitBox


azhur closed pull request #11728:
URL: https://github.com/apache/kafka/pull/11728


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12494) Broker raise InternalError after disk sector medium error without marking dir to offline

2022-02-02 Thread Sergey Ivanov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17486011#comment-17486011
 ] 

Sergey Ivanov commented on KAFKA-12494:
---

Hi all,

We faced a similar issue in our environment when testing failover scenarios: we force-shut 
down Kafka brokers while they were running, and after restarting and applying some load 
they started to raise a lot of errors with {_}java.lang.InternalError: a fault 
occurred in a recent unsafe memory access operation in compiled Java code{_} 
For example:
{code:java}
java.lang.InternalError: a fault occurred in a recent unsafe memory access 
operation in compiled Java code
at 
kafka.server.FullFetchContext.$anonfun$updateAndGenerateResponseData$3(FetchSession.scala:373)
at java.base/java.util.LinkedHashMap.forEach(Unknown Source)
at 
kafka.server.FullFetchContext.createNewSession$1(FetchSession.scala:372) 
{code}
{code:java}
java.lang.InternalError: a fault occurred in a recent unsafe memory access 
operation in compiled Java code
at java.base/java.util.LinkedHashMap.newNode(Unknown Source)
at java.base/java.util.HashMap.putVal(Unknown Source)
at java.base/java.util.HashMap.put(Unknown Source)
 {code}
{code:java}
java.lang.InternalError: a fault occurred in a recent unsafe memory access 
operation in compiled Java code
at kafka.server.LogReadResult.error(ReplicaManager.scala:104)
at 
kafka.server.ReplicaManager.$anonfun$updateFollowerFetchState$1(ReplicaManager.scala:1621)
at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:273)
{code}
And many other stack traces.

It looks like java.lang.InternalError is not the root cause of the issue, but we 
couldn't find any other errors in the logs. 
After a Kafka broker started to raise this error it was no longer fully operational: it 
couldn't handle requests from clients or from the other brokers.
The issue went away after a restart (looks like a workaround), but we can't imagine what the 
+root cause+ of this issue is.

Please correct me if this is not the right ticket for this issue.

> Broker raise InternalError after disk sector medium error without marking dir 
> to offline
> 
>
> Key: KAFKA-12494
> URL: https://issues.apache.org/jira/browse/KAFKA-12494
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.4.0, 2.6.0, 2.5.1, 2.7.0
> Environment: Kafka Version: 1.1.0
> Jdk Version:  jdk1.8
>Reporter: iBlackeyes
>Priority: Major
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> In my production environment, we encountered a case where a Kafka broker only raised errors 
> like 
>  `_*2021-02-16 23:24:24,965 | ERROR | [data-plane-kafka-request-handler-19] | 
> [ReplicaManager broker=7] Error processing append operation on partition 
> xxx-0 | kafka.server.ReplicaManager (Logging.scala:76)*_ 
> _*java.lang.InternalError: a fault occurred in a recent unsafe memory access 
> operation in compiled Java code*_` 
> when the broker appended to a bad disk sector, and it did not mark the directory on this 
> disk as offline.
> This resulted in many partitions that have replicas assigned to this disk being in an 
> under-replicated state. 
> Here are the logs:
> *os messages log:*
> {code:java}
> Feb 16 23:24:24 hd-node109 kernel: blk_update_request: critical medium error, 
> dev sds, sector 2308010408
> Feb 16 23:24:24 hd-node109 kernel: sd 14:1:0:18: [sds] FAILED Result: 
> hostbyte=DID_OK driverbyte=DRIVER_SENSE
> Feb 16 23:24:24 hd-node109 kernel: sd 14:1:0:18: [sds] Sense Key : Medium 
> Error [current] 
> Feb 16 23:24:24 hd-node109 kernel: sd 14:1:0:18: [sds] Add. Sense: 
> Unrecovered read error
> Feb 16 23:24:24 hd-node109 kernel: sd 14:1:0:18: [sds] CDB: Read(10) 28 00 89 
> 91 71 a8 00 00 08 00
> Feb 16 23:24:24 hd-node109 kernel: blk_update_request: critical medium error, 
> dev sds, sector 2308010408
> Feb 16 23:24:24 hd-node109 kernel: sd 14:1:0:18: [sds] FAILED Result: 
> hostbyte=DID_OK driverbyte=DRIVER_SENSE
> Feb 16 23:24:24 hd-node109 kernel: sd 14:1:0:18: [sds] Sense Key : Medium 
> Error [current] 
> Feb 16 23:24:24 hd-node109 kernel: sd 14:1:0:18: [sds] Add. Sense: 
> Unrecovered read error
> Feb 16 23:24:24 hd-node109 kernel: sd 14:1:0:18: [sds] CDB: Read(10) 28 00 89 
> 91 71 a8 00 00 08 00
> Feb 16 23:24:24 hd-node109 kernel: blk_update_request: critical medium error, 
> dev sds, sector 2308010408
> Feb 16 23:24:24 hd-node109 kernel: sd 14:1:0:18: [sds] FAILED Result: 
> hostbyte=DID_OK driverbyte=DRIVER_SENSE
> Feb 16 23:24:24 hd-node109 kernel: sd 14:1:0:18: [sds] Sense Key : Medium 
> Error [current] 
> Feb 16 23:24:24 hd-node109 kernel: sd 14:1:0:18: [sds] Add. Sense: 
> Unrecovered read error
> Feb 16 23:24:24 hd-node109 kernel: sd 14:1:0:18: [sds] CDB: Read(10) 28 00 89 
> 91 71 a8 00 00 08 00
> Feb 16 

[jira] [Comment Edited] (KAFKA-13639) NotEnoughReplicasException for __consumer_offsets topic due to out of order offset

2022-02-02 Thread Sergey Ivanov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17486005#comment-17486005
 ] 

Sergey Ivanov edited comment on KAFKA-13639 at 2/2/22, 6:31 PM:


We found a mailing list thread that describes a similar issue:

[https://lists.apache.org/thread/2l1snn1jrzd5p2d0n55vs3dg5jr7f35v] 

and the same in

[https://stackoverflow.com/questions/64514851/apache-kafka-kafka-common-offsetsoutoforderexception-when-reassigning-consume]
 

where a partition of the __consumer_offsets topic was corrupted after a forced 
shutdown of the Kafka cluster (with clients still running).

They said the issue was resolved once Kafka compacted the topic's segments (our 
retention is 1 GB or 7 days). We couldn't wait that long, so we sped up the 
process by setting the following property on the topic:
{code:java}
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic 
__consumer_offsets --config retention.ms=1
{code}
Then we restarted the Kafka cluster and the issue was gone; partition 36 was 
replicated again. 

 

We faced this issue at least one more time, and it does not go away without our 
manual retention changes. So it looks like this is not a random +bug.+ 

Can anyone help with the investigation? We can provide more logs if necessary. 


was (Author: mrmigles):
We found mail thread which describe the similar issue:

[https://lists.apache.org/thread/2l1snn1jrzd5p2d0n55vs3dg5jr7f35v] 

and the same in

[https://stackoverflow.com/questions/64514851/apache-kafka-kafka-common-offsetsoutoforderexception-when-reassigning-consume]
 

when __consumer_offsets topic's partition was corrupted due to shutdown Kafka 
cluster (with working clients).

They said that issue was resolved when Kafka compacts topics segments (with our 
retention is 1Gb or 7 day). We couldn't wait that time so we speeded up this 
process and set the following properties for the topic:
{code:java}
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic 
__consumer_offsets --config retention.ms=1
{code}
Then we restarted Kafka cluster, and {*}the issue gone{*}, partition 36 has 
been replicated. 

 

We faced this issue at lease one more time and it doesn't go without our 
manually retention changes. So looks like this is not random bug. 

Can anyone help with investigation? We can provide more logs if necessary. 

> NotEnoughReplicasException for __consumer_offsets topic due to out of order 
> offset
> --
>
> Key: KAFKA-13639
> URL: https://issues.apache.org/jira/browse/KAFKA-13639
> Project: Kafka
>  Issue Type: Bug
>  Components: core, log
>Affects Versions: 2.6.2
>Reporter: Sergey Ivanov
>Priority: Major
>
> Hello,
> We faced a strange issue with Kafka while testing failover scenarios: this 
> involves force-shutting down the nodes where the Kafka pods are placed (Kafka is 
> deployed to Kubernetes) and then bringing these nodes back. 
> After this the Kafka pods start normally, but +some+ consumers could not 
> connect to them, with errors:
>  
> {code:java}
> [2022-01-27T14:35:09.051][level=DEBUG][class=kafka_client:utils.go:120]: 
> Failed to sync group mae_processor: [15] Group Coordinator Not Available: the 
> broker returns this error code for group coordinator requests, offset 
> commits, and most group management requests if the offsets topic has not yet 
> been created, or if the group coordinator is not active{code}
>  
>  
> It looked like there were issues with ___consumer_offsets_ topic. In logs of 
> brokers we found this error:
> {code:java}
> [2022-01-27T14:56:00,233][INFO][category=kafka.coordinator.group.GroupCoordinator]
>  [GroupCoordinator 1]: Group mae_processor with generation 329 is now empty 
> (__consumer_offsets-36)
> [2022-01-27T14:56:00,233][ERROR][category=kafka.server.ReplicaManager] 
> [ReplicaManager broker=1] Error processing append operation on partition 
> __consumer_offsets-36
> org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the 
> current ISR Set(1) is insufficient to satisfy the min.isr requirement of 2 
> for partition __consumer_offsets-36
> [2022-01-27T14:56:00,233][WARN][category=kafka.coordinator.group.GroupCoordinator]
>  [GroupCoordinator 1]: Failed to write empty metadata for group 
> mae_processor: The coordinator is not available.
> {code}
> If we check partitions of __consumer_offsets it really has one partition with 
> insufficient ISR:
> {code:java}
> topic "__consumer_offsets" with 50 partitions:
>     partition 0, leader 1, replicas: 1,3,2, isrs: 1,2,3
> ...
>     partition 35, leader 3, replicas: 3,1,2, isrs: 1,2,3
>     partition 36, leader 1, replicas: 1,3,2, isrs: 1
>     partition 37, leader 2, replicas: 2,1,3, isrs: 1,2,3
> 
>     partition 49, leader 2, replicas: 2,1,3, isrs: 1,2,3{code}
> We wait some time 

[GitHub] [kafka] jasonk000 edited a comment on pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm

2022-02-02 Thread GitBox


jasonk000 edited a comment on pull request #11721:
URL: https://github.com/apache/kafka/pull/11721#issuecomment-1028229086


   Good spot on jmh 1.34, TIL! Here's a result from a re-run
   ```
   jkoch@jkoch:~/code/kafka$ java -version
   openjdk version "17.0.1" 2021-10-19
   OpenJDK Runtime Environment (build 17.0.1+12-Ubuntu-121.10)
   OpenJDK 64-Bit Server VM (build 17.0.1+12-Ubuntu-121.10, mixed mode, sharing)
   ```
   ```
   jkoch@jkoch:~/code/kafka$ cat gradle/dependencies.gradle  | grep jmh
 jmh: "1.34",
 jmhCore: "org.openjdk.jmh:jmh-core:$versions.jmh",
 jmhCoreBenchmarks: "org.openjdk.jmh:jmh-core-benchmarks:$versions.jmh",
   ```
   With `-perfnorm`, the interesting results seem to be:
   - IPC, cache miss rates are similarly good across all
   - The math version has 2 branches, the lookup table has 3 branches, and the 
baseline loop has 6 branches. All well predicted.
   - It seems to come simply down to instruction counts: math has 31, lookup 
has 15, and baseline has 27.
   
   ```
   Benchmark                                                             Mode  Cnt        Score       Error      Units
   ByteUtilsBenchmark.testSizeOfUnsignedVarintMathOne                   thrpt    5   520507.594 ±  3323.803     ops/ms
   ByteUtilsBenchmark.testSizeOfUnsignedVarintMathOne:IPC               thrpt            4.759                insns/clk
   ByteUtilsBenchmark.testSizeOfUnsignedVarintMathOne:branches          thrpt            2.002                     #/op
   ByteUtilsBenchmark.testSizeOfUnsignedVarintMathOne:cycles            thrpt            6.518                     #/op
   ByteUtilsBenchmark.testSizeOfUnsignedVarintMathOne:instructions      thrpt           31.016                     #/op
   ByteUtilsBenchmark.testSizeOfUnsignedVarintOne                       thrpt    5  1035024.683 ± 25061.922     ops/ms
   ByteUtilsBenchmark.testSizeOfUnsignedVarintOne:IPC                   thrpt            4.576                insns/clk
   ByteUtilsBenchmark.testSizeOfUnsignedVarintOne:branches              thrpt            3.000                     #/op
   ByteUtilsBenchmark.testSizeOfUnsignedVarintOne:cycles                thrpt            3.278                     #/op
   ByteUtilsBenchmark.testSizeOfUnsignedVarintOne:instructions          thrpt           15.000                     #/op
   ByteUtilsBenchmark.testSizeOfUnsignedVarintOriginalOne               thrpt    5   676262.808 ±  4065.238     ops/ms
   ByteUtilsBenchmark.testSizeOfUnsignedVarintOriginalOne:IPC           thrpt            5.385                insns/clk
   ByteUtilsBenchmark.testSizeOfUnsignedVarintOriginalOne:branches      thrpt            6.001                     #/op
   ByteUtilsBenchmark.testSizeOfUnsignedVarintOriginalOne:cycles        thrpt            5.016                     #/op
   ByteUtilsBenchmark.testSizeOfUnsignedVarintOriginalOne:instructions  thrpt           27.013                     #/op
   ```
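   For context, a rough sketch of the three kinds of implementation being compared (a shift loop, a pure-math formula, and a leading-zeros lookup table). Names, constants, and formulas here are illustrative assumptions, not the PR's actual code:
   ```java
   public final class VarintSizeSketch {

       // Baseline-style loop: shift right 7 bits at a time until the value fits in one byte.
       static int sizeOfUnsignedVarintLoop(int value) {
           int bytes = 1;
           while ((value & 0xffffff80) != 0) {
               bytes += 1;
               value >>>= 7;
           }
           return bytes;
       }

       // Math-style variant: derive the size from the position of the highest set bit.
       static int sizeOfUnsignedVarintMath(int value) {
           int leadingZeros = Integer.numberOfLeadingZeros(value);
           // (38 - lz) / 7 gives the size for value > 0; the lz / 32 term handles value == 0.
           return (38 - leadingZeros) / 7 + leadingZeros / 32;
       }

       // Lookup-style variant: precompute the size for every possible numberOfLeadingZeros result.
       private static final int[] SIZE_BY_LEADING_ZEROS = new int[33];
       static {
           for (int lz = 0; lz <= 32; lz++) {
               SIZE_BY_LEADING_ZEROS[lz] = lz == 32 ? 1 : (38 - lz) / 7;
           }
       }

       static int sizeOfUnsignedVarintLookup(int value) {
           return SIZE_BY_LEADING_ZEROS[Integer.numberOfLeadingZeros(value)];
       }

       public static void main(String[] args) {
           for (int v : new int[] {0, 1, 127, 128, 16_383, 16_384, Integer.MAX_VALUE, -1}) {
               System.out.printf("%d -> loop=%d math=%d lookup=%d%n", v,
                       sizeOfUnsignedVarintLoop(v), sizeOfUnsignedVarintMath(v), sizeOfUnsignedVarintLookup(v));
           }
       }
   }
   ```
   The instruction counts above are consistent with this shape: the lookup variant does one numberOfLeadingZeros plus an array read, while the math variant trades the memory access for a couple of divisions.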


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jasonk000 commented on pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm

2022-02-02 Thread GitBox


jasonk000 commented on pull request #11721:
URL: https://github.com/apache/kafka/pull/11721#issuecomment-1028229086


   Good spot on jmh 1.34, TIL! Here's a result from a re-run
   ```
   jkoch@jkoch:~/code/kafka$ java -version
   openjdk version "17.0.1" 2021-10-19
   OpenJDK Runtime Environment (build 17.0.1+12-Ubuntu-121.10)
   OpenJDK 64-Bit Server VM (build 17.0.1+12-Ubuntu-121.10, mixed mode, sharing)
   ```
   ```
   jkoch@jkoch:~/code/kafka$ cat gradle/dependencies.gradle  | grep jmh
 jmh: "1.34",
 jmhCore: "org.openjdk.jmh:jmh-core:$versions.jmh",
 jmhCoreBenchmarks: "org.openjdk.jmh:jmh-core-benchmarks:$versions.jmh",
   ```
   Benchmarks
   ```
   Benchmark                                                Mode  Cnt        Score       Error   Units
   ByteUtilsBenchmark.testSizeOfUnsignedVarintMathOne      thrpt    5   520507.594 ±  3323.803  ops/ms
   ByteUtilsBenchmark.testSizeOfUnsignedVarintOne          thrpt    5  1035024.683 ± 25061.922  ops/ms
   ByteUtilsBenchmark.testSizeOfUnsignedVarintOriginalOne  thrpt    5   676262.808 ±  4065.238  ops/ms
   ```
   With `-perfnorm`, the interesting results seem to be:
   - IPC, cache miss rates are similarly good across all
   - The math version has 2 branches, the lookup table has 3 branches, and the 
baseline loop has 6 branches. All well predicted.
   - It seems to come simply down to instruction counts: math has 31, lookup 
has 15, and baseline has 27.
   
   ```
   Benchmark                                                             Mode  Cnt        Score       Error      Units
   ByteUtilsBenchmark.testSizeOfUnsignedVarintMathOne                   thrpt    5   520507.594 ±  3323.803     ops/ms
   ByteUtilsBenchmark.testSizeOfUnsignedVarintMathOne:IPC               thrpt            4.759                insns/clk
   ByteUtilsBenchmark.testSizeOfUnsignedVarintMathOne:branches          thrpt            2.002                     #/op
   ByteUtilsBenchmark.testSizeOfUnsignedVarintMathOne:cycles            thrpt            6.518                     #/op
   ByteUtilsBenchmark.testSizeOfUnsignedVarintMathOne:instructions      thrpt           31.016                     #/op
   ByteUtilsBenchmark.testSizeOfUnsignedVarintOne                       thrpt    5  1035024.683 ± 25061.922     ops/ms
   ByteUtilsBenchmark.testSizeOfUnsignedVarintOne:IPC                   thrpt            4.576                insns/clk
   ByteUtilsBenchmark.testSizeOfUnsignedVarintOne:branches              thrpt            3.000                     #/op
   ByteUtilsBenchmark.testSizeOfUnsignedVarintOne:cycles                thrpt            3.278                     #/op
   ByteUtilsBenchmark.testSizeOfUnsignedVarintOne:instructions          thrpt           15.000                     #/op
   ByteUtilsBenchmark.testSizeOfUnsignedVarintOriginalOne               thrpt    5   676262.808 ±  4065.238     ops/ms
   ByteUtilsBenchmark.testSizeOfUnsignedVarintOriginalOne:IPC           thrpt            5.385                insns/clk
   ByteUtilsBenchmark.testSizeOfUnsignedVarintOriginalOne:branches      thrpt            6.001                     #/op
   ByteUtilsBenchmark.testSizeOfUnsignedVarintOriginalOne:cycles        thrpt            5.016                     #/op
   ByteUtilsBenchmark.testSizeOfUnsignedVarintOriginalOne:instructions  thrpt           27.013                     #/op
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13639) NotEnoughReplicasException for __consumer_offsets topic due to out of order offset

2022-02-02 Thread Sergey Ivanov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17486005#comment-17486005
 ] 

Sergey Ivanov commented on KAFKA-13639:
---

We found a mailing list thread that describes a similar issue:

[https://lists.apache.org/thread/2l1snn1jrzd5p2d0n55vs3dg5jr7f35v] 

and the same in

[https://stackoverflow.com/questions/64514851/apache-kafka-kafka-common-offsetsoutoforderexception-when-reassigning-consume]
 

where a partition of the __consumer_offsets topic was corrupted after a forced 
shutdown of the Kafka cluster (with clients still running).

They said the issue was resolved once Kafka compacted the topic's segments (our 
retention is 1 GB or 7 days). We couldn't wait that long, so we sped up the 
process by setting the following property on the topic:
{code:java}
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic 
__consumer_offsets --config retention.ms=1
{code}
Then we restarted the Kafka cluster and {*}the issue was gone{*}; partition 36 was 
replicated again. 

 

We faced this issue at least one more time, and it does not go away without our 
manual retention changes. So it looks like this is not a random bug. 

Can anyone help with the investigation? We can provide more logs if necessary. 

> NotEnoughReplicasException for __consumer_offsets topic due to out of order 
> offset
> --
>
> Key: KAFKA-13639
> URL: https://issues.apache.org/jira/browse/KAFKA-13639
> Project: Kafka
>  Issue Type: Bug
>  Components: core, log
>Affects Versions: 2.6.2
>Reporter: Sergey Ivanov
>Priority: Major
>
> Hello,
> We faced a strange issue with Kafka while testing failover scenarios: this 
> involves force-shutting down the nodes where the Kafka pods are placed (Kafka is 
> deployed to Kubernetes) and then bringing these nodes back. 
> After this the Kafka pods start normally, but +some+ consumers could not 
> connect to them, with errors:
>  
> {code:java}
> [2022-01-27T14:35:09.051][level=DEBUG][class=kafka_client:utils.go:120]: 
> Failed to sync group mae_processor: [15] Group Coordinator Not Available: the 
> broker returns this error code for group coordinator requests, offset 
> commits, and most group management requests if the offsets topic has not yet 
> been created, or if the group coordinator is not active{code}
>  
>  
> It looked like there were issues with ___consumer_offsets_ topic. In logs of 
> brokers we found this error:
> {code:java}
> [2022-01-27T14:56:00,233][INFO][category=kafka.coordinator.group.GroupCoordinator]
>  [GroupCoordinator 1]: Group mae_processor with generation 329 is now empty 
> (__consumer_offsets-36)
> [2022-01-27T14:56:00,233][ERROR][category=kafka.server.ReplicaManager] 
> [ReplicaManager broker=1] Error processing append operation on partition 
> __consumer_offsets-36
> org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the 
> current ISR Set(1) is insufficient to satisfy the min.isr requirement of 2 
> for partition __consumer_offsets-36
> [2022-01-27T14:56:00,233][WARN][category=kafka.coordinator.group.GroupCoordinator]
>  [GroupCoordinator 1]: Failed to write empty metadata for group 
> mae_processor: The coordinator is not available.
> {code}
> If we check partitions of __consumer_offsets it really has one partition with 
> insufficient ISR:
> {code:java}
> topic "__consumer_offsets" with 50 partitions:
>     partition 0, leader 1, replicas: 1,3,2, isrs: 1,2,3
> ...
>     partition 35, leader 3, replicas: 3,1,2, isrs: 1,2,3
>     partition 36, leader 1, replicas: 1,3,2, isrs: 1
>     partition 37, leader 2, replicas: 2,1,3, isrs: 1,2,3
> 
>     partition 49, leader 2, replicas: 2,1,3, isrs: 1,2,3{code}
> We waited some time but the issue didn't go away; we still had one partition with 
> an insufficient ISR.
> First of all we [thought 
> |https://stackoverflow.com/questions/51491152/fixing-under-replicated-partitions-in-kafka/53540963#53540963]this
>  was an issue with Kafka-ZooKeeper coordination, so we restarted the ZooKeeper 
> cluster and brokers 2 and 3, which were not in the ISR. +But it didn't help.+
> We also tried to manually elect a leader for this partition with 
> kafka-leader-election.sh (in the hope it would help). +But that didn't help either.+
> In logs we also found an issue:
> {code:java}
> [2022-01-27T16:17:29,531][ERROR][category=kafka.server.ReplicaFetcherThread] 
> [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Unexpected error 
> occurred while processing data for partition __consumer_offsets-36 at offset 
> 19536
> kafka.common.OffsetsOutOfOrderException: Out of order offsets found in append 
> to __consumer_offsets-36: List(19536, 19536, 19537, 19538, 19539, 19540, 
> 19541, 19542, 19543, 19544, 19545, 19546, 19547, 19548, 19549, 19550, 19551, 
> 19552, 19553, 19554, 19555, 19556, 19557, 19558, 19559, 19560, 19561)
>     at 

[jira] [Commented] (KAFKA-13633) Merging multiple KStreams in one operation

2022-02-02 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17486002#comment-17486002
 ] 

Matthias J. Sax commented on KAFKA-13633:
-

Thanks! Will try to review the KIP soon.

> Merging multiple KStreams in one operation
> --
>
> Key: KAFKA-13633
> URL: https://issues.apache.org/jira/browse/KAFKA-13633
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.1.0
>Reporter: Nicholas Telford
>Assignee: Nicholas Telford
>Priority: Major
>  Labels: kip
>
> The {{KStream}} API provides {{merge(KStream)}} to merge another {{KStream}} 
> with {{{}this{}}}. Sometimes, it may be useful to merge more than 2 
> {{{}KStream{}}}s together. Currently, the best way to do this is using Java's 
> {{{}Stream.reduce{}}}:
> {noformat}
> List<KStream<K, V>> streams = ...;
> streams.stream().reduce((left, right) -> left.merge(right));{noformat}
> This creates a {{merge}} node in the process graph for every {{KStream}} in 
> the collection being merged.
> Complex process graphs can make understanding an application and debugging 
> more difficult, therefore, we propose a new API that creates a single 
> {{merge}} node in the process graph, irrespective of the number of 
> {{{}KStream{}}}s being merged:
> {noformat}
> KStream<K, V> merge(Collection<KStream<K, V>> streams);
> KStream<K, V> merge(Collection<KStream<K, V>> streams, Named named);{noformat}
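> A hypothetical usage sketch of the proposed API (placeholder stream names, for illustration only):
> {noformat}
> // today: one merge node per stream
> KStream<String, Order> merged = orders1.merge(orders2).merge(orders3);
> // proposed: a single merge node for the whole collection
> KStream<String, Order> mergedAll = orders1.merge(List.of(orders2, orders3));{noformat}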



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13633) Merging multiple KStreams in one operation

2022-02-02 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13633:

Labels: kip  (was: )

> Merging multiple KStreams in one operation
> --
>
> Key: KAFKA-13633
> URL: https://issues.apache.org/jira/browse/KAFKA-13633
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.1.0
>Reporter: Nicholas Telford
>Assignee: Nicholas Telford
>Priority: Major
>  Labels: kip
>
> The {{KStream}} API provides {{merge(KStream)}} to merge another {{KStream}} 
> with {{{}this{}}}. Sometimes, it may be useful to merge more than 2 
> {{{}KStream{}}}s together. Currently, the best way to do this is using Java's 
> {{{}Stream.reduce{}}}:
> {noformat}
> List<KStream<K, V>> streams = ...;
> streams.stream().reduce((left, right) -> left.merge(right));{noformat}
> This creates a {{merge}} node in the process graph for every {{KStream}} in 
> the collection being merged.
> Complex process graphs can make understanding an application and debugging 
> more difficult, therefore, we propose a new API that creates a single 
> {{merge}} node in the process graph, irrespective of the number of 
> {{{}KStream{}}}s being merged:
> {noformat}
> KStream<K, V> merge(Collection<KStream<K, V>> streams);
> KStream<K, V> merge(Collection<KStream<K, V>> streams, Named named);{noformat}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] philipnee commented on a change in pull request #11689: Fixed documentation and handles null topicPartition for KAFKA-12841

2022-02-02 Thread GitBox


philipnee commented on a change in pull request #11689:
URL: https://github.com/apache/kafka/pull/11689#discussion_r797884312



##
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##
@@ -1506,6 +1508,46 @@ public void testNullTopicName() {
 "key".getBytes(StandardCharsets.UTF_8), 
"value".getBytes(StandardCharsets.UTF_8)));
 }
 
+    @Test
+    public void testCallbackHandlesError() throws Exception {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000");
+
+        Time time = new MockTime();
+        ProducerMetadata producerMetadata = newMetadata(0, Long.MAX_VALUE);
+        MockClient client = new MockClient(time, producerMetadata);
+
+        String invalidTopicName = "topic abc"; // Invalid topic name due to space
+
+        try (Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(), new StringSerializer(),
+                producerMetadata, client, null, time)) {
+            ProducerRecord<String, String> record = new ProducerRecord<>(invalidTopicName, "HelloKafka");
+
+            // Here's the important piece of the test. Let's make sure that the RecordMetadata we get
+            // is non-null and adheres to the onCompletion contract.
+            Callback callBack = (recordMetadata, exception) -> {
+                assertNotNull(exception);
+                assertNotNull(recordMetadata);
+
+                assertNotNull(recordMetadata.topic(), "Topic name should be valid even on send failure");

Review comment:
   Done - see line 1545 in the test.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13639) NotEnoughReplicasException for __consumer_offsets topic due to out of order offset

2022-02-02 Thread Sergey Ivanov (Jira)
Sergey Ivanov created KAFKA-13639:
-

 Summary: NotEnoughReplicasException for __consumer_offsets topic 
due to out of order offset
 Key: KAFKA-13639
 URL: https://issues.apache.org/jira/browse/KAFKA-13639
 Project: Kafka
  Issue Type: Bug
  Components: core, log
Affects Versions: 2.6.2
Reporter: Sergey Ivanov


Hello,

We faced a strange issue with Kafka while testing failover scenarios: this 
involves force-shutting down the nodes where the Kafka pods are placed (Kafka is deployed to 
Kubernetes) and then bringing these nodes back. 

After this the Kafka pods start normally, but +some+ consumers could not 
connect to them, with errors:

 
{code:java}
[2022-01-27T14:35:09.051][level=DEBUG][class=kafka_client:utils.go:120]: Failed 
to sync group mae_processor: [15] Group Coordinator Not Available: the broker 
returns this error code for group coordinator requests, offset commits, and 
most group management requests if the offsets topic has not yet been created, 
or if the group coordinator is not active{code}
 

 

It looked like there were issues with the ___consumer_offsets_ topic. In the broker 
logs we found this error:
{code:java}
[2022-01-27T14:56:00,233][INFO][category=kafka.coordinator.group.GroupCoordinator]
 [GroupCoordinator 1]: Group mae_processor with generation 329 is now empty 
(__consumer_offsets-36)
[2022-01-27T14:56:00,233][ERROR][category=kafka.server.ReplicaManager] 
[ReplicaManager broker=1] Error processing append operation on partition 
__consumer_offsets-36
org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the 
current ISR Set(1) is insufficient to satisfy the min.isr requirement of 2 for 
partition __consumer_offsets-36
[2022-01-27T14:56:00,233][WARN][category=kafka.coordinator.group.GroupCoordinator]
 [GroupCoordinator 1]: Failed to write empty metadata for group mae_processor: 
The coordinator is not available.

{code}




If we check the partitions of __consumer_offsets, it really does have one partition with 
an insufficient ISR:
{code:java}
topic "__consumer_offsets" with 50 partitions:
    partition 0, leader 1, replicas: 1,3,2, isrs: 1,2,3
...
    partition 35, leader 3, replicas: 3,1,2, isrs: 1,2,3
    partition 36, leader 1, replicas: 1,3,2, isrs: 1
    partition 37, leader 2, replicas: 2,1,3, isrs: 1,2,3

    partition 49, leader 2, replicas: 2,1,3, isrs: 1,2,3{code}
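(For reference, one standard way to surface such under-replicated partitions; this is not necessarily the tool that produced the listing above:)
{code:java}
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe \
  --topic __consumer_offsets --under-replicated-partitions
{code}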
We waited some time but the issue didn't go away; we still had one partition with 
an insufficient ISR.





First of all we [thought 
|https://stackoverflow.com/questions/51491152/fixing-under-replicated-partitions-in-kafka/53540963#53540963]this
 was an issue with Kafka-ZooKeeper coordination, so we restarted the ZooKeeper cluster 
and brokers 2 and 3, which were not in the ISR. +But it didn't help.+

We also tried to manually elect a leader for this partition with 
kafka-leader-election.sh (in the hope it would help). +But that didn't help either.+

In the logs we also found this error:


{code:java}
[2022-01-27T16:17:29,531][ERROR][category=kafka.server.ReplicaFetcherThread] 
[ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Unexpected error occurred 
while processing data for partition __consumer_offsets-36 at offset 19536
kafka.common.OffsetsOutOfOrderException: Out of order offsets found in append 
to __consumer_offsets-36: List(19536, 19536, 19537, 19538, 19539, 19540, 19541, 
19542, 19543, 19544, 19545, 19546, 19547, 19548, 19549, 19550, 19551, 19552, 
19553, 19554, 19555, 19556, 19557, 19558, 19559, 19560, 19561)
    at kafka.log.Log.$anonfun$append$2(Log.scala:1126)
    at kafka.log.Log.append(Log.scala:2349)
    at kafka.log.Log.appendAsFollower(Log.scala:1036)
    at 
[2022-01-27T16:17:29,531][WARN][category=kafka.server.ReplicaFetcherThread] 
[ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Partition 
__consumer_offsets-36 marked as failed
{code}
This looks like the root cause, right? Can a forced shutdown of the Kafka process lead to 
this issue?

It looks like a bug; moreover, shouldn't Kafka handle the case of corrupted data (if that is 
the root cause of the issue above)?



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13638) Slow KTable update when forwarding multiple values from transformer

2022-02-02 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17486001#comment-17486001
 ] 

Matthias J. Sax commented on KAFKA-13638:
-

We did update the RocksDB version in the 3.0.0 release: 
https://issues.apache.org/jira/browse/KAFKA-8897

Could you repeat the test with an in-memory store, or try switching back to the 
previous RocksDB version in the 3.0/3.1 release and retest?
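
In case it helps with the repro, a minimal sketch of materializing the table on an in-memory store instead of RocksDB (topic name, store name, and serdes here are placeholders):
{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.Stores;

public class InMemoryTableExample {
    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        // Materialized.as(Stores.inMemoryKeyValueStore(...)) swaps the default RocksDB store
        // for an in-memory one, which isolates whether RocksDB is the slow part.
        builder.table(
            "input-topic",
            Materialized.<String, String>as(Stores.inMemoryKeyValueStore("table"))
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String()));
        return builder.build();
    }
}
{code}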

Btw: it seems we are bumping RocksDB again for 3.2.0: 
https://issues.apache.org/jira/browse/KAFKA-13599 – so you could also try 
`trunk` or 3.0/3.1 with the new RocksDB version to see if it changes the 
performance.

> Slow KTable update when forwarding multiple values from transformer
> ---
>
> Key: KAFKA-13638
> URL: https://issues.apache.org/jira/browse/KAFKA-13638
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0, 3.0.0
>Reporter: Ulrik
>Priority: Major
> Attachments: KafkaTest.java
>
>
> I have a topology where I stream messages from an input topic, transform each 
> message into multiple messages (via context.forward), and then store those 
> messages in a KTable.
> Since upgrading from kafka-streams 2.8.1 to 3.1.0 I have noticed that my 
> tests take significantly longer to run. 
>  
> I have attached a test class to demonstrate my scenario. When running this 
> test with kafka-streams versions 2.8.1 and 3.1.0 I came up with the following 
> numbers:
>  
> *Version 2.8.1*
>  * one input message and one output message: 541 ms
>  * 8 input messages and 30 output messages per input message (240 output 
> messages in total): 919 ms
>  
> *Version 3.1.0*
>  * one input message and one output message: 908 ms
>  * 8 input messages and 30 output messages per input message (240 output 
> messages in total): 6 sec 94 ms
>  
> Even when the transformer just transforms and forwards one input message to 
> one output message, the test takes approx. 400 ms longer to run.
> When transforming 8 input messages to 240 output messages it takes approx 5 
> seconds longer.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] cmccabe commented on a change in pull request #11689: Fixed documentation and handles null topicPartition for KAFKA-12841

2022-02-02 Thread GitBox


cmccabe commented on a change in pull request #11689:
URL: https://github.com/apache/kafka/pull/11689#discussion_r797873026



##
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##
@@ -1506,6 +1508,46 @@ public void testNullTopicName() {
 "key".getBytes(StandardCharsets.UTF_8), 
"value".getBytes(StandardCharsets.UTF_8)));
 }
 
+    @Test
+    public void testCallbackHandlesError() throws Exception {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000");
+
+        Time time = new MockTime();
+        ProducerMetadata producerMetadata = newMetadata(0, Long.MAX_VALUE);
+        MockClient client = new MockClient(time, producerMetadata);
+
+        String invalidTopicName = "topic abc"; // Invalid topic name due to space
+
+        try (Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(), new StringSerializer(),
+                producerMetadata, client, null, time)) {
+            ProducerRecord<String, String> record = new ProducerRecord<>(invalidTopicName, "HelloKafka");
+
+            // Here's the important piece of the test. Let's make sure that the RecordMetadata we get
+            // is non-null and adheres to the onCompletion contract.
+            Callback callBack = (recordMetadata, exception) -> {
+                assertNotNull(exception);
+                assertNotNull(recordMetadata);
+
+                assertNotNull(recordMetadata.topic(), "Topic name should be valid even on send failure");

Review comment:
   can you please also check that the partition id gets set to -1




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vamossagar12 commented on a change in pull request #11424: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2022-02-02 Thread GitBox


vamossagar12 commented on a change in pull request #11424:
URL: https://github.com/apache/kafka/pull/11424#discussion_r797848030



##
File path: 
streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
##
@@ -329,7 +330,7 @@ private TopologyTestDriver(final InternalTopologyBuilder 
builder,
 
 final ThreadCache cache = new ThreadCache(
 logContext,
-Math.max(0, 
streamsConfig.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)),
+Math.max(0, 
streamsConfig.getLong(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG)),

Review comment:
   Oh, here I thought that since it's a test case it shouldn't really matter. 
Isn't that the case?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vamossagar12 commented on a change in pull request #11424: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2022-02-02 Thread GitBox


vamossagar12 commented on a change in pull request #11424:
URL: https://github.com/apache/kafka/pull/11424#discussion_r797845874



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java
##
@@ -116,14 +123,30 @@ public TopologyConfig(final String topologyName, final 
StreamsConfig globalAppCo
 maxBufferedSize = getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
 log.info("Topology {} is overriding {} to {}", topologyName, 
BUFFERED_RECORDS_PER_PARTITION_CONFIG, maxBufferedSize);
 } else {
-maxBufferedSize = 
globalAppConfigs.getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
+maxBufferedSize = 
globalAppConfigs.originals().containsKey(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG)
+? 
globalAppConfigs.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG) : 
-1;
 }
 
-if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, 
topologyOverrides)) {
-cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
-log.info("Topology {} is overriding {} to {}", topologyName, 
CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize);
+if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, 
topologyOverrides) ||
+isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, 
topologyOverrides)) {
+
+if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, 
topologyOverrides) && isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, 
topologyOverrides)) {
+cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
+log.info("Topology {} is using both {} and deprecated config 
{}. overriding {} to {}",
+topologyName,
+STATESTORE_CACHE_MAX_BYTES_CONFIG,
+CACHE_MAX_BYTES_BUFFERING_CONFIG,
+STATESTORE_CACHE_MAX_BYTES_CONFIG,
+cacheSize);
+} else if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, 
topologyOverrides)) {
+cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
+log.info("Topology {} is using deprecated config {}. 
overriding {} to {}", topologyName, CACHE_MAX_BYTES_BUFFERING_CONFIG, 
CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize);
+} else {
+cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
+log.info("Topology {} is overriding {} to {}", topologyName, 
STATESTORE_CACHE_MAX_BYTES_CONFIG, cacheSize);
+}
 } else {
-cacheSize = 
globalAppConfigs.getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
+cacheSize = 
globalAppConfigs.getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);

Review comment:
   @ableegoldman, I have added that check. The line above is in the else 
block, for the case when neither of the two configs is set. Here's the complete block of code 
=>
   ```
   if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides) 
||
   isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, 
topologyOverrides)) {
   
   if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, 
topologyOverrides) && isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, 
topologyOverrides)) {
   cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
   log.info("Topology {} is using both {} and deprecated config 
{}. overriding {} to {}",
   topologyName,
   STATESTORE_CACHE_MAX_BYTES_CONFIG,
   CACHE_MAX_BYTES_BUFFERING_CONFIG,
   STATESTORE_CACHE_MAX_BYTES_CONFIG,
   cacheSize);
   } else if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, 
topologyOverrides)) {
   cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
   log.info("Topology {} is using deprecated config {}. 
overriding {} to {}", topologyName, CACHE_MAX_BYTES_BUFFERING_CONFIG, 
CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize);
   } else {
   cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
   log.info("Topology {} is overriding {} to {}", topologyName, 
STATESTORE_CACHE_MAX_BYTES_CONFIG, cacheSize);
   }
   } else {
   cacheSize = 
globalAppConfigs.getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
   }
   ```
   
   Am I missing something here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] Indupa edited a comment on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2

2022-02-02 Thread GitBox


Indupa edited a comment on pull request #7898:
URL: https://github.com/apache/kafka/pull/7898#issuecomment-1028142767


   Hi @dongjinleekr, I was able to build the latest patch and also need one input 
from you.
   
   Are all dependencies on log4j 1.x completely removed in this 
patch? I can still see a dependency on log4j 1.2.17 in 
build.gradle and dependencies.gradle. There are also references to 
log4j.properties and tools-log4j.properties instead of log4j2.properties and 
tools-log4j2.properties in some of the files. Are they still required, or can we 
remove those dependencies as well?
   
   The things I tried from my end are as follows:
   
   1. I tried updating build.gradle and dependencies.gradle by removing the 
log4j dependency.
   2. I also tried updating some of the files where the patch adds an echo 
statement, changing log4j.properties to log4j2.properties in the places 
mentioned in the patch file.
   3. After that, I compiled the code, extracted the folder under 
"C:\kafka_2.8.1\core\build\distributions\kafka_2.13-2.8.1\kafka_2.13-2.8.1", 
named it kafka.zip, and used it in our component by installing it and running it 
as a Kafka service.
   4. But when I tried running Kafka, I got the following exception:
   
2022-02-02 05:57:17.158  [INF] [Kafka] Connecting to localhost:2181
   2022-02-02 05:57:27.571  [INF] [Kafka] WATCHER::
   2022-02-02 05:57:27.571  [INF] [Kafka] WatchedEvent state:SyncConnected 
type:None path:null
   2022-02-02 05:57:27.574  [INF] [Kafka] []
   2022-02-02 05:58:17.227  [ERR] [Kafka] ERROR StatusLogger Reconfiguration 
failed: No configuration found for '764c12b6' at 'null' in 'null'
   2022-02-02 05:58:17.684  [INF] [Kafka] DEPRECATED: using log4j 1.x 
configuration. To use log4j 2.x configuration, run with: 'set 
KAFKA_LOG4J_OPTS=-Dlog4j.configurationFile=file:C:\kafka/config/tools-log4j2.properties'
   
   To give some background on my requirement: the Kafka package we are 
using contains some patches which we have added on top of the kafka_2.8.1 
source code. One of the custom changes we have made is that we use 
apache-log4j-extras 1.2.17 with a time-based triggering policy for rolling log 
files, as this is not available in log4j 1.2.17. Since this version has 
vulnerabilities, we wanted to use the log4j2 API for this rolling-policy logic, 
which is working in your patch.
   
Can you please help me with this...?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] Indupa edited a comment on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2

2022-02-02 Thread GitBox


Indupa edited a comment on pull request #7898:
URL: https://github.com/apache/kafka/pull/7898#issuecomment-1028142767


   Hi @dongjinleekr, I was able to build the latest patch and also need one input 
from you.
   
   Are all dependencies on log4j 1.x completely removed in this 
patch? I can still see a dependency on log4j 1.2.17 in 
build.gradle and dependencies.gradle. There are also references to 
log4j.properties and tools-log4j.properties instead of log4j2.properties and 
tools-log4j2.properties in some of the files. Are they still required, or can we 
remove those dependencies as well?
Can you please help me with this...?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] Indupa commented on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2

2022-02-02 Thread GitBox


Indupa commented on pull request #7898:
URL: https://github.com/apache/kafka/pull/7898#issuecomment-1028142767


   Hi @dongjinleekr, I need one input from you.
   
   Are all dependencies on log4j 1.x completely removed in this 
patch? I can still see a dependency on log4j 1.2.17 in 
build.gradle and dependencies.gradle. There are also references to 
log4j.properties and tools-log4j.properties instead of log4j2.properties and 
tools-log4j2.properties in some of the files. Are they still required, or can we 
remove those dependencies as well?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #11712: WIP: Put failed tasks to end of processing list

2022-02-02 Thread GitBox


wcarlson5 commented on a change in pull request #11712:
URL: https://github.com/apache/kafka/pull/11712#discussion_r797772309



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
##
@@ -270,6 +278,23 @@ Task task(final TaskId taskId) {
 return readOnlyActiveTasks;
 }
 
+List orderedActiveTasks() {
+return Collections.unmodifiableList(orderedActiveTasks);
+}
+
+void moveActiveTasksToTailFor(final String topologyName) {
+final List tasksToMove = new LinkedList<>();
+final Iterator iterator = orderedActiveTasks.iterator();
+while (iterator.hasNext()) {
+final Task task = iterator.next();
+if (task.id().topologyName().equals(topologyName)) {
+iterator.remove();
+tasksToMove.add(task);
+}
+}
+orderedActiveTasks.addAll(tasksToMove);

Review comment:
   It would be much simpler, but unfortunately it's not as simple as we first 
thought. The producer has only one transaction, so the records of the good 
tasks are mixed in with the records of the failed task and there is no way to 
separate them. So we need to take the tasks that we know will fail and process 
all the other tasks without them. That way we continue making progress. We can 
take the failing tasks, back off, and retry them again later.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-9366) Upgrade log4j to log4j2

2022-02-02 Thread Fabrice Bauzac-Stehly (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17485878#comment-17485878
 ] 

Fabrice Bauzac-Stehly edited comment on KAFKA-9366 at 2/2/22, 3:17 PM:
---

https://kafka.apache.org/cve-list claims to list all security
vulnerabilities that are fixed in released versions of Apache Kafka.
I understand this as "all other vulnerabilities are potentially
exploitable unless explicitly told otherwise".  Here is the list as of
2022-02-02 Wednesday:

- CVE-2017-12610 Authenticated Kafka clients may impersonate other
  users

- CVE-2018-1288 Authenticated Kafka clients may interfere with data
  replication

- CVE-2018-17196 Authenticated clients with Write permission may
  bypass transaction/idempotent ACL validation

- CVE-2019-12399 Apache Kafka Connect REST API may expose plaintext
  secrets in tasks endpoint

- CVE-2021-4104 Flaw in Apache Log4j logging library in versions 1.x

- CVE-2021-38153 Timing Attack Vulnerability for Apache Kafka Connect
  and Clients

- CVE-2021-44228 Flaw in Apache Log4j logging library in versions from
  2.0.0 and before 2.15.0

- CVE-2021-45046 Flaw in Apache Log4j logging library in versions from
  2.0-beta9 through 2.12.1 and from 2.13.0 through 2.15.0

- CVE-2022-23307 Deserialization of Untrusted Data Flaw in Apache
  Log4j logging library in versions 1.x

https://logging.apache.org/log4j/1.2/ lists several vulnerabilities
that affect log4j 1.x.  As of 2022-02-02 Wednesday:

- CVE-2019-17571 is a high severity issue targeting the
  SocketServer. Log4j includes a SocketServer that accepts serialized
  log events and deserializes them without verifying whether the
  objects are allowed or not. This can provide an attack vector that
  can be exploited.
  => NOT FIXED IN KAFKA?

- CVE-2020-9488 is a moderate severity issue with the
  SMTPAppender. Improper validation of certificate with host mismatch
  in Apache Log4j SMTP appender. This could allow an SMTPS connection
  to be intercepted by a man-in-the-middle attack which could leak any
  log messages sent through that appender.
  => NOT FIXED IN KAFKA?

- CVE-2021-4104 is a high severity deserialization vulnerability in
  JMSAppender. JMSAppender uses JNDI in an unprotected manner allowing
  any application using the JMSAppender to be vulnerable if it is
  configured to reference an untrusted site or if the site referenced
  can be accessed by the attacker. For example, the attacker can
  cause remote code execution by manipulating the data in the LDAP
  store.
  => mitigated: one can remove JMSAppender from the log4j-1.2.17.jar
  artifact (see the sketch at the end of this comment).

- CVE-2022-23302 is a high severity deserialization vulnerability in
  JMSSink. JMSSink uses JNDI in an unprotected manner allowing any
  application using the JMSSink to be vulnerable if it is configured
  to reference an untrusted site or if the site referenced can be
  accessed by the attacker. For example, the attacker can cause
  remote code execution by manipulating the data in the LDAP store.
  => NOT FIXED IN KAFKA?

- CVE-2022-23305 is a high severity SQL injection flaw in
  JDBCAppender that allows the data being logged to modify the
  behavior of the component. By design, the JDBCAppender in Log4j
  1.2.x accepts an SQL statement as a configuration parameter where
  the values to be inserted are converters from PatternLayout. The
  message converter, %m, is likely to always be included. This allows
  attackers to manipulate the SQL by entering crafted strings into
  input fields or headers of an application that are logged allowing
  unintended SQL queries to be executed.
  => NOT FIXED IN KAFKA?

- CVE-2022-23307 is a critical severity against the chainsaw component
  in Log4j 1.x. This is the same issue corrected in CVE-2020-9493
  fixed in Chainsaw 2.1.0 but Chainsaw was included as part of Log4j
  1.2.x.
  => mitigated: one can remove Chainsaw from the log4j-1.2.17.jar
  artifact.

From all that, it looks like there are a number of still-open
vulnerabilities in Kafka induced by the use of log4j?  Can somebody
confirm?
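
A sketch of the jar-surgery mitigations mentioned for the JMSAppender and Chainsaw items above (the paths assume the stock log4j-1.2.17.jar layout; verify against the actual artifact before relying on this):
{code:java}
# CVE-2021-4104 mitigation: strip the JMSAppender class from the jar
zip -q -d log4j-1.2.17.jar org/apache/log4j/net/JMSAppender.class

# CVE-2022-23307 mitigation: strip the bundled Chainsaw classes
zip -q -d log4j-1.2.17.jar 'org/apache/log4j/chainsaw/*'
{code}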



was (Author: noonbs):
https://kafka.apache.org/cve-list claims to list all security
vulnerabilities that are fixed in released versions of Apache Kafka.
I understand this as "all other vulnerabilities are potentially
exploitable unless explicitly told otherwise".  Here is the list as of
2022-02-02 Wednesday:

- CVE-2017-12610 Authenticated Kafka clients may impersonate other
  users

- CVE-2018-1288 Authenticated Kafka clients may interfere with data
  replication

- CVE-2018-17196 Authenticated clients with Write permission may
  bypass transaction/idempotent ACL validation

- CVE-2019-12399 Apache Kafka Connect REST API may expose plaintext
  secrets in tasks endpoint

- CVE-2021-4104 Flaw in Apache Log4j logging library in versions 1.x

- CVE-2021-38153 Timing Attack Vulnerability for Apache Kafka Connect
  and 

[jira] [Commented] (KAFKA-13626) NullPointerException in Selector.pollSelectionKeys: channel is null

2022-02-02 Thread Kvicii.Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17485879#comment-17485879
 ] 

Kvicii.Yu commented on KAFKA-13626:
---

[~haeuserd]  

Thanks for your report, but what exactly do you mean by a network issue? I don't 
think this should be counted as an issue.

> NullPointerException in Selector.pollSelectionKeys: channel is null
> ---
>
> Key: KAFKA-13626
> URL: https://issues.apache.org/jira/browse/KAFKA-13626
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.7.1
>Reporter: Daniel Häuser
>Priority: Minor
>
> This NullPointerException occurred while we were having networking issues.
> Unfortunately I cannot provide much more information than this stack trace 
> because this is all I got from our operations team.
> {code:java}
> java.lang.IllegalStateException: This error handler cannot process 
> 'java.lang.NullPointerException's; no record information is available
> at 
> org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:200)
> at 
> org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112)
> at 
> org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1599)
> at 
> org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1210)
> at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> at java.base/java.lang.Thread.run(Thread.java:831)
> Caused by: java.lang.NullPointerException: Cannot invoke 
> "org.apache.kafka.common.network.KafkaChannel.id()" because "channel" is null
> at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:516)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:563)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:227)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:257)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:480)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1257)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1226)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
> at jdk.internal.reflect.GeneratedMethodAccessor128.invoke(Unknown 
> Source)
> at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:567)
> at 
> org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
> at 
> org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:208)
> at jdk.proxy2/jdk.proxy2.$Proxy137.poll(Unknown Source)
> at 
> org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1410)
> at 
> org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1249)
> at 
> org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161)
> ... 3 common frames omitted {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-9366) Upgrade log4j to log4j2

2022-02-02 Thread Fabrice Bauzac-Stehly (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17485878#comment-17485878
 ] 

Fabrice Bauzac-Stehly commented on KAFKA-9366:
--

https://kafka.apache.org/cve-list claims to list all security
vulnerabilities that are fixed in released versions of Apache Kafka.
I understand this as "all other vulnerabilities are potentially
exploitable unless explicitly told otherwise".  Here is the list as of
2022-02-02 Wednesday:

- CVE-2017-12610 Authenticated Kafka clients may impersonate other
  users

- CVE-2018-1288 Authenticated Kafka clients may interfere with data
  replication

- CVE-2018-17196 Authenticated clients with Write permission may
  bypass transaction/idempotent ACL validation

- CVE-2019-12399 Apache Kafka Connect REST API may expose plaintext
  secrets in tasks endpoint

- CVE-2021-4104 Flaw in Apache Log4j logging library in versions 1.x

- CVE-2021-38153 Timing Attack Vulnerability for Apache Kafka Connect
  and Clients

- CVE-2021-44228 Flaw in Apache Log4j logging library in versions from
  2.0.0 and before 2.15.0

- CVE-2021-45046 Flaw in Apache Log4j logging library in versions from
  2.0-beta9 through 2.12.1 and from 2.13.0 through 2.15.0

- CVE-2022-23307 Deserialization of Untrusted Data Flaw in Apache
  Log4j logging library in versions 1.x

https://logging.apache.org/log4j/1.2/ lists several vulnerabilities
that affect log4j 1.x.  As of 2022-02-02 Wednesday:

- CVE-2019-17571 is a high severity issue targeting the
  SocketServer. Log4j includes a SocketServer that accepts serialized
  log events and deserializes them without verifying whether the
  objects are allowed or not. This can provide an attack vector that
  can be exploited.

  => NOT FIXED IN KAFKA?

- CVE-2020-9488 is a moderate severity issue with the
  SMTPAppender: improper validation of a certificate with host mismatch
  in the Apache Log4j SMTP appender. This could allow an SMTPS connection
  to be intercepted by a man-in-the-middle attack, which could leak any
  log messages sent through that appender.

  => NOT FIXED IN KAFKA?

- CVE-2021-4104 is a high severity deserialization vulnerability in
  JMSAppender. JMSAppender uses JNDI in an unprotected manner allowing
  any application using the JMSAppender to be vulnerable if it is
  configured to reference an untrusted site or if the site referenced
  can be accessed by the attacker. For example, the attacker can
  cause remote code execution by manipulating the data in the LDAP
  store.

  => mitigated: one can remove JMSAppender from the log4j-1.2.17.jar
  artifact (a short sketch of how to do this follows below the list).

- CVE-2022-23302 is a high severity deserialization vulnerability in
  JMSSink. JMSSink uses JNDI in an unprotected manner allowing any
  application using the JMSSink to be vulnerable if it is configured
  to reference an untrusted site or if the site referenced can be
  accessed by the attacker. For example, the attacker can cause
  remote code execution by manipulating the data in the LDAP store.

  => NOT FIXED IN KAFKA?

- CVE-2022-23305 is a high severity SQL injection flaw in
  JDBCAppender that allows the data being logged to modify the
  behavior of the component. By design, the JDBCAppender in Log4j
  1.2.x accepts an SQL statement as a configuration parameter where
  the values to be inserted are converters from PatternLayout. The
  message converter, %m, is likely to always be included. This allows
  attackers to manipulate the SQL by entering crafted strings into
  input fields or headers of an application that are logged, allowing
  unintended SQL queries to be executed.

  => NOT FIXED IN KAFKA?

- CVE-2022-23307 is a critical severity issue against the Chainsaw component
  in Log4j 1.x. This is the same issue as CVE-2020-9493, which was
  fixed in Chainsaw 2.1.0, but Chainsaw was also included as part of Log4j
  1.2.x.

  => mitigated: one can remove Chainsaw from the log4j-1.2.17.jar
  artifact.

From all that, it looks like there are a number of still-open
vulnerabilities in Kafka induced by the use of Log4j 1.x.  Can somebody
confirm?
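
For reference, a rough illustration of the JMSAppender/Chainsaw mitigations
mentioned above. The usual approach is simply
`zip -q -d log4j-1.2.17.jar org/apache/log4j/net/JMSAppender.class` (and the
org/apache/log4j/chainsaw/ entries); the Java sketch below does the same thing
by copying the jar without those entries. Class paths are the standard Log4j
1.2.17 ones; the output file name is arbitrary.

{code:java}
// Hypothetical sketch: copy log4j-1.2.17.jar, dropping the classes whose
// removal is suggested above as a mitigation (JMSAppender and Chainsaw).
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Enumeration;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import java.util.zip.ZipOutputStream;

public class StripLog4jClasses {
    public static void main(String[] args) throws Exception {
        Path in = Path.of("log4j-1.2.17.jar");
        Path out = Path.of("log4j-1.2.17-stripped.jar");
        try (ZipFile zip = new ZipFile(in.toFile());
             ZipOutputStream zos = new ZipOutputStream(Files.newOutputStream(out))) {
            Enumeration<? extends ZipEntry> entries = zip.entries();
            while (entries.hasMoreElements()) {
                ZipEntry entry = entries.nextElement();
                String name = entry.getName();
                // Drop the vulnerable classes; keep everything else unchanged.
                if (name.startsWith("org/apache/log4j/net/JMSAppender")
                        || name.startsWith("org/apache/log4j/chainsaw/")) {
                    continue;
                }
                zos.putNextEntry(new ZipEntry(name));
                try (InputStream is = zip.getInputStream(entry)) {
                    is.transferTo(zos);
                }
                zos.closeEntry();
            }
        }
    }
}
{code}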


> Upgrade log4j to log4j2
> ---
>
> Key: KAFKA-9366
> URL: https://issues.apache.org/jira/browse/KAFKA-9366
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.2.0, 2.1.1, 2.3.0, 2.4.0
>Reporter: leibo
>Assignee: Dongjin Lee
>Priority: Critical
>  Labels: needs-kip
> Fix For: 3.2.0
>
>
> h2. CVE-2019-17571 Detail
> Included in Log4j 1.2 is a SocketServer class that is vulnerable to 
> deserialization of untrusted data which can be exploited to remotely execute 
> arbitrary code when combined with a deserialization gadget when listening to 
> untrusted network traffic for log data. This affects Log4j versions up to and 
> including 1.2.17.
>  
> 

[jira] [Commented] (KAFKA-13534) Upgrade Log4j to 2.15.0 - CVE-2021-44228

2022-02-02 Thread Fabrice Bauzac-Stehly (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17485868#comment-17485868
 ] 

Fabrice Bauzac-Stehly commented on KAFKA-13534:
---

Shouldn't this ticket be closed as a duplicate of KAFKA-9366?

> Upgrade Log4j to 2.15.0 - CVE-2021-44228
> 
>
> Key: KAFKA-13534
> URL: https://issues.apache.org/jira/browse/KAFKA-13534
> Project: Kafka
>  Issue Type: Task
>Affects Versions: 2.7.0, 2.8.0, 3.0.0
>Reporter: Sai Kiran Vudutala
>Priority: Major
>
> Log4j has an RCE vulnerability, see 
> [https://www.lunasec.io/docs/blog/log4j-zero-day/]
> References. 
> [https://github.com/advisories/GHSA-jfh8-c2jp-5v3q]
> [https://github.com/apache/logging-log4j2/pull/608#issuecomment-990494126]
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13638) Slow KTable update when forwarding multiple values from transformer

2022-02-02 Thread Ulrik (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ulrik updated KAFKA-13638:
--
Description: 
I have a topology where I stream messages from an input topic, transform the 
message to multiple messages (via context.forward), and then store those 
messages in a KTable.

Since upgrading from kafka-streams 2.8.1 to 3.1.0 I have noticed that my tests 
take significantly longer time to run. 

 

I have attached a test class to demonstrate my scenario. When running this test 
with kafka-streams versions 2.8.1 and 3.1.0 I came up with the following 
numbers:

 

*Version 2.8.1*
 * one input message and one output message: 541 ms
 * 8 input messages and 30 output messages per input message (240 output messages 
in total): 919 ms

 

*Version 3.1.0*
 * one input message and one output message: 908 ms
 * 8 input messages and 30 output messages per input message (240 output messages 
in total): 6 sec 94 ms

 

Even when the transformer just transforms and forwards one input message to one 
output message, the test takes approx. 400 ms longer to run.

When transforming 8 input messages to 240 output messages it takes approx 5 
seconds longer.

  was:
I have a topology where I stream messages from an input topic, transform the 
message to multiple messages (via context.forward), and then store those 
messages in a KTable.

Since upgrading from kafka-streams 2.8.1 to 3.1.0 I have noticed that my tests 
takes significantly longer time to run. 

 

I have attached a test class to demonstrate my scenario. When running this test 
with kafka-streams versions 2.8.1 and 3.1.0 I came up with the following 
numbers:

 

*Version 2.8.1*
 * one input message and one output message: 541 ms
 * 8 input message and 30 output message per input message (240 output messages 
in total): 919 ms

 

*Version 3.1.0*
 * one input message and one output message: 908 ms
 * 8 input message and 30 output message per input message (240 output messages 
in total): 6 sec 94 ms

 

Even when the transformer just transforms and forwards one input message to one 
output message, the test takes approx. 400 ms longer to run.

When transforming 8 input messages to 240 output messages it takes approx 5 
seconds longer.


> Slow KTable update when forwarding multiple values from transformer
> ---
>
> Key: KAFKA-13638
> URL: https://issues.apache.org/jira/browse/KAFKA-13638
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0, 3.0.0
>Reporter: Ulrik
>Priority: Major
> Attachments: KafkaTest.java
>
>
> I have a topology where I stream messages from an input topic, transform the 
> message to multiple messages (via context.forward), and then store those 
> messages in a KTable.
> Since upgrading from kafka-streams 2.8.1 to 3.1.0 I have noticed that my 
> tests take significantly longer time to run. 
>  
> I have attached a test class to demonstrate my scenario. When running this 
> test with kafka-streams versions 2.8.1 and 3.1.0 I came up with the following 
> numbers:
>  
> *Version 2.8.1*
>  * one input message and one output message: 541 ms
>  * 8 input messages and 30 output messages per input message (240 output 
> messages in total): 919 ms
>  
> *Version 3.1.0*
>  * one input message and one output message: 908 ms
>  * 8 input messages and 30 output messages per input message (240 output 
> messages in total): 6 sec 94 ms
>  
> Even when the transformer just transforms and forwards one input message to 
> one output message, the test takes approx. 400 ms longer to run.
> When transforming 8 input messages to 240 output messages it takes approx 5 
> seconds longer.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] ijuma commented on pull request #11721: KAFKA-13629: use faster algorithm for ByteUtils sizeOfXxx algorithm

2022-02-02 Thread GitBox


ijuma commented on pull request #11721:
URL: https://github.com/apache/kafka/pull/11721#issuecomment-1027997296


   Thanks for the detailed analysis! Note that JMH 1.34 
(https://mail.openjdk.java.net/pipermail/jmh-dev/2021-December/003406.html) has 
support for much cheaper blackholes if executed with Java 17 or later. On that 
note, which Java version are you using in your experiments?
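
   As a point of reference, a benchmark shaped roughly like the sketch below
(class and field names are made up, not from this PR) returns its result so
JMH can sink it into a Blackhole; with JMH 1.34+ on Java 17+ that sink can use
the much cheaper compiler-assisted blackhole mentioned above.

   ```java
   // Illustrative only; not the benchmark used in PR #11721.
   import java.util.concurrent.TimeUnit;

   import org.apache.kafka.common.utils.ByteUtils;
   import org.openjdk.jmh.annotations.Benchmark;
   import org.openjdk.jmh.annotations.BenchmarkMode;
   import org.openjdk.jmh.annotations.Mode;
   import org.openjdk.jmh.annotations.OutputTimeUnit;
   import org.openjdk.jmh.annotations.Scope;
   import org.openjdk.jmh.annotations.State;

   @State(Scope.Benchmark)
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.NANOSECONDS)
   public class SizeOfVarintBenchmark {
       // Not final, so the JIT cannot constant-fold the input away.
       private int value = 123_456_789;

       @Benchmark
       public int sizeOfUnsignedVarint() {
           // Returning the value lets JMH consume it in a Blackhole, preventing
           // dead-code elimination without an explicit Blackhole parameter.
           return ByteUtils.sizeOfUnsignedVarint(value);
       }
   }
   ```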


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jonathan-albrecht-ibm commented on pull request #11690: KAFKA-13599: Upgrade RocksDB to 6.27.3

2022-02-02 Thread GitBox


jonathan-albrecht-ibm commented on pull request #11690:
URL: https://github.com/apache/kafka/pull/11690#issuecomment-1027990865


   Thanks @cadonna!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma merged pull request #11618: KAFKA-13558: NioEchoServer fails to close resources

2022-02-02 Thread GitBox


ijuma merged pull request #11618:
URL: https://github.com/apache/kafka/pull/11618


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma merged pull request #11619: MINOR: allocate 2MB to offset map in connect EmbeddedKafkaCluster

2022-02-02 Thread GitBox


ijuma merged pull request #11619:
URL: https://github.com/apache/kafka/pull/11619


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat commented on pull request #11518: MINOR: Upgrade to Gradle 7.3.3

2022-02-02 Thread GitBox


jlprat commented on pull request #11518:
URL: https://github.com/apache/kafka/pull/11518#issuecomment-1027974346


   Thanks @ijuma 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma merged pull request #11669: MINOR: Replace if/else with match in KafkaZkClient#getPartitionAssignmentForTopics

2022-02-02 Thread GitBox


ijuma merged pull request #11669:
URL: https://github.com/apache/kafka/pull/11669


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma merged pull request #11717: KAFKA-13619: zookeeper.sync.time.ms is no longer used

2022-02-02 Thread GitBox


ijuma merged pull request #11717:
URL: https://github.com/apache/kafka/pull/11717


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma merged pull request #11518: MINOR: Upgrade to Gradle 7.3.3

2022-02-02 Thread GitBox


ijuma merged pull request #11518:
URL: https://github.com/apache/kafka/pull/11518


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #11712: WIP: Put failed tasks to end of processing list

2022-02-02 Thread GitBox


cadonna commented on a change in pull request #11712:
URL: https://github.com/apache/kafka/pull/11712#discussion_r797611602



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
##
@@ -270,6 +278,23 @@ Task task(final TaskId taskId) {
 return readOnlyActiveTasks;
 }
 
+    List<Task> orderedActiveTasks() {
+        return Collections.unmodifiableList(orderedActiveTasks);
+    }
+
+    void moveActiveTasksToTailFor(final String topologyName) {
+        final List<Task> tasksToMove = new LinkedList<>();
+        final Iterator<Task> iterator = orderedActiveTasks.iterator();
+        while (iterator.hasNext()) {
+            final Task task = iterator.next();
+            if (task.id().topologyName().equals(topologyName)) {
+                iterator.remove();
+                tasksToMove.add(task);
+            }
+        }
+        orderedActiveTasks.addAll(tasksToMove);

Review comment:
   Are you proposing to commit tasks each time an exception occurs, 
irrespective of whether it is time to commit or not? Wouldn't it be simpler 
to commit when an exception happens instead of when processing restarts 
after an exception?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13638) Slow KTable update when forwarding multiple values from transformer

2022-02-02 Thread Ulrik (Jira)
Ulrik created KAFKA-13638:
-

 Summary: Slow KTable update when forwarding multiple values from 
transformer
 Key: KAFKA-13638
 URL: https://issues.apache.org/jira/browse/KAFKA-13638
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.0.0, 3.1.0
Reporter: Ulrik
 Attachments: KafkaTest.java

I have a topology where I stream messages from an input topic, transform the 
message to multiple messages (via context.forward), and then store those 
messages in a KTable.

 

Since upgrading from kafka-streams 2.8.1 to 3.1.0 I have noticed that my tests 
take significantly longer time to run. 

 

I have attached a test class to demonstrate my scenario. When running this test 
with kafka-streams versions 2.8.1 and 3.1.0 I came up with the following 
numbers:

 

*Version 2.8.1*
 * one input message and one output message: 541 ms
 * 8 input messages and 30 output messages per input message (240 output messages 
in total): 919 ms

 

*Version 3.1.0*
 * one input message and one output message: 908 ms
 * 8 input messages and 30 output messages per input message (240 output messages 
in total): 6 sec 94 ms

 

Even when the transformer just transforms and forwards one input message to one 
output message, the test takes approx. 400 ms longer to run.

When transforming 8 input messages to 240 output messages it takes approx 5 
seconds longer.
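
For readers who want the shape of this topology without the attached test
class, here is a minimal sketch (topic names, serdes, and the fan-out count
are illustrative; this is not the attached KafkaTest.java):

{code:java}
// Illustrative sketch of the scenario described above.
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class FanOutTopology {

    public static StreamsBuilder build() {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
            .transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
                private ProcessorContext context;

                @Override
                public void init(final ProcessorContext context) {
                    this.context = context;
                }

                @Override
                public KeyValue<String, String> transform(final String key, final String value) {
                    // Fan out: forward several records per input record via context.forward.
                    for (int i = 0; i < 30; i++) {
                        context.forward(key + "-" + i, value);
                    }
                    return null; // all output records were already forwarded
                }

                @Override
                public void close() { }
            })
            // Materialize the forwarded records into a KTable-backed state store.
            .toTable(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("output-table")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String()));
        return builder;
    }
}
{code}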



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] dajac commented on a change in pull request #11726: KAFKA-13637: Use default.api.timeout.ms as default timeout value for KafkaConsumer.endOffsets

2022-02-02 Thread GitBox


dajac commented on a change in pull request #11726:
URL: https://github.com/apache/kafka/pull/11726#discussion_r797572591



##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##
@@ -2948,6 +2948,64 @@ public void testAssignorNameConflict() {
 () -> new KafkaConsumer<>(configs, new StringDeserializer(), new 
StringDeserializer()));
 }
 
+@Test
+public void testOffsetsForTimesTimeout() {
+final KafkaConsumer consumer = 
consumerForCheckingTimeoutException();
+assertEquals(
+"Failed to get offsets by times in 6ms",
+
assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> 
consumer.offsetsForTimes(singletonMap(tp0, 0L))).getMessage()

Review comment:
   Could we import `TimeoutException` instead of specifying the full 
qualified name every time?

##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##
@@ -2948,6 +2948,64 @@ public void testAssignorNameConflict() {
 () -> new KafkaConsumer<>(configs, new StringDeserializer(), new 
StringDeserializer()));
 }
 
+@Test
+public void testOffsetsForTimesTimeout() {
+final KafkaConsumer consumer = 
consumerForCheckingTimeoutException();
+assertEquals(
+"Failed to get offsets by times in 6ms",
+
assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> 
consumer.offsetsForTimes(singletonMap(tp0, 0L))).getMessage()
+);
+}
+
+@Test
+public void testBeginningOffsetsTimeout() {
+final KafkaConsumer consumer = 
consumerForCheckingTimeoutException();
+assertEquals(
+"Failed to get offsets by times in 6ms",
+
assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> 
consumer.beginningOffsets(singletonList(tp0))).getMessage()
+);
+}
+
+@Test
+public void testEndOffsetsTimeout() {
+final KafkaConsumer consumer = 
consumerForCheckingTimeoutException();
+assertEquals(
+"Failed to get offsets by times in 6ms",
+
assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> 
consumer.endOffsets(singletonList(tp0))).getMessage()
+);
+}
+
+private KafkaConsumer 
consumerForCheckingTimeoutException() {
+final Time time = new MockTime();
+SubscriptionState subscription = new SubscriptionState(new 
LogContext(), OffsetResetStrategy.EARLIEST);
+ConsumerMetadata metadata = createMetadata(subscription);
+MockClient client = new MockClient(time, metadata);
+
+initMetadata(client, singletonMap(topic, 1));
+Node node = metadata.fetch().nodes().get(0);
+
+ConsumerPartitionAssignor assignor = new RangeAssignor();
+
+final KafkaConsumer consumer = newConsumer(time, 
client, subscription, metadata, assignor, false, groupInstanceId);
+
+final ScheduledExecutorService exec = 
Executors.newSingleThreadScheduledExecutor();
+for (int i = 0; i < 10; i++) {
+// Prepare a retriable error periodically for the client to retry 
connection
+exec.schedule(
+() -> client.prepareResponseFrom(
+listOffsetsResponse(
+Collections.emptyMap(),
+Collections.singletonMap(tp0, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+),
+node), 50L, TimeUnit.MILLISECONDS);
+// Sleep periodically to make loop retry timeout
+exec.schedule(() -> time.sleep(defaultApiTimeoutMs / 10), 50L, 
TimeUnit.MILLISECONDS);
+
+}

Review comment:
   I think that we could simplify this code and avoid using an executor by 
doing as follow:
   ```
   for (int i = 0; i < 10; i++) {
   client.prepareResponse(
   request -> {
   time.sleep(defaultApiTimeoutMs / 10);
   return request instanceof ListOffsetsRequest;
   },
   listOffsetsResponse(
   Collections.emptyMap(),
   Collections.singletonMap(tp0, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
   ));
   }
   ```
   What do you think?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13638) Slow KTable update when forwarding multiple values from transformer

2022-02-02 Thread Ulrik (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ulrik updated KAFKA-13638:
--
Description: 
I have a topology where I stream messages from an input topic, transform the 
message to multiple messages (via context.forward), and then store those 
messages in a KTable.

Since upgrading from kafka-streams 2.8.1 to 3.1.0 I have noticed that my tests 
take significantly longer time to run. 

 

I have attached a test class to demonstrate my scenario. When running this test 
with kafka-streams versions 2.8.1 and 3.1.0 I came up with the following 
numbers:

 

*Version 2.8.1*
 * one input message and one output message: 541 ms
 * 8 input messages and 30 output messages per input message (240 output messages 
in total): 919 ms

 

*Version 3.1.0*
 * one input message and one output message: 908 ms
 * 8 input messages and 30 output messages per input message (240 output messages 
in total): 6 sec 94 ms

 

Even when the transformer just transforms and forwards one input message to one 
output message, the test takes approx. 400 ms longer to run.

When transforming 8 input messages to 240 output messages it takes approx 5 
seconds longer.

  was:
I have a topology where I stream messages from an input topic, transform the 
message to multiple messages (via context.forward), and then store those 
messages in a KTable.

 

Since upgrading from kafka-streams 2.8.1 to 3.1.0 I have noticed that my tests 
takes significantly longer time to run. 

 

I have attached a test class to demonstrate my scenario. When running this test 
with kafka-streams versions 2.8.1 and 3.1.0 I came up with the following 
numbers:

 

*Version 2.8.1*
 * one input message and one output message: 541 ms
 * 8 input message and 30 output message per input message (240 output messages 
in total): 919 ms

 

*Version 3.1.0*
 * one input message and one output message: 908 ms
 * 8 input message and 30 output message per input message (240 output messages 
in total): 6 sec 94 ms

 

Even when the transformer just transforms and forwards one input message to one 
output message, the test takes approx. 400 ms longer to run.

When transforming 8 input messages to 240 output messages it takes approx 5 
seconds longer.


> Slow KTable update when forwarding multiple values from transformer
> ---
>
> Key: KAFKA-13638
> URL: https://issues.apache.org/jira/browse/KAFKA-13638
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0, 3.0.0
>Reporter: Ulrik
>Priority: Major
> Attachments: KafkaTest.java
>
>
> I have a topology where I stream messages from an input topic, transform the 
> message to multiple messages (via context.forward), and then store those 
> messages in a KTable.
> Since upgrading from kafka-streams 2.8.1 to 3.1.0 I have noticed that my 
> tests take significantly longer time to run. 
>  
> I have attached a test class to demonstrate my scenario. When running this 
> test with kafka-streams versions 2.8.1 and 3.1.0 I came up with the following 
> numbers:
>  
> *Version 2.8.1*
>  * one input message and one output message: 541 ms
>  * 8 input messages and 30 output messages per input message (240 output 
> messages in total): 919 ms
>  
> *Version 3.1.0*
>  * one input message and one output message: 908 ms
>  * 8 input messages and 30 output messages per input message (240 output 
> messages in total): 6 sec 94 ms
>  
> Even when the transformer just transforms and forwards one input message to 
> one output message, the test takes approx. 400 ms longer to run.
> When transforming 8 input messages to 240 output messages it takes approx 5 
> seconds longer.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] dengziming commented on pull request #11726: KAFKA-13637: Use default.api.timeout.ms as default timeout value for KafkaConsumer.endOffsets

2022-02-02 Thread GitBox


dengziming commented on pull request #11726:
URL: https://github.com/apache/kafka/pull/11726#issuecomment-1027842174


   @dajac , Yes, I added a unit test for this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

2022-02-02 Thread GitBox


cadonna commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-1027802506


   @vamossagar12 Can we close this PR and the corresponding ticket since it 
seems we decided to look for a more complete solution? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-13599) Upgrade RocksDB to 6.27.3

2022-02-02 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-13599.
---
Resolution: Resolved

> Upgrade RocksDB to 6.27.3
> -
>
> Key: KAFKA-13599
> URL: https://issues.apache.org/jira/browse/KAFKA-13599
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Jonathan Albrecht
>Assignee: Jonathan Albrecht
>Priority: Major
> Attachments: compat_report.html
>
>
> RocksDB v6.27.3 has been released and it is the first release to support 
> s390x. RocksDB is currently the only dependency in gradle/dependencies.gradle 
> without s390x support.
> RocksDB v6.27.3 has added some new options that require an update to 
> streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
>  but no other changes are needed to upgrade.
> A compatibility report is attached for the current version 6.22.1.1 -> 6.27.3



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] HyunSangHan commented on pull request #11418: MINOR: Write log differently according to the size of missingListenerPartitions

2022-02-02 Thread GitBox


HyunSangHan commented on pull request #11418:
URL: https://github.com/apache/kafka/pull/11418#issuecomment-1027773412


   Can someone look at this PR? :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna merged pull request #11690: KAFKA-13599: Upgrade RocksDB to 6.27.3

2022-02-02 Thread GitBox


cadonna merged pull request #11690:
URL: https://github.com/apache/kafka/pull/11690


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #11690: KAFKA-13599: Upgrade RocksDB to 6.27.3

2022-02-02 Thread GitBox


cadonna commented on pull request #11690:
URL: https://github.com/apache/kafka/pull/11690#issuecomment-1027763171


   Failing tests are unrelated:
   ```
   Build / JDK 11 and Scala 2.13 / kafka.admin.LeaderElectionCommandTest.[1] 
Type=Raft, Name=testTopicPartition, Security=PLAINTEXT
   Build / JDK 11 and Scala 2.13 / kafka.api.TransactionsExpirationTest.[2] 
quorum=kraft
   Build  / JDK 17 and Scala 2.13 / kafka.admin.LeaderElectionCommandTest.[1]  
Type=Raft, Name=testAllTopicPartition, Security=PLAINTEXT
   Build / JDK 17 and Scala 2.13 / kafka.admin.LeaderElectionCommandTest.[1] 
Type=Raft, Name=testTopicPartition, Security=PLAINTEXT
   Build / JDK 17 and Scala 2.13 / 
kafka.api.TransactionsTest.testAbortTransactionTimeout()
   Build / JDK 17 and Scala 2.13 / 
kafka.server.KRaftClusterTest.testCreateClusterAndCreateListDeleteTopic()
   Build / JDK 17 and Scala 2.13 / 
kafka.network.SocketServerTest.remoteCloseWithIncompleteBufferedReceive()
   Build / JDK 8 and Scala 2.12 / 
kafka.network.SocketServerTest.remoteCloseWithIncompleteBufferedReceive()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] Indupa commented on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2

2022-02-02 Thread GitBox


Indupa commented on pull request #7898:
URL: https://github.com/apache/kafka/pull/7898#issuecomment-1027741356


   Thank you so much @dongjinleekr. Let me try to apply the patch and build. Will 
update you.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dongjinleekr commented on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2

2022-02-02 Thread GitBox


dongjinleekr commented on pull request #7898:
URL: https://github.com/apache/kafka/pull/7898#issuecomment-1027702595


   @Indupa I'm sorry. There was a mistake rebasing onto 2.8.1. You can see the 
updated patch with the built tarball 
[here](https://github.com/dongjinleekr/kafka/releases/tag/2.8.1%2Blog4j2).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #11726: KAFKA-13637: Use default.api.timeout.ms as default timeout value for KafkaConsumer.endOffsets

2022-02-02 Thread GitBox


dajac commented on pull request #11726:
URL: https://github.com/apache/kafka/pull/11726#issuecomment-1027677422


   Good catch! It seems that we forgot it in 
https://github.com/apache/kafka/commit/53ca52f855e903907378188d29224b3f9cefa6cb.
 Have you tried to add a unit test for the bug?
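
   For anyone skimming the thread, the two `endOffsets` overloads in question
look roughly like the sketch below (names are illustrative; the comment about
`default.api.timeout.ms` reflects the intent stated in the KAFKA-13637 title):

   ```java
   // Sketch only: illustrates the two endOffsets overloads discussed here.
   import java.time.Duration;
   import java.util.Collection;
   import java.util.Map;

   import org.apache.kafka.clients.consumer.Consumer;
   import org.apache.kafka.common.TopicPartition;

   public class EndOffsetsTimeoutExample {

       // No explicit timeout: per the KAFKA-13637 title, this overload should be
       // bounded by default.api.timeout.ms.
       static Map<TopicPartition, Long> withDefaultTimeout(Consumer<String, String> consumer,
                                                           Collection<TopicPartition> partitions) {
           return consumer.endOffsets(partitions);
       }

       // Explicit timeout: the caller supplies the bound directly.
       static Map<TopicPartition, Long> withExplicitTimeout(Consumer<String, String> consumer,
                                                            Collection<TopicPartition> partitions) {
           return consumer.endOffsets(partitions, Duration.ofSeconds(5));
       }
   }
   ```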


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org