[GitHub] [kafka] chia7712 commented on a change in pull request #10976: KAFKA-13036 Replace EasyMock and PowerMock with Mockito for RocksDBMetricsRecorderTest

2021-09-15 Thread GitBox


chia7712 commented on a change in pull request #10976:
URL: https://github.com/apache/kafka/pull/10976#discussion_r709761794



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
##
@@ -300,65 +287,47 @@ public void 
shouldAddItselfToRecordingTriggerWhenFirstValueProvidersAreAddedAfte
 recorder.removeValueProviders(SEGMENT_STORE_NAME_1);
 reset(recordingTrigger);
 recordingTrigger.addMetricsRecorder(recorder);
-replay(recordingTrigger);
 
 recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, 
cacheToAdd2, statisticsToAdd2);
-
-verify(recordingTrigger);
 }
 
 @Test
 public void shouldNotAddItselfToRecordingTriggerWhenNotEmpty2() {
 recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, 
cacheToAdd1, statisticsToAdd1);
 reset(recordingTrigger);
-replay(recordingTrigger);
 
 recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, 
cacheToAdd2, statisticsToAdd2);
-
-verify(recordingTrigger);
 }
 
 @Test
 public void shouldCloseStatisticsWhenValueProvidersAreRemoved() {
 recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, 
cacheToAdd1, statisticsToAdd1);
 reset(statisticsToAdd1);
 statisticsToAdd1.close();
-replay(statisticsToAdd1);
 
 recorder.removeValueProviders(SEGMENT_STORE_NAME_1);
-
-verify(statisticsToAdd1);
 }
 
 @Test
 public void 
shouldNotCloseStatisticsWhenValueProvidersWithoutStatisticsAreRemoved() {
 recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, 
cacheToAdd1, null);
 reset(statisticsToAdd1);
-replay(statisticsToAdd1);
 
 recorder.removeValueProviders(SEGMENT_STORE_NAME_1);
-
-verify(statisticsToAdd1);
 }
 
 @Test
 public void 
shouldRemoveItselfFromRecordingTriggerWhenLastValueProvidersAreRemoved() {
 recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, 
cacheToAdd1, statisticsToAdd1);
 recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, 
cacheToAdd2, statisticsToAdd2);
 reset(recordingTrigger);
-replay(recordingTrigger);
 
 recorder.removeValueProviders(SEGMENT_STORE_NAME_1);
 
-verify(recordingTrigger);
-
 reset(recordingTrigger);
 recordingTrigger.removeMetricsRecorder(recorder);
-replay(recordingTrigger);
 
 recorder.removeValueProviders(SEGMENT_STORE_NAME_2);

Review comment:
   for example:
   
   ```java
   recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
   recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2);
   recorder.removeValueProviders(SEGMENT_STORE_NAME_1);
   recorder.removeValueProviders(SEGMENT_STORE_NAME_2);
   Mockito.verify(recordingTrigger, Mockito.times(1)).removeMetricsRecorder(recorder);
   ```
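   
   For the `shouldNot...` cases, a sketch along the same lines (same mocks, Mockito only, no reset/replay) could make the negative expectation explicit instead of silently dropping the EasyMock verify:
   
   ```java
   recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1);
   recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2);
   // the recorder registered itself on the first add, so no second registration is expected
   Mockito.verify(recordingTrigger, Mockito.times(1)).addMetricsRecorder(recorder);
   ```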




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] yangdaixai commented on pull request #11329: KAFKA-13301; Optimized the interpretation of the relationship between 'request.timeout. ms' and 'max.poll.interval.ms' in the document.

2021-09-15 Thread GitBox


yangdaixai commented on pull request #11329:
URL: https://github.com/apache/kafka/pull/11329#issuecomment-920539185


   @guozhangwang 
   
   issues:
   
https://issues.apache.org/jira/browse/KAFKA-13301?focusedCommentId=17415801=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17415801


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13301) The relationship between request.timeout. ms and max.poll.interval.ms in the Consumer Configs is incorrect.

2021-09-15 Thread yangshengwei (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17415840#comment-17415840
 ] 

yangshengwei commented on KAFKA-13301:
--

 

[~guozhang]

 I have submitted a PR: 

https://github.com/apache/kafka/pull/11329

> The relationship between request.timeout. ms and max.poll.interval.ms in the 
> Consumer Configs is incorrect.
> ---
>
> Key: KAFKA-13301
> URL: https://issues.apache.org/jira/browse/KAFKA-13301
> Project: Kafka
>  Issue Type: Improvement
>Reporter: yangshengwei
>Priority: Trivial
> Attachments: image-2021-09-15-15-37-25-561.png, 
> image-2021-09-15-15-39-00-179.png
>
>
> In Consumer Configs, the value of the configuration max.poll.interval.ms must 
> always be larger than request.timeout.ms. But here is what the official 
> documentation says: The value of the configuration request.timeout.ms must always 
> be larger than max.poll.interval.ms.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] yangdaixai commented on pull request #11329: KAFKA-13301; Optimized the interpretation of the relationship between 'request.timeout. ms' and 'max.poll.interval.ms' in the document.

2021-09-15 Thread GitBox


yangdaixai commented on pull request #11329:
URL: https://github.com/apache/kafka/pull/11329#issuecomment-920537877


   @guozhangwang 
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] yangdaixai opened a new pull request #11329: Ysw0916

2021-09-15 Thread GitBox


yangdaixai opened a new pull request #11329:
URL: https://github.com/apache/kafka/pull/11329


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13255) Mirrormaker config property config.properties.exclude is not working as expected

2021-09-15 Thread Anamika Nadkarni (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17415812#comment-17415812
 ] 

Anamika Nadkarni commented on KAFKA-13255:
--

[~ryannedolan] Submitted PR. Thank you !!

> Mirrormaker config property config.properties.exclude is not working as 
> expected 
> -
>
> Key: KAFKA-13255
> URL: https://issues.apache.org/jira/browse/KAFKA-13255
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.8.0
>Reporter: Anamika Nadkarni
>Priority: Major
>
> Objective - Use MM2 (Kafka Connect in a distributed cluster) for data migration 
> between a cluster hosted in a private data center and an AWS MSK cluster.
> Steps performed -
>  # Started kafka-connect service.
>  # Created 3 MM2 connectors (i.e. source connector, checkpoint connector and 
> heartbeat connector). Curl commands used to create connectors are in the 
> attached file.  To exclude certain config properties while topic replication, 
> we are using the 'config.properties.exclude' property in the MM2 source 
> connector.
> Expected -
> Source topic 'dev.portlandDc.anamika.helloMsk' should be successfully created 
> in destination cluster.
> Actual -
> Creation of the source topic 'dev.portlandDc.anamika.helloMsk' in destination 
> cluster fails with an error. Error is
> {code:java}
> [2021-08-06 06:13:40,944] WARN [mm2-msc|worker] Could not create topic 
> dev.portlandDc.anamika.helloMsk. 
> (org.apache.kafka.connect.mirror.MirrorSourceConnector:371)
> org.apache.kafka.common.errors.InvalidConfigurationException: Unknown topic 
> config name: confluent.value.schema.validation{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] AnamikaN opened a new pull request #11328: Fixed code so user can use config.properties.exclude to exclude prope…

2021-09-15 Thread GitBox


AnamikaN opened a new pull request #11328:
URL: https://github.com/apache/kafka/pull/11328


   Objective - Use MM2 (Kafka Connect in a distributed cluster) for data 
migration between a cluster hosted in a private data center and an AWS MSK cluster.
   
   Steps performed -
   
   Started kafka-connect service.
   Created 3 MM2 connectors (i.e. source connector, checkpoint connector and 
heartbeat connector). Curl commands used to create connectors are in the 
attached file.  To exclude certain config properties while topic replication, 
we are using the 'config.properties.exclude' property in the MM2 source 
connector.
   Expected -
   
   Source topic 'dev.portlandDc.anamika.helloMsk' should be successfully 
created in destination cluster.
   
   Actual -
   
   Creation of the source topic 'dev.portlandDc.anamika.helloMsk' in 
destination cluster fails with an error. Error is
   
   [2021-08-06 06:13:40,944] WARN [mm2-msc|worker] Could not create topic 
dev.portlandDc.anamika.helloMsk. 
(org.apache.kafka.connect.mirror.MirrorSourceConnector:371)
   org.apache.kafka.common.errors.InvalidConfigurationException: Unknown topic 
config name: confluent.value.schema.validation
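   
   For reference, a minimal sketch of the kind of MirrorSourceConnector config involved; the connector name, cluster aliases, and patterns below are illustrative only, not the exact values from the attached curl commands:
   
   ```json
   {
     "name": "mm2-msc",
     "config": {
       "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
       "source.cluster.alias": "portlandDc",
       "target.cluster.alias": "msk",
       "topics": "dev.*",
       "config.properties.exclude": "confluent\\..*"
     }
   }
   ```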
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #11058: KAFKA-12802 Added a file based cache for consumed remote log metadata for each partition to avoid consuming again incase of broker

2021-09-15 Thread GitBox


junrao commented on a change in pull request #11058:
URL: https://github.com/apache/kafka/pull/11058#discussion_r707813690



##
File path: 
storage/src/main/resources/message/RemoteLogSegmentMetadataRecordSnapshot.json
##
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 0,
+  "type": "data",
+  "name": "RemoteLogSegmentMetadataRecordSnapshot",
+  "validVersions": "0",
+  "flexibleVersions": "none",

Review comment:
   Should we support flexible version from the beginning so that we could 
potentially support downgrade during future format changes?
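   
   If so, the change would presumably just be along these lines (a sketch, not the final schema):
   
   ```json
   "validVersions": "0",
   "flexibleVersions": "0+",
   ```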

##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFile.java
##
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.Utils;
+import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This class represents the remote log data snapshot stored in a file for a 
specific topic partition. This is used by
+ * {@link TopicBasedRemoteLogMetadataManager} to store the remote log metadata 
received for a specific partition from
+ * remote log metadata topic. This will avoid reading the remote log metadata 
messages from the topic again when a
+ * broker restarts.
+ */
+public class RemoteLogMetadataSnapshotFile {
+private static final Logger log = 
LoggerFactory.getLogger(RemoteLogMetadataSnapshotFile.class);
+
+public static final String COMMITTED_LOG_METADATA_SNAPSHOT_FILE_NAME = 
"remote_log_snapshot";
+
+// header: 
+// size: 2 + (8+8) + 4 + 8 = 30
+private static final int HEADER_SIZE = 30;
+
+private final File metadataStoreFile;
+
+/**
+ * Creates a CommittedLogMetadataSnapshotFile instance backed by a file 
with the name `remote_log_snapshot` in
+ * the given {@code metadataStoreDir}. It creates the file if it does not 
exist.
+ *
+ * @param metadataStoreDir directory in which the snapshot file to be 
created.
+ */
+RemoteLogMetadataSnapshotFile(Path metadataStoreDir) {
+this.metadataStoreFile = new File(metadataStoreDir.toFile(), 
COMMITTED_LOG_METADATA_SNAPSHOT_FILE_NAME);
+
+// Create an empty file if it does not exist.
+try {
+boolean newFileCreated = metadataStoreFile.createNewFile();
+log.info("Remote log metadata snapshot file: [{}], newFileCreated: 
[{}]", metadataStoreFile, newFileCreated);
+} catch (IOException e) {
+throw new KafkaException(e);
+}
+}
+
+/**
+ * Writes the given snapshot replacing the earlier snapshot data.
+ *
+ * @param snapshot Snapshot to be 

[jira] [Commented] (KAFKA-13301) The relationship between request.timeout. ms and max.poll.interval.ms in the Consumer Configs is incorrect.

2021-09-15 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17415801#comment-17415801
 ] 

Guozhang Wang commented on KAFKA-13301:
---

Thanks for filing this [~yangshengwei]. I think it should be "The value of the 
configuration `request.timeout.ms` must always be smaller than 
`max.poll.interval.ms`, so we have changed `max.poll.interval.ms`'s default 
value to just above 5 minutes."

Would you like to submit a PR?
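
For illustration only, a consumer configuration consistent with that corrected wording (the values shown are just the client defaults):
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

Properties props = new Properties();
// request.timeout.ms must stay smaller than max.poll.interval.ms
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");     // 30 seconds (default)
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");  // 5 minutes (default)
{code}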

> The relationship between request.timeout. ms and max.poll.interval.ms in the 
> Consumer Configs is incorrect.
> ---
>
> Key: KAFKA-13301
> URL: https://issues.apache.org/jira/browse/KAFKA-13301
> Project: Kafka
>  Issue Type: Improvement
>Reporter: yangshengwei
>Priority: Trivial
> Attachments: image-2021-09-15-15-37-25-561.png, 
> image-2021-09-15-15-39-00-179.png
>
>
> In Consumer Configs, the value of the configuration max.poll.interval.ms must 
> always be larger than request.timeout.ms. But here is what the official 
> documentation says: The value of the configuration request.timeout.ms must always 
> be larger than max.poll.interval.ms.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12226) High-throughput source tasks fail to commit offsets

2021-09-15 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12226?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-12226:
--
Description: 
The current source task thread has the following workflow:
 # Poll messages from the source task
 # Queue these messages to the producer and send them to Kafka asynchronously.
 # Add the message to outstandingMessages, or if a flush is currently active, 
outstandingMessagesBacklog
 # When the producer completes the send of a record, remove it from 
outstandingMessages

The commit offsets thread has the following workflow:
 # Wait a flat timeout for outstandingMessages to flush completely
 # If this times out, add all of the outstandingMessagesBacklog to the 
outstandingMessages and reset
 # If it succeeds, commit the source task offsets to the backing store.
 # Retry the above on a fixed schedule

If the source task is producing records quickly (faster than the producer can 
send), then the producer will throttle the task thread by blocking in its 
{{send}} method, waiting at most {{max.block.ms}} for space in the 
{{buffer.memory}} to be available. This means that the number of records in 
{{outstandingMessages}} + {{outstandingMessagesBacklog}} is proportional to the 
size of the producer memory buffer.

This amount of data might take more than {{offset.flush.timeout.ms}} to flush, 
and thus the flush will never succeed while the source task is rate-limited by 
the producer memory. This means that we may write multiple hours of data to 
Kafka and not ever commit source offsets for the connector. When the task is 
lost due to a worker failure, hours of data will be re-processed that otherwise 
were successfully written to Kafka.
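
As a rough worked example (illustrative numbers, not measurements): with the default {{buffer.memory}} of 32 MB and the default {{offset.flush.timeout.ms}} of 5 seconds, the outstanding messages can amount to roughly 32 MB of records, so a flush only completes in time if the producer can drain well over 6 MB/s to the brokers; at any lower sustained rate, every flush attempt times out and offsets are never committed.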

  was:
The current source task thread has the following workflow:
 # Poll messages from the source task

 # Queue these messages to the producer and send them to Kafka asynchronously.

 # Add the message to outstandingMessages, or if a flush is currently active, 
outstandingMessagesBacklog

 # When the producer completes the send of a record, remove it from 
outstandingMessages

The commit offsets thread has the following workflow:
 # Wait a flat timeout for outstandingMessages to flush completely

 # If this times out, add all of the outstandingMessagesBacklog to the 
outstandingMessages and reset

 # If it succeeds, commit the source task offsets to the backing store.

 # Retry the above on a fixed schedule

If the source task is producing records quickly (faster than the producer can 
send), then the producer will throttle the task thread by blocking in its 
{{send}} method, waiting at most {{max.block.ms}} for space in the 
{{buffer.memory}} to be available. This means that the number of records in 
{{outstandingMessages}} + {{outstandingMessagesBacklog}} is proportional to the 
size of the producer memory buffer.

This amount of data might take more than {{offset.flush.timeout.ms}} to flush, 
and thus the flush will never succeed while the source task is rate-limited by 
the producer memory. This means that we may write multiple hours of data to 
Kafka and not ever commit source offsets for the connector. When the task is 
lost due to a worker failure, hours of data will be re-processed that otherwise 
were successfully written to Kafka.


> High-throughput source tasks fail to commit offsets
> ---
>
> Key: KAFKA-12226
> URL: https://issues.apache.org/jira/browse/KAFKA-12226
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> The current source task thread has the following workflow:
>  # Poll messages from the source task
>  # Queue these messages to the producer and send them to Kafka asynchronously.
>  # Add the message to outstandingMessages, or if a flush is currently active, 
> outstandingMessagesBacklog
>  # When the producer completes the send of a record, remove it from 
> outstandingMessages
> The commit offsets thread has the following workflow:
>  # Wait a flat timeout for outstandingMessages to flush completely
>  # If this times out, add all of the outstandingMessagesBacklog to the 
> outstandingMessages and reset
>  # If it succeeds, commit the source task offsets to the backing store.
>  # Retry the above on a fixed schedule
> If the source task is producing records quickly (faster than the producer can 
> send), then the producer will throttle the task thread by blocking in its 
> {{send}} method, waiting at most {{max.block.ms}} for space in the 
> {{buffer.memory}} to be available. This means that the number of records in 
> {{outstandingMessages}} + {{outstandingMessagesBacklog}} is proportional to 
> the size of the producer memory buffer.
> This amount of data might take more than {{offset.flush.timeout.ms}} to 
> 

[GitHub] [kafka] guozhangwang commented on pull request #11278: KAFKA-12648: Enforce size limits for each task's cache

2021-09-15 Thread GitBox


guozhangwang commented on pull request #11278:
URL: https://github.com/apache/kafka/pull/11278#issuecomment-920445983


   > I think that should work. now we have both an upper bound on total memory 
and a minimum guarantee
   
   Yeah, I feel more comfortable with this proposal :) Basically, instead of 
defining the config as an "override" of the per-application config, I think we 
should just have a separate config at the per-topology level (e.g. your proposed 
one based on a percentage of the total per-application cache). That way users 
can specify clearly what they want, and get exactly what they specified.
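   
   As a made-up example of the percentage idea: if `cache.max.bytes.buffering` is 100 MB for the whole application and a topology is configured for 20%, it always gets 20 MB, no matter how many other topologies are added later, which is exactly the "get what they specified" property.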


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions

2021-09-15 Thread Victoria Xia (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17415792#comment-17415792
 ] 

Victoria Xia commented on KAFKA-13261:
--

Thanks for the confirmation, Matthias and Guozhang! I've opened a small KIP 
with the proposed interface changes to allow users to pass in custom 
partitioners for FK joins: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-775%3A+Custom+partitioners+in+foreign+key+joins]

I tried sending an email to the 
[d...@kafka.apache.org|mailto:d...@kafka.apache.org] mailing list to start 
discussion but either it's taking a while to send or my permissions aren't yet 
set up correctly. Hopefully it will be ready for discussion soon.

> KTable to KTable foreign key join loose events when using several partitions
> 
>
> Key: KAFKA-13261
> URL: https://issues.apache.org/jira/browse/KAFKA-13261
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0, 2.7.1
>Reporter: Tomas Forsman
>Assignee: Victoria Xia
>Priority: Major
> Attachments: KafkaTest.java
>
>
> Two incoming streams A and B. 
> Stream A uses a composite key [a, b]
> Stream B has key [b]
> Stream B has 4 partitions and stream A has 1 partition.
> What we try to do is repartition stream A to have 4 partitions too, then put 
> both A and B into KTables and do a foreign key join from A to B.
> When doing this, not all messages end up in the output topic.
> Repartitioning both to only use 1 partition each solves the problem, so it seems 
> like it has something to do with the foreign key join in combination with 
> several partitions. 
> One suspicion would be that it is not possible to define what partitioner to 
> use for the join.
> Any insight or help is greatly appreciated.
> *Example code of the problem*
> {code:java}
> static Topology createTopoology(){
> var builder = new StreamsBuilder();
> KTable tableB = builder.table("B",  
> stringMaterialized("table.b"));
> builder
> .stream("A", Consumed.with(Serde.of(KeyA.class), 
> Serde.of(EventA.class)))
> .repartition(repartitionTopicA())
> .toTable(Named.as("table.a"), aMaterialized("table.a"))
> .join(tableB, EventA::getKeyB, topicAandBeJoiner(), 
> Named.as("join.ab"), joinMaterialized("join.ab"))
> .toStream()
> .to("output", with(...));
> return builder.build();
> }
> private static Materialized aMaterialized(String name) {
>   Materialized> table = 
> Materialized.as(name);
>   return 
> table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> private static Repartitioned repartitionTopicA() {
> Repartitioned repartitioned = 
> Repartitioned.as("driverperiod");
> return 
> repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class))
> .withStreamPartitioner(topicAPartitioner())
> .withNumberOfPartitions(4);
> }
> private static StreamPartitioner 
> topicAPartitioner() {
> return (topic, key, value, numPartitions) -> 
> Math.abs(key.getKeyB().hashCode()) % numPartitions;
> }
> private static Materialized> 
> joinMaterialized(String name) {
> Materialized> 
> table = Materialized.as(name);
> return 
> table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12994) Migrate all Tests to New API and Remove Suppression for Deprecation Warnings related to KIP-633

2021-09-15 Thread Andrew patterson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17415755#comment-17415755
 ] 

Andrew patterson commented on KAFKA-12994:
--

I think it's fine so far; I will move on to the next part when 
[https://github.com/apache/kafka/pull/11215] is merged.

> Migrate all Tests to New API and Remove Suppression for Deprecation Warnings 
> related to KIP-633
> ---
>
> Key: KAFKA-12994
> URL: https://issues.apache.org/jira/browse/KAFKA-12994
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Affects Versions: 3.0.0
>Reporter: Israel Ekpo
>Assignee: Andrew patterson
>Priority: Major
>  Labels: kip-633, newbie, newbie++
> Fix For: 3.1.0
>
>
> Due to the API changes for KIP-633 a lot of deprecation warnings have been 
> generated in tests that are using the old deprecated APIs. There are a lot of 
> tests using the deprecated methods. We should absolutely migrate them all to 
> the new APIs and then get rid of all the applicable annotations for 
> suppressing the deprecation warnings.
> The applies to all Java and Scala examples and tests using the deprecated 
> APIs in the JoinWindows, SessionWindows, TimeWindows and SlidingWindows 
> classes.
>  
> This is based on the feedback from reviewers in this PR
>  
> https://github.com/apache/kafka/pull/10926



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions

2021-09-15 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17415747#comment-17415747
 ] 

Guozhang Wang commented on KAFKA-13261:
---

> wouldn't we still have an analogous bug if either of the topics for the 
> source tables had custom partitioning logic created from outside Streams 
> (i.e., without a repartition() step in the Streams topology)? In this case, 
> Streams has no way of determining the partitioning of the source tables, 
> which means we need an update to the interface for foreign key joins so that 
> users can specify a partitioner to use in order to ensure copartitioning of 
> the subscription and response topics with the relevant tables. Is this 
> reasoning sound?

Yeah, I think that's fair; KS assumes the source topics are partitioned by key, 
but does not require them to be partitioned with the default mechanism. However, 
when getting back to the source tables from the subscription table, it simply 
assumes the default partitioning is used. For that reason, I agree that allowing 
users to pass in a partitioner for FK joins would be good: if users know the 
source tables are not partitioned with the default partitioner, they are 
responsible for passing that custom partitioner to the FK join.

> KTable to KTable foreign key join loose events when using several partitions
> 
>
> Key: KAFKA-13261
> URL: https://issues.apache.org/jira/browse/KAFKA-13261
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0, 2.7.1
>Reporter: Tomas Forsman
>Assignee: Victoria Xia
>Priority: Major
> Attachments: KafkaTest.java
>
>
> Two incoming streams A and B. 
> Stream A uses a composite key [a, b]
> Stream B has key [b]
> Stream B has 4 partitions and stream A has 1 partition.
> What we try to do is repartition stream A to have 4 partitions too, then put 
> both A and B into KTables and do a foreign key join from A to B.
> When doing this, not all messages end up in the output topic.
> Repartitioning both to only use 1 partition each solves the problem, so it seems 
> like it has something to do with the foreign key join in combination with 
> several partitions. 
> One suspicion would be that it is not possible to define what partitioner to 
> use for the join.
> Any insight or help is greatly appreciated.
> *Example code of the problem*
> {code:java}
> static Topology createTopoology(){
> var builder = new StreamsBuilder();
> KTable tableB = builder.table("B",  
> stringMaterialized("table.b"));
> builder
> .stream("A", Consumed.with(Serde.of(KeyA.class), 
> Serde.of(EventA.class)))
> .repartition(repartitionTopicA())
> .toTable(Named.as("table.a"), aMaterialized("table.a"))
> .join(tableB, EventA::getKeyB, topicAandBeJoiner(), 
> Named.as("join.ab"), joinMaterialized("join.ab"))
> .toStream()
> .to("output", with(...));
> return builder.build();
> }
> private static Materialized aMaterialized(String name) {
>   Materialized> table = 
> Materialized.as(name);
>   return 
> table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> private static Repartitioned repartitionTopicA() {
> Repartitioned repartitioned = 
> Repartitioned.as("driverperiod");
> return 
> repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class))
> .withStreamPartitioner(topicAPartitioner())
> .withNumberOfPartitions(4);
> }
> private static StreamPartitioner 
> topicAPartitioner() {
> return (topic, key, value, numPartitions) -> 
> Math.abs(key.getKeyB().hashCode()) % numPartitions;
> }
> private static Materialized> 
> joinMaterialized(String name) {
> Materialized> 
> table = Materialized.as(name);
> return 
> table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

2021-09-15 Thread GitBox


mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r709539669



##
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##
@@ -493,19 +496,20 @@ private[log] class Cleaner(val id: Int,
* @return The first offset not cleaned and the statistics for this round of 
cleaning
*/
   private[log] def clean(cleanable: LogToClean): (Long, CleanerStats) = {
+doClean(cleanable, time.milliseconds())
+  }
+
+  private[log] def doClean(cleanable: LogToClean, currentTime: Long): (Long, 
CleanerStats) = {
+info("Beginning cleaning of log %s".format(cleanable.log.name))
+
 // figure out the timestamp below which it is safe to remove delete 
tombstones
 // this position is defined to be a configurable time beneath the last 
modified time of the last clean segment
-val deleteHorizonMs =
+// this timestamp is only used on the older message formats newer than 
MAGIC_VALUE_V2

Review comment:
   ah thanks for the catch :/ I've been getting mixed up in my head




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

2021-09-15 Thread GitBox


junrao commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r709536010



##
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##
@@ -493,19 +496,20 @@ private[log] class Cleaner(val id: Int,
* @return The first offset not cleaned and the statistics for this round of 
cleaning
*/
   private[log] def clean(cleanable: LogToClean): (Long, CleanerStats) = {
+doClean(cleanable, time.milliseconds())
+  }
+
+  private[log] def doClean(cleanable: LogToClean, currentTime: Long): (Long, 
CleanerStats) = {
+info("Beginning cleaning of log %s".format(cleanable.log.name))
+
 // figure out the timestamp below which it is safe to remove delete 
tombstones
 // this position is defined to be a configurable time beneath the last 
modified time of the last clean segment
-val deleteHorizonMs =
+// this timestamp is only used on the older message formats newer than 
MAGIC_VALUE_V2

Review comment:
   newer => older ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

2021-09-15 Thread GitBox


mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r709510271



##
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##
@@ -658,20 +673,22 @@ private[log] class Cleaner(val id: Int,
   }
 }
 
-if (batch.hasProducerId && isBatchLastRecordOfProducer)
-  BatchRetention.RETAIN_EMPTY
-else if (discardBatchRecords)
-  BatchRetention.DELETE
-else
-  BatchRetention.DELETE_EMPTY
+val batchRetention: BatchRetention =
+  if (batch.hasProducerId && isBatchLastRecordOfProducer)
+BatchRetention.RETAIN_EMPTY
+  else if (discardBatchRecords)
+BatchRetention.DELETE
+  else
+BatchRetention.DELETE_EMPTY
+new RecordFilter.BatchRetentionResult(batchRetention, canDiscardBatch)

Review comment:
   hmm yeah I think you are right. I'll change to `canDiscardBatch && 
batch.isControlBatch`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] gitlw commented on pull request #11285: KAFKA-10548: Implement topic deletion logic with the LeaderAndIsr in KIP-516

2021-09-15 Thread GitBox


gitlw commented on pull request #11285:
URL: https://github.com/apache/kafka/pull/11285#issuecomment-920312333


   @lbradstreet @jolshan Can you please take a look? Thank you!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

2021-09-15 Thread GitBox


mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r709503140



##
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##
@@ -622,26 +628,35 @@ private[log] class Cleaner(val id: Int,
* @param sourceRecords The dirty log segment
* @param dest The cleaned log segment
* @param map The key=>offset mapping
-   * @param retainDeletesAndTxnMarkers Should tombstones and markers be 
retained while cleaning this segment
+   * @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than 
version 2) and markers be retained while cleaning this segment
+   * @param deleteRetentionMs Defines how long a tombstone should be kept as 
defined by log configuration
* @param maxLogMessageSize The maximum message size of the corresponding 
topic
* @param stats Collector for cleaning statistics
+   * @param currentTime The time at which the clean was initiated
*/
   private[log] def cleanInto(topicPartition: TopicPartition,
  sourceRecords: FileRecords,
  dest: LogSegment,
  map: OffsetMap,
- retainDeletesAndTxnMarkers: Boolean,
+ retainLegacyDeletesAndTxnMarkers: Boolean,
+ deleteRetentionMs: Long,
  maxLogMessageSize: Int,
  transactionMetadata: CleanedTransactionMetadata,
  lastRecordsOfActiveProducers: Map[Long, 
LastRecord],
- stats: CleanerStats): Unit = {
-val logCleanerFilter: RecordFilter = new RecordFilter {
+ stats: CleanerStats,
+ currentTime: Long): Unit = {
+val logCleanerFilter: RecordFilter = new RecordFilter(currentTime, 
deleteRetentionMs) {
   var discardBatchRecords: Boolean = _
 
-  override def checkBatchRetention(batch: RecordBatch): BatchRetention = {
+  override def checkBatchRetention(batch: RecordBatch): 
RecordFilter.BatchRetentionResult = {
 // we piggy-back on the tombstone retention logic to delay deletion of 
transaction markers.
 // note that we will never delete a marker until all the records from 
that transaction are removed.
-discardBatchRecords = shouldDiscardBatch(batch, transactionMetadata, 
retainTxnMarkers = retainDeletesAndTxnMarkers)
+val canDiscardBatch = shouldDiscardBatch(batch, transactionMetadata)
+
+if (batch.isControlBatch)
+  discardBatchRecords = canDiscardBatch && 
batch.deleteHorizonMs().isPresent && batch.deleteHorizonMs().getAsLong <= 
currentTime
+else
+  discardBatchRecords = canDiscardBatch

Review comment:
   makes sense. I've removed that comment on 1136 since the case is 
mentioned in `isBatchLastRecordOfProducer`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions

2021-09-15 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17415690#comment-17415690
 ] 

Matthias J. Sax edited comment on KAFKA-13261 at 9/15/21, 6:56 PM:
---

I would prefer to do a KIP and allow users to pass in a custom partitioner.

If we really get follow-up requests, we could extend the DSL logic to 
auto-forward an upstream partitioner later. Overall, it seems there is 
no bug in the strong sense, but a missing feature. We never designed FK-join to 
support custom partitioning.

It might be worth updating the docs for 3.0 and earlier to point out this 
limitation.


was (Author: mjsax):
I would prefer to do a KIP and allow user to pass in a custom partitioner.

If we really get follow up requests, we could extend the DSL logic to 
auto-forward an upstream partitioner later.

> KTable to KTable foreign key join loose events when using several partitions
> 
>
> Key: KAFKA-13261
> URL: https://issues.apache.org/jira/browse/KAFKA-13261
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0, 2.7.1
>Reporter: Tomas Forsman
>Assignee: Victoria Xia
>Priority: Major
> Attachments: KafkaTest.java
>
>
> Two incoming streams A and B. 
> Stream A uses a composite key [a, b]
> Stream B has key [b]
> Stream B has 4 partitions and stream A has 1 partition.
> What we try to do is repartition stream A to have 4 partitions too, then put 
> both A and B into KTables and do a foreign key join from A to B.
> When doing this, not all messages end up in the output topic.
> Repartitioning both to only use 1 partition each solves the problem, so it seems 
> like it has something to do with the foreign key join in combination with 
> several partitions. 
> One suspicion would be that it is not possible to define what partitioner to 
> use for the join.
> Any insight or help is greatly appreciated.
> *Example code of the problem*
> {code:java}
> static Topology createTopoology(){
> var builder = new StreamsBuilder();
> KTable tableB = builder.table("B",  
> stringMaterialized("table.b"));
> builder
> .stream("A", Consumed.with(Serde.of(KeyA.class), 
> Serde.of(EventA.class)))
> .repartition(repartitionTopicA())
> .toTable(Named.as("table.a"), aMaterialized("table.a"))
> .join(tableB, EventA::getKeyB, topicAandBeJoiner(), 
> Named.as("join.ab"), joinMaterialized("join.ab"))
> .toStream()
> .to("output", with(...));
> return builder.build();
> }
> private static Materialized aMaterialized(String name) {
>   Materialized> table = 
> Materialized.as(name);
>   return 
> table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> private static Repartitioned repartitionTopicA() {
> Repartitioned repartitioned = 
> Repartitioned.as("driverperiod");
> return 
> repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class))
> .withStreamPartitioner(topicAPartitioner())
> .withNumberOfPartitions(4);
> }
> private static StreamPartitioner 
> topicAPartitioner() {
> return (topic, key, value, numPartitions) -> 
> Math.abs(key.getKeyB().hashCode()) % numPartitions;
> }
> private static Materialized> 
> joinMaterialized(String name) {
> Materialized> 
> table = Materialized.as(name);
> return 
> table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vincent81jiang opened a new pull request #11327: KAFKA-13305: fix NullPointerException in LogCleanerManager "uncleanable-bytes" gauge

2021-09-15 Thread GitBox


vincent81jiang opened a new pull request #11327:
URL: https://github.com/apache/kafka/pull/11327


   * Fix KAFKA-13305: NullPointerException in LogCleanerManager 
"uncleanable-bytes" gauge
   - Add a periodic task to remove deleted partitions from uncleanablePartitions
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions

2021-09-15 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17415690#comment-17415690
 ] 

Matthias J. Sax commented on KAFKA-13261:
-

I would prefer to do a KIP and allow users to pass in a custom partitioner.

If we really get follow-up requests, we could extend the DSL logic to 
auto-forward an upstream partitioner later.

> KTable to KTable foreign key join loose events when using several partitions
> 
>
> Key: KAFKA-13261
> URL: https://issues.apache.org/jira/browse/KAFKA-13261
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0, 2.7.1
>Reporter: Tomas Forsman
>Assignee: Victoria Xia
>Priority: Major
> Attachments: KafkaTest.java
>
>
> Two incoming streams A and B. 
> Stream A uses a composite key [a, b]
> Stream B has key [b]
> Stream B has 4 partitions and stream A has 1 partition.
> What we try to do is repartition stream A to have 4 partitions too, then put 
> both A and B into KTables and do a foreign key join from A to B.
> When doing this, not all messages end up in the output topic.
> Repartitioning both to only use 1 partition each solves the problem, so it seems 
> like it has something to do with the foreign key join in combination with 
> several partitions. 
> One suspicion would be that it is not possible to define what partitioner to 
> use for the join.
> Any insight or help is greatly appreciated.
> *Example code of the problem*
> {code:java}
> static Topology createTopoology(){
> var builder = new StreamsBuilder();
> KTable tableB = builder.table("B",  
> stringMaterialized("table.b"));
> builder
> .stream("A", Consumed.with(Serde.of(KeyA.class), 
> Serde.of(EventA.class)))
> .repartition(repartitionTopicA())
> .toTable(Named.as("table.a"), aMaterialized("table.a"))
> .join(tableB, EventA::getKeyB, topicAandBeJoiner(), 
> Named.as("join.ab"), joinMaterialized("join.ab"))
> .toStream()
> .to("output", with(...));
> return builder.build();
> }
> private static Materialized aMaterialized(String name) {
>   Materialized> table = 
> Materialized.as(name);
>   return 
> table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> private static Repartitioned repartitionTopicA() {
> Repartitioned repartitioned = 
> Repartitioned.as("driverperiod");
> return 
> repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class))
> .withStreamPartitioner(topicAPartitioner())
> .withNumberOfPartitions(4);
> }
> private static StreamPartitioner 
> topicAPartitioner() {
> return (topic, key, value, numPartitions) -> 
> Math.abs(key.getKeyB().hashCode()) % numPartitions;
> }
> private static Materialized> 
> joinMaterialized(String name) {
> Materialized> 
> table = Materialized.as(name);
> return 
> table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ccding commented on pull request #11293: MINOR: defineInternal for KIP-405 configs

2021-09-15 Thread GitBox


ccding commented on pull request #11293:
URL: https://github.com/apache/kafka/pull/11293#issuecomment-920248580


   Will cherry-pick this PR to 3.0 after it is merged to trunk


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ccding edited a comment on pull request #11293: MINOR: defineInternal for KIP-405 configs

2021-09-15 Thread GitBox


ccding edited a comment on pull request #11293:
URL: https://github.com/apache/kafka/pull/11293#issuecomment-920247203


   The comment of `defineInternal` says
   ```
* Define a new internal configuration. Internal configuration won't 
show up in the docs and aren't
* intended for general use.
   ```
   I read it as the config should be available and visible to users, but not 
documented. Therefore, I changed the DescribeTopic call to return internal 
configs.
   
   PTAL @junrao 
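   
   For what it's worth, a minimal sketch of how a config ends up marked internal; the config name below is just a KIP-405 example from this area, and the exact overload used should be treated as illustrative:
   
   ```java
   import org.apache.kafka.common.config.ConfigDef;

   ConfigDef configDef = new ConfigDef()
       // internal: omitted from generated docs; per this PR, still returned by describe calls
       .defineInternal("remote.log.storage.system.enable",
                       ConfigDef.Type.BOOLEAN,
                       false,
                       ConfigDef.Importance.MEDIUM);
   ```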


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ccding commented on pull request #11293: MINOR: defineInternal for KIP-405 configs

2021-09-15 Thread GitBox


ccding commented on pull request #11293:
URL: https://github.com/apache/kafka/pull/11293#issuecomment-920247203


   The comment of `defineInternal` says
   ```
* Define a new internal configuration. Internal configuration won't 
show up in the docs and aren't
* intended for general use.
   ```
   I read it as the config should be available and visible to users, but not 
documented. Therefore, I changed the DescribeTopic call to return internal 
configs.
   
   PTAL @junrao 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13305) NullPointerException in LogCleanerManager "uncleanable-bytes" gauge

2021-09-15 Thread Vincent Jiang (Jira)
Vincent Jiang created KAFKA-13305:
-

 Summary: NullPointerException in LogCleanerManager 
"uncleanable-bytes" gauge
 Key: KAFKA-13305
 URL: https://issues.apache.org/jira/browse/KAFKA-13305
 Project: Kafka
  Issue Type: Bug
  Components: log cleaner
Reporter: Vincent Jiang


We've seen the following exception in a production environment:
{quote} java.lang.NullPointerException: Cannot invoke 
"kafka.log.UnifiedLog.logStartOffset()" because "log" is null at

kafka.log.LogCleanerManager$.cleanableOffsets(LogCleanerManager.scala:599)
{quote}
Looks like uncleanablePartitions never has partitions removed from it to 
reflect partition deletion/reassignment.

 

We should fix the NullPointerException and remove deleted partitions from 
uncleanablePartitions.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7408) Truncate to LSO on unclean leader election

2021-09-15 Thread Jose Armando Garcia Sancio (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17415635#comment-17415635
 ] 

Jose Armando Garcia Sancio commented on KAFKA-7408:
---

Good points [~guozhang] .
{quote}later the txn coordinator tries to write a `zC` on behalf of the 
producer, do we guaranteed this `zC` would be rejected?
{quote}
I don't know the answer at the moment and need to look at the code in detail to 
confirm, but you are correct: the transaction coordinator needs to reject a 
{{zC}} if the transaction was already aborted by the unclean leader.

> Truncate to LSO on unclean leader election
> --
>
> Key: KAFKA-7408
> URL: https://issues.apache.org/jira/browse/KAFKA-7408
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>
> If an unclean leader is elected, we may lose committed transaction data. That 
> alone is expected, but what is worse is that a transaction which was 
> previously completed (either committed or aborted) may lose its marker and 
> become dangling. The transaction coordinator will not know about the unclean 
> leader election, so will not know to resend the transaction markers. 
> Consumers with read_committed isolation will be stuck because the LSO cannot 
> advance.
> To keep this scenario from occurring, it would be better to have the unclean 
> leader truncate to the LSO so that there are no dangling transactions. 
> Truncating to the LSO is not alone sufficient because the markers which 
> allowed the LSO advancement may be at higher offsets. What we can do is let 
> the newly elected leader truncate to the LSO and then rewrite all the markers 
> that followed it using its own leader epoch (to avoid divergence from 
> followers).
> The interesting cases when an unclean leader election occurs are when a 
> transaction is ongoing. 
> 1. If a producer is in the middle of a transaction commit, then the 
> coordinator may still attempt to write transaction markers. This will either 
> succeed or fail depending on the producer epoch in the unclean leader. If the 
> epoch matches, then the WriteTxnMarker call will succeed, which will simply 
> be ignored by the consumer. If the epoch doesn't match, the WriteTxnMarker 
> call will fail and the transaction coordinator can potentially remove the 
> partition from the transaction.
> 2. If a producer is still writing the transaction, then what happens depends 
> on the producer state in the unclean leader. If no producer state has been 
> lost, then the transaction can continue without impact. Otherwise, the 
> producer will likely fail with an OUT_OF_ORDER_SEQUENCE error, which will 
> cause the transaction to be aborted by the coordinator. That takes us back to 
> the first case.
> By truncating the LSO, we ensure that transactions are either preserved in 
> whole or they are removed from the log in whole. For an unclean leader 
> election, that's probably as good as we can do. But we are ensured that 
> consumers will not be blocked by dangling transactions. The only remaining 
> situation where a dangling transaction might be left is if one of the 
> transaction state partitions has an unclean leader election.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-7408) Truncate to LSO on unclean leader election

2021-09-15 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio reassigned KAFKA-7408:
-

Assignee: Jose Armando Garcia Sancio

> Truncate to LSO on unclean leader election
> --
>
> Key: KAFKA-7408
> URL: https://issues.apache.org/jira/browse/KAFKA-7408
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>
> If an unclean leader is elected, we may lose committed transaction data. That 
> alone is expected, but what is worse is that a transaction which was 
> previously completed (either committed or aborted) may lose its marker and 
> become dangling. The transaction coordinator will not know about the unclean 
> leader election, so will not know to resend the transaction markers. 
> Consumers with read_committed isolation will be stuck because the LSO cannot 
> advance.
> To keep this scenario from occurring, it would be better to have the unclean 
> leader truncate to the LSO so that there are no dangling transactions. 
> Truncating to the LSO is not alone sufficient because the markers which 
> allowed the LSO advancement may be at higher offsets. What we can do is let 
> the newly elected leader truncate to the LSO and then rewrite all the markers 
> that followed it using its own leader epoch (to avoid divergence from 
> followers).
> The interesting cases when an unclean leader election occurs are when a 
> transaction is ongoing. 
> 1. If a producer is in the middle of a transaction commit, then the 
> coordinator may still attempt to write transaction markers. This will either 
> succeed or fail depending on the producer epoch in the unclean leader. If the 
> epoch matches, then the WriteTxnMarker call will succeed, which will simply 
> be ignored by the consumer. If the epoch doesn't match, the WriteTxnMarker 
> call will fail and the transaction coordinator can potentially remove the 
> partition from the transaction.
> 2. If a producer is still writing the transaction, then what happens depends 
> on the producer state in the unclean leader. If no producer state has been 
> lost, then the transaction can continue without impact. Otherwise, the 
> producer will likely fail with an OUT_OF_ORDER_SEQUENCE error, which will 
> cause the transaction to be aborted by the coordinator. That takes us back to 
> the first case.
> By truncating the LSO, we ensure that transactions are either preserved in 
> whole or they are removed from the log in whole. For an unclean leader 
> election, that's probably as good as we can do. But we are assured that 
> consumers will not be blocked by dangling transactions. The only remaining 
> situation where a dangling transaction might be left is if one of the 
> transaction state partitions has an unclean leader election.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jolshan commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-09-15 Thread GitBox


jolshan commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r709357795



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1755,6 +1762,66 @@ class ReplicaManager(val config: KafkaConfig,
 partitionsToMakeFollower
   }
 
+  private def updateTopicIdForFollowers(controllerId: Int,
+controllerEpoch: Int,
+partitionStates: Map[Partition, 
LeaderAndIsrPartitionState],
+correlationId: Int,
+topicIds: String => Option[Uuid]) : 
Set[Partition] = {
+val traceLoggingEnabled = stateChangeLogger.isTraceEnabled
+
+val partitionsToUpdateFollower = mutable.Set.empty[Partition]
+try {
+  partitionStates.forKeyValue { (partition, partitionState) =>
+val newLeaderBrokerId = partitionState.leader
+  if (metadataCache.hasAliveBroker(newLeaderBrokerId)) {
+// Only change partition state when the leader is available
+partitionsToUpdateFollower += partition
+  } else {
+// The leader broker should always be present in the metadata 
cache.
+// If not, we should record the error message and abort the 
transition process for this partition
+stateChangeLogger.error(s"Received LeaderAndIsrRequest with 
correlation id $correlationId from " +
+  s"controller $controllerId epoch $controllerEpoch for partition 
${partition.topicPartition} " +
+  s"(last update controller epoch 
${partitionState.controllerEpoch}) " +
+  s"but cannot become follower since the new leader 
$newLeaderBrokerId is unavailable.")
+  }
+  }
+
+  if (isShuttingDown.get()) {
+if (traceLoggingEnabled) {
+  partitionsToUpdateFollower.foreach { partition =>
+stateChangeLogger.trace(s"Skipped the update topic ID step of the 
become-follower state " +
+  s"change with correlation id $correlationId from controller 
$controllerId epoch $controllerEpoch for " +
+  s"partition ${partition.topicPartition} with leader 
${partitionStates(partition).leader} " +
+  "since it is shutting down")
+  }
+}
+  } else {
+val partitionsToUpdateFollowerWithLeader = 
partitionsToUpdateFollower.map { partition =>
+  val leaderNode = partition.leaderReplicaIdOpt.flatMap(leaderId => 
metadataCache.
+getAliveBrokerNode(leaderId, 
config.interBrokerListenerName)).getOrElse(Node.noNode())
+  val leader = new BrokerEndPoint(leaderNode.id(), leaderNode.host(), 
leaderNode.port())
+  (partition.topicPartition, BrokerAndFetcherId(leader, 
replicaFetcherManager.getFetcherId(partition.topicPartition)))
+}
+
replicaFetcherManager.addTopicIdsToFetcherThread(partitionsToUpdateFollowerWithLeader,
 topicIds)

Review comment:
   Do you think we also don't need the check for the leader in the metadata 
cache? ie:
   ```
   val newLeaderBrokerId = partitionState.leader
 if (metadataCache.hasAliveBroker(newLeaderBrokerId)) {
   // Only change partition state when the leader is available
   partitionsToUpdateFollower += partition
```

If we do keep it, I can change the comment to be less confusing.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #11320: MINOR: Make ReplicaManager, LogManager, KafkaApis easier to construct

2021-09-15 Thread GitBox


cmccabe commented on pull request #11320:
URL: https://github.com/apache/kafka/pull/11320#issuecomment-920176422


   > On the builders, should we drop the "set" from the method names? I believe 
we have established the pattern (in Scala, at least) of not including "get" and 
"set" in getter and setter names. Should we extend that pattern to Java code 
as well?
   
   Hmm... as far as I know, the pattern in Kafka is to omit the "get" in 
getters, so we have `x()` rather than `getX()`. I don't think omitting the 
"set" in `setX()` is a good idea since it creates confusion between what is a 
setter and what is a getter. I can give a lot of precedents. For example, all 
the generated code created by MessageDataGenerator has always used "setX" to 
refer to setters for X.
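   For illustration, a hypothetical builder that follows this convention might look like (not code from this PR):
   ```java
   // Hypothetical example only: the getter drops the "get" prefix, the setter keeps "set".
   public class FooBuilder {
       private String name = null;

       public String name() {            // getter: name(), not getName()
           return name;
       }

       public FooBuilder setName(String name) {   // setter: setName(), so it cannot be mistaken for the getter
           this.name = name;
           return this;
       }
   }
   ```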


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on pull request #11278: KAFKA-12648: Enforce size limits for each task's cache

2021-09-15 Thread GitBox


wcarlson5 commented on pull request #11278:
URL: https://github.com/apache/kafka/pull/11278#issuecomment-920174685


   @guozhangwang That is a really good point. Maybe we should be more specific. 
What if each topology could request a percentage of the total cache? If a 
request made the total exceed 99%, the request would be rejected. Any unclaimed 
cache would be split among the topologies that did not claim any. A topology 
could lower its cache size if it wants to make space for a new topology.
   
   1. If A requests 50%, it gets it and the rest is unused
   2. B joins but does not request, so it gets the other 50%
   3. C joins but requests 75%, so it fails. C then requests 25%, so now A has 50%, B 25% and C 25%
   4. D joins without a request, so now A has 50%, B 12%, C 25% and D 13%
   5. A reduces its request to 25%, so now all have 25%
   6. E joins and requests 0%, not using any cache, and all other topologies are unchanged
   
   I think that should work. Now we have both an upper bound on total memory 
and a minimum guarantee.
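   A purely illustrative sketch of that allocation rule (the helper below is hypothetical and not part of Kafka Streams):
   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;

   // Hypothetical sketch: requested shares are percentages of the total cache, null
   // means "no request". Explicit requests above 99% in total are rejected, and the
   // unclaimed remainder is split evenly among topologies that made no request.
   public class CacheShares {
       static Map<String, Double> allocate(Map<String, Double> requestedPercent) {
           double claimed = requestedPercent.values().stream()
                   .filter(p -> p != null)
                   .mapToDouble(Double::doubleValue)
                   .sum();
           if (claimed > 99.0) {
               throw new IllegalArgumentException("Requested cache shares exceed 99%");
           }
           long noRequest = requestedPercent.values().stream().filter(p -> p == null).count();
           double perUnclaimed = noRequest == 0 ? 0.0 : (100.0 - claimed) / noRequest;

           Map<String, Double> shares = new LinkedHashMap<>();
           requestedPercent.forEach((topology, request) ->
                   shares.put(topology, request != null ? request : perUnclaimed));
           return shares;
       }
   }
   ```
   With A=50% and C=25% requested and B, D unrequested, this yields A 50%, B 12.5%, C 25%, D 12.5%, matching the example above.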


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #11320: MINOR: Make ReplicaManager, LogManager, KafkaApis easier to construct

2021-09-15 Thread GitBox


cmccabe commented on a change in pull request #11320:
URL: https://github.com/apache/kafka/pull/11320#discussion_r709349588



##
File path: core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
##
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.builders;
+
+import kafka.coordinator.group.GroupCoordinator;
+import kafka.coordinator.transaction.TransactionCoordinator;
+import kafka.network.RequestChannel;
+import kafka.server.ApiVersionManager;
+import kafka.server.AutoTopicCreationManager;
+import kafka.server.BrokerTopicStats;
+import kafka.server.DelegationTokenManager;
+import kafka.server.FetchManager;
+import kafka.server.KafkaApis;
+import kafka.server.KafkaConfig;
+import kafka.server.MetadataCache;
+import kafka.server.MetadataSupport;
+import kafka.server.QuotaFactory.QuotaManagers;
+import kafka.server.ReplicaManager;
+import kafka.server.metadata.ConfigRepository;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.authorizer.Authorizer;
+
+import java.util.Collections;
+import java.util.Optional;
+import scala.compat.java8.OptionConverters;
+
+
+public class KafkaApisBuilder {
+private RequestChannel requestChannel = null;
+private MetadataSupport metadataSupport = null;
+private ReplicaManager replicaManager = null;
+private GroupCoordinator groupCoordinator = null;
+private TransactionCoordinator txnCoordinator = null;
+private AutoTopicCreationManager autoTopicCreationManager = null;
+private int brokerId = 0;
+private KafkaConfig config = null;
+private ConfigRepository configRepository = null;
+private MetadataCache metadataCache = null;
+private Metrics metrics = null;
+private Optional<Authorizer> authorizer = Optional.empty();
+private QuotaManagers quotas = null;
+private FetchManager fetchManager = null;
+private BrokerTopicStats brokerTopicStats = null;
+private String clusterId = "clusterId";
+private Time time = Time.SYSTEM;
+private DelegationTokenManager tokenManager = null;
+private ApiVersionManager apiVersionManager = null;
+
+public KafkaApisBuilder setRequestChannel(RequestChannel requestChannel) {
+this.requestChannel = requestChannel;
+return this;
+}
+
+public KafkaApisBuilder setMetadataSupport(MetadataSupport 
metadataSupport) {
+this.metadataSupport = metadataSupport;
+return this;
+}
+
+public KafkaApisBuilder setReplicaManager(ReplicaManager replicaManager) {
+this.replicaManager = replicaManager;
+return this;
+}
+
+public KafkaApisBuilder setGroupCoordinator(GroupCoordinator 
groupCoordinator) {
+this.groupCoordinator = groupCoordinator;
+return this;
+}
+
+public KafkaApisBuilder setTxnCoordinator(TransactionCoordinator 
txnCoordinator) {
+this.txnCoordinator = txnCoordinator;
+return this;
+}
+
+public KafkaApisBuilder 
setAutoTopicCreationManager(AutoTopicCreationManager autoTopicCreationManager) {
+this.autoTopicCreationManager = autoTopicCreationManager;
+return this;
+}
+
+public KafkaApisBuilder setBrokerId(int brokerId) {
+this.brokerId = brokerId;
+return this;
+}
+
+public KafkaApisBuilder setConfig(KafkaConfig config) {
+this.config = config;
+return this;
+}
+
+public KafkaApisBuilder setConfigRepository(ConfigRepository 
configRepository) {
+this.configRepository = configRepository;
+return this;
+}
+
+public KafkaApisBuilder setMetadataCache(MetadataCache metadataCache) {
+this.metadataCache = metadataCache;
+return this;
+}
+
+public KafkaApisBuilder setMetrics(Metrics metrics) {
+this.metrics = metrics;
+return this;
+}
+
+public KafkaApisBuilder setAuthorizer(Optional<Authorizer> authorizer) {
+this.authorizer = authorizer;
+return this;
+}
+
+public KafkaApisBuilder setQuotas(QuotaManagers quotas) {
+this.quotas = quotas;
+return this;
+}
+
+public KafkaApisBuilder setFetchManager(FetchManager 

[GitHub] [kafka] cmccabe commented on a change in pull request #11320: MINOR: Make ReplicaManager, LogManager, KafkaApis easier to construct

2021-09-15 Thread GitBox


cmccabe commented on a change in pull request #11320:
URL: https://github.com/apache/kafka/pull/11320#discussion_r709348909



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -186,49 +186,37 @@ object ReplicaManager {
 class ReplicaManager(val config: KafkaConfig,
  metrics: Metrics,
  time: Time,
- val zkClient: Option[KafkaZkClient],
  scheduler: Scheduler,
  val logManager: LogManager,
- val isShuttingDown: AtomicBoolean,
  quotaManagers: QuotaManagers,
- val brokerTopicStats: BrokerTopicStats,
  val metadataCache: MetadataCache,
  logDirFailureChannel: LogDirFailureChannel,
- val delayedProducePurgatory: 
DelayedOperationPurgatory[DelayedProduce],
- val delayedFetchPurgatory: 
DelayedOperationPurgatory[DelayedFetch],
- val delayedDeleteRecordsPurgatory: 
DelayedOperationPurgatory[DelayedDeleteRecords],
- val delayedElectLeaderPurgatory: 
DelayedOperationPurgatory[DelayedElectLeader],
- threadNamePrefix: Option[String],
- val alterIsrManager: AlterIsrManager) extends Logging 
with KafkaMetricsGroup {
-
-  def this(config: KafkaConfig,
-   metrics: Metrics,
-   time: Time,
-   zkClient: Option[KafkaZkClient],
-   scheduler: Scheduler,
-   logManager: LogManager,
-   isShuttingDown: AtomicBoolean,
-   quotaManagers: QuotaManagers,
-   brokerTopicStats: BrokerTopicStats,
-   metadataCache: MetadataCache,
-   logDirFailureChannel: LogDirFailureChannel,
-   alterIsrManager: AlterIsrManager,
-   threadNamePrefix: Option[String] = None) = {
-this(config, metrics, time, zkClient, scheduler, logManager, 
isShuttingDown,
-  quotaManagers, brokerTopicStats, metadataCache, logDirFailureChannel,
-  DelayedOperationPurgatory[DelayedProduce](
-purgatoryName = "Produce", brokerId = config.brokerId,
-purgeInterval = config.producerPurgatoryPurgeIntervalRequests),
-  DelayedOperationPurgatory[DelayedFetch](
-purgatoryName = "Fetch", brokerId = config.brokerId,
-purgeInterval = config.fetchPurgatoryPurgeIntervalRequests),
-  DelayedOperationPurgatory[DelayedDeleteRecords](
-purgatoryName = "DeleteRecords", brokerId = config.brokerId,
-purgeInterval = config.deleteRecordsPurgatoryPurgeIntervalRequests),
-  DelayedOperationPurgatory[DelayedElectLeader](
-purgatoryName = "ElectLeader", brokerId = config.brokerId),
-  threadNamePrefix, alterIsrManager)
-  }
+ val alterIsrManager: AlterIsrManager,
+ val brokerTopicStats: BrokerTopicStats = new 
BrokerTopicStats(),
+ val isShuttingDown: AtomicBoolean = new 
AtomicBoolean(false),
+ val zkClient: Option[KafkaZkClient] = None,
+ delayedProducePurgatoryParam: 
Option[DelayedOperationPurgatory[DelayedProduce]] = None,
+ delayedFetchPurgatoryParam: 
Option[DelayedOperationPurgatory[DelayedFetch]] = None,
+ delayedDeleteRecordsPurgatoryParam: 
Option[DelayedOperationPurgatory[DelayedDeleteRecords]] = None,
+ delayedElectLeaderPurgatoryParam: 
Option[DelayedOperationPurgatory[DelayedElectLeader]] = None,
+ threadNamePrefix: Option[String] = None,

Review comment:
   It seems good to use a trailing comma here, since otherwise adding a new 
parameter changes the previous line (adding unnecessary conflicts)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #11320: MINOR: Make ReplicaManager, LogManager, KafkaApis easier to construct

2021-09-15 Thread GitBox


cmccabe commented on a change in pull request #11320:
URL: https://github.com/apache/kafka/pull/11320#discussion_r709348152



##
File path: core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
##
@@ -75,13 +75,25 @@ class LogLoaderTest {
 
 // Create a LogManager with some overridden methods to facilitate 
interception of clean shutdown
 // flag and to inject a runtime error
-def interceptedLogManager(logConfig: LogConfig, logDirs: Seq[File], 
simulateError: SimulateError): LogManager = {
-  new LogManager(logDirs = logDirs.map(_.getAbsoluteFile), 
initialOfflineDirs = Array.empty[File], new MockConfigRepository(),
-initialDefaultConfig = logConfig, cleanerConfig = 
CleanerConfig(enableCleaner = false), recoveryThreadsPerDataDir = 4,
-flushCheckMs = 1000L, flushRecoveryOffsetCheckpointMs = 1L, 
flushStartOffsetCheckpointMs = 1L,
-retentionCheckMs = 1000L, maxPidExpirationMs = 60 * 60 * 1000, 
scheduler = time.scheduler, time = time,
-brokerTopicStats = new BrokerTopicStats, logDirFailureChannel = new 
LogDirFailureChannel(logDirs.size),
-keepPartitionMetadataFile = config.usesTopicId, 
interBrokerProtocolVersion = config.interBrokerProtocolVersion) {
+def interceptedLogManager(logConfig: LogConfig, logDirs: Seq[File], 
simulateError: SimulateError): LogManager =
+  new LogManager(
+logDirs = logDirs.map(_.getAbsoluteFile),
+initialOfflineDirs = Array.empty[File],
+configRepository = new MockConfigRepository(),
+initialDefaultConfig = logConfig,
+cleanerConfig = CleanerConfig(enableCleaner = false),
+recoveryThreadsPerDataDir = 4,
+flushCheckMs = 1000L,
+flushRecoveryOffsetCheckpointMs = 1L,
+flushStartOffsetCheckpointMs = 1L,
+retentionCheckMs = 1000L,
+maxPidExpirationMs = 60 * 60 * 1000,
+interBrokerProtocolVersion = config.interBrokerProtocolVersion,
+scheduler = time.scheduler,
+brokerTopicStats = new BrokerTopicStats,

Review comment:
   I'll add parens




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-09-15 Thread GitBox


jolshan commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r709342716



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -741,7 +741,8 @@ class ReplicaManager(val config: KafkaConfig,
 
   // throw NotLeaderOrFollowerException if replica does not exist for 
the given partition
   val partition = getPartitionOrException(topicPartition)
-  partition.localLogOrException

Review comment:
   The surrounding code is from 14 months to 2 years ago, so I'm not sure.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-09-15 Thread GitBox


jolshan commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r709337669



##
File path: core/src/main/scala/kafka/server/AbstractFetcherManager.scala
##
@@ -163,6 +163,17 @@ abstract class AbstractFetcherManager[T <: 
AbstractFetcherThread](val name: Stri
 info(s"Added fetcher to broker ${fetcherThread.sourceBroker.id} for 
partitions $initialOffsetAndEpochs")
   }
 
+  def addTopicIdsToFetcherThread(partitionsToUpdate: Set[(TopicPartition, 
BrokerAndFetcherId)], topicIds: String => Option[Uuid]): Unit = {
+lock synchronized {
+  val partitionsPerFetcher = partitionsToUpdate.groupMap(_._2)(_._1)
+
+  for ((brokerAndFetcherId, partitions) <- partitionsPerFetcher) {
+val brokerIdAndFetcherId = 
BrokerIdAndFetcherId(brokerAndFetcherId.broker.id, brokerAndFetcherId.fetcherId)

Review comment:
   Ah actually this is really subtle. One is BrokerAndFetcherId and the 
other is Broker**_Id_**AndFetcherId. I can simplify and just create the second 
one.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-09-15 Thread GitBox


jolshan commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r709334903



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -459,18 +459,22 @@ abstract class AbstractFetcherThread(name: String,
*/
   private def partitionFetchState(tp: TopicPartition, initialFetchState: 
InitialFetchState, currentState: PartitionFetchState): PartitionFetchState = {
 if (currentState != null && currentState.currentLeaderEpoch == 
initialFetchState.currentLeaderEpoch) {
-  currentState
+  if (currentState.topicId.isEmpty && initialFetchState.topicId.isDefined) 
{
+currentState.updateTopicId(initialFetchState.topicId)
+  } else {
+currentState
+  }

Review comment:
   Ah good catch. I don't think so.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-09-15 Thread GitBox


jolshan commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r709328285



##
File path: core/src/main/scala/kafka/server/AbstractFetcherManager.scala
##
@@ -163,6 +163,17 @@ abstract class AbstractFetcherManager[T <: 
AbstractFetcherThread](val name: Stri
 info(s"Added fetcher to broker ${fetcherThread.sourceBroker.id} for 
partitions $initialOffsetAndEpochs")
   }
 
+  def addTopicIdsToFetcherThread(partitionsToUpdate: Set[(TopicPartition, 
BrokerAndFetcherId)], topicIds: String => Option[Uuid]): Unit = {
+lock synchronized {
+  val partitionsPerFetcher = partitionsToUpdate.groupMap(_._2)(_._1)
+
+  for ((brokerAndFetcherId, partitions) <- partitionsPerFetcher) {
+val brokerIdAndFetcherId = 
BrokerIdAndFetcherId(brokerAndFetcherId.broker.id, brokerAndFetcherId.fetcherId)

Review comment:
   I was pulling from the add partitions method. But it does something 
different.
   I can just use the one from before.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on pull request #11261: KAFKA-13228: Ensure ApiVersionRequest is properly handled in KRaft

2021-09-15 Thread GitBox


dengziming commented on pull request #11261:
URL: https://github.com/apache/kafka/pull/11261#issuecomment-920145832


   ping @hachikuji @abbccdda 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-13162) ElectLeader API must be forwarded to Controller

2021-09-15 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-13162.
-
Resolution: Fixed

> ElectLeader API must be forwarded to Controller
> ---
>
> Key: KAFKA-13162
> URL: https://issues.apache.org/jira/browse/KAFKA-13162
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 3.1.0
>
>
> We're missing the logic to forward ElectLeaders requests to the controller. 
> This means that `kafka-leader-election.sh` does not work correctly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13162) ElectLeader API must be forwarded to Controller

2021-09-15 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-13162:

Fix Version/s: (was: 3.0.1)
   3.1.0

> ElectLeader API must be forwarded to Controller
> ---
>
> Key: KAFKA-13162
> URL: https://issues.apache.org/jira/browse/KAFKA-13162
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 3.1.0
>
>
> We're missing the logic to forward ElectLeaders requests to the controller. 
> This means that `kafka-leader-election.sh` does not work correctly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji merged pull request #11186: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft

2021-09-15 Thread GitBox


hachikuji merged pull request #11186:
URL: https://github.com/apache/kafka/pull/11186


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #11320: MINOR: Make ReplicaManager, LogManager, KafkaApis easier to construct

2021-09-15 Thread GitBox


mumrah commented on a change in pull request #11320:
URL: https://github.com/apache/kafka/pull/11320#discussion_r709241900



##
File path: core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
##
@@ -75,13 +75,25 @@ class LogLoaderTest {
 
 // Create a LogManager with some overridden methods to facilitate 
interception of clean shutdown
 // flag and to inject a runtime error
-def interceptedLogManager(logConfig: LogConfig, logDirs: Seq[File], 
simulateError: SimulateError): LogManager = {
-  new LogManager(logDirs = logDirs.map(_.getAbsoluteFile), 
initialOfflineDirs = Array.empty[File], new MockConfigRepository(),
-initialDefaultConfig = logConfig, cleanerConfig = 
CleanerConfig(enableCleaner = false), recoveryThreadsPerDataDir = 4,
-flushCheckMs = 1000L, flushRecoveryOffsetCheckpointMs = 1L, 
flushStartOffsetCheckpointMs = 1L,
-retentionCheckMs = 1000L, maxPidExpirationMs = 60 * 60 * 1000, 
scheduler = time.scheduler, time = time,
-brokerTopicStats = new BrokerTopicStats, logDirFailureChannel = new 
LogDirFailureChannel(logDirs.size),
-keepPartitionMetadataFile = config.usesTopicId, 
interBrokerProtocolVersion = config.interBrokerProtocolVersion) {
+def interceptedLogManager(logConfig: LogConfig, logDirs: Seq[File], 
simulateError: SimulateError): LogManager =
+  new LogManager(
+logDirs = logDirs.map(_.getAbsoluteFile),
+initialOfflineDirs = Array.empty[File],
+configRepository = new MockConfigRepository(),
+initialDefaultConfig = logConfig,
+cleanerConfig = CleanerConfig(enableCleaner = false),
+recoveryThreadsPerDataDir = 4,
+flushCheckMs = 1000L,
+flushRecoveryOffsetCheckpointMs = 1L,
+flushStartOffsetCheckpointMs = 1L,
+retentionCheckMs = 1000L,
+maxPidExpirationMs = 60 * 60 * 1000,
+interBrokerProtocolVersion = config.interBrokerProtocolVersion,
+scheduler = time.scheduler,
+brokerTopicStats = new BrokerTopicStats,

Review comment:
   nit: we should make the no-arg constructor calls consistent wrt parens

##
File path: core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
##
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.builders;
+
+import kafka.coordinator.group.GroupCoordinator;
+import kafka.coordinator.transaction.TransactionCoordinator;
+import kafka.network.RequestChannel;
+import kafka.server.ApiVersionManager;
+import kafka.server.AutoTopicCreationManager;
+import kafka.server.BrokerTopicStats;
+import kafka.server.DelegationTokenManager;
+import kafka.server.FetchManager;
+import kafka.server.KafkaApis;
+import kafka.server.KafkaConfig;
+import kafka.server.MetadataCache;
+import kafka.server.MetadataSupport;
+import kafka.server.QuotaFactory.QuotaManagers;
+import kafka.server.ReplicaManager;
+import kafka.server.metadata.ConfigRepository;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.authorizer.Authorizer;
+
+import java.util.Collections;
+import java.util.Optional;
+import scala.compat.java8.OptionConverters;
+
+
+public class KafkaApisBuilder {
+private RequestChannel requestChannel = null;
+private MetadataSupport metadataSupport = null;
+private ReplicaManager replicaManager = null;
+private GroupCoordinator groupCoordinator = null;
+private TransactionCoordinator txnCoordinator = null;
+private AutoTopicCreationManager autoTopicCreationManager = null;
+private int brokerId = 0;
+private KafkaConfig config = null;
+private ConfigRepository configRepository = null;
+private MetadataCache metadataCache = null;
+private Metrics metrics = null;
+private Optional<Authorizer> authorizer = Optional.empty();
+private QuotaManagers quotas = null;
+private FetchManager fetchManager = null;
+private BrokerTopicStats brokerTopicStats = null;
+private String clusterId = "clusterId";
+private Time time = Time.SYSTEM;
+private DelegationTokenManager tokenManager = null;
+private ApiVersionManager apiVersionManager = null;
+
+ 

[jira] [Created] (KAFKA-13304) Implicit cast of source type long to narrower destination type int in org.apache.kafka.common.network.MultiSend.java

2021-09-15 Thread Thomas Bachmann (Jira)
Thomas Bachmann created KAFKA-13304:
---

 Summary: Implicit cast of source type long to narrower destination 
type int in org.apache.kafka.common.network.MultiSend.java
 Key: KAFKA-13304
 URL: https://issues.apache.org/jira/browse/KAFKA-13304
 Project: Kafka
  Issue Type: Bug
  Components: network
Affects Versions: 2.8.0
Reporter: Thomas Bachmann


During a security review of Kafka I came across this bug 
[https://lgtm.com/projects/g/apache/kafka/rev/78ba492e3e70fd9db61bc82469371d04a8d6b762/files/clients/src/main/java/org/apache/kafka/common/network/MultiSend.java?sort=name=ASC=heatmap#x9c1b5901406741c6:1]
{code:java}
@Override
public long writeTo(GatheringByteChannel channel) throws IOException {
    if (completed())
        throw new KafkaException("This operation cannot be completed on a complete request.");

    int totalWrittenPerCall = 0;
    boolean sendComplete = false;
    do {
        long written = current.writeTo(channel);
        totalWritten += written;
        totalWrittenPerCall += written;
{code}
"Implicit cast of source type long to narrower destination type int."

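For context, the warning is about the compound assignment silently narrowing a long to an int. A generic, self-contained illustration of the problem and one possible fix (not a patch to MultiSend itself):
{code:java}
public class NarrowingExample {
    public static void main(String[] args) {
        long written = 3_000_000_000L;   // larger than Integer.MAX_VALUE

        int totalInt = 0;
        totalInt += written;             // legal Java: compound assignment implicitly casts long -> int
        System.out.println(totalInt);    // -1294967296, the value has wrapped around

        long totalLong = 0;
        totalLong += written;            // one possible fix: accumulate in a long instead
        System.out.println(totalLong);   // 3000000000
    }
}
{code}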


--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13303) RoundRobinPartitioner broken by KIP-480

2021-09-15 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17415497#comment-17415497
 ] 

Luke Chen commented on KAFKA-13303:
---

Could you also leave your PR link in a comment on KAFKA-9965, so that they can be 
closed together? Thanks.

> RoundRobinPartitioner broken by KIP-480
> ---
>
> Key: KAFKA-13303
> URL: https://issues.apache.org/jira/browse/KAFKA-13303
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jon McEwen
>Priority: Minor
>
> Since KIP-480 Sticky Partitioning, the RoundRobinPartitioner doesn't behave 
> correctly. An additional call to `partition()` on a new batch leads to 
> partitions being skipped.
>  
> I have a fix that I would like to contribute, but I need help getting started 
> as a contributor, e.g. for basic things like formatting the code.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13303) RoundRobinPartitioner broken by KIP-480

2021-09-15 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17415496#comment-17415496
 ] 

Luke Chen commented on KAFKA-13303:
---

This is a duplicate of KAFKA-9965.

> RoundRobinPartitioner broken by KIP-480
> ---
>
> Key: KAFKA-13303
> URL: https://issues.apache.org/jira/browse/KAFKA-13303
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jon McEwen
>Priority: Minor
>
> Since KIP-480 Sticky Partitioning, the RoundRobinPartitioner doesn't behave 
> correctly. An additional call to `partition()` on a new batch leads to 
> partitions being skipped.
>  
> I have a fix that I would like to contribute, but I need help getting started 
> as a contributor, e.g. for basic things like formatting the code.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13292) InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id

2021-09-15 Thread NEERAJ VAIDYA (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17415491#comment-17415491
 ] 

NEERAJ VAIDYA commented on KAFKA-13292:
---

Thanks [~mjsax]

I will look at upgrading the client libraries to 2.8.0.

> InvalidPidMappingException: The producer attempted to use a producer id which 
> is not currently assigned to its transactional id
> ---
>
> Key: KAFKA-13292
> URL: https://issues.apache.org/jira/browse/KAFKA-13292
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: NEERAJ VAIDYA
>Priority: Major
>
> I have a KafkaStreams application which consumes from a topic which has 12 
> partitions. The incoming message rate into this topic is very low, perhaps 
> 3-4 per minute. Also, some partitions will not receive messages for more than 
> 7 days.
>  
> Exactly after 7 days of starting this application, I seem to be getting the 
> following exception and the application shuts down, without processing 
> anymore messages :
>  
> {code:java}
> 2021-09-10T12:21:59.636 [kafka-producer-network-thread | 
> mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] 
> INFO  o.a.k.c.p.i.TransactionManager - MSG=[Producer 
> clientId=mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer,
>  transactionalId=mtx-caf-0_2] Transiting to abortable error state due to 
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id.
> 2021-09-10T12:21:59.642 [kafka-producer-network-thread | 
> mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] 
> ERROR o.a.k.s.p.i.RecordCollectorImpl - MSG=stream-thread 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] task [0_2] 
> Error encountered sending record to topic 
> mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to:
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id.
> Exception handler choose to FAIL the processing, no more records would be 
> sent.
> 2021-09-10T12:21:59.740 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] ERROR 
> o.a.k.s.p.internals.StreamThread - MSG=stream-thread 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] Encountered the 
> following exception during processing and the thread is going to shut down:
> org.apache.kafka.streams.errors.StreamsException: Error encountered sending 
> record to topic mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to:
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id.
> Exception handler choose to FAIL the processing, no more records would be 
> sent.
>         at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:214)
>         at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:186)
>         at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1363)
>         at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
>         at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
>         at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:781)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
>         at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The 
> producer attempted to use a producer id which is not currently assigned to 
> its transactional id.
> 2021-09-10T12:21:59.740 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO  
> o.a.k.s.p.internals.StreamThread - MSG=stream-thread 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] State 
> transition from RUNNING to PENDING_SHUTDOWN
> {code}
>  
> After this, I can see that all 12 tasks (because there are 12 partitions for 
> all topics) get shutdown and this brings down the whole application.
>  
> I understand that the transactional.id.expiration.ms = 7 days (default) will 
> likely 

[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-09-15 Thread GitBox


dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r709122840



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -491,6 +495,19 @@ abstract class AbstractFetcherThread(name: String,
 } finally partitionMapLock.unlock()
   }
 
+  def addTopicIdsToThread(partitions: Set[TopicPartition], topicIds: String => 
Option[Uuid]) = {

Review comment:
   nit: Should we prefix this method with `maybe` to indicate that it would 
set the topic id only if there is a state for the topic partition?

##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -491,6 +495,19 @@ abstract class AbstractFetcherThread(name: String,
 } finally partitionMapLock.unlock()
   }
 
+  def addTopicIdsToThread(partitions: Set[TopicPartition], topicIds: String => 
Option[Uuid]) = {
+partitionMapLock.lockInterruptibly()
+try {
+  partitions.foreach { tp =>
+val currentState = partitionStates.stateValue(tp)

Review comment:
   Should we ensure that there is actually a state? It must be there but it 
might be better to be safe.

##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -459,18 +459,22 @@ abstract class AbstractFetcherThread(name: String,
*/
   private def partitionFetchState(tp: TopicPartition, initialFetchState: 
InitialFetchState, currentState: PartitionFetchState): PartitionFetchState = {
 if (currentState != null && currentState.currentLeaderEpoch == 
initialFetchState.currentLeaderEpoch) {
-  currentState
+  if (currentState.topicId.isEmpty && initialFetchState.topicId.isDefined) 
{
+currentState.updateTopicId(initialFetchState.topicId)
+  } else {
+currentState
+  }

Review comment:
   Is this change still necessary? 

##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1755,6 +1762,66 @@ class ReplicaManager(val config: KafkaConfig,
 partitionsToMakeFollower
   }
 
+  private def updateTopicIdForFollowers(controllerId: Int,
+controllerEpoch: Int,
+partitionStates: Map[Partition, 
LeaderAndIsrPartitionState],
+correlationId: Int,
+topicIds: String => Option[Uuid]) : 
Set[Partition] = {
+val traceLoggingEnabled = stateChangeLogger.isTraceEnabled
+
+val partitionsToUpdateFollower = mutable.Set.empty[Partition]
+try {
+  partitionStates.forKeyValue { (partition, partitionState) =>
+val newLeaderBrokerId = partitionState.leader
+  if (metadataCache.hasAliveBroker(newLeaderBrokerId)) {
+// Only change partition state when the leader is available
+partitionsToUpdateFollower += partition
+  } else {
+// The leader broker should always be present in the metadata 
cache.
+// If not, we should record the error message and abort the 
transition process for this partition
+stateChangeLogger.error(s"Received LeaderAndIsrRequest with 
correlation id $correlationId from " +
+  s"controller $controllerId epoch $controllerEpoch for partition 
${partition.topicPartition} " +
+  s"(last update controller epoch 
${partitionState.controllerEpoch}) " +
+  s"but cannot become follower since the new leader 
$newLeaderBrokerId is unavailable.")
+  }
+  }
+
+  if (isShuttingDown.get()) {
+if (traceLoggingEnabled) {
+  partitionsToUpdateFollower.foreach { partition =>
+stateChangeLogger.trace(s"Skipped the update topic ID step of the 
become-follower state " +
+  s"change with correlation id $correlationId from controller 
$controllerId epoch $controllerEpoch for " +
+  s"partition ${partition.topicPartition} with leader 
${partitionStates(partition).leader} " +
+  "since it is shutting down")
+  }
+}
+  } else {
+val partitionsToUpdateFollowerWithLeader = 
partitionsToUpdateFollower.map { partition =>
+  val leaderNode = partition.leaderReplicaIdOpt.flatMap(leaderId => 
metadataCache.
+getAliveBrokerNode(leaderId, 
config.interBrokerListenerName)).getOrElse(Node.noNode())
+  val leader = new BrokerEndPoint(leaderNode.id(), leaderNode.host(), 
leaderNode.port())
+  (partition.topicPartition, BrokerAndFetcherId(leader, 
replicaFetcherManager.getFetcherId(partition.topicPartition)))

Review comment:
   It looks like that `addTopicIdsToFetcherThread` only needs the `leader`, 
the `topic-partition` (to compute the fetcher id, and the `topic id`. How about 
passing just those? I would also let the fetcher manager compute the fetcher id.

##
File path: 

[jira] [Commented] (KAFKA-13295) Long restoration times for new tasks can lead to transaction timeouts

2021-09-15 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17415486#comment-17415486
 ] 

Sagar Rao commented on KAFKA-13295:
---

[~guozhang]/ [~ableegoldman] I assigned this to myself.. Would go through the 
code and see if I can find something :D 

> Long restoration times for new tasks can lead to transaction timeouts
> -
>
> Key: KAFKA-13295
> URL: https://issues.apache.org/jira/browse/KAFKA-13295
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Sagar Rao
>Priority: Critical
>  Labels: eos
> Fix For: 3.1.0
>
>
> In some EOS applications with relatively long restoration times we've noticed 
> a series of ProducerFencedExceptions occurring during/immediately after 
> restoration. The broker logs were able to confirm these were due to 
> transactions timing out.
> In Streams, it turns out we automatically begin a new txn when calling 
> {{send}} (if there isn’t already one in flight). A {{send}} occurs often 
> outside a commit during active processing (eg writing to the changelog), 
> leaving the txn open until the next commit. And if a StreamThread has been 
> actively processing when a rebalance results in a new stateful task without 
> revoking any existing tasks, the thread won’t actually commit this open txn 
> before it goes back into the restoration phase while it builds up state for 
> the new task. So the in-flight transaction is left open during restoration, 
> during which the StreamThread only consumes from the changelog without 
> committing, leaving it vulnerable to timing out when restoration times exceed 
> the configured transaction.timeout.ms for the producer client.
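A minimal way to see the underlying failure mode with a plain transactional producer (illustrative sketch only, not Streams code; the broker address, topic, and timeouts are placeholders):
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class OpenTransactionTimeoutSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-txn");        // placeholder
        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10_000);         // 10s, deliberately small
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("demo-topic", "key", "value")); // transaction is now open

            Thread.sleep(30_000); // stands in for a long restoration phase with no commit

            producer.commitTransaction(); // expected to fail: the coordinator has timed the txn out
        }
    }
}
{code}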



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-13295) Long restoration times for new tasks can lead to transaction timeouts

2021-09-15 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-13295:
-

Assignee: Sagar Rao

> Long restoration times for new tasks can lead to transaction timeouts
> -
>
> Key: KAFKA-13295
> URL: https://issues.apache.org/jira/browse/KAFKA-13295
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Sagar Rao
>Priority: Critical
>  Labels: eos
> Fix For: 3.1.0
>
>
> In some EOS applications with relatively long restoration times we've noticed 
> a series of ProducerFencedExceptions occurring during/immediately after 
> restoration. The broker logs were able to confirm these were due to 
> transactions timing out.
> In Streams, it turns out we automatically begin a new txn when calling 
> {{send}} (if there isn’t already one in flight). A {{send}} occurs often 
> outside a commit during active processing (eg writing to the changelog), 
> leaving the txn open until the next commit. And if a StreamThread has been 
> actively processing when a rebalance results in a new stateful task without 
> revoking any existing tasks, the thread won’t actually commit this open txn 
> before it goes back into the restoration phase while it builds up state for 
> the new task. So the in-flight transaction is left open during restoration, 
> during which the StreamThread only consumes from the changelog without 
> committing, leaving it vulnerable to timing out when restoration times exceed 
> the configured transaction.timeout.ms for the producer client.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13303) RoundRobinPartitioner broken by KIP-480

2021-09-15 Thread Jon McEwen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17415452#comment-17415452
 ] 

Jon McEwen commented on KAFKA-13303:


WIP patch: https://github.com/apache/kafka/pull/11326

> RoundRobinPartitioner broken by KIP-480
> ---
>
> Key: KAFKA-13303
> URL: https://issues.apache.org/jira/browse/KAFKA-13303
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jon McEwen
>Priority: Minor
>
> Since KIP-480 Sticky Partitioning, the RoundRobinPartitioner doesn't behave 
> correctly. An additional call to `partition()` on a new batch leads to 
> partitions being skipped.
>  
> I have a fix that I would like to contribute, but I need help getting started 
> as a contributor, e.g. for basic things like formatting the code.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jonmcewen opened a new pull request #11326: WIP: KAFKA-13303: RoundRobinPartitioner broken by KIP-480

2021-09-15 Thread GitBox


jonmcewen opened a new pull request #11326:
URL: https://github.com/apache/kafka/pull/11326


   NEEDS FORMAT AND TESTS. Fixes for sticky partitioning and thread safety.
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-13246) StoreQueryIntegrationTest#shouldQueryStoresAfterAddingAndRemovingStreamThread does not gate on stream state well

2021-09-15 Thread Andre Dymel (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andre Dymel reassigned KAFKA-13246:
---

Assignee: Andre Dymel

> StoreQueryIntegrationTest#shouldQueryStoresAfterAddingAndRemovingStreamThread 
> does not gate on stream state well
> 
>
> Key: KAFKA-13246
> URL: https://issues.apache.org/jira/browse/KAFKA-13246
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Walker Carlson
>Assignee: Andre Dymel
>Priority: Major
>  Labels: newbie
>
> StoreQueryIntegrationTest#shouldQueryStoresAfterAddingAndRemovingStreamThread 
> should be improved by waiting for the client to go to rebalancing or running 
> after adding and removing a thread. It should also wait until running before 
> querying the state store 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13303) RoundRobinPartitioner broken by KIP-480

2021-09-15 Thread Jon McEwen (Jira)
Jon McEwen created KAFKA-13303:
--

 Summary: RoundRobinPartitioner broken by KIP-480
 Key: KAFKA-13303
 URL: https://issues.apache.org/jira/browse/KAFKA-13303
 Project: Kafka
  Issue Type: Bug
Reporter: Jon McEwen


Since KIP-480 Sticky Partitioning, the RoundRobinPartitioner doesn't behave 
correctly. An additional call to `partition()` on a new batch leads to 
partitions being skipped.

 

I have a fix that I would like to contribute, but I need help getting started 
as a contributor, e.g. for basic things like formatting the code.
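A stripped-down sketch of why the extra call skips partitions (this models only the round-robin counter, not the real Partitioner interface):
{code:java}
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of RoundRobinPartitioner's per-topic counter; illustrative only.
public class RoundRobinSketch {
    private static final AtomicInteger counter = new AtomicInteger();
    private static final int numPartitions = 3;

    static int partition() {
        // RoundRobinPartitioner advances its counter on every partition() call.
        return counter.getAndIncrement() % numPartitions;
    }

    public static void main(String[] args) {
        // Normal path: one partition() call per record gives 0, 1, 2, 0, ...
        System.out.println(partition()); // record 1 goes to partition 0

        // KIP-480 path: partition() is called, the append aborts because a new batch
        // is needed, onNewBatch() runs, and partition() is called a second time for
        // the same record, so the counter advances twice.
        int abandoned = partition();     // returns 1, but no record is sent there
        System.out.println(partition()); // record 2 goes to partition 2; partition 1 was skipped
        System.out.println("abandoned choice: " + abandoned);
    }
}
{code}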



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-13301) The relationship between request.timeout. ms and max.poll.interval.ms in the Consumer Configs is incorrect.

2021-09-15 Thread yangshengwei (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17415358#comment-17415358
 ] 

yangshengwei edited comment on KAFKA-13301 at 9/15/21, 9:59 AM:


 

[https://kafka.apache.org/documentation/#upgrade_200_notable]

 

!image-2021-09-15-15-37-25-561.png|width=521,height=180!

  !image-2021-09-15-15-39-00-179.png|width=518,height=564!


was (Author: yangshengwei):
https://kafka.apache.org/documentation/#upgrade_200_notable

 

!image-2021-09-15-15-37-25-561.png|width=521,height=180!

  !image-2021-09-15-15-39-00-179.png|width=518,height=564!

> The relationship between request.timeout. ms and max.poll.interval.ms in the 
> Consumer Configs is incorrect.
> ---
>
> Key: KAFKA-13301
> URL: https://issues.apache.org/jira/browse/KAFKA-13301
> Project: Kafka
>  Issue Type: Improvement
>Reporter: yangshengwei
>Priority: Trivial
> Attachments: image-2021-09-15-15-37-25-561.png, 
> image-2021-09-15-15-39-00-179.png
>
>
> In Consumer Configs, the value of the configuration max.poll.interval.ms must 
> always be larger than request.timeout.ms. But here's what the official 
> document says: The value of the configuration request.timeout.ms must always 
> be larger than max.poll.interval.ms.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-13301) The relationship between request.timeout. ms and max.poll.interval.ms in the Consumer Configs is incorrect.

2021-09-15 Thread yangshengwei (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17415358#comment-17415358
 ] 

yangshengwei edited comment on KAFKA-13301 at 9/15/21, 9:58 AM:


https://kafka.apache.org/documentation/#upgrade_200_notable

 

!image-2021-09-15-15-37-25-561.png|width=521,height=180!

  !image-2021-09-15-15-39-00-179.png|width=518,height=564!


was (Author: yangshengwei):
!image-2021-09-15-15-37-25-561.png|width=521,height=180!

  !image-2021-09-15-15-39-00-179.png|width=518,height=564!

> The relationship between request.timeout. ms and max.poll.interval.ms in the 
> Consumer Configs is incorrect.
> ---
>
> Key: KAFKA-13301
> URL: https://issues.apache.org/jira/browse/KAFKA-13301
> Project: Kafka
>  Issue Type: Improvement
>Reporter: yangshengwei
>Priority: Trivial
> Attachments: image-2021-09-15-15-37-25-561.png, 
> image-2021-09-15-15-39-00-179.png
>
>
> In Consumer Configs, the value of the configuration max.poll.interval.ms must 
> always be larger than request.timeout.ms. But here's what the official 
> document says: The value of the configuration request.timeout.ms must always 
> be larger than max.poll.interval.ms.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13302) [IEP-59] Support not default page size

2021-09-15 Thread Nikolay Izhikov (Jira)
Nikolay Izhikov created KAFKA-13302:
---

 Summary: [IEP-59] Support not default page size
 Key: KAFKA-13302
 URL: https://issues.apache.org/jira/browse/KAFKA-13302
 Project: Kafka
  Issue Type: Improvement
Reporter: Nikolay Izhikov


Currently, CDC doesn't support a non-default page size.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-13300) Kafka ACL Restriction Group Is not being applied

2021-09-15 Thread Manikumar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17415352#comment-17415352
 ] 

Manikumar edited comment on KAFKA-13300 at 9/15/21, 7:43 AM:
-

The kafka-acls.sh {{--add}} option is for adding an acl and {{--remove}} 
is for removing an existing acl. Consuming from a group without read permission 
should fail unless we configure {{"allow.everyone.if.no.acl.found=true"}}
 [https://kafka.apache.org/documentation/#security_authz]

I am not able to reproduce the issue. Can you attach the {{server.properties}} 
file, authorizer debug logs, and steps to reproduce the issue?


was (Author: omkreddy):
kafka-acls.sh command {{"-add"}} option is for adding an acl and {{"-remove"}} 
is to remove an existing acl. Consuming from a group without read permission 
should fail unless we configure {{"allow.everyone.if.no.acl.found=true"}}
 [https://kafka.apache.org/documentation/#security_authz]

I am not able to reproduce the issue. Can you attach the \{{ server.properties 
file}} and steps to reproduce the issue.

> Kafka ACL Restriction Group Is not being applied
> 
>
> Key: KAFKA-13300
> URL: https://issues.apache.org/jira/browse/KAFKA-13300
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.6.2
>Reporter: Adriano Jesus
>Priority: Minor
>
> Hi,
> I am creating a Kafka ACL with a fake group restriction as below:
>  
> {code:java}
> ./kafka-acls.sh \                                                             
>                                                     
>     --authorizer-properties zookeeper.connect=$ZOOKEEPER \
>     --remove --allow-principal User:'Kafka-tools' \
>     --consumer  --group fake-group \
>     --topic delete-me-2
> {code}
>  
> When I try to consume a message with the same user, 'Kafka-tools', and with 
> another group I am still able to consume the messages:
> {code:java}
> // ./kafka-console-consumer.sh --bootstrap-server=$KAFKA --topic delete-me-2 
> --consumer.config user-auth.properties --from-beginning --group teste
> {code}
> According to documentation this property can be used as consumer group 
> ([https://docs.confluent.io/platform/current/kafka/authorization.html):]
> "*Group*
> Groups in the brokers. All protocol calls that work with groups, such as 
> joining a group, must have corresponding privileges with the group in the 
> subject. Group ({{group.id}}) can mean Consumer Group, Stream Group 
> ({{application.id}}), Connect Worker Group, or any other group that uses the 
> Consumer Group protocol, like Schema Registry cluster."
> I did another test adding a consumer ACL permission with this command:
> {code:java}
> ./kafka-acls.sh \                                                             
>                                                     
>     --authorizer-properties zookeeper.connect=$ZOOKEEPER \
>     --add --allow-principal User:'Kafka-tools' \
>     --consumer  --group fake-group \
>     --topic delete-me-2
> {code}
> After that I removed the ACL authorization to READ operation for Group 
> resource. I tried again to consume from this topic. And still being able to 
> consume message from this topic even though without READ group permission.
> Maybe my interpretation is wrong. But it seems that Kafka ACL is not 
> validating the group permissions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13301) The relationship between request.timeout. ms and max.poll.interval.ms in the Consumer Configs is incorrect.

2021-09-15 Thread yangshengwei (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17415361#comment-17415361
 ] 

yangshengwei commented on KAFKA-13301:
--

[~guozhang]

 
h3. [guozhangwang|https://github.com/guozhangwang]

> The relationship between request.timeout. ms and max.poll.interval.ms in the 
> Consumer Configs is incorrect.
> ---
>
> Key: KAFKA-13301
> URL: https://issues.apache.org/jira/browse/KAFKA-13301
> Project: Kafka
>  Issue Type: Improvement
>Reporter: yangshengwei
>Priority: Trivial
> Attachments: image-2021-09-15-15-37-25-561.png, 
> image-2021-09-15-15-39-00-179.png
>
>
> In Consumer Configs, the value of the configuration max.poll.interval.ms must 
> always be larger than request.timeout.ms. But here's what the official 
> document says: The value of the configuration request.timeout.ms must always 
> be larger than max.poll.interval.ms.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-13301) The relationship between request.timeout. ms and max.poll.interval.ms in the Consumer Configs is incorrect.

2021-09-15 Thread yangshengwei (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17415358#comment-17415358
 ] 

yangshengwei edited comment on KAFKA-13301 at 9/15/21, 7:39 AM:


!image-2021-09-15-15-37-25-561.png|width=521,height=180!

  !image-2021-09-15-15-39-00-179.png|width=518,height=564!


was (Author: yangshengwei):
!image-2021-09-15-15-37-25-561.png|width=521,height=180!



 

> The relationship between request.timeout. ms and max.poll.interval.ms in the 
> Consumer Configs is incorrect.
> ---
>
> Key: KAFKA-13301
> URL: https://issues.apache.org/jira/browse/KAFKA-13301
> Project: Kafka
>  Issue Type: Improvement
>Reporter: yangshengwei
>Priority: Trivial
> Attachments: image-2021-09-15-15-37-25-561.png, 
> image-2021-09-15-15-39-00-179.png
>
>
> In Consumer Configs, the value of the configuration max.poll.interval.ms must 
> always be larger than request.timeout.ms. But here's what the official 
> document says: The value of the configuration request.timeout.ms must always 
> be larger than max.poll.interval.ms.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13301) The relationship between request.timeout. ms and max.poll.interval.ms in the Consumer Configs is incorrect.

2021-09-15 Thread yangshengwei (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17415358#comment-17415358
 ] 

yangshengwei commented on KAFKA-13301:
--

!image-2021-09-15-15-37-25-561.png|width=521,height=180!



 

> The relationship between request.timeout. ms and max.poll.interval.ms in the 
> Consumer Configs is incorrect.
> ---
>
> Key: KAFKA-13301
> URL: https://issues.apache.org/jira/browse/KAFKA-13301
> Project: Kafka
>  Issue Type: Improvement
>Reporter: yangshengwei
>Priority: Trivial
> Attachments: image-2021-09-15-15-37-25-561.png
>
>
> In Consumer Configs, the value of the configuration max.poll.interval.ms must 
> always be larger than request.timeout.ms. But here's what the official 
> document says: The value of the configuration request.timeout.ms must always 
> be larger than max.poll.interval.ms.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13301) The relationship between request.timeout. ms and max.poll.interval.ms in the Consumer Configs is incorrect.

2021-09-15 Thread yangshengwei (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yangshengwei updated KAFKA-13301:
-
Attachment: image-2021-09-15-15-37-25-561.png

> The relationship between request.timeout. ms and max.poll.interval.ms in the 
> Consumer Configs is incorrect.
> ---
>
> Key: KAFKA-13301
> URL: https://issues.apache.org/jira/browse/KAFKA-13301
> Project: Kafka
>  Issue Type: Improvement
>Reporter: yangshengwei
>Priority: Trivial
> Attachments: image-2021-09-15-15-37-25-561.png
>
>
> In Consumer Configs, the value of the configuration max.poll.interval.ms must 
> always be larger than request.timeout.ms. But here's what the official 
> document says: The value of the configuration request.timeout.ms must always 
> be larger than max.poll.interval.ms.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13301) The relationship between request.timeout. ms and max.poll.interval.ms in the Consumer Configs is incorrect.

2021-09-15 Thread yangshengwei (Jira)
yangshengwei created KAFKA-13301:


 Summary: The relationship between request.timeout. ms and 
max.poll.interval.ms in the Consumer Configs is incorrect.
 Key: KAFKA-13301
 URL: https://issues.apache.org/jira/browse/KAFKA-13301
 Project: Kafka
  Issue Type: Improvement
Reporter: yangshengwei


In Consumer Configs, the value of the configuration max.poll.interval.ms must 
always be larger than request.timeout.ms. But here's what the official document 
says: The value of the configuration request.timeout.ms must always be larger 
than max.poll.interval.ms.
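
For reference, the defaults shipped with recent consumer clients already keep 
max.poll.interval.ms well above request.timeout.ms; the sketch below only spells 
out those documented defaults explicitly (the class name is illustrative):

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

public class ConsumerTimeoutDefaults {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Documented defaults: max.poll.interval.ms = 300000 (5 minutes) and
        // request.timeout.ms = 30000 (30 seconds), i.e. the poll interval is the larger value.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
        System.out.println(props);
    }
}
{code}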



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-13300) Kafka ACL Restriction Group Is not being applied

2021-09-15 Thread Manikumar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17415352#comment-17415352
 ] 

Manikumar edited comment on KAFKA-13300 at 9/15/21, 7:28 AM:
-

kafka-acls.sh command {{"--add"}} option is for adding an acl and {{"--remove"}} 
is to remove an existing acl. Consuming from a group without read permission 
should fail unless we configure {{"allow.everyone.if.no.acl.found=true"}}
 [https://kafka.apache.org/documentation/#security_authz]

I am not able to reproduce the issue. Can you attach the {{server.properties}} 
file and steps to reproduce the issue.


was (Author: omkreddy):
kafka-acls.sh command {{"--add"}} option is for adding an acl and 
{{"--remove"}} is to remove an existing acl. 
 Consuming from a group without read permission should fail unless we configure 
{{"allow.everyone.if.no.acl.found=true"}}
 https://kafka.apache.org/documentation/#security_authz
 
 I am not able to reproduce the issue. Can you attach the {{server.properties}} 
file and steps to reproduce the issue.

> Kafka ACL Restriction Group Is not being applied
> 
>
> Key: KAFKA-13300
> URL: https://issues.apache.org/jira/browse/KAFKA-13300
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.6.2
>Reporter: Adriano Jesus
>Priority: Minor
>
> Hi,
> I am creating a Kafka ACL with a fake group restriction as below:
>  
> {code:java}
> ./kafka-acls.sh \                                                             
>                                                     
>     --authorizer-properties zookeeper.connect=$ZOOKEEPER \
>     --remove --allow-principal User:'Kafka-tools' \
>     --consumer  --group fake-group \
>     --topic delete-me-2
> {code}
>  
> When I try to consume a message with the same user, 'Kafka-tools', and with 
> another group I am still able to consume the messages:
> {code:java}
> // ./kafka-console-consumer.sh --bootstrap-server=$KAFKA --topic delete-me-2 
> --consumer.config user-auth.properties --from-beginning --group teste
> {code}
> According to documentation this property can be used as consumer group 
> ([https://docs.confluent.io/platform/current/kafka/authorization.html):]
> "*Group*
> Groups in the brokers. All protocol calls that work with groups, such as 
> joining a group, must have corresponding privileges with the group in the 
> subject. Group ({{group.id}}) can mean Consumer Group, Stream Group 
> ({{application.id}}), Connect Worker Group, or any other group that uses the 
> Consumer Group protocol, like Schema Registry cluster."
> I did another test adding a consumer ACL permission with this command:
> {code:java}
> ./kafka-acls.sh \                                                             
>                                                     
>     --authorizer-properties zookeeper.connect=$ZOOKEEPER \
>     --add --allow-principal User:'Kafka-tools' \
>     --consumer  --group fake-group \
>     --topic delete-me-2
> {code}
> After that I removed the ACL authorization to READ operation for Group 
> resource. I tried again to consume from this topic. And still being able to 
> consume message from this topic even though without READ group permission.
> Maybe my interpretation is wrong. But it seems that Kafka ACL is not 
> validating the group permissions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13300) Kafka ACL Restriction Group Is not being applied

2021-09-15 Thread Manikumar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17415352#comment-17415352
 ] 

Manikumar commented on KAFKA-13300:
---

kafka-acls.sh command {{"--add"}} option is for adding an acl and 
{{"--remove"}} is to remove an existing acl. 
 Consuming from a group without read permission should fail unless we configure 
{{"allow.everyone.if.no.acl.found=true"}}
 https://kafka.apache.org/documentation/#security_authz
 
 I am not able to reproduce the issue. Can you attach the {{server.properties}} 
file and steps to reproduce the issue.
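
As an aside, the same read grant can also be issued programmatically; below is a 
hedged sketch using the Admin client (the bootstrap address is a placeholder, 
and the group name "teste" is taken from the console-consumer command in the 
report):

{code:java}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

public class AddGroupReadAcl {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        try (Admin admin = Admin.create(props)) {
            // ALLOW User:Kafka-tools to READ the consumer group "teste".
            AclBinding binding = new AclBinding(
                new ResourcePattern(ResourceType.GROUP, "teste", PatternType.LITERAL),
                new AccessControlEntry("User:Kafka-tools", "*",
                    AclOperation.READ, AclPermissionType.ALLOW));
            admin.createAcls(Collections.singleton(binding)).all().get();
        }
    }
}
{code}

With allow.everyone.if.no.acl.found left at its default (false), removing that 
binding again should make the console consumer fail with a group authorization 
error.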

> Kafka ACL Restriction Group Is not being applied
> 
>
> Key: KAFKA-13300
> URL: https://issues.apache.org/jira/browse/KAFKA-13300
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.6.2
>Reporter: Adriano Jesus
>Priority: Minor
>
> Hi,
> I am creating a Kafka ACL with a fake group restriction as below:
>  
> {code:java}
> ./kafka-acls.sh \                                                             
>                                                     
>     --authorizer-properties zookeeper.connect=$ZOOKEEPER \
>     --remove --allow-principal User:'Kafka-tools' \
>     --consumer  --group fake-group \
>     --topic delete-me-2
> {code}
>  
> When I try to consume a message with the same user, 'Kafka-tools', and with 
> another group I am still able to consume the messages:
> {code:java}
> // ./kafka-console-consumer.sh --bootstrap-server=$KAFKA --topic delete-me-2 
> --consumer.config user-auth.properties --from-beginning --group teste
> {code}
> According to documentation this property can be used as consumer group 
> ([https://docs.confluent.io/platform/current/kafka/authorization.html):]
> "*Group*
> Groups in the brokers. All protocol calls that work with groups, such as 
> joining a group, must have corresponding privileges with the group in the 
> subject. Group ({{group.id}}) can mean Consumer Group, Stream Group 
> ({{application.id}}), Connect Worker Group, or any other group that uses the 
> Consumer Group protocol, like Schema Registry cluster."
> I did another test adding a consumer ACL permission with this command:
> {code:java}
> ./kafka-acls.sh \                                                             
>                                                     
>     --authorizer-properties zookeeper.connect=$ZOOKEEPER \
>     --add --allow-principal User:'Kafka-tools' \
>     --consumer  --group fake-group \
>     --topic delete-me-2
> {code}
> After that I removed the ACL authorization to READ operation for Group 
> resource. I tried again to consume from this topic. And still being able to 
> consume message from this topic even though without READ group permission.
> Maybe my interpretation is wrong. But it seems that Kafka ACL is not 
> validating the group permissions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions

2021-09-15 Thread Victoria Xia (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17415337#comment-17415337
 ] 

Victoria Xia commented on KAFKA-13261:
--

Hey [~vvcephei] [~abellemare] [~guozhang] I had a look at the code and it seems 
to support Adam's theory that the custom partitioners from the repartition() 
step aren't taken into account by the foreign key join. In particular, both the 
subscription sink topic and the response sink topic are created without 
partitioners specified in the StreamSinkNode:

[https://github.com/apache/kafka/blob/75795d1ed8402f185e00b5f3aedcc2bcbb914ca9/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L1051]
 
[https://github.com/apache/kafka/blob/75795d1ed8402f185e00b5f3aedcc2bcbb914ca9/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L1122]

IIUC, this means the default partitioner is used for both topics despite the 
custom partitioners on the source tables, which explains the missing join 
results.

One thing I don't understand: even if we fix this bug by propagating the 
partitioner information from the repartition() step to the foreign key join, 
wouldn't we still have an analogous bug if either of the topics for the source 
tables had custom partitioning logic created from outside Streams (i.e., 
without a repartition() step in the Streams topology)? In this case, Streams 
has no way of determining the partitioning of the source tables, which means we 
need an update to the interface for foreign key joins so that users can specify 
a partitioner to use in order to ensure copartitioning of the subscription and 
response topics with the relevant tables. Is this reasoning sound?

If so, does it make sense to add logic into Streams to propagate information 
about the partitioner from the repartition() step to the foreign key join, or 
would it be better to require users to use the new interface to pass the same 
partitioner from the repartition() step(s) to the foreign key join as well? The 
latter seems more consistent with how copartitioning for joins is typically the 
user's responsibility, and also avoids the need to update Streams with logic 
for tracking partitioners throughout the topology.
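
For illustration, here is a minimal self-contained sketch of the shape being 
discussed, using String keys and values and the topic names from the report 
(the value is assumed to carry the foreign key before a '|' separator; this is 
not the reporter's code). The custom partitioner applies only to the explicit 
repartition topic, while the foreign-key join's internal subscription and 
response topics are still written with the default partitioner:

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.processor.StreamPartitioner;

public class FkJoinPartitioningSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Values of topic "A" are "<foreignKeyOfB>|<payload>".
        StreamPartitioner<String, String> byForeignKey =
            (topic, key, value, numPartitions) ->
                Math.abs(value.split("\\|")[0].hashCode()) % numPartitions;

        KTable<String, String> tableA = builder
            .stream("A", Consumed.with(Serdes.String(), Serdes.String()))
            .repartition(Repartitioned.<String, String>as("a-by-fk")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String())
                .withStreamPartitioner(byForeignKey)
                .withNumberOfPartitions(4))
            .toTable();

        KTable<String, String> tableB =
            builder.table("B", Consumed.with(Serdes.String(), Serdes.String()));

        // The foreign-key join creates its subscription and response topics with the
        // default partitioner; byForeignKey above is not propagated to them.
        tableA.join(tableB, value -> value.split("\\|")[0], (a, b) -> a + "+" + b)
              .toStream()
              .to("output", Produced.with(Serdes.String(), Serdes.String()));

        System.out.println(builder.build().describe());
    }
}
{code}

In this sketch the response topic, keyed by A's primary key but written with the 
default partitioner, would presumably not line up with the store built on the 
custom-partitioned a-by-fk topic, which is consistent with the missing join 
results described above.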

> KTable to KTable foreign key join loose events when using several partitions
> 
>
> Key: KAFKA-13261
> URL: https://issues.apache.org/jira/browse/KAFKA-13261
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0, 2.7.1
>Reporter: Tomas Forsman
>Assignee: Victoria Xia
>Priority: Major
> Attachments: KafkaTest.java
>
>
> Two incoming streams A and B. 
> Stream A uses a composite key [a, b]
> Stream B has key [b]
> Stream B has 4 partitions and stream A has 1 partition.
> What we try to do is repartition stream A to have 4 partitions too, then put 
> both A and B into KTables and do a foreign key join from A to B.
> When doing this, not all messages end up in the output topic.
> Repartitioning both to only use 1 partition each solves the problem, so it 
> seems like it has something to do with the foreign key join in combination 
> with several partitions. 
> One suspicion would be that it is not possible to define what partitioner to 
> use for the join.
> Any insight or help is greatly appreciated.
> *Example code of the problem*
> {code:java}
> static Topology createTopoology(){
> var builder = new StreamsBuilder();
> KTable tableB = builder.table("B",  
> stringMaterialized("table.b"));
> builder
> .stream("A", Consumed.with(Serde.of(KeyA.class), 
> Serde.of(EventA.class)))
> .repartition(repartitionTopicA())
> .toTable(Named.as("table.a"), aMaterialized("table.a"))
> .join(tableB, EventA::getKeyB, topicAandBeJoiner(), 
> Named.as("join.ab"), joinMaterialized("join.ab"))
> .toStream()
> .to("output", with(...));
> return builder.build();
> }
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> aMaterialized(String name) {
>   Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = 
> Materialized.as(name);
>   return 
> table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> private static Repartitioned<KeyA, EventA> repartitionTopicA() {
> Repartitioned<KeyA, EventA> repartitioned = 
> Repartitioned.as("driverperiod");
> return 
> repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class))
> .withStreamPartitioner(topicAPartitioner())
> .withNumberOfPartitions(4);
> }
> private static StreamPartitioner<KeyA, EventA> 
> topicAPartitioner() {
> return (topic, key, value, numPartitions) -> 
> Math.abs(key.getKeyB().hashCode()) % numPartitions;
> }
> private static Materialized> 
> joinMaterialized(String name) {
> Materialized> 
> table = Materialized.as(name);