[GitHub] [kafka] sknot-rh commented on pull request #10436: KAFKA-12577; Remove deprecated `ConfigEntry` constructor for 3.0

2021-09-02 Thread GitBox


sknot-rh commented on pull request #10436:
URL: https://github.com/apache/kafka/pull/10436#issuecomment-91220


   I am using this deprecated ctor in my tests. How can I set, for example, the
`source` option now? The other ctor has package-private access.
   ```
   ConfigEntry(String name, String value, ConfigSource source, boolean isSensitive, boolean isReadOnly,
               List<ConfigSynonym> synonyms, ConfigType type, String documentation)
   ```
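   For context, a minimal sketch of one possible workaround, assuming the package-private constructor quoted above: a test helper placed in the same `org.apache.kafka.clients.admin` package (the class and method names here are hypothetical, not part of Kafka).
   ```java
   // Hypothetical helper living in the same package as ConfigEntry so it can
   // reach the package-private constructor (signature assumed from the snippet above).
   package org.apache.kafka.clients.admin;

   import java.util.Collections;

   public final class ConfigEntryTestUtils {
       private ConfigEntryTestUtils() { }

       // Builds a ConfigEntry with an explicit source, roughly what the removed
       // deprecated constructor allowed tests to do.
       public static ConfigEntry entryWithSource(final String name,
                                                 final String value,
                                                 final ConfigEntry.ConfigSource source) {
           return new ConfigEntry(name, value, source,
                   false,                          // isSensitive
                   false,                          // isReadOnly
                   Collections.emptyList(),        // synonyms
                   ConfigEntry.ConfigType.STRING,  // type
                   null);                          // documentation
       }
   }
   ```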


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13257) KafkaStreams Support For Latest RocksDB Version

2021-09-02 Thread Satyam Bala (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17409248#comment-17409248
 ] 

Satyam Bala commented on KAFKA-13257:
-

Currently *{color:#de350b}blocked{color}* from using Kafka Streams (neither 2.8 nor 
3.0) on Alpine-based images until the 3.0 release (when?).

> KafkaStreams Support For Latest RocksDB Version
> ---
>
> Key: KAFKA-13257
> URL: https://issues.apache.org/jira/browse/KAFKA-13257
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Alagukannan
>Priority: Major
> Attachments: hs_err_pid6.log
>
>
> Hi,
>  Can you please let us know if there is any plan for adding the latest 
> versions of RocksDB in Kafka Streams? If you're planning it, what's the timeline 
> we are looking at? If not planning to upgrade, what's the reason behind it? Is 
> there any significant impact on upgrading, like backward compatibility etc.? 
> Just to remind this general query to know about the rocksdb upgrade and its 
> impact on streams application.
> The main pain point behind asking this upgrade is, We tried to build an 
> application with kafka streams 2.8.0 on an alpine based OS and the docker 
> base image is as follows  
> azul/zulu-openjdk-alpine:11.0.12-11.50.19-jre-headless.  The streams 
> application worked fine until it had an interaction with state 
> store(rocksdb). The jvm crashed with the following error:
>  #
>  # A fatal error has been detected by the Java Runtime Environment:
>  #
>  # SIGSEGV (0xb) at pc=0x7f9551951b27, pid=6, tid=207
>  #
>  # JRE version: OpenJDK Runtime Environment Zulu11.45+27-CA (11.0.10+9) 
> (build 11.0.10+9-LTS)
>  # Java VM: OpenJDK 64-Bit Server VM Zulu11.45+27-CA (11.0.10+9-LTS, mixed 
> mode, tiered, compressed oops, g1 gc, linux-amd64)
>  # Problematic frame:
>  # C [librocksdbjni15322693993163550519.so+0x271b27] 
> std::_Rb_tree, 
> std::less, std::allocator 
> >::_M_erase(std::_Rb_tree_node*)+0x27
> Then we found out rocksdb works well on glibc and not musl libc, whereas 
> alpine supports musl libc alone for native dependencies. Further looking into 
> rocksdb for a solution, we found that they have started supporting both glibc 
> and musl native libs from 6.5.x versions.
>  But latest kafka streams(2.8.0) is having rocksdb(5.18.x) version. This is 
> the main reason behind asking for the rocksDB upgrade in kafka streams as 
> well.
> Have attached the PID log where JVM failures are happening.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ccding commented on pull request #11293: MINOR: defineInternal for KIP-405 configs

2021-09-02 Thread GitBox


ccding commented on pull request #11293:
URL: https://github.com/apache/kafka/pull/11293#issuecomment-912200659


   This PR fails `testCreateTopicsResponseMetadataAndConfig()`. DescribeTopic 
doesn't return the internal configs, while the response of CreateTopic includes 
the internal configs.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11292: KAFKA-13264: fix inMemoryWindowStore backward fetch not in reversed order

2021-09-02 Thread GitBox


showuon commented on a change in pull request #11292:
URL: https://github.com/apache/kafka/pull/11292#discussion_r701534858



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/WindowStoreFetchIntegrationTest.java
##
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+import static java.time.Duration.ofMillis;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+
+@RunWith(Parameterized.class)
+public class WindowStoreFetchIntegrationTest {
+private enum StoreType { InMemory, RocksDB, Timed };
+private static final String STORE_NAME = "store";
+private static final int DATA_SIZE = 5;
+private static final long WINDOW_SIZE = 500L;
+private static final long RETENTION_MS = 1L;
+
+private StoreType storeType;
+private boolean enableLogging;
+private boolean enableCaching;
+private boolean forward;
+
+private LinkedList<KeyValue<Windowed<String>, Long>> expectedRecords;
+private LinkedList<KeyValue<String, String>> records;
+private Properties streamsConfig;
+
+private TimeWindowedKStream<String, String> windowedStream;
+
+public WindowStoreFetchIntegrationTest(final StoreType storeType, final 
boolean enableLogging, final boolean enableCaching, final boolean forward) {
+this.storeType = storeType;
+this.enableLogging = enableLogging;
+this.enableCaching = enableCaching;
+this.forward = forward;
+
+this.records = new LinkedList<>();
+this.expectedRecords = new LinkedList<>();
+final int m = DATA_SIZE / 2;
+for (int i = 0; i < DATA_SIZE; i++) {
+final String key = "key-" + i * 2;
+final String value = "val-" + i * 2;
+final KeyValue<String, String> r = new KeyValue<>(key, value);
+records.add(r);
+records.add(r);
+// expected the count of each key is 2
+final long windowStartTime = i < m ? 0 : WINDOW_SIZE;
+expectedRecords.add(new KeyValue<>(new Windowed<>(key, new 
TimeWindow(windowStartTime, windowStartTime + WINDOW_SIZE)), 2L));
+}
+}
+
+@Rule
+public TestName testName = new TestName();
+
+@Parameterized.Parameters(name = "storeType={0}, enableLogging={1}, 
enableCaching={2}, forward={3}")
+public static Collection<Object[]> data() {
+final List<StoreType> types = Arrays.asList(StoreType.InMemory, 
StoreType.RocksDB, StoreType.Timed);
+final List<Boolean> logging = Arrays.asList(true, false);
+final 

[GitHub] [kafka] showuon commented on pull request #11206: MINOR: Update streams doc to close KeyValueIterator in example code

2021-09-02 Thread GitBox


showuon commented on pull request #11206:
URL: https://github.com/apache/kafka/pull/11206#issuecomment-912194561


   @bbejeck , please take a look when available. Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #11086: KAFKA-13103: add REBALANCE_IN_PROGRESS error as retriable error

2021-09-02 Thread GitBox


showuon commented on pull request #11086:
URL: https://github.com/apache/kafka/pull/11086#issuecomment-912194029


   @dajac , I think this PR is good to get merged. WDYT?
   
   Failed tests are unrelated. Thanks.
   ```
   Build / JDK 11 and Scala 2.13 / 
kafka.network.DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota()
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13270) Kafka may fail to connect to ZooKeeper, retry forever, and never start

2021-09-02 Thread Ron Dagostino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Dagostino updated KAFKA-13270:
--
Description: The implementation of 
https://issues.apache.org/jira/browse/ZOOKEEPER-3593 in ZooKeeper version 3.6.0 
decreased the default value for the ZooKeeper client's `jute.maxbuffer` 
configuration from 4MB to 1MB.  This can cause a problem if Kafka tries to 
retrieve a large amount of data across many znodes -- in such a case the 
ZooKeeper client will repeatedly emit a message of the form 
"java.io.IOException: Packet len <> is out of range" and the Kafka broker 
will never connect to ZooKeeper and fail to make progress on the startup 
sequence.  We can avoid the potential for this issue to occur by explicitly 
setting the value to 4MB whenever we create a new ZooKeeper client as long as 
no explicit value has been set via the `jute.maxbuffer` system property.  (was: 
The implementation of https://issues.apache.org/jira/browse/ZOOKEEPER-3593 in 
ZooKeeper version 3.6.0 decreased the default value for the ZooKeeper client's 
`jute.maxbuffer` configuration from 4MB to 1MB.  This can cause a problem if 
Kafka tries to retrieve a large amount of data across many znodes -- in such a 
case the ZooKeeper client will repeatedly emit a message of the form 
"java.io.IOException: Packet len <> is out of range" and the Kafka broker 
will never connect to ZooKeeper and fail make progress on the startup sequence. 
 We can avoid the potential for this issue to occur by explicitly setting the 
value to 4MB whenever we create a new ZooKeeper client as long as no explicit 
value has been set via the `jute.maxbuffer` system property.)

> Kafka may fail to connect to ZooKeeper, retry forever, and never start
> --
>
> Key: KAFKA-13270
> URL: https://issues.apache.org/jira/browse/KAFKA-13270
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0
>Reporter: Ron Dagostino
>Assignee: Ron Dagostino
>Priority: Blocker
> Fix For: 3.0.0
>
>
> The implementation of https://issues.apache.org/jira/browse/ZOOKEEPER-3593 in 
> ZooKeeper version 3.6.0 decreased the default value for the ZooKeeper 
> client's `jute.maxbuffer` configuration from 4MB to 1MB.  This can cause a 
> problem if Kafka tries to retrieve a large amount of data across many znodes 
> -- in such a case the ZooKeeper client will repeatedly emit a message of the 
> form "java.io.IOException: Packet len <> is out of range" and the Kafka 
> broker will never connect to ZooKeeper and fail to make progress on the 
> startup sequence.  We can avoid the potential for this issue to occur by 
> explicitly setting the value to 4MB whenever we create a new ZooKeeper client 
> as long as no explicit value has been set via the `jute.maxbuffer` system 
> property.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13270) Kafka may fail to connect to ZooKeeper, retry forever, and never start

2021-09-02 Thread Ron Dagostino (Jira)
Ron Dagostino created KAFKA-13270:
-

 Summary: Kafka may fail to connect to ZooKeeper, retry forever, 
and never start
 Key: KAFKA-13270
 URL: https://issues.apache.org/jira/browse/KAFKA-13270
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.0.0
Reporter: Ron Dagostino
Assignee: Ron Dagostino
 Fix For: 3.0.0


The implementation of https://issues.apache.org/jira/browse/ZOOKEEPER-3593 in 
ZooKeeper version 3.6.0 decreased the default value for the ZooKeeper client's 
`jute.maxbuffer` configuration from 4MB to 1MB.  This can cause a problem if 
Kafka tries to retrieve a large amount of data across many znodes -- in such a 
case the ZooKeeper client will repeatedly emit a message of the form 
"java.io.IOException: Packet len <> is out of range" and the Kafka broker 
will never connect to ZooKeeper and fail to make progress on the startup sequence. 
 We can avoid the potential for this issue to occur by explicitly setting the 
value to 4MB whenever we create a new ZooKeeper client as long as no explicit 
value has been set via the `jute.maxbuffer` system property.
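A minimal sketch of the proposed guard (illustrative only, not the actual patch): default `jute.maxbuffer` to 4 MB when constructing the ZooKeeper client, unless the operator has already set it explicitly as a system property.

{code:java}
// Illustrative sketch of the described behavior, not Kafka's actual code:
// only apply the 4 MB default when jute.maxbuffer was not set explicitly.
public final class JuteMaxBufferDefault {
    private static final String JUTE_MAX_BUFFER_PROP = "jute.maxbuffer";
    private static final String FOUR_MB = String.valueOf(4 * 1024 * 1024);

    private JuteMaxBufferDefault() { }

    public static void applyDefaultIfUnset() {
        // An explicit operator-provided value always wins.
        if (System.getProperty(JUTE_MAX_BUFFER_PROP) == null) {
            System.setProperty(JUTE_MAX_BUFFER_PROP, FOUR_MB);
        }
    }
}
{code}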



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13269) Kafka Streams Aggregation data loss between instance restarts and rebalances

2021-09-02 Thread Rohit Bobade (Jira)
Rohit Bobade created KAFKA-13269:


 Summary: Kafka Streams Aggregation data loss between instance 
restarts and rebalances
 Key: KAFKA-13269
 URL: https://issues.apache.org/jira/browse/KAFKA-13269
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.6.2
Reporter: Rohit Bobade


Using Kafka Streams 2.6.2 and doing count-based aggregation of messages. Also 
setting Processing Guarantee - EXACTLY_ONCE_BETA and 
NUM_STANDBY_REPLICAS_CONFIG = 1. Sending some messages and restarting instances 
in the middle of processing to test fault tolerance. The output count is 
incorrect because of data loss while restoring state.

It looks like the streams task becomes active and starts processing even when 
the state is not fully restored but is within the acceptable recovery lag 
(default is 1). This results in data loss.
{quote}A stateful active task is assigned to an instance only when its state is 
within the configured acceptable.recovery.lag, if one exists
{quote}
[https://docs.confluent.io/platform/current/streams/developer-guide/running-app.html?_ga=2.33073014.912824567.1630441414-1598368976.1615841473#state-restoration-during-workload-rebalance]

[https://docs.confluent.io/platform/current/installation/configuration/streams-configs.html#streamsconfigs_acceptable.recovery.lag]

Setting acceptable.recovery.lag to 0 and re-running the chaos tests gives the 
correct result.
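For reference, a minimal sketch of the Streams configuration described above (the application id and bootstrap servers are placeholders):

{code:java}
import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public class RecoveryLagConfigExample {
    public static Properties streamsProperties() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "count-aggregation-app"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // placeholder
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA);
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
        // Forcing this to 0 means only fully caught-up tasks become active.
        props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 0L);
        return props;
    }
}
{code}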

Related KIP: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP441:SmoothScalingOutforKafkaStreams-Computingthemost-caught-upinstances]

Just want to get some thoughts on this use case from the Kafka team or if 
anyone has encountered similar issue



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

2021-09-02 Thread GitBox


hachikuji commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r701454478



##
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##
@@ -198,8 +199,23 @@ private[log] class LogCleanerManager(val logDirs: 
Seq[File],
   val cleanableLogs = dirtyLogs.filter { ltc =>
 (ltc.needCompactionNow && ltc.cleanableBytes > 0) || 
ltc.cleanableRatio > ltc.log.config.minCleanableRatio
   }
+
   if(cleanableLogs.isEmpty) {
-None
+val logsWithTombstonesExpired = dirtyLogs.filter {
+  case ltc => 
+// in this case, we are probably in a low throughput situation
+// therefore, we should take advantage of this fact and remove 
tombstones if we can
+// under the condition that the log's latest delete horizon is 
less than the current time
+// tracked
+ltc.log.latestDeleteHorizon != RecordBatch.NO_TIMESTAMP && 
ltc.log.latestDeleteHorizon <= time.milliseconds()

Review comment:
   @junrao Yeah, that's an interesting idea. Do you think it would be 
possible to make it a size-based comparison?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #11288: MINOR: Fix error response generation

2021-09-02 Thread GitBox


hachikuji commented on pull request #11288:
URL: https://github.com/apache/kafka/pull/11288#issuecomment-912080811


   @mimaison Ah, I missed that you had already opened JIRAs. Never mind my comment.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #11288: MINOR: Fix error response generation

2021-09-02 Thread GitBox


hachikuji commented on a change in pull request #11288:
URL: https://github.com/apache/kafka/pull/11288#discussion_r701442901



##
File path: 
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##
@@ -706,6 +706,8 @@ private void checkDescribeConfigsResponseVersions() {
 private void checkErrorResponse(AbstractRequest req, Throwable e, boolean 
checkEqualityAndHashCode) {
 AbstractResponse response = req.getErrorResponse(e);
 checkResponse(response, req.version(), checkEqualityAndHashCode);
+Map<Errors, Integer> errorCounts = response.errorCounts();
+assertTrue(errorCounts.containsKey(Errors.forException(e)), "API Key " 
+ req.apiKey().name + "V" + req.version() + " failed errorCounts test");

Review comment:
   Is it possible to make this assertion stronger? Would we expect any 
other errors in the response?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ccding opened a new pull request #11293: MINOR: defineInternal for KIP-405 configs

2021-09-02 Thread GitBox


ccding opened a new pull request #11293:
URL: https://github.com/apache/kafka/pull/11293


   We haven't finished implementing KIP-405, so we should define the
   KIP-405 configs with `defineInternal`.
   
   We may also want to port this change to 3.0 to avoid leaking these
   configs to the doc.
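   As a rough illustration of the intent (the config names here are examples, not the exact list touched by this PR), `defineInternal` keeps a key out of the generated documentation while `define` publishes it:
   ```java
   import org.apache.kafka.common.config.ConfigDef;

   public class InternalConfigExample {
       public static ConfigDef configDef() {
           return new ConfigDef()
               // public config: appears in the generated docs
               .define("log.retention.ms", ConfigDef.Type.LONG, 604800000L,
                       ConfigDef.Importance.MEDIUM, "How long to retain a log segment.")
               // internal config: excluded from the generated docs
               .defineInternal("remote.log.storage.system.enable",
                       ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.LOW);
       }
   }
   ```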
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-13225) Controller skips sending UpdateMetadataRequest when shutting down broker doesnt host partitions

2021-09-02 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-13225.
-
Fix Version/s: 3.1.0
   Resolution: Fixed

merged the PR to trunk

> Controller skips sending UpdateMetadataRequest when shutting down broker 
> doesnt host partitions 
> 
>
> Key: KAFKA-13225
> URL: https://issues.apache.org/jira/browse/KAFKA-13225
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Reporter: David Mao
>Assignee: David Mao
>Priority: Minor
> Fix For: 3.1.0
>
>
> If a broker not hosting replicas for any partitions is shut down while there 
> are offline partitions, the controller can fail to send out metadata updates 
> to other brokers in the cluster.
>  
> Since this is a very niche scenario, I will leave the priority as Minor.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] junrao merged pull request #11255: KAFKA-13225: Controller skips sending UpdateMetadataRequest when no change in partition state.

2021-09-02 Thread GitBox


junrao merged pull request #11255:
URL: https://github.com/apache/kafka/pull/11255


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #11033: KAFKA-12988 Asynchronous API support for RemoteLogMetadataManager add/update methods.

2021-09-02 Thread GitBox


junrao commented on a change in pull request #11033:
URL: https://github.com/apache/kafka/pull/11033#discussion_r701405911



##
File path: 
storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
##
@@ -62,16 +63,17 @@
  * @param remoteLogSegmentMetadata metadata about the remote log segment.
  * @throws RemoteStorageException   if there are any storage related 
errors occurred.
  * @throws IllegalArgumentException if the given metadata instance does 
not have the state as {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}
+ * @return a Future which will complete once this operation is finished.
  */
-void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata) throws RemoteStorageException;
+Future<Void> addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata) throws RemoteStorageException;

Review comment:
   Should we use CompletableFuture instead of Future? With Future, there 
isn't much the caller could do.
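   A small illustration of the difference (plain Java, not this PR's code): with a `Future` the caller can only block or poll, whereas a `CompletableFuture` lets the caller chain follow-up work without blocking.
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.Future;

   public class FutureVsCompletableFuture {
       public static void main(String[] args) throws Exception {
           // Plain Future: the only options are blocking get() or polling isDone().
           Future<Void> plain = CompletableFuture.completedFuture(null);
           plain.get();

           // CompletableFuture: the caller can attach continuations without blocking.
           CompletableFuture<Void> composable = CompletableFuture.completedFuture(null);
           composable.thenRun(() -> System.out.println("metadata add/update finished"));
       }
   }
   ```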




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions

2021-09-02 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17408977#comment-17408977
 ] 

John Roesler commented on KAFKA-13261:
--

Hi [~xnix] ,

 

Your suspicion is correct. TopologyTestDriver doesn't simulate partitions at 
all, so you won't be able to use it to test this case.

 

When it comes to a repro, you might be interested in this class, which verifies 
that foreign-key joins perform correctly when the input topics have different 
partitions: 
[https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java]

If we have a bug, my suspicion would be whether we're correctly capturing the 
partitioner that you're setting via Repartitioned. I'd suggest modifying that 
test to be closer to your example and seeing whether or not we still get the 
correct result.

 

On the subject of Repartitioned, I didn't quite follow why you're doing it. To 
be clear, when you're doing foreign-key joins, you do not need the two tables 
to have the same number of partitions, nor do you need them to be 
co-partitioned. This should work just fine:
{code:java}
  KTable tableB = builder.table("B",  
stringMaterialized("table.b"));

builder
.stream("A", Consumed.with(Serde.of(KeyA.class), 
Serde.of(EventA.class)))
.toTable(Named.as("table.a"), aMaterialized("table.a"))
.join(tableB, EventA::getKeyB, topicAandBeJoiner(), 
Named.as("join.ab"), joinMaterialized("join.ab"))
.toStream()
.to("output", with(...));
{code}
Unless you have some other requirement for which you need the repartition 
operation, I'd suggest just completely dropping those repartition steps. At 
least, I'd suggest trying out removing them from the topology and verifying if 
you get the correct join results.

 

I hope this helps!

> KTable to KTable foreign key join loose events when using several partitions
> 
>
> Key: KAFKA-13261
> URL: https://issues.apache.org/jira/browse/KAFKA-13261
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0, 2.7.1
>Reporter: Tomas Forsman
>Priority: Major
> Attachments: KafkaTest.java
>
>
> Two incoming streams A and B. 
> Stream A uses a composite key [a, b]
> Stream B has key [b]
> Stream B has 4 partitions and stream A has 1 partition.
> What we try to do is repartition stream A to have 4 partitions too, then put 
> both A and B into a KTable and do a foreign key join from A to B.
> When doing this, not all messages end up in the output topic.
> Repartitioning both to only use 1 partition each solves the problem, so it 
> seems like it has something to do with the foreign key join in combination 
> with several partitions. 
> One suspicion would be that it is not possible to define what partitioner to 
> use for the join.
> Any insight or help is greatly appreciated.
> *Example code of the problem*
> {code:java}
> static Topology createTopoology(){
> var builder = new StreamsBuilder();
> KTable tableB = builder.table("B",  
> stringMaterialized("table.b"));
> builder
> .stream("A", Consumed.with(Serde.of(KeyA.class), 
> Serde.of(EventA.class)))
> .repartition(repartitionTopicA())
> .toTable(Named.as("table.a"), aMaterialized("table.a"))
> .join(tableB, EventA::getKeyB, topicAandBeJoiner(), 
> Named.as("join.ab"), joinMaterialized("join.ab"))
> .toStream()
> .to("output", with(...));
> return builder.build();
> }
> private static Materialized aMaterialized(String name) {
>   Materialized> table = 
> Materialized.as(name);
>   return 
> table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> private static Repartitioned repartitionTopicA() {
> Repartitioned repartitioned = 
> Repartitioned.as("driverperiod");
> return 
> repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class))
> .withStreamPartitioner(topicAPartitioner())
> .withNumberOfPartitions(4);
> }
> private static StreamPartitioner 
> topicAPartitioner() {
> return (topic, key, value, numPartitions) -> 
> Math.abs(key.getKeyB().hashCode()) % numPartitions;
> }
> private static Materialized> 
> joinMaterialized(String name) {
> Materialized> 
> table = Materialized.as(name);
> return 
> table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on a change in pull request #11252: KAFKA-13216: Use a KV with list serde for the shared store

2021-09-02 Thread GitBox


guozhangwang commented on a change in pull request #11252:
URL: https://github.com/apache/kafka/pull/11252#discussion_r701252846



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java
##
@@ -33,172 +39,112 @@
  * For key range queries, like fetch(key, fromTime, toTime), use the {@link 
RocksDBWindowStore}
  * which uses the {@link WindowKeySchema} to serialize the record bytes for 
efficient key queries.
  */
+@SuppressWarnings("unchecked")
 public class RocksDBTimeOrderedWindowStore

Review comment:
   Sounds good, will do.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #11252: KAFKA-13216: Use a KV with list serde for the shared store

2021-09-02 Thread GitBox


guozhangwang commented on a change in pull request #11252:
URL: https://github.com/apache/kafka/pull/11252#discussion_r701252642



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -146,7 +146,7 @@ public void process(final K key, final V1 value) {
 
 outerJoinWindowStore.ifPresent(store -> {
 // Delete the joined record from the non-joined outer 
window store
-store.put(KeyAndJoinSide.make(!isLeftSide, key), null, 
otherRecordTimestamp);
+store.put(KeyAndJoinSide.make(!isLeftSide, key, 
otherRecordTimestamp), null);

Review comment:
   Same as above.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #11252: KAFKA-13216: Use a KV with list serde for the shared store

2021-09-02 Thread GitBox


guozhangwang commented on a change in pull request #11252:
URL: https://github.com/apache/kafka/pull/11252#discussion_r701252490



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -209,37 +208,36 @@ private void emitNonJoinedOuterRecords(final 
WindowStore, Left
 // reset to MAX_VALUE in case the store is empty
 sharedTimeTracker.minTime = Long.MAX_VALUE;
 
-try (final KeyValueIterator>, 
LeftOrRightValue> it = store.all()) {
+try (final KeyValueIterator, LeftOrRightValue> 
it = store.all()) {
 while (it.hasNext()) {
-final KeyValue>, 
LeftOrRightValue> record = it.next();
+final KeyValue, LeftOrRightValue> record 
= it.next();
 
-final Windowed> windowedKey = record.key;
-final LeftOrRightValue value = record.value;
-sharedTimeTracker.minTime = windowedKey.window().start();
+final KeyAndJoinSide keyAndJoinSide = record.key;
+final LeftOrRightValue value = record.value;
+final K key = keyAndJoinSide.getKey();
+final long timestamp = keyAndJoinSide.getTimestamp();
+sharedTimeTracker.minTime = timestamp;
 
 // Skip next records if window has not closed
-if (windowedKey.window().start() + joinAfterMs + 
joinGraceMs >= sharedTimeTracker.streamTime) {
+if (timestamp + joinAfterMs + joinGraceMs >= 
sharedTimeTracker.streamTime) {
 break;
 }
 
-final K key = windowedKey.key().getKey();
-final long time = windowedKey.window().start();
-
 final R nullJoinedValue;
 if (isLeftSide) {
 nullJoinedValue = joiner.apply(key,
-(V1) value.getLeftValue(),
-(V2) value.getRightValue());
+value.getLeftValue(),
+value.getRightValue());
 } else {
 nullJoinedValue = joiner.apply(key,
-(V1) value.getRightValue(),
-(V2) value.getLeftValue());
+(V1) value.getRightValue(),
+(V2) value.getLeftValue());
 }
 
-context().forward(key, nullJoinedValue, 
To.all().withTimestamp(time));
+context().forward(key, nullJoinedValue, 
To.all().withTimestamp(timestamp));
 
 // Delete the key from the outer window store now it is 
emitted
-store.put(record.key.key(), null, 
record.key.window().start());
+store.put(keyAndJoinSide, null);

Review comment:
   The `delete` call would incur an additional `get` in order to return the 
deleted value which is not needed, so I intentionally used `put(k, null)` to 
avoid that extra get. I can leave a comment above explaining this.
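   A small sketch of the trade-off in terms of the generic `KeyValueStore` API (not the store used in this PR): `delete` returns the previous value and therefore implies a read, while `put(key, null)` only writes the tombstone.
   ```java
   import org.apache.kafka.streams.state.KeyValueStore;

   public class DeleteVsPutNull {
       // delete(key) must return the prior value, so it performs a get first.
       static <K, V> V removeWithReadBack(final KeyValueStore<K, V> store, final K key) {
           return store.delete(key); // read + write
       }

       // put(key, null) writes the tombstone without looking up the prior value.
       static <K, V> void removeWriteOnly(final KeyValueStore<K, V> store, final K key) {
           store.put(key, null); // write only
       }
   }
   ```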




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #11252: KAFKA-13216: Use a KV with list serde for the shared store

2021-09-02 Thread GitBox


guozhangwang commented on a change in pull request #11252:
URL: https://github.com/apache/kafka/pull/11252#discussion_r701251823



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
##
@@ -63,6 +63,10 @@
 private InternalProcessorContext context;
 private TaskId taskId;
 
+interface WindowedKeySerde {
+Bytes serialize(final Bytes key, final long timestamp, final int 
seqnum);
+}
+

Review comment:
   Ack.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #11252: KAFKA-13216: Use a KV with list serde for the shared store

2021-09-02 Thread GitBox


guozhangwang commented on a change in pull request #11252:
URL: https://github.com/apache/kafka/pull/11252#discussion_r701251517



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyAndJoinSideSerializer.java
##
@@ -55,9 +57,11 @@ public void configure(final Map configs, final 
boolean isKey) {
 public byte[] serialize(final String topic, final KeyAndJoinSide data) {
 final byte boolByte = (byte) (data.isLeftSide() ? 1 : 0);
 final byte[] keyBytes = keySerializer.serialize(topic, data.getKey());
+final byte[] timestampBytes = timestampSerializer.serialize(topic, 
data.getTimestamp());
 
 return ByteBuffer
-.allocate(keyBytes.length + 1)
+.allocate(8 + keyBytes.length + 1)

Review comment:
   Ack.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #11252: KAFKA-13216: Use a KV with list serde for the shared store

2021-09-02 Thread GitBox


guozhangwang commented on a change in pull request #11252:
URL: https://github.com/apache/kafka/pull/11252#discussion_r701251294



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyAndJoinSideDeserializer.java
##
@@ -48,15 +50,22 @@ public void configure(final Map configs, final 
boolean isKey) {
 
 @Override
 public KeyAndJoinSide deserialize(final String topic, final byte[] 
data) {
-final boolean bool = data[0] == 1;
+final boolean bool = data[8] == 1;

Review comment:
   Ack.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13268) Add more integration tests for Table Table FK joins with repartitioning

2021-09-02 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13268:
-

 Summary: Add more integration tests for Table Table FK joins with 
repartitioning
 Key: KAFKA-13268
 URL: https://issues.apache.org/jira/browse/KAFKA-13268
 Project: Kafka
  Issue Type: Improvement
  Components: streams, unit tests
Reporter: Guozhang Wang


We should add to the FK join multipartition integration test with a 
Repartitioned for:
1) just the new partition count
2) a custom partitioner

This is to test if there's a bug where the internal topics don't pick up a 
partitioner provided that way.
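A rough sketch of the two Repartitioned variants to cover (names and types are illustrative, not the actual test code):

{code:java}
import org.apache.kafka.streams.kstream.Repartitioned;

public class FkJoinRepartitionedExamples {
    // 1) only override the partition count
    public static Repartitioned<String, String> byCountOnly() {
        return Repartitioned.<String, String>as("fk-lhs-repartition")
                .withNumberOfPartitions(4);
    }

    // 2) also supply a custom partitioner
    public static Repartitioned<String, String> withCustomPartitioner() {
        return Repartitioned.<String, String>as("fk-lhs-repartition-custom")
                .withNumberOfPartitions(4)
                .withStreamPartitioner((topic, key, value, numPartitions) ->
                        Math.abs(key.hashCode()) % numPartitions);
    }
}
{code}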



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions

2021-09-02 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17408967#comment-17408967
 ] 

Guozhang Wang commented on KAFKA-13261:
---

Hello [~xnix] regarding "However, in a local docker environment we can 
reproduce the above scenario every time when using 4 partitions and the problem 
goes away when using 1 partition." Is that a test done through the 
TopologyTestDriver or is it a full-fledged integration test with Kafka clusters 
and Kafka Streams clients?



> KTable to KTable foreign key join loose events when using several partitions
> 
>
> Key: KAFKA-13261
> URL: https://issues.apache.org/jira/browse/KAFKA-13261
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0, 2.7.1
>Reporter: Tomas Forsman
>Priority: Major
> Attachments: KafkaTest.java
>
>
> Two incoming streams A and B. 
> Stream A uses a composite key [a, b]
> Stream B has key [b]
> Stream B has 4 partitions and stream A has 1 partition.
> What we try to do is repartition stream A to have 4 partitions too, then put 
> both A and B into a KTable and do a foreign key join from A to B.
> When doing this, not all messages end up in the output topic.
> Repartitioning both to only use 1 partition each solves the problem, so it 
> seems like it has something to do with the foreign key join in combination 
> with several partitions. 
> One suspicion would be that it is not possible to define what partitioner to 
> use for the join.
> Any insight or help is greatly appreciated.
> *Example code of the problem*
> {code:java}
> static Topology createTopoology(){
> var builder = new StreamsBuilder();
> KTable tableB = builder.table("B",  
> stringMaterialized("table.b"));
> builder
> .stream("A", Consumed.with(Serde.of(KeyA.class), 
> Serde.of(EventA.class)))
> .repartition(repartitionTopicA())
> .toTable(Named.as("table.a"), aMaterialized("table.a"))
> .join(tableB, EventA::getKeyB, topicAandBeJoiner(), 
> Named.as("join.ab"), joinMaterialized("join.ab"))
> .toStream()
> .to("output", with(...));
> return builder.build();
> }
> private static Materialized aMaterialized(String name) {
>   Materialized> table = 
> Materialized.as(name);
>   return 
> table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> private static Repartitioned repartitionTopicA() {
> Repartitioned repartitioned = 
> Repartitioned.as("driverperiod");
> return 
> repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class))
> .withStreamPartitioner(topicAPartitioner())
> .withNumberOfPartitions(4);
> }
> private static StreamPartitioner 
> topicAPartitioner() {
> return (topic, key, value, numPartitions) -> 
> Math.abs(key.getKeyB().hashCode()) % numPartitions;
> }
> private static Materialized> 
> joinMaterialized(String name) {
> Materialized> 
> table = Materialized.as(name);
> return 
> table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] junrao commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

2021-09-02 Thread GitBox


junrao commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r701240936



##
File path: 
clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
##
@@ -37,6 +40,7 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.OptionalLong;

Review comment:
   Sorry, I meant adding a description regarding tombstone in the comment 
of LogCleaner.

##
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##
@@ -198,8 +199,23 @@ private[log] class LogCleanerManager(val logDirs: 
Seq[File],
   val cleanableLogs = dirtyLogs.filter { ltc =>
 (ltc.needCompactionNow && ltc.cleanableBytes > 0) || 
ltc.cleanableRatio > ltc.log.config.minCleanableRatio
   }
+
   if(cleanableLogs.isEmpty) {
-None
+val logsWithTombstonesExpired = dirtyLogs.filter {
+  case ltc => 
+// in this case, we are probably in a low throughput situation
+// therefore, we should take advantage of this fact and remove 
tombstones if we can
+// under the condition that the log's latest delete horizon is 
less than the current time
+// tracked
+ltc.log.latestDeleteHorizon != RecordBatch.NO_TIMESTAMP && 
ltc.log.latestDeleteHorizon <= time.milliseconds()

Review comment:
   Related to this, I am a bit concerned about the extra cleaning due to 
this. If we have just one tombstone record, this can force a round of cleaning 
on idle partitions. An alternative way is to track the number of total 
surviving records and tombstone records during cleaning. We would only trigger a 
cleaning if #tombstone/#totalRecords > minCleanableRatio. @hachikuji What do 
you think?
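   A minimal sketch of that alternative trigger (illustrative, not the cleaner's actual code): keep surviving/tombstone counts from the previous cleaning and only schedule another pass when the tombstone ratio exceeds the min cleanable ratio.
   ```java
   public final class TombstoneRatioTrigger {
       private TombstoneRatioTrigger() { }

       // Returns true when tombstones make up a large enough fraction of the
       // records that another cleaning pass is worthwhile.
       public static boolean shouldCleanForTombstones(final long survivingRecords,
                                                      final long tombstoneRecords,
                                                      final double minCleanableRatio) {
           final long totalRecords = survivingRecords + tombstoneRecords;
           if (totalRecords == 0) {
               return false; // nothing to clean
           }
           return (double) tombstoneRecords / totalRecords > minCleanableRatio;
       }
   }
   ```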




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] splett2 commented on a change in pull request #11255: KAFKA-13225: Controller skips sending UpdateMetadataRequest when no change in partition state.

2021-09-02 Thread GitBox


splett2 commented on a change in pull request #11255:
URL: https://github.com/apache/kafka/pull/11255#discussion_r701247464



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -628,9 +628,11 @@ class KafkaController(val config: KafkaConfig,
   topicDeletionManager.failReplicaDeletion(newOfflineReplicasForDeletion)
 }
 
-// If replica failure did not require leader re-election, inform brokers 
of the offline brokers
+// If no partition has changed leader or ISR, no UpdateMetadataRequest is 
sent through PartitionStateMachine
+// and ReplicaStateMachine. In that case, we want to send an 
UpdateMetadataRequest explicitly to
+// propagate the information about the new offline brokers.

Review comment:
   good point thanks.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #11186: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft

2021-09-02 Thread GitBox


jsancio commented on a change in pull request #11186:
URL: https://github.com/apache/kafka/pull/11186#discussion_r701234135



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -804,7 +817,7 @@ void handleBrokerUnfenced(int brokerId, long brokerEpoch, 
List

[GitHub] [kafka] junrao commented on a change in pull request #11255: KAFKA-13225: Controller skips sending UpdateMetadataRequest when no change in partition state.

2021-09-02 Thread GitBox


junrao commented on a change in pull request #11255:
URL: https://github.com/apache/kafka/pull/11255#discussion_r701230149



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -628,9 +628,11 @@ class KafkaController(val config: KafkaConfig,
   topicDeletionManager.failReplicaDeletion(newOfflineReplicasForDeletion)
 }
 
-// If replica failure did not require leader re-election, inform brokers 
of the offline brokers
+// If no partition has changed leader or ISR, no UpdateMetadataRequest is 
sent through PartitionStateMachine
+// and ReplicaStateMachine. In that case, we want to send an 
UpdateMetadataRequest explicitly to
+// propagate the information about the new offline brokers.

Review comment:
   With this comment, we could just remove the comment on the next line.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13267) InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id

2021-09-02 Thread Gilles Philippart (Jira)
Gilles Philippart created KAFKA-13267:
-

 Summary: InvalidPidMappingException: The producer attempted to use 
a producer id which is not currently assigned to its transactional id
 Key: KAFKA-13267
 URL: https://issues.apache.org/jira/browse/KAFKA-13267
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.8.0
Reporter: Gilles Philippart


We're using Confluent Cloud and Kafka Streams 2.8.0 and we've seen these errors 
pop up in apps using EOS:
{code:java}
InvalidPidMappingException: The producer attempted to use a producer id which 
is not currently assigned to its transactional id
{code}
Full stack trace:
{code:java}
Error encountered sending record to topic ola-update-1 for task 4_7 due to: 
org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
attempted to use a producer id which is not currently assigned to its 
transactional id. Exception handler choose to FAIL the processing, no more 
records would be sent.
RecordCollectorImpl.java  226  recordSendError(...)
RecordCollectorImpl.java:226:in `recordSendError'
RecordCollectorImpl.java  196  lambda$send$0(...)
RecordCollectorImpl.java:196:in `lambda$send$0'
KafkaProducer.java  1365  onCompletion(...)
KafkaProducer.java:1365:in `onCompletion'
ProducerBatch.java  231  completeFutureAndFireCallbacks(...)
ProducerBatch.java:231:in `completeFutureAndFireCallbacks'
ProducerBatch.java  159  abort(...)
ProducerBatch.java:159:in `abort'
RecordAccumulator.java  763  abortBatches(...)
RecordAccumulator.java:763:in `abortBatches'
More (5 lines)
Nested Exceptionsorg.apache.kafka.streams.errors.StreamsException: Error 
encountered sending record to topic ola-update-1 for task 4_7 due to: 
org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
attempted to use a producer id which is not currently assigned to its 
transactional id. Exception handler choose to FAIL the processing, no more 
records would be sent.
RecordCollectorImpl.java  226  recordSendError(...)
RecordCollectorImpl.java:226:in `recordSendError'
 org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
attempted to use a producer id which is not currently assigned to its 
transactional id.
{code}
I've seen that KAFKA-6821 described the same problem on an earlier version of 
Kafka and was closed due to the subsequent works on EOS.

Another ticket raised recently shows that the exception is still occurring (but 
the ticket wasn't raised for that specific error): KAFKA-12774



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13257) KafkaStreams Support For Latest RocksDB Version

2021-09-02 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17408917#comment-17408917
 ] 

Bruno Cadonna commented on KAFKA-13257:
---

We cannot upgrade the RocksDB version for 2.8 because Kafka Streams exposes a 
RocksDB API in the `RocksDBConfigSetter` that is not compatible with newer 
versions of RocksDB. We would break backward compatibility if we upgraded 
RocksDB in 2.8.

For 3.0, there has already been KAFKA-8897 that tracked the RocksDB upgrade.  
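To illustrate why this is a compatibility concern, user code implementing `RocksDBConfigSetter` compiles directly against `org.rocksdb` classes, so bumping the bundled RocksDB can break it. A minimal sketch, assuming Kafka Streams 2.8 (RocksDB 5.18) on the classpath:

{code:java}
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

import java.util.Map;

public class CustomRocksDBConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        // RocksDB's own API surface leaks through the Streams config setter;
        // methods like setBlockCacheSize changed in later RocksDB versions.
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(16 * 1024 * 1024L);
        options.setTableFormatConfig(tableConfig);
    }
}
{code}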

> KafkaStreams Support For Latest RocksDB Version
> ---
>
> Key: KAFKA-13257
> URL: https://issues.apache.org/jira/browse/KAFKA-13257
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Alagukannan
>Priority: Major
> Attachments: hs_err_pid6.log
>
>
> Hi,
>  Can you please let us know if there is any plan for adding the latest 
> versions of RocksDB in Kafka Streams? If you're planning it, what's the timeline 
> we are looking at? If not planning to upgrade, what's the reason behind it? Is 
> there any significant impact on upgrading, like backward compatibility etc.? 
> Just to remind this general query to know about the rocksdb upgrade and its 
> impact on streams application.
> The main pain point behind asking this upgrade is, We tried to build an 
> application with kafka streams 2.8.0 on an alpine based OS and the docker 
> base image is as follows  
> azul/zulu-openjdk-alpine:11.0.12-11.50.19-jre-headless.  The streams 
> application worked fine until it had an interaction with state 
> store(rocksdb). The jvm crashed with the following error:
>  #
>  # A fatal error has been detected by the Java Runtime Environment:
>  #
>  # SIGSEGV (0xb) at pc=0x7f9551951b27, pid=6, tid=207
>  #
>  # JRE version: OpenJDK Runtime Environment Zulu11.45+27-CA (11.0.10+9) 
> (build 11.0.10+9-LTS)
>  # Java VM: OpenJDK 64-Bit Server VM Zulu11.45+27-CA (11.0.10+9-LTS, mixed 
> mode, tiered, compressed oops, g1 gc, linux-amd64)
>  # Problematic frame:
>  # C [librocksdbjni15322693993163550519.so+0x271b27] 
> std::_Rb_tree, 
> std::less, std::allocator 
> >::_M_erase(std::_Rb_tree_node*)+0x27
> Then we found out rocksdb works well on glibc and not musl libc, whereas 
> alpine supports musl libc alone for native dependencies. Further looking into 
> rocksdb for a solution, we found that they have started supporting both glibc 
> and musl native libs from 6.5.x versions.
>  But latest kafka streams(2.8.0) is having rocksdb(5.18.x) version. This is 
> the main reason behind asking for the rocksDB upgrade in kafka streams as 
> well.
> Have attached the PID log where JVM failures are happening.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] spena commented on a change in pull request #11252: KAFKA-13216: Use a KV with list serde for the shared store

2021-09-02 Thread GitBox


spena commented on a change in pull request #11252:
URL: https://github.com/apache/kafka/pull/11252#discussion_r701101588



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyAndJoinSideDeserializer.java
##
@@ -48,15 +50,22 @@ public void configure(final Map configs, final 
boolean isKey) {
 
 @Override
 public KeyAndJoinSide deserialize(final String topic, final byte[] 
data) {
-final boolean bool = data[0] == 1;
+final boolean bool = data[8] == 1;

Review comment:
   It would be good to add a constant for the `8`, so it is easily 
read. For instance, `rawKey()` has `new byte[data.length - 9]`, which I 
assume is `len - TIMESTAMP - BOOL`.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyAndJoinSideSerializer.java
##
@@ -55,9 +57,11 @@ public void configure(final Map configs, final 
boolean isKey) {
 public byte[] serialize(final String topic, final KeyAndJoinSide data) {
 final byte boolByte = (byte) (data.isLeftSide() ? 1 : 0);
 final byte[] keyBytes = keySerializer.serialize(topic, data.getKey());
+final byte[] timestampBytes = timestampSerializer.serialize(topic, 
data.getTimestamp());
 
 return ByteBuffer
-.allocate(keyBytes.length + 1)
+.allocate(8 + keyBytes.length + 1)

Review comment:
   Should the `8` be a constant variable or just `timestampBytes.length`? 

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
##
@@ -63,6 +63,10 @@
 private InternalProcessorContext context;
 private TaskId taskId;
 
+interface WindowedKeySerde {
+Bytes serialize(final Bytes key, final long timestamp, final int 
seqnum);
+}
+

Review comment:
   Is this used somewhere?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java
##
@@ -33,172 +39,112 @@
  * For key range queries, like fetch(key, fromTime, toTime), use the {@link 
RocksDBWindowStore}
  * which uses the {@link WindowKeySchema} to serialize the record bytes for 
efficient key queries.
  */
+@SuppressWarnings("unchecked")
 public class RocksDBTimeOrderedWindowStore

Review comment:
   Two things:
   - Seems `TimeOrderedKeySchema` is not needed anymore. Should the class be 
removed?
   - Should we rename the class to remove the `Window` part?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -146,7 +146,7 @@ public void process(final K key, final V1 value) {
 
 outerJoinWindowStore.ifPresent(store -> {
 // Delete the joined record from the non-joined outer 
window store
-store.put(KeyAndJoinSide.make(!isLeftSide, key), null, 
otherRecordTimestamp);
+store.put(KeyAndJoinSide.make(!isLeftSide, key, 
otherRecordTimestamp), null);

Review comment:
   Should you call `store.delete(KeyAndJoinSide.make(!isLeftSide, key, 
otherRecordTimestamp))` now?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -209,37 +208,36 @@ private void emitNonJoinedOuterRecords(final 
WindowStore, Left
 // reset to MAX_VALUE in case the store is empty
 sharedTimeTracker.minTime = Long.MAX_VALUE;
 
-try (final KeyValueIterator>, 
LeftOrRightValue> it = store.all()) {
+try (final KeyValueIterator, LeftOrRightValue> 
it = store.all()) {
 while (it.hasNext()) {
-final KeyValue>, 
LeftOrRightValue> record = it.next();
+final KeyValue, LeftOrRightValue> record 
= it.next();
 
-final Windowed> windowedKey = record.key;
-final LeftOrRightValue value = record.value;
-sharedTimeTracker.minTime = windowedKey.window().start();
+final KeyAndJoinSide keyAndJoinSide = record.key;
+final LeftOrRightValue value = record.value;
+final K key = keyAndJoinSide.getKey();
+final long timestamp = keyAndJoinSide.getTimestamp();
+sharedTimeTracker.minTime = timestamp;
 
 // Skip next records if window has not closed
-if (windowedKey.window().start() + joinAfterMs + 
joinGraceMs >= sharedTimeTracker.streamTime) {
+if (timestamp + joinAfterMs + joinGraceMs >= 
sharedTimeTracker.streamTime) {
 break;
 }
 
-final K key = windowedKey.key().getKey();
-final long time = windowedKey.window().start();
-
 final R nullJoinedValue;
 if (isLeftSide) {

[jira] [Comment Edited] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions

2021-09-02 Thread Tomas Forsman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17408786#comment-17408786
 ] 

Tomas Forsman edited comment on KAFKA-13261 at 9/2/21, 2:34 PM:


Hi [~guozhang] and [~abellemare], thank you for your answers.

Yes, A and B use a partitioner that uses the same value from the respective
topic key.

In the scenario we have, the output topic is compacted on the same key as Topic
A, so it would be fine if intermediate events did not come through. We want the
final result. What we're seeing, though, is combinations missing completely. 

We've read out all events from the A, B and output topics, but also the
internal topics created by the join. The expectation is that all would have 66
ids. As said, running with the same data on one partition creates a perfect
match where all events have passed through. 
{noformat}
#Using 4 partitions
A                 : ids: 66
B                 : ids: 66
table.b-changelog : ids: 66
table.a-changelog : ids: 66
join.ab-changelog : ids: 20
output            : ids: 20{noformat}
Attached KafkaTest.java; we've written several JUnit test cases with the
TopologyTestDriver and different amounts of test data, but are unable to
reproduce the problem. (Does the test driver consider several partitions?)

However, in a local docker environment we can reproduce the above scenario 
every time when using 4 partitions and the problem goes away when using 1 
partition.

Below is when reading from the different topics directly for a specific id 
"ID123" 

Columns are 'offset, timestamp, partition | [key]value'

Using 4 partitions
{noformat}
table.a-changelog 
5 1612435945196 0 | [ID123, 202101] A01 
15 1614863137136 0 | [ID123, 202102] A02 
25 1617882052260 0 | [ID123, 202103] A03 
35 1620299210336 0 | [ID123, 202104] A04 
45 1622804606823 0 | [ID123, 202105] A05 
table.b-changelog 
6 1622617868856 0 | [ID123] BBB 
join.ab-changelog 
0 1622617868856 0 | [ID123, 202104] A04, BBB 
output 
0 1622617868856 0 | [ID123, 202104] A04, BBB 
{noformat}
 Using 1 partition
{noformat}
table.a-changelog 
28 1612435945196 0 | [ID123, 202101] A01 
88 1614863137136 0 | [ID123, 202102] A02 
149 1617882052260 0 | [ID123, 202103] A03 
210 1620299210336 0 | [ID123, 202104] A04 
269 1622804606823 0 | [ID123, 202105] A05 
table.b-changelog 
7 1622617868856 0 | [ID123] BBB 
join.ab-changelog 
28 1622617868856 0 | [ID123, 202101] A01, BBB 
88 1622617868856 0 | [ID123, 202102] A02, BBB 
149 1622617868856 0 | [ID123, 202103] A03, BBB 
210 1622617868856 0 | [ID123, 202104] A04, BBB 
269 1622804606823 0 | [ID123, 202105] A05, BBB 
output 
5 1622617868856 0 | [ID123, 202101] A01, BBB 
15 1622617868856 0 | [ID123, 202102] A02, BBB 
25 1622617868856 0 | [ID123, 202103] A03, BBB 
35 1622617868856 0 | [ID123, 202104] A04, BBB 
45 1622804606823 0 | [ID123, 202105] A05, BBB{noformat}
 


was (Author: xnix):
Hi [~guozhang] and [~abellemare], thank you for your answers.

In the scenario we have the output topic is compacted on the same key as Topic 
A, so if intermediate events would not come - it would be fine. We want the 
final result. What we're seeing though is combinations missing completely. 

We've read out all events from the A,B and output topics but also the internal 
topics created by the join. Expected is that all would have 66 ids. As said, 
running with same data with one partition create a perfect match where all 
events has passed through. 
{noformat}
#Using 4 partitions
A                 : ids: 66
B                 : ids: 66
table.b-changelog : ids: 66
table.a-changelog : ids: 66
join.ab-changelog : ids: 20
output            : ids: 20{noformat}
Attached KafkaTest.java, but we've written several junit test cases with the 
TopologyTestDriver and different amount of test data but are unable to 
reproduce the problem. (does the test driver consider several partitions?)

However, in a local docker environment we can reproduce the above scenario 
every time when using 4 partitions and the problem goes away when using 1 
partition.

Below is when reading from the different topics directly for a specific id 
"ID123" 

Columns are 'offset, timestamp, partition | [key]value'

Using 4 partitions
{noformat}
table.a-changelog 
5 1612435945196 0 | [ID123, 202101] A01 
15 1614863137136 0 | [ID123, 202102] A02 
25 1617882052260 0 | [ID123, 202103] A03 
35 1620299210336 0 | [ID123, 202104] A04 
45 1622804606823 0 | [ID123, 202105] A05 
table.b-changelog 
6 1622617868856 0 | [ID123] BBB 
join.ab-changelog 
0 1622617868856 0 | [ID123, 202104] A04, BBB 
output 
0 1622617868856 0 | [ID123, 202104] A04, BBB 
{noformat}
 Using 1 partition
{noformat}
table.a-changelog 
28 1612435945196 0 | [ID123, 202101] A01 
88 1614863137136 0 | [ID123, 202102] A02 
149 1617882052260 0 | [ID123, 202103] A03 
210 1620299210336 0 | [ID123, 202104] A04 
269 1622804606823 0 | [ID123, 202105] A05 
table.b-changelog 
7 1622617868856 0 

[GitHub] [kafka] lkokhreidze commented on pull request #10851: KAFKA-6718 / Rack aware standby task assignor

2021-09-02 Thread GitBox


lkokhreidze commented on pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#issuecomment-911752624


   Hi @cadonna,
   I've addressed/replied to your comments. Thanks for the feedback.
   FYI - I'll be offline from next week for 2 weeks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lkokhreidze commented on a change in pull request #10851: KAFKA-6718 / Rack aware standby task assignor

2021-09-02 Thread GitBox


lkokhreidze commented on a change in pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#discussion_r701143273



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java
##
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+
+import static 
org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.computeTasksToRemainingStandbys;
+
+/**
+ * Distributes standby tasks over different tag dimensions.
+ * Only tags specified via {@link AssignmentConfigs#rackAwareAssignmentTags} 
are taken into account.
+ * Standby task distribution is on a best-effort basis. For example, if there 
are not enough clients available
+ * on different tag dimensions compared to an active and corresponding standby 
task,
+ * in that case, the algorithm will fall back to distributing tasks on 
least-loaded clients.
+ */
+class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(ClientTagAwareStandbyTaskAssignor.class);
+
+@Override
+public boolean assign(final Map clients,
+  final Set allTaskIds,
+  final Set statefulTaskIds,
+  final AssignorConfiguration.AssignmentConfigs 
configs) {
+final int numStandbyReplicas = configs.numStandbyReplicas;
+final Set rackAwareAssignmentTags = new 
HashSet<>(configs.rackAwareAssignmentTags);
+final Map statefulTasksWithClients = new HashMap<>();
+
+statefulTaskIds.forEach(statefulTaskId -> clients.forEach((uuid, 
clientState) -> {
+if (clientState.activeTasks().contains(statefulTaskId)) {
+statefulTasksWithClients.put(statefulTaskId, uuid);
+}
+}));
+
+final Map tasksToRemainingStandbys = 
computeTasksToRemainingStandbys(
+numStandbyReplicas,
+allTaskIds
+);
+
+final Map> tagKeyToTagValues = new HashMap<>();
+final Map> tagValueToClients = new HashMap<>();
+
+fillClientsTagStatistics(clients, tagValueToClients, 
tagKeyToTagValues);
+
+statefulTasksWithClients.forEach((taskId, clientId) -> 
assignStandbyTasksForActiveTask(
+numStandbyReplicas,
+taskId,
+clientId,
+rackAwareAssignmentTags,
+clients,
+tasksToRemainingStandbys,
+tagKeyToTagValues,
+tagValueToClients
+));
+
+return true;
+}
+
+@Override
+public boolean isAllowedTaskMovement(final ClientState source, final 
ClientState destination) {
+final Map sourceClientTags = source.clientTags();
+final Map destinationClientTags = 
destination.clientTags();
+
+for (final Entry sourceClientTagEntry : 
sourceClientTags.entrySet()) {
+if 
(!sourceClientTagEntry.getValue().equals(destinationClientTags.get(sourceClientTagEntry.getKey(
 {
+return false;
+}
+}
+
+return true;
+}
+
+private static void fillClientsTagStatistics(final Map 
clientStates,
+ final Map> 
tagValueToClients,
+ final Map> tagKeyToTagValues) {
+for (final Entry clientStateEntry : 
clientStates.entrySet()) {
+final UUID clientId = clientStateEntry.getKey();
+final ClientState clientState = clientStateEntry.getValue();
+
+clientState.clientTags().forEach((tagKey, tagValue) -> {
+tagKeyToTagValues.computeIfAbsent(tagKey, ignored -> new 
HashSet<>()).add(tagValue);
+

[GitHub] [kafka] lkokhreidze commented on a change in pull request #10851: KAFKA-6718 / Rack aware standby task assignor

2021-09-02 Thread GitBox


lkokhreidze commented on a change in pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#discussion_r701142976



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java
##
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+
+import static 
org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.computeTasksToRemainingStandbys;
+
+/**
+ * Distributes standby tasks over different tag dimensions.
+ * Only tags specified via {@link AssignmentConfigs#rackAwareAssignmentTags} 
are taken into account.
+ * Standby task distribution is on a best-effort basis. For example, if there 
are not enough clients available
+ * on different tag dimensions compared to an active and corresponding standby 
task,
+ * in that case, the algorithm will fall back to distributing tasks on 
least-loaded clients.
+ */
+class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(ClientTagAwareStandbyTaskAssignor.class);
+
+@Override
+public boolean assign(final Map clients,
+  final Set allTaskIds,
+  final Set statefulTaskIds,
+  final AssignorConfiguration.AssignmentConfigs 
configs) {
+final int numStandbyReplicas = configs.numStandbyReplicas;
+final Set rackAwareAssignmentTags = new 
HashSet<>(configs.rackAwareAssignmentTags);
+final Map statefulTasksWithClients = new HashMap<>();
+
+statefulTaskIds.forEach(statefulTaskId -> clients.forEach((uuid, 
clientState) -> {
+if (clientState.activeTasks().contains(statefulTaskId)) {
+statefulTasksWithClients.put(statefulTaskId, uuid);
+}
+}));
+
+final Map tasksToRemainingStandbys = 
computeTasksToRemainingStandbys(
+numStandbyReplicas,
+allTaskIds
+);
+
+final Map> tagKeyToTagValues = new HashMap<>();
+final Map> tagValueToClients = new HashMap<>();
+
+fillClientsTagStatistics(clients, tagValueToClients, 
tagKeyToTagValues);
+
+statefulTasksWithClients.forEach((taskId, clientId) -> 
assignStandbyTasksForActiveTask(
+numStandbyReplicas,
+taskId,
+clientId,
+rackAwareAssignmentTags,
+clients,
+tasksToRemainingStandbys,
+tagKeyToTagValues,
+tagValueToClients
+));
+
+return true;

Review comment:
   Pushed the changes.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lkokhreidze commented on a change in pull request #10851: KAFKA-6718 / Rack aware standby task assignor

2021-09-02 Thread GitBox


lkokhreidze commented on a change in pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#discussion_r701141903



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java
##
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+
+import static 
org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.computeTasksToRemainingStandbys;
+
+/**
+ * Distributes standby tasks over different tag dimensions.
+ * Only tags specified via {@link AssignmentConfigs#rackAwareAssignmentTags} 
are taken into account.
+ * Standby task distribution is on a best-effort basis. For example, if there 
are not enough clients available
+ * on different tag dimensions compared to an active and corresponding standby 
task,
+ * in that case, the algorithm will fall back to distributing tasks on 
least-loaded clients.
+ */
+class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(ClientTagAwareStandbyTaskAssignor.class);
+
+@Override
+public boolean assign(final Map clients,
+  final Set allTaskIds,
+  final Set statefulTaskIds,
+  final AssignorConfiguration.AssignmentConfigs 
configs) {
+final int numStandbyReplicas = configs.numStandbyReplicas;
+final Set rackAwareAssignmentTags = new 
HashSet<>(configs.rackAwareAssignmentTags);
+final Map statefulTasksWithClients = new HashMap<>();
+
+statefulTaskIds.forEach(statefulTaskId -> clients.forEach((uuid, 
clientState) -> {
+if (clientState.activeTasks().contains(statefulTaskId)) {
+statefulTasksWithClients.put(statefulTaskId, uuid);
+}
+}));
+
+final Map tasksToRemainingStandbys = 
computeTasksToRemainingStandbys(
+numStandbyReplicas,
+allTaskIds
+);
+
+final Map> tagKeyToTagValues = new HashMap<>();
+final Map> tagValueToClients = new HashMap<>();

Review comment:
   Sorry, can you elaborate more on this?
   Currently, when deciding the distribution, the algorithm takes into account
both the tag key and the tag value. So it will treat `key1: value2` and `key2:
value2` as different dimensions. Do you think this is something that has to be
addressed?
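
   A purely illustrative snippet (not code from this PR): if the dimension is
the full key/value pair, the two tags above stay distinct.
   ```java
   import java.util.Collections;
   import java.util.Map;

   class TagDimensionSketch {
       public static void main(String[] args) {
           Map<String, String> clientA = Map.of("key1", "value2");
           Map<String, String> clientB = Map.of("key2", "value2");
           // Keyed by the (tag key, tag value) pair, the two clients share no
           // dimension, even though the raw tag values are equal.
           boolean shared = !Collections.disjoint(clientA.entrySet(), clientB.entrySet());
           System.out.println(shared); // prints: false
       }
   }
   ```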




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lkokhreidze commented on a change in pull request #10851: KAFKA-6718 / Rack aware standby task assignor

2021-09-02 Thread GitBox


lkokhreidze commented on a change in pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#discussion_r701137236



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java
##
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+
+import static 
org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.computeTasksToRemainingStandbys;
+
+/**
+ * Distributes standby tasks over different tag dimensions.
+ * Only tags specified via {@link AssignmentConfigs#rackAwareAssignmentTags} 
are taken into account.
+ * Standby task distribution is on a best-effort basis. For example, if there 
are not enough clients available
+ * on different tag dimensions compared to an active and corresponding standby 
task,
+ * in that case, the algorithm will fall back to distributing tasks on 
least-loaded clients.
+ */
+class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(ClientTagAwareStandbyTaskAssignor.class);
+
+@Override
+public boolean assign(final Map clients,
+  final Set allTaskIds,
+  final Set statefulTaskIds,
+  final AssignorConfiguration.AssignmentConfigs 
configs) {
+final int numStandbyReplicas = configs.numStandbyReplicas;
+final Set rackAwareAssignmentTags = new 
HashSet<>(configs.rackAwareAssignmentTags);
+final Map statefulTasksWithClients = new HashMap<>();
+
+statefulTaskIds.forEach(statefulTaskId -> clients.forEach((uuid, 
clientState) -> {
+if (clientState.activeTasks().contains(statefulTaskId)) {
+statefulTasksWithClients.put(statefulTaskId, uuid);
+}
+}));
+
+final Map tasksToRemainingStandbys = 
computeTasksToRemainingStandbys(
+numStandbyReplicas,
+allTaskIds
+);
+
+final Map> tagKeyToTagValues = new HashMap<>();
+final Map> tagValueToClients = new HashMap<>();
+
+fillClientsTagStatistics(clients, tagValueToClients, 
tagKeyToTagValues);
+
+statefulTasksWithClients.forEach((taskId, clientId) -> 
assignStandbyTasksForActiveTask(
+numStandbyReplicas,
+taskId,
+clientId,
+rackAwareAssignmentTags,
+clients,
+tasksToRemainingStandbys,
+tagKeyToTagValues,
+tagValueToClients
+));
+
+return true;
+}
+
+@Override
+public boolean isAllowedTaskMovement(final ClientState source, final 
ClientState destination) {
+final Map sourceClientTags = source.clientTags();
+final Map destinationClientTags = 
destination.clientTags();
+
+for (final Entry sourceClientTagEntry : 
sourceClientTags.entrySet()) {
+if 
(!sourceClientTagEntry.getValue().equals(destinationClientTags.get(sourceClientTagEntry.getKey(
 {
+return false;
+}
+}
+
+return true;
+}
+
+private static void fillClientsTagStatistics(final Map 
clientStates,
+ final Map> 
tagValueToClients,
+ final Map> tagKeyToTagValues) {
+for (final Entry clientStateEntry : 
clientStates.entrySet()) {
+final UUID clientId = clientStateEntry.getKey();
+final ClientState clientState = clientStateEntry.getValue();
+
+clientState.clientTags().forEach((tagKey, tagValue) -> {
+tagKeyToTagValues.computeIfAbsent(tagKey, ignored -> new 
HashSet<>()).add(tagValue);
+

[jira] [Updated] (KAFKA-13266) `InitialFetchState` should be created after partition is removed from the fetchers

2021-09-02 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-13266:

Description: 
 `ReplicationTest.test_replication_with_broker_failure` in KRaft mode sometimes 
fails with the following error in the log:
{noformat}
[2021-08-31 11:31:25,092] ERROR [ReplicaFetcher replicaId=1, leaderId=2, 
fetcherId=0] Unexpected error occurred while processing data for partition 
__consumer_offsets-1 at offset 31727 
(kafka.server.ReplicaFetcherThread)java.lang.IllegalStateException: Offset 
mismatch for partition __consumer_offsets-1: fetched offset = 31727, log end 
offset = 31728. at 
kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:194)
 at 
kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$8(AbstractFetcherThread.scala:545)
 at scala.Option.foreach(Option.scala:437) at 
kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:533)
 at 
kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7$adapted(AbstractFetcherThread.scala:532)
 at 
kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
 at 
scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:359)
 at 
scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:355)
 at 
scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:309)
 at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:532)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:216)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:215)
 at scala.Option.foreach(Option.scala:437) at 
kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:215) 
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:197) 
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:99)[2021-08-31 
11:31:25,093] WARN [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] 
Partition __consumer_offsets-1 marked as failed 
(kafka.server.ReplicaFetcherThread)
{noformat}
 The issue is due to a race condition in 
`ReplicaManager#applyLocalFollowersDelta`. The `InitialFetchState` is created 
and populated before the partition is removed from the fetcher threads. This 
means that the fetch offset of the `InitialFetchState` could be outdated when 
the fetcher threads are re-started because the fetcher threads could have 
incremented the log end offset in between.

The partitions must be removed from the fetcher threads before the 
`InitialFetchStates` are created.

  was:
 

`ReplicationTest.test_replication_with_broker_failure` in KRaft mode sometimes 
fails with the following error in the log:
{noformat}
[2021-08-31 11:31:25,092] ERROR [ReplicaFetcher replicaId=1, leaderId=2, 
fetcherId=0] Unexpected error occurred while processing data for partition 
__consumer_offsets-1 at offset 31727 
(kafka.server.ReplicaFetcherThread)java.lang.IllegalStateException: Offset 
mismatch for partition __consumer_offsets-1: fetched offset = 31727, log end 
offset = 31728. at 
kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:194)
 at 
kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$8(AbstractFetcherThread.scala:545)
 at scala.Option.foreach(Option.scala:437) at 
kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:533)
 at 
kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7$adapted(AbstractFetcherThread.scala:532)
 at 
kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
 at 
scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:359)
 at 
scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:355)
 at 
scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:309)
 at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:532)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:216)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:215)
 at scala.Option.foreach(Option.scala:437) at 
kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:215) 
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:197) 
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:99)[2021-08-31 
11:31:25,093] WARN [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] 
Partition __consumer_offsets-1 marked as failed 
(kafka.server.ReplicaFetcherThread)
{noformat}
 


[jira] [Created] (KAFKA-13266) `InitialFetchState` should be created after partition is removed from the fetchers

2021-09-02 Thread David Jacot (Jira)
David Jacot created KAFKA-13266:
---

 Summary: `InitialFetchState` should be created after partition is 
removed from the fetchers
 Key: KAFKA-13266
 URL: https://issues.apache.org/jira/browse/KAFKA-13266
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.0.0
Reporter: David Jacot
Assignee: David Jacot


 

`ReplicationTest.test_replication_with_broker_failure` in KRaft mode sometimes 
fails with the following error in the log:
{noformat}
[2021-08-31 11:31:25,092] ERROR [ReplicaFetcher replicaId=1, leaderId=2, 
fetcherId=0] Unexpected error occurred while processing data for partition 
__consumer_offsets-1 at offset 31727 
(kafka.server.ReplicaFetcherThread)java.lang.IllegalStateException: Offset 
mismatch for partition __consumer_offsets-1: fetched offset = 31727, log end 
offset = 31728. at 
kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:194)
 at 
kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$8(AbstractFetcherThread.scala:545)
 at scala.Option.foreach(Option.scala:437) at 
kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:533)
 at 
kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7$adapted(AbstractFetcherThread.scala:532)
 at 
kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
 at 
scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:359)
 at 
scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:355)
 at 
scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:309)
 at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:532)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:216)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:215)
 at scala.Option.foreach(Option.scala:437) at 
kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:215) 
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:197) 
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:99)[2021-08-31 
11:31:25,093] WARN [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] 
Partition __consumer_offsets-1 marked as failed 
(kafka.server.ReplicaFetcherThread)
{noformat}
 

The issue is due to a race condition in 
`ReplicaManager#applyLocalFollowersDelta`. The `InitialFetchState` is created 
and populated before the partition is removed from the fetcher threads. This 
means that the fetch offset of the `InitialFetchState` could be outdated when 
the fetcher threads are re-started because the fetcher threads could have 
incremented the log end offset in between.

The partitions must be removed from the fetcher threads before the 
`InitialFetchStates` are created.
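
For illustration, a self-contained Java sketch of the intended ordering (the
real code lives in the Scala `ReplicaManager`; every type and method below is a
stand-in, not the actual broker API):
{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class FetchStateOrderingSketch {
    record TopicPartition(String topic, int partition) { }
    record InitialFetchState(long fetchOffset) { }

    interface FetcherManager {
        void removeFetcherForPartitions(Set<TopicPartition> partitions);
        void addFetcherForPartitions(Map<TopicPartition, InitialFetchState> states);
    }

    interface Log { long logEndOffset(); }

    static void becomeFollower(FetcherManager fetchers, Map<TopicPartition, Log> logs,
                               Set<TopicPartition> partitions) {
        // 1) Remove the partitions from the fetchers first, so the fetcher threads
        //    can no longer advance the log end offset underneath us.
        fetchers.removeFetcherForPartitions(partitions);

        // 2) Only now snapshot the log end offset into the InitialFetchState, so the
        //    fetch offset cannot be stale when the fetchers are restarted.
        Map<TopicPartition, InitialFetchState> states = new HashMap<>();
        for (TopicPartition tp : partitions) {
            states.put(tp, new InitialFetchState(logs.get(tp).logEndOffset()));
        }
        fetchers.addFetcherForPartitions(states);
    }
}
{code}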



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13257) KafkaStreams Support For Latest RocksDB Version

2021-09-02 Thread Satyam Bala (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17408852#comment-17408852
 ] 

Satyam Bala commented on KAFKA-13257:
-

Shall we keep this ticket open until kafka-streams 2.8.0 is fixed/upgraded with
the latest RocksDB, or kafka-streams 3.0.0 is released?

> KafkaStreams Support For Latest RocksDB Version
> ---
>
> Key: KAFKA-13257
> URL: https://issues.apache.org/jira/browse/KAFKA-13257
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Alagukannan
>Priority: Major
> Attachments: hs_err_pid6.log
>
>
> Hi,
>  Can you please let us know if there is any plan for adding the latest 
> versions of rocksDB in kafka streams. If your planning it what's the timeline 
> we are looking at. If not planning to upgrade what's the reason behind it. Is 
> there any significant impact on upgrading like backward combability etc.. 
> Just to remind this general query to know about the rocksdb upgrade and its 
> impact on streams application.
> The main pain point behind asking this upgrade is, We tried to build an 
> application with kafka streams 2.8.0 on an alpine based OS and the docker 
> base image is as follows  
> azul/zulu-openjdk-alpine:11.0.12-11.50.19-jre-headless.  The streams 
> application worked fine until it had an interaction with state 
> store(rocksdb). The jvm crashed with the following error:
>  #
>  # A fatal error has been detected by the Java Runtime Environment:
>  #
>  # SIGSEGV (0xb) at pc=0x7f9551951b27, pid=6, tid=207
>  #
>  # JRE version: OpenJDK Runtime Environment Zulu11.45+27-CA (11.0.10+9) 
> (build 11.0.10+9-LTS)
>  # Java VM: OpenJDK 64-Bit Server VM Zulu11.45+27-CA (11.0.10+9-LTS, mixed 
> mode, tiered, compressed oops, g1 gc, linux-amd64)
>  # Problematic frame:
>  # C [librocksdbjni15322693993163550519.so+0x271b27] 
> std::_Rb_tree, 
> std::less, std::allocator 
> >::_M_erase(std::_Rb_tree_node*)+0x27
> Then we found out rocksdb works well on glibc and not musl lib, where as 
> alpine supports musl lib alone for native dependencies. Further looking into 
> rocksdb for a solution we found that they have started supporting both glib 
> and musl native libs from 6.5.x versions.
>  But latest kafka streams(2.8.0) is having rocksdb(5.18.x) version. This is 
> the main reason behind asking for the rocksDB upgrade in kafka streams as 
> well.
> Have attached the PID log where JVM failures are happening.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions

2021-09-02 Thread Tomas Forsman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17408786#comment-17408786
 ] 

Tomas Forsman edited comment on KAFKA-13261 at 9/2/21, 1:25 PM:


Hi [~guozhang] and [~abellemare], thank you for your answers.

In the scenario we have, the output topic is compacted on the same key as Topic
A, so it would be fine if intermediate events did not come through. We want the
final result. What we're seeing, though, is combinations missing completely. 

We've read out all events from the A, B and output topics, but also the
internal topics created by the join. The expectation is that all would have 66
ids. As said, running with the same data on one partition creates a perfect
match where all events have passed through. 
{noformat}
#Using 4 partitions
A                 : ids: 66
B                 : ids: 66
table.b-changelog : ids: 66
table.a-changelog : ids: 66
join.ab-changelog : ids: 20
output            : ids: 20{noformat}
Attached KafkaTest.java; we've written several JUnit test cases with the
TopologyTestDriver and different amounts of test data, but are unable to
reproduce the problem. (Does the test driver consider several partitions?)

However, in a local docker environment we can reproduce the above scenario 
every time when using 4 partitions and the problem goes away when using 1 
partition.

Below is when reading from the different topics directly for a specific id 
"ID123" 

Columns are 'offset, timestamp, partition | [key]value'

Using 4 partitions
{noformat}
table.a-changelog 
5 1612435945196 0 | [ID123, 202101] A01 
15 1614863137136 0 | [ID123, 202102] A02 
25 1617882052260 0 | [ID123, 202103] A03 
35 1620299210336 0 | [ID123, 202104] A04 
45 1622804606823 0 | [ID123, 202105] A05 
table.b-changelog 
6 1622617868856 0 | [ID123] BBB 
join.ab-changelog 
0 1622617868856 0 | [ID123, 202104] A04, BBB 
output 
0 1622617868856 0 | [ID123, 202104] A04, BBB 
{noformat}
 Using 1 partition
{noformat}
table.a-changelog 
28 1612435945196 0 | [ID123, 202101] A01 
88 1614863137136 0 | [ID123, 202102] A02 
149 1617882052260 0 | [ID123, 202103] A03 
210 1620299210336 0 | [ID123, 202104] A04 
269 1622804606823 0 | [ID123, 202105] A05 
table.b-changelog 
7 1622617868856 0 | [ID123] BBB 
join.ab-changelog 
28 1622617868856 0 | [ID123, 202101] A01, BBB 
88 1622617868856 0 | [ID123, 202102] A02, BBB 
149 1622617868856 0 | [ID123, 202103] A03, BBB 
210 1622617868856 0 | [ID123, 202104] A04, BBB 
269 1622804606823 0 | [ID123, 202105] A05, BBB 
output 
5 1622617868856 0 | [ID123, 202101] A01, BBB 
15 1622617868856 0 | [ID123, 202102] A02, BBB 
25 1622617868856 0 | [ID123, 202103] A03, BBB 
35 1622617868856 0 | [ID123, 202104] A04, BBB 
45 1622804606823 0 | [ID123, 202105] A05, BBB{noformat}
 


was (Author: xnix):
Hi [~guozhang] and [~abellemare], thank you for your answers.

In the scenario we have the output topic is compacted on the same key as Topic 
A, so if intermediate events would not come - it would be fine. We want the 
final result. What we're seeing though is combinations missing completely. 

We've read out all events from the A,B and output topics but also the internal 
topics created by the join. Expected is that all would have 66 ids. As said, 
running with same data with one partition create a perfect match where all 
events has passed through. 
{noformat}
#Using 4 partitions
A                 : ids: 66
B                 : ids: 66
table.b-changelog : ids: 66
table.a-changelog : ids: 66
join.ab-changelog : ids: 20
output            : ids: 20{noformat}
We have put up several junit test cases with the TopologyTestDriver but are 
unable to reproduce it. 

In a local docker environment we can reproduce the above scenario every time 
when using 4 partitions and the problem goes away when using 1 partition.

Below is when reading from the different topics directly for a specific id 
"ID123" 

Columns are 'offset, timestamp, partition | [key]value'

Using 4 partitions
{noformat}
table.a-changelog 
5 1612435945196 0 | [ID123, 202101] A01 
15 1614863137136 0 | [ID123, 202102] A02 
25 1617882052260 0 | [ID123, 202103] A03 
35 1620299210336 0 | [ID123, 202104] A04 
45 1622804606823 0 | [ID123, 202105] A05 
table.b-changelog 
6 1622617868856 0 | [ID123] BBB 
join.ab-changelog 
0 1622617868856 0 | [ID123, 202104] A04, BBB 
output 
0 1622617868856 0 | [ID123, 202104] A04, BBB 
{noformat}
 Using 1 partition
{noformat}
table.a-changelog 
28 1612435945196 0 | [ID123, 202101] A01 
88 1614863137136 0 | [ID123, 202102] A02 
149 1617882052260 0 | [ID123, 202103] A03 
210 1620299210336 0 | [ID123, 202104] A04 
269 1622804606823 0 | [ID123, 202105] A05 
table.b-changelog 
7 1622617868856 0 | [ID123] BBB 
join.ab-changelog 
28 1622617868856 0 | [ID123, 202101] A01, BBB 
88 1622617868856 0 | [ID123, 202102] A02, BBB 
149 1622617868856 0 | [ID123, 202103] A03, BBB 
210 1622617868856 0 | [ID123, 202104] 

[jira] [Updated] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions

2021-09-02 Thread Tomas Forsman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tomas Forsman updated KAFKA-13261:
--
Attachment: KafkaTest.java

> KTable to KTable foreign key join loose events when using several partitions
> 
>
> Key: KAFKA-13261
> URL: https://issues.apache.org/jira/browse/KAFKA-13261
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0, 2.7.1
>Reporter: Tomas Forsman
>Priority: Major
> Attachments: KafkaTest.java
>
>
> Two incoming streams A and B. 
> Stream A uses a composite key [a, b]
> Stream B has key [b]
> Stream B has 4 partitions and stream A has 1 partition.
> What we try to do is repartition stream A to have 4 partitions too, then put 
> both A and B into KTables and do a foreign key join from A to B.
> When doing this, not all messages end up in the output topic.
> Repartitioning both to only use 1 partition each solves the problem, so it 
> seems like it has something to do with the foreign key join in combination 
> with several partitions. 
> One suspicion would be that it is not possible to define what partitioner to 
> use for the join.
> Any insight or help is greatly appreciated.
> *Example code of the problem*
> {code:java}
> static Topology createTopoology(){
> var builder = new StreamsBuilder();
> KTable tableB = builder.table("B",  
> stringMaterialized("table.b"));
> builder
> .stream("A", Consumed.with(Serde.of(KeyA.class), 
> Serde.of(EventA.class)))
> .repartition(repartitionTopicA())
> .toTable(Named.as("table.a"), aMaterialized("table.a"))
> .join(tableB, EventA::getKeyB, topicAandBeJoiner(), 
> Named.as("join.ab"), joinMaterialized("join.ab"))
> .toStream()
> .to("output", with(...));
> return builder.build();
> }
> private static Materialized aMaterialized(String name) {
>   Materialized> table = 
> Materialized.as(name);
>   return 
> table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> private static Repartitioned repartitionTopicA() {
> Repartitioned repartitioned = 
> Repartitioned.as("driverperiod");
> return 
> repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class))
> .withStreamPartitioner(topicAPartitioner())
> .withNumberOfPartitions(4);
> }
> private static StreamPartitioner 
> topicAPartitioner() {
> return (topic, key, value, numPartitions) -> 
> Math.abs(key.getKeyB().hashCode()) % numPartitions;
> }
> private static Materialized> 
> joinMaterialized(String name) {
> Materialized> 
> table = Materialized.as(name);
> return 
> table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cadonna commented on a change in pull request #10851: KAFKA-6718 / Rack aware standby task assignor

2021-09-02 Thread GitBox


cadonna commented on a change in pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#discussion_r701068792



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java
##
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+
+import static 
org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.computeTasksToRemainingStandbys;
+
+/**
+ * Distributes standby tasks over different tag dimensions.
+ * Only tags specified via {@link AssignmentConfigs#rackAwareAssignmentTags} 
are taken into account.
+ * Standby task distribution is on a best-effort basis. For example, if there 
are not enough clients available
+ * on different tag dimensions compared to an active and corresponding standby 
task,
+ * in that case, the algorithm will fall back to distributing tasks on 
least-loaded clients.
+ */
+class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(ClientTagAwareStandbyTaskAssignor.class);
+
+@Override
+public boolean assign(final Map clients,
+  final Set allTaskIds,
+  final Set statefulTaskIds,
+  final AssignorConfiguration.AssignmentConfigs 
configs) {
+final int numStandbyReplicas = configs.numStandbyReplicas;
+final Set rackAwareAssignmentTags = new 
HashSet<>(configs.rackAwareAssignmentTags);
+final Map statefulTasksWithClients = new HashMap<>();
+
+statefulTaskIds.forEach(statefulTaskId -> clients.forEach((uuid, 
clientState) -> {
+if (clientState.activeTasks().contains(statefulTaskId)) {
+statefulTasksWithClients.put(statefulTaskId, uuid);
+}
+}));
+
+final Map tasksToRemainingStandbys = 
computeTasksToRemainingStandbys(
+numStandbyReplicas,
+allTaskIds
+);
+
+final Map> tagKeyToTagValues = new HashMap<>();
+final Map> tagValueToClients = new HashMap<>();
+
+fillClientsTagStatistics(clients, tagValueToClients, 
tagKeyToTagValues);
+
+statefulTasksWithClients.forEach((taskId, clientId) -> 
assignStandbyTasksForActiveTask(
+numStandbyReplicas,
+taskId,
+clientId,
+rackAwareAssignmentTags,
+clients,
+tasksToRemainingStandbys,
+tagKeyToTagValues,
+tagValueToClients
+));
+
+return true;
+}
+
+@Override
+public boolean isAllowedTaskMovement(final ClientState source, final 
ClientState destination) {
+final Map sourceClientTags = source.clientTags();
+final Map destinationClientTags = 
destination.clientTags();
+
+for (final Entry sourceClientTagEntry : 
sourceClientTags.entrySet()) {
+if 
(!sourceClientTagEntry.getValue().equals(destinationClientTags.get(sourceClientTagEntry.getKey(
 {
+return false;
+}
+}
+
+return true;
+}
+
+private static void fillClientsTagStatistics(final Map 
clientStates,
+ final Map> 
tagValueToClients,
+ final Map> tagKeyToTagValues) {
+for (final Entry clientStateEntry : 
clientStates.entrySet()) {
+final UUID clientId = clientStateEntry.getKey();
+final ClientState clientState = clientStateEntry.getValue();
+
+clientState.clientTags().forEach((tagKey, tagValue) -> {
+tagKeyToTagValues.computeIfAbsent(tagKey, ignored -> new 
HashSet<>()).add(tagValue);
+

[GitHub] [kafka] vamossagar12 commented on pull request #11211: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…

2021-09-02 Thread GitBox


vamossagar12 commented on pull request #11211:
URL: https://github.com/apache/kafka/pull/11211#issuecomment-911654379


   > > @showuon , is there an existing integration test for Metered classes? I 
tried to find one to add the relevant tests but couldn't find one.
   > 
   > Sorry for late reply. Metered store classes are for recording operation 
metrics, so you should have it when using any built-in state stores, ex: 
`Stores.windowStoreBuilder` will have `MeteredWindowStore`. Thanks.
   
   Thanks @showuon, I have found a bug with my implementation. Will correct it 
and also add integration tests, and then it can be reviewed.
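
   For readers following along, a minimal sketch of the built-in wiring
mentioned above (the store name, window sizes and serdes are placeholders):
   ```java
   import java.time.Duration;
   import org.apache.kafka.common.serialization.Serdes;
   import org.apache.kafka.streams.state.StoreBuilder;
   import org.apache.kafka.streams.state.Stores;
   import org.apache.kafka.streams.state.WindowStore;

   class WindowStoreSketch {
       static StoreBuilder<WindowStore<String, Long>> countsWindowStore() {
           // When built, this wraps the inner RocksDB window store with the
           // metrics-recording MeteredWindowStore layer described above.
           return Stores.windowStoreBuilder(
               Stores.persistentWindowStore("counts-store", Duration.ofDays(1),
                                            Duration.ofMinutes(5), false),
               Serdes.String(),
               Serdes.Long());
       }
   }
   ```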


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] scholzj commented on pull request #11174: KAFKA-9747: Creating connect reconfiguration URL safely

2021-09-02 Thread GitBox


scholzj commented on pull request #11174:
URL: https://github.com/apache/kafka/pull/11174#issuecomment-911647841


   Great, thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions

2021-09-02 Thread Tomas Forsman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17408786#comment-17408786
 ] 

Tomas Forsman edited comment on KAFKA-13261 at 9/2/21, 12:50 PM:
-

Hi [~guozhang] and [~abellemare], thank you for your answers.

In the scenario we have, the output topic is compacted on the same key as Topic
A, so it would be fine if intermediate events did not come through. We want the
final result. What we're seeing, though, is combinations missing completely. 

We've read out all events from the A, B and output topics, but also the
internal topics created by the join. The expectation is that all would have 66
ids. As said, running with the same data on one partition creates a perfect
match where all events have passed through. 
{noformat}
#Using 4 partitions
A                 : ids: 66
B                 : ids: 66
table.b-changelog : ids: 66
table.a-changelog : ids: 66
join.ab-changelog : ids: 20
output            : ids: 20{noformat}
We have put up several junit test cases with the TopologyTestDriver but are 
unable to reproduce it. 

In a local docker environment we can reproduce the above scenario every time 
when using 4 partitions and the problem goes away when using 1 partition.

Below is when reading from the different topics directly for a specific id 
"ID123" 

Columns are 'offset, timestamp, partition | [key]value'

Using 4 partitions
{noformat}
table.a-changelog 
5 1612435945196 0 | [ID123, 202101] A01 
15 1614863137136 0 | [ID123, 202102] A02 
25 1617882052260 0 | [ID123, 202103] A03 
35 1620299210336 0 | [ID123, 202104] A04 
45 1622804606823 0 | [ID123, 202105] A05 
table.b-changelog 
6 1622617868856 0 | [ID123] BBB 
join.ab-changelog 
0 1622617868856 0 | [ID123, 202104] A04, BBB 
output 
0 1622617868856 0 | [ID123, 202104] A04, BBB 
{noformat}
 Using 1 partition
{noformat}
table.a-changelog 
28 1612435945196 0 | [ID123, 202101] A01 
88 1614863137136 0 | [ID123, 202102] A02 
149 1617882052260 0 | [ID123, 202103] A03 
210 1620299210336 0 | [ID123, 202104] A04 
269 1622804606823 0 | [ID123, 202105] A05 
table.b-changelog 
7 1622617868856 0 | [ID123] BBB 
join.ab-changelog 
28 1622617868856 0 | [ID123, 202101] A01, BBB 
88 1622617868856 0 | [ID123, 202102] A02, BBB 
149 1622617868856 0 | [ID123, 202103] A03, BBB 
210 1622617868856 0 | [ID123, 202104] A04, BBB 
269 1622804606823 0 | [ID123, 202105] A05, BBB 
output 
5 1622617868856 0 | [ID123, 202101] A01, BBB 
15 1622617868856 0 | [ID123, 202102] A02, BBB 
25 1622617868856 0 | [ID123, 202103] A03, BBB 
35 1622617868856 0 | [ID123, 202104] A04, BBB 
45 1622804606823 0 | [ID123, 202105] A05, BBB{noformat}
 


was (Author: xnix):
Hi [~guozhang] and [~abellemare], thank you for your answers.

In the scenario we have the output topic is compacted on the same key as Topic 
A, so if intermediate events would not come - it would be fine. We want the 
final result. What we're seeing though is combinations missing completely. 

We've read out all events from the A,B and output topics but also the internal 
topics created by the join. Expected is that all would have 66 ids. As said, 
running with same data with one partition create a perfect match where all 
events has passed through. 
{noformat}
#Using 4 partitions
A                 : ids: 66
B                 : ids: 66
table.b-changelog : ids: 66
table.a-changelog : ids: 66
join.ab-changelog : ids: 20
output            : ids: 20{noformat}
We have put up several junit test cases with the TopologyTestDriver but are 
unable to reproduce it. 

In a local docker environment we can reproduce the above scenario every time 
when using 4 partitions and the problem goes away when using 1 partition.

Below is when reading from the different topics directly for a specific id 
"ID123" 

Using 4 partitions
{noformat}
table.a-changelog 
5 1612435945196 0 | [ID123, 202101] A01 
15 1614863137136 0 | [ID123, 202102] A02 
25 1617882052260 0 | [ID123, 202103] A03 
35 1620299210336 0 | [ID123, 202104] A04 
45 1622804606823 0 | [ID123, 202105] A05 
table.b-changelog 
6 1622617868856 0 | [ID123] BBB 
join.ab-changelog 
0 1622617868856 0 | [ID123, 202104] A04, BBB 
output 
0 1622617868856 0 | [ID123, 202104] A04, BBB 
{noformat}
 

Using 1 partition
{noformat}
table.a-changelog 
28 1612435945196 0 | [ID123, 202101] A01 
88 1614863137136 0 | [ID123, 202102] A02 
149 1617882052260 0 | [ID123, 202103] A03 
210 1620299210336 0 | [ID123, 202104] A04 
269 1622804606823 0 | [ID123, 202105] A05 
table.b-changelog 
7 1622617868856 0 | [ID123] BBB 
join.ab-changelog 
28 1622617868856 0 | [ID123, 202101] A01, BBB 
88 1622617868856 0 | [ID123, 202102] A02, BBB 
149 1622617868856 0 | [ID123, 202103] A03, BBB 
210 1622617868856 0 | [ID123, 202104] A04, BBB 
269 1622804606823 0 | [ID123, 202105] A05, BBB 
output 
5 1622617868856 0 | [ID123, 202101] A01, BBB 
15 1622617868856 0 | [ID123, 202102] A02, BBB 
25 1622617868856 0 | [ID123, 

[GitHub] [kafka] mimaison commented on pull request #11174: KAFKA-9747: Creating connect reconfiguration URL safely

2021-09-02 Thread GitBox


mimaison commented on pull request #11174:
URL: https://github.com/apache/kafka/pull/11174#issuecomment-911644486


   @scholzj Yes that's the plan, I'll do it when merging into 3.0 reopens 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lkokhreidze commented on a change in pull request #10851: KAFKA-6718 / Rack aware standby task assignor

2021-09-02 Thread GitBox


lkokhreidze commented on a change in pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#discussion_r701038718



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java
##
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+
+import static 
org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.computeTasksToRemainingStandbys;
+
+/**
+ * Distributes standby tasks over different tag dimensions.
+ * Only tags specified via {@link AssignmentConfigs#rackAwareAssignmentTags} 
are taken into account.
+ * Standby task distribution is on a best-effort basis. For example, if there 
are not enough clients available
+ * on different tag dimensions compared to an active and corresponding standby 
task,
+ * in that case, the algorithm will fall back to distributing tasks on 
least-loaded clients.
+ */
+class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(ClientTagAwareStandbyTaskAssignor.class);
+
+@Override
+public boolean assign(final Map clients,
+  final Set allTaskIds,
+  final Set statefulTaskIds,
+  final AssignorConfiguration.AssignmentConfigs 
configs) {
+final int numStandbyReplicas = configs.numStandbyReplicas;
+final Set rackAwareAssignmentTags = new 
HashSet<>(configs.rackAwareAssignmentTags);
+final Map statefulTasksWithClients = new HashMap<>();
+
+statefulTaskIds.forEach(statefulTaskId -> clients.forEach((uuid, 
clientState) -> {
+if (clientState.activeTasks().contains(statefulTaskId)) {
+statefulTasksWithClients.put(statefulTaskId, uuid);
+}
+}));
+
+final Map tasksToRemainingStandbys = 
computeTasksToRemainingStandbys(
+numStandbyReplicas,
+allTaskIds
+);
+
+final Map> tagKeyToTagValues = new HashMap<>();
+final Map> tagValueToClients = new HashMap<>();
+
+fillClientsTagStatistics(clients, tagValueToClients, 
tagKeyToTagValues);
+
+statefulTasksWithClients.forEach((taskId, clientId) -> 
assignStandbyTasksForActiveTask(
+numStandbyReplicas,
+taskId,
+clientId,
+rackAwareAssignmentTags,
+clients,
+tasksToRemainingStandbys,
+tagKeyToTagValues,
+tagValueToClients
+));
+
+return true;
+}
+
+@Override
+public boolean isAllowedTaskMovement(final ClientState source, final 
ClientState destination) {
+final Map sourceClientTags = source.clientTags();
+final Map destinationClientTags = 
destination.clientTags();
+
+for (final Entry sourceClientTagEntry : 
sourceClientTags.entrySet()) {
+if 
(!sourceClientTagEntry.getValue().equals(destinationClientTags.get(sourceClientTagEntry.getKey(
 {
+return false;
+}
+}
+
+return true;
+}
+
+private static void fillClientsTagStatistics(final Map 
clientStates,
+ final Map> 
tagValueToClients,
+ final Map> tagKeyToTagValues) {
+for (final Entry clientStateEntry : 
clientStates.entrySet()) {
+final UUID clientId = clientStateEntry.getKey();
+final ClientState clientState = clientStateEntry.getValue();
+
+clientState.clientTags().forEach((tagKey, tagValue) -> {
+tagKeyToTagValues.computeIfAbsent(tagKey, ignored -> new 
HashSet<>()).add(tagValue);
+

[jira] [Comment Edited] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions

2021-09-02 Thread Tomas Forsman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17408786#comment-17408786
 ] 

Tomas Forsman edited comment on KAFKA-13261 at 9/2/21, 12:44 PM:
-

Hi [~guozhang] and [~abellemare], thank you for your answers.

In our scenario the output topic is compacted on the same key as Topic A, so it 
would be fine if intermediate events did not come through; we want the final 
result. What we're seeing, though, is combinations missing completely. 

We've read out all events from the A and B topics and the output topic, as well 
as the internal topics created by the join. The expectation is that all of them 
would contain 66 ids. As said, running the same data with one partition creates 
a perfect match where all events pass through. 
{noformat}
#Using 4 partitions
A                 : ids: 66
B                 : ids: 66
table.b-changelog : ids: 66
table.a-changelog : ids: 66
join.ab-changelog : ids: 20
output            : ids: 20{noformat}
We have put up several JUnit test cases with the TopologyTestDriver but are 
unable to reproduce it. 

In a local Docker environment we can reproduce the above scenario every time 
when using 4 partitions, and the problem goes away when using 1 partition.

Below is the output when reading directly from the different topics for the 
specific id "ID123" 

Using 4 partitions
{noformat}
table.a-changelog 
5 1612435945196 0 | [ID123, 202101] A01 
15 1614863137136 0 | [ID123, 202102] A02 
25 1617882052260 0 | [ID123, 202103] A03 
35 1620299210336 0 | [ID123, 202104] A04 
45 1622804606823 0 | [ID123, 202105] A05 
table.b-changelog 
6 1622617868856 0 | [ID123] BBB 
join.ab-changelog 
0 1622617868856 0 | [ID123, 202104] A04, BBB 
output 
0 1622617868856 0 | [ID123, 202104] A04, BBB 
{noformat}
 

Using 1 partition
{noformat}
table.a-changelog 
28 1612435945196 0 | [ID123, 202101] A01 
88 1614863137136 0 | [ID123, 202102] A02 
149 1617882052260 0 | [ID123, 202103] A03 
210 1620299210336 0 | [ID123, 202104] A04 
269 1622804606823 0 | [ID123, 202105] A05 
table.b-changelog 
7 1622617868856 0 | [ID123] BBB 
join.ab-changelog 
28 1622617868856 0 | [ID123, 202101] A01, BBB 
88 1622617868856 0 | [ID123, 202102] A02, BBB 
149 1622617868856 0 | [ID123, 202103] A03, BBB 
210 1622617868856 0 | [ID123, 202104] A04, BBB 
269 1622804606823 0 | [ID123, 202105] A05, BBB 
output 
5 1622617868856 0 | [ID123, 202101] A01, BBB 
15 1622617868856 0 | [ID123, 202102] A02, BBB 
25 1622617868856 0 | [ID123, 202103] A03, BBB 
35 1622617868856 0 | [ID123, 202104] A04, BBB 
45 1622804606823 0 | [ID123, 202105] A05, BBB{noformat}
 


was (Author: xnix):
Hi [~guozhang] and [~abellemare], thank you for your answers.

In the scenario we have the output topic is compacted on the same key as Topic 
A, so if intermediate events would not come - it would be fine. We want the 
final result. What we're seeing though is combinations missing completely. 

I've read out all events from the A,B and output topics but also the internal 
topics created by the join. Expected is that all would have 66 ids. As said, 
running with same data with one partition create a perfect match where all 
events has passed through. 
{noformat}
#Using 4 partitions
A                 : ids: 66
B                 : ids: 66
table.b-changelog : ids: 66
table.a-changelog : ids: 66
join.ab-changelog : ids: 20
output            : ids: 20{noformat}
We have put up several junit test cases with the TopologyTestDriver but are 
unable to reproduce it. 

In a local docker environment we can reproduce the above scenario every time 
when using 4 partitions and the problem goes away when using 1 partition.

Below is when reading from the different topics directly for a specific id 
"ID123" 

Using 4 partitions
{noformat}
table.a-changelog 
5 1612435945196 0 | [ID123, 202101] A01 
15 1614863137136 0 | [ID123, 202102] A02 
25 1617882052260 0 | [ID123, 202103] A03 
35 1620299210336 0 | [ID123, 202104] A04 
45 1622804606823 0 | [ID123, 202105] A05 
table.b-changelog 
6 1622617868856 0 | [ID123] BBB 
join.ab-changelog 
0 1622617868856 0 | [ID123, 202104] A04, BBB 
output 
0 1622617868856 0 | [ID123, 202104] A04, BBB 
{noformat}
 

Using 1 partition
{noformat}
table.a-changelog 
28 1612435945196 0 | [ID123, 202101] A01 
88 1614863137136 0 | [ID123, 202102] A02 
149 1617882052260 0 | [ID123, 202103] A03 
210 1620299210336 0 | [ID123, 202104] A04 
269 1622804606823 0 | [ID123, 202105] A05 
table.b-changelog 
7 1622617868856 0 | [ID123] BBB 
join.ab-changelog 
28 1622617868856 0 | [ID123, 202101] A01, BBB 
88 1622617868856 0 | [ID123, 202102] A02, BBB 
149 1622617868856 0 | [ID123, 202103] A03, BBB 
210 1622617868856 0 | [ID123, 202104] A04, BBB 
269 1622804606823 0 | [ID123, 202105] A05, BBB 
output 
5 1622617868856 0 | [ID123, 202101] A01, BBB 
15 1622617868856 0 | [ID123, 202102] A02, BBB 
25 1622617868856 0 | [ID123, 202103] A03, BBB 
35 1622617868856 0 | [ID123, 202104] 

[jira] [Comment Edited] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions

2021-09-02 Thread Tomas Forsman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17408786#comment-17408786
 ] 

Tomas Forsman edited comment on KAFKA-13261 at 9/2/21, 12:44 PM:
-

Hi [~guozhang] and [~abellemare], thank you for your answers.

In the scenario we have the output topic is compacted on the same key as Topic 
A, so if intermediate events would not come - it would be fine. We want the 
final result. What we're seeing though is combinations missing completely. 

I've read out all events from the A,B and output topics but also the internal 
topics created by the join. Expected is that all would have 66 ids. As said, 
running with same data with one partition create a perfect match where all 
events has passed through. 
{noformat}
#Using 4 partitions
A                 : ids: 66
B                 : ids: 66
table.b-changelog : ids: 66
table.a-changelog : ids: 66
join.ab-changelog : ids: 20
output            : ids: 20{noformat}
We have put up several junit test cases with the TopologyTestDriver but are 
unable to reproduce it. 

In a local docker environment we can reproduce the above scenario every time 
when using 4 partitions and the problem goes away when using 1 partition.

Below is when reading from the different topics directly for a specific id 
"ID123" 

Using 4 partitions
{noformat}
table.a-changelog 
5 1612435945196 0 | [ID123, 202101] A01 
15 1614863137136 0 | [ID123, 202102] A02 
25 1617882052260 0 | [ID123, 202103] A03 
35 1620299210336 0 | [ID123, 202104] A04 
45 1622804606823 0 | [ID123, 202105] A05 
table.b-changelog 
6 1622617868856 0 | [ID123] BBB 
join.ab-changelog 
0 1622617868856 0 | [ID123, 202104] A04, BBB 
output 
0 1622617868856 0 | [ID123, 202104] A04, BBB 
{noformat}
 

Using 1 partition
{noformat}
table.a-changelog 
28 1612435945196 0 | [ID123, 202101] A01 
88 1614863137136 0 | [ID123, 202102] A02 
149 1617882052260 0 | [ID123, 202103] A03 
210 1620299210336 0 | [ID123, 202104] A04 
269 1622804606823 0 | [ID123, 202105] A05 
table.b-changelog 
7 1622617868856 0 | [ID123] BBB 
join.ab-changelog 
28 1622617868856 0 | [ID123, 202101] A01, BBB 
88 1622617868856 0 | [ID123, 202102] A02, BBB 
149 1622617868856 0 | [ID123, 202103] A03, BBB 
210 1622617868856 0 | [ID123, 202104] A04, BBB 
269 1622804606823 0 | [ID123, 202105] A05, BBB 
output 
5 1622617868856 0 | [ID123, 202101] A01, BBB 
15 1622617868856 0 | [ID123, 202102] A02, BBB 
25 1622617868856 0 | [ID123, 202103] A03, BBB 
35 1622617868856 0 | [ID123, 202104] A04, BBB 
45 1622804606823 0 | [ID123, 202105] A05, BBB{noformat}
 


was (Author: xnix):
Hi [~guozhang] and [~abellemare], thank you for your answers.

In the scenario we have the output topic is compacted on the same key as Topic 
A, so if intermediate events would not come - it would be fine. We want the 
final result. What we're seeing though is combinations missing completely. 

I've read out all events from the A,B and output topics but also the internal 
topics created by the join. Expected is that all would have 66 ids. As said, 
running with same data with one partition create a perfect match where all 
events has passed through. 
{noformat}
#Using 4 partitions
A                 : ids: 66
B                 : ids: 66
table.b-changelog : ids: 66
table.a-changelog : ids: 66
join.ab-changelog : ids: 20
output            : ids: 20{noformat}
We have put up several junit test cases with the TopologyTestDriver but is 
unable to reproduce it. 

In a local docker environment I can reproduce the above scenario every time 
when using 4 partitions and the problem goes away when using 1 partition.

Below is when I read from the different topics directly for a specific "ID123" 

Using 4 partitions
{noformat}
table.a-changelog 
5 1612435945196 0 | [ID123, 202101] A01 
15 1614863137136 0 | [ID123, 202102] A02 
25 1617882052260 0 | [ID123, 202103] A03 
35 1620299210336 0 | [ID123, 202104] A04 
45 1622804606823 0 | [ID123, 202105] A05 
table.b-changelog 
6 1622617868856 0 | [ID123] BBB 
join.ab-changelog 
0 1622617868856 0 | [ID123, 202104] A04, BBB 
output 
0 1622617868856 0 | [ID123, 202104] A04, BBB 
{noformat}
 

Using 1 partition
{noformat}
table.a-changelog 
28 1612435945196 0 | [ID123, 202101] A01 
88 1614863137136 0 | [ID123, 202102] A02 
149 1617882052260 0 | [ID123, 202103] A03 
210 1620299210336 0 | [ID123, 202104] A04 
269 1622804606823 0 | [ID123, 202105] A05 
table.b-changelog 
7 1622617868856 0 | [ID123] BBB 
join.ab-changelog 
28 1622617868856 0 | [ID123, 202101] A01, BBB 
88 1622617868856 0 | [ID123, 202102] A02, BBB 
149 1622617868856 0 | [ID123, 202103] A03, BBB 
210 1622617868856 0 | [ID123, 202104] A04, BBB 
269 1622804606823 0 | [ID123, 202105] A05, BBB 
output 
5 1622617868856 0 | [ID123, 202101] A01, BBB 
15 1622617868856 0 | [ID123, 202102] A02, BBB 
25 1622617868856 0 | [ID123, 202103] A03, BBB 
35 1622617868856 0 | [ID123, 202104] A04, BBB 

[jira] [Comment Edited] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions

2021-09-02 Thread Tomas Forsman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17408786#comment-17408786
 ] 

Tomas Forsman edited comment on KAFKA-13261 at 9/2/21, 12:41 PM:
-

Hi [~guozhang] and [~abellemare], thank you for your answers.

In the scenario we have the output topic is compacted on the same key as Topic 
A, so if intermediate events would not come - it would be fine. We want the 
final result. What we're seeing though is combinations missing completely. 

I've read out all events from the A,B and output topics but also the internal 
topics created by the join. Expected is that all would have 66 ids. As said, 
running with same data with one partition create a perfect match where all 
events has passed through. 
{noformat}
#Using 4 partitions
A                 : ids: 66
B                 : ids: 66
table.b-changelog : ids: 66
table.a-changelog : ids: 66
join.ab-changelog : ids: 20
output            : ids: 20{noformat}
We have put up several junit test cases with the TopologyTestDriver but is 
unable to reproduce it. 

In a local docker environment I can reproduce the above scenario every time 
when using 4 partitions and the problem goes away when using 1 partition.

Below is when I read from the different topics directly for a specific "ID123" 

Using 4 partitions
{noformat}
table.a-changelog 
5 1612435945196 0 | [ID123, 202101] A01 
15 1614863137136 0 | [ID123, 202102] A02 
25 1617882052260 0 | [ID123, 202103] A03 
35 1620299210336 0 | [ID123, 202104] A04 
45 1622804606823 0 | [ID123, 202105] A05 
table.b-changelog 
6 1622617868856 0 | [ID123] BBB 
join.ab-changelog 
0 1622617868856 0 | [ID123, 202104] A04, BBB 
output 
0 1622617868856 0 | [ID123, 202104] A04, BBB 
{noformat}
 

Using 1 partition
{noformat}
table.a-changelog 
28 1612435945196 0 | [ID123, 202101] A01 
88 1614863137136 0 | [ID123, 202102] A02 
149 1617882052260 0 | [ID123, 202103] A03 
210 1620299210336 0 | [ID123, 202104] A04 
269 1622804606823 0 | [ID123, 202105] A05 
table.b-changelog 
7 1622617868856 0 | [ID123] BBB 
join.ab-changelog 
28 1622617868856 0 | [ID123, 202101] A01, BBB 
88 1622617868856 0 | [ID123, 202102] A02, BBB 
149 1622617868856 0 | [ID123, 202103] A03, BBB 
210 1622617868856 0 | [ID123, 202104] A04, BBB 
269 1622804606823 0 | [ID123, 202105] A05, BBB 
output 
5 1622617868856 0 | [ID123, 202101] A01, BBB 
15 1622617868856 0 | [ID123, 202102] A02, BBB 
25 1622617868856 0 | [ID123, 202103] A03, BBB 
35 1622617868856 0 | [ID123, 202104] A04, BBB 
45 1622804606823 0 | [ID123, 202105] A05, BBB{noformat}
 


was (Author: xnix):
Hi [~guozhang] and [~abellemare], thank you for your answers.

In the scenario we have the output topic is compacted on the same key as Topic 
A, so if intermediate events would not come - it would be fine. We want the 
final result. What we're seeing though is combinations missing completely. 

I've read out all events from the A,B and output topics but also the internal 
topics created by the join. Expected is that all would have 66 ids. As said, 
running with same data with one partition create a perfect match where all 
events has passed through. 

 
{noformat}
A                 : ids: 66
B                 : ids: 66
table.b-changelog : ids: 66
table.a-changelog : ids: 66
join.ab-changelog : ids: 20
output            : ids: 20{noformat}
 

 

We have put up several junit test cases with the TopologyTestDriver but is 
unable to reproduce it. 

In a local docker environment I can reproduce the above scenario every time 
when using 4 partitions and the problem goes away when using 1 partition.

Below is when I read from the different topics for a specific "ID123" 

Using 4 partitions
{noformat}
table.a-changelog 
5 1612435945196 0 | [ID123, 202101] A01 
15 1614863137136 0 | [ID123, 202102] A02 
25 1617882052260 0 | [ID123, 202103] A03 
35 1620299210336 0 | [ID123, 202104] A04 
45 1622804606823 0 | [ID123, 202105] A05 
table.b-changelog 
6 1622617868856 0 | [ID123] BBB 
join.ab-changelog 
0 1622617868856 0 | [ID123, 202104] A04, BBB 
output 
0 1622617868856 0 | [ID123, 202104] A04, BBB 
{noformat}
 

Using 1 partition
{noformat}
table.a-changelog 
28 1612435945196 0 | [ID123, 202101] A01 
88 1614863137136 0 | [ID123, 202102] A02 
149 1617882052260 0 | [ID123, 202103] A03 
210 1620299210336 0 | [ID123, 202104] A04 
269 1622804606823 0 | [ID123, 202105] A05 
table.b-changelog 
7 1622617868856 0 | [ID123] BBB 
join.ab-changelog 
28 1622617868856 0 | [ID123, 202101] A01, BBB 
88 1622617868856 0 | [ID123, 202102] A02, BBB 
149 1622617868856 0 | [ID123, 202103] A03, BBB 
210 1622617868856 0 | [ID123, 202104] A04, BBB 
269 1622804606823 0 | [ID123, 202105] A05, BBB 
output 
5 1622617868856 0 | [ID123, 202101] A01, BBB 
15 1622617868856 0 | [ID123, 202102] A02, BBB 
25 1622617868856 0 | [ID123, 202103] A03, BBB 
35 1622617868856 0 | [ID123, 202104] A04, BBB 
45 1622804606823 0 | 

[jira] [Commented] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions

2021-09-02 Thread Tomas Forsman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17408786#comment-17408786
 ] 

Tomas Forsman commented on KAFKA-13261:
---

Hi [~guozhang] and [~abellemare], thank you for your answers.

In the scenario we have the output topic is compacted on the same key as Topic 
A, so if intermediate events would not come - it would be fine. We want the 
final result. What we're seeing though is combinations missing completely. 

I've read out all events from the A,B and output topics but also the internal 
topics created by the join. Expected is that all would have 66 ids. As said, 
running with same data with one partition create a perfect match where all 
events has passed through. 

 
{noformat}
A                 : ids: 66
B                 : ids: 66
table.b-changelog : ids: 66
table.a-changelog : ids: 66
join.ab-changelog : ids: 20
output            : ids: 20{noformat}
 

 

We have put up several junit test cases with the TopologyTestDriver but is 
unable to reproduce it. 

In a local docker environment I can reproduce the above scenario every time 
when using 4 partitions and the problem goes away when using 1 partition.

Below is when I read from the different topics for a specific "ID123" 

Using 4 partitions
{noformat}
table.a-changelog 
5 1612435945196 0 | [ID123, 202101] A01 
15 1614863137136 0 | [ID123, 202102] A02 
25 1617882052260 0 | [ID123, 202103] A03 
35 1620299210336 0 | [ID123, 202104] A04 
45 1622804606823 0 | [ID123, 202105] A05 
table.b-changelog 
6 1622617868856 0 | [ID123] BBB 
join.ab-changelog 
0 1622617868856 0 | [ID123, 202104] A04, BBB 
output 
0 1622617868856 0 | [ID123, 202104] A04, BBB 
{noformat}
 

Using 1 partition
{noformat}
table.a-changelog 
28 1612435945196 0 | [ID123, 202101] A01 
88 1614863137136 0 | [ID123, 202102] A02 
149 1617882052260 0 | [ID123, 202103] A03 
210 1620299210336 0 | [ID123, 202104] A04 
269 1622804606823 0 | [ID123, 202105] A05 
table.b-changelog 
7 1622617868856 0 | [ID123] BBB 
join.ab-changelog 
28 1622617868856 0 | [ID123, 202101] A01, BBB 
88 1622617868856 0 | [ID123, 202102] A02, BBB 
149 1622617868856 0 | [ID123, 202103] A03, BBB 
210 1622617868856 0 | [ID123, 202104] A04, BBB 
269 1622804606823 0 | [ID123, 202105] A05, BBB 
output 
5 1622617868856 0 | [ID123, 202101] A01, BBB 
15 1622617868856 0 | [ID123, 202102] A02, BBB 
25 1622617868856 0 | [ID123, 202103] A03, BBB 
35 1622617868856 0 | [ID123, 202104] A04, BBB 
45 1622804606823 0 | [ID123, 202105] A05, BBB{noformat}
 

> KTable to KTable foreign key join loose events when using several partitions
> 
>
> Key: KAFKA-13261
> URL: https://issues.apache.org/jira/browse/KAFKA-13261
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0, 2.7.1
>Reporter: Tomas Forsman
>Priority: Major
>
> Two incoming streams A and B. 
> Stream A uses a composite key [a, b]
> Stream B has key [b]
> Stream B has 4 partitions and steams A has 1 partition.
> What we try to do is repartition stream A to have 4 partitions too, then put 
> both A and B into KTable and do a foreign key join on from A to B
> When doing this, all messages does not end up in the output topic.
> Repartitioning both to only use 1 partition each solve the problem so it seem 
> like it has something to do with the foreign key join in combination with 
> several partitions. 
> One suspicion would be that it is not possible to define what partitioner to 
> use for the join.
> Any insight or help is greatly appreciated.
> *Example code of the problem*
> {code:java}
> static Topology createTopoology(){
> var builder = new StreamsBuilder();
> KTable tableB = builder.table("B",  
> stringMaterialized("table.b"));
> builder
> .stream("A", Consumed.with(Serde.of(KeyA.class), 
> Serde.of(EventA.class)))
> .repartition(repartitionTopicA())
> .toTable(Named.as("table.a"), aMaterialized("table.a"))
> .join(tableB, EventA::getKeyB, topicAandBeJoiner(), 
> Named.as("join.ab"), joinMaterialized("join.ab"))
> .toStream()
> .to("output", with(...));
> return builder.build();
> }
> private static Materialized aMaterialized(String name) {
>   Materialized> table = 
> Materialized.as(name);
>   return 
> table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> private static Repartitioned repartitionTopicA() {
> Repartitioned repartitioned = 
> Repartitioned.as("driverperiod");
> return 
> repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class))
> .withStreamPartitioner(topicAPartitioner())
> .withNumberOfPartitions(4);
> }
> private static StreamPartitioner 
> topicAPartitioner() {
> return (topic, key, value, 

[GitHub] [kafka] lkokhreidze commented on a change in pull request #10851: KAFKA-6718 / Rack aware standby task assignor

2021-09-02 Thread GitBox


lkokhreidze commented on a change in pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#discussion_r701038718



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java
##
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+
+import static 
org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.computeTasksToRemainingStandbys;
+
+/**
+ * Distributes standby tasks over different tag dimensions.
+ * Only tags specified via {@link AssignmentConfigs#rackAwareAssignmentTags} 
are taken into account.
+ * Standby task distribution is on a best-effort basis. For example, if there 
are not enough clients available
+ * on different tag dimensions compared to an active and corresponding standby 
task,
+ * in that case, the algorithm will fall back to distributing tasks on 
least-loaded clients.
+ */
+class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(ClientTagAwareStandbyTaskAssignor.class);
+
+@Override
+public boolean assign(final Map clients,
+  final Set allTaskIds,
+  final Set statefulTaskIds,
+  final AssignorConfiguration.AssignmentConfigs 
configs) {
+final int numStandbyReplicas = configs.numStandbyReplicas;
+final Set rackAwareAssignmentTags = new 
HashSet<>(configs.rackAwareAssignmentTags);
+final Map statefulTasksWithClients = new HashMap<>();
+
+statefulTaskIds.forEach(statefulTaskId -> clients.forEach((uuid, 
clientState) -> {
+if (clientState.activeTasks().contains(statefulTaskId)) {
+statefulTasksWithClients.put(statefulTaskId, uuid);
+}
+}));
+
+final Map tasksToRemainingStandbys = 
computeTasksToRemainingStandbys(
+numStandbyReplicas,
+allTaskIds
+);
+
+final Map> tagKeyToTagValues = new HashMap<>();
+final Map> tagValueToClients = new HashMap<>();
+
+fillClientsTagStatistics(clients, tagValueToClients, 
tagKeyToTagValues);
+
+statefulTasksWithClients.forEach((taskId, clientId) -> 
assignStandbyTasksForActiveTask(
+numStandbyReplicas,
+taskId,
+clientId,
+rackAwareAssignmentTags,
+clients,
+tasksToRemainingStandbys,
+tagKeyToTagValues,
+tagValueToClients
+));
+
+return true;
+}
+
+@Override
+public boolean isAllowedTaskMovement(final ClientState source, final 
ClientState destination) {
+final Map sourceClientTags = source.clientTags();
+final Map destinationClientTags = 
destination.clientTags();
+
+for (final Entry sourceClientTagEntry : 
sourceClientTags.entrySet()) {
+if 
(!sourceClientTagEntry.getValue().equals(destinationClientTags.get(sourceClientTagEntry.getKey(
 {
+return false;
+}
+}
+
+return true;
+}
+
+private static void fillClientsTagStatistics(final Map 
clientStates,
+ final Map> 
tagValueToClients,
+ final Map> tagKeyToTagValues) {
+for (final Entry clientStateEntry : 
clientStates.entrySet()) {
+final UUID clientId = clientStateEntry.getKey();
+final ClientState clientState = clientStateEntry.getValue();
+
+clientState.clientTags().forEach((tagKey, tagValue) -> {
+tagKeyToTagValues.computeIfAbsent(tagKey, ignored -> new 
HashSet<>()).add(tagValue);
+

[GitHub] [kafka] showuon commented on pull request #11227: KAFKA-13211: add support for infinite range query for WindowStore

2021-09-02 Thread GitBox


showuon commented on pull request #11227:
URL: https://github.com/apache/kafka/pull/11227#issuecomment-911629109


   Integration tests added, but I found a bug that will fail these tests. Will 
wait for that PR to get merged and then continue with this PR. Thanks. 
https://github.com/apache/kafka/pull/11292


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] scholzj commented on pull request #11174: KAFKA-9747: Creating connect reconfiguration URL safely

2021-09-02 Thread GitBox


scholzj commented on pull request #11174:
URL: https://github.com/apache/kafka/pull/11174#issuecomment-911607580


   @mimaison Will this also get into the 3.0.x release stream? I know it is 
probably late for 3.0.0 which already has RCs, so that is fine. But it would be 
nice to have it in 3.0.1 (if there ever is a 3.0.1 of course).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vamossagar12 commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

2021-09-02 Thread GitBox


vamossagar12 commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-911592247


   @guozhangwang , these errors are not due to the changes in this PR:
   
   `imported `Named` is permanently hidden by definition of type Named in 
package kstream` 
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13257) KafkaStreams Support For Latest RocksDB Version

2021-09-02 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17408759#comment-17408759
 ] 

Bruno Cadonna commented on KAFKA-13257:
---

[~satyam.b...@gmail.com] Using a Kafka Streams version with a version of 
RocksDB different from the one in the Kafka dependencies ([here the 
dependencies for 
2.8|https://github.com/apache/kafka/blob/2.8/gradle/dependencies.gradle]) is 
not officially supported. It may or may not work. If you want to use a newer 
version of RocksDB, you need to wait until 3.0. 

> KafkaStreams Support For Latest RocksDB Version
> ---
>
> Key: KAFKA-13257
> URL: https://issues.apache.org/jira/browse/KAFKA-13257
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Alagukannan
>Priority: Major
> Attachments: hs_err_pid6.log
>
>
> Hi,
>  Can you please let us know if there is any plan for adding the latest 
> versions of rocksDB in kafka streams. If your planning it what's the timeline 
> we are looking at. If not planning to upgrade what's the reason behind it. Is 
> there any significant impact on upgrading like backward combability etc.. 
> Just to remind this general query to know about the rocksdb upgrade and its 
> impact on streams application.
> The main pain point behind asking this upgrade is, We tried to build an 
> application with kafka streams 2.8.0 on an alpine based OS and the docker 
> base image is as follows  
> azul/zulu-openjdk-alpine:11.0.12-11.50.19-jre-headless.  The streams 
> application worked fine until it had an interaction with state 
> store(rocksdb). The jvm crashed with the following error:
>  #
>  # A fatal error has been detected by the Java Runtime Environment:
>  #
>  # SIGSEGV (0xb) at pc=0x7f9551951b27, pid=6, tid=207
>  #
>  # JRE version: OpenJDK Runtime Environment Zulu11.45+27-CA (11.0.10+9) 
> (build 11.0.10+9-LTS)
>  # Java VM: OpenJDK 64-Bit Server VM Zulu11.45+27-CA (11.0.10+9-LTS, mixed 
> mode, tiered, compressed oops, g1 gc, linux-amd64)
>  # Problematic frame:
>  # C [librocksdbjni15322693993163550519.so+0x271b27] 
> std::_Rb_tree, 
> std::less, std::allocator 
> >::_M_erase(std::_Rb_tree_node*)+0x27
> Then we found out rocksdb works well on glibc and not musl lib, where as 
> alpine supports musl lib alone for native dependencies. Further looking into 
> rocksdb for a solution we found that they have started supporting both glib 
> and musl native libs from 6.5.x versions.
>  But latest kafka streams(2.8.0) is having rocksdb(5.18.x) version. This is 
> the main reason behind asking for the rocksDB upgrade in kafka streams as 
> well.
> Have attached the PID log where JVM failures are happening.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13265) Kafka consumers disappearing after certain point of time

2021-09-02 Thread Ayyandurai Mani (Jira)
Ayyandurai Mani created KAFKA-13265:
---

 Summary: Kafka consumers disappearing after certain point of time 
 Key: KAFKA-13265
 URL: https://issues.apache.org/jira/browse/KAFKA-13265
 Project: Kafka
  Issue Type: Test
  Components: consumer
Affects Versions: 2.4.0
Reporter: Ayyandurai Mani
 Attachments: Consumer_Disappear_Issue_Screen.png, server.log

Dear Kafka Team,

We have been facing an issue for the past few days in our development environment. 
We have a topic called 'search-service-topic-dev' and a consumer group 
'search-service-group' with 10 partitions, and a concurrency of 10 on the 
consumer side. 

When we publish more messages (each message is 115 KB) into the topic, after a 
certain point in time the consumers disappear from the consumer group (note: 
the consumer services are still running). Have attached a screenshot for reference 
(filename: Consumer_Disappear_Issue_Screen.png). 

From the screenshot, when I executed the describe command for the consumer group 
at 14:35:32 (IST) the consumers were available, but when I executed it at 
14:38:17 (IST) the consumers were not there. 

Attached is the Kafka server.log for that particular time (Kafka is running on a 
server in the UTC timezone).

Note: the total size of the messages in each partition is around 2 GB.

 

We are kind of blocked due to this behavior. Please help me to resolve this. 

Thanks in advance.

Ayyandurai

 

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ashishpatil09 commented on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2

2021-09-02 Thread GitBox


ashishpatil09 commented on pull request #7898:
URL: https://github.com/apache/kafka/pull/7898#issuecomment-911500667


   Hi Guys
   Is there any plan to release this fix soon?
   Thanks
   Ashish


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #10851: KAFKA-6718 / Rack aware standby task assignor

2021-09-02 Thread GitBox


cadonna commented on a change in pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#discussion_r700864299



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java
##
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+
+import static 
org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.computeTasksToRemainingStandbys;
+
+/**
+ * Distributes standby tasks over different tag dimensions.
+ * Only tags specified via {@link AssignmentConfigs#rackAwareAssignmentTags} 
are taken into account.
+ * Standby task distribution is on a best-effort basis. For example, if there 
are not enough clients available
+ * on different tag dimensions compared to an active and corresponding standby 
task,
+ * in that case, the algorithm will fall back to distributing tasks on 
least-loaded clients.
+ */
+class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(ClientTagAwareStandbyTaskAssignor.class);
+
+@Override
+public boolean assign(final Map clients,
+  final Set allTaskIds,
+  final Set statefulTaskIds,
+  final AssignorConfiguration.AssignmentConfigs 
configs) {
+final int numStandbyReplicas = configs.numStandbyReplicas;
+final Set rackAwareAssignmentTags = new 
HashSet<>(configs.rackAwareAssignmentTags);
+final Map statefulTasksWithClients = new HashMap<>();
+
+statefulTaskIds.forEach(statefulTaskId -> clients.forEach((uuid, 
clientState) -> {
+if (clientState.activeTasks().contains(statefulTaskId)) {
+statefulTasksWithClients.put(statefulTaskId, uuid);
+}
+}));
+
+final Map tasksToRemainingStandbys = 
computeTasksToRemainingStandbys(
+numStandbyReplicas,
+allTaskIds
+);
+
+final Map> tagKeyToTagValues = new HashMap<>();
+final Map> tagValueToClients = new HashMap<>();
+
+fillClientsTagStatistics(clients, tagValueToClients, 
tagKeyToTagValues);
+
+statefulTasksWithClients.forEach((taskId, clientId) -> 
assignStandbyTasksForActiveTask(
+numStandbyReplicas,
+taskId,
+clientId,
+rackAwareAssignmentTags,
+clients,
+tasksToRemainingStandbys,
+tagKeyToTagValues,
+tagValueToClients
+));
+
+return true;

Review comment:
   Although we never use the returned value from a standby task assignor, I 
would return `false` since a standby task assignment will never require a 
follow-up probing rebalance.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java
##
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */

[GitHub] [kafka] showuon commented on pull request #11292: KAFKA-13264: fix inMemoryWindowStore backward fetch not in reversed order

2021-09-02 Thread GitBox


showuon commented on pull request #11292:
URL: https://github.com/apache/kafka/pull/11292#issuecomment-911448780


   @jeqo @ableegoldman @guozhangwang , please help review this PR. Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11292: KAFKA-13264: fix inMemoryWindowStore backward fetch not in reversed order

2021-09-02 Thread GitBox


showuon commented on a change in pull request #11292:
URL: https://github.com/apache/kafka/pull/11292#discussion_r700910107



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
##
@@ -499,10 +499,14 @@ public void close() {
 final Map.Entry> 
currentSegment = segmentIterator.next();
 currentTime = currentSegment.getKey();
 
-if (allKeys) {
-return currentSegment.getValue().entrySet().iterator();
+final ConcurrentNavigableMap subMap = allKeys ?
+currentSegment.getValue() :
+currentSegment.getValue().subMap(keyFrom, true, keyTo, true);
+
+if (forward) {
+return subMap.entrySet().iterator();
 } else {
-return currentSegment.getValue().subMap(keyFrom, true, keyTo, 
true).entrySet().iterator();
+return subMap.descendingMap().entrySet().iterator();

Review comment:
   Before this change, when setting the records iterator, we only considered the 
`allKeys` case, not the forward/backward cases. Fix it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11292: KAFKA-13264: fix inMemoryWindowStore backward fetch not in reversed order

2021-09-02 Thread GitBox


showuon commented on a change in pull request #11292:
URL: https://github.com/apache/kafka/pull/11292#discussion_r700908241



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
##
@@ -1176,6 +1105,7 @@ private void putFirstBatch(final WindowStore store,
 store.put(0, "zero", startTime);
 store.put(1, "one", startTime + 1L);
 store.put(2, "two", startTime + 2L);
+store.put(3, "three", startTime + 2L);

Review comment:
   add 2 records at the same timestamp to test the forward and backward 
fetch cases.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dadufour commented on pull request #11174: KAFKA-9747: Creating connect reconfiguration URL safely

2021-09-02 Thread GitBox


dadufour commented on pull request #11174:
URL: https://github.com/apache/kafka/pull/11174#issuecomment-911390634


   Thanks for the feedback


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on pull request #11174: KAFKA-9747: Creating connect reconfiguration URL safely

2021-09-02 Thread GitBox


mimaison commented on pull request #11174:
URL: https://github.com/apache/kafka/pull/11174#issuecomment-911379969


   @akatona84 Thanks for the fix and sorry again for the delay merging it.
   
   @dadufour At this point, it's unlikely there will be a 2.7.2 release. I've 
backported it to 2.8 so it will be in the 2.8.1 release.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-9747) No tasks created for a connector

2021-09-02 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-9747:
--
Fix Version/s: 2.8.1

> No tasks created for a connector
> 
>
> Key: KAFKA-9747
> URL: https://issues.apache.org/jira/browse/KAFKA-9747
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.4.0
> Environment: OS: Ubuntu 18.04 LTS
> Platform: Confluent Platform 5.4
> HW: The same behaviour on various AWS instances - from t3.small to c5.xlarge
>Reporter: Vit Koma
>Assignee: Andras Katona
>Priority: Major
> Fix For: 3.1.0, 2.8.1
>
> Attachments: connect-distributed.properties, connect.log
>
>
> We are running Kafka Connect in a distributed mode on 3 nodes using Debezium 
> (MongoDB) and Confluent S3 connectors. When adding a new connector via the 
> REST API the connector is created in RUNNING state, but no tasks are created 
> for the connector.
> Pausing and resuming the connector does not help. When we stop all workers 
> and then start them again, the tasks are created and everything runs as it 
> should.
> The issue does not show up if we run only a single node.
> The issue is not caused by the connector plugins, because we see the same 
> behaviour for both Debezium and S3 connectors. Also in debug logs I can see 
> that Debezium is correctly returning a task configuration from the 
> Connector.taskConfigs() method.
> Connector configuration examples
> Debezium:
> {code}
> {
>   "name": "qa-mongodb-comp-converter-task|1",
>   "config": {
> "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
> "mongodb.hosts": 
> "mongodb-qa-001:27017,mongodb-qa-002:27017,mongodb-qa-003:27017",
> "mongodb.name": "qa-debezium-comp",
> "mongodb.ssl.enabled": true,
> "collection.whitelist": "converter[.]task",
> "tombstones.on.delete": true
>   }
> }
> {code}
> S3 Connector:
> {code}
> {
>   "name": "qa-s3-sink-task|1",
>   "config": {
> "connector.class": "io.confluent.connect.s3.S3SinkConnector",
> "topics": "qa-debezium-comp.converter.task",
> "topics.dir": "data/env/qa",
> "s3.region": "eu-west-1",
> "s3.bucket.name": "",
> "flush.size": "15000",
> "rotate.interval.ms": "360",
> "storage.class": "io.confluent.connect.s3.storage.S3Storage",
> "format.class": 
> "custom.kafka.connect.s3.format.plaintext.PlaintextFormat",
> "schema.generator.class": 
> "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
> "partitioner.class": 
> "io.confluent.connect.storage.partitioner.DefaultPartitioner",
> "schema.compatibility": "NONE",
> "key.converter": "org.apache.kafka.connect.json.JsonConverter",
> "value.converter": "org.apache.kafka.connect.json.JsonConverter",
> "key.converter.schemas.enable": false,
> "value.converter.schemas.enable": false,
> "transforms": "ExtractDocument",
> 
> "transforms.ExtractDocument.type":"custom.kafka.connect.transforms.ExtractDocument$Value"
>   }
> }
> {code}
> The connectors are created using curl: {{curl -X POST -H "Content-Type: 
> application/json" --data @ http:/:10083/connectors}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9366) Upgrade log4j to log4j2

2021-09-02 Thread Ashish Patil (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17408666#comment-17408666
 ] 

Ashish Patil commented on KAFKA-9366:
-

Hi Guys

Is there any plan to fix this issue soon?

Thanks

Ashish

> Upgrade log4j to log4j2
> ---
>
> Key: KAFKA-9366
> URL: https://issues.apache.org/jira/browse/KAFKA-9366
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.2.0, 2.1.1, 2.3.0, 2.4.0
>Reporter: leibo
>Assignee: Dongjin Lee
>Priority: Critical
>  Labels: needs-kip
> Fix For: 3.1.0
>
>
> h2. CVE-2019-17571 Detail
> Included in Log4j 1.2 is a SocketServer class that is vulnerable to 
> deserialization of untrusted data which can be exploited to remotely execute 
> arbitrary code when combined with a deserialization gadget when listening to 
> untrusted network traffic for log data. This affects Log4j versions up to 1.2 
> up to 1.2.17.
>  
> [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-17571]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] showuon opened a new pull request #11292: [WIP] KAFKA-13264: fix inMemoryWindowStore backward fetch not in reversed order

2021-09-02 Thread GitBox


showuon opened a new pull request #11292:
URL: https://github.com/apache/kafka/pull/11292


   We forgot to return each segment's entries in reverse order (i.e. via 
`descendingMap`) in `InMemoryWindowStore`. Fix it and add integration tests for it.
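   For reference, a minimal self-contained sketch (not the actual store code; the 
segment map and keys below are hypothetical) of why a per-segment 
`ConcurrentNavigableMap` has to be flipped with `descendingMap()` when iterating 
backward, since iterating the map directly always yields ascending key order:
   ```java
   import java.util.concurrent.ConcurrentNavigableMap;
   import java.util.concurrent.ConcurrentSkipListMap;

   public class DescendingMapExample {
       public static void main(final String[] args) {
           // Stand-in for one window segment: keys are kept in ascending order.
           final ConcurrentNavigableMap<String, String> segment = new ConcurrentSkipListMap<>();
           segment.put("a", "aa");
           segment.put("b", "bb");

           // Forward iteration yields a -> b.
           segment.entrySet().forEach(e -> System.out.println("forward:  " + e.getKey()));

           // Backward iteration must go through descendingMap() to yield b -> a;
           // iterating `segment` itself would still return a -> b.
           segment.descendingMap().entrySet().forEach(e -> System.out.println("backward: " + e.getKey()));
       }
   }
   ```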
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison merged pull request #11174: KAFKA-9747: Creating connect reconfiguration URL safely

2021-09-02 Thread GitBox


mimaison merged pull request #11174:
URL: https://github.com/apache/kafka/pull/11174


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13264) backwardFetch in InMemoryWindowStore doesn't return in reverse order

2021-09-02 Thread Luke Chen (Jira)
Luke Chen created KAFKA-13264:
-

 Summary: backwardFetch in InMemoryWindowStore doesn't return in 
reverse order
 Key: KAFKA-13264
 URL: https://issues.apache.org/jira/browse/KAFKA-13264
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.8.0
Reporter: Luke Chen
Assignee: Luke Chen


When working on another PR, I found that, currently, backwardFetch in 
InMemoryWindowStore doesn't return records in reverse order when there are 
records in the same window.

ex: window size = 500,

input records:

key: "a", value: "aa", timestamp: 0 ==> will be in [0, 500\] window

key: "b", value: "bb", timestamp: 10 ==> will be in [0, 500\] window

when fetching in forward order:

"a" -> "b", which is expected

when fetching in backward order:

"a" -> "b", which is NOT expected



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] satishd commented on pull request #11033: KAFKA-12988 Asynchronous API support for RemoteLogMetadataManager add/update methods.

2021-09-02 Thread GitBox


satishd commented on pull request #11033:
URL: https://github.com/apache/kafka/pull/11033#issuecomment-911240472


   @junrao gentle reminder to review the changes. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org