[jira] [Updated] (KAFKA-10493) KTable out-of-order updates are not being ignored

2021-04-12 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10493:

Summary: KTable out-of-order updates are not being ignored  (was: Ktable 
out-of-order updates are not being ignored)

> KTable out-of-order updates are not being ignored
> -
>
> Key: KAFKA-10493
> URL: https://issues.apache.org/jira/browse/KAFKA-10493
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Pedro Gontijo
>Assignee: Matthias J. Sax
>Priority: Critical
> Fix For: 3.0.0
>
> Attachments: KTableOutOfOrderBug.java
>
>
> On a materialized KTable, out-of-order records for a given key (records whose 
> timestamp is older than that of the current value in the store) are not being 
> ignored, but are used to update the local store value and are also forwarded.
> I believe the bug is here: 
> [https://github.com/apache/kafka/blob/2.6.0/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java#L77]
>  It should return true, not false (see the Javadoc).
> The bug manifests here: 
> [https://github.com/apache/kafka/blob/2.6.0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java#L142-L148]
> I have attached a simple stream app that shows the issue happening.
> Thank you!
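
The check in question boils down to comparing the incoming record's timestamp
with the timestamp of the value already in the store, and dropping the incoming
record when it is older. A minimal sketch of the intended drop-if-older logic,
assuming the timestamp occupies the first 8 bytes of the serialized
value-and-timestamp array; class and method names are illustrative, not the
actual Kafka internals:

{code:java}
import java.nio.ByteBuffer;

public final class OutOfOrderCheck {

    // Returns true if the new value should be ignored because its timestamp
    // is older than the timestamp of the value currently in the store.
    public static boolean shouldIgnore(final byte[] oldValueAndTimestamp,
                                       final byte[] newValueAndTimestamp) {
        if (oldValueAndTimestamp == null) {
            return false; // nothing stored yet; accept the new value
        }
        final long oldTimestamp = ByteBuffer.wrap(oldValueAndTimestamp).getLong();
        final long newTimestamp = ByteBuffer.wrap(newValueAndTimestamp).getLong();
        return newTimestamp < oldTimestamp; // out-of-order: drop and do not forward
    }
}
{code}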



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9195) Use Randomized State Directory Names in Streams System Tests

2021-04-12 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-9195:
--

Assignee: (was: Matthias J. Sax)

> Use Randomized State Directory Names in Streams System Tests 
> -
>
> Key: KAFKA-9195
> URL: https://issues.apache.org/jira/browse/KAFKA-9195
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, system tests
>Reporter: Bruno Cadonna
>Priority: Major
>
> Currently, the state directory property in streams' system tests is set to 
> the {{PERSISTENT_ROOT}} variable. Since Streams applications in different 
> tests have the same application ID, and the state directory path consists of 
> state directory property + application ID + task ID, it might happen that a 
> dirty state directory of one test is re-used by another test if the state 
> directory is not properly cleaned up. This may lead to unexpected results and 
> false positives and/or flaky failures.
> The state directory property shall be set to a randomized path inside 
> {{PERSISTENT_ROOT}} to ensure that tests cannot interfere with each other in 
> the case of missing state clean-ups.
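
A minimal sketch of the proposed randomization, as it would look when
configuring a Streams application under test; {{PERSISTENT_ROOT}} itself lives
in the Python system-test framework, so the Java names below are illustrative
only:

{code:java}
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.streams.StreamsConfig;

public final class RandomizedStateDir {

    // A unique subdirectory per test run prevents a dirty state directory left
    // behind by one test from being picked up by another test that happens to
    // use the same application ID and task IDs.
    public static Properties withRandomStateDir(final Properties props,
                                                final String persistentRoot) {
        props.put(StreamsConfig.STATE_DIR_CONFIG,
                  persistentRoot + "/" + UUID.randomUUID());
        return props;
    }
}
{code}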



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on pull request #10532: KAFKA-8531: Change default replication factor config

2021-04-12 Thread GitBox


mjsax commented on pull request #10532:
URL: https://github.com/apache/kafka/pull/10532#issuecomment-818450072


   Was also wondering about a potential error message -- not sure atm what 
error message a user would get if they run against 2.3 brokers, and whether the 
error message would be clear. Should we do anything about it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #10532: KAFKA-8531: Change default replication factor config

2021-04-12 Thread GitBox


mjsax commented on pull request #10532:
URL: https://github.com/apache/kafka/pull/10532#issuecomment-818449362


   As a test against older broker versions, I started 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4463/ -- 
maybe we need to update some system tests...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #10531: KAFKA-12658: Include kafka-shell jar and dependencies in release tar

2021-04-12 Thread GitBox


ijuma commented on pull request #10531:
URL: https://github.com/apache/kafka/pull/10531#issuecomment-818447856


   This change has no impact on tests and the build part of the PR builder had 
succeeded. I went ahead and merged to trunk and 2.8. cc @vvcephei 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax opened a new pull request #10532: KAFKA-8531: Change default replication factor config

2021-04-12 Thread GitBox


mjsax opened a new pull request #10532:
URL: https://github.com/apache/kafka/pull/10532


   Call for review @cadonna @ableegoldman 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma merged pull request #10531: KAFKA-12658: Include kafka-shell jar and dependencies in release tar

2021-04-12 Thread GitBox


ijuma merged pull request #10531:
URL: https://github.com/apache/kafka/pull/10531


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rodesai commented on a change in pull request #10460: KAFKA-10357: Use validate and setup during internal topics creation

2021-04-12 Thread GitBox


rodesai commented on a change in pull request #10460:
URL: https://github.com/apache/kafka/pull/10460#discussion_r612131444



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogTopics.java
##
@@ -34,83 +40,64 @@
 public class ChangelogTopics {
 
     private final InternalTopicManager internalTopicManager;
-    private final Map<Integer, TopicsInfo> topicGroups;
-    private final Map<Integer, Set<TaskId>> tasksForTopicGroup;
+    private final ValidationResult validationResult;
+    private final Map<String, InternalTopicConfig> changelogTopicConfigs;
     private final Map<TaskId, Set<TopicPartition>> changelogPartitionsForStatefulTask = new HashMap<>();
     private final Map<TaskId, Set<TopicPartition>> preExistingChangelogPartitionsForTask = new HashMap<>();
     private final Set<TopicPartition> preExistingNonSourceTopicBasedChangelogPartitions = new HashSet<>();
     private final Set<String> sourceTopicBasedChangelogTopics = new HashSet<>();
-    private final Set<TopicPartition> preExsitingSourceTopicBasedChangelogPartitions = new HashSet<>();
+    private final Set<TopicPartition> preExistingSourceTopicBasedChangelogPartitions = new HashSet<>();
     private final Logger log;
 
     public ChangelogTopics(final InternalTopicManager internalTopicManager,
                            final Map<Integer, TopicsInfo> topicGroups,
                            final Map<Integer, Set<TaskId>> tasksForTopicGroup,
                            final String logPrefix) {
         this.internalTopicManager = internalTopicManager;
-        this.topicGroups = topicGroups;
-        this.tasksForTopicGroup = tasksForTopicGroup;
         final LogContext logContext = new LogContext(logPrefix);
         log = logContext.logger(getClass());
+        changelogTopicConfigs = computeChangelogTopicConfig(topicGroups, tasksForTopicGroup);
+        validationResult = internalTopicManager.validate(changelogTopicConfigs);
     }
 
     public void setup() {
-        // add tasks to state change log topic subscribers
-        final Map<String, InternalTopicConfig> changelogTopicMetadata = new HashMap<>();
-        for (final Map.Entry<Integer, TopicsInfo> entry : topicGroups.entrySet()) {
-            final int topicGroupId = entry.getKey();
-            final TopicsInfo topicsInfo = entry.getValue();
-
-            final Set<TaskId> topicGroupTasks = tasksForTopicGroup.get(topicGroupId);
-            if (topicGroupTasks == null) {
-                log.debug("No tasks found for topic group {}", topicGroupId);
-                continue;
-            } else if (topicsInfo.stateChangelogTopics.isEmpty()) {
-                continue;
-            }
-
-            for (final TaskId task : topicGroupTasks) {
-                final Set<TopicPartition> changelogTopicPartitions = topicsInfo.stateChangelogTopics
-                    .keySet()
-                    .stream()
-                    .map(topic -> new TopicPartition(topic, task.partition))
-                    .collect(Collectors.toSet());
-                changelogPartitionsForStatefulTask.put(task, changelogTopicPartitions);
-            }
-
-            for (final InternalTopicConfig topicConfig : topicsInfo.nonSourceChangelogTopics()) {
-                // the expected number of partitions is the max value of TaskId.partition + 1
-                int numPartitions = UNKNOWN;
-                for (final TaskId task : topicGroupTasks) {
-                    if (numPartitions < task.partition + 1) {
-                        numPartitions = task.partition + 1;
-                    }
-                }
-                topicConfig.setNumberOfPartitions(numPartitions);
-                changelogTopicMetadata.put(topicConfig.name(), topicConfig);
-            }
-
-            sourceTopicBasedChangelogTopics.addAll(topicsInfo.sourceTopicChangelogs());
+        if (!validationResult.misconfigurationsForTopics().isEmpty()) {
+            throw new MisconfiguredInternalTopicException(Utils.join(misconfigured().values().stream()
+                .flatMap(Collection::stream).collect(Collectors.toList()), Utils.NL)
+            );
         }
 
-        final Set<String> newlyCreatedChangelogTopics = internalTopicManager.makeReady(changelogTopicMetadata);
-        log.debug("Created state changelog topics {} from the parsed topology.", changelogTopicMetadata.values());
+        final Set<String> missingTopics = validationResult.missingTopics();

Review comment:
   So I understand correctly: in a future change we'll actually not create the 
missing topics if not configured to do so?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #10525: KAFKA-7572: Producer should not send requests with negative partition id

2021-04-12 Thread GitBox


showuon commented on pull request #10525:
URL: https://github.com/apache/kafka/pull/10525#issuecomment-818440268


   OK, let's see what the other reviewers think. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #10409: KAFKA-9295: improve KTableKTableForeignKeyInnerJoinMultiIntegrationTest

2021-04-12 Thread GitBox


showuon commented on pull request #10409:
URL: https://github.com/apache/kafka/pull/10409#issuecomment-818439327


   @ableegoldman , the failed tests are unrelated:
   ```
   Build / ARM / 
org.apache.kafka.streams.processor.internals.InternalTopicManagerTest.shouldOnlyRetryNotSuccessfulFuturesDuringSetup
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testReplication()
   Build / JDK 15 and Scala 2.13 / 
kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic()
   Build / JDK 8 and Scala 2.12 / 
kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()
   ```
   Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10483: KAFKA-12586; Add `DescribeTransactions` Admin API

2021-04-12 Thread GitBox


hachikuji commented on a change in pull request #10483:
URL: https://github.com/apache/kafka/pull/10483#discussion_r612131031



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeTransactionsHandler.java
##
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.clients.admin.TransactionDescription;
+import org.apache.kafka.clients.admin.TransactionState;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
+import org.apache.kafka.common.errors.TransactionalIdNotFoundException;
+import org.apache.kafka.common.message.DescribeTransactionsRequestData;
+import org.apache.kafka.common.message.DescribeTransactionsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.DescribeTransactionsRequest;
+import org.apache.kafka.common.requests.DescribeTransactionsResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class DescribeTransactionsHandler implements AdminApiHandler<CoordinatorKey, TransactionDescription> {
+    private final LogContext logContext;
+    private final Logger log;
+    private final Set<CoordinatorKey> keys;
+
+    public DescribeTransactionsHandler(
+        Collection<String> transactionalIds,
+        LogContext logContext
+    ) {
+        this.keys = buildKeySet(transactionalIds);
+        this.log = logContext.logger(DescribeTransactionsHandler.class);
+        this.logContext = logContext;
+    }
+
+    private static Set<CoordinatorKey> buildKeySet(Collection<String> transactionalIds) {
+        return transactionalIds.stream()
+            .map(DescribeTransactionsHandler::asCoordinatorKey)
+            .collect(Collectors.toSet());
+    }
+
+    @Override
+    public String apiName() {
+        return "describeTransactions";
+    }
+
+    @Override
+    public Keys<CoordinatorKey> initializeKeys() {
+        return Keys.dynamicMapped(keys, new CoordinatorStrategy(logContext));
+    }
+
+    @Override
+    public DescribeTransactionsRequest.Builder buildRequest(
+        Integer brokerId,
+        Set<CoordinatorKey> keys
+    ) {
+        DescribeTransactionsRequestData request = new DescribeTransactionsRequestData();
+        List<String> transactionalIds = keys.stream().map(key -> key.idValue).collect(Collectors.toList());
+        request.setTransactionalIds(transactionalIds);
+        return new DescribeTransactionsRequest.Builder(request);
+    }
+
+    @Override
+    public ApiResult<CoordinatorKey, TransactionDescription> handleResponse(
+        Integer brokerId,
+        Set<CoordinatorKey> keys,
+        AbstractResponse abstractResponse
+    ) {
+        DescribeTransactionsResponse response = (DescribeTransactionsResponse) abstractResponse;
+        Map<CoordinatorKey, TransactionDescription> completed = new HashMap<>();
+        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        List<CoordinatorKey> unmapped = new ArrayList<>();
+
+        for (DescribeTransactionsResponseData.TransactionState transactionState : response.data().transactionStates()) {
+            CoordinatorKey transactionalIdKey = asCoordinatorKey(transactionState.transactionalId());
+            Errors error = Errors.forCode(transactionState.errorCode());
+
+            if (error != Errors.NONE) {
+                handleError(transactionalIdKey, error, failed, unmapped);
+                continue;
+            }
+
+            OptionalLong transactionStartTimeMs = transactionState.transactionStartTimeMs() < 0 ?
+                OptionalLong.empty() :
+                OptionalLong.of(transactionState.transactionStartTimeMs());
+
+            completed.put(transactionalIdKey, new TransactionDescription(
+                brokerId,
+                TransactionState.parse(transactionState.transactionState()),
+                transactionState.producerId(),
+                transactionState.producerEpoch(),

[jira] [Commented] (KAFKA-12608) Simple identity pipeline sometimes loses data

2021-04-12 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17319890#comment-17319890
 ] 

Matthias J. Sax commented on KAFKA-12608:
-

Could you provide the input data used, to allow us to reproduce the issue?

> Simple identity pipeline sometimes loses data
> -
>
> Key: KAFKA-12608
> URL: https://issues.apache.org/jira/browse/KAFKA-12608
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
> Environment: 
> https://github.com/jamii/streaming-consistency/blob/c1f504e73141405ee6cd0c7f217604d643babf81/pkgs.nix
> [nix-shell:~/streaming-consistency/kafka-streams]$ java -version
> openjdk version "1.8.0_265"
> OpenJDK Runtime Environment (build 1.8.0_265-ga)
> OpenJDK 64-Bit Server VM (build 25.265-bga, mixed mode)
> [nix-shell:~/streaming-consistency/kafka-streams]$ nix-info
> system: "x86_64-linux", multi-user?: yes, version: nix-env (Nix) 2.3.10, 
> channels(jamie): "", channels(root): "nixos-20.09.3554.f8929dce13e", nixpkgs: 
> /nix/var/nix/profiles/per-user/root/channels/nixos 
>Reporter: Jamie Brandon
>Priority: Major
>
> I'm running a very simple streams program that reads records from one topic 
> into a table and then writes the stream back into another topic. In about 1 
> in 5 runs, some of the output records are missing. They tend to form a single 
> contiguous range, as if a single batch was dropped somewhere.
> https://github.com/jamii/streaming-consistency/blob/main/kafka-streams/src/main/java/Demo.java#L49-L52
> {code:bash}
> $ wc -l tmp/*transactions
>  999514 tmp/accepted_transactions
>  100 tmp/transactions
>  1999514 total
> $ cat tmp/transactions | cut -d',' -f 1 | cut -d' ' -f 2 > in
> $ cat tmp/accepted_transactions | cut -d',' -f 1 | cut -d':' -f 2 > out
> $ diff in out | wc -l
>  487
> $ diff in out | head
>  25313,25798d25312
>  < 25312
>  < 25313
>  < 25314
>  < 25315
>  < 25316
>  < 25317
>  < 25318
>  < 25319
>  < 25320
>  
> $ diff in out | tail
>  < 25788
>  < 25789
>  < 25790
>  < 25791
>  < 25792
>  < 25793
>  < 25794
>  < 25795
>  < 25796
>  < 25797
> {code}
> I've checked running the consumer multiple times to make sure that the 
> records are actually missing from the topic and it wasn't just a hiccup in 
> the consumer. 
> The repo linked above has instructions in the readme on how to reproduce the 
> exact versions used. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12658) bin/kafka-metadata-shell.sh cannot find or load main class org.apache.kafka.shell.MetadataShell

2021-04-12 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-12658:

Affects Version/s: (was: 2.8.0)
   (was: 3.0.0)

> bin/kafka-metadata-shell.sh cannot find or load main class 
> org.apache.kafka.shell.MetadataShell
> ---
>
> Key: KAFKA-12658
> URL: https://issues.apache.org/jira/browse/KAFKA-12658
> Project: Kafka
>  Issue Type: Bug
>  Components: core
> Environment: Ubuntu, Java 11
>Reporter: Israel Ekpo
>Assignee: Ismael Juma
>Priority: Blocker
> Fix For: 2.8.0
>
>
> With the latest release candidate for 2.8.0, the binaries from the Scala 2.13 
> and 2.12 tarballs are not finding the class for the metadata shell on the 
> classpath:
> [https://home.apache.org/~vvcephei/kafka-2.8.0-rc1/]
>  
> kafka-run-class.sh is not able to load it.
>  
> cd ../kafka_2.12-2.8.0
>  
>  bin/kafka-metadata-shell.sh --help
> Error: Could not find or load main class org.apache.kafka.shell.MetadataShell
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.kafka.shell.MetadataShell
> cd ../kafka_2.13-2.8.0/
> bin/kafka-metadata-shell.sh --help
> Error: Could not find or load main class org.apache.kafka.shell.MetadataShell
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.kafka.shell.MetadataShell
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12658) bin/kafka-metadata-shell.sh cannot find or load main class org.apache.kafka.shell.MetadataShell

2021-04-12 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-12658:

Fix Version/s: 2.8.0

> bin/kafka-metadata-shell.sh cannot find or load main class 
> org.apache.kafka.shell.MetadataShell
> ---
>
> Key: KAFKA-12658
> URL: https://issues.apache.org/jira/browse/KAFKA-12658
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.0.0, 2.8.0
> Environment: Ubuntu, Java 11
>Reporter: Israel Ekpo
>Assignee: Ismael Juma
>Priority: Blocker
> Fix For: 2.8.0
>
>
> With the latest release candidate for 2.8.0, the binaries from the Scala 2.13 
> and 2.12 tarballs are not finding the class for the metadata shell on the 
> classpath:
> [https://home.apache.org/~vvcephei/kafka-2.8.0-rc1/]
>  
> kafka-run-class.sh is not able to load it.
>  
> cd ../kafka_2.12-2.8.0
>  
>  bin/kafka-metadata-shell.sh --help
> Error: Could not find or load main class org.apache.kafka.shell.MetadataShell
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.kafka.shell.MetadataShell
> cd ../kafka_2.13-2.8.0/
> bin/kafka-metadata-shell.sh --help
> Error: Could not find or load main class org.apache.kafka.shell.MetadataShell
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.kafka.shell.MetadataShell
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma opened a new pull request #10531: KAFKA-12658: Include kafka-shell jar in release tar

2021-04-12 Thread GitBox


ijuma opened a new pull request #10531:
URL: https://github.com/apache/kafka/pull/10531


   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-12 Thread GitBox


mjsax commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r612094560



##
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##
@@ -118,20 +142,47 @@
         final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new ProcessorGraphNode<>(otherWindowStreamProcessorName, otherWindowStreamProcessorParams);
         builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode);
 
+        Optional<StoreBuilder<WindowStore<KeyAndJoinSide<K1>, LeftOrRightValue<V1, V2>>>> outerJoinWindowStore = Optional.empty();
+        if (leftOuter || rightOuter) {
+            final String outerJoinSuffix = "-shared-outer-join-store";

Review comment:
   Should we use `-shared-left-join-store` and `-shared-right-join-store` 
for left/outer joins?

##
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##
@@ -211,6 +263,66 @@ private void assertUniqueStoreNames(final WindowBytesStoreSupplier supplier,
         return builder;
     }
 
+    @SuppressWarnings("unchecked")
+    private static <K, V1, V2> StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> outerJoinWindowStoreBuilder(final String storeName,
+                                                                                                                                   final JoinWindows windows,
+                                                                                                                                   final StreamJoinedInternal<K, V1, V2> streamJoinedInternal) {
+        final StoreBuilder<WindowStore<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> builder = new TimeOrderedWindowStoreBuilder<KeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>(
+            persistentTimeOrderedWindowStore(
+                storeName + "-store",
+                Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
+                Duration.ofMillis(windows.size())
+            ),
+            new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde()),
+            new LeftOrRightValueSerde(streamJoinedInternal.valueSerde(), streamJoinedInternal.otherValueSerde()),
+            Time.SYSTEM

Review comment:
   Should we pass a `Time` reference here to allow us to mock time in tests 
if necessary?
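
   For reference, the suggestion amounts to the usual constructor-injection 
pattern for `org.apache.kafka.common.utils.Time`; a minimal sketch, with 
illustrative names and the store-builder details elided:

   ```java
   import org.apache.kafka.common.utils.Time;

   final class TimeAwareStoreBuilder {
       private final Time time;

       // Production code passes Time.SYSTEM; tests can pass a MockTime-style
       // implementation to control the clock deterministically.
       TimeAwareStoreBuilder(final Time time) {
           this.time = time;
       }

       long now() {
           return time.milliseconds();
       }
   }
   ```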

##
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -60,20 +82,41 @@
     }
 
     private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> {
+        private static final boolean DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT = false;

Review comment:
   Seems overkill to introduce this one? It's used just once -- maybe just 
return `false` directly where it's used?

##
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -93,23 +136,118 @@ public void process(final K key, final V1 value) {
         }
 
         boolean needOuterJoin = outer;
+        boolean joinFound = false;
 
         final long inputRecordTimestamp = context().timestamp();
         final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
         final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
 
+        maxObservedStreamTime.advance(inputRecordTimestamp);
+
         try (final WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) {
             while (iter.hasNext()) {
                 needOuterJoin = false;
+                joinFound = true;
                 final KeyValue<Long, V2> otherRecord = iter.next();
+                final long otherRecordTimestamp = otherRecord.key;
+
+                // Emit expired records before the joined record to keep time ordering
+                emitExpiredNonJoinedOuterRecordsExcept(key, otherRecordTimestamp);
+
                 context().forward(
                     key,
                     joiner.apply(key, value, otherRecord.value),
-                    To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key)));
+                    To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
+            }
+
+            // Emit all expired records before adding a new non-joined record to the store. Otherwise,
+            // the put() call will advance the stream time, which causes records out of the retention
+            // period to be deleted, thus not being emitted later.
+            if (!joinFound && inputRecordTimestamp == maxObservedStreamTime.get()) {
+                emitExpiredNonJoinedOuterRecords();
            }
 
            if (needOuterJoin) {
-               context().forward(key, joiner.apply(key, value, null));
+               // The maxStreamTime contains the max time observed in both sides of the join.
+               // Having access to the time observed in the other join side fixes the following
+

[GitHub] [kafka] predatorray edited a comment on pull request #10525: KAFKA-7572: Producer should not send requests with negative partition id

2021-04-12 Thread GitBox


predatorray edited a comment on pull request #10525:
URL: https://github.com/apache/kafka/pull/10525#issuecomment-818424067


   Hi, @showuon. Thanks for your review!
   As per your question, I just followed the suggestion from @viktorsomogyi in 
the previous PR #5858. I think a subclass of KafkaException may be better, 
since it indicates something more like an implementation error. To be honest, 
I could not find any existing class that suits this error, so I created a new 
one. If you think it would be better to throw `IllegalArgumentException`, I 
will change the code so we do not introduce another new KafkaException to the 
code base.
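
   For illustration, the validation under discussion is essentially a guard 
run before the producer hands the record to the accumulator; which exception 
type to throw is exactly the open question here, and the helper below is 
hypothetical:

   ```java
   // Hypothetical guard; whether to throw IllegalArgumentException or a new
   // KafkaException subclass is what is being debated in this thread.
   static int checkPartition(final int partition, final String topic) {
       if (partition < 0) {
           throw new IllegalArgumentException(
               "Invalid partition " + partition + " returned by the partitioner for topic "
                   + topic + "; partition numbers must be non-negative");
       }
       return partition;
   }
   ```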


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] predatorray commented on pull request #10525: KAFKA-7572: Producer should not send requests with negative partition id

2021-04-12 Thread GitBox


predatorray commented on pull request #10525:
URL: https://github.com/apache/kafka/pull/10525#issuecomment-818424067


   Hi, @showuon. Thanks for your review!
   As per your question, I just followed the suggestion from @viktorsomogyi in 
the previous PR #5858. I think a subclass of KafkaException may be better, 
since it indicates something more like an implementation error. To be honest, 
I could not find any existing class that suits this error, so I created a new 
one. If you think it would be better to throw `IllegalArgumentException`, I 
will change the code so we do not introduce another new KafkaException to the 
code base.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma merged pull request #10491: MINOR: Switch to using the Gradle RAT plugin

2021-04-12 Thread GitBox


ijuma merged pull request #10491:
URL: https://github.com/apache/kafka/pull/10491


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10529: KAFKA-12650: fix NPE in InternalTopicManagerTest

2021-04-12 Thread GitBox


ableegoldman commented on pull request #10529:
URL: https://github.com/apache/kafka/pull/10529#issuecomment-818405468


   > admin.deleteTopic would never return null in production, and thus handling 
null within InternalTopicManager for this case seems not to make sense.
   
   Agreed, making it a strict mock seems sufficient for avoiding the NPE then. 
Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12661) ConfigEntry#equal does not compare other fields when value is NOT null

2021-04-12 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-12661:
---
Priority: Minor  (was: Major)

> ConfigEntry#equal does not compare other fields when value is NOT null 
> ---
>
> Key: KAFKA-12661
> URL: https://issues.apache.org/jira/browse/KAFKA-12661
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> {code:java}
> return this.name.equals(that.name) &&
> this.value != null ? this.value.equals(that.value) : 
> that.value == null &&
> this.isSensitive == that.isSensitive &&
> this.isReadOnly == that.isReadOnly &&
> this.source == that.source &&
> Objects.equals(this.synonyms, that.synonyms);
> {code}
> Due to operator precedence, the second branch of the ternary operator is 
> "that.value == null &&
> this.isSensitive == that.isSensitive &&
> this.isReadOnly == that.isReadOnly &&
> this.source == that.source &&
> Objects.equals(this.synonyms, that.synonyms);" rather than just 
> "that.value == null". Hence, it does not compare the other fields when value 
> is not null.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dongjinleekr commented on pull request #10526: KAFKA-12655: CVE-2021-28165 - Upgrade jetty to 9.4.39

2021-04-12 Thread GitBox


dongjinleekr commented on pull request #10526:
URL: https://github.com/apache/kafka/pull/10526#issuecomment-818401991


   @omkreddy @edwin092 @showuon Thanks!! 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-12661) ConfigEntry#equal does not compare other fields when value is NOT null

2021-04-12 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-12661:
--

Assignee: Chia-Ping Tsai

> ConfigEntry#equal does not compare other fields when value is NOT null 
> ---
>
> Key: KAFKA-12661
> URL: https://issues.apache.org/jira/browse/KAFKA-12661
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> {code:java}
> return this.name.equals(that.name) &&
> this.value != null ? this.value.equals(that.value) : 
> that.value == null &&
> this.isSensitive == that.isSensitive &&
> this.isReadOnly == that.isReadOnly &&
> this.source == that.source &&
> Objects.equals(this.synonyms, that.synonyms);
> {code}
> Due to operator precedence, the second branch of the ternary operator is 
> "that.value == null &&
> this.isSensitive == that.isSensitive &&
> this.isReadOnly == that.isReadOnly &&
> this.source == that.source &&
> Objects.equals(this.synonyms, that.synonyms);" rather than just 
> "that.value == null". Hence, it does not compare the other fields when value 
> is not null.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12661) ConfigEntry#equal does not compare other fields when value is NOT null

2021-04-12 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-12661:
--

 Summary: ConfigEntry#equal does not compare other fields when 
value is NOT null 
 Key: KAFKA-12661
 URL: https://issues.apache.org/jira/browse/KAFKA-12661
 Project: Kafka
  Issue Type: Bug
Reporter: Chia-Ping Tsai


{code:java}
return this.name.equals(that.name) &&
this.value != null ? this.value.equals(that.value) : that.value 
== null &&
this.isSensitive == that.isSensitive &&
this.isReadOnly == that.isReadOnly &&
this.source == that.source &&
Objects.equals(this.synonyms, that.synonyms);
{code}

Due to operator precedence, the second branch of the ternary operator is "that.value == null &&
this.isSensitive == that.isSensitive &&
this.isReadOnly == that.isReadOnly &&
this.source == that.source &&
Objects.equals(this.synonyms, that.synonyms);" rather than just 
"that.value == null". Hence, it does not compare the other fields when value is not 
null.
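
Because {{&&}} binds tighter than the ternary operator, everything after the
{{:}} parses as a single expression. A sketch of the parenthesization that
restores the intended meaning, using a reduced ConfigEntry-like class rather
than the real one:

{code:java}
import java.util.Objects;

final class Entry {
    String name;
    String value;
    boolean isSensitive;
    boolean isReadOnly;
    Object source;
    Object synonyms;

    @Override
    public boolean equals(final Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        final Entry that = (Entry) o;
        // Parenthesizing the ternary makes the remaining comparisons always
        // run; without the parentheses, the else-branch swallows every
        // comparison after the ':'.
        return Objects.equals(this.name, that.name) &&
            (this.value != null ? this.value.equals(that.value) : that.value == null) &&
            this.isSensitive == that.isSensitive &&
            this.isReadOnly == that.isReadOnly &&
            this.source == that.source &&
            Objects.equals(this.synonyms, that.synonyms);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, value, isSensitive, isReadOnly, source, synonyms);
    }
}
{code}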



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on a change in pull request #10446: MINOR: [ConfigEntry.class] add 'type' to 'toString' and 'hashCode'

2021-04-12 Thread GitBox


chia7712 commented on a change in pull request #10446:
URL: https://github.com/apache/kafka/pull/10446#discussion_r612095615



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java
##
@@ -149,24 +149,28 @@ public boolean equals(Object o) {
 
 ConfigEntry that = (ConfigEntry) o;
 
-return this.name.equals(that.name) &&
-this.value != null ? this.value.equals(that.value) : 
that.value == null &&

Review comment:
   This was a bug, as it evaluated the following conditions:
   ```java
   that.value == null &&
   this.isSensitive == that.isSensitive &&
   this.isReadOnly == that.isReadOnly &&
   this.source == that.source &&
   Objects.equals(this.synonyms, that.synonyms);
   ```
   whereas the expected else-branch is just `that.value == null`.
   
   There are some tests that depend on the "incorrect" equals.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #10529: KAFKA-12650: fix NPE in InternalTopicManagerTest

2021-04-12 Thread GitBox


mjsax commented on pull request #10529:
URL: https://github.com/apache/kafka/pull/10529#issuecomment-818392640


   Updated the PR.
   
   > (although, why would the max poll interval impact a timeout related to the 
admin client? Or do I not understand where this timeout is ultimately coming 
from)
   
   We use `InternalTopicManager` in the consumer group leader's `assign()` 
callback to create internal topics. If we get errors back from the admin client, we 
retry. To bound the retries, we use `max.poll.interval.ms / 2`, because 
`max.poll.interval.ms` defines an upper bound by which `assign()` must finish 
(otherwise the leader gets kicked out of the group). (Cf. KIP-572.)
   
   > But we should probably try to actually handle the null even so, and fail 
with a more useful error message in case we ever do hit this again. 
   
   `admin.deleteTopic` would never return `null` in production, and thus 
handling `null` within `InternalTopicManager` for this case seems not to make 
sense.
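
   For reference, the bound described above amounts to a deadline-based retry 
loop; a minimal sketch with illustrative names, not the actual 
`InternalTopicManager` code:

   ```java
   import java.util.function.Supplier;

   final class BoundedRetry {
       // Retries an operation until it succeeds or max.poll.interval.ms / 2
       // elapses; assign() must finish within max.poll.interval.ms, otherwise
       // the group leader is kicked out of the group.
       static <T> T retryUntilDeadline(final Supplier<T> op, final long maxPollIntervalMs) {
           final long deadline = System.currentTimeMillis() + maxPollIntervalMs / 2;
           RuntimeException lastError = null;
           while (System.currentTimeMillis() < deadline) {
               try {
                   return op.get();
               } catch (final RuntimeException retriable) {
                   lastError = retriable; // keep retrying until the deadline
               }
           }
           throw new IllegalStateException("Did not succeed within max.poll.interval.ms / 2", lastError);
       }
   }
   ```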


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10529: KAFKA-12650: fix NPE in InternalTopicManagerTest

2021-04-12 Thread GitBox


mjsax commented on a change in pull request #10529:
URL: https://github.com/apache/kafka/pull/10529#discussion_r612089734



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
##
@@ -153,7 +156,8 @@ public void shouldNotCreateTopicsWithEmptyInput() throws 
Exception {
 
 @Test
 public void shouldOnlyRetryNotSuccessfulFuturesDuringSetup() {
-final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
+final AdminClient admin = EasyMock.createStrictMock(AdminClient.class);
+config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 10_000L);

Review comment:
   In the test setup, we reduce max.poll to 100ms, so increasing it to 10 sec 
for this test should be sufficient.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #10409: KAFKA-9295: improve KTableKTableForeignKeyInnerJoinMultiIntegrationTest

2021-04-12 Thread GitBox


showuon commented on pull request #10409:
URL: https://github.com/apache/kafka/pull/10409#issuecomment-818386632


   @ableegoldman , thank you. I'll monitor the PR build, and update here. You 
can go take a rest, and check it tomorrow (your time). 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #10409: KAFKA-9295: improve KTableKTableForeignKeyInnerJoinMultiIntegrationTest

2021-04-12 Thread GitBox


showuon commented on pull request #10409:
URL: https://github.com/apache/kafka/pull/10409#issuecomment-818378362


   @ableegoldman , I updated the PR to wait until all streams reach "RUNNING" 
state before consuming records. Let's wait and see the test results. :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-12658) bin/kafka-metadata-shell.sh cannot find or load main class org.apache.kafka.shell.MetadataShell

2021-04-12 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma reassigned KAFKA-12658:
---

Assignee: Ismael Juma

> bin/kafka-metadata-shell.sh cannot find or load main class 
> org.apache.kafka.shell.MetadataShell
> ---
>
> Key: KAFKA-12658
> URL: https://issues.apache.org/jira/browse/KAFKA-12658
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.0.0, 2.8.0
> Environment: Ubuntu, Java 11
>Reporter: Israel Ekpo
>Assignee: Ismael Juma
>Priority: Blocker
>
> With the latest release candidate for 2.8.0, the binaries from the Scala 2.13 
> and 2.12 tarballs are not finding the class for the metadata shell on the 
> classpath:
> [https://home.apache.org/~vvcephei/kafka-2.8.0-rc1/]
>  
> kafka-run-class.sh is not able to load it.
>  
> cd ../kafka_2.12-2.8.0
>  
>  bin/kafka-metadata-shell.sh --help
> Error: Could not find or load main class org.apache.kafka.shell.MetadataShell
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.kafka.shell.MetadataShell
> cd ../kafka_2.13-2.8.0/
> bin/kafka-metadata-shell.sh --help
> Error: Could not find or load main class org.apache.kafka.shell.MetadataShell
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.kafka.shell.MetadataShell
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12660) Do not update offset commit sensor after append failure

2021-04-12 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-12660:
---

 Summary: Do not update offset commit sensor after append failure
 Key: KAFKA-12660
 URL: https://issues.apache.org/jira/browse/KAFKA-12660
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson


In the append callback after writing an offset to the log in 
`GroupMetadataManager`, it seems wrong to update the offset commit sensor prior 
to checking for errors: 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L394.
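
The intended ordering is to check the append result first and record the metric
only on success; an illustrative Java sketch (the real callback is Scala code
in GroupMetadataManager):

{code:java}
import org.apache.kafka.common.metrics.Sensor;

final class OffsetAppendCallback {

    // A failed append must not be counted as a committed offset, so the
    // sensor is updated only after the error check passes.
    static void onAppendComplete(final boolean appendFailed,
                                 final int numOffsets,
                                 final Sensor offsetCommitsSensor) {
        if (appendFailed) {
            return; // skip the metric update for failed appends
        }
        offsetCommitsSensor.record(numOffsets);
    }
}
{code}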
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dengziming commented on pull request #10275: KAFKA-12434; Admin support for `DescribeProducers` API

2021-04-12 Thread GitBox


dengziming commented on pull request #10275:
URL: https://github.com/apache/kafka/pull/10275#issuecomment-818372912


   > Merging to trunk. @chia7712 Yeah, we should create some jiras. I was going 
to wait until the other use cases in KIP-664 had been fleshed out, but I guess 
there's no harm converting other APIs. The ListOffsets API would be a good 
place to start. I'll create a JIRA tomorrow if you don't get there first.
   
   I created a JIRA and PR #10467, since you hadn't gotten to it yet.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10529: KAFKA-12650: fix NPE in InternalTopicManagerTest

2021-04-12 Thread GitBox


ableegoldman commented on pull request #10529:
URL: https://github.com/apache/kafka/pull/10529#issuecomment-818368666


   Personally I would slightly prefer to just increase the max.poll.interval if 
that's what defines the timeout (although, why would the max poll interval 
impact a timeout related to the admin client? Or do I not understand where this 
timeout is ultimately coming from)
   
   But we should probably try to actually handle the `null` even so, and fail 
with a more useful error message in case we ever do hit this again. Or if we 
can't handle the null directly, maybe use a not-nice mock so it'll fail rather 
than just returning null and hitting an NPE


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-12637) Remove deprecated PartitionAssignor interface

2021-04-12 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-12637.

Resolution: Fixed

> Remove deprecated PartitionAssignor interface
> -
>
> Key: KAFKA-12637
> URL: https://issues.apache.org/jira/browse/KAFKA-12637
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Assignee: dengziming
>Priority: Blocker
>  Labels: newbie, newbie++
> Fix For: 3.0.0
>
>
> In KIP-429, we deprecated the existing PartitionAssignor interface in order 
> to move it out of the internals package and better align the name with other 
> pluggable Consumer interfaces. We added an adapter to convert from existing 
> o.a.k.clients.consumer.internals.PartitionAssignor to the new 
> o.a.k.clients.consumer.ConsumerPartitionAssignor and support the deprecated 
> interface. This was deprecated in 2.4, so we should be ok to remove it and 
> the PartitionAssignorAdaptor in 3.0



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman merged pull request #10512: KAFKA-12637: Remove deprecated PartitionAssignor interface

2021-04-12 Thread GitBox


ableegoldman merged pull request #10512:
URL: https://github.com/apache/kafka/pull/10512


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10512: KAFKA-12637: Remove deprecated PartitionAssignor interface

2021-04-12 Thread GitBox


ableegoldman commented on pull request #10512:
URL: https://github.com/apache/kafka/pull/10512#issuecomment-818367236


   Merged to trunk -- thanks @dengziming !


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10512: KAFKA-12637: Remove deprecated PartitionAssignor interface

2021-04-12 Thread GitBox


ableegoldman commented on pull request #10512:
URL: https://github.com/apache/kafka/pull/10512#issuecomment-818366846


   Well there were a large number of failures, but all of them are unrelated. 
Left a comment/reopened the ticket for all of the following:
   
   
kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testReplication()
 
   kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopics()
   
kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()
 
   
kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testOneWayReplicationWithAutoOffsetSync()
   
kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testDeleteConnector
 kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopics()
   
.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutoOffsetSync()
   
kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutoOffsetSync()
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Reopened] (KAFKA-8391) Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector

2021-04-12 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8391?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reopened KAFKA-8391:
---

[~rhauch] [~kkonstantine] this test failed again -- based on the error message 
it looks like it may be a "real" failure this time, not environmental.

Stacktrace
java.lang.AssertionError: Tasks are imbalanced: 
localhost:35163=[seq-source11-0, seq-source11-3, seq-source10-1, seq-source12-1]
localhost:36961=[seq-source11-1, seq-source10-2, seq-source12-2]
localhost:39023=[seq-source11-2, seq-source10-0, seq-source10-3, 
seq-source12-0, seq-source12-3]
at org.junit.Assert.fail(Assert.java:89)
at org.junit.Assert.assertTrue(Assert.java:42)
at 
org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.assertConnectorAndTasksAreUniqueAndBalanced(RebalanceSourceConnectorsIntegrationTest.java:365)
at 
org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:319)
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:367)
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:316)
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300)
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290)
at 
org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testDeleteConnector(RebalanceSourceConnectorsIntegrationTest.java:213)


https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10512/5/testReport/org.apache.kafka.connect.integration/RebalanceSourceConnectorsIntegrationTest/Build___JDK_15_and_Scala_2_13___testDeleteConnector/

> Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector
> ---
>
> Key: KAFKA-8391
> URL: https://issues.apache.org/jira/browse/KAFKA-8391
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0
>Reporter: Matthias J. Sax
>Assignee: Randall Hauch
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1
>
> Attachments: 100-gradle-builds.tar
>
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/4747/testReport/junit/org.apache.kafka.connect.integration/RebalanceSourceConnectorsIntegrationTest/testDeleteConnector/]
> {quote}java.lang.AssertionError: Condition not met within timeout 3. 
> Connector tasks did not stop in time. at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:375) at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:352) at 
> org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testDeleteConnector(RebalanceSourceConnectorsIntegrationTest.java:166){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12659) Mirrormaker 2 - seeking to wrong offsets on restart

2021-04-12 Thread stuart (Jira)
stuart created KAFKA-12659:
--

 Summary: Mirrormaker 2 - seeking to wrong offsets on restart
 Key: KAFKA-12659
 URL: https://issues.apache.org/jira/browse/KAFKA-12659
 Project: Kafka
  Issue Type: Bug
  Components: mirrormaker
Affects Versions: 2.7.0
 Environment: Docker container based on openjdk11:alpine-slim , running 
on Amazon ECS
Reporter: stuart
 Attachments: partitions.png

We are running a dedicated mirror maker 2 cluster with three tasks, and have 
been trialing it for a few weeks on a single topic. It's been going fine, so we 
attempted to add a second topic, changing the MM2 config file from 

topics = sports

to 

topics = sports|translations 

 

We noticed the following day that the replication of the new topic was not 
working. Reading online, it seems others have had similar issues, perhaps 
related to the config stored in the internal mm2-configs topic not refreshing 
from the file, so, following recommendations in that thread, we stopped the 
tasks for 10 minutes, and eventually it started replicating.

However we also noticed later that MM2 had started re-replicating about 5 
million records from earlier that day (from the original topic) which was 
concerning. A few hours later I restarted the MM2 tasks and the same thing 
happened, it started re-replicating the same old messages.

Looking into the mm2-offsets-\{source}.internal topic I could see that the 
records which track offsets switched partitions, for example the records for 
sports-7 topic-partition went from being written to partition 5 (in 
mm2-offsets) to partition 8. The same occurred for other partitions (most but 
not all)

Following the task restarts in the MM2 logs I can see that MM2 is always 
Seeking to offset 42741034 for sports-7, this value matches the oldest offset 
record on mm2-offsets partition 5, so it looks like MM2 is ignoring the more 
recent offset records on partition 8 and so not seeking to the correct latest 
offsets.

And this also appears to affect compaction of the offsets internal topic, as 
while the older records on partition 8 for the sports-7 key are being cleaned 
up, the even older records for that same key on partition 5 are not.

I can't be certain that introducing the second topic into the MM2 config was 
the trigger for that change in partitioning behaviour. I am not sure why it 
would be, unless adding more topics to the topic replication list caused MM2 to 
automatically scale the number of partitions on the 
mm2-offsets-\{source}.internal topic, which I guess might affect partitioning 
behaviour. It was, however, the only noteworthy thing that we consciously 
changed within the same rough timeframe.

Attached is a screenshot to try and help illustrate the issue.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9013) Flaky Test MirrorConnectorsIntegrationTest#testReplication

2021-04-12 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17319797#comment-17319797
 ] 

A. Sophie Blee-Goldman commented on KAFKA-9013:
---

Failed again: 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10512/5/testReport/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationTest/Build___JDK_11_and_Scala_2_13___testReplication__/

> Flaky Test MirrorConnectorsIntegrationTest#testReplication
> --
>
> Key: KAFKA-9013
> URL: https://issues.apache.org/jira/browse/KAFKA-9013
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Bruno Cadonna
>Priority: Major
>  Labels: flaky-test
>
> h1. Stacktrace:
> {code:java}
> java.lang.AssertionError: Condition not met within timeout 2. Offsets not 
> translated downstream to primary cluster.
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:354)
>   at 
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication(MirrorConnectorsIntegrationTest.java:239)
> {code}
> h1. Standard Error
> {code}
> Standard Error
> Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource 
> registered in SERVER runtime does not implement any provider interfaces 
> applicable in the SERVER runtime. Due to constraint configuration problems 
> the provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will 
> be ignored. 
> Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.LoggingResource registered in 
> SERVER runtime does not implement any provider interfaces applicable in the 
> SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.LoggingResource will be 
> ignored. 
> Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered 
> in SERVER runtime does not implement any provider interfaces applicable in 
> the SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource will be 
> ignored. 
> Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.RootResource registered in 
> SERVER runtime does not implement any provider interfaces applicable in the 
> SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.RootResource will be ignored. 
> Oct 09, 2019 11:32:01 PM org.glassfish.jersey.internal.Errors logErrors
> WARNING: The following warnings have been detected: WARNING: The 
> (sub)resource method listLoggers in 
> org.apache.kafka.connect.runtime.rest.resources.LoggingResource contains 
> empty path annotation.
> WARNING: The (sub)resource method listConnectors in 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains 
> empty path annotation.
> WARNING: The (sub)resource method createConnector in 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains 
> empty path annotation.
> WARNING: The (sub)resource method listConnectorPlugins in 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource 
> contains empty path annotation.
> WARNING: The (sub)resource method serverInfo in 
> org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty 
> path annotation.
> Oct 09, 2019 11:32:02 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource 
> registered in SERVER runtime does not implement any provider interfaces 
> applicable in the SERVER runtime. Due to constraint configuration problems 
> the provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will 
> be ignored. 
> Oct 09, 2019 11:32:02 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered 
> in SERVER runtime does not implement any provider interfaces applicable in 
> the SERVER runtime. Due to constraint configuration problems the provider 
> 

[jira] [Reopened] (KAFKA-12284) Flaky Test MirrorConnectorsIntegrationSSLTest#testOneWayReplicationWithAutoOffsetSync

2021-04-12 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reopened KAFKA-12284:

  Assignee: (was: Luke Chen)

Failed again, on both the SSL and plain version of this test:

https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10512/5/testReport/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationSSLTest/Build___JDK_15_and_Scala_2_13___testOneWayReplicationWithAutoOffsetSync__/

https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10512/5/testReport/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationTest/Build___JDK_15_and_Scala_2_13___testOneWayReplicationWithAutoOffsetSync__/
 
java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.TimeoutException: The request timed out.
at 
org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:365)
at 
org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:340)
at 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.createTopics(MirrorConnectorsIntegrationBaseTest.java:609)

> Flaky Test 
> MirrorConnectorsIntegrationSSLTest#testOneWayReplicationWithAutoOffsetSync
> -
>
> Key: KAFKA-12284
> URL: https://issues.apache.org/jira/browse/KAFKA-12284
> Project: Kafka
>  Issue Type: Test
>  Components: mirrormaker, unit tests
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
> Fix For: 3.0.0
>
>
> [https://github.com/apache/kafka/pull/9997/checks?check_run_id=1820178470]
> {quote} {{java.lang.RuntimeException: 
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TopicExistsException: Topic 
> 'primary.test-topic-2' already exists.
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:366)
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:341)
>   at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testOneWayReplicationWithAutoOffsetSync(MirrorConnectorsIntegrationBaseTest.java:419)}}
> [...]
>  
> {{Caused by: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TopicExistsException: Topic 
> 'primary.test-topic-2' already exists.
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:364)
>   ... 92 more
> Caused by: org.apache.kafka.common.errors.TopicExistsException: Topic 
> 'primary.test-topic-2' already exists.}}
> {quote}
> STDOUT
> {quote} {{2021-02-03 04ː19ː15,975] ERROR [MirrorHeartbeatConnector|task-0] 
> WorkerSourceTask\{id=MirrorHeartbeatConnector-0} failed to send record to 
> heartbeats:  (org.apache.kafka.connect.runtime.WorkerSourceTask:354)
> org.apache.kafka.common.KafkaException: Producer is closed forcefully.
>   at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:750)
>   at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:737)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:282)
>   at java.lang.Thread.run(Thread.java:748)}}{quote}
> {quote} {{[2021-02-03 04ː19ː36,767] ERROR Could not check connector state 
> info. 
> (org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions:420)
> org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not 
> read connector state. Error response: \{"error_code":404,"message":"No status 
> found for connector MirrorSourceConnector"}
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.connectorStatus(EmbeddedConnectCluster.java:466)
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.checkConnectorState(EmbeddedConnectClusterAssertions.java:413)
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.lambda$assertConnectorAndAtLeastNumTasksAreRunning$16(EmbeddedConnectClusterAssertions.java:286)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303)
>   at 
> 

[jira] [Commented] (KAFKA-12629) Flaky Test RaftClusterTest

2021-04-12 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17319794#comment-17319794
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12629:


I'm seeing at least one failure in RaftClusterTest on almost every PR build 
lately.

Looks like it's typically the same TimeoutException mentioned above. Seen on 
both testCreateClusterAndCreateAndManyTopics() and 
testCreateClusterAndCreateAndManyTopicsWithManyPartitions()


> Flaky Test RaftClusterTest
> --
>
> Key: KAFKA-12629
> URL: https://issues.apache.org/jira/browse/KAFKA-12629
> Project: Kafka
>  Issue Type: Test
>  Components: core, unit tests
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
>
> {quote} {{java.util.concurrent.ExecutionException: 
> java.lang.ClassNotFoundException: 
> org.apache.kafka.controller.NoOpSnapshotWriterBuilder
>   at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
>   at 
> kafka.testkit.KafkaClusterTestKit.startup(KafkaClusterTestKit.java:364)
>   at 
> kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions(RaftClusterTest.scala:181)}}{quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12658) bin/kafka-metadata-shell.sh cannot find or load main class org.apache.kafka.shell.MetadataShell

2021-04-12 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-12658:
-
Priority: Blocker  (was: Major)

> bin/kafka-metadata-shell.sh cannot find or load main class 
> org.apache.kafka.shell.MetadataShell
> ---
>
> Key: KAFKA-12658
> URL: https://issues.apache.org/jira/browse/KAFKA-12658
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.0.0, 2.8.0
> Environment: Ubuntu, Java 11
>Reporter: Israel Ekpo
>Priority: Blocker
>
> With the latest release candidate for 2.8.0, the binaries from the Scala 2.13 
> and 2.12 tarballs are not finding the class for the metadata shell on the 
> classpath 
> [https://home.apache.org/~vvcephei/kafka-2.8.0-rc1/]
>  
> kafka-run-class.sh is not able to load it.
>  
> cd ../kafka_2.12-2.8.0
>  
>  bin/kafka-metadata-shell.sh --help
> Error: Could not find or load main class org.apache.kafka.shell.MetadataShell
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.kafka.shell.MetadataShell
> cd ../kafka_2.13-2.8.0/
> bin/kafka-metadata-shell.sh --help
> Error: Could not find or load main class org.apache.kafka.shell.MetadataShell
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.kafka.shell.MetadataShell
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-12658) bin/kafka-metadata-shell.sh cannot find or load main class org.apache.kafka.shell.MetadataShell

2021-04-12 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reassigned KAFKA-12658:


Assignee: (was: John Roesler)

> bin/kafka-metadata-shell.sh cannot find or load main class 
> org.apache.kafka.shell.MetadataShell
> ---
>
> Key: KAFKA-12658
> URL: https://issues.apache.org/jira/browse/KAFKA-12658
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.0.0, 2.8.0
> Environment: Ubuntu, Java 11
>Reporter: Israel Ekpo
>Priority: Major
>
> With the latest release candidate for 2.8.0, the binaries from the Scala 2.13 
> and 2.12 tarballs are not finding the class for the metadata shell on the 
> classpath 
> [https://home.apache.org/~vvcephei/kafka-2.8.0-rc1/]
>  
> kafka-run-class.sh is not able to load it.
>  
> cd ../kafka_2.12-2.8.0
>  
>  bin/kafka-metadata-shell.sh --help
> Error: Could not find or load main class org.apache.kafka.shell.MetadataShell
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.kafka.shell.MetadataShell
> cd ../kafka_2.13-2.8.0/
> bin/kafka-metadata-shell.sh --help
> Error: Could not find or load main class org.apache.kafka.shell.MetadataShell
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.kafka.shell.MetadataShell
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12658) bin/kafka-metadata-shell.sh cannot find or load main class org.apache.kafka.shell.MetadataShell

2021-04-12 Thread Israel Ekpo (Jira)
Israel Ekpo created KAFKA-12658:
---

 Summary: bin/kafka-metadata-shell.sh cannot find or load main 
class org.apache.kafka.shell.MetadataShell
 Key: KAFKA-12658
 URL: https://issues.apache.org/jira/browse/KAFKA-12658
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 3.0.0, 2.8.0
 Environment: Ubuntu, Java 11
Reporter: Israel Ekpo
Assignee: John Roesler


With the latest release candidate for 2.8.0, the binaries from the Scala 2.13 
and 2.12 tarballs are not finding the class for the metadata shell on the 
classpath 
[https://home.apache.org/~vvcephei/kafka-2.8.0-rc1/]
 
kafka-run-class.sh is not able to load it.
 
cd ../kafka_2.12-2.8.0
 
 bin/kafka-metadata-shell.sh --help
Error: Could not find or load main class org.apache.kafka.shell.MetadataShell
Caused by: java.lang.ClassNotFoundException: 
org.apache.kafka.shell.MetadataShell

cd ../kafka_2.13-2.8.0/


bin/kafka-metadata-shell.sh --help
Error: Could not find or load main class org.apache.kafka.shell.MetadataShell
Caused by: java.lang.ClassNotFoundException: 
org.apache.kafka.shell.MetadataShell
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman merged pull request #10411: KAFKA-7606: Remove deprecated options from StreamsResetter

2021-04-12 Thread GitBox


ableegoldman merged pull request #10411:
URL: https://github.com/apache/kafka/pull/10411


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #10505: MINOR: fix some bugs in ControllerApis.scala

2021-04-12 Thread GitBox


mumrah commented on a change in pull request #10505:
URL: https://github.com/apache/kafka/pull/10505#discussion_r612039100



##
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##
@@ -343,21 +366,22 @@ class ControllerApis(val requestChannel: RequestChannel,
 iterator.remove()
   }
 }
-val response = controller.createTopics(effectiveRequest).get()
-duplicateTopicNames.forEach { name =>
-  response.topics().add(new CreatableTopicResult().
-setName(name).
-setErrorCode(INVALID_REQUEST.code()).
-setErrorMessage("Found multiple entries for this topic."))
-}
-topicNames.forEach { name =>
-  if (!authorizedTopicNames.contains(name)) {
+controller.createTopics(effectiveRequest).thenApply(response => {
+  duplicateTopicNames.forEach { name =>
 response.topics().add(new CreatableTopicResult().
   setName(name).
-  setErrorCode(TOPIC_AUTHORIZATION_FAILED.code()))
+  setErrorCode(INVALID_REQUEST.code()).
+  setErrorMessage("Found multiple entries for this topic."))
   }
-}
-response
+  topicNames.forEach { name =>
+if (!authorizedTopicNames.contains(name)) {
+  response.topics().add(new CreatableTopicResult().
+setName(name).
+setErrorCode(TOPIC_AUTHORIZATION_FAILED.code()))

Review comment:
   nit: here and other places, we don't need parens for `code()`

##
File path: core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
##
@@ -151,6 +221,73 @@ class ControllerApisTest {
   brokerRegistrationResponse.errorCounts().asScala)
   }
 
+  @Test
+  def testUnauthorizedHandleAlterClientQuotas(): Unit = {
+assertThrows(classOf[ClusterAuthorizationException], () => 
createControllerApis(
+  Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
+handleAlterClientQuotas(buildRequest(new AlterClientQuotasRequest(
+  new AlterClientQuotasRequestData(), 0
+  }
+
+  @Test
+  def testUnauthorizedHandleIncrementalAlterConfigs(): Unit = {
+val requestData = new IncrementalAlterConfigsRequestData().setResources(
+  new AlterConfigsResourceCollection(
+util.Arrays.asList(new 
IncrementalAlterConfigsRequestData.AlterConfigsResource().
+  setResourceName("1").
+  setResourceType(ConfigResource.Type.BROKER.id()).
+  setConfigs(new AlterableConfigCollection(util.Arrays.asList(new 
AlterableConfig().
+setName("log.cleaner.backoff.ms").

Review comment:
   Should we use the static KafkaConfig property definitions instead of 
these strings?

##
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##
@@ -35,8 +36,8 @@ import org.apache.kafka.common.internals.FatalExitError
 import 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
 import 
org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, 
DeletableTopicResultCollection}
 import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBroker
-import org.apache.kafka.common.message.{BeginQuorumEpochResponseData, 
BrokerHeartbeatResponseData, BrokerRegistrationResponseData, 
CreateTopicsRequestData, CreateTopicsResponseData, DeleteTopicsRequestData, 
DeleteTopicsResponseData, DescribeQuorumResponseData, 
EndQuorumEpochResponseData, FetchResponseData, MetadataResponseData, 
SaslAuthenticateResponseData, SaslHandshakeResponseData, 
UnregisterBrokerResponseData, VoteResponseData}
-import org.apache.kafka.common.protocol.Errors.{INVALID_REQUEST, 
TOPIC_AUTHORIZATION_FAILED}
+import org.apache.kafka.common.message._
+import org.apache.kafka.common.protocol.Errors._

Review comment:
   In KafkaApis we import `Errors` rather than importing all the members of 
the enum via a wildcard import. Any reason to prefer one way over the other? It 
seems more common in our code base to import the enum and refer to members like 
`Errors.ILLEGAL_SASL_STATE`

##
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##
@@ -238,87 +249,99 @@ class ControllerApis(val requestChannel: RequestChannel,
 val toAuthenticate = new util.HashSet[String]
 toAuthenticate.addAll(providedNames)
 val idToName = new util.HashMap[Uuid, String]
-controller.findTopicNames(providedIds).get().forEach { (id, nameOrError) =>
-  if (nameOrError.isError) {
-appendResponse(null, id, nameOrError.error())
-  } else {
-toAuthenticate.add(nameOrError.result())
-idToName.put(id, nameOrError.result())
-  }
-}
-// Get the list of deletable topics (those we can delete) and the list of 
describeable
-// topics.  If a topic can't be deleted or described, we have to act like 
it doesn't
-// exist, even when it does.
-val topicsToAuthenticate = toAuthenticate.asScala
-val (describeable, deletable) = if (hasClusterAuth) 

[GitHub] [kafka] showuon commented on pull request #10409: KAFKA-9295: improve KTableKTableForeignKeyInnerJoinMultiIntegrationTest

2021-04-12 Thread GitBox


showuon commented on pull request #10409:
URL: https://github.com/apache/kafka/pull/10409#issuecomment-818309573


   Sure, I'll update the PR later. I removed it because I tried it before and it 
didn't work well. Let's try it again 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-12643) Kafka Streams 2.7 with Kafka Broker 2.6.x regression: bad timestamp in transform/process (this.context.schedule function)

2021-04-12 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-12643.
---
Resolution: Duplicate

Thanks for confirming!

> Kafka Streams 2.7 with Kafka Broker 2.6.x regression: bad timestamp in 
> transform/process (this.context.schedule function)
> -
>
> Key: KAFKA-12643
> URL: https://issues.apache.org/jira/browse/KAFKA-12643
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: David EVANO
>Priority: Major
> Attachments: Capture d’écran 2021-04-09 à 17.50.05.png
>
>
> During a transform() or a process() method:
> Define a scheduled task:
> this.context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, 
> timestamp -> \{...}
> store.put(...) or context.forward(...) produces a record with an invalid 
> timestamp.
> For the forward, a workaround is to define the timestamp:
> context.forward(entry.key, entry.value.toString(), 
> To.all().withTimestamp(timestamp));
> But for state.put(...) or state.delete(...) functions there is no workaround.
> Is it mandatory to have the Kafka broker version aligned with the Kafka 
> Streams version?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12642) Improve Rebalance reason upon metadata change

2021-04-12 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17319766#comment-17319766
 ] 

Guozhang Wang commented on KAFKA-12642:
---

Hi [~nicolas.guyomar] Maybe in the existing log message, we can print the 
`protocols` and `supportedProtocols` of the member? If you agree, please feel 
free to submit a PR.

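For illustration, the enriched reason could look roughly like the sketch below. 
This is only a sketch in Java (the real change would live in 
MemberMetadata.scala / the GroupCoordinator), and every name in it is an 
assumption:

{code:java}
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Sketch: render each (protocol, metadata) pair compactly so the rebalance
// reason says *what* differed, not just that something differed.
class RebalanceReasonSketch {
    static String describeProtocols(final List<Map.Entry<String, byte[]>> protocols) {
        final StringJoiner joined = new StringJoiner(", ", "[", "]");
        for (final Map.Entry<String, byte[]> p : protocols) {
            joined.add(p.getKey() + "(metadataSize=" + p.getValue().length + ")");
        }
        return joined.toString();
    }

    static String rebalanceReason(final String memberId,
                                  final List<Map.Entry<String, byte[]>> provided,
                                  final List<Map.Entry<String, byte[]>> known) {
        return "Updating metadata for member " + memberId
            + ": provided=" + describeProtocols(provided)
            + ", known=" + describeProtocols(known);
    }
}
{code}
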
> Improve Rebalance reason upon metadata change
> -
>
> Key: KAFKA-12642
> URL: https://issues.apache.org/jira/browse/KAFKA-12642
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Nicolas Guyomar
>Priority: Minor
>
> Whenever the known member metadata no longer matches the one from a 
> JoinGroupRequest, the GroupCoordinator triggers a rebalance with the 
> following reason: "Updating metadata for member ${member.memberId}". But 
> there are 2 underlying reasons in that part of the code in 
> MemberMetadata.scala:
> {code:java}
> def matches(protocols: List[(String, Array[Byte])]): Boolean = {
>   if (protocols.size != this.supportedProtocols.size)
> return false
>   for (i <- protocols.indices) {
> val p1 = protocols(i)
> val p2 = supportedProtocols(i)
> if (p1._1 != p2._1 || !util.Arrays.equals(p1._2, p2._2))
>   return false
>   }
>   true
> }{code}
> Could we maybe improve the rebalance reason with a bit more detail?
>  
> Thank you
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kpatelatwork opened a new pull request #10530: KAFKA-10231 fail bootstrap of Rest server if the host name in the adv…

2021-04-12 Thread GitBox


kpatelatwork opened a new pull request #10530:
URL: https://github.com/apache/kafka/pull/10530


   …ertised URI is invalid and other nodes can't reach it. Host names have 
rules about which characters they may contain and a maximum length, as 
specified in RFC 1123. Node-to-node communication over the REST API won't 
happen if this node's advertised URL to the cluster has an invalid host name, 
and the error message in the logs isn't very helpful. 
   
   This PR adds a new behavior: it uses the Java IDN class to expose a 
detailed error message and fails the server bootstrap. 
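   
   A minimal sketch of that approach (not the PR's actual code; the class and 
message wording are assumptions, but java.net.IDN is the real JDK API):
   
   ```java
   import java.net.IDN;
   
   public final class AdvertisedHostValidator {
       private AdvertisedHostValidator() { }
   
       // IDN.toASCII applies the RFC 3490 / STD3 (RFC 1123) host-name rules
       // and throws IllegalArgumentException with a descriptive message for
       // illegal characters or over-long labels, which we can surface at
       // bootstrap instead of a confusing downstream connection error.
       public static void validate(final String advertisedHost) {
           try {
               IDN.toASCII(advertisedHost, IDN.USE_STD3_ASCII_RULES);
           } catch (final IllegalArgumentException e) {
               throw new IllegalArgumentException(
                   "Invalid advertised host name '" + advertisedHost + "': " + e.getMessage(), e);
           }
       }
   }
   ```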
   
   @C0urante , @rhauch and @kkonstantine  please review
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-8605) Warn users when they have same connector in their plugin-path more than once

2021-04-12 Thread Kalpesh Patel (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kalpesh Patel reassigned KAFKA-8605:


Assignee: Kalpesh Patel

> Warn users when they have same connector in their plugin-path more than once
> 
>
> Key: KAFKA-8605
> URL: https://issues.apache.org/jira/browse/KAFKA-8605
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Cyrus Vafadari
>Assignee: Kalpesh Patel
>Priority: Major
>
> Right now it is very easy to have multiple copies of the same connector in 
> the plugin-path and not realize it.
> This can be problematic if a user is adding dependencies into the plugin, or 
> accidentally using the wrong version of the connector.
> An unobtrusive improvement would be to log a warning if the same connector 
> appears in the plugin-path more than once.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on pull request #10506: MINOR: Improve description of `max.poll.records` config

2021-04-12 Thread GitBox


mjsax commented on pull request #10506:
URL: https://github.com/apache/kafka/pull/10506#issuecomment-818278763


   Merged to `trunk` and cherry-picked to `2.8` and `2.7` branches.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12643) Kafka Streams 2.7 with Kafka Broker 2.6.x regression: bad timestamp in transform/process (this.context.schedule function)

2021-04-12 Thread David EVANO (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17319735#comment-17319735
 ] 

David EVANO commented on KAFKA-12643:
-

Hello,
Yes, this seems to be a duplicate of the mentioned issue.
Thanks,
David Evano

On Fri, Apr 9, 2021 at 18:21, Guozhang Wang (Jira) wrote:



> Kafka Streams 2.7 with Kafka Broker 2.6.x regression: bad timestamp in 
> transform/process (this.context.schedule function)
> -
>
> Key: KAFKA-12643
> URL: https://issues.apache.org/jira/browse/KAFKA-12643
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: David EVANO
>Priority: Major
> Attachments: Capture d’écran 2021-04-09 à 17.50.05.png
>
>
> During a transform() or a process() method:
> Define a scheduled task:
> this.context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, 
> timestamp -> \{...}
> store.put(...) or context.forward(...) produces a record with an invalid 
> timestamp.
> For the forward, a workaround is to define the timestamp:
> context.forward(entry.key, entry.value.toString(), 
> To.all().withTimestamp(timestamp));
> But for state.put(...) or state.delete(...) functions there is no workaround.
> Is it mandatory to have the Kafka broker version aligned with the Kafka 
> Streams version?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jsancio commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState

2021-04-12 Thread GitBox


jsancio commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r611957903



##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##
@@ -202,6 +203,25 @@ private void completeCurrentBatch() {
 currentBatch = null;
 }
 
+public void addControlBatch(MemoryRecords records) {
+appendLock.lock();
+try {
+drainStatus = DrainStatus.STARTED;

Review comment:
   Okay. This is an internal API, so I say we add this when needed. If we 
still want to refactor, this is similar to `maybeCompleteDrain` but it 
cannot assume that the lock is held. How about naming this method 
`completeDrain`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax merged pull request #10506: MINOR: Improve description of `max.poll.records` config

2021-04-12 Thread GitBox


mjsax merged pull request #10506:
URL: https://github.com/apache/kafka/pull/10506


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12657) Flaky Tests BlockingConnectorTest.testWorkerRestartWithBlockInConnectorStop

2021-04-12 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-12657:
---

 Summary: Flaky Tests 
BlockingConnectorTest.testWorkerRestartWithBlockInConnectorStop
 Key: KAFKA-12657
 URL: https://issues.apache.org/jira/browse/KAFKA-12657
 Project: Kafka
  Issue Type: Test
  Components: KafkaConnect
Reporter: Matthias J. Sax


[https://github.com/apache/kafka/pull/10506/checks?check_run_id=2327377745]
{quote} {{org.opentest4j.AssertionFailedError: Condition not met within timeout 
6. Worker did not complete startup in time ==> expected: <true> but was: <false>
at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40)
at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:193)
at 
org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:319)
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:367)
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:316)
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300)
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290)
at 
org.apache.kafka.connect.integration.BlockingConnectorTest.setup(BlockingConnectorTest.java:133)}}
{quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-12566) Flaky Test MirrorConnectorsIntegrationSSLTest#testReplication

2021-04-12 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17319715#comment-17319715
 ] 

Matthias J. Sax edited comment on KAFKA-12566 at 4/12/21, 8:59 PM:
---

[https://github.com/apache/kafka/pull/10506/checks?check_run_id=2327349920]

[https://github.com/apache/kafka/pull/10506/checks?check_run_id=2327310946] 
{quote} {{java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.TimeoutException: The request timed out.
 at 
org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:365)
 at 
org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:340)
 at 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.createTopics(MirrorConnectorsIntegrationBaseTest.java:609)
 at 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.startClusters(MirrorConnectorsIntegrationBaseTest.java:173)
 at 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.startClusters(MirrorConnectorsIntegrationSSLTest.java:63)}}
{quote}


was (Author: mjsax):
[https://github.com/apache/kafka/pull/10506/checks?check_run_id=2327349920]
{quote} {{java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.TimeoutException: The request timed out.
at 
org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:365)
at 
org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:340)
at 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.createTopics(MirrorConnectorsIntegrationBaseTest.java:609)
at 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.startClusters(MirrorConnectorsIntegrationBaseTest.java:173)
at 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.startClusters(MirrorConnectorsIntegrationSSLTest.java:63)}}{quote}

> Flaky Test MirrorConnectorsIntegrationSSLTest#testReplication
> -
>
> Key: KAFKA-12566
> URL: https://issues.apache.org/jira/browse/KAFKA-12566
> Project: Kafka
>  Issue Type: Test
>  Components: mirrormaker, unit tests
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
>
>  
> {code:java}
> org.opentest4j.AssertionFailedError: Condition not met within timeout 2. 
> Offsets not translated downstream to primary cluster. ==> expected: <true> 
> but was: <false> at 
> org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at 
> org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40) at 
> org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:193) at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303) 
> at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319)
>  at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300) at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290) at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplication(MirrorConnectorsIntegrationBaseTest.java:289)
> {code}
> {{LOGs}}
> {quote}[2021-03-26 03:28:06,157] ERROR Could not check connector state info. 
> (org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions:420) 
> org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not 
> read connector state. Error response: \{"error_code":404,"message":"No status 
> found for connector MirrorSourceConnector"} at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.connectorStatus(EmbeddedConnectCluster.java:479)
>  at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.checkConnectorState(EmbeddedConnectClusterAssertions.java:413)
>  at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.lambda$assertConnectorAndAtLeastNumTasksAreRunning$16(EmbeddedConnectClusterAssertions.java:286)
>  at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303) 
> at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319)
>  at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300) at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290) at 
> 

[jira] [Commented] (KAFKA-12566) Flaky Test MirrorConnectorsIntegrationSSLTest#testReplication

2021-04-12 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17319715#comment-17319715
 ] 

Matthias J. Sax commented on KAFKA-12566:
-

[https://github.com/apache/kafka/pull/10506/checks?check_run_id=2327349920]
{quote} {{java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.TimeoutException: The request timed out.
at 
org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:365)
at 
org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:340)
at 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.createTopics(MirrorConnectorsIntegrationBaseTest.java:609)
at 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.startClusters(MirrorConnectorsIntegrationBaseTest.java:173)
at 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.startClusters(MirrorConnectorsIntegrationSSLTest.java:63)}}{quote}

> Flaky Test MirrorConnectorsIntegrationSSLTest#testReplication
> -
>
> Key: KAFKA-12566
> URL: https://issues.apache.org/jira/browse/KAFKA-12566
> Project: Kafka
>  Issue Type: Test
>  Components: mirrormaker, unit tests
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
>
>  
> {code:java}
> org.opentest4j.AssertionFailedError: Condition not met within timeout 2. 
> Offsets not translated downstream to primary cluster. ==> expected: <true> 
> but was: <false> at 
> org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at 
> org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40) at 
> org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:193) at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303) 
> at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319)
>  at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300) at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290) at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplication(MirrorConnectorsIntegrationBaseTest.java:289)
> {code}
> {{LOGs}}
> {quote}[2021-03-26 03:28:06,157] ERROR Could not check connector state info. 
> (org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions:420) 
> org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not 
> read connector state. Error response: \{"error_code":404,"message":"No status 
> found for connector MirrorSourceConnector"} at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.connectorStatus(EmbeddedConnectCluster.java:479)
>  at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.checkConnectorState(EmbeddedConnectClusterAssertions.java:413)
>  at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.lambda$assertConnectorAndAtLeastNumTasksAreRunning$16(EmbeddedConnectClusterAssertions.java:286)
>  at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303) 
> at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319)
>  at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300) at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290) at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.assertConnectorAndAtLeastNumTasksAreRunning(EmbeddedConnectClusterAssertions.java:285)
>  at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning(MirrorConnectorsIntegrationBaseTest.java:470)
>  at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplication(MirrorConnectorsIntegrationBaseTest.java:227){quote}
> and
> {quote}[2021-03-26 03:30:41,524] ERROR [MirrorHeartbeatConnector|task-0] 
> Graceful stop of task MirrorHeartbeatConnector-0 failed. 
> (org.apache.kafka.connect.runtime.Worker:866) [2021-03-26 03:30:41,527] ERROR 
> [MirrorHeartbeatConnector|task-0] 
> WorkerSourceTask\{id=MirrorHeartbeatConnector-0} failed to send record to 
> heartbeats: (org.apache.kafka.connect.runtime.WorkerSourceTask:372) 
> org.apache.kafka.common.KafkaException: Producer is closed forcefully. at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:750)
>  at 
> 

[GitHub] [kafka] mjsax commented on pull request #10493: MINOR: cleanup Jenkins workspace before build

2021-04-12 Thread GitBox


mjsax commented on pull request #10493:
URL: https://github.com/apache/kafka/pull/10493#issuecomment-818144809


   Thanks @ijuma!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax closed pull request #10493: MINOR: cleanup Jenkins workspace before build

2021-04-12 Thread GitBox


mjsax closed pull request #10493:
URL: https://github.com/apache/kafka/pull/10493


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-12 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r611915606



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -102,14 +127,98 @@ public void process(final K key, final V1 value) {
 while (iter.hasNext()) {
 needOuterJoin = false;
 final KeyValue<Long, V2> otherRecord = iter.next();
+final long otherRecordTimestamp = otherRecord.key;
 context().forward(
 key,
 joiner.apply(key, value, otherRecord.value),
-To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecord.key)));
+To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecordTimestamp)));
+
+outerJoinWindowStore.ifPresent(store -> {
+// Delete the other joined key from the outer 
non-joined store now to prevent
+// further processing
+final KeyAndJoinSide<K> otherJoinKey = KeyAndJoinSide.make(!thisJoin, key);
+if (store.fetch(otherJoinKey, otherRecordTimestamp) != 
null) {
+store.put(otherJoinKey, null, 
otherRecordTimestamp);
+}
+});
 }
 
 if (needOuterJoin) {
-context().forward(key, joiner.apply(key, value, null));
+// The maxStreamTime contains the max time observed in 
both sides of the join.
+// Having access to the time observed in the other join 
side fixes the following
+// problem:
+//
+// Say we have a window size of 5 seconds
+//  1. A non-joined record with time T10 is seen in the 
left-topic (maxLeftStreamTime: 10)
+// The record is not processed yet, and is added to 
the outer-join store
+//  2. A non-joined record with time T2 is seen in the 
right-topic (maxRightStreamTime: 2)
+// The record is not processed yet, and is added to 
the outer-join store
+//  3. A joined record with time T11 is seen in the 
left-topic (maxLeftStreamTime: 11)
+// It is time to look at the expired records. T10 and 
T2 should be emitted, but
+// because T2 was late, then it is not fetched by the 
window store, so it is not processed
+//
+// See KStreamKStreamLeftJoinTest.testLowerWindowBound() 
tests
+//
+// the condition below allows us to process the late 
record without the need
+// to hold it in the temporary outer store
+if (timeTo < maxStreamTime) {
+context().forward(key, joiner.apply(key, value, null));
+} else {
+outerJoinWindowStore.ifPresent(store -> store.put(
+KeyAndJoinSide.make(thisJoin, key),
+makeValueOrOtherValue(thisJoin, value),
+inputRecordTimestamp));
+}
+}
+
+outerJoinWindowStore.ifPresent(store -> {
+// only emit left/outer non-joined if the stream time has 
advanced (inputRecordTime = maxStreamTime)
+// if the current record is late, then there is no need to 
check for expired records
+if (inputRecordTimestamp == maxStreamTime) {
+maybeEmitOuterExpiryRecords(store, maxStreamTime);
+}
+});
+}
+}
+
+private ValueOrOtherValue<V1, V2> makeValueOrOtherValue(final boolean thisJoin, final V1 value) {

Review comment:
   Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-12 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r611915696



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -102,14 +127,98 @@ public void process(final K key, final V1 value) {
 while (iter.hasNext()) {
 needOuterJoin = false;
 final KeyValue<Long, V2> otherRecord = iter.next();
+final long otherRecordTimestamp = otherRecord.key;
 context().forward(
 key,
 joiner.apply(key, value, otherRecord.value),
-To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecord.key)));
+To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecordTimestamp)));
+
+outerJoinWindowStore.ifPresent(store -> {
+// Delete the other joined key from the outer 
non-joined store now to prevent
+// further processing
+final KeyAndJoinSide<K> otherJoinKey = KeyAndJoinSide.make(!thisJoin, key);
+if (store.fetch(otherJoinKey, otherRecordTimestamp) != 
null) {
+store.put(otherJoinKey, null, 
otherRecordTimestamp);
+}
+});
 }
 
 if (needOuterJoin) {
-context().forward(key, joiner.apply(key, value, null));
+// The maxStreamTime contains the max time observed in 
both sides of the join.
+// Having access to the time observed in the other join 
side fixes the following
+// problem:
+//
+// Say we have a window size of 5 seconds
+//  1. A non-joined record with time T10 is seen in the 
left-topic (maxLeftStreamTime: 10)
+// The record is not processed yet, and is added to 
the outer-join store
+//  2. A non-joined record with time T2 is seen in the 
right-topic (maxRightStreamTime: 2)
+// The record is not processed yet, and is added to 
the outer-join store
+//  3. A joined record with time T11 is seen in the 
left-topic (maxLeftStreamTime: 11)
+// It is time to look at the expired records. T10 and 
T2 should be emitted, but
+// because T2 was late, then it is not fetched by the 
window store, so it is not processed
+//
+// See KStreamKStreamLeftJoinTest.testLowerWindowBound() 
tests
+//
+// the condition below allows us to process the late 
record without the need
+// to hold it in the temporary outer store
+if (timeTo < maxStreamTime) {
+context().forward(key, joiner.apply(key, value, null));
+} else {
+outerJoinWindowStore.ifPresent(store -> store.put(
+KeyAndJoinSide.make(thisJoin, key),
+makeValueOrOtherValue(thisJoin, value),
+inputRecordTimestamp));
+}
+}
+
+outerJoinWindowStore.ifPresent(store -> {
+// only emit left/outer non-joined if the stream time has 
advanced (inputRecordTime = maxStreamTime)
+// if the current record is late, then there is no need to 
check for expired records
+if (inputRecordTimestamp == maxStreamTime) {
+maybeEmitOuterExpiryRecords(store, maxStreamTime);
+}
+});
+}
+}
+
+private ValueOrOtherValue<V1, V2> makeValueOrOtherValue(final boolean thisJoin, final V1 value) {
+return thisJoin
+? ValueOrOtherValue.makeValue(value)
+: ValueOrOtherValue.makeOtherValue(value);
+}
+
+@SuppressWarnings("unchecked")
+private void maybeEmitOuterExpiryRecords(final WindowStore<KeyAndJoinSide<K>, ValueOrOtherValue<V1, V2>> store, final long maxStreamTime) {
+try (final KeyValueIterator<Windowed<KeyAndJoinSide<K>>, ValueOrOtherValue<V1, V2>> it = store.all()) {
+while (it.hasNext()) {
+final KeyValue<Windowed<KeyAndJoinSide<K>>, ValueOrOtherValue<V1, V2>> e = it.next();
+
+// Skip next records if the oldest record has not expired 
yet
+if (e.key.window().end() + joinGraceMs >= maxStreamTime) {
+break;
+}
+
+final K key = e.key.key().getKey();
+
+// Emit the record by joining with a null value. But the 
order varies depending whether
+// this join is using a reverse joiner or not. Also 

[jira] [Commented] (KAFKA-12654) separate checkAllSubscriptionEqual and getConsumerToOwnedPartitions methods

2021-04-12 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17319675#comment-17319675
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12654:


To be honest, the sticky assignment algorithm for the general case is so slow 
that the additional time spent populating this map is probably negligible. That 
said, the general algorithm ends up populating exactly the same map in 
#prepopulateCurrentAssignments, so we could just pass it along to the 
generalAssign as well. But it also has to populate an additional map 
(prevAssignment) so it would still need to loop over and deserialize this info 
again anyways

> separate checkAllSubscriptionEqual and getConsumerToOwnedPartitions methods
> ---
>
> Key: KAFKA-12654
> URL: https://issues.apache.org/jira/browse/KAFKA-12654
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> Currently, when entering sticky assignor, we'll check if all consumers have 
> the same subscription to decide which assignor to use (constrained assignor 
> or general assignor). While checking the subscription, we also deserialize 
> the subscription user data to get the partitions owned by the consumer 
> (consumerToOwnedPartitions). However, the consumerToOwnedPartitions info is 
> not used in general assignor (we'll actually deserialize it inside general 
> assignor), so, we don't need to deserialize data for general assignor again. 
> We should separate these 2 things into 2 methods, and only deserialize the 
> user data for constrained assignor, to improve the general sticky assignor 
> performance.
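
A rough sketch of the proposed split is below. The Subscription accessors are 
Kafka's real API, but the method bodies are simplified assumptions; in 
particular, the real assignor must also decode owned partitions from the 
versioned userData of older clients, which is exactly the deserialization this 
ticket wants to skip on the general path:

{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
import org.apache.kafka.common.TopicPartition;

class SubscriptionChecksSketch {
    // Cheap structural check used only to pick the assignor path; it touches
    // no user data at all.
    static boolean allSubscriptionsEqual(final Map<String, Subscription> subscriptions) {
        final Set<List<String>> topicLists = new HashSet<>();
        for (final Subscription subscription : subscriptions.values()) {
            topicLists.add(subscription.topics());
        }
        return topicLists.size() <= 1;
    }

    // Invoked only on the constrained path, where the owned-partitions map
    // is actually consumed.
    static Map<String, List<TopicPartition>> consumerToOwnedPartitions(final Map<String, Subscription> subscriptions) {
        final Map<String, List<TopicPartition>> owned = new HashMap<>();
        subscriptions.forEach((consumer, subscription) -> owned.put(consumer, subscription.ownedPartitions()));
        return owned;
    }
}
{code}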



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on pull request #10409: KAFKA-9295: improve KTableKTableForeignKeyInnerJoinMultiIntegrationTest

2021-04-12 Thread GitBox


ableegoldman commented on pull request #10409:
URL: https://github.com/apache/kafka/pull/10409#issuecomment-818077023


   It looks like the test failed in this most recent run -- however that's not 
unexpected since I think you may have accidentally taken out the other fix 
which probably _will_ help: to use `startStreamsAndWaitForRunning` so we make 
sure to wait for the KafkaStreams to start up and get into RUNNING. Let's add 
that back and see if it helps (I suspect it will)
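
   For reference, a sketch of that pattern, assuming `startStreamsAndWaitForRunning` 
wraps Kafka's `IntegrationTestUtils` helper and that `topology` and `props` come 
from the test:
   
   ```java
   import java.time.Duration;
   import java.util.Collections;
   import java.util.Properties;
   import org.apache.kafka.streams.KafkaStreams;
   import org.apache.kafka.streams.Topology;
   import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
   
   // Blocks until every given instance reports RUNNING, so test input cannot
   // race the initial rebalance.
   static void startStreamsAndWaitForRunning(final Topology topology, final Properties props) throws Exception {
       final KafkaStreams streams = new KafkaStreams(topology, props);
       startApplicationAndWaitUntilRunning(Collections.singletonList(streams), Duration.ofSeconds(60));
   }
   ```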


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10512: KAFKA-12637: Remove deprecated PartitionAssignor interface

2021-04-12 Thread GitBox


ableegoldman commented on pull request #10512:
URL: https://github.com/apache/kafka/pull/10512#issuecomment-818074480


   Hm, the build was aborted -- looks like the gradle daemon was killed or 
crashed for some reason? I'll kick off a new run, let's hope this one passes


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-12 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r611882455



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -102,14 +127,98 @@ public void process(final K key, final V1 value) {
 while (iter.hasNext()) {
 needOuterJoin = false;
 final KeyValue<Long, V2> otherRecord = iter.next();
+final long otherRecordTimestamp = otherRecord.key;
 context().forward(
 key,
 joiner.apply(key, value, otherRecord.value),
-To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecord.key)));
+To.all().withTimestamp(Math.max(inputRecordTimestamp, 
otherRecordTimestamp)));
+
+outerJoinWindowStore.ifPresent(store -> {
+// Delete the other joined key from the outer 
non-joined store now to prevent
+// further processing
+final KeyAndJoinSide<K> otherJoinKey = KeyAndJoinSide.make(!thisJoin, key);
+if (store.fetch(otherJoinKey, otherRecordTimestamp) != 
null) {

Review comment:
   This needs to be supported in Window stores. I might need to write a KIP 
for this.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-12 Thread GitBox


spena commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r611878043



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##
@@ -211,6 +246,66 @@ private void assertUniqueStoreNames(final 
WindowBytesStoreSupplier supplier,
 return builder;
 }
 
+@SuppressWarnings("unchecked")
+private static <K, V1, V2> StoreBuilder<WindowStore<KeyAndJoinSide<K>, ValueOrOtherValue<V1, V2>>> outerJoinWindowStoreBuilder(final String storeName,
+   final JoinWindows windows,
+   final StreamJoinedInternal<K, V1, V2> streamJoinedInternal) {
+final StoreBuilder<WindowStore<KeyAndJoinSide<K>, ValueOrOtherValue<V1, V2>>> builder = new TimeOrderedWindowStoreBuilder<KeyAndJoinSide<K>, ValueOrOtherValue<V1, V2>>(
+persistentTimeOrderedWindowStore(
+storeName + "-store",
+Duration.ofMillis(windows.size() + windows.gracePeriodMs()),
+Duration.ofMillis(windows.size())
+),
+new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde()),
+new ValueOrOtherValueSerde(streamJoinedInternal.valueSerde(), 
streamJoinedInternal.otherValueSerde()),
+Time.SYSTEM
+);
+if (streamJoinedInternal.loggingEnabled()) {
+builder.withLoggingEnabled(streamJoinedInternal.logConfig());
+} else {
+builder.withLoggingDisabled();
+}
+
+return builder;
+}
+
+// This method has the same code as Stores.persistentWindowStore(). But 
TimeOrderedWindowStore is
+// a non-public API, so we need to keep duplicate code until it becomes 
public.
+private static WindowBytesStoreSupplier 
persistentTimeOrderedWindowStore(final String storeName,
+ 
final Duration retentionPeriod,
+ 
final Duration windowSize) {
+Objects.requireNonNull(storeName, "name cannot be null");
+final String rpMsgPrefix = 
prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
+final long retentionMs = validateMillisecondDuration(retentionPeriod, 
rpMsgPrefix);
+final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, 
"windowSize");
+final long windowSizeMs = validateMillisecondDuration(windowSize, 
wsMsgPrefix);
+
+final long segmentInterval = Math.max(retentionMs / 2, 60_000L);
+
+if (retentionMs < 0L) {
+throw new IllegalArgumentException("retentionPeriod cannot be 
negative");
+}
+if (windowSizeMs < 0L) {
+throw new IllegalArgumentException("windowSize cannot be 
negative");
+}
+if (segmentInterval < 1L) {
+throw new IllegalArgumentException("segmentInterval cannot be zero 
or negative");
+}
+if (windowSizeMs > retentionMs) {
+throw new IllegalArgumentException("The retention period of the 
window store "
++ storeName + " must be no smaller than its window size. Got 
size=["
++ windowSizeMs + "], retention=[" + retentionMs + "]");
+}
+
+return new RocksDbWindowBytesStoreSupplier(
+storeName,
+retentionMs,
+segmentInterval,
+windowSizeMs,
+false,

Review comment:
   I had issues with duplicates, and forgot to investigate them. I just 
did another round of investigation, but I still get issues. The problem 
is that I cannot delete any key when duplicates are used. This happens in any 
window store, not just the time-ordered window store.
   
   The problem I found is:
   
   1. Added two duplicates with key = 0 and time = 0
   ```
   # this adds a key with seqNum = 0
   put(0, "A0", 0) 
   # this adds a key with seqNum = 1
   put(0, "A0-0", 0)
   ```
   2. Delete key = 0 and time = 0
   ```
   # this attempts to delete with seqNum = 2, which it does not exist
   put(0, null, 0)
   ```
   
   Initially I didn't think supporting duplicates was necessary, but I just 
wrote a test case with the old semantics and duplicates are processed, so I 
need to support them. Do you know if deleting duplicates has been unsupported 
all along? Or am I missing some API or workaround?
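   
   Restating the failure mode as a sketch against the public `Stores`/`WindowStore` 
API (store name and types are illustrative; `init()` wiring is omitted):
   
   ```java
   import java.time.Duration;
   import org.apache.kafka.common.serialization.Serdes;
   import org.apache.kafka.streams.state.Stores;
   import org.apache.kafka.streams.state.WindowStore;
   
   // Build a duplicate-retaining window store (retainDuplicates = true).
   final WindowStore<Integer, String> store = Stores.windowStoreBuilder(
           Stores.persistentWindowStore("dup-store", Duration.ofMinutes(5),
               Duration.ofMinutes(1), true),
           Serdes.Integer(), Serdes.String())
       .build();
   
   // Each put() appends under a fresh sequence number, so the intended
   // tombstone lands under seq=2 and the two original rows survive.
   store.put(0, "A0", 0L);    // stored as (key=0, seq=0)
   store.put(0, "A0-0", 0L);  // stored as (key=0, seq=1)
   store.put(0, null, 0L);    // recorded under seq=2; nothing is deleted
   ```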




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12656) JMX exporter is leaking a lot of file descriptors

2021-04-12 Thread Liang Xia (Jira)
Liang Xia created KAFKA-12656:
-

 Summary: JMX exporter is leaking a lot of file descriptors
 Key: KAFKA-12656
 URL: https://issues.apache.org/jira/browse/KAFKA-12656
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Liang Xia


The JMX exporter doesn't close the connections successfully after reporting 
the metrics.

They are stuck in the CLOSE_WAIT state, as the lsof output shows:
java    2351  kcbq  385u  IPv6  3660408  0t0  TCP 
example.internal:9404->x.x.x.x:39470 (CLOSE_WAIT)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dielhennr commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState

2021-04-12 Thread GitBox


dielhennr commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r611863824



##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##
@@ -194,14 +196,45 @@ private void completeCurrentBatch() {
 MemoryRecords data = currentBatch.build();
 completed.add(new CompletedBatch<>(
 currentBatch.baseOffset(),
-currentBatch.records(),
+Optional.of(currentBatch.records()),
 data,
 memoryPool,
 currentBatch.initialBuffer()
 ));
 currentBatch = null;
 }
 
+public void appendLeaderChangeMessage(LeaderChangeMessage 
leaderChangeMessage, long currentTimeMs) {
+maybeCompleteDrain();
+ByteBuffer buffer = memoryPool.tryAllocate(256);
+if (buffer != null) {
+MemoryRecords data = MemoryRecords.withLeaderChangeMessage(
+this.nextOffset, 
+currentTimeMs, 
+this.epoch, 
+buffer, 
+leaderChangeMessage);
+completed.add(new CompletedBatch<>(
+nextOffset,
+Optional.empty(),
+data,
+memoryPool,
+buffer
+));
+nextOffset += 1;
+}
+}
+
+public void flush() {

Review comment:
   @jsancio 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dielhennr commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState

2021-04-12 Thread GitBox


dielhennr commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r611859318



##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##
@@ -194,14 +196,45 @@ private void completeCurrentBatch() {
 MemoryRecords data = currentBatch.build();
 completed.add(new CompletedBatch<>(
 currentBatch.baseOffset(),
-currentBatch.records(),
+Optional.of(currentBatch.records()),
 data,
 memoryPool,
 currentBatch.initialBuffer()
 ));
 currentBatch = null;
 }
 
+public void appendLeaderChangeMessage(LeaderChangeMessage 
leaderChangeMessage, long currentTimeMs) {
+maybeCompleteDrain();
+ByteBuffer buffer = memoryPool.tryAllocate(256);
+if (buffer != null) {
+MemoryRecords data = MemoryRecords.withLeaderChangeMessage(
+this.nextOffset, 
+currentTimeMs, 
+this.epoch, 
+buffer, 
+leaderChangeMessage);
+completed.add(new CompletedBatch<>(
+nextOffset,
+Optional.empty(),
+data,
+memoryPool,
+buffer
+));
+nextOffset += 1;
+}
+}
+
+public void flush() {

Review comment:
   https://github.com/apache/kafka/pull/10480#discussion_r610237424




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10506: MINOR: Improve description of `max.poll.records` config

2021-04-12 Thread GitBox


hachikuji commented on a change in pull request #10506:
URL: https://github.com/apache/kafka/pull/10506#discussion_r611840016



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
##
@@ -66,7 +66,8 @@
 
 /** max.poll.records */
 public static final String MAX_POLL_RECORDS_CONFIG = "max.poll.records";
-private static final String MAX_POLL_RECORDS_DOC = "The maximum number of 
records returned in a single call to poll().";
+private static final String MAX_POLL_RECORDS_DOC = "The maximum number of 
records returned in a single call to poll()."
++ " Note, that " + MAX_POLL_RECORDS_CONFIG + " does not 
impact the underlying fetching behavior.";

Review comment:
   Maybe add one more sentence:
   
   > .. underlying fetch behavior. The consumer will cache the records from 
each Fetch request and return them incrementally from each `poll`.
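
   For readers following along, a minimal sketch of the two knobs the suggested sentence contrasts (the values are illustrative assumptions, not recommendations):
   ```
   import java.util.Properties;
   import org.apache.kafka.clients.consumer.ConsumerConfig;
   
   public class PollVsFetchConfigSketch {
       public static void main(String[] args) {
           Properties props = new Properties();
           // Caps only how many cached records a single poll() call returns.
           props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
           // The underlying fetch requests are sized by the fetch.* configs instead.
           props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 50 * 1024 * 1024);
           props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1024 * 1024);
           System.out.println(props);
       }
   }
   ```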




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #10478: KAFKA-12553: Refactor recovery logic to introduce LogLoader

2021-04-12 Thread GitBox


kowshik commented on a change in pull request #10478:
URL: https://github.com/apache/kafka/pull/10478#discussion_r611830920



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -255,19 +261,21 @@ case object SnapshotGenerated extends 
LogStartOffsetIncrementReason {
 @threadsafe
 class Log(@volatile private var _dir: File,
   @volatile var config: LogConfig,
+  val segments: LogSegments,

Review comment:
   Sounds good. I'll give this a shot.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10888) Sticky partition leads to uneven product msg, resulting in abnormal delays in some partitions

2021-04-12 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-10888:
--
Summary:  Sticky partition leads to uneven product msg, resulting in 
abnormal delays in some partitions  (was:  Sticky partition leads to uneven 
product msg, resulting in abnormal delays in some partations)

>  Sticky partition leads to uneven product msg, resulting in abnormal delays 
> in some partitions
> --
>
> Key: KAFKA-10888
> URL: https://issues.apache.org/jira/browse/KAFKA-10888
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 2.4.1
>Reporter: jr
>Priority: Major
> Attachments: image-2020-12-24-21-05-02-800.png, 
> image-2020-12-24-21-09-47-692.png, image-2020-12-24-21-10-24-407.png
>
>
>   110 producers, 550 partitions, 550 consumers, 5-node Kafka cluster.
>   The producers use the null-key sticky partitioner; the total production rate is about 1,000,000 TPS.
> The observed partition delay is abnormal and the message distribution is uneven, which leads to abnormally high production and consumption delays for the partitions with more messages.
>   I cannot find a reason why the sticky partitioner would make the message distribution uneven at this production rate.
>   I can't switch to the round-robin partitioner, which would increase the delay and CPU cost. Does the sticky partitioner's design cause uneven message distribution, or is this abnormal? How can it be solved?
>   !image-2020-12-24-21-09-47-692.png!
> As shown in the picture, the uneven distribution is concentrated on some partitions and some brokers; there seems to be a pattern.
> This problem does not only occur in one cluster, but in many high-TPS clusters.
> The problem is even more obvious on the test cluster we built.
> !image-2020-12-24-21-10-24-407.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax opened a new pull request #10529: KAFKA-12650: fix NPE in InternalTopicManagerTest

2021-04-12 Thread GitBox


mjsax opened a new pull request #10529:
URL: https://github.com/apache/kafka/pull/10529


   The NPE arises if we cannot set up topics quickly enough and run into a timeout -- in this case, we try to call `admin.delete` (note that `admin` is mocked), which returns `null`.
   
   However, we should never actually "abort" the internal topic creation. -- Using `MockTime` instead of `SystemTime` should avoid this issue, as time won't advance at all. Or would there be any concern with regard to running forever? An alternative approach could be to increase `max.poll.interval.ms`, which defines the timeout.
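
   For reference, a minimal sketch of the `MockTime` behavior this relies on (virtual time only moves when the test advances it, so a wall-clock-based timeout can never fire on its own):
   ```
   import org.apache.kafka.common.utils.MockTime;
   
   public class MockTimeSketch {
       public static void main(String[] args) {
           MockTime time = new MockTime();
           long start = time.milliseconds();
           // No real waiting happens; virtual time jumps forward instantly.
           time.sleep(60_000L);
           System.out.println(time.milliseconds() - start); // 60000
       }
   }
   ```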
   
   Call for review @bbejeck @ableegoldman 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState

2021-04-12 Thread GitBox


jsancio commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r611787145



##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##
@@ -194,14 +196,45 @@ private void completeCurrentBatch() {
 MemoryRecords data = currentBatch.build();
 completed.add(new CompletedBatch<>(
 currentBatch.baseOffset(),
-currentBatch.records(),
+Optional.of(currentBatch.records()),
 data,
 memoryPool,
 currentBatch.initialBuffer()
 ));
 currentBatch = null;
 }
 
+public void appendLeaderChangeMessage(LeaderChangeMessage 
leaderChangeMessage, long currentTimeMs) {
+maybeCompleteDrain();
+ByteBuffer buffer = memoryPool.tryAllocate(256);
+if (buffer != null) {
+MemoryRecords data = MemoryRecords.withLeaderChangeMessage(
+this.nextOffset, 
+currentTimeMs, 
+this.epoch, 
+buffer, 
+leaderChangeMessage);
+completed.add(new CompletedBatch<>(
+nextOffset,
+Optional.empty(),
+data,
+memoryPool,
+buffer
+));
+nextOffset += 1;
+}
+}
+
+public void flush() {

Review comment:
   I don't think we should expose this functionality. I think users of this type will always call `flush` after calling `appendLeaderChangeMessage`. If so, why not do it implicitly in the method?

##
File path: raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
##
@@ -19,10 +19,14 @@
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
+
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import org.apache.kafka.raft.internals.BatchAccumulator;
+import org.mockito.Mockito;

Review comment:
   This import order doesn't match any of the styles used in Kafka.

##
File path: raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
##
@@ -36,30 +36,31 @@
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class LeaderStateTest {
+public class LeaderStateTest<T> {

Review comment:
   Why is the `T` leaked to the tests? Glancing at the code, the type parameter `T` is never used. Did you try changing the signature to:
   
   ```
   private final BatchAccumulator accumulator = Mockito.mock(BatchAccumulator.class);
   ```

##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##
@@ -194,14 +196,45 @@ private void completeCurrentBatch() {
 MemoryRecords data = currentBatch.build();
 completed.add(new CompletedBatch<>(
 currentBatch.baseOffset(),
-currentBatch.records(),
+Optional.of(currentBatch.records()),
 data,
 memoryPool,
 currentBatch.initialBuffer()
 ));
 currentBatch = null;
 }
 
+public void appendLeaderChangeMessage(LeaderChangeMessage 
leaderChangeMessage, long currentTimeMs) {

Review comment:
   This method is not safe. I think you need to hold a lock before calling `maybeCompleteDrain` and updating `nextOffset`.
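
   A minimal, self-contained sketch of the locking being suggested (class and fields simplified; the real `BatchAccumulator` already has an `appendLock` field, as the code quoted elsewhere in this thread shows):
   ```
   import java.util.concurrent.locks.ReentrantLock;
   
   class AccumulatorLockingSketch {
       private final ReentrantLock appendLock = new ReentrantLock();
       private long nextOffset = 0;
   
       void appendLeaderChangeMessage(long currentTimeMs) {
           appendLock.lock();
           try {
               maybeCompleteDrain();
               // ... allocate the buffer and enqueue the control batch ...
               nextOffset += 1;
           } finally {
               appendLock.unlock();
           }
       }
   
       private void maybeCompleteDrain() {
           // Placeholder: complete an in-progress drain if one was requested.
       }
   }
   ```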




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dielhennr commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState

2021-04-12 Thread GitBox


dielhennr commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r611781359



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -1876,12 +1836,12 @@ private void appendBatch(
 }
 
 private long maybeAppendBatches(
-LeaderState state,
+LeaderState<T> state,
 long currentTimeMs
 ) {
-long timeUnitFlush = accumulator.timeUntilDrain(currentTimeMs);
+long timeUnitFlush = state.accumulator().timeUntilDrain(currentTimeMs);

Review comment:
   There is an IO sync happening with the LeaderChangeMessage; that is the only time `accumulator.flush` is called. I think this variable name could be changed to `timeUntilDrain`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on a change in pull request #10478: KAFKA-12553: Refactor recovery logic to introduce LogLoader

2021-04-12 Thread GitBox


satishd commented on a change in pull request #10478:
URL: https://github.com/apache/kafka/pull/10478#discussion_r611789255



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -255,19 +261,21 @@ case object SnapshotGenerated extends 
LogStartOffsetIncrementReason {
 @threadsafe
 class Log(@volatile private var _dir: File,
   @volatile var config: LogConfig,
+  val segments: LogSegments,

Review comment:
   What I meant was that `Log` can take an immutable `LogComponents` argument and initialize the vars inside `Log` with the respective fields from `LogComponents`. This will also set the right access for these vars by not granting write access by default.
   
   ```
   class Log(@volatile private var _dir: File,
 @volatile var config: LogConfig,
 val segments: LogSegments,
 val logComponents: LogComponents,
 scheduler: Scheduler,
 brokerTopicStats: BrokerTopicStats,
 val time: Time,
 val maxProducerIdExpirationMs: Int,
 val producerIdExpirationCheckIntervalMs: Int,
 val topicPartition: TopicPartition,
 logDirFailureChannel: LogDirFailureChannel,
 @volatile var topicId: Option[Uuid],
 val keepPartitionMetadataFile: Boolean = true) extends Logging with KafkaMetricsGroup {
   
 @volatile private var logStartOffset: Long = logComponents.logStartOffset
 @volatile private var recoveryPoint: Long = logComponents.recoveryPoint
 @volatile private var nextOffsetMetadata: LogOffsetMetadata = logComponents.nextOffsetMetadata
 @volatile var leaderEpochCache: Option[LeaderEpochFileCache] = logComponents.leaderEpochCache
 private val producerStateManager: ProducerStateManager = logComponents.producerStateManager
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

2021-04-12 Thread GitBox


mjsax commented on pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#issuecomment-817951507


   Thanks for the fix @MarcoLotz! -- Merged to `trunk`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax merged pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

2021-04-12 Thread GitBox


mjsax merged pull request #10042:
URL: https://github.com/apache/kafka/pull/10042


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12650) NPE in InternalTopicManager#cleanUpCreatedTopics

2021-04-12 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17319549#comment-17319549
 ] 

Matthias J. Sax commented on KAFKA-12650:
-

[https://github.com/apache/kafka/pull/10042/checks?check_run_id=2319410471] 

> NPE in InternalTopicManager#cleanUpCreatedTopics
> 
>
> Key: KAFKA-12650
> URL: https://issues.apache.org/jira/browse/KAFKA-12650
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 3.0.0
>
>
> {code:java}
> java.lang.NullPointerException
>   at 
> org.apache.kafka.streams.processor.internals.InternalTopicManager.cleanUpCreatedTopics(InternalTopicManager.java:675)
>   at 
> org.apache.kafka.streams.processor.internals.InternalTopicManager.maybeThrowTimeoutExceptionDuringSetup(InternalTopicManager.java:755)
>   at 
> org.apache.kafka.streams.processor.internals.InternalTopicManager.processCreateTopicResults(InternalTopicManager.java:652)
>   at 
> org.apache.kafka.streams.processor.internals.InternalTopicManager.setup(InternalTopicManager.java:599)
>   at 
> org.apache.kafka.streams.processor.internals.InternalTopicManagerTest.shouldOnlyRetryNotSuccessfulFuturesDuringSetup(InternalTopicManagerTest.java:180)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-12650) NPE in InternalTopicManager#cleanUpCreatedTopics

2021-04-12 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-12650:
---

Assignee: Matthias J. Sax

> NPE in InternalTopicManager#cleanUpCreatedTopics
> 
>
> Key: KAFKA-12650
> URL: https://issues.apache.org/jira/browse/KAFKA-12650
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Matthias J. Sax
>Priority: Blocker
> Fix For: 3.0.0
>
>
> {code:java}
> java.lang.NullPointerException
>   at 
> org.apache.kafka.streams.processor.internals.InternalTopicManager.cleanUpCreatedTopics(InternalTopicManager.java:675)
>   at 
> org.apache.kafka.streams.processor.internals.InternalTopicManager.maybeThrowTimeoutExceptionDuringSetup(InternalTopicManager.java:755)
>   at 
> org.apache.kafka.streams.processor.internals.InternalTopicManager.processCreateTopicResults(InternalTopicManager.java:652)
>   at 
> org.apache.kafka.streams.processor.internals.InternalTopicManager.setup(InternalTopicManager.java:599)
>   at 
> org.apache.kafka.streams.processor.internals.InternalTopicManagerTest.shouldOnlyRetryNotSuccessfulFuturesDuringSetup(InternalTopicManagerTest.java:180)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9013) Flaky Test MirrorConnectorsIntegrationTest#testReplication

2021-04-12 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17319546#comment-17319546
 ] 

Matthias J. Sax commented on KAFKA-9013:


[https://github.com/apache/kafka/pull/10042/checks?check_run_id=2319756935]
{quote}java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.TimeoutException: The request timed out.
at 
org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:365)
at 
org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:340)
at 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.createTopics(MirrorConnectorsIntegrationBaseTest.java:609)
at 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.startClusters(MirrorConnectorsIntegrationBaseTest.java:173)
{quote}

> Flaky Test MirrorConnectorsIntegrationTest#testReplication
> --
>
> Key: KAFKA-9013
> URL: https://issues.apache.org/jira/browse/KAFKA-9013
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Bruno Cadonna
>Priority: Major
>  Labels: flaky-test
>
> h1. Stacktrace:
> {code:java}
> java.lang.AssertionError: Condition not met within timeout 2. Offsets not 
> translated downstream to primary cluster.
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:354)
>   at 
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication(MirrorConnectorsIntegrationTest.java:239)
> {code}
> h1. Standard Error
> {code}
> Standard Error
> Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource 
> registered in SERVER runtime does not implement any provider interfaces 
> applicable in the SERVER runtime. Due to constraint configuration problems 
> the provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will 
> be ignored. 
> Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.LoggingResource registered in 
> SERVER runtime does not implement any provider interfaces applicable in the 
> SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.LoggingResource will be 
> ignored. 
> Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered 
> in SERVER runtime does not implement any provider interfaces applicable in 
> the SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource will be 
> ignored. 
> Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.RootResource registered in 
> SERVER runtime does not implement any provider interfaces applicable in the 
> SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.RootResource will be ignored. 
> Oct 09, 2019 11:32:01 PM org.glassfish.jersey.internal.Errors logErrors
> WARNING: The following warnings have been detected: WARNING: The 
> (sub)resource method listLoggers in 
> org.apache.kafka.connect.runtime.rest.resources.LoggingResource contains 
> empty path annotation.
> WARNING: The (sub)resource method listConnectors in 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains 
> empty path annotation.
> WARNING: The (sub)resource method createConnector in 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains 
> empty path annotation.
> WARNING: The (sub)resource method listConnectorPlugins in 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource 
> contains empty path annotation.
> WARNING: The (sub)resource method serverInfo in 
> org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty 
> path annotation.
> Oct 09, 2019 11:32:02 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource 
> registered in SERVER runtime does not implement any provider interfaces 
> applicable in the SERVER runtime. Due to constraint configuration 

[jira] [Commented] (KAFKA-12629) Flaky Test RaftClusterTest

2021-04-12 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17319548#comment-17319548
 ] 

Matthias J. Sax commented on KAFKA-12629:
-

Another timeout: 
[https://github.com/apache/kafka/pull/10042/checks?check_run_id=2319729123] 

> Flaky Test RaftClusterTest
> --
>
> Key: KAFKA-12629
> URL: https://issues.apache.org/jira/browse/KAFKA-12629
> Project: Kafka
>  Issue Type: Test
>  Components: core, unit tests
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
>
> {quote} {{java.util.concurrent.ExecutionException: 
> java.lang.ClassNotFoundException: 
> org.apache.kafka.controller.NoOpSnapshotWriterBuilder
>   at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
>   at 
> kafka.testkit.KafkaClusterTestKit.startup(KafkaClusterTestKit.java:364)
>   at 
> kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions(RaftClusterTest.scala:181)}}{quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jsancio commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState

2021-04-12 Thread GitBox


jsancio commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r611772289



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -1876,12 +1836,12 @@ private void appendBatch(
 }
 
 private long maybeAppendBatches(
-LeaderState state,
+LeaderState<T> state,
 long currentTimeMs
 ) {
-long timeUnitFlush = accumulator.timeUntilDrain(currentTimeMs);
+long timeUnitFlush = state.accumulator().timeUntilDrain(currentTimeMs);

Review comment:
   @hachikuji we had this discussion in the PR that introduced the 
`BatchAccumulator`. I was under the impression that we agreed to not use the 
word "flush" since in Java that word is usually used to represent some kind of 
IO sync.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dielhennr commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState

2021-04-12 Thread GitBox


dielhennr commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r610281681



##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##
@@ -202,6 +203,25 @@ private void completeCurrentBatch() {
 currentBatch = null;
 }
 
+public void addControlBatch(MemoryRecords records) {
+appendLock.lock();
+try {
+drainStatus = DrainStatus.STARTED;
+completed.add(new CompletedBatch<>(
+nextOffset,
+null,

Review comment:
   Using `Optional.ofNullable` in the constructor saved me from having to change every other instantiation of `CompletedBatch`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12559) Add a top-level Streams config for bounding off-heap memory

2021-04-12 Thread amuthan Ganeshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17319521#comment-17319521
 ] 

amuthan Ganeshan commented on KAFKA-12559:
--

Thanks, [~ableegoldman], for the explanation; yep, it makes sense now. Let me 
give it a try and get back to you if I have further questions.

> Add a top-level Streams config for bounding off-heap memory
> ---
>
> Key: KAFKA-12559
> URL: https://issues.apache.org/jira/browse/KAFKA-12559
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: amuthan Ganeshan
>Priority: Major
>  Labels: needs-kip, newbie, newbie++
>
> At the moment we provide an example of how to bound the memory usage of 
> rocksdb in the [Memory 
> Management|https://kafka.apache.org/27/documentation/streams/developer-guide/memory-mgmt.html#rocksdb]
>  section of the docs. This requires implementing a custom RocksDBConfigSetter 
> class and setting a number of rocksdb options for relatively advanced 
> concepts and configurations. It seems a fair number of users either fail to 
> find this or consider it to be for more advanced use cases/users. But RocksDB 
> can eat up a lot of off-heap memory and it's not uncommon for users to come 
> across a {{RocksDBException: Cannot allocate memory}}
> It would probably be a much better user experience if we implemented this 
> memory bound out-of-the-box and just gave users a top-level StreamsConfig to 
> tune the off-heap memory given to rocksdb, like we have for on-heap cache 
> memory with cache.max.bytes.buffering. More advanced users can continue to 
> fine-tune their memory bounding and apply other configs with a custom config 
> setter, while new or more casual users can cap on the off-heap memory without 
> getting their hands dirty with rocksdb.
> I would propose to add the following top-level config:
> rocksdb.max.bytes.off.heap: medium priority, default to -1 (unbounded), valid 
> values are [0, inf]
> I'd also want to consider adding a second, lower priority top-level config to 
> give users a knob for adjusting how much of that total off-heap memory goes 
> to the block cache + index/filter blocks, and how much of it is afforded to 
> the write buffers. I'm struggling to come up with a good name for this 
> config, but it would be something like
> rocksdb.memtable.to.block.cache.off.heap.memory.ratio: low priority, default 
> to 0.5, valid values are [0, 1]
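
For context, a minimal sketch of the existing `RocksDBConfigSetter` workaround the description refers to (cache and write-buffer sizes are illustrative assumptions, not recommendations):
{code:java}
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;
import org.rocksdb.WriteBufferManager;

public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {
    // Static so the bound is shared across all stores in the instance.
    private static final Cache CACHE = new LRUCache(64 * 1024 * 1024L);
    private static final WriteBufferManager WBM =
        new WriteBufferManager(32 * 1024 * 1024L, CACHE);

    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        final BlockBasedTableConfig tableConfig =
            (BlockBasedTableConfig) options.tableFormatConfig();
        tableConfig.setBlockCache(CACHE);               // data blocks
        tableConfig.setCacheIndexAndFilterBlocks(true); // index/filter blocks too
        options.setWriteBufferManager(WBM);             // memtables draw from the same cache
        options.setTableFormatConfig(tableConfig);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // CACHE and WBM are shared across stores, so they must not be closed here.
    }
}
{code}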



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-12559) Add a top-level Streams config for bounding off-heap memory

2021-04-12 Thread amuthan Ganeshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

amuthan Ganeshan reassigned KAFKA-12559:


Assignee: amuthan Ganeshan

> Add a top-level Streams config for bounding off-heap memory
> ---
>
> Key: KAFKA-12559
> URL: https://issues.apache.org/jira/browse/KAFKA-12559
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: amuthan Ganeshan
>Priority: Major
>  Labels: needs-kip, newbie, newbie++
>
> At the moment we provide an example of how to bound the memory usage of 
> rocksdb in the [Memory 
> Management|https://kafka.apache.org/27/documentation/streams/developer-guide/memory-mgmt.html#rocksdb]
>  section of the docs. This requires implementing a custom RocksDBConfigSetter 
> class and setting a number of rocksdb options for relatively advanced 
> concepts and configurations. It seems a fair number of users either fail to 
> find this or consider it to be for more advanced use cases/users. But RocksDB 
> can eat up a lot of off-heap memory and it's not uncommon for users to come 
> across a {{RocksDBException: Cannot allocate memory}}
> It would probably be a much better user experience if we implemented this 
> memory bound out-of-the-box and just gave users a top-level StreamsConfig to 
> tune the off-heap memory given to rocksdb, like we have for on-heap cache 
> memory with cache.max.bytes.buffering. More advanced users can continue to 
> fine-tune their memory bounding and apply other configs with a custom config 
> setter, while new or more casual users can cap the off-heap memory without 
> getting their hands dirty with rocksdb.
> I would propose to add the following top-level config:
> rocksdb.max.bytes.off.heap: medium priority, default to -1 (unbounded), valid 
> values are [0, inf]
> I'd also want to consider adding a second, lower priority top-level config to 
> give users a knob for adjusting how much of that total off-heap memory goes 
> to the block cache + index/filter blocks, and how much of it is afforded to 
> the write buffers. I'm struggling to come up with a good name for this 
> config, but it would be something like
> rocksdb.memtable.to.block.cache.off.heap.memory.ratio: low priority, default 
> to 0.5, valid values are [0, 1]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jthurne commented on pull request #10491: MINOR: Switch to using the Gradle RAT plugin

2021-04-12 Thread GitBox


jthurne commented on pull request #10491:
URL: https://github.com/apache/kafka/pull/10491#issuecomment-817901760


   Interesting. When I run the rat task locally, those three files are 
identified (correctly) as binary files and RAT skips them.
   
   I'm not sure why they are being detected as text files on CI. But an easy 
workaround is to explicitly exclude that directory. I've pushed up a commit 
that does just that.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits

2021-04-12 Thread GitBox


C0urante commented on pull request #10528:
URL: https://github.com/apache/kafka/pull/10528#issuecomment-817894272


   @ncliang @gharris1727 @kpatelatwork @ddasarathan could one or two of you 
take a look at this when you have time?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #10483: KAFKA-12586; Add `DescribeTransactions` Admin API

2021-04-12 Thread GitBox


dajac commented on a change in pull request #10483:
URL: https://github.com/apache/kafka/pull/10483#discussion_r611717669



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeTransactionsHandler.java
##
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.clients.admin.TransactionDescription;
+import org.apache.kafka.clients.admin.TransactionState;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
+import org.apache.kafka.common.errors.TransactionalIdNotFoundException;
+import org.apache.kafka.common.message.DescribeTransactionsRequestData;
+import org.apache.kafka.common.message.DescribeTransactionsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.DescribeTransactionsRequest;
+import org.apache.kafka.common.requests.DescribeTransactionsResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class DescribeTransactionsHandler implements AdminApiHandler<CoordinatorKey, TransactionDescription> {
+private final LogContext logContext;
+private final Logger log;
+private final Set<CoordinatorKey> keys;
+
+public DescribeTransactionsHandler(
+Collection<String> transactionalIds,
+LogContext logContext
+) {
+this.keys = buildKeySet(transactionalIds);
+this.log = logContext.logger(DescribeTransactionsHandler.class);
+this.logContext = logContext;
+}
+
+private static Set<CoordinatorKey> buildKeySet(Collection<String> transactionalIds) {
+return transactionalIds.stream()
+.map(DescribeTransactionsHandler::asCoordinatorKey)
+.collect(Collectors.toSet());
+}
+
+@Override
+public String apiName() {
+return "describeTransactions";
+}
+
+@Override
+public Keys<CoordinatorKey> initializeKeys() {
+return Keys.dynamicMapped(keys, new CoordinatorStrategy(logContext));
+}
+
+@Override
+public DescribeTransactionsRequest.Builder buildRequest(
+Integer brokerId,
+Set<CoordinatorKey> keys
+) {
+DescribeTransactionsRequestData request = new DescribeTransactionsRequestData();
+List<String> transactionalIds = keys.stream().map(key -> key.idValue).collect(Collectors.toList());
+request.setTransactionalIds(transactionalIds);
+return new DescribeTransactionsRequest.Builder(request);
+}
+
+@Override
+public ApiResult<CoordinatorKey, TransactionDescription> handleResponse(
+Integer brokerId,
+Set<CoordinatorKey> keys,
+AbstractResponse abstractResponse
+) {
+DescribeTransactionsResponse response = (DescribeTransactionsResponse) abstractResponse;
+Map<CoordinatorKey, TransactionDescription> completed = new HashMap<>();
+Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+List<CoordinatorKey> unmapped = new ArrayList<>();
+
+for (DescribeTransactionsResponseData.TransactionState transactionState : response.data().transactionStates()) {
+CoordinatorKey transactionalIdKey = asCoordinatorKey(transactionState.transactionalId());
+Errors error = Errors.forCode(transactionState.errorCode());
+
+if (error != Errors.NONE) {
+handleError(transactionalIdKey, error, failed, unmapped);
+continue;
+}
+
+OptionalLong transactionStartTimeMs = transactionState.transactionStartTimeMs() < 0 ?
+OptionalLong.empty() :
+OptionalLong.of(transactionState.transactionStartTimeMs());
+
+completed.put(transactionalIdKey, new TransactionDescription(
+brokerId,
+TransactionState.parse(transactionState.transactionState()),
+transactionState.producerId(),
+transactionState.producerEpoch(),
+

[GitHub] [kafka] dajac commented on a change in pull request #10483: KAFKA-12586; Add `DescribeTransactions` Admin API

2021-04-12 Thread GitBox


dajac commented on a change in pull request #10483:
URL: https://github.com/apache/kafka/pull/10483#discussion_r611715337



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTransactionsResult.java
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.admin.internals.CoordinatorKey;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+@InterfaceStability.Evolving
+public class DescribeTransactionsResult {
+private final Map<CoordinatorKey, KafkaFutureImpl<TransactionDescription>> futures;
+
+DescribeTransactionsResult(Map<CoordinatorKey, KafkaFutureImpl<TransactionDescription>> futures) {
+this.futures = futures;
+}
+
+public KafkaFuture<TransactionDescription> transactionalIdResult(String transactionalId) {

Review comment:
   `description` sounds good to me.
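
   For readers following along, a hedged usage sketch of the API under review (the `Admin.describeTransactions` entry point is assumed from the PR title; the accessor still carries its pre-rename name from the quoted diff):
   ```
   import java.util.Collections;
   import java.util.Properties;
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.AdminClientConfig;
   import org.apache.kafka.clients.admin.DescribeTransactionsResult;
   import org.apache.kafka.clients.admin.TransactionDescription;
   
   public class DescribeTransactionsSketch {
       public static void main(String[] args) throws Exception {
           Properties props = new Properties();
           props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           try (Admin admin = Admin.create(props)) {
               DescribeTransactionsResult result =
                   admin.describeTransactions(Collections.singletonList("my-txn-id"));
               // Blocks until the transaction coordinator responds for this id.
               TransactionDescription description =
                   result.transactionalIdResult("my-txn-id").get();
               System.out.println(description);
           }
       }
   }
   ```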




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante opened a new pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits

2021-04-12 Thread GitBox


C0urante opened a new pull request #10528:
URL: https://github.com/apache/kafka/pull/10528


   [Jira](https://issues.apache.org/jira/browse/KAFKA-12497)
   
   This change serves two purposes:
   1. Eliminate unnecessary log messages for offset commit of tasks that don't 
need to perform offset commits (e.g., a task that has failed and for which all 
data has been flushed and committed)
   2. Stop blocking the offset commit thread unnecessarily for flushes that 
will never succeed because the task's producer has failed to send a record in 
the current batch with a non-retriable error
   
   Existing unit tests for the `OffsetStorageWriter` are tweaked to verify the 
small change made to it. Several new unit tests are added for the 
`WorkerSourceTask` that cover various cases where offset commits should not be 
attempted, and some existing tests are modified to cover cases where offset 
commits either should or should not be attempted.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine merged pull request #10503: KAFKA-9988: Suppress uncaught exceptions in log messages during Connect task shutdown

2021-04-12 Thread GitBox


kkonstantine merged pull request #10503:
URL: https://github.com/apache/kafka/pull/10503


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-7249) Provide an official Docker Hub image for Kafka

2021-04-12 Thread Timothy Higinbottom (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timothy Higinbottom resolved KAFKA-7249.

Resolution: Not A Problem

> Provide an official Docker Hub image for Kafka
> --
>
> Key: KAFKA-7249
> URL: https://issues.apache.org/jira/browse/KAFKA-7249
> Project: Kafka
>  Issue Type: New Feature
>  Components: build, documentation, packaging, tools, website
>Affects Versions: 1.0.1, 1.1.0, 1.1.1, 2.0.0
>Reporter: Timothy Higinbottom
>Priority: Major
>  Labels: build, distribution, docker, packaging
>
> It would be great if there was an official Docker Hub image for Kafka, 
> supported by the Kafka community, so we knew that the image was trusted and 
> stable for use in production. Many organizations and teams are now using 
> Docker, Kubernetes, and other container systems that make deployment easier. 
> I think Kafka should move into this space and encourage this as an easy way 
> for beginners to get started, but also as a portable and effective way to 
> deploy Kafka in production. 
>  
> Currently there are only Kafka images maintained by third parties, which 
> seems like a shame for a big Apache project like Kafka. Hope you all consider 
> this.
>  
> Thanks,
> Tim



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12408) Document omitted ReplicaManager metrics

2021-04-12 Thread Tom Bentley (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tom Bentley resolved KAFKA-12408.
-
Fix Version/s: 3.0.0
 Reviewer: Tom Bentley
   Resolution: Fixed

> Document omitted ReplicaManager metrics
> ---
>
> Key: KAFKA-12408
> URL: https://issues.apache.org/jira/browse/KAFKA-12408
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation
>Reporter: Dongjin Lee
>Assignee: Dongjin Lee
>Priority: Minor
> Fix For: 3.0.0
>
>
> There are several problems in ReplicaManager metrics documentation:
>  * kafka.server:type=ReplicaManager,name=OfflineReplicaCount is omitted.
>  * kafka.server:type=ReplicaManager,name=FailedIsrUpdatesPerSec is omitted.
>  * kafka.server:type=ReplicaManager,name=[PartitionCount|LeaderCount]'s 
> descriptions are omitted: 'mostly even across brokers'.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on pull request #10527: Merge trunk 04/12

2021-04-12 Thread GitBox


vvcephei commented on pull request #10527:
URL: https://github.com/apache/kafka/pull/10527#issuecomment-817865515


   Oy, just saw that, too. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



