[jira] [Updated] (KAFKA-10493) KTable out-of-order updates are not being ignored
[ https://issues.apache.org/jira/browse/KAFKA-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-10493: Summary: KTable out-of-order updates are not being ignored (was: Ktable out-of-order updates are not being ignored) > KTable out-of-order updates are not being ignored > - > > Key: KAFKA-10493 > URL: https://issues.apache.org/jira/browse/KAFKA-10493 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Pedro Gontijo >Assignee: Matthias J. Sax >Priority: Critical > Fix For: 3.0.0 > > Attachments: KTableOutOfOrderBug.java > > > On a materialized KTable, out-of-order records for a given key (records whose > timestamp is older than that of the current value in the store) are not being ignored > but are used to update the local store value and are also forwarded. > I believe the bug is here: > [https://github.com/apache/kafka/blob/2.6.0/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java#L77] > It should return true, not false (see the javadoc). > The affected code is here: > [https://github.com/apache/kafka/blob/2.6.0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java#L142-L148] > I have attached a simple stream app that shows the issue happening. > Thank you! -- This message was sent by Atlassian Jira (v8.3.4#803005)
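The behavior the reporter expects can be sketched in plain Java. This is a hypothetical, self-contained illustration (the class and method names are invented and are not Kafka's `KTableSource` or `ValueAndTimestampSerializer`): an update whose timestamp is older than the one already stored for its key is dropped instead of being written and forwarded. Treating an equal timestamp as in-order is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of out-of-order handling for a materialized table.
 * Not the Kafka Streams implementation; names are illustrative only.
 */
public class OutOfOrderTableSketch {

    /** A value plus the timestamp of the record that produced it. */
    static final class Stamped {
        final String value;
        final long timestamp;

        Stamped(final String value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Map<String, Stamped> store = new HashMap<>();

    /**
     * Applies an update unless it is out-of-order, i.e. older than the stored
     * timestamp for the same key. Returns true if the update was applied (and
     * would be forwarded downstream), false if it was ignored.
     */
    public boolean put(final String key, final String value, final long timestamp) {
        final Stamped current = store.get(key);
        if (current != null && timestamp < current.timestamp) {
            return false; // out-of-order: keep the newer value and do not forward
        }
        store.put(key, new Stamped(value, timestamp));
        return true;
    }

    public String get(final String key) {
        final Stamped current = store.get(key);
        return current == null ? null : current.value;
    }

    public static void main(final String[] args) {
        final OutOfOrderTableSketch table = new OutOfOrderTableSketch();
        System.out.println(table.put("k", "v2", 200L)); // true
        System.out.println(table.put("k", "v1", 100L)); // false: older than 200
        System.out.println(table.get("k"));             // v2
    }
}
```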
[jira] [Assigned] (KAFKA-9195) Use Randomized State Directory Names in Streams System Tests
[ https://issues.apache.org/jira/browse/KAFKA-9195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-9195: -- Assignee: (was: Matthias J. Sax) > Use Randomized State Directory Names in Streams System Tests > - > > Key: KAFKA-9195 > URL: https://issues.apache.org/jira/browse/KAFKA-9195 > Project: Kafka > Issue Type: Improvement > Components: streams, system tests >Reporter: Bruno Cadonna >Priority: Major > > Currently, the state directory property in streams' system tests is set to > the {{PERSISTENT_ROOT}} variable. Since Streams applications in different > tests have the same application ID and the state directory path consists of > state directory property + application ID + task ID, it might happen that a > dirty state directory of one test is re-used by another test if the state > directory is not properly cleaned up. This may lead to unexpected results and > false positive and/or flaky failures. > The state directory property shall be set to a randomized path inside > {{PERSISTENT_ROOT}} to ensure that tests may not interfere with each other in > the case of missing state clean-ups.
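As a rough illustration of the proposed fix, here is a Java sketch that gives each run a uniquely named state directory under a shared root. The actual system tests are Python (ducktape) code, so this only illustrates the idea: the `persistentRoot` parameter stands in for `PERSISTENT_ROOT`, and `"state.dir"` is the key behind `StreamsConfig.STATE_DIR_CONFIG`.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

public class RandomizedStateDirSketch {

    /**
     * Creates a fresh, uniquely named directory under the given root and
     * returns Streams properties pointing "state.dir" at it.
     * The root path stands in for PERSISTENT_ROOT (an assumption).
     */
    public static Properties withRandomStateDir(final Path persistentRoot) throws IOException {
        Files.createDirectories(persistentRoot);
        // createTempDirectory appends a random suffix, so two tests sharing the
        // same application ID still end up with disjoint state directories.
        final Path stateDir = Files.createTempDirectory(persistentRoot, "streams-state-");
        final Properties props = new Properties();
        props.setProperty("state.dir", stateDir.toString()); // StreamsConfig.STATE_DIR_CONFIG
        return props;
    }

    public static void main(final String[] args) throws IOException {
        final Path root = Files.createTempDirectory("persistent-root-");
        System.out.println(withRandomStateDir(root).getProperty("state.dir"));
        System.out.println(withRandomStateDir(root).getProperty("state.dir"));
    }
}
```

Because each test resolves its own directory, leftover state from a previous test is never picked up, even if clean-up fails.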
[GitHub] [kafka] mjsax commented on pull request #10532: KAFKA-8531: Change default replication factor config
mjsax commented on pull request #10532: URL: https://github.com/apache/kafka/pull/10532#issuecomment-818450072 Was also wondering about a potential error message -- not sure atm what error message a user would get if they run against 2.3 brokers and if the error message would be clear. Should we do anything about it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
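For context, assuming this PR changes the Streams default of `replication.factor` to `-1` (meaning "use the broker's `default.replication.factor`"), as KIP-733 proposes: brokers older than 2.4 predate KIP-464 and cannot resolve `-1` when creating topics, so users on such brokers would need to set the value explicitly. A minimal sketch with plain `Properties`; the application ID, broker address, and the value `3` are placeholders.

```java
import java.util.Properties;

public class ReplicationFactorSketch {

    /**
     * Returns a copy of the given Streams properties with the internal-topic
     * replication factor set explicitly. "replication.factor" is the key behind
     * StreamsConfig.REPLICATION_FACTOR_CONFIG. Only positive values are
     * accepted here, since -1 needs broker-side support (assumed 2.4+).
     */
    public static Properties withExplicitReplicationFactor(final Properties base, final int replicationFactor) {
        if (replicationFactor < 1) {
            throw new IllegalArgumentException(
                "replication factor must be >= 1 for older brokers, got " + replicationFactor);
        }
        final Properties props = new Properties();
        props.putAll(base);
        props.setProperty("replication.factor", Integer.toString(replicationFactor));
        return props;
    }

    public static void main(final String[] args) {
        final Properties base = new Properties();
        base.setProperty("application.id", "my-app");            // placeholder application ID
        base.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address
        System.out.println(withExplicitReplicationFactor(base, 3).getProperty("replication.factor"));
    }
}
```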
[GitHub] [kafka] mjsax commented on pull request #10532: KAFKA-8531: Change default replication factor config
mjsax commented on pull request #10532: URL: https://github.com/apache/kafka/pull/10532#issuecomment-818449362 To test against older broker versions, I started https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4463/ -- maybe we need to update some system tests...
[GitHub] [kafka] ijuma commented on pull request #10531: KAFKA-12658: Include kafka-shell jar and dependencies in release tar
ijuma commented on pull request #10531: URL: https://github.com/apache/kafka/pull/10531#issuecomment-818447856 This change has no impact on tests, and the build part of the PR builder succeeded. I went ahead and merged to trunk and 2.8. cc @vvcephei
[GitHub] [kafka] mjsax opened a new pull request #10532: KAFKA-8531: Change default replication factor config
mjsax opened a new pull request #10532: URL: https://github.com/apache/kafka/pull/10532 Call for review @cadonna @ableegoldman
[GitHub] [kafka] ijuma merged pull request #10531: KAFKA-12658: Include kafka-shell jar and dependencies in release tar
ijuma merged pull request #10531: URL: https://github.com/apache/kafka/pull/10531
[GitHub] [kafka] rodesai commented on a change in pull request #10460: KAFKA-10357: Use validate and setup during internal topics creation
rodesai commented on a change in pull request #10460: URL: https://github.com/apache/kafka/pull/10460#discussion_r612131444 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogTopics.java ## @@ -34,83 +40,64 @@ public class ChangelogTopics { private final InternalTopicManager internalTopicManager; -private final Map topicGroups; -private final Map> tasksForTopicGroup; +private final ValidationResult validationResult; +private final Map changelogTopicConfigs; private final Map> changelogPartitionsForStatefulTask = new HashMap<>(); private final Map> preExistingChangelogPartitionsForTask = new HashMap<>(); private final Set preExistingNonSourceTopicBasedChangelogPartitions = new HashSet<>(); private final Set sourceTopicBasedChangelogTopics = new HashSet<>(); -private final Set preExsitingSourceTopicBasedChangelogPartitions = new HashSet<>(); +private final Set preExistingSourceTopicBasedChangelogPartitions = new HashSet<>(); private final Logger log; public ChangelogTopics(final InternalTopicManager internalTopicManager, final Map topicGroups, final Map> tasksForTopicGroup, final String logPrefix) { this.internalTopicManager = internalTopicManager; -this.topicGroups = topicGroups; -this.tasksForTopicGroup = tasksForTopicGroup; final LogContext logContext = new LogContext(logPrefix); log = logContext.logger(getClass()); +changelogTopicConfigs = computeChangelogTopicConfig(topicGroups, tasksForTopicGroup); +validationResult = internalTopicManager.validate(changelogTopicConfigs); } public void setup() { -// add tasks to state change log topic subscribers -final Map changelogTopicMetadata = new HashMap<>(); -for (final Map.Entry entry : topicGroups.entrySet()) { -final int topicGroupId = entry.getKey(); -final TopicsInfo topicsInfo = entry.getValue(); - -final Set topicGroupTasks = tasksForTopicGroup.get(topicGroupId); -if (topicGroupTasks == null) { -log.debug("No tasks found for topic group {}", topicGroupId); -continue; -} else 
if (topicsInfo.stateChangelogTopics.isEmpty()) { -continue; -} - -for (final TaskId task : topicGroupTasks) { -final Set changelogTopicPartitions = topicsInfo.stateChangelogTopics -.keySet() -.stream() -.map(topic -> new TopicPartition(topic, task.partition)) -.collect(Collectors.toSet()); -changelogPartitionsForStatefulTask.put(task, changelogTopicPartitions); -} - -for (final InternalTopicConfig topicConfig : topicsInfo.nonSourceChangelogTopics()) { -// the expected number of partitions is the max value of TaskId.partition + 1 -int numPartitions = UNKNOWN; -for (final TaskId task : topicGroupTasks) { -if (numPartitions < task.partition + 1) { -numPartitions = task.partition + 1; -} -} -topicConfig.setNumberOfPartitions(numPartitions); -changelogTopicMetadata.put(topicConfig.name(), topicConfig); -} - sourceTopicBasedChangelogTopics.addAll(topicsInfo.sourceTopicChangelogs()); +if (!validationResult.misconfigurationsForTopics().isEmpty()) { +throw new MisconfiguredInternalTopicException(Utils.join(misconfigured().values().stream() +.flatMap(Collection::stream).collect(Collectors.toList()), Utils.NL) +); } -final Set newlyCreatedChangelogTopics = internalTopicManager.makeReady(changelogTopicMetadata); -log.debug("Created state changelog topics {} from the parsed topology.", changelogTopicMetadata.values()); +final Set missingTopics = validationResult.missingTopics(); Review comment: Just so I understand correctly: in a future change, we'll actually not create the missing topics unless configured to do so?
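The validate-then-setup flow in the diff can be reduced to the following sketch. `ValidationResult` here is a hypothetical stand-in for the class in the PR, `IllegalStateException` replaces `MisconfiguredInternalTopicException` to keep the example self-contained, and the `createMissing` flag is an invented knob modeling the "only create if configured" behavior the review comment asks about.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class ValidateThenSetupSketch {

    /** Hypothetical stand-in for the validation result used in the diff. */
    static final class ValidationResult {
        final Set<String> missingTopics;
        final Map<String, List<String>> misconfigurationsForTopics;

        ValidationResult(final Set<String> missingTopics,
                         final Map<String, List<String>> misconfigurationsForTopics) {
            this.missingTopics = missingTopics;
            this.misconfigurationsForTopics = misconfigurationsForTopics;
        }
    }

    /**
     * Fails fast if any existing topic is misconfigured (joining all messages
     * with a line separator, as the diff does with Utils.NL); otherwise returns
     * the topics that still need to be created. If createMissing is false,
     * missing topics are treated as an error instead of being created.
     */
    static Set<String> topicsToCreate(final ValidationResult result, final boolean createMissing) {
        if (!result.misconfigurationsForTopics.isEmpty()) {
            final String details = result.misconfigurationsForTopics.values().stream()
                .flatMap(List::stream)
                .collect(Collectors.joining(System.lineSeparator()));
            throw new IllegalStateException(details);
        }
        if (!createMissing && !result.missingTopics.isEmpty()) {
            throw new IllegalStateException("Missing internal topics: " + result.missingTopics);
        }
        return new HashSet<>(result.missingTopics);
    }
}
```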
[GitHub] [kafka] showuon commented on pull request #10525: KAFKA-7572: Producer should not send requests with negative partition id
showuon commented on pull request #10525: URL: https://github.com/apache/kafka/pull/10525#issuecomment-818440268 OK, let's see what other reviewers think.
[GitHub] [kafka] showuon commented on pull request #10409: KAFKA-9295: improve KTableKTableForeignKeyInnerJoinMultiIntegrationTest
showuon commented on pull request #10409: URL: https://github.com/apache/kafka/pull/10409#issuecomment-818439327 @ableegoldman, the failed tests are unrelated:
```
Build / ARM / org.apache.kafka.streams.processor.internals.InternalTopicManagerTest.shouldOnlyRetryNotSuccessfulFuturesDuringSetup
Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testReplication()
Build / JDK 15 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic()
Build / JDK 8 and Scala 2.12 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()
```
Thanks.
[GitHub] [kafka] hachikuji commented on a change in pull request #10483: KAFKA-12586; Add `DescribeTransactions` Admin API
hachikuji commented on a change in pull request #10483: URL: https://github.com/apache/kafka/pull/10483#discussion_r612131031 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeTransactionsHandler.java ## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.admin.internals; + +import org.apache.kafka.clients.admin.TransactionDescription; +import org.apache.kafka.clients.admin.TransactionState; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TransactionalIdAuthorizationException; +import org.apache.kafka.common.errors.TransactionalIdNotFoundException; +import org.apache.kafka.common.message.DescribeTransactionsRequestData; +import org.apache.kafka.common.message.DescribeTransactionsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.DescribeTransactionsRequest; +import org.apache.kafka.common.requests.DescribeTransactionsResponse; +import org.apache.kafka.common.requests.FindCoordinatorRequest; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.OptionalLong; +import java.util.Set; +import java.util.stream.Collectors; + +public class DescribeTransactionsHandler implements AdminApiHandler { +private final LogContext logContext; +private final Logger log; +private final Set keys; + +public DescribeTransactionsHandler( +Collection transactionalIds, +LogContext logContext +) { +this.keys = buildKeySet(transactionalIds); +this.log = logContext.logger(DescribeTransactionsHandler.class); +this.logContext = logContext; +} + +private static Set buildKeySet(Collection transactionalIds) { +return transactionalIds.stream() +.map(DescribeTransactionsHandler::asCoordinatorKey) +.collect(Collectors.toSet()); +} + +@Override +public String apiName() { +return "describeTransactions"; +} + +@Override +public Keys initializeKeys() { +return Keys.dynamicMapped(keys, new CoordinatorStrategy(logContext)); +} + +@Override +public 
DescribeTransactionsRequest.Builder buildRequest( +Integer brokerId, +Set keys +) { +DescribeTransactionsRequestData request = new DescribeTransactionsRequestData(); +List transactionalIds = keys.stream().map(key -> key.idValue).collect(Collectors.toList()); +request.setTransactionalIds(transactionalIds); +return new DescribeTransactionsRequest.Builder(request); +} + +@Override +public ApiResult handleResponse( +Integer brokerId, +Set keys, +AbstractResponse abstractResponse +) { +DescribeTransactionsResponse response = (DescribeTransactionsResponse) abstractResponse; +Map completed = new HashMap<>(); +Map failed = new HashMap<>(); +List unmapped = new ArrayList<>(); + +for (DescribeTransactionsResponseData.TransactionState transactionState : response.data().transactionStates()) { +CoordinatorKey transactionalIdKey = asCoordinatorKey(transactionState.transactionalId()); +Errors error = Errors.forCode(transactionState.errorCode()); + +if (error != Errors.NONE) { +handleError(transactionalIdKey, error, failed, unmapped); +continue; +} + +OptionalLong transactionStartTimeMs = transactionState.transactionStartTimeMs() < 0 ? +OptionalLong.empty() : +OptionalLong.of(transactionState.transactionStartTimeMs()); + +completed.put(transactionalIdKey, new TransactionDescription( +brokerId, +TransactionState.parse(transactionState.transactionState()), +transactionState.producerId(), +transactionState.producerEpoch(), +
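The start-time mapping in `handleResponse` turns any negative wire value into an empty `OptionalLong`. Isolated as a tiny helper (the class and method names are hypothetical, and reading a negative value as a "no start time" sentinel is an assumption based on the code above):

```java
import java.util.OptionalLong;

public class StartTimeMappingSketch {

    /**
     * Mirrors the handler's mapping: a negative transactionStartTimeMs is
     * treated as "not set" and becomes OptionalLong.empty().
     */
    static OptionalLong toOptionalStartTime(final long transactionStartTimeMs) {
        return transactionStartTimeMs < 0
            ? OptionalLong.empty()
            : OptionalLong.of(transactionStartTimeMs);
    }

    public static void main(final String[] args) {
        System.out.println(toOptionalStartTime(-1L));           // OptionalLong.empty
        System.out.println(toOptionalStartTime(1618000000000L)); // OptionalLong[1618000000000]
    }
}
```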
[jira] [Commented] (KAFKA-12608) Simple identity pipeline sometimes loses data
[ https://issues.apache.org/jira/browse/KAFKA-12608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17319890#comment-17319890 ] Matthias J. Sax commented on KAFKA-12608: - Could you provide the input data you used, so that we can reproduce the issue? > Simple identity pipeline sometimes loses data > - > > Key: KAFKA-12608 > URL: https://issues.apache.org/jira/browse/KAFKA-12608 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.7.0 > Environment: > https://github.com/jamii/streaming-consistency/blob/c1f504e73141405ee6cd0c7f217604d643babf81/pkgs.nix > [nix-shell:~/streaming-consistency/kafka-streams]$ java -version > openjdk version "1.8.0_265" > OpenJDK Runtime Environment (build 1.8.0_265-ga) > OpenJDK 64-Bit Server VM (build 25.265-bga, mixed mode) > [nix-shell:~/streaming-consistency/kafka-streams]$ nix-info > system: "x86_64-linux", multi-user?: yes, version: nix-env (Nix) 2.3.10, > channels(jamie): "", channels(root): "nixos-20.09.3554.f8929dce13e", nixpkgs: > /nix/var/nix/profiles/per-user/root/channels/nixos >Reporter: Jamie Brandon >Priority: Major > > I'm running a very simple streams program that reads records from one topic > into a table and then writes the stream back into another topic. In about 1 > in 5 runs, some of the output records are missing. They tend to form a single > contiguous range, as if a single batch was dropped somewhere.
> https://github.com/jamii/streaming-consistency/blob/main/kafka-streams/src/main/java/Demo.java#L49-L52 > {code:bash} > $ wc -l tmp/*transactions > 999514 tmp/accepted_transactions > 1000000 tmp/transactions > 1999514 total > $ cat tmp/transactions | cut -d',' -f 1 | cut -d' ' -f 2 > in > $ cat tmp/accepted_transactions | cut -d',' -f 1 | cut -d':' -f 2 > out > $ diff in out | wc -l > 487 > $ diff in out | head > 25313,25798d25312 > < 25312 > < 25313 > < 25314 > < 25315 > < 25316 > < 25317 > < 25318 > < 25319 > < 25320 > > $ diff in out | tail > < 25788 > < 25789 > < 25790 > < 25791 > < 25792 > < 25793 > < 25794 > < 25795 > < 25796 > < 25797 > {code} > I've checked running the consumer multiple times to make sure that the > records are actually missing from the topic and it wasn't just a hiccup in > the consumer. > The repo linked above has instructions in the readme on how to reproduce the > exact versions used.
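The `diff` pipeline above can also be expressed in Java, which makes the "single contiguous range" observation explicit. This is a hypothetical helper that assumes the transaction IDs have already been parsed from both files into sets of longs:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class MissingRangesSketch {

    /** Inclusive range of missing IDs. */
    static final class Range {
        final long from;
        final long to;

        Range(final long from, final long to) {
            this.from = from;
            this.to = to;
        }

        @Override
        public String toString() {
            return from + "-" + to;
        }
    }

    /**
     * Returns the IDs present in input but absent from output, grouped into
     * contiguous inclusive ranges. A dropped batch would typically show up as
     * a single range.
     */
    static List<Range> missingRanges(final Set<Long> input, final Set<Long> output) {
        final TreeSet<Long> missing = new TreeSet<>(input); // sorted ascending
        missing.removeAll(output);
        final List<Range> ranges = new ArrayList<>();
        Long start = null;
        Long prev = null;
        for (final long id : missing) {
            if (start == null) {
                start = id;
            } else if (id != prev + 1) {
                ranges.add(new Range(start, prev)); // gap found: close the current range
                start = id;
            }
            prev = id;
        }
        if (start != null) {
            ranges.add(new Range(start, prev));
        }
        return ranges;
    }
}
```

Applied to the data in the report, a single range covering IDs 25312 to 25797 would be expected, matching the one `d` hunk in the `diff` output.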
[jira] [Updated] (KAFKA-12658) bin/kafka-metadata-shell.sh cannot find or load main class org.apache.kafka.shell.MetadataShell
[ https://issues.apache.org/jira/browse/KAFKA-12658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-12658: Affects Version/s: (was: 2.8.0) (was: 3.0.0) > bin/kafka-metadata-shell.sh cannot find or load main class > org.apache.kafka.shell.MetadataShell > --- > > Key: KAFKA-12658 > URL: https://issues.apache.org/jira/browse/KAFKA-12658 > Project: Kafka > Issue Type: Bug > Components: core > Environment: Ubuntu, Java 11 >Reporter: Israel Ekpo >Assignee: Ismael Juma >Priority: Blocker > Fix For: 2.8.0 > > > With the latest release candidate for 2.8.0, the binaries from the Scala 2.13 > and 2.12 tarballs are not finding the class for the meta data shell from the > classpath > [https://home.apache.org/~vvcephei/kafka-2.8.0-rc1/] > > kafka-run-class.sh is not able to load it. > > cd ../kafka_2.12-2.8.0$ > > bin/kafka-metadata-shell.sh --help > Error: Could not find or load main class org.apache.kafka.shell.MetadataShell > Caused by: java.lang.ClassNotFoundException: > org.apache.kafka.shell.MetadataShell > cd ../kafka_2.13-2.8.0/ > bin/kafka-metadata-shell.sh --help > Error: Could not find or load main class org.apache.kafka.shell.MetadataShell > Caused by: java.lang.ClassNotFoundException: > org.apache.kafka.shell.MetadataShell
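One way to confirm this kind of `ClassNotFoundException` independently of the launcher script is to check whether the class is loadable with a given classpath. A hypothetical diagnostic sketch, meant to be run with the same classpath that `kafka-run-class.sh` assembles:

```java
public class ClasspathCheckSketch {

    /**
     * Returns true if the named class can be loaded by this class's loader.
     * initialize=false avoids running the class's static initializers.
     */
    static boolean isOnClasspath(final String className) {
        try {
            Class.forName(className, false, ClasspathCheckSketch.class.getClassLoader());
            return true;
        } catch (final ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(final String[] args) {
        final String name = args.length > 0 ? args[0] : "org.apache.kafka.shell.MetadataShell";
        System.out.println(name + " on classpath: " + isOnClasspath(name));
    }
}
```

If this prints `false` for `org.apache.kafka.shell.MetadataShell`, the jar containing the shell is simply not on the classpath, which is consistent with the fix of adding the kafka-shell jar to the release tarball.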
[jira] [Updated] (KAFKA-12658) bin/kafka-metadata-shell.sh cannot find or load main class org.apache.kafka.shell.MetadataShell
[ https://issues.apache.org/jira/browse/KAFKA-12658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-12658: Fix Version/s: 2.8.0 > bin/kafka-metadata-shell.sh cannot find or load main class > org.apache.kafka.shell.MetadataShell > --- > > Key: KAFKA-12658 > URL: https://issues.apache.org/jira/browse/KAFKA-12658 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.0.0, 2.8.0 > Environment: Ubuntu, Java 11 >Reporter: Israel Ekpo >Assignee: Ismael Juma >Priority: Blocker > Fix For: 2.8.0 > > > With the latest release candidate for 2.8.0, the binaries from the Scala 2.13 > and 2.12 tarballs are not finding the class for the meta data shell from the > classpath > [https://home.apache.org/~vvcephei/kafka-2.8.0-rc1/] > > kafka-run-class.sh is not able to load it. > > cd ../kafka_2.12-2.8.0$ > > bin/kafka-metadata-shell.sh --help > Error: Could not find or load main class org.apache.kafka.shell.MetadataShell > Caused by: java.lang.ClassNotFoundException: > org.apache.kafka.shell.MetadataShell > cd ../kafka_2.13-2.8.0/ > bin/kafka-metadata-shell.sh --help > Error: Could not find or load main class org.apache.kafka.shell.MetadataShell > Caused by: java.lang.ClassNotFoundException: > org.apache.kafka.shell.MetadataShell
[GitHub] [kafka] ijuma opened a new pull request #10531: KAFKA-12658: Include kafka-shell jar in release tar
ijuma opened a new pull request #10531: URL: https://github.com/apache/kafka/pull/10531 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] mjsax commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins
mjsax commented on a change in pull request #10462: URL: https://github.com/apache/kafka/pull/10462#discussion_r612094560 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java ## @@ -118,20 +142,47 @@ final ProcessorGraphNode otherWindowedStreamsNode = new ProcessorGraphNode<>(otherWindowStreamProcessorName, otherWindowStreamProcessorParams); builder.addGraphNode(otherGraphNode, otherWindowedStreamsNode); +Optional, LeftOrRightValue>>> outerJoinWindowStore = Optional.empty(); +if (leftOuter || rightOuter) { +final String outerJoinSuffix = "-shared-outer-join-store"; Review comment: Should we use `-shared-left-join-store` and `-shared-right-join-store` for left/outer joins? ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java ## @@ -211,6 +263,66 @@ private void assertUniqueStoreNames(final WindowBytesStoreSupplier supplier, return builder; } +@SuppressWarnings("unchecked") +private static StoreBuilder, LeftOrRightValue>> outerJoinWindowStoreBuilder(final String storeName, + final JoinWindows windows, + final StreamJoinedInternal streamJoinedInternal) { +final StoreBuilder, LeftOrRightValue>> builder = new TimeOrderedWindowStoreBuilder, LeftOrRightValue>( +persistentTimeOrderedWindowStore( +storeName + "-store", +Duration.ofMillis(windows.size() + windows.gracePeriodMs()), +Duration.ofMillis(windows.size()) +), +new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde()), +new LeftOrRightValueSerde(streamJoinedInternal.valueSerde(), streamJoinedInternal.otherValueSerde()), +Time.SYSTEM Review comment: Should we pass a `Time` reference here to allow us to mock time in tests if necessary?
## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ## @@ -60,20 +82,41 @@ } private class KStreamKStreamJoinProcessor extends AbstractProcessor { +private static final boolean DISABLE_OUTER_JOIN_SPURIOUS_RESULTS_FIX_DEFAULT = false; Review comment: Seems overkill to introduce this one? It's used just once, so maybe just use `false` directly where it's needed? ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ## @@ -93,23 +136,118 @@ public void process(final K key, final V1 value) { } boolean needOuterJoin = outer; +boolean joinFound = false; final long inputRecordTimestamp = context().timestamp(); final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs); final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs); +maxObservedStreamTime.advance(inputRecordTimestamp); + try (final WindowStoreIterator iter = otherWindow.fetch(key, timeFrom, timeTo)) { while (iter.hasNext()) { needOuterJoin = false; +joinFound = true; final KeyValue otherRecord = iter.next(); +final long otherRecordTimestamp = otherRecord.key; + +// Emit expired records before the joined record to keep time ordering +emitExpiredNonJoinedOuterRecordsExcept(key, otherRecordTimestamp); + context().forward( key, joiner.apply(key, value, otherRecord.value), -To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key))); +To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp))); +} + +// Emit all expired records before adding a new non-joined record to the store. Otherwise, +// the put() call will advance the stream time, which causes records out of the retention +// period to be deleted, thus not being emitted later.
+if (!joinFound && inputRecordTimestamp == maxObservedStreamTime.get()) { +emitExpiredNonJoinedOuterRecords(); } if (needOuterJoin) { -context().forward(key, joiner.apply(key, value, null)); +// The maxStreamTime contains the max time observed in both sides of the join. +// Having access to the time observed in the other join side fixes the following +
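The idea behind this change, delaying a left/outer join's "no match" result until the join window plus grace period has closed instead of emitting it eagerly, can be illustrated with a heavily simplified, hypothetical in-memory sketch. It assumes a single key, a symmetric window, and no store retention or late-record dropping; it is not the `KStreamKStreamJoin` implementation. As in the diff, expired non-joined records are emitted before the new record is handled.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: single key, in memory, no retention or late-record dropping. */
public class DelayedLeftJoinSketch {
    private final long windowMs; // symmetric window: |leftTs - rightTs| <= windowMs
    private final long graceMs;
    private long streamTime = Long.MIN_VALUE;

    private final TreeMap<Long, List<String>> leftRecords = new TreeMap<>();
    private final TreeMap<Long, List<String>> rightRecords = new TreeMap<>();
    // Left records that have not found a join partner yet, ordered by timestamp.
    private final TreeMap<Long, List<String>> unjoinedLeft = new TreeMap<>();
    private final List<String> output = new ArrayList<>();

    public DelayedLeftJoinSketch(final long windowMs, final long graceMs) {
        this.windowMs = windowMs;
        this.graceMs = graceMs;
    }

    public void processLeft(final long ts, final String value) {
        advanceAndEmitExpired(ts);
        add(leftRecords, ts, value);
        boolean joined = false;
        for (final List<String> rights : rightRecords.subMap(ts - windowMs, true, ts + windowMs, true).values()) {
            for (final String right : rights) {
                output.add(value + "+" + right);
                joined = true;
            }
        }
        if (!joined) {
            add(unjoinedLeft, ts, value); // defer the (left, null) result instead of emitting it now
        }
    }

    public void processRight(final long ts, final String value) {
        advanceAndEmitExpired(ts);
        add(rightRecords, ts, value);
        for (final Map.Entry<Long, List<String>> e : leftRecords.subMap(ts - windowMs, true, ts + windowMs, true).entrySet()) {
            for (final String left : e.getValue()) {
                output.add(left + "+" + value);
                remove(unjoinedLeft, e.getKey(), left); // joined now, so no null-padded result later
            }
        }
    }

    public List<String> output() {
        return new ArrayList<>(output);
    }

    // Advance stream time, then emit (left, null) for every unjoined left record
    // whose window plus grace has closed, oldest first.
    private void advanceAndEmitExpired(final long ts) {
        streamTime = Math.max(streamTime, ts);
        final Iterator<Map.Entry<Long, List<String>>> it = unjoinedLeft.entrySet().iterator();
        while (it.hasNext()) {
            final Map.Entry<Long, List<String>> e = it.next();
            if (e.getKey() + windowMs + graceMs >= streamTime) {
                break; // entries are ordered by timestamp, so the rest are still open
            }
            for (final String left : e.getValue()) {
                output.add(left + "+null");
            }
            it.remove();
        }
    }

    private static void add(final TreeMap<Long, List<String>> map, final long ts, final String value) {
        map.computeIfAbsent(ts, k -> new ArrayList<>()).add(value);
    }

    private static void remove(final TreeMap<Long, List<String>> map, final long ts, final String value) {
        final List<String> values = map.get(ts);
        if (values != null && values.remove(value) && values.isEmpty()) {
            map.remove(ts);
        }
    }
}
```

For example, with a 10 ms window and no grace, a left record `A` at t=0 followed by a right record `a` at t=5 yields only `A+a`; an eager implementation would have emitted the spurious `A+null` first. A left record `B` at t=20 with no partner is emitted as `B+null` only once stream time passes 30.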
[GitHub] [kafka] predatorray edited a comment on pull request #10525: KAFKA-7572: Producer should not send requests with negative partition id
predatorray edited a comment on pull request #10525: URL: https://github.com/apache/kafka/pull/10525#issuecomment-818424067 Hi, @showuon. Thanks for your review! As for your question, I just followed the suggestion from @viktorsomogyi in the previous PR #5858. I think a subclass of KafkaException may be better, since this indicates something more like an implementation error. To be honest, I couldn't find any existing class that suits this error, so I created a new one. If you think it would be better to throw `IllegalArgumentException`, I will change the code so that we don't have to introduce another new KafkaException to the code base.
[GitHub] [kafka] predatorray commented on pull request #10525: KAFKA-7572: Producer should not send requests with negative partition id
predatorray commented on pull request #10525: URL: https://github.com/apache/kafka/pull/10525#issuecomment-818424067 Hi, @showuon. Thanks for your review! As for your question, I just followed the suggestion from @viktorsomogyi in the previous PR #5858. I think a subclass of KafkaException may be better, since this indicates something more like an implementation error. To be honest, I couldn't find any existing class that suits this error, so I created a new one. If you think it would be better to throw `IllegalArgumentException`, I will change the code so that we don't have to introduce another new KafkaException to the code base.
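A minimal sketch of the guard under discussion, assuming it is applied to the partition returned by a (custom) `Partitioner` before the record is appended. It throws `IllegalArgumentException`, the alternative raised in the review, rather than the new `KafkaException` subclass the PR introduces; the class and method names are hypothetical.

```java
public class PartitionGuardSketch {

    /**
     * Hypothetical guard on the partition chosen by a partitioner: rejects a
     * negative id instead of letting a request with it reach the broker.
     */
    static int requireNonNegativePartition(final String topic, final int partition) {
        if (partition < 0) {
            throw new IllegalArgumentException(String.format(
                "Partitioner returned invalid partition %d for topic %s; partition ids must be non-negative",
                partition, topic));
        }
        return partition;
    }

    public static void main(final String[] args) {
        System.out.println(requireNonNegativePartition("orders", 3)); // 3
        try {
            requireNonNegativePartition("orders", -1);
        } catch (final IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```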
[GitHub] [kafka] ijuma merged pull request #10491: MINOR: Switch to using the Gradle RAT plugin
ijuma merged pull request #10491: URL: https://github.com/apache/kafka/pull/10491
[GitHub] [kafka] ableegoldman commented on pull request #10529: KAFKA-12650: fix NPE in InternalTopicManagerTest
ableegoldman commented on pull request #10529: URL: https://github.com/apache/kafka/pull/10529#issuecomment-818405468 > admin.deleteTopic would never return null in production, and thus handling null within InternalTopicManager for this case seems not to make sense. Agreed, making it a strict mock seems sufficient for avoiding the NPE then. Thanks
[jira] [Updated] (KAFKA-12661) ConfigEntry#equal does not compare other fields when value is NOT null
[ https://issues.apache.org/jira/browse/KAFKA-12661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-12661: --- Priority: Minor (was: Major) > ConfigEntry#equal does not compare other fields when value is NOT null > --- > > Key: KAFKA-12661 > URL: https://issues.apache.org/jira/browse/KAFKA-12661 > Project: Kafka > Issue Type: Bug >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > > {code:java} > return this.name.equals(that.name) && > this.value != null ? this.value.equals(that.value) : > that.value == null && > this.isSensitive == that.isSensitive && > this.isReadOnly == that.isReadOnly && > this.source == that.source && > Objects.equals(this.synonyms, that.synonyms); > {code} > the second value of ternary operator is "that.value == null && > this.isSensitive == that.isSensitive && > this.isReadOnly == that.isReadOnly && > this.source == that.source && > Objects.equals(this.synonyms, that.synonyms);" rather than > "that.value == null". Hence, it does not compare other fields when value is > not null.
[GitHub] [kafka] dongjinleekr commented on pull request #10526: KAFKA-12655: CVE-2021-28165 - Upgrade jetty to 9.4.39
dongjinleekr commented on pull request #10526: URL: https://github.com/apache/kafka/pull/10526#issuecomment-818401991 @omkreddy @edwin092 @showuon Thanks!!
[jira] [Assigned] (KAFKA-12661) ConfigEntry#equal does not compare other fields when value is NOT null
[ https://issues.apache.org/jira/browse/KAFKA-12661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-12661: -- Assignee: Chia-Ping Tsai > ConfigEntry#equal does not compare other fields when value is NOT null > --- > > Key: KAFKA-12661 > URL: https://issues.apache.org/jira/browse/KAFKA-12661 > Project: Kafka > Issue Type: Bug >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > > {code:java} > return this.name.equals(that.name) && > this.value != null ? this.value.equals(that.value) : > that.value == null && > this.isSensitive == that.isSensitive && > this.isReadOnly == that.isReadOnly && > this.source == that.source && > Objects.equals(this.synonyms, that.synonyms); > {code} > the second value of ternary operator is "that.value == null && > this.isSensitive == that.isSensitive && > this.isReadOnly == that.isReadOnly && > this.source == that.source && > Objects.equals(this.synonyms, that.synonyms);" rather than > "that.value == null". Hence, it does not compare other fields when value is > not null.
[jira] [Created] (KAFKA-12661) ConfigEntry#equal does not compare other fields when value is NOT null
Chia-Ping Tsai created KAFKA-12661: -- Summary: ConfigEntry#equal does not compare other fields when value is NOT null Key: KAFKA-12661 URL: https://issues.apache.org/jira/browse/KAFKA-12661 Project: Kafka Issue Type: Bug Reporter: Chia-Ping Tsai {code:java} return this.name.equals(that.name) && this.value != null ? this.value.equals(that.value) : that.value == null && this.isSensitive == that.isSensitive && this.isReadOnly == that.isReadOnly && this.source == that.source && Objects.equals(this.synonyms, that.synonyms); {code} The second branch of the ternary operator is "that.value == null && this.isSensitive == that.isSensitive && this.isReadOnly == that.isReadOnly && this.source == that.source && Objects.equals(this.synonyms, that.synonyms);" rather than "that.value == null". Hence, it does not compare other fields when value is not null. -- This message was sent by Atlassian Jira (v8.3.4#803005)
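The bug comes from Java's operator precedence. The conditional operator `?:` binds more loosely than `&&`, so `a && b ? c : d && e` is parsed as `(a && b) ? c : (d && e)`. The minimal sketch below checks this with plain booleans. The parameters `a` to `e` are hypothetical stand-ins for the five sub-conditions: names equal, `this.value != null`, values equal, `that.value == null`, and the remaining field comparisons combined.

```java
// Minimal demo of how Java groups `a && b ? c : d && e`.
// Hypothetical mapping to ConfigEntry#equals:
//   a = names equal, b = this.value != null, c = values equal,
//   d = that.value == null, e = all remaining field comparisons.
final class TernaryPrecedenceDemo {

    // Written like the original snippet, without parentheses.
    static boolean unparenthesized(boolean a, boolean b, boolean c, boolean d, boolean e) {
        return a && b ? c : d && e;
    }

    // How the compiler actually groups it: ?: binds more loosely than &&.
    static boolean asParsed(boolean a, boolean b, boolean c, boolean d, boolean e) {
        return (a && b) ? c : (d && e);
    }

    // What an equality check needs: the ternary only chooses how to compare values.
    static boolean asIntended(boolean a, boolean b, boolean c, boolean d, boolean e) {
        return a && (b ? c : d) && e;
    }

    public static void main(String[] args) {
        // Same name, equal non-null values, but some other field differs.
        System.out.println(unparenthesized(true, true, true, false, false)); // true
        System.out.println(asIntended(true, true, true, false, false));      // false
    }
}
```

The grouping also affects the other branch. When the names differ or `this.value` is null, evaluation falls through to the `:` branch, which never looks at the names. As a result, entries with different names can compare equal if `that.value` is null and the other fields match.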
[GitHub] [kafka] chia7712 commented on a change in pull request #10446: MINOR: [ConfigEntry.class] add 'type' to 'toString' and 'hashCode'
chia7712 commented on a change in pull request #10446: URL: https://github.com/apache/kafka/pull/10446#discussion_r612095615 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java ## @@ -149,24 +149,28 @@ public boolean equals(Object o) { ConfigEntry that = (ConfigEntry) o; -return this.name.equals(that.name) && -this.value != null ? this.value.equals(that.value) : that.value == null && Review comment: This was a bug because the false branch of the ternary evaluated the following conditions: ```java that.value == null && this.isSensitive == that.isSensitive && this.isReadOnly == that.isReadOnly && this.source == that.source && Objects.equals(this.synonyms, that.synonyms); ``` whereas the expected behavior is `that.value == null`. Some tests depend on the "incorrect" equals. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
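A null-safe comparison via `java.util.Objects.equals` removes the need for the ternary altogether. The class below is a hypothetical, simplified stand-in for `ConfigEntry`, not the real class. It covers only the fields quoted in KAFKA-12661 and omits others, such as the `type` mentioned in the PR title. It hashes exactly the fields it compares, so `equals` and `hashCode` stay consistent.

```java
import java.util.List;
import java.util.Objects;

// Hypothetical, simplified stand-in for ConfigEntry (not the real class).
final class ConfigEntrySketch {
    enum Source { DEFAULT_CONFIG, DYNAMIC_TOPIC_CONFIG }

    final String name;
    final String value;            // may be null
    final boolean isSensitive;
    final boolean isReadOnly;
    final Source source;
    final List<String> synonyms;

    ConfigEntrySketch(String name, String value, boolean isSensitive,
                      boolean isReadOnly, Source source, List<String> synonyms) {
        this.name = Objects.requireNonNull(name, "name");
        this.value = value;
        this.isSensitive = isSensitive;
        this.isReadOnly = isReadOnly;
        this.source = source;
        this.synonyms = synonyms;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        ConfigEntrySketch that = (ConfigEntrySketch) o;
        // Objects.equals is null-safe, so no ternary (and no precedence trap) is needed.
        return name.equals(that.name)
            && Objects.equals(value, that.value)
            && isSensitive == that.isSensitive
            && isReadOnly == that.isReadOnly
            && source == that.source
            && Objects.equals(synonyms, that.synonyms);
    }

    @Override
    public int hashCode() {
        // Hash exactly the fields compared in equals() to keep the two consistent.
        return Objects.hash(name, value, isSensitive, isReadOnly, source, synonyms);
    }
}
```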
[GitHub] [kafka] mjsax commented on pull request #10529: KAFKA-12650: fix NPE in InternalTopicManagerTest
mjsax commented on pull request #10529: URL: https://github.com/apache/kafka/pull/10529#issuecomment-818392640 Updated the PR. > (although, why would the max poll interval impact a timeout related to the admin client? Or do I not understand where this timeout is ultimately coming from) We use `InternalTopicManager` in the consumer group leader's `assign()` callback to create internal topics. If we get errors back from the admin client, we retry. To bound the retries, we use `max.poll.interval.ms / 2` because `max.poll.interval.ms` is an upper bound on how long `assign()` may take (otherwise the leader gets kicked out of the group; cf. KIP-572). > But we should probably try to actually handle the null even so, and fail with a more useful error message in case we ever do hit this again. `admin.deleteTopic` would never return `null` in production, so handling `null` within `InternalTopicManager` for this case does not seem to make sense. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
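The retry budget described above can be sketched as a deadline of `now + max.poll.interval.ms / 2`. This is a minimal sketch under stated assumptions, not the `InternalTopicManager` code. The `attempt` supplier, the fixed back-off, and the injectable clock and sleep functions are hypothetical, chosen so the behaviour can be checked deterministically. With `max.poll.interval.ms` set to 100 ms, the budget is only 50 ms; with 10_000 ms, it grows to 5 s.

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

// Hypothetical sketch (not InternalTopicManager): retry an operation, but give up
// once half of max.poll.interval.ms has elapsed so assign() can still finish in time.
final class BoundedRetrySketch {

    static boolean retryWithinHalfPollInterval(BooleanSupplier attempt,
                                               long maxPollIntervalMs,
                                               long backoffMs,
                                               LongSupplier clockMs,
                                               LongConsumer sleepMs) {
        final long deadlineMs = clockMs.getAsLong() + maxPollIntervalMs / 2;
        while (true) {
            if (attempt.getAsBoolean()) {
                return true;                      // succeeded within the budget
            }
            if (clockMs.getAsLong() + backoffMs > deadlineMs) {
                return false;                     // the next retry would overshoot the deadline
            }
            sleepMs.accept(backoffMs);            // back off before retrying
        }
    }

    public static void main(String[] args) {
        long[] now = {0};                         // fake clock so the demo is deterministic
        int[] calls = {0};
        boolean ok = retryWithinHalfPollInterval(
            () -> { calls[0]++; return false; },  // an operation that keeps failing
            100, 10, () -> now[0], ms -> now[0] += ms);
        System.out.println(ok + " after " + calls[0] + " attempts, " + now[0] + " ms");
        // prints "false after 6 attempts, 50 ms"
    }
}
```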
[GitHub] [kafka] mjsax commented on a change in pull request #10529: KAFKA-12650: fix NPE in InternalTopicManagerTest
mjsax commented on a change in pull request #10529: URL: https://github.com/apache/kafka/pull/10529#discussion_r612089734 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java ## @@ -153,7 +156,8 @@ public void shouldNotCreateTopicsWithEmptyInput() throws Exception { @Test public void shouldOnlyRetryNotSuccessfulFuturesDuringSetup() { -final AdminClient admin = EasyMock.createNiceMock(AdminClient.class); +final AdminClient admin = EasyMock.createStrictMock(AdminClient.class); +config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 10_000L); Review comment: In the test setup, we reduce `max.poll.interval.ms` to 100ms, so increasing it to 10 sec for this test should be sufficient. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #10409: KAFKA-9295: improve KTableKTableForeignKeyInnerJoinMultiIntegrationTest
showuon commented on pull request #10409: URL: https://github.com/apache/kafka/pull/10409#issuecomment-818386632 @ableegoldman , thank you. I'll monitor the PR build, and update here. You can go take a rest, and check it tomorrow (your time). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #10409: KAFKA-9295: improve KTableKTableForeignKeyInnerJoinMultiIntegrationTest
showuon commented on pull request #10409: URL: https://github.com/apache/kafka/pull/10409#issuecomment-818378362 @ableegoldman , I updated the PR to wait until all streams reach "RUNNING" state before consuming records. Let's wait and see the test results. :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
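Waiting for every instance to reach `RUNNING` before consuming amounts to polling a condition with a timeout, much like the `TestUtils.waitForCondition` helper that appears in the stack traces on this list. The sketch below is hypothetical. `State` is a stand-in enum and each instance is modelled as a `Supplier<State>`; in a real test, the supplier would presumably wrap `KafkaStreams#state()`.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

// Hypothetical sketch of "wait until every instance is RUNNING, then consume".
final class WaitForRunningSketch {
    enum State { CREATED, REBALANCING, RUNNING }   // stand-in for the real state enum

    static boolean allRunning(List<Supplier<State>> instances) {
        return instances.stream().allMatch(s -> s.get() == State.RUNNING);
    }

    // Polls the condition until it holds or timeoutMs elapses.
    static boolean waitForCondition(BooleanSupplier condition, long timeoutMs, long pollMs) {
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false;                       // timed out
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and stop waiting
                return false;
            }
        }
        return true;
    }
}
```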
[jira] [Assigned] (KAFKA-12658) bin/kafka-metadata-shell.sh cannot find or load main class org.apache.kafka.shell.MetadataShell
[ https://issues.apache.org/jira/browse/KAFKA-12658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma reassigned KAFKA-12658: --- Assignee: Ismael Juma > bin/kafka-metadata-shell.sh cannot find or load main class > org.apache.kafka.shell.MetadataShell > --- > > Key: KAFKA-12658 > URL: https://issues.apache.org/jira/browse/KAFKA-12658 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.0.0, 2.8.0 > Environment: Ubuntu, Java 11 >Reporter: Israel Ekpo >Assignee: Ismael Juma >Priority: Blocker > > With the latest release candidate for 2.8.0, the binaries from the Scala 2.13 > and 2.12 tarballs are not finding the class for the meta data shell from the > classpath > [https://home.apache.org/~vvcephei/kafka-2.8.0-rc1/] > > kafka-run-class.sh is not able to load it. > > cd ../kafka_2.12-2.8.0$ > > bin/kafka-metadata-shell.sh --help > Error: Could not find or load main class org.apache.kafka.shell.MetadataShell > Caused by: java.lang.ClassNotFoundException: > org.apache.kafka.shell.MetadataShell > cd ../kafka_2.13-2.8.0/ > bin/kafka-metadata-shell.sh --help > Error: Could not find or load main class org.apache.kafka.shell.MetadataShell > Caused by: java.lang.ClassNotFoundException: > org.apache.kafka.shell.MetadataShell > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12660) Do not update offset commit sensor after append failure
Jason Gustafson created KAFKA-12660: --- Summary: Do not update offset commit sensor after append failure Key: KAFKA-12660 URL: https://issues.apache.org/jira/browse/KAFKA-12660 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson In the append callback after writing an offset to the log in `GroupMetadataManager`, it seems wrong to update the offset commit sensor before checking for errors: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L394. -- This message was sent by Atlassian Jira (v8.3.4#803005)
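The ordering the issue asks for can be sketched as follows: inspect the append result first, and record the metric only on success. The `Sensor` class and the `AppendStatus` enum are hypothetical stand-ins, not Kafka's metrics API or error codes. The real callback is written in Scala and lives in `GroupMetadataManager`.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: check the append outcome before touching the metric.
final class OffsetCommitCallbackSketch {
    enum AppendStatus { SUCCESS, FAILED }

    // Stand-in for an offset-commit sensor that counts committed offsets.
    static final class Sensor {
        private final AtomicLong recorded = new AtomicLong();
        void record(long n) { recorded.addAndGet(n); }
        long total() { return recorded.get(); }
    }

    // Returns true if the commit succeeded; the sensor is left untouched on failure.
    static boolean onAppendComplete(AppendStatus status, int offsetsInRequest, Sensor commitSensor) {
        if (status != AppendStatus.SUCCESS) {
            return false;               // error path: nothing was committed, so do not count it
        }
        commitSensor.record(offsetsInRequest);
        return true;
    }
}
```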
[GitHub] [kafka] dengziming commented on pull request #10275: KAFKA-12434; Admin support for `DescribeProducers` API
dengziming commented on pull request #10275: URL: https://github.com/apache/kafka/pull/10275#issuecomment-818372912 > Merging to trunk. @chia7712 Yeah, we should create some jiras. I was going to wait until the other use cases in KIP-664 had been fleshed out, but I guess there's no harm converting other APIs. The ListOffsets API would be a good place to start. I'll create a JIRA tomorrow if you don't get there first. I created a JIRA and a PR #10467 since you haven't done this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #10529: KAFKA-12650: fix NPE in InternalTopicManagerTest
ableegoldman commented on pull request #10529: URL: https://github.com/apache/kafka/pull/10529#issuecomment-818368666 Personally, I would slightly prefer to just increase `max.poll.interval.ms` if that's what defines the timeout (although, why would the max poll interval impact a timeout related to the admin client? Or do I not understand where this timeout is ultimately coming from?). But we should probably try to actually handle the `null` even so, and fail with a more useful error message in case we ever do hit this again. Or, if we can't handle the null directly, maybe use a not-nice mock so it'll fail rather than just returning null and hitting an NPE. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
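The difference between the two mock styles matters here. In EasyMock, a nice mock answers unexpected calls with default values (`null`, `0`, `false`). A default or strict mock instead throws an `AssertionError` as soon as an unexpected call happens, and a strict mock additionally checks call order. The sketch below mimics both behaviours with `java.lang.reflect.Proxy` rather than EasyMock. `TopicAdmin` and `describeTopic` are hypothetical names.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Hypothetical sketch using java.lang.reflect.Proxy (not EasyMock) to contrast
// the "nice" and "strict" mock styles.
final class MockStyleSketch {
    // Stand-in for the admin client; not a real Kafka interface.
    interface TopicAdmin {
        String describeTopic(String name);
    }

    private static TopicAdmin proxy(InvocationHandler handler) {
        return (TopicAdmin) Proxy.newProxyInstance(
            TopicAdmin.class.getClassLoader(), new Class<?>[] {TopicAdmin.class}, handler);
    }

    // "Nice" style: an unexpected call quietly returns null, so the failure
    // surfaces later as a NullPointerException far from the real cause.
    static TopicAdmin niceMock() {
        return proxy((p, method, args) -> null);
    }

    // Strict style: an unexpected call fails immediately and names the method.
    static TopicAdmin strictMock() {
        return proxy((p, method, args) -> {
            throw new AssertionError("Unexpected call: " + method.getName());
        });
    }
}
```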
[jira] [Resolved] (KAFKA-12637) Remove deprecated PartitionAssignor interface
[ https://issues.apache.org/jira/browse/KAFKA-12637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-12637. Resolution: Fixed > Remove deprecated PartitionAssignor interface > - > > Key: KAFKA-12637 > URL: https://issues.apache.org/jira/browse/KAFKA-12637 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: A. Sophie Blee-Goldman >Assignee: dengziming >Priority: Blocker > Labels: newbie, newbie++ > Fix For: 3.0.0 > > > In KIP-429, we deprecated the existing PartitionAssignor interface in order > to move it out of the internals package and better align the name with other > pluggable Consumer interfaces. We added an adapter to convert from the existing > o.a.k.clients.consumer.internals.PartitionAssignor to the new > o.a.k.clients.consumer.ConsumerPartitionAssignor and support the deprecated > interface. This was deprecated in 2.4, so we should be ok to remove it and > the PartitionAssignorAdapter in 3.0. -- This message was sent by Atlassian Jira (v8.3.4#803005)
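The adapter described above follows the classic adapter pattern: an implementation of the old interface is wrapped so that callers only ever see the new one. The sketch below is hypothetical and heavily simplified. `OldAssignor`, `NewAssignor`, `GroupAssignment`, and `Adapter` are stand-ins, not the real consumer interfaces or their signatures.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the adapter idea: an old-style plugin is wrapped so
// that callers only see the new interface. All names are stand-ins.
final class AssignorAdapterSketch {

    // Stand-in for the deprecated plugin interface.
    interface OldAssignor {
        String name();
        Map<String, List<String>> assign(List<String> members, List<String> partitions);
    }

    // Stand-in for the replacement interface, which returns a wrapper type.
    interface NewAssignor {
        String name();
        GroupAssignment assign(List<String> members, List<String> partitions);
    }

    static final class GroupAssignment {
        final Map<String, List<String>> partitionsByMember;
        GroupAssignment(Map<String, List<String>> partitionsByMember) {
            this.partitionsByMember = partitionsByMember;
        }
    }

    // The adapter: implements the new interface by delegating to an old one.
    static final class Adapter implements NewAssignor {
        private final OldAssignor delegate;
        Adapter(OldAssignor delegate) { this.delegate = delegate; }
        @Override public String name() { return delegate.name(); }
        @Override public GroupAssignment assign(List<String> members, List<String> partitions) {
            return new GroupAssignment(delegate.assign(members, partitions));
        }
    }

    // A simple old-style round-robin plugin used to exercise the adapter.
    static final class RoundRobinOld implements OldAssignor {
        @Override public String name() { return "roundrobin"; }
        @Override public Map<String, List<String>> assign(List<String> members, List<String> partitions) {
            Map<String, List<String>> result = new LinkedHashMap<>();
            for (String m : members) result.put(m, new ArrayList<>());
            for (int i = 0; i < partitions.size(); i++) {
                result.get(members.get(i % members.size())).add(partitions.get(i));
            }
            return result;
        }
    }
}
```

Removing the deprecated interface then means deleting both the old interface and the adapter. Code written against the new interface is unaffected.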
[GitHub] [kafka] ableegoldman merged pull request #10512: KAFKA-12637: Remove deprecated PartitionAssignor interface
ableegoldman merged pull request #10512: URL: https://github.com/apache/kafka/pull/10512 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #10512: KAFKA-12637: Remove deprecated PartitionAssignor interface
ableegoldman commented on pull request #10512: URL: https://github.com/apache/kafka/pull/10512#issuecomment-818367236 Merged to trunk -- thanks @dengziming ! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #10512: KAFKA-12637: Remove deprecated PartitionAssignor interface
ableegoldman commented on pull request #10512: URL: https://github.com/apache/kafka/pull/10512#issuecomment-818366846 Well, there were a large number of failures, but all of them are unrelated. Left a comment/reopened the ticket for all of the following:
kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testReplication()
kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopics()
kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()
kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testOneWayReplicationWithAutoOffsetSync()
kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testDeleteConnector
kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutoOffsetSync()
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Reopened] (KAFKA-8391) Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector
[ https://issues.apache.org/jira/browse/KAFKA-8391?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman reopened KAFKA-8391: --- [~rhauch] [~kkonstantine] this test failed again -- based on the error message it looks like it may be a "real" failure this time, not environmental. Stacktrace java.lang.AssertionError: Tasks are imbalanced: localhost:35163=[seq-source11-0, seq-source11-3, seq-source10-1, seq-source12-1] localhost:36961=[seq-source11-1, seq-source10-2, seq-source12-2] localhost:39023=[seq-source11-2, seq-source10-0, seq-source10-3, seq-source12-0, seq-source12-3] at org.junit.Assert.fail(Assert.java:89) at org.junit.Assert.assertTrue(Assert.java:42) at org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.assertConnectorAndTasksAreUniqueAndBalanced(RebalanceSourceConnectorsIntegrationTest.java:365) at org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:319) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:367) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:316) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290) at org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testDeleteConnector(RebalanceSourceConnectorsIntegrationTest.java:213) https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10512/5/testReport/org.apache.kafka.connect.integration/RebalanceSourceConnectorsIntegrationTest/Build___JDK_15_and_Scala_2_13___testDeleteConnector/ > Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector > --- > > Key: KAFKA-8391 > URL: https://issues.apache.org/jira/browse/KAFKA-8391 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.3.0 >Reporter: Matthias J. 
Sax >Assignee: Randall Hauch >Priority: Critical > Labels: flaky-test > Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1 > > Attachments: 100-gradle-builds.tar > > > [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/4747/testReport/junit/org.apache.kafka.connect.integration/RebalanceSourceConnectorsIntegrationTest/testDeleteConnector/] > {quote}java.lang.AssertionError: Condition not met within timeout 3. > Connector tasks did not stop in time. at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:375) at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:352) at > org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testDeleteConnector(RebalanceSourceConnectorsIntegrationTest.java:166){quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
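In the new failure, the three workers hold 4, 3, and 5 of the 12 tasks (three connectors with four tasks each), so an even spread would be 4/4/4. The check below is a hypothetical sketch. It assumes "balanced" means that no task is assigned twice and that per-worker task counts differ by at most one. The real `assertConnectorAndTasksAreUniqueAndBalanced` may define it differently.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical balance check (an assumed definition, not necessarily what
// RebalanceSourceConnectorsIntegrationTest asserts): no task appears on more
// than one worker, and per-worker task counts differ by at most one.
final class TaskBalanceSketch {
    static boolean uniqueAndBalanced(Map<String, List<String>> tasksByWorker) {
        if (tasksByWorker.isEmpty()) {
            return true;
        }
        Set<String> seen = new HashSet<>();
        int min = Integer.MAX_VALUE;
        int max = Integer.MIN_VALUE;
        for (List<String> tasks : tasksByWorker.values()) {
            for (String task : tasks) {
                if (!seen.add(task)) {
                    return false;          // task assigned to more than one worker
                }
            }
            min = Math.min(min, tasks.size());
            max = Math.max(max, tasks.size());
        }
        return max - min <= 1;
    }
}
```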
[jira] [Created] (KAFKA-12659) Mirrormaker 2 - seeking to wrong offsets on restart
stuart created KAFKA-12659: -- Summary: Mirrormaker 2 - seeking to wrong offsets on restart Key: KAFKA-12659 URL: https://issues.apache.org/jira/browse/KAFKA-12659 Project: Kafka Issue Type: Bug Components: mirrormaker Affects Versions: 2.7.0 Environment: Docker container based on openjdk11:alpine-slim, running on Amazon ECS Reporter: stuart Attachments: partitions.png
We are running a dedicated MirrorMaker 2 cluster with three tasks, and have been trialing it for a few weeks on a single topic. It's been going fine, so we attempted to add a second topic, changing the MM2 config file from topics = sports to topics = sports|translations.
We noticed the following day that the replication of the new topic was not working. Reading online, it seems others have had similar issues, perhaps related to the config stored in the internal mm2-configs topic not refreshing from the file, so following recommendations in that thread we stopped the tasks for 10 minutes, and eventually it started replicating. However, we also noticed later that MM2 had started re-replicating about 5 million records from earlier that day (from the original topic), which was concerning. A few hours later I restarted the MM2 tasks and the same thing happened: it started re-replicating the same old messages.
Looking into the mm2-offsets-{source}.internal topic, I could see that the records which track offsets switched partitions; for example, the records for the sports-7 topic-partition went from being written to partition 5 (in mm2-offsets) to partition 8. The same occurred for other partitions (most but not all). Following the task restarts, the MM2 logs show that MM2 is always seeking to offset 42741034 for sports-7. This value matches the oldest offset record on mm2-offsets partition 5, so it looks like MM2 is ignoring the more recent offset records on partition 8 and is therefore not seeking to the correct latest offsets.
This also appears to affect compaction of the internal offsets topic: while the older records on partition 8 for the sports-7 key are being cleaned up, the even older records for that same key on partition 5 are not.
I can't be certain that introducing the second topic into the MM2 config was the trigger for that partitioning behaviour change. I am not sure why it would be, unless adding more topics to the replication list caused MM2 to automatically scale the number of partitions on the mm2-offsets-{source}.internal topic, which I guess might affect partitioning behaviour. However, it was the only noteworthy thing that we consciously changed within the same rough timeframe. Attached is a screenshot to help illustrate the issue. -- This message was sent by Atlassian Jira (v8.3.4#803005)
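The reporter's hypothesis can be illustrated in isolation. Kafka's default partitioner places a keyed record on `toPositive(murmur2(keyBytes)) % numPartitions`, so changing a topic's partition count can move an existing key to a different partition. Log compaction only deduplicates records within a single partition, so records for that key on the previous partition are not superseded by newer records on the new one. This illustrates the hypothesis; it is not a confirmed diagnosis of the MM2 issue. The sketch uses `Arrays.hashCode` as a stand-in for murmur2, so its partition numbers will not match a real cluster. The key `"k0"` is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Sketch of hash-mod partitioning for keyed records. Arrays.hashCode stands in
// for the murmur2 hash Kafka's default partitioner uses; only the effect of
// changing numPartitions is the point here.
final class KeyPartitionSketch {
    static int partitionFor(String key, int numPartitions) {
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        int positive = hash & 0x7fffffff;   // clear the sign bit, like Utils.toPositive
        return positive % numPartitions;
    }

    public static void main(String[] args) {
        // Hypothetical key "k0": its stand-in hash is 4326.
        System.out.println(partitionFor("k0", 25)); // 1
        System.out.println(partitionFor("k0", 50)); // 26
    }
}
```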
[jira] [Commented] (KAFKA-9013) Flaky Test MirrorConnectorsIntegrationTest#testReplication
[ https://issues.apache.org/jira/browse/KAFKA-9013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17319797#comment-17319797 ] A. Sophie Blee-Goldman commented on KAFKA-9013: --- Failed again: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10512/5/testReport/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationTest/Build___JDK_11_and_Scala_2_13___testReplication__/ > Flaky Test MirrorConnectorsIntegrationTest#testReplication > -- > > Key: KAFKA-9013 > URL: https://issues.apache.org/jira/browse/KAFKA-9013 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Bruno Cadonna >Priority: Major > Labels: flaky-test > > h1. Stacktrace: > {code:java} > java.lang.AssertionError: Condition not met within timeout 2. Offsets not > translated downstream to primary cluster. > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:354) > at > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication(MirrorConnectorsIntegrationTest.java:239) > {code} > h1. Standard Error > {code} > Standard Error > Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource > registered in SERVER runtime does not implement any provider interfaces > applicable in the SERVER runtime. Due to constraint configuration problems > the provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will > be ignored. > Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.LoggingResource registered in > SERVER runtime does not implement any provider interfaces applicable in the > SERVER runtime. 
Due to constraint configuration problems the provider > org.apache.kafka.connect.runtime.rest.resources.LoggingResource will be > ignored. > Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered > in SERVER runtime does not implement any provider interfaces applicable in > the SERVER runtime. Due to constraint configuration problems the provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource will be > ignored. > Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.RootResource registered in > SERVER runtime does not implement any provider interfaces applicable in the > SERVER runtime. Due to constraint configuration problems the provider > org.apache.kafka.connect.runtime.rest.resources.RootResource will be ignored. > Oct 09, 2019 11:32:01 PM org.glassfish.jersey.internal.Errors logErrors > WARNING: The following warnings have been detected: WARNING: The > (sub)resource method listLoggers in > org.apache.kafka.connect.runtime.rest.resources.LoggingResource contains > empty path annotation. > WARNING: The (sub)resource method listConnectors in > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains > empty path annotation. > WARNING: The (sub)resource method createConnector in > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains > empty path annotation. > WARNING: The (sub)resource method listConnectorPlugins in > org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource > contains empty path annotation. > WARNING: The (sub)resource method serverInfo in > org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty > path annotation. 
> Oct 09, 2019 11:32:02 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource > registered in SERVER runtime does not implement any provider interfaces > applicable in the SERVER runtime. Due to constraint configuration problems > the provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will > be ignored. > Oct 09, 2019 11:32:02 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered > in SERVER runtime does not implement any provider interfaces applicable in > the SERVER runtime. Due to constraint configuration problems the provider >
[jira] [Reopened] (KAFKA-12284) Flaky Test MirrorConnectorsIntegrationSSLTest#testOneWayReplicationWithAutoOffsetSync
[ https://issues.apache.org/jira/browse/KAFKA-12284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman reopened KAFKA-12284: Assignee: (was: Luke Chen) Failed again, on both the SSL and plain version of this test: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10512/5/testReport/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationSSLTest/Build___JDK_15_and_Scala_2_13___testOneWayReplicationWithAutoOffsetSync__/ https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10512/5/testReport/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationTest/Build___JDK_15_and_Scala_2_13___testOneWayReplicationWithAutoOffsetSync__/ java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: The request timed out. at org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:365) at org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:340) at org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.createTopics(MirrorConnectorsIntegrationBaseTest.java:609) > Flaky Test > MirrorConnectorsIntegrationSSLTest#testOneWayReplicationWithAutoOffsetSync > - > > Key: KAFKA-12284 > URL: https://issues.apache.org/jira/browse/KAFKA-12284 > Project: Kafka > Issue Type: Test > Components: mirrormaker, unit tests >Reporter: Matthias J. Sax >Priority: Critical > Labels: flaky-test > Fix For: 3.0.0 > > > [https://github.com/apache/kafka/pull/9997/checks?check_run_id=1820178470] > {quote} {{java.lang.RuntimeException: > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TopicExistsException: Topic > 'primary.test-topic-2' already exists. 
> at > org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:366) > at > org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:341) > at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testOneWayReplicationWithAutoOffsetSync(MirrorConnectorsIntegrationBaseTest.java:419)}} > [...] > > {{Caused by: java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TopicExistsException: Topic > 'primary.test-topic-2' already exists. > at > org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) > at > org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) > at > org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:364) > ... 92 more > Caused by: org.apache.kafka.common.errors.TopicExistsException: Topic > 'primary.test-topic-2' already exists.}} > {quote} > STDOUT > {quote} {{[2021-02-03 04:19:15,975] ERROR [MirrorHeartbeatConnector|task-0] > WorkerSourceTask{id=MirrorHeartbeatConnector-0} failed to send record to > heartbeats: (org.apache.kafka.connect.runtime.WorkerSourceTask:354) > org.apache.kafka.common.KafkaException: Producer is closed forcefully. > at > org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:750) > at > org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:737) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:282) > at java.lang.Thread.run(Thread.java:748)}}{quote} > {quote} {{[2021-02-03 04:19:36,767] ERROR Could not check connector state > info.
> (org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions:420) > org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not > read connector state. Error response: {"error_code":404,"message":"No status > found for connector MirrorSourceConnector"} > at > org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.connectorStatus(EmbeddedConnectCluster.java:466) > at > org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.checkConnectorState(EmbeddedConnectClusterAssertions.java:413) > at > org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.lambda$assertConnectorAndAtLeastNumTasksAreRunning$16(EmbeddedConnectClusterAssertions.java:286) > at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303) > at >
[jira] [Commented] (KAFKA-12629) Flaky Test RaftClusterTest
[ https://issues.apache.org/jira/browse/KAFKA-12629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17319794#comment-17319794 ] A. Sophie Blee-Goldman commented on KAFKA-12629: I'm seeing at least one failure in RaftClusterTest on almost every PR build lately. Looks like it's typically the same TimeoutException mentioned above. Seen on both testCreateClusterAndCreateAndManyTopics() and testCreateClusterAndCreateAndManyTopicsWithManyPartitions() > Flaky Test RaftClusterTest > -- > > Key: KAFKA-12629 > URL: https://issues.apache.org/jira/browse/KAFKA-12629 > Project: Kafka > Issue Type: Test > Components: core, unit tests >Reporter: Matthias J. Sax >Priority: Critical > Labels: flaky-test > > {quote} {{java.util.concurrent.ExecutionException: > java.lang.ClassNotFoundException: > org.apache.kafka.controller.NoOpSnapshotWriterBuilder > at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191) > at > kafka.testkit.KafkaClusterTestKit.startup(KafkaClusterTestKit.java:364) > at > kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions(RaftClusterTest.scala:181)}}{quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12658) bin/kafka-metadata-shell.sh cannot find or load main class org.apache.kafka.shell.MetadataShell
[ https://issues.apache.org/jira/browse/KAFKA-12658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-12658: - Priority: Blocker (was: Major) > bin/kafka-metadata-shell.sh cannot find or load main class > org.apache.kafka.shell.MetadataShell > --- > > Key: KAFKA-12658 > URL: https://issues.apache.org/jira/browse/KAFKA-12658 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.0.0, 2.8.0 > Environment: Ubuntu, Java 11 >Reporter: Israel Ekpo >Priority: Blocker > > With the latest release candidate for 2.8.0, the binaries from the Scala 2.13 > and 2.12 tarballs are not finding the class for the meta data shell from the > classpath > [https://home.apache.org/~vvcephei/kafka-2.8.0-rc1/] > > kafka-run-class.sh is not able to load it. > > cd ../kafka_2.12-2.8.0$ > > bin/kafka-metadata-shell.sh --help > Error: Could not find or load main class org.apache.kafka.shell.MetadataShell > Caused by: java.lang.ClassNotFoundException: > org.apache.kafka.shell.MetadataShell > cd ../kafka_2.13-2.8.0/ > bin/kafka-metadata-shell.sh --help > Error: Could not find or load main class org.apache.kafka.shell.MetadataShell > Caused by: java.lang.ClassNotFoundException: > org.apache.kafka.shell.MetadataShell > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-12658) bin/kafka-metadata-shell.sh cannot find or load main class org.apache.kafka.shell.MetadataShell
[ https://issues.apache.org/jira/browse/KAFKA-12658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler reassigned KAFKA-12658: Assignee: (was: John Roesler) > bin/kafka-metadata-shell.sh cannot find or load main class > org.apache.kafka.shell.MetadataShell > --- > > Key: KAFKA-12658 > URL: https://issues.apache.org/jira/browse/KAFKA-12658 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.0.0, 2.8.0 > Environment: Ubuntu, Java 11 >Reporter: Israel Ekpo >Priority: Major > > With the latest release candidate for 2.8.0, the binaries from the Scala 2.13 > and 2.12 tarballs are not finding the class for the meta data shell from the > classpath > [https://home.apache.org/~vvcephei/kafka-2.8.0-rc1/] > > kafka-run-class.sh is not able to load it. > > cd ../kafka_2.12-2.8.0$ > > bin/kafka-metadata-shell.sh --help > Error: Could not find or load main class org.apache.kafka.shell.MetadataShell > Caused by: java.lang.ClassNotFoundException: > org.apache.kafka.shell.MetadataShell > cd ../kafka_2.13-2.8.0/ > bin/kafka-metadata-shell.sh --help > Error: Could not find or load main class org.apache.kafka.shell.MetadataShell > Caused by: java.lang.ClassNotFoundException: > org.apache.kafka.shell.MetadataShell > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12658) bin/kafka-metadata-shell.sh cannot find or load main class org.apache.kafka.shell.MetadataShell
Israel Ekpo created KAFKA-12658: --- Summary: bin/kafka-metadata-shell.sh cannot find or load main class org.apache.kafka.shell.MetadataShell Key: KAFKA-12658 URL: https://issues.apache.org/jira/browse/KAFKA-12658 Project: Kafka Issue Type: Bug Components: core Affects Versions: 3.0.0, 2.8.0 Environment: Ubuntu, Java 11 Reporter: Israel Ekpo Assignee: John Roesler With the latest release candidate for 2.8.0, the binaries from the Scala 2.13 and 2.12 tarballs are not finding the class for the metadata shell on the classpath [https://home.apache.org/~vvcephei/kafka-2.8.0-rc1/] kafka-run-class.sh is not able to load it. cd ../kafka_2.12-2.8.0$ bin/kafka-metadata-shell.sh --help Error: Could not find or load main class org.apache.kafka.shell.MetadataShell Caused by: java.lang.ClassNotFoundException: org.apache.kafka.shell.MetadataShell cd ../kafka_2.13-2.8.0/ bin/kafka-metadata-shell.sh --help Error: Could not find or load main class org.apache.kafka.shell.MetadataShell Caused by: java.lang.ClassNotFoundException: org.apache.kafka.shell.MetadataShell -- This message was sent by Atlassian Jira (v8.3.4#803005)
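To confirm, independently of kafka-run-class.sh, whether `org.apache.kafka.shell.MetadataShell` is present on a given classpath, a tiny probe like the one below may help. It is a hypothetical diagnostic (`ClasspathCheck` is not part of Kafka) and only reports what the JVM it runs in can load, so it must be launched with the same classpath the script builds.

```java
// Hypothetical diagnostic: reports whether a class can be loaded by this JVM's
// application class loader. It does not inspect the Kafka tarball itself.
final class ClasspathCheck {
    private ClasspathCheck() { }

    /** Returns true if the named class is found, false on ClassNotFoundException. */
    static boolean isLoadable(String className) {
        try {
            // initialize=false: only locate and load the class, without running
            // its static initializers.
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String name = args.length > 0 ? args[0] : "org.apache.kafka.shell.MetadataShell";
        System.out.println(name + " loadable: " + isLoadable(name));
    }
}
```

Run it with `java -cp` set to the classpath under test. A `false` result for a class the tarball is expected to ship suggests a packaging problem rather than a problem in the launcher script.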
[GitHub] [kafka] ableegoldman merged pull request #10411: KAFKA-7606: Remove deprecated options from StreamsResetter
ableegoldman merged pull request #10411: URL: https://github.com/apache/kafka/pull/10411 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #10505: MINOR: fix some bugs in ControllerApis.scala
mumrah commented on a change in pull request #10505: URL: https://github.com/apache/kafka/pull/10505#discussion_r612039100 ## File path: core/src/main/scala/kafka/server/ControllerApis.scala ## @@ -343,21 +366,22 @@ class ControllerApis(val requestChannel: RequestChannel, iterator.remove() } } -val response = controller.createTopics(effectiveRequest).get() -duplicateTopicNames.forEach { name => - response.topics().add(new CreatableTopicResult(). -setName(name). -setErrorCode(INVALID_REQUEST.code()). -setErrorMessage("Found multiple entries for this topic.")) -} -topicNames.forEach { name => - if (!authorizedTopicNames.contains(name)) { +controller.createTopics(effectiveRequest).thenApply(response => { + duplicateTopicNames.forEach { name => response.topics().add(new CreatableTopicResult(). setName(name). - setErrorCode(TOPIC_AUTHORIZATION_FAILED.code())) + setErrorCode(INVALID_REQUEST.code()). + setErrorMessage("Found multiple entries for this topic.")) } -} -response + topicNames.forEach { name => +if (!authorizedTopicNames.contains(name)) { + response.topics().add(new CreatableTopicResult(). +setName(name). +setErrorCode(TOPIC_AUTHORIZATION_FAILED.code())) Review comment: nit: here and other places, we don't need parens for `code()` ## File path: core/src/test/scala/unit/kafka/server/ControllerApisTest.scala ## @@ -151,6 +221,73 @@ class ControllerApisTest { brokerRegistrationResponse.errorCounts().asScala) } + @Test + def testUnauthorizedHandleAlterClientQuotas(): Unit = { +assertThrows(classOf[ClusterAuthorizationException], () => createControllerApis( + Some(createDenyAllAuthorizer()), new MockController.Builder().build()). 
+handleAlterClientQuotas(buildRequest(new AlterClientQuotasRequest( + new AlterClientQuotasRequestData(), 0 + } + + @Test + def testUnauthorizedHandleIncrementalAlterConfigs(): Unit = { +val requestData = new IncrementalAlterConfigsRequestData().setResources( + new AlterConfigsResourceCollection( +util.Arrays.asList(new IncrementalAlterConfigsRequestData.AlterConfigsResource(). + setResourceName("1"). + setResourceType(ConfigResource.Type.BROKER.id()). + setConfigs(new AlterableConfigCollection(util.Arrays.asList(new AlterableConfig(). +setName("log.cleaner.backoff.ms"). Review comment: Should we use the static KafkaConfig property definitions instead of these strings? ## File path: core/src/main/scala/kafka/server/ControllerApis.scala ## @@ -35,8 +36,8 @@ import org.apache.kafka.common.internals.FatalExitError import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult import org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, DeletableTopicResultCollection} import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBroker -import org.apache.kafka.common.message.{BeginQuorumEpochResponseData, BrokerHeartbeatResponseData, BrokerRegistrationResponseData, CreateTopicsRequestData, CreateTopicsResponseData, DeleteTopicsRequestData, DeleteTopicsResponseData, DescribeQuorumResponseData, EndQuorumEpochResponseData, FetchResponseData, MetadataResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData, UnregisterBrokerResponseData, VoteResponseData} -import org.apache.kafka.common.protocol.Errors.{INVALID_REQUEST, TOPIC_AUTHORIZATION_FAILED} +import org.apache.kafka.common.message._ +import org.apache.kafka.common.protocol.Errors._ Review comment: In KafkaApis we import `Errors` rather than importing all the members of the enum via a wildcard import. Any reason to prefer one way over the other? 
It seems more common in our code base to import the enum and refer to members like `Errors.ILLEGAL_SASL_STATE` ## File path: core/src/main/scala/kafka/server/ControllerApis.scala ## @@ -238,87 +249,99 @@ class ControllerApis(val requestChannel: RequestChannel, val toAuthenticate = new util.HashSet[String] toAuthenticate.addAll(providedNames) val idToName = new util.HashMap[Uuid, String] -controller.findTopicNames(providedIds).get().forEach { (id, nameOrError) => - if (nameOrError.isError) { -appendResponse(null, id, nameOrError.error()) - } else { -toAuthenticate.add(nameOrError.result()) -idToName.put(id, nameOrError.result()) - } -} -// Get the list of deletable topics (those we can delete) and the list of describeable -// topics. If a topic can't be deleted or described, we have to act like it doesn't -// exist, even when it does. -val topicsToAuthenticate = toAuthenticate.asScala -val (describeable, deletable) = if (hasClusterAuth)
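The ControllerApis diff replaces a blocking `controller.createTopics(effectiveRequest).get()` with `thenApply`, so the duplicate-topic and authorization errors are appended when the future completes instead of blocking the calling thread. A stdlib-only Java sketch of that shape follows; `TopicResult`, `createTopicsAsync` and the error strings are hypothetical stand-ins, not Kafka's `CreatableTopicResult`, controller API or `Errors` enum.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

final class AsyncCreateTopicsSketch {
    private AsyncCreateTopicsSketch() { }

    // Hypothetical per-topic result; error == null means success.
    static final class TopicResult {
        final String name;
        final String error;
        TopicResult(String name, String error) { this.name = name; this.error = error; }
        @Override public String toString() { return name + "=" + (error == null ? "OK" : error); }
    }

    // Hypothetical asynchronous "controller" call returning one result per valid topic.
    static CompletableFuture<List<TopicResult>> createTopicsAsync(List<String> names) {
        return CompletableFuture.supplyAsync(() -> {
            List<TopicResult> results = new ArrayList<>();
            for (String n : names) results.add(new TopicResult(n, null));
            return results;
        });
    }

    // No get(): the extra error entries are attached with thenApply, and the caller
    // receives a future that completes once they have been added.
    static CompletableFuture<List<TopicResult>> handleCreateTopics(
            List<String> valid, Set<String> duplicates, Set<String> unauthorized) {
        return createTopicsAsync(valid).thenApply(response -> {
            duplicates.forEach(n -> response.add(new TopicResult(n, "INVALID_REQUEST")));
            unauthorized.forEach(n -> response.add(new TopicResult(n, "TOPIC_AUTHORIZATION_FAILED")));
            return response;
        });
    }
}
```

Only a test or an outermost caller should call `join()`; a request handler would instead register a callback on the returned future to send the response.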
[GitHub] [kafka] showuon commented on pull request #10409: KAFKA-9295: improve KTableKTableForeignKeyInnerJoinMultiIntegrationTest
showuon commented on pull request #10409: URL: https://github.com/apache/kafka/pull/10409#issuecomment-818309573 Sure, I'll update the PR later. I removed it because I had tried it before and it didn't work well. Let's try it again. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-12643) Kafka Streams 2.7 with Kafka Broker 2.6.x regression: bad timestamp in transform/process (this.context.schedule function)
[ https://issues.apache.org/jira/browse/KAFKA-12643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-12643. --- Resolution: Duplicate Thanks for confirming! > Kafka Streams 2.7 with Kafka Broker 2.6.x regression: bad timestamp in > transform/process (this.context.schedule function) > - > > Key: KAFKA-12643 > URL: https://issues.apache.org/jira/browse/KAFKA-12643 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.7.0 >Reporter: David EVANO >Priority: Major > Attachments: Capture d’écran 2021-04-09 à 17.50.05.png > > > In a transform() or process() method: > Define a scheduled task: > this.context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, > timestamp -> {...} > store.put(...) or context.forward(...) produces a record with an invalid > timestamp. > For forward(), a workaround is to set the timestamp explicitly: > context.forward(entry.key, entry.value.toString(), > To.all().withTimestamp(timestamp)); > But for state.put(...) or state.delete(...) there is no workaround. > Is it mandatory to have the Kafka broker version aligned with the Kafka > Streams version? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12642) Improve Rebalance reason upon metadata change
[ https://issues.apache.org/jira/browse/KAFKA-12642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17319766#comment-17319766 ] Guozhang Wang commented on KAFKA-12642: --- Hi [~nicolas.guyomar] Maybe in the existing log message, we can print the `protocols` and `supportedProtocols` of the member, in the form of ? If you agree, please feel free to submit a PR. > Improve Rebalance reason upon metadata change > - > > Key: KAFKA-12642 > URL: https://issues.apache.org/jira/browse/KAFKA-12642 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Nicolas Guyomar >Priority: Minor > > Whenever the known member metadata no longer matches the one from a > JoinGroupRequest, the GroupCoordinator triggers a rebalance with the > following reason: "Updating metadata for member ${member.memberId}", but > there are 2 underlying reasons for it in this part of the code in MemberMetadata.scala > : > {code:java} > def matches(protocols: List[(String, Array[Byte])]): Boolean = { > if (protocols.size != this.supportedProtocols.size) > return false > for (i <- protocols.indices) { > val p1 = protocols(i) > val p2 = supportedProtocols(i) > if (p1._1 != p2._1 || !util.Arrays.equals(p1._2, p2._2)) > return false > } > true > }{code} > Could we maybe improve the rebalance reason with a bit more detail? > > Thank you > -- This message was sent by Atlassian Jira (v8.3.4#803005)
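One way to act on the request for a more detailed rebalance reason is to have the comparison in `matches` report which check failed, instead of returning a bare `false`. The Java sketch below mirrors the checks in the quoted Scala (protocol count, then per-index name and metadata bytes) and reports name and metadata changes separately. `ProtocolMismatch` and the message wording are assumptions, not existing Kafka code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

final class ProtocolMismatch {
    private ProtocolMismatch() { }

    /**
     * Same checks as MemberMetadata.matches, but returns null when the protocols
     * match and otherwise a human-readable reason usable in a rebalance log line.
     * Each protocol is modelled as a (name, metadata bytes) entry.
     */
    static String mismatchReason(List<Map.Entry<String, byte[]>> requested,
                                 List<Map.Entry<String, byte[]>> supported) {
        if (requested.size() != supported.size()) {
            return "number of protocols changed from " + supported.size() + " to " + requested.size();
        }
        for (int i = 0; i < requested.size(); i++) {
            Map.Entry<String, byte[]> p1 = requested.get(i);
            Map.Entry<String, byte[]> p2 = supported.get(i);
            if (!p1.getKey().equals(p2.getKey())) {
                return "protocol at index " + i + " changed from " + p2.getKey() + " to " + p1.getKey();
            }
            if (!Arrays.equals(p1.getValue(), p2.getValue())) {
                return "metadata for protocol " + p1.getKey() + " changed";
            }
        }
        return null; // protocols match
    }
}
```

A non-null result could then be appended to the existing "Updating metadata for member ..." reason.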
[GitHub] [kafka] kpatelatwork opened a new pull request #10530: KAFKA-10231 fail bootstrap of Rest server if the host name in the adv…
kpatelatwork opened a new pull request #10530: URL: https://github.com/apache/kafka/pull/10530 …ertised uri is invalid and other nodes can't reach it. Host names have rules about which characters they can contain and about their maximum length, such as those defined in RFC 1123. Node-to-node communication over the REST API fails if the URL this node advertises to the cluster contains an invalid host name, and the resulting error message in the logs isn't very helpful. This PR uses the Java `IDN` class to expose a detailed error message and fails the server bootstrap in that case. @C0urante, @rhauch and @kkonstantine please review ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
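The PR description mentions using Java's `IDN` class to surface a detailed error. Here is a minimal sketch of that idea, assuming the STD3 ASCII rules (letters, digits and hyphens only, no leading or trailing hyphen in a label) are the check you want. `IDN.toASCII` with `USE_STD3_ASCII_RULES` throws `IllegalArgumentException` with a descriptive message for labels that break them, and the hypothetical `AdvertisedHostValidator` wraps that message so startup can fail fast. This is not the code from PR #10530.

```java
import java.net.IDN;

// Hypothetical validator; not part of Kafka Connect.
final class AdvertisedHostValidator {
    private AdvertisedHostValidator() { }

    /** Throws IllegalArgumentException carrying IDN's detailed reason if the host is invalid. */
    static void validate(String host) {
        try {
            // STD3 rules reject non-LDH characters such as '_' and leading/trailing hyphens.
            IDN.toASCII(host, IDN.USE_STD3_ASCII_RULES);
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                "Invalid advertised host name '" + host + "': " + e.getMessage(), e);
        }
    }

    /** Convenience wrapper returning false instead of throwing. */
    static boolean isValid(String host) {
        try {
            validate(host);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }
}
```

Throwing from `validate` during startup, rather than logging and continuing, matches the PR's stated goal of failing the bootstrap.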
[jira] [Assigned] (KAFKA-8605) Warn users when they have same connector in their plugin-path more than once
[ https://issues.apache.org/jira/browse/KAFKA-8605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kalpesh Patel reassigned KAFKA-8605: Assignee: Kalpesh Patel > Warn users when they have same connector in their plugin-path more than once > > > Key: KAFKA-8605 > URL: https://issues.apache.org/jira/browse/KAFKA-8605 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Cyrus Vafadari >Assignee: Kalpesh Patel >Priority: Major > > Right now it is very easy to have multiple copies of the same connector in > the plugin-path and not realize it. > This can be problematic if a user is adding dependencies into the plugin, or > accidentally using the wrong version of the connector. > An unintrusive improvement would be to log a warning if the same connector > appears in the plugin-path more than once -- This message was sent by Atlassian Jira (v8.3.4#803005)
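Here is a sketch of the warning the issue asks for, assuming the plugin scan already produces (connector class name, plugin location) pairs. How Connect actually scans `plugin.path` is not modelled here. `DuplicatePluginCheck` is hypothetical, and it uses `java.util.logging` only to stay dependency-free (Connect itself logs through SLF4J).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

final class DuplicatePluginCheck {
    private static final Logger LOG = Logger.getLogger(DuplicatePluginCheck.class.getName());

    private DuplicatePluginCheck() { }

    /**
     * Groups discovered (class name, location) pairs by class name, logs a warning
     * for every class found in more than one location, and returns those classes.
     */
    static Map<String, List<String>> findDuplicates(List<Map.Entry<String, String>> discovered) {
        Map<String, List<String>> locations = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : discovered) {
            locations.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
        }
        // Keep only classes that appear at two or more locations.
        locations.values().removeIf(locs -> locs.size() < 2);
        locations.forEach((cls, locs) ->
            LOG.warning("Connector " + cls + " found in multiple plugin locations: " + locs));
        return locations;
    }
}
```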
[GitHub] [kafka] mjsax commented on pull request #10506: MINOR: Improve description of `max.poll.records` config
mjsax commented on pull request #10506: URL: https://github.com/apache/kafka/pull/10506#issuecomment-818278763 Merged to `trunk` and cherry-picked to `2.8` and `2.7` branches. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12643) Kafka Streams 2.7 with Kafka Broker 2.6.x regression: bad timestamp in transform/process (this.context.schedule function)
[ https://issues.apache.org/jira/browse/KAFKA-12643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17319735#comment-17319735 ] David EVANO commented on KAFKA-12643: - Hello, Yes, this seems to be a duplicate of the mentioned issue. Thanks David Evano On Fri, Apr 9, 2021 at 18:21, Guozhang Wang (Jira) wrote: > Kafka Streams 2.7 with Kafka Broker 2.6.x regression: bad timestamp in > transform/process (this.context.schedule function) > - > > Key: KAFKA-12643 > URL: https://issues.apache.org/jira/browse/KAFKA-12643 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.7.0 >Reporter: David EVANO >Priority: Major > Attachments: Capture d’écran 2021-04-09 à 17.50.05.png > > > In a transform() or process() method: > Define a scheduled task: > this.context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, > timestamp -> {...} > store.put(...) or context.forward(...) produces a record with an invalid > timestamp. > For forward(), a workaround is to set the timestamp explicitly: > context.forward(entry.key, entry.value.toString(), > To.all().withTimestamp(timestamp)); > But for state.put(...) or state.delete(...) there is no workaround. > Is it mandatory to have the Kafka broker version aligned with the Kafka > Streams version? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] jsancio commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState
jsancio commented on a change in pull request #10480: URL: https://github.com/apache/kafka/pull/10480#discussion_r611957903 ## File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java ## @@ -202,6 +203,25 @@ private void completeCurrentBatch() { currentBatch = null; } +public void addControlBatch(MemoryRecords records) { +appendLock.lock(); +try { +drainStatus = DrainStatus.STARTED; Review comment: Okay. This is an internal API so I say we add this when needed. If we still want to refactor then, this is similar to `maybeCompleteDrain` but it cannot assume that the lock is held. How about naming this method `completeDrain`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
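The review distinguishes a method that may assume `appendLock` is already held (`maybeCompleteDrain`) from one that must acquire the lock itself (the proposed `completeDrain`). The Java sketch below shows that pattern with a `ReentrantLock`. The `DrainStatus` values and the state transition are simplified assumptions, not `BatchAccumulator`'s actual logic.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical model of the two locking variants discussed in the review.
final class DrainSketch {
    enum DrainStatus { NONE, STARTED, FINISHED }

    private final ReentrantLock appendLock = new ReentrantLock();
    private DrainStatus drainStatus = DrainStatus.NONE;

    // Variant that requires the caller to already hold appendLock.
    private void maybeCompleteDrain() {
        if (!appendLock.isHeldByCurrentThread()) {
            throw new IllegalStateException("appendLock must be held");
        }
        if (drainStatus == DrainStatus.STARTED) {
            drainStatus = DrainStatus.FINISHED;
        }
    }

    // Variant that cannot assume the lock is held, so it acquires it and delegates.
    void completeDrain() {
        appendLock.lock();
        try {
            maybeCompleteDrain();
        } finally {
            appendLock.unlock();
        }
    }

    void startDrain() {
        appendLock.lock();
        try {
            drainStatus = DrainStatus.STARTED;
        } finally {
            appendLock.unlock();
        }
    }

    DrainStatus status() {
        appendLock.lock();
        try {
            return drainStatus;
        } finally {
            appendLock.unlock();
        }
    }
}
```

Because `ReentrantLock` is reentrant, `completeDrain` would also work when called while the lock is already held. The explicit check in `maybeCompleteDrain` documents and enforces its precondition.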
[GitHub] [kafka] mjsax merged pull request #10506: MINOR: Improve description of `max.poll.records` config
mjsax merged pull request #10506: URL: https://github.com/apache/kafka/pull/10506 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-12657) Flaky Tests BlockingConnectorTest.testWorkerRestartWithBlockInConnectorStop
Matthias J. Sax created KAFKA-12657: --- Summary: Flaky Tests BlockingConnectorTest.testWorkerRestartWithBlockInConnectorStop Key: KAFKA-12657 URL: https://issues.apache.org/jira/browse/KAFKA-12657 Project: Kafka Issue Type: Test Components: KafkaConnect Reporter: Matthias J. Sax [https://github.com/apache/kafka/pull/10506/checks?check_run_id=2327377745] {quote} {{org.opentest4j.AssertionFailedError: Condition not met within timeout 6. Worker did not complete startup in time ==> expected: <true> but was: <false> at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40) at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:193) at org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:319) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:367) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:316) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290) at org.apache.kafka.connect.integration.BlockingConnectorTest.setup(BlockingConnectorTest.java:133)}} {quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
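The trace shows the failure path of a poll-until-timeout helper: the condition is re-evaluated until a deadline passes, then an assertion error is raised with the supplied message. A stdlib-only Java sketch of that mechanism follows. `WaitFor` is hypothetical, throws a plain `AssertionError`, and only loosely mirrors the message format above, so it is not Kafka's `TestUtils`.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical poll-until-timeout helper.
final class WaitFor {
    private WaitFor() { }

    /** Re-checks the condition every pollMs until it holds, or fails after timeoutMs. */
    static void waitForCondition(BooleanSupplier condition, long timeoutMs, long pollMs, String details) {
        // nanoTime is monotonic, so wall-clock adjustments cannot shorten or extend the wait.
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                throw new AssertionError("Condition not met within timeout " + timeoutMs + ". " + details);
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                throw new AssertionError("Interrupted while waiting: " + details, e);
            }
        }
    }
}
```

With this structure, a flaky failure means only that the condition (here, worker startup) did not become true before the deadline. The trace alone does not show why.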
[jira] [Comment Edited] (KAFKA-12566) Flaky Test MirrorConnectorsIntegrationSSLTest#testReplication
[ https://issues.apache.org/jira/browse/KAFKA-12566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17319715#comment-17319715 ] Matthias J. Sax edited comment on KAFKA-12566 at 4/12/21, 8:59 PM: --- [https://github.com/apache/kafka/pull/10506/checks?check_run_id=2327349920] [https://github.com/apache/kafka/pull/10506/checks?check_run_id=2327310946] {quote} {{java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: The request timed out. at org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:365) at org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:340) at org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.createTopics(MirrorConnectorsIntegrationBaseTest.java:609) at org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.startClusters(MirrorConnectorsIntegrationBaseTest.java:173) at org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.startClusters(MirrorConnectorsIntegrationSSLTest.java:63)}} {quote} was (Author: mjsax): [https://github.com/apache/kafka/pull/10506/checks?check_run_id=2327349920] {quote} {{java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: The request timed out. 
at org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:365) at org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:340) at org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.createTopics(MirrorConnectorsIntegrationBaseTest.java:609) at org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.startClusters(MirrorConnectorsIntegrationBaseTest.java:173) at org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.startClusters(MirrorConnectorsIntegrationSSLTest.java:63)}}{quote} > Flaky Test MirrorConnectorsIntegrationSSLTest#testReplication > - > > Key: KAFKA-12566 > URL: https://issues.apache.org/jira/browse/KAFKA-12566 > Project: Kafka > Issue Type: Test > Components: mirrormaker, unit tests >Reporter: Matthias J. Sax >Priority: Critical > Labels: flaky-test > > > {code:java} > org.opentest4j.AssertionFailedError: Condition not met within timeout 2. > Offsets not translated downstream to primary cluster. ==> expected: > but was: at > org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at > org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40) at > org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:193) at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300) at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290) at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplication(MirrorConnectorsIntegrationBaseTest.java:289) > {code} > {{LOGs}} > {quote}[2021-03-26 03:28:06,157] ERROR Could not check connector state info. 
> (org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions:420) > org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not > read connector state. Error response: {"error_code":404,"message":"No status > found for connector MirrorSourceConnector"} at > org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.connectorStatus(EmbeddedConnectCluster.java:479) > at > org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.checkConnectorState(EmbeddedConnectClusterAssertions.java:413) > at > org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.lambda$assertConnectorAndAtLeastNumTasksAreRunning$16(EmbeddedConnectClusterAssertions.java:286) > at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300) at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290) at >
[jira] [Commented] (KAFKA-12566) Flaky Test MirrorConnectorsIntegrationSSLTest#testReplication
[ https://issues.apache.org/jira/browse/KAFKA-12566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17319715#comment-17319715 ] Matthias J. Sax commented on KAFKA-12566: - [https://github.com/apache/kafka/pull/10506/checks?check_run_id=2327349920] {quote} {{java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: The request timed out. at org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:365) at org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:340) at org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.createTopics(MirrorConnectorsIntegrationBaseTest.java:609) at org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.startClusters(MirrorConnectorsIntegrationBaseTest.java:173) at org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.startClusters(MirrorConnectorsIntegrationSSLTest.java:63)}}{quote} > Flaky Test MirrorConnectorsIntegrationSSLTest#testReplication > - > > Key: KAFKA-12566 > URL: https://issues.apache.org/jira/browse/KAFKA-12566 > Project: Kafka > Issue Type: Test > Components: mirrormaker, unit tests >Reporter: Matthias J. Sax >Priority: Critical > Labels: flaky-test > > > {code:java} > org.opentest4j.AssertionFailedError: Condition not met within timeout 2. > Offsets not translated downstream to primary cluster. 
==> expected: > but was: at > org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at > org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40) at > org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:193) at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300) at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290) at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplication(MirrorConnectorsIntegrationBaseTest.java:289) > {code} > {{LOGs}} > {quote}[2021-03-26 03:28:06,157] ERROR Could not check connector state info. > (org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions:420) > org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not > read connector state. 
Error response: {"error_code":404,"message":"No status > found for connector MirrorSourceConnector"} at > org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.connectorStatus(EmbeddedConnectCluster.java:479) > at > org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.checkConnectorState(EmbeddedConnectClusterAssertions.java:413) > at > org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.lambda$assertConnectorAndAtLeastNumTasksAreRunning$16(EmbeddedConnectClusterAssertions.java:286) > at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300) at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290) at > org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.assertConnectorAndAtLeastNumTasksAreRunning(EmbeddedConnectClusterAssertions.java:285) > at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning(MirrorConnectorsIntegrationBaseTest.java:470) > at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplication(MirrorConnectorsIntegrationBaseTest.java:227){quote} > and > {quote}[2021-03-26 03:30:41,524] ERROR [MirrorHeartbeatConnector|task-0] > Graceful stop of task MirrorHeartbeatConnector-0 failed. > (org.apache.kafka.connect.runtime.Worker:866) [2021-03-26 03:30:41,527] ERROR > [MirrorHeartbeatConnector|task-0] > WorkerSourceTask{id=MirrorHeartbeatConnector-0} failed to send record to > heartbeats: (org.apache.kafka.connect.runtime.WorkerSourceTask:372) > org.apache.kafka.common.KafkaException: Producer is closed forcefully.
at > org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:750) > at >
[GitHub] [kafka] mjsax commented on pull request #10493: MINOR: cleanup Jenkins workspace before build
mjsax commented on pull request #10493: URL: https://github.com/apache/kafka/pull/10493#issuecomment-818144809 Thanks @ijuma! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax closed pull request #10493: MINOR: cleanup Jenkins workspace before build
mjsax closed pull request #10493: URL: https://github.com/apache/kafka/pull/10493 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins
spena commented on a change in pull request #10462: URL: https://github.com/apache/kafka/pull/10462#discussion_r611915606 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ## @@ -102,14 +127,98 @@ public void process(final K key, final V1 value) { while (iter.hasNext()) { needOuterJoin = false; final KeyValue otherRecord = iter.next(); +final long otherRecordTimestamp = otherRecord.key; context().forward( key, joiner.apply(key, value, otherRecord.value), -To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key))); +To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp))); + +outerJoinWindowStore.ifPresent(store -> { +// Delete the other joined key from the outer non-joined store now to prevent +// further processing +final KeyAndJoinSide otherJoinKey = KeyAndJoinSide.make(!thisJoin, key); +if (store.fetch(otherJoinKey, otherRecordTimestamp) != null) { +store.put(otherJoinKey, null, otherRecordTimestamp); +} +}); } if (needOuterJoin) { -context().forward(key, joiner.apply(key, value, null)); +// The maxStreamTime contains the max time observed in both sides of the join. +// Having access to the time observed in the other join side fixes the following +// problem: +// +// Say we have a window size of 5 seconds +// 1. A non-joined record wth time T10 is seen in the left-topic (maxLeftStreamTime: 10) +// The record is not processed yet, and is added to the outer-join store +// 2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2) +// The record is not processed yet, and is added to the outer-join store +// 3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11) +// It is time to look at the expired records. 
T10 and T2 should be emitted, but +// because T2 was late, then it is not fetched by the window store, so it is not processed +// +// See KStreamKStreamLeftJoinTest.testLowerWindowBound() tests +// +// the condition below allows us to process the late record without the need +// to hold it in the temporary outer store +if (timeTo < maxStreamTime) { +context().forward(key, joiner.apply(key, value, null)); +} else { +outerJoinWindowStore.ifPresent(store -> store.put( +KeyAndJoinSide.make(thisJoin, key), +makeValueOrOtherValue(thisJoin, value), +inputRecordTimestamp)); +} +} + +outerJoinWindowStore.ifPresent(store -> { +// only emit left/outer non-joined if the stream time has advanced (inputRecordTime = maxStreamTime) +// if the current record is late, then there is no need to check for expired records +if (inputRecordTimestamp == maxStreamTime) { +maybeEmitOuterExpiryRecords(store, maxStreamTime); +} +}); +} +} + +private ValueOrOtherValue makeValueOrOtherValue(final boolean thisJoin, final V1 value) { Review comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
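The scenario in the code comment can be replayed with a small model of the new condition. Stream time is shared by both sides of the join, and a non-joined record whose window upper bound `timeTo` is already behind stream time is emitted immediately instead of being buffered. In this Java sketch, `timeTo = timestamp + joinAfterMs` is an assumption about how the window bound is computed, and the class is hypothetical rather than part of `KStreamKStreamJoin`.

```java
// Hypothetical model of the "emit late non-joined record immediately" decision.
final class OuterJoinLateRecordSketch {
    enum Action { EMIT_NOW, BUFFER }

    private final long joinAfterMs;
    // Max timestamp observed on EITHER side of the join (shared stream time).
    private long maxStreamTime = Long.MIN_VALUE;

    OuterJoinLateRecordSketch(long joinAfterMs) {
        this.joinAfterMs = joinAfterMs;
    }

    /** Every input record, joined or not, advances the shared stream time. */
    void observe(long recordTimestamp) {
        maxStreamTime = Math.max(maxStreamTime, recordTimestamp);
    }

    /** Decides what to do with a record that found no join partner. */
    Action onNonJoined(long recordTimestamp) {
        observe(recordTimestamp);
        long timeTo = recordTimestamp + joinAfterMs; // assumed upper bound of its join window
        // If the window closed before the current stream time, no later record can
        // join with it, so the null-joined result can be emitted right away.
        return timeTo < maxStreamTime ? Action.EMIT_NOW : Action.BUFFER;
    }
}
```

With a window of 5, T10 is buffered (15 is not below stream time 10), while the late T2 from the other side is emitted at once (7 < 10). T2 is exactly the record the comment says would otherwise never be fetched from the outer-join store.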
[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins
spena commented on a change in pull request #10462: URL: https://github.com/apache/kafka/pull/10462#discussion_r611915696 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ## @@ -102,14 +127,98 @@ public void process(final K key, final V1 value) { while (iter.hasNext()) { needOuterJoin = false; final KeyValue otherRecord = iter.next(); +final long otherRecordTimestamp = otherRecord.key; context().forward( key, joiner.apply(key, value, otherRecord.value), -To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key))); +To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp))); + +outerJoinWindowStore.ifPresent(store -> { +// Delete the other joined key from the outer non-joined store now to prevent +// further processing +final KeyAndJoinSide otherJoinKey = KeyAndJoinSide.make(!thisJoin, key); +if (store.fetch(otherJoinKey, otherRecordTimestamp) != null) { +store.put(otherJoinKey, null, otherRecordTimestamp); +} +}); } if (needOuterJoin) { -context().forward(key, joiner.apply(key, value, null)); +// The maxStreamTime contains the max time observed in both sides of the join. +// Having access to the time observed in the other join side fixes the following +// problem: +// +// Say we have a window size of 5 seconds +// 1. A non-joined record wth time T10 is seen in the left-topic (maxLeftStreamTime: 10) +// The record is not processed yet, and is added to the outer-join store +// 2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2) +// The record is not processed yet, and is added to the outer-join store +// 3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11) +// It is time to look at the expired records. 
T10 and T2 should be emitted, but +// because T2 was late, then it is not fetched by the window store, so it is not processed +// +// See KStreamKStreamLeftJoinTest.testLowerWindowBound() tests +// +// the condition below allows us to process the late record without the need +// to hold it in the temporary outer store +if (timeTo < maxStreamTime) { +context().forward(key, joiner.apply(key, value, null)); +} else { +outerJoinWindowStore.ifPresent(store -> store.put( +KeyAndJoinSide.make(thisJoin, key), +makeValueOrOtherValue(thisJoin, value), +inputRecordTimestamp)); +} +} + +outerJoinWindowStore.ifPresent(store -> { +// only emit left/outer non-joined if the stream time has advanced (inputRecordTime = maxStreamTime) +// if the current record is late, then there is no need to check for expired records +if (inputRecordTimestamp == maxStreamTime) { +maybeEmitOuterExpiryRecords(store, maxStreamTime); +} +}); +} +} + +private ValueOrOtherValue makeValueOrOtherValue(final boolean thisJoin, final V1 value) { +return thisJoin +? ValueOrOtherValue.makeValue(value) +: ValueOrOtherValue.makeOtherValue(value); +} + +@SuppressWarnings("unchecked") +private void maybeEmitOuterExpiryRecords(final WindowStore, ValueOrOtherValue> store, final long maxStreamTime) { +try (final KeyValueIterator>, ValueOrOtherValue> it = store.all()) { +while (it.hasNext()) { +final KeyValue>, ValueOrOtherValue> e = it.next(); + +// Skip next records if the oldest record has not expired yet +if (e.key.window().end() + joinGraceMs >= maxStreamTime) { +break; +} + +final K key = e.key.key().getKey(); + +// Emit the record by joining with a null value. But the order varies depending whether +// this join is using a reverse joiner or not. Also
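The outer-join buffering rule discussed in this diff can be modeled without Kafka Streams. The sketch below is a simplified, hypothetical illustration. `OuterJoinBuffer`, `onNonJoined`, and `onJoined` are invented names, the buffer holds one value per timestamp, and a single stream time stands in for the PR's maximum over both join sides. Under those assumptions, a non-joined record whose window has already closed (`timeTo < maxStreamTime`) is emitted at once. Otherwise it is buffered, and buffered records are emitted as (value, null) only once `windowEnd + grace < maxStreamTime`, which mirrors the `break` condition in `maybeEmitOuterExpiryRecords`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model, not KStreamKStreamJoin: one value per timestamp and a single
// stream time instead of the PR's max over both join sides.
class OuterJoinBuffer {
    private final long windowSizeMs;
    private final long graceMs;
    private final TreeMap<Long, String> buffer = new TreeMap<>(); // time-ordered, like the outer store
    private final List<String> emitted = new ArrayList<>();
    private long maxStreamTime = Long.MIN_VALUE;

    OuterJoinBuffer(long windowSizeMs, long graceMs) {
        this.windowSizeMs = windowSizeMs;
        this.graceMs = graceMs;
    }

    /** A record that found no partner in the other side's window. */
    void onNonJoined(long timestamp, String value) {
        maxStreamTime = Math.max(maxStreamTime, timestamp);
        long timeTo = timestamp + windowSizeMs;
        if (timeTo < maxStreamTime) {
            emitted.add(value + ",null"); // late: its window already closed, emit right away
        } else {
            buffer.put(timestamp, value); // may still be joined later, so hold it
        }
        maybeEmitExpired(timestamp);
    }

    /** A later record joined with the buffered one at bufferedTimestamp: drop it from the buffer. */
    void onJoined(long bufferedTimestamp, long timestamp) {
        maxStreamTime = Math.max(maxStreamTime, timestamp);
        buffer.remove(bufferedTimestamp);
        maybeEmitExpired(timestamp);
    }

    private void maybeEmitExpired(long inputTimestamp) {
        if (inputTimestamp != maxStreamTime) {
            return; // late input did not advance stream time, so nothing new can expire
        }
        Iterator<Map.Entry<Long, String>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, String> e = it.next();
            if (e.getKey() + windowSizeMs + graceMs >= maxStreamTime) {
                break; // oldest remaining entry has not expired, so later ones have not either
            }
            emitted.add(e.getValue() + ",null");
            it.remove();
        }
    }

    List<String> emitted() {
        return emitted;
    }
}
```

With a window of 5 and no grace period, feeding T10, then T2, then T16 emits T2 immediately (it is late) and emits T10 only once T16 moves stream time past T10's window end.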
[jira] [Commented] (KAFKA-12654) separate checkAllSubscriptionEqual and getConsumerToOwnedPartitions methods
[ https://issues.apache.org/jira/browse/KAFKA-12654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17319675#comment-17319675 ] A. Sophie Blee-Goldman commented on KAFKA-12654: To be honest, the sticky assignment algorithm for the general case is so slow that the additional time spent populating this map is probably negligible. That said, the general algorithm ends up populating exactly the same map in #prepopulateCurrentAssignments, so we could just pass it along to generalAssign as well. But it also has to populate an additional map (prevAssignment), so it would still need to loop over and deserialize this info again anyway. > separate checkAllSubscriptionEqual and getConsumerToOwnedPartitions methods > --- > > Key: KAFKA-12654 > URL: https://issues.apache.org/jira/browse/KAFKA-12654 > Project: Kafka > Issue Type: Improvement >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > > Currently, when entering the sticky assignor, we check whether all consumers have > the same subscription to decide which assignor to use (constrained assignor > or general assignor). While checking the subscriptions, we also deserialize > the subscription user data to get the partitions owned by each consumer > (consumerToOwnedPartitions). However, the consumerToOwnedPartitions info is > not used by the general assignor (which deserializes the user data again itself), so > this extra deserialization is wasted work on the general path. > We should separate these two steps into two methods and only deserialize the > user data for the constrained assignor, to improve the general sticky assignor's > performance. -- This message was sent by Atlassian Jira (v8.3.4#803005)
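A minimal sketch of the proposed split, assuming hypothetical `Member` and `StickyAssignorSketch` types. The method names follow the issue title, but the signatures and the comma-separated owned-partition encoding are made up for illustration; the real sticky assignor works on `Subscription` user data. The equality check never deserializes user data, and owned partitions are decoded only when the constrained assignor is chosen.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical member metadata: topics are plain, owned partitions stay encoded
// (a made-up "topic-partition,..." string) until someone actually needs them.
class Member {
    final Set<String> topics;
    final String encodedOwnedPartitions;
    int decodeCount = 0; // how many times the user data was deserialized

    Member(Set<String> topics, String encodedOwnedPartitions) {
        this.topics = topics;
        this.encodedOwnedPartitions = encodedOwnedPartitions;
    }

    List<String> decodeOwnedPartitions() {
        decodeCount++;
        return encodedOwnedPartitions.isEmpty()
            ? Collections.<String>emptyList()
            : Arrays.asList(encodedOwnedPartitions.split(","));
    }
}

class StickyAssignorSketch {
    // Only compares subscriptions; never touches the encoded user data.
    static boolean checkAllSubscriptionEqual(Map<String, Member> members) {
        Set<String> first = null;
        for (Member m : members.values()) {
            if (first == null) {
                first = m.topics;
            } else if (!first.equals(m.topics)) {
                return false;
            }
        }
        return true;
    }

    // Deserializes owned partitions; called only on the constrained path.
    static Map<String, List<String>> getConsumerToOwnedPartitions(Map<String, Member> members) {
        Map<String, List<String>> result = new HashMap<>();
        for (Map.Entry<String, Member> e : members.entrySet()) {
            result.put(e.getKey(), e.getValue().decodeOwnedPartitions());
        }
        return result;
    }

    static String chooseAssignor(Map<String, Member> members) {
        if (checkAllSubscriptionEqual(members)) {
            Map<String, List<String>> owned = getConsumerToOwnedPartitions(members);
            return "constrained(" + owned.size() + " members)";
        }
        return "general"; // the general path decodes user data itself, so nothing is decoded here
    }
}
```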
[GitHub] [kafka] ableegoldman commented on pull request #10409: KAFKA-9295: improve KTableKTableForeignKeyInnerJoinMultiIntegrationTest
ableegoldman commented on pull request #10409: URL: https://github.com/apache/kafka/pull/10409#issuecomment-818077023 It looks like the test failed in this most recent run -- however, that's not unexpected, since I think you may have accidentally taken out the other fix, which probably _will_ help: using `startStreamsAndWaitForRunning` so we make sure to wait for KafkaStreams to start up and get into RUNNING. Let's add that back and see if it helps (I suspect it will).
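The point of waiting for RUNNING can be shown with a self-contained sketch. `FakeStreamsApp`, `AppState`, and `startAndWaitForRunning` are hypothetical stand-ins; the test helper mentioned in the comment is `startStreamsAndWaitForRunning`, whose implementation is not shown here. With a real `KafkaStreams` instance, the same pattern would register a listener via `setStateListener` before calling `start()`, so the transition to RUNNING cannot be missed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

enum AppState { CREATED, REBALANCING, RUNNING }

// Hypothetical stand-in for an app that reaches RUNNING asynchronously after start().
class FakeStreamsApp {
    private volatile BiConsumer<AppState, AppState> listener = (newState, oldState) -> { };
    private volatile AppState state = AppState.CREATED;

    void setStateListener(BiConsumer<AppState, AppState> listener) {
        this.listener = listener;
    }

    AppState state() {
        return state;
    }

    void start() {
        Thread t = new Thread(() -> {
            transition(AppState.REBALANCING);
            try {
                Thread.sleep(50); // simulated rebalance
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            transition(AppState.RUNNING);
        });
        t.setDaemon(true);
        t.start();
    }

    private void transition(AppState next) {
        AppState old = state;
        state = next;
        listener.accept(next, old);
    }
}

class StartupHelper {
    /** Registers the listener BEFORE start(), then blocks until RUNNING or the timeout. */
    static boolean startAndWaitForRunning(FakeStreamsApp app, long timeoutMs) throws InterruptedException {
        CountDownLatch running = new CountDownLatch(1);
        app.setStateListener((newState, oldState) -> {
            if (newState == AppState.RUNNING) {
                running.countDown();
            }
        });
        app.start();
        return running.await(timeoutMs, TimeUnit.MILLISECONDS);
    }
}
```

Registering the listener first matters: if it were attached after `start()`, a fast transition to RUNNING could happen before anyone is listening, and the wait would time out even though the app is healthy.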
[GitHub] [kafka] ableegoldman commented on pull request #10512: KAFKA-12637: Remove deprecated PartitionAssignor interface
ableegoldman commented on pull request #10512: URL: https://github.com/apache/kafka/pull/10512#issuecomment-818074480 Hm, the build was aborted -- looks like the Gradle daemon was killed or crashed for some reason? I'll kick off a new run; let's hope this one passes.
[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins
spena commented on a change in pull request #10462: URL: https://github.com/apache/kafka/pull/10462#discussion_r611882455 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ## @@ -102,14 +127,98 @@ public void process(final K key, final V1 value) { while (iter.hasNext()) { needOuterJoin = false; final KeyValue otherRecord = iter.next(); +final long otherRecordTimestamp = otherRecord.key; context().forward( key, joiner.apply(key, value, otherRecord.value), -To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecord.key))); +To.all().withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp))); + +outerJoinWindowStore.ifPresent(store -> { +// Delete the other joined key from the outer non-joined store now to prevent +// further processing +final KeyAndJoinSide otherJoinKey = KeyAndJoinSide.make(!thisJoin, key); +if (store.fetch(otherJoinKey, otherRecordTimestamp) != null) { Review comment: This needs to be supported in Window stores. I might need to write a KIP for this.
[GitHub] [kafka] spena commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins
spena commented on a change in pull request #10462: URL: https://github.com/apache/kafka/pull/10462#discussion_r611878043 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java ## @@ -211,6 +246,66 @@ private void assertUniqueStoreNames(final WindowBytesStoreSupplier supplier, return builder; } +@SuppressWarnings("unchecked") +private static StoreBuilder, ValueOrOtherValue>> outerJoinWindowStoreBuilder(final String storeName, + final JoinWindows windows, + final StreamJoinedInternal streamJoinedInternal) { +final StoreBuilder, ValueOrOtherValue>> builder = new TimeOrderedWindowStoreBuilder, ValueOrOtherValue>( +persistentTimeOrderedWindowStore( +storeName + "-store", +Duration.ofMillis(windows.size() + windows.gracePeriodMs()), +Duration.ofMillis(windows.size()) +), +new KeyAndJoinSideSerde<>(streamJoinedInternal.keySerde()), +new ValueOrOtherValueSerde(streamJoinedInternal.valueSerde(), streamJoinedInternal.otherValueSerde()), +Time.SYSTEM +); +if (streamJoinedInternal.loggingEnabled()) { +builder.withLoggingEnabled(streamJoinedInternal.logConfig()); +} else { +builder.withLoggingDisabled(); +} + +return builder; +} + +// This method has same code as Store.persistentWindowStore(). But TimeOrderedWindowStore is +// a non-public API, so we need to keep duplicate code until it becomes public. 
+private static WindowBytesStoreSupplier persistentTimeOrderedWindowStore(final String storeName, + final Duration retentionPeriod, + final Duration windowSize) { +Objects.requireNonNull(storeName, "name cannot be null"); +final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod"); +final long retentionMs = validateMillisecondDuration(retentionPeriod, rpMsgPrefix); +final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, "windowSize"); +final long windowSizeMs = validateMillisecondDuration(windowSize, wsMsgPrefix); + +final long segmentInterval = Math.max(retentionMs / 2, 60_000L); + +if (retentionMs < 0L) { +throw new IllegalArgumentException("retentionPeriod cannot be negative"); +} +if (windowSizeMs < 0L) { +throw new IllegalArgumentException("windowSize cannot be negative"); +} +if (segmentInterval < 1L) { +throw new IllegalArgumentException("segmentInterval cannot be zero or negative"); +} +if (windowSizeMs > retentionMs) { +throw new IllegalArgumentException("The retention period of the window store " ++ storeName + " must be no smaller than its window size. Got size=[" ++ windowSizeMs + "], retention=[" + retentionMs + "]"); +} + +return new RocksDbWindowBytesStoreSupplier( +storeName, +retentionMs, +segmentInterval, +windowSizeMs, +false, Review comment: I had issues with duplicates and forgot to investigate them. I just did another round of investigation, but I still run into issues. The problem is that I cannot delete any key when duplicates are used. This happens in any window store, not just the time-ordered window store. The problem I found is: 1. Added two duplicates with key = 0 and time = 0 ``` # this adds a key with seqNum = 0 put(0, "A0", 0) # this adds a key with seqNum = 1 put(0, "A0-0", 0) ``` 2.
Delete key = 0 and time = 0 ``` # this attempts to delete with seqNum = 2, which does not exist put(0, null, 0) ``` Initially I didn't think duplicates were necessary, but I just wrote a test case with the old semantics and duplicates are processed, so I need to support them. Has deleting duplicates always been unsupported, or am I missing some API or workaround?
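The failure mode described above can be reproduced with a toy model. This is not Kafka's RocksDB window store: `ToyDupWindowStore` and its string-encoded `key@timestamp#seqNum` keys are assumptions that only mimic the reported behavior, in which every `put` draws a fresh sequence number, so `put(0, null, 0)` targets seqNum = 2 and both duplicates survive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model (not Kafka code) of a window store that retains duplicates by
// appending a store-wide sequence number to every physical key.
class ToyDupWindowStore {
    private final TreeMap<String, String> physical = new TreeMap<>();
    private int seqNum = 0;

    private static String physicalKey(int key, long timestamp, int seq) {
        return key + "@" + timestamp + "#" + seq;
    }

    void put(int key, String value, long timestamp) {
        // Every put, including a "delete" (value == null), gets a fresh sequence number.
        String pk = physicalKey(key, timestamp, seqNum++);
        if (value == null) {
            physical.remove(pk); // removes only the brand-new physical key, which never existed
        } else {
            physical.put(pk, value);
        }
    }

    /** Returns all values stored for (key, timestamp), regardless of sequence number. */
    List<String> fetch(int key, long timestamp) {
        String prefix = key + "@" + timestamp + "#";
        List<String> values = new ArrayList<>();
        for (Map.Entry<String, String> e : physical.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                values.add(e.getValue());
            }
        }
        return values;
    }
}
```

Replaying the three calls from the comment leaves both `A0` and `A0-0` in place, which matches the reported symptom; a working delete would need to address the existing sequence numbers (or all of them) rather than a new one.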
[jira] [Created] (KAFKA-12656) JMX exporter is leaking a lot of file descriptors
Liang Xia created KAFKA-12656: - Summary: JMX exporter is leaking a lot of file descriptors Key: KAFKA-12656 URL: https://issues.apache.org/jira/browse/KAFKA-12656 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: Liang Xia The JMX exporter doesn't close connections successfully after reporting the metrics; they get stuck in the CLOSE_WAIT state. java 2351 kcbq *385u IPv6 3660408 0t0 TCP example.internal:9404->x.x.x.x:39470 (CLOSE_WAIT)
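Some background on the symptom: a TCP socket sits in CLOSE_WAIT when the remote peer has closed its end but the local process never calls `close()`. The sketch below shows the usual remedy of closing every accepted connection in try-with-resources once the response is written. `ScrapeHandler` is a hypothetical single-connection handler, not the JMX exporter's actual HTTP server.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

class ScrapeHandler {
    /** Serves exactly one connection and always closes it, even if writing fails. */
    static void serveOnce(ServerSocket server, String body) throws IOException {
        try (Socket conn = server.accept()) {
            BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8));
            in.readLine(); // consume the request line; this sketch ignores headers
            OutputStream out = conn.getOutputStream();
            out.write(body.getBytes(StandardCharsets.UTF_8));
            out.flush();
        } // conn.close() runs here and sends our FIN; skipping it after the peer
          // hangs up is what leaves a socket in CLOSE_WAIT
    }
}
```

On the client side, reading until end-of-stream only succeeds because the server closed its end; a handler that leaked the socket would leave the client blocked and the server-side descriptor open.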
[GitHub] [kafka] dielhennr commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState
dielhennr commented on a change in pull request #10480: URL: https://github.com/apache/kafka/pull/10480#discussion_r611863824 ## File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java ## @@ -194,14 +196,45 @@ private void completeCurrentBatch() { MemoryRecords data = currentBatch.build(); completed.add(new CompletedBatch<>( currentBatch.baseOffset(), -currentBatch.records(), +Optional.of(currentBatch.records()), data, memoryPool, currentBatch.initialBuffer() )); currentBatch = null; } +public void appendLeaderChangeMessage(LeaderChangeMessage leaderChangeMessage, long currentTimeMs) { +maybeCompleteDrain(); +ByteBuffer buffer = memoryPool.tryAllocate(256); +if (buffer != null) { +MemoryRecords data = MemoryRecords.withLeaderChangeMessage( +this.nextOffset, +currentTimeMs, +this.epoch, +buffer, +leaderChangeMessage); +completed.add(new CompletedBatch<>( +nextOffset, +Optional.empty(), +data, +memoryPool, +buffer +)); +nextOffset += 1; +} +} + +public void flush() { Review comment: @jsancio
[GitHub] [kafka] dielhennr commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState
dielhennr commented on a change in pull request #10480: URL: https://github.com/apache/kafka/pull/10480#discussion_r611859318 ## File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java ## @@ -194,14 +196,45 @@ private void completeCurrentBatch() { MemoryRecords data = currentBatch.build(); completed.add(new CompletedBatch<>( currentBatch.baseOffset(), -currentBatch.records(), +Optional.of(currentBatch.records()), data, memoryPool, currentBatch.initialBuffer() )); currentBatch = null; } +public void appendLeaderChangeMessage(LeaderChangeMessage leaderChangeMessage, long currentTimeMs) { +maybeCompleteDrain(); +ByteBuffer buffer = memoryPool.tryAllocate(256); +if (buffer != null) { +MemoryRecords data = MemoryRecords.withLeaderChangeMessage( +this.nextOffset, +currentTimeMs, +this.epoch, +buffer, +leaderChangeMessage); +completed.add(new CompletedBatch<>( +nextOffset, +Optional.empty(), +data, +memoryPool, +buffer +)); +nextOffset += 1; +} +} + +public void flush() { Review comment: https://github.com/apache/kafka/pull/10480#discussion_r610237424
[GitHub] [kafka] hachikuji commented on a change in pull request #10506: MINOR: Improve description of `max.poll.records` config
hachikuji commented on a change in pull request #10506: URL: https://github.com/apache/kafka/pull/10506#discussion_r611840016 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java ## @@ -66,7 +66,8 @@ /** max.poll.records */ public static final String MAX_POLL_RECORDS_CONFIG = "max.poll.records"; -private static final String MAX_POLL_RECORDS_DOC = "The maximum number of records returned in a single call to poll()."; +private static final String MAX_POLL_RECORDS_DOC = "The maximum number of records returned in a single call to poll()." ++ " Note, that " + MAX_POLL_RECORDS_CONFIG + " does not impact the underlying fetching behavior."; Review comment: Maybe add one more sentence: > .. underlying fetch behavior. The consumer will cache the records from each Fetch request and return them incrementally from each `poll`.
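The suggested wording describes client-side caching. A minimal simulation under stated assumptions shows how a capped `poll()` drains one cached fetch across several calls. `BufferedPoller` is hypothetical, each simulated fetch returns a fixed batch, and prefetching is ignored, so this is not the real `KafkaConsumer` fetch path.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical model of client-side caching, not the real consumer implementation.
class BufferedPoller {
    private final int maxPollRecords;
    private final Supplier<List<Integer>> fetch; // one simulated Fetch response per call
    private final Deque<Integer> cache = new ArrayDeque<>();
    private int fetchCount = 0;

    BufferedPoller(int maxPollRecords, Supplier<List<Integer>> fetch) {
        this.maxPollRecords = maxPollRecords;
        this.fetch = fetch;
    }

    List<Integer> poll() {
        if (cache.isEmpty()) {
            // In this sketch a fetch happens only when the cache is drained, and how
            // much it returns does not depend on maxPollRecords.
            cache.addAll(fetch.get());
            fetchCount++;
        }
        List<Integer> out = new ArrayList<>();
        while (!cache.isEmpty() && out.size() < maxPollRecords) {
            out.add(cache.pollFirst());
        }
        return out;
    }

    int fetchCount() {
        return fetchCount;
    }
}
```

With `maxPollRecords = 4` and a fetch that returns 10 records, three calls to `poll()` return 4, 4, and 2 records while only one fetch is issued.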
[GitHub] [kafka] kowshik commented on a change in pull request #10478: KAFKA-12553: Refactor recovery logic to introduce LogLoader
kowshik commented on a change in pull request #10478: URL: https://github.com/apache/kafka/pull/10478#discussion_r611830920 ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -255,19 +261,21 @@ case object SnapshotGenerated extends LogStartOffsetIncrementReason { @threadsafe class Log(@volatile private var _dir: File, @volatile var config: LogConfig, + val segments: LogSegments, Review comment: Sounds good. I'll give this a shot.
[jira] [Updated] (KAFKA-10888) Sticky partition leads to uneven product msg, resulting in abnormal delays in some partitions
[ https://issues.apache.org/jira/browse/KAFKA-10888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-10888: -- Summary: Sticky partition leads to uneven product msg, resulting in abnormal delays in some partitions (was: Sticky partition leads to uneven product msg, resulting in abnormal delays in some partations) > Sticky partition leads to uneven product msg, resulting in abnormal delays > in some partitions > -- > > Key: KAFKA-10888 > URL: https://issues.apache.org/jira/browse/KAFKA-10888 > Project: Kafka > Issue Type: Bug > Components: clients, producer >Affects Versions: 2.4.1 >Reporter: jr >Priority: Major > Attachments: image-2020-12-24-21-05-02-800.png, > image-2020-12-24-21-09-47-692.png, image-2020-12-24-21-10-24-407.png > > > 110 producers, 550 partitions, 550 consumers, 5-node Kafka cluster > The producers use a null key with the sticky partitioner; the total production rate > is about 1 million TPS. > The observed partition delay is abnormal and the message distribution is uneven, > so the partitions with more messages show abnormally high maximum production > and consumption delay. > I cannot find a reason why the sticky partitioner would make the message distribution uneven > at this production rate. > I can't switch to the round-robin partitioner, which would increase the > delay and CPU cost. Is the uneven message distribution caused by the sticky partitioner > design, or is this abnormal? How can it be solved? > !image-2020-12-24-21-09-47-692.png! > As shown in the picture, the uneven distribution is concentrated on some > partitions and some brokers; there seem to be some patterns. > This problem does not occur in only one cluster, but in many high-TPS > clusters. > The problem is more obvious on the test cluster we built. > !image-2020-12-24-21-10-24-407.png!
[GitHub] [kafka] mjsax opened a new pull request #10529: KAFKA-12650: fix NPE in InternalTopicManagerTest
mjsax opened a new pull request #10529: URL: https://github.com/apache/kafka/pull/10529 The NPE arises if we cannot set up topics quickly enough and run into a timeout: in this case, we try to call `admin.delete` (note that `admin` is mocked), which returns `null`. However, we should never actually "abort" the internal topic creation. Using `MockTime` instead of `SystemTime` should avoid this issue, as time won't advance at all. Or would there be any concern about the test running forever? An alternative approach could be to increase `max.poll.interval.ms`, which defines the timeout. Call for review @bbejeck @ableegoldman
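Why a frozen clock avoids the timeout path: a deadline-based retry loop only gives up once the clock passes the deadline. In this sketch, `Clock`, `FrozenClock`, `SystemClock`, and `setupWithRetries` are hypothetical stand-ins for Kafka's `Time`/`MockTime`/`SystemTime` and `InternalTopicManager#setup`, whose real signatures differ. The extra `maxAttempts` bound illustrates the "running forever" concern raised in the PR, since with a frozen clock nothing else stops the loop.

```java
import java.util.function.BooleanSupplier;

interface Clock {
    long milliseconds();
}

/** Never advances, like a MockTime that nobody advances. */
class FrozenClock implements Clock {
    private final long now;

    FrozenClock(long now) {
        this.now = now;
    }

    public long milliseconds() {
        return now;
    }
}

class SystemClock implements Clock {
    public long milliseconds() {
        return System.currentTimeMillis();
    }
}

class TopicSetup {
    /**
     * Retries attempt() until it succeeds and returns the number of attempts used,
     * or -1 if the deadline passed first (in the test, the timeout path is where
     * cleanup, and the NPE, happened).
     */
    static int setupWithRetries(Clock clock, long timeoutMs, BooleanSupplier attempt, int maxAttempts) {
        long deadline = clock.milliseconds() + timeoutMs;
        for (int i = 1; i <= maxAttempts; i++) {
            if (attempt.getAsBoolean()) {
                return i;
            }
            if (clock.milliseconds() >= deadline) {
                return -1;
            }
        }
        // Only reachable if the clock never passes the deadline, e.g. a frozen clock.
        throw new IllegalStateException("gave up after " + maxAttempts + " attempts");
    }
}
```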
[GitHub] [kafka] jsancio commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState
jsancio commented on a change in pull request #10480: URL: https://github.com/apache/kafka/pull/10480#discussion_r611787145 ## File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java ## @@ -194,14 +196,45 @@ private void completeCurrentBatch() { MemoryRecords data = currentBatch.build(); completed.add(new CompletedBatch<>( currentBatch.baseOffset(), -currentBatch.records(), +Optional.of(currentBatch.records()), data, memoryPool, currentBatch.initialBuffer() )); currentBatch = null; } +public void appendLeaderChangeMessage(LeaderChangeMessage leaderChangeMessage, long currentTimeMs) { +maybeCompleteDrain(); +ByteBuffer buffer = memoryPool.tryAllocate(256); +if (buffer != null) { +MemoryRecords data = MemoryRecords.withLeaderChangeMessage( +this.nextOffset, +currentTimeMs, +this.epoch, +buffer, +leaderChangeMessage); +completed.add(new CompletedBatch<>( +nextOffset, +Optional.empty(), +data, +memoryPool, +buffer +)); +nextOffset += 1; +} +} + +public void flush() { Review comment: I don't think we should expose this functionality. I think users of this type will always call `flush` after calling `appendLeaderChangeMessage`. If so, why not do that implicitly in the method? ## File path: raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java ## @@ -19,10 +19,14 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; + import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import org.apache.kafka.raft.internals.BatchAccumulator; +import org.mockito.Mockito; Review comment: This import order doesn't match any of the styles used in Kafka.
## File path: raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java ## @@ -36,30 +36,31 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -public class LeaderStateTest { +public class LeaderStateTest { Review comment: Why is the `T` leaked to the tests? Glancing at the code, the type parameter `T` is never used. Did you try changing the signature to: ``` private final BatchAccumulator accumulator = Mockito.mock(BatchAccumulator.class); ``` ## File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java ## @@ -194,14 +196,45 @@ private void completeCurrentBatch() { MemoryRecords data = currentBatch.build(); completed.add(new CompletedBatch<>( currentBatch.baseOffset(), -currentBatch.records(), +Optional.of(currentBatch.records()), data, memoryPool, currentBatch.initialBuffer() )); currentBatch = null; } +public void appendLeaderChangeMessage(LeaderChangeMessage leaderChangeMessage, long currentTimeMs) { Review comment: This method is not safe. I think you need to hold a lock before calling `maybeCompleteDrain` and updating `nextOffset`.
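Both review suggestions (take the lock, and flush implicitly) can be sketched on a heavily simplified, hypothetical accumulator. `SimpleAccumulator` uses strings instead of `MemoryRecords`, has no memory pool, and models a flush as a drain request. The leader-change path takes the same lock as normal appends before completing the in-progress batch and assigning `nextOffset`, and it requests the drain itself instead of exposing `flush()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified accumulator; not the real raft BatchAccumulator.
class SimpleAccumulator {
    private final ReentrantLock appendLock = new ReentrantLock();
    private final int maxBatchSize;
    private final List<String> currentBatch = new ArrayList<>();
    private final List<List<String>> completed = new ArrayList<>();
    private long nextOffset = 0;
    private boolean drainRequested = false;

    SimpleAccumulator(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    /** Regular append; returns the offset assigned to the record. */
    long append(String record) {
        appendLock.lock();
        try {
            currentBatch.add(record);
            if (currentBatch.size() >= maxBatchSize) {
                completeCurrentBatch();
            }
            return nextOffset++;
        } finally {
            appendLock.unlock();
        }
    }

    /** Control record in its own batch; offset assignment and the flush request happen under the lock. */
    long appendLeaderChangeMessage(String message) {
        appendLock.lock();
        try {
            completeCurrentBatch(); // never interleave with a half-built batch
            List<String> controlBatch = new ArrayList<>();
            controlBatch.add("LEADER_CHANGE:" + message);
            completed.add(controlBatch);
            long offset = nextOffset++;
            drainRequested = true; // implicit flush: callers never call flush() themselves
            return offset;
        } finally {
            appendLock.unlock();
        }
    }

    // Caller must hold appendLock.
    private void completeCurrentBatch() {
        if (!currentBatch.isEmpty()) {
            completed.add(new ArrayList<>(currentBatch));
            currentBatch.clear();
        }
    }

    /** Hands out completed batches only if a drain was requested. */
    List<List<String>> drainIfRequested() {
        appendLock.lock();
        try {
            if (!drainRequested) {
                return new ArrayList<>();
            }
            drainRequested = false;
            List<List<String>> out = new ArrayList<>(completed);
            completed.clear();
            return out;
        } finally {
            appendLock.unlock();
        }
    }
}
```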
[GitHub] [kafka] dielhennr commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState
dielhennr commented on a change in pull request #10480: URL: https://github.com/apache/kafka/pull/10480#discussion_r611781359 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -1876,12 +1836,12 @@ private void appendBatch( } private long maybeAppendBatches( -LeaderState state, +LeaderState state, long currentTimeMs ) { -long timeUnitFlush = accumulator.timeUntilDrain(currentTimeMs); +long timeUnitFlush = state.accumulator().timeUntilDrain(currentTimeMs); Review comment: There is an IO sync happening with the LeaderChangeMessage. That is the only time accumulator.flush is called. I think maybe this variable name can get changed to timeUntilDrain.
[GitHub] [kafka] satishd commented on a change in pull request #10478: KAFKA-12553: Refactor recovery logic to introduce LogLoader
satishd commented on a change in pull request #10478: URL: https://github.com/apache/kafka/pull/10478#discussion_r611789255 ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -255,19 +261,21 @@ case object SnapshotGenerated extends LogStartOffsetIncrementReason { @threadsafe class Log(@volatile private var _dir: File, @volatile var config: LogConfig, + val segments: LogSegments, Review comment: What I meant was that `Log` can take an immutable `LogComponents` argument and initialize the vars inside `Log` from the respective fields of `LogComponents`. This also gives these vars the right access, since it does not grant write access by default. ``` class Log(@volatile private var _dir: File, @volatile var config: LogConfig, val segments: LogSegments, val logComponents: LogComponents, scheduler: Scheduler, brokerTopicStats: BrokerTopicStats, val time: Time, val maxProducerIdExpirationMs: Int, val producerIdExpirationCheckIntervalMs: Int, val topicPartition: TopicPartition, logDirFailureChannel: LogDirFailureChannel, @volatile var topicId: Option[Uuid], val keepPartitionMetadataFile: Boolean = true) extends Logging with KafkaMetricsGroup { @volatile private var logStartOffset: Long = logComponents.logStartOffset @volatile private var recoveryPoint: Long = logComponents.recoveryPoint @volatile private var nextOffsetMetadata: LogOffsetMetadata = logComponents.nextOffsetMetadata @volatile var leaderEpochCache: Option[LeaderEpochFileCache] = logComponents.leaderEpochCache private val producerStateManager: ProducerStateManager = logComponents.producerStateManager ```
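The same idea, translated to Java for illustration: the proposal itself is Scala, and `LogSketch` plus the simplified `long` fields are hypothetical. An immutable `LogComponents` holder is passed in, its values seed private volatile fields, and only the log's own methods can change them afterwards.

```java
/** Immutable result of loading a log, analogous to the proposed Scala LogComponents. */
final class LogComponents {
    final long logStartOffset;
    final long recoveryPoint;
    final long nextOffset;

    LogComponents(long logStartOffset, long recoveryPoint, long nextOffset) {
        this.logStartOffset = logStartOffset;
        this.recoveryPoint = recoveryPoint;
        this.nextOffset = nextOffset;
    }
}

class LogSketch {
    // Mutable state starts from the loaded values but is private: callers cannot write it.
    private volatile long logStartOffset;
    private volatile long recoveryPoint;
    private volatile long nextOffset;

    LogSketch(LogComponents loaded) {
        this.logStartOffset = loaded.logStartOffset;
        this.recoveryPoint = loaded.recoveryPoint;
        this.nextOffset = loaded.nextOffset;
    }

    long logStartOffset() {
        return logStartOffset;
    }

    long recoveryPoint() {
        return recoveryPoint;
    }

    long nextOffset() {
        return nextOffset;
    }

    /** Writes go through methods owned by the log, e.g. appending records. */
    synchronized void append(int recordCount) {
        nextOffset += recordCount;
    }

    synchronized void maybeIncrementLogStartOffset(long newStart) {
        if (newStart > logStartOffset) {
            logStartOffset = newStart;
        }
    }
}
```

Because `LogComponents` stays immutable, the loader's snapshot cannot drift from what the log was constructed with, while the log remains free to advance its own state.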
[GitHub] [kafka] dielhennr commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState
dielhennr commented on a change in pull request #10480: URL: https://github.com/apache/kafka/pull/10480#discussion_r611781359 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -1876,12 +1836,12 @@ private void appendBatch( } private long maybeAppendBatches( -LeaderState state, +LeaderState state, long currentTimeMs ) { -long timeUnitFlush = accumulator.timeUntilDrain(currentTimeMs); +long timeUnitFlush = state.accumulator().timeUntilDrain(currentTimeMs); Review comment: There is an IO sync happening with the LeaderChangeMessage. That is the only time accumulator.flush is called. I think maybe this variable name here can get changed to timeUntilDrain?
[GitHub] [kafka] dielhennr commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState
dielhennr commented on a change in pull request #10480: URL: https://github.com/apache/kafka/pull/10480#discussion_r611781359 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -1876,12 +1836,12 @@ private void appendBatch( } private long maybeAppendBatches( -LeaderState state, +LeaderState state, long currentTimeMs ) { -long timeUnitFlush = accumulator.timeUntilDrain(currentTimeMs); +long timeUnitFlush = state.accumulator().timeUntilDrain(currentTimeMs); Review comment: There is an IO sync happening with the LeaderChangeMessage. That is the only time accumulator.flush is called. I think maybe this variable name can get changed to timeUntilDrain?
[GitHub] [kafka] mjsax commented on pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter
mjsax commented on pull request #10042: URL: https://github.com/apache/kafka/pull/10042#issuecomment-817951507 Thanks for the fix @MarcoLotz! -- Merged to `trunk`.
[GitHub] [kafka] mjsax merged pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter
mjsax merged pull request #10042: URL: https://github.com/apache/kafka/pull/10042
[jira] [Commented] (KAFKA-12650) NPE in InternalTopicManager#cleanUpCreatedTopics
[ https://issues.apache.org/jira/browse/KAFKA-12650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17319549#comment-17319549 ] Matthias J. Sax commented on KAFKA-12650: - [https://github.com/apache/kafka/pull/10042/checks?check_run_id=2319410471] > NPE in InternalTopicManager#cleanUpCreatedTopics > > > Key: KAFKA-12650 > URL: https://issues.apache.org/jira/browse/KAFKA-12650 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: A. Sophie Blee-Goldman >Priority: Blocker > Fix For: 3.0.0 > > > {code:java} > java.lang.NullPointerException > at > org.apache.kafka.streams.processor.internals.InternalTopicManager.cleanUpCreatedTopics(InternalTopicManager.java:675) > at > org.apache.kafka.streams.processor.internals.InternalTopicManager.maybeThrowTimeoutExceptionDuringSetup(InternalTopicManager.java:755) > at > org.apache.kafka.streams.processor.internals.InternalTopicManager.processCreateTopicResults(InternalTopicManager.java:652) > at > org.apache.kafka.streams.processor.internals.InternalTopicManager.setup(InternalTopicManager.java:599) > at > org.apache.kafka.streams.processor.internals.InternalTopicManagerTest.shouldOnlyRetryNotSuccessfulFuturesDuringSetup(InternalTopicManagerTest.java:180) > {code}
[jira] [Assigned] (KAFKA-12650) NPE in InternalTopicManager#cleanUpCreatedTopics
[ https://issues.apache.org/jira/browse/KAFKA-12650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-12650: --- Assignee: Matthias J. Sax > NPE in InternalTopicManager#cleanUpCreatedTopics > > > Key: KAFKA-12650 > URL: https://issues.apache.org/jira/browse/KAFKA-12650 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: Matthias J. Sax >Priority: Blocker > Fix For: 3.0.0 > > > {code:java} > java.lang.NullPointerException > at > org.apache.kafka.streams.processor.internals.InternalTopicManager.cleanUpCreatedTopics(InternalTopicManager.java:675) > at > org.apache.kafka.streams.processor.internals.InternalTopicManager.maybeThrowTimeoutExceptionDuringSetup(InternalTopicManager.java:755) > at > org.apache.kafka.streams.processor.internals.InternalTopicManager.processCreateTopicResults(InternalTopicManager.java:652) > at > org.apache.kafka.streams.processor.internals.InternalTopicManager.setup(InternalTopicManager.java:599) > at > org.apache.kafka.streams.processor.internals.InternalTopicManagerTest.shouldOnlyRetryNotSuccessfulFuturesDuringSetup(InternalTopicManagerTest.java:180) > {code}
[jira] [Commented] (KAFKA-9013) Flaky Test MirrorConnectorsIntegrationTest#testReplication
[ https://issues.apache.org/jira/browse/KAFKA-9013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17319546#comment-17319546 ] Matthias J. Sax commented on KAFKA-9013: [https://github.com/apache/kafka/pull/10042/checks?check_run_id=2319756935] {quote}java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: The request timed out. at org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:365) at org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:340) at org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.createTopics(MirrorConnectorsIntegrationBaseTest.java:609) at org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.startClusters(MirrorConnectorsIntegrationBaseTest.java:173) {quote} > Flaky Test MirrorConnectorsIntegrationTest#testReplication > -- > > Key: KAFKA-9013 > URL: https://issues.apache.org/jira/browse/KAFKA-9013 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Bruno Cadonna >Priority: Major > Labels: flaky-test > > h1. Stacktrace: > {code:java} > java.lang.AssertionError: Condition not met within timeout 2. Offsets not > translated downstream to primary cluster. > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:354) > at > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication(MirrorConnectorsIntegrationTest.java:239) > {code} > h1. Standard Error > {code} > Standard Error > Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource > registered in SERVER runtime does not implement any provider interfaces > applicable in the SERVER runtime. 
Due to constraint configuration problems > the provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will > be ignored. > Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.LoggingResource registered in > SERVER runtime does not implement any provider interfaces applicable in the > SERVER runtime. Due to constraint configuration problems the provider > org.apache.kafka.connect.runtime.rest.resources.LoggingResource will be > ignored. > Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered > in SERVER runtime does not implement any provider interfaces applicable in > the SERVER runtime. Due to constraint configuration problems the provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource will be > ignored. > Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.RootResource registered in > SERVER runtime does not implement any provider interfaces applicable in the > SERVER runtime. Due to constraint configuration problems the provider > org.apache.kafka.connect.runtime.rest.resources.RootResource will be ignored. > Oct 09, 2019 11:32:01 PM org.glassfish.jersey.internal.Errors logErrors > WARNING: The following warnings have been detected: WARNING: The > (sub)resource method listLoggers in > org.apache.kafka.connect.runtime.rest.resources.LoggingResource contains > empty path annotation. > WARNING: The (sub)resource method listConnectors in > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains > empty path annotation. 
> WARNING: The (sub)resource method createConnector in > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains > empty path annotation. > WARNING: The (sub)resource method listConnectorPlugins in > org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource > contains empty path annotation. > WARNING: The (sub)resource method serverInfo in > org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty > path annotation. > Oct 09, 2019 11:32:02 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource > registered in SERVER runtime does not implement any provider interfaces > applicable in the SERVER runtime. Due to constraint configuration
[jira] [Commented] (KAFKA-12629) Flaky Test RaftClusterTest
[ https://issues.apache.org/jira/browse/KAFKA-12629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17319548#comment-17319548 ] Matthias J. Sax commented on KAFKA-12629: - Another timeout: [https://github.com/apache/kafka/pull/10042/checks?check_run_id=2319729123] > Flaky Test RaftClusterTest > -- > > Key: KAFKA-12629 > URL: https://issues.apache.org/jira/browse/KAFKA-12629 > Project: Kafka > Issue Type: Test > Components: core, unit tests >Reporter: Matthias J. Sax >Priority: Critical > Labels: flaky-test > > {quote} {{java.util.concurrent.ExecutionException: > java.lang.ClassNotFoundException: > org.apache.kafka.controller.NoOpSnapshotWriterBuilder > at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191) > at > kafka.testkit.KafkaClusterTestKit.startup(KafkaClusterTestKit.java:364) > at > kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions(RaftClusterTest.scala:181)}}{quote}
[GitHub] [kafka] jsancio commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState
jsancio commented on a change in pull request #10480: URL: https://github.com/apache/kafka/pull/10480#discussion_r611772289 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -1876,12 +1836,12 @@ private void appendBatch( } private long maybeAppendBatches( -LeaderState state, +LeaderState state, long currentTimeMs ) { -long timeUnitFlush = accumulator.timeUntilDrain(currentTimeMs); +long timeUnitFlush = state.accumulator().timeUntilDrain(currentTimeMs); Review comment: @hachikuji we had this discussion in the PR that introduced the `BatchAccumulator`. I was under the impression that we agreed to not use the word "flush" since in Java that word is usually used to represent some kind of IO sync. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
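The naming point is easier to see with the semantics spelled out. Below is a minimal, hypothetical sketch (not Kafka's actual `BatchAccumulator`) of what a `timeUntilDrain(currentTimeMs)` method reports: how long the leader may wait before pending records should be moved into completed batches. It only computes a delay and performs no IO, which is why "drain" fits better than "flush". The linger-window fields and the `Long.MAX_VALUE` "nothing pending" convention are assumptions made for illustration.

```java
// Hypothetical, simplified accumulator; field names and conventions are assumptions,
// not the actual org.apache.kafka.raft.internals.BatchAccumulator implementation.
class LingeringAccumulator {
    enum DrainStatus { NONE, STARTED }

    private final long lingerMs;
    private DrainStatus drainStatus = DrainStatus.NONE;
    private long firstPendingAppendMs = -1; // -1 means no records are waiting

    LingeringAccumulator(long lingerMs) {
        this.lingerMs = lingerMs;
    }

    // Records an append; the linger window starts with the first pending record.
    void append(long currentTimeMs) {
        if (firstPendingAppendMs < 0) {
            firstPendingAppendMs = currentTimeMs;
        }
    }

    // Requests an immediate drain, e.g. when a control batch must go out right away.
    void startDrain() {
        drainStatus = DrainStatus.STARTED;
    }

    // Milliseconds until pending records should be drained into completed batches.
    // This only computes a delay; it performs no IO, hence "drain" rather than "flush".
    long timeUntilDrain(long currentTimeMs) {
        if (drainStatus == DrainStatus.STARTED) {
            return 0;
        }
        if (firstPendingAppendMs < 0) {
            return Long.MAX_VALUE; // nothing to drain yet
        }
        return Math.max(0, firstPendingAppendMs + lingerMs - currentTimeMs);
    }
}
```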
[GitHub] [kafka] dielhennr commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState
dielhennr commented on a change in pull request #10480: URL: https://github.com/apache/kafka/pull/10480#discussion_r610281681 ## File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java ## @@ -202,6 +203,25 @@ private void completeCurrentBatch() { currentBatch = null; } +public void addControlBatch(MemoryRecords records) { +appendLock.lock(); +try { +drainStatus = DrainStatus.STARTED; +completed.add(new CompletedBatch<>( +nextOffset, +null, Review comment: Using `Optional.ofNullable` in the constructor saved us from having to change every other instantiation of `CompletedBatch`.
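A minimal sketch of the trade-off described in the review comment, assuming a simplified `CompletedBatch` whose record list becomes optional: wrapping the constructor argument with `Optional.ofNullable` lets the new control-batch path pass `null` while existing call sites stay untouched. The class shape, field names, and `isControlBatch()` helper are hypothetical, not the code in the PR.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for BatchAccumulator.CompletedBatch; names are illustrative only.
class CompletedBatch<T> {
    final long baseOffset;
    // Empty for control batches, which carry no application records.
    final Optional<List<T>> records;

    CompletedBatch(long baseOffset, List<T> records) {
        this.baseOffset = baseOffset;
        // Normalizing null here keeps every existing `new CompletedBatch<>(offset, list)`
        // call site unchanged, while the control-batch path may pass null.
        this.records = Optional.ofNullable(records);
    }

    boolean isControlBatch() {
        return !records.isPresent();
    }
}
```

The cost of this choice is that `null` stays a legal constructor argument; a dedicated factory method for control batches would make the intent explicit, at the price of editing the existing callers.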
[jira] [Commented] (KAFKA-12559) Add a top-level Streams config for bounding off-heap memory
[ https://issues.apache.org/jira/browse/KAFKA-12559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17319521#comment-17319521 ] amuthan Ganeshan commented on KAFKA-12559: -- Thanks, [~ableegoldman], for the explanation; yep, it makes sense now. Let me give it a try and get back to you if I have further questions. > Add a top-level Streams config for bounding off-heap memory > --- > > Key: KAFKA-12559 > URL: https://issues.apache.org/jira/browse/KAFKA-12559 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: amuthan Ganeshan >Priority: Major > Labels: needs-kip, newbie, newbie++ > > At the moment we provide an example of how to bound the memory usage of > rocksdb in the [Memory > Management|https://kafka.apache.org/27/documentation/streams/developer-guide/memory-mgmt.html#rocksdb] > section of the docs. This requires implementing a custom RocksDBConfigSetter > class and setting a number of rocksdb options for relatively advanced > concepts and configurations. It seems a fair number of users either fail to > find this or consider it to be for more advanced use cases/users. But RocksDB > can eat up a lot of off-heap memory and it's not uncommon for users to come > across a {{RocksDBException: Cannot allocate memory}}. > It would probably be a much better user experience if we implemented this > memory bound out-of-the-box and just gave users a top-level StreamsConfig to > tune the off-heap memory given to rocksdb, like we have for on-heap cache > memory with cache.max.bytes.buffering. More advanced users can continue to > fine-tune their memory bounding and apply other configs with a custom config > setter, while new or more casual users can cap the off-heap memory without > getting their hands dirty with rocksdb.
> I would propose to add the following top-level config: > rocksdb.max.bytes.off.heap: medium priority, default to -1 (unbounded), valid > values are [0, inf] > I'd also want to consider adding a second, lower priority top-level config to > give users a knob for adjusting how much of that total off-heap memory goes > to the block cache + index/filter blocks, and how much of it is afforded to > the write buffers. I'm struggling to come up with a good name for this > config, but it would be something like > rocksdb.memtable.to.block.cache.off.heap.memory.ratio: low priority, default > to 0.5, valid values are [0, 1]
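To make the two proposed knobs concrete, here is a hypothetical sketch of how a total off-heap budget and a ratio could be split into a write-buffer share and a block-cache share. It assumes the ratio is the fraction of the total given to write buffers (memtables); the proposal does not pin that down. None of this is an existing StreamsConfig API. Today, numbers like these would be wired by hand into a custom `RocksDBConfigSetter`, as in the Memory Management docs linked above.

```java
// Hypothetical helper; not an existing Kafka Streams API. It assumes the proposed
// ratio is the fraction of the total off-heap budget given to write buffers (memtables).
class OffHeapBudget {
    final long writeBufferBytes;
    final long blockCacheBytes; // block cache + index/filter blocks

    private OffHeapBudget(long writeBufferBytes, long blockCacheBytes) {
        this.writeBufferBytes = writeBufferBytes;
        this.blockCacheBytes = blockCacheBytes;
    }

    static OffHeapBudget split(long maxBytesOffHeap, double memtableRatio) {
        if (maxBytesOffHeap < 0) {
            // -1 (the proposed default) means unbounded: there is no budget to split.
            throw new IllegalArgumentException("off-heap memory is unbounded");
        }
        // Written as a negated range check so that NaN is rejected too.
        if (!(memtableRatio >= 0.0 && memtableRatio <= 1.0)) {
            throw new IllegalArgumentException("ratio must be in [0, 1]: " + memtableRatio);
        }
        long writeBuffers = (long) (maxBytesOffHeap * memtableRatio);
        // Give the remainder to the block cache so the two parts always sum to the total.
        return new OffHeapBudget(writeBuffers, maxBytesOffHeap - writeBuffers);
    }
}
```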
[jira] [Assigned] (KAFKA-12559) Add a top-level Streams config for bounding off-heap memory
[ https://issues.apache.org/jira/browse/KAFKA-12559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] amuthan Ganeshan reassigned KAFKA-12559: Assignee: amuthan Ganeshan
[GitHub] [kafka] jthurne commented on pull request #10491: MINOR: Switch to using the Gradle RAT plugin
jthurne commented on pull request #10491: URL: https://github.com/apache/kafka/pull/10491#issuecomment-817901760 Interesting. When I run the rat task locally, those three files are identified (correctly) as binary files and RAT skips them. I'm not sure why they are being detected as text files on CI. But an easy workaround is to explicitly exclude that directory. I've pushed up a commit that does just that.
[GitHub] [kafka] C0urante commented on pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits
C0urante commented on pull request #10528: URL: https://github.com/apache/kafka/pull/10528#issuecomment-817894272 @ncliang @gharris1727 @kpatelatwork @ddasarathan could one or two of you take a look at this when you have time?
[GitHub] [kafka] dajac commented on a change in pull request #10483: KAFKA-12586; Add `DescribeTransactions` Admin API
dajac commented on a change in pull request #10483: URL: https://github.com/apache/kafka/pull/10483#discussion_r611717669 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeTransactionsHandler.java ## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.admin.internals; + +import org.apache.kafka.clients.admin.TransactionDescription; +import org.apache.kafka.clients.admin.TransactionState; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TransactionalIdAuthorizationException; +import org.apache.kafka.common.errors.TransactionalIdNotFoundException; +import org.apache.kafka.common.message.DescribeTransactionsRequestData; +import org.apache.kafka.common.message.DescribeTransactionsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.DescribeTransactionsRequest; +import org.apache.kafka.common.requests.DescribeTransactionsResponse; +import org.apache.kafka.common.requests.FindCoordinatorRequest; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.OptionalLong; +import java.util.Set; +import java.util.stream.Collectors; + +public class DescribeTransactionsHandler implements AdminApiHandler { +private final LogContext logContext; +private final Logger log; +private final Set keys; + +public DescribeTransactionsHandler( +Collection transactionalIds, +LogContext logContext +) { +this.keys = buildKeySet(transactionalIds); +this.log = logContext.logger(DescribeTransactionsHandler.class); +this.logContext = logContext; +} + +private static Set buildKeySet(Collection transactionalIds) { +return transactionalIds.stream() +.map(DescribeTransactionsHandler::asCoordinatorKey) +.collect(Collectors.toSet()); +} + +@Override +public String apiName() { +return "describeTransactions"; +} + +@Override +public Keys initializeKeys() { +return Keys.dynamicMapped(keys, new CoordinatorStrategy(logContext)); +} + +@Override +public 
DescribeTransactionsRequest.Builder buildRequest( +Integer brokerId, +Set keys +) { +DescribeTransactionsRequestData request = new DescribeTransactionsRequestData(); +List transactionalIds = keys.stream().map(key -> key.idValue).collect(Collectors.toList()); +request.setTransactionalIds(transactionalIds); +return new DescribeTransactionsRequest.Builder(request); +} + +@Override +public ApiResult handleResponse( +Integer brokerId, +Set keys, +AbstractResponse abstractResponse +) { +DescribeTransactionsResponse response = (DescribeTransactionsResponse) abstractResponse; +Map completed = new HashMap<>(); +Map failed = new HashMap<>(); +List unmapped = new ArrayList<>(); + +for (DescribeTransactionsResponseData.TransactionState transactionState : response.data().transactionStates()) { +CoordinatorKey transactionalIdKey = asCoordinatorKey(transactionState.transactionalId()); +Errors error = Errors.forCode(transactionState.errorCode()); + +if (error != Errors.NONE) { +handleError(transactionalIdKey, error, failed, unmapped); +continue; +} + +OptionalLong transactionStartTimeMs = transactionState.transactionStartTimeMs() < 0 ? +OptionalLong.empty() : +OptionalLong.of(transactionState.transactionStartTimeMs()); + +completed.put(transactionalIdKey, new TransactionDescription( +brokerId, +TransactionState.parse(transactionState.transactionState()), +transactionState.producerId(), +transactionState.producerEpoch(), +
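The start-time handling in `handleResponse` turns a wire-level sentinel into an `OptionalLong`. Isolated as a standalone helper (the class name below is hypothetical), the pattern looks like this. The diff does not show which negative value the broker sends when no transaction is in progress, so the sketch treats any negative value as "absent", matching the `< 0` check in the code under review.

```java
import java.util.OptionalLong;

// Hypothetical helper mirroring the check in handleResponse: a negative start time on
// the wire is surfaced to callers as OptionalLong.empty() instead of a magic number.
class TransactionStartTime {
    static OptionalLong fromWire(long transactionStartTimeMs) {
        return transactionStartTimeMs < 0
            ? OptionalLong.empty()
            : OptionalLong.of(transactionStartTimeMs);
    }
}
```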
[GitHub] [kafka] dajac commented on a change in pull request #10483: KAFKA-12586; Add `DescribeTransactions` Admin API
dajac commented on a change in pull request #10483: URL: https://github.com/apache/kafka/pull/10483#discussion_r611715337 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeTransactionsResult.java ## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.clients.admin.internals.CoordinatorKey; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.requests.FindCoordinatorRequest; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +@InterfaceStability.Evolving +public class DescribeTransactionsResult { +private final Map> futures; + +DescribeTransactionsResult(Map> futures) { +this.futures = futures; +} + +public KafkaFuture transactionalIdResult(String transactionalId) { Review comment: `description` sounds good to me.
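For context on the naming discussion, here is a minimal sketch of a result object that hands out one future per transactional ID through a `description(String)` accessor, the name agreed on above. It is a stand-in built on several assumptions, not the merged `DescribeTransactionsResult`: it uses `java.util.concurrent.CompletableFuture` instead of `KafkaFuture`, keys the futures by plain `String` rather than `CoordinatorKey`, and the `all()` helper and class name are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for an admin result class: one future per requested ID.
class DescribeResultSketch<D> {
    private final Map<String, CompletableFuture<D>> futures;

    DescribeResultSketch(Map<String, CompletableFuture<D>> futures) {
        this.futures = new HashMap<>(futures);
    }

    // Per-ID accessor; fails fast for IDs that were never part of the request.
    CompletableFuture<D> description(String transactionalId) {
        CompletableFuture<D> future = futures.get(transactionalId);
        if (future == null) {
            throw new IllegalArgumentException(
                "transactionalId `" + transactionalId + "` was not included in the request");
        }
        return future;
    }

    // Completes when every per-ID future completes; fails if any of them fails.
    CompletableFuture<Map<String, D>> all() {
        return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture<?>[0]))
            .thenApply(ignored -> {
                Map<String, D> results = new HashMap<>();
                futures.forEach((id, future) -> results.put(id, future.join()));
                return results;
            });
    }
}
```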
[GitHub] [kafka] C0urante opened a new pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits
C0urante opened a new pull request #10528: URL: https://github.com/apache/kafka/pull/10528 [Jira](https://issues.apache.org/jira/browse/KAFKA-12497) This change serves two purposes: 1. Eliminate unnecessary log messages for offset commits of tasks that don't need to perform offset commits (e.g., a task that has failed and for which all data has been flushed and committed). 2. Stop blocking the offset commit thread unnecessarily for flushes that will never succeed because the task's producer has failed to send a record in the current batch with a non-retriable error. Existing unit tests for the `OffsetStorageWriter` are tweaked to verify the small change made to it. Several new unit tests are added for the `WorkerSourceTask` that cover various cases where offset commits should not be attempted, and some existing tests are modified to cover cases where offset commits either should or should not be attempted. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
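The two purposes in this description amount to a small decision made before each commit attempt. A hypothetical sketch of that decision follows; the enum, method, and flag names are illustrative and are not the actual `WorkerSourceTask` or `OffsetStorageWriter` API.

```java
// Hypothetical sketch of the pre-commit check described in the PR; not WorkerSourceTask code.
class OffsetCommitDecision {
    enum Outcome { COMMIT, SKIP_NOTHING_PENDING, SKIP_PRODUCER_FAILED }

    static Outcome decide(boolean hasPendingOffsets, boolean producerSendFailed) {
        if (producerSendFailed) {
            // A record in the current batch failed with a non-retriable error, so the flush
            // can never complete; don't tie up the offset commit thread waiting for it.
            return Outcome.SKIP_PRODUCER_FAILED;
        }
        if (!hasPendingOffsets) {
            // e.g. a failed task whose data has already been flushed and committed:
            // skip the attempt and the log messages that would accompany it.
            return Outcome.SKIP_NOTHING_PENDING;
        }
        return Outcome.COMMIT;
    }
}
```

Checking the producer failure first means a failed send short-circuits the commit even when offsets are still pending, which is the case where blocking would never end.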
[GitHub] [kafka] kkonstantine merged pull request #10503: KAFKA-9988: Suppress uncaught exceptions in log messages during Connect task shutdown
kkonstantine merged pull request #10503: URL: https://github.com/apache/kafka/pull/10503
[jira] [Resolved] (KAFKA-7249) Provide an official Docker Hub image for Kafka
[ https://issues.apache.org/jira/browse/KAFKA-7249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timothy Higinbottom resolved KAFKA-7249. Resolution: Not A Problem > Provide an official Docker Hub image for Kafka > -- > > Key: KAFKA-7249 > URL: https://issues.apache.org/jira/browse/KAFKA-7249 > Project: Kafka > Issue Type: New Feature > Components: build, documentation, packaging, tools, website >Affects Versions: 1.0.1, 1.1.0, 1.1.1, 2.0.0 >Reporter: Timothy Higinbottom >Priority: Major > Labels: build, distribution, docker, packaging > > It would be great if there was an official Docker Hub image for Kafka, > supported by the Kafka community, so we knew that the image was trusted and > stable for use in production. Many organizations and teams are now using > Docker, Kubernetes, and other container systems that make deployment easier. > I think Kafka should move into this space and encourage this as an easy way > for beginners to get started, but also as a portable and effective way to > deploy Kafka in production. > > Currently there are only Kafka images maintained by third parties, which > seems like a shame for a big Apache project like Kafka. Hope you all consider > this. > > Thanks, > Tim
[jira] [Resolved] (KAFKA-12408) Document omitted ReplicaManager metrics
[ https://issues.apache.org/jira/browse/KAFKA-12408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tom Bentley resolved KAFKA-12408. - Fix Version/s: 3.0.0 Reviewer: Tom Bentley Resolution: Fixed > Document omitted ReplicaManager metrics > --- > > Key: KAFKA-12408 > URL: https://issues.apache.org/jira/browse/KAFKA-12408 > Project: Kafka > Issue Type: Improvement > Components: documentation >Reporter: Dongjin Lee >Assignee: Dongjin Lee >Priority: Minor > Fix For: 3.0.0 > > > There are several problems in ReplicaManager metrics documentation: > * kafka.server:type=ReplicaManager,name=OfflineReplicaCount is omitted. > * kafka.server:type=ReplicaManager,name=FailedIsrUpdatesPerSec is omitted. > * kafka.server:type=ReplicaManager,name=[PartitionCount|LeaderCount]'s > descriptions are omitted: 'mostly even across brokers'.
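The metric names listed above are JMX object names. As a hedged illustration of how a monitoring client might address them, here is a small helper (its class and method names are hypothetical) that builds the `ObjectName` for a `ReplicaManager` metric and reads a gauge over an existing `MBeanServerConnection`. It assumes gauges such as `OfflineReplicaCount` expose their reading through a `Value` attribute.

```java
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical helper for addressing the ReplicaManager MBeans listed in this issue.
class ReplicaManagerMetrics {
    static ObjectName objectName(String metric) {
        try {
            return new ObjectName("kafka.server:type=ReplicaManager,name=" + metric);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("invalid metric name: " + metric, e);
        }
    }

    // Reads a gauge such as OfflineReplicaCount from a broker's JMX endpoint.
    // Assumes the gauge exposes its reading as a "Value" attribute.
    static Object readGauge(MBeanServerConnection connection, String metric) throws Exception {
        return connection.getAttribute(objectName(metric), "Value");
    }
}
```

Against a live broker, `connection` would come from a `JMXConnectorFactory.connect(...)` call. `FailedIsrUpdatesPerSec` is a rate metric, so it would be read through rate attributes (for example `OneMinuteRate`) rather than `Value`.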
[GitHub] [kafka] vvcephei commented on pull request #10527: Merge trunk 04/12
vvcephei commented on pull request #10527: URL: https://github.com/apache/kafka/pull/10527#issuecomment-817865515 Oy, just saw that, too. Thanks.