[GitHub] [kafka] vvcephei merged pull request #8667: KAFKA-9994: Handle task migrated inside corruption path
vvcephei merged pull request #8667: URL: https://github.com/apache/kafka/pull/8667 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-10000) Atomic commit of source connector records and offsets
[ https://issues.apache.org/jira/browse/KAFKA-10000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chris Egerton updated KAFKA-10000:
Labels: needs-kip (was: )
> Atomic commit of source connector records and offsets
>
> Key: KAFKA-10000
> URL: https://issues.apache.org/jira/browse/KAFKA-10000
> Project: Kafka
> Issue Type: New Feature
> Components: KafkaConnect
> Reporter: Chris Egerton
> Priority: Major
> Labels: needs-kip
>
> It'd be nice to be able to configure source connectors such that their offsets are committed if and only if all records up to that point have been ack'd by the producer. This would go a long way towards EOS for source connectors.
>
> This differs from https://issues.apache.org/jira/browse/KAFKA-6079, which is marked as {{WONTFIX}} since it only concerns enabling the idempotent producer for source connectors and is not concerned with source connector offsets.
> This also differs from https://issues.apache.org/jira/browse/KAFKA-6080, which had a lot of discussion around allowing connector-defined transaction boundaries. The suggestion in this ticket is to only use source connector offset commits as the transaction boundaries for connectors; allowing connector-specified transaction boundaries can be addressed separately.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
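The rule this ticket proposes, committing a source offset only once that record and every record before it have been acknowledged by the producer, can be sketched as a small ordered tracker. This is a hypothetical illustration, not Connect's implementation: `OffsetAckTracker`, `recordSent`, `recordAcked` and `committableOffset` are invented names, each record's source offset is simplified to a single `long`, and the sketch covers only ack ordering, not the transactional atomicity discussed around KAFKA-6080.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical tracker: exposes the latest source offset whose record and all earlier records are acked. */
final class OffsetAckTracker {
    // One entry per dispatched record, kept in send order.
    private static final class Pending {
        final long sourceOffset;
        boolean acked;
        Pending(long sourceOffset) { this.sourceOffset = sourceOffset; }
    }

    private final Deque<Pending> inFlight = new ArrayDeque<>();
    private final Map<Long, Pending> bySeq = new HashMap<>();
    private long nextSeq = 0;
    private Long committable = null; // null until the first contiguous ack

    /** Called when a record is handed to the producer; returns a sequence number for the ack callback. */
    synchronized long recordSent(long sourceOffset) {
        Pending p = new Pending(sourceOffset);
        inFlight.addLast(p);
        bySeq.put(nextSeq, p);
        return nextSeq++;
    }

    /** Called from the producer callback once the broker has acknowledged the record. */
    synchronized void recordAcked(long seq) {
        Pending p = bySeq.remove(seq);
        if (p == null) throw new IllegalArgumentException("unknown sequence " + seq);
        p.acked = true;
        // Advance only over a contiguous prefix of acknowledged records.
        while (!inFlight.isEmpty() && inFlight.peekFirst().acked) {
            committable = inFlight.pollFirst().sourceOffset;
        }
    }

    /** The offset that is safe to commit, or null if nothing is safe yet. */
    synchronized Long committableOffset() {
        return committable;
    }
}
```

Tracking acknowledgements in send order means an out-of-order ack never moves the committable offset past a record that could still fail.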
[jira] [Updated] (KAFKA-10000) Atomic commit of source connector records and offsets
[ https://issues.apache.org/jira/browse/KAFKA-10000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chris Egerton updated KAFKA-10000:
Description:
It'd be nice to be able to configure source connectors such that their offsets are committed if and only if all records up to that point have been ack'd by the producer. This would go a long way towards EOS for source connectors.
This differs from https://issues.apache.org/jira/browse/KAFKA-6079, which is marked as {{WONTFIX}} since it only concerns enabling the idempotent producer for source connectors and is not concerned with source connector offsets.
This also differs from https://issues.apache.org/jira/browse/KAFKA-6080, which had a lot of discussion around allowing connector-defined transaction boundaries. The suggestion in this ticket is to only use source connector offset commits as the transaction boundaries for connectors; allowing connector-specified transaction boundaries can be addressed separately.
was:
It'd be nice to be able to configure source connectors such that their offsets are committed if and only if all records up to that point have been ack'd by the producer. This would go a long way towards EOS for source connectors.
[jira] [Created] (KAFKA-10000) Atomic commit of source connector records and offsets
Chris Egerton created KAFKA-10000:
Summary: Atomic commit of source connector records and offsets
Key: KAFKA-10000
URL: https://issues.apache.org/jira/browse/KAFKA-10000
Project: Kafka
Issue Type: New Feature
Components: KafkaConnect
Reporter: Chris Egerton
It'd be nice to be able to configure source connectors such that their offsets are committed if and only if all records up to that point have been ack'd by the producer. This would go a long way towards EOS for source connectors.
[GitHub] [kafka] ableegoldman commented on a change in pull request #8669: MINOR: consolidate processor context for active/standby
ableegoldman commented on a change in pull request #8669:
URL: https://github.com/apache/kafka/pull/8669#discussion_r425467074
## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java ##
@@ -198,433 +196,51 @@ public StateStore getStateStore(final String name) {
     @Override
     public void commit() {
-        task.requestCommit();
+        throwUnsupportedOperationExceptionIfStandby("commit");
+        applyStreamTaskOperation(StreamTask::requestCommit);
     }
     @Override
     @Deprecated
     public Cancellable schedule(final long intervalMs, final PunctuationType type, final Punctuator callback) {
+        throwUnsupportedOperationExceptionIfStandby("schedule");
         if (intervalMs < 1) {
             throw new IllegalArgumentException("The minimum supported scheduling interval is 1 millisecond.");
         }
-        return task.schedule(intervalMs, type, callback);
+        return returnStreamTaskOperation(t -> t.schedule(intervalMs, type, callback));
     }
     @SuppressWarnings("deprecation") // removing #schedule(final long intervalMs,...) will fix this
     @Override
     public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) throws IllegalArgumentException {
+        throwUnsupportedOperationExceptionIfStandby("schedule");
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(interval, "interval");
         return schedule(ApiUtils.validateMillisecondDuration(interval, msgPrefix), type, callback);
     }
-    private abstract static class StateStoreReadOnlyDecorator
Review comment: I felt these were just cluttering up this class, so I moved them to a new file.
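The diff replaces direct `task` calls with a standby check followed by a delegated task operation. A minimal sketch of that guard-and-delegate shape, using simplified stand-in types: `StubStreamTask` and `StubProcessorContext` are hypothetical, and the bodies of `throwUnsupportedOperationExceptionIfStandby`, `applyStreamTaskOperation` and `returnStreamTaskOperation` are guesses at their behavior, not the code from PR #8669.

```java
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical stand-in for an active stream task. */
final class StubStreamTask {
    int commitRequests = 0;
    void requestCommit() { commitRequests++; }
    String schedule(long intervalMs) { return "punctuator every " + intervalMs + " ms"; }
}

/** Hypothetical single context that can back either an active or a standby task. */
final class StubProcessorContext {
    private StubStreamTask activeTask; // null while the context backs a standby task

    StubProcessorContext(StubStreamTask activeTask) { this.activeTask = activeTask; }

    /** Lets one context be reused when a task transitions from standby to active. */
    void transitionTo(StubStreamTask newActiveTask) { this.activeTask = newActiveTask; }

    private void throwUnsupportedOperationExceptionIfStandby(String operation) {
        if (activeTask == null) {
            throw new UnsupportedOperationException(operation + "() is not supported for standby tasks");
        }
    }

    // Run an operation that has no result against the active task.
    private void applyStreamTaskOperation(Consumer<StubStreamTask> op) { op.accept(activeTask); }

    // Run an operation against the active task and return its result.
    private <T> T returnStreamTaskOperation(Function<StubStreamTask, T> op) { return op.apply(activeTask); }

    void commit() {
        throwUnsupportedOperationExceptionIfStandby("commit");
        applyStreamTaskOperation(StubStreamTask::requestCommit);
    }

    String schedule(long intervalMs) {
        throwUnsupportedOperationExceptionIfStandby("schedule");
        if (intervalMs < 1) {
            throw new IllegalArgumentException("The minimum supported scheduling interval is 1 millisecond.");
        }
        return returnStreamTaskOperation(t -> t.schedule(intervalMs));
    }
}
```

Keeping the standby check inside each public method is what allows a single context class to replace a separate standby implementation.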
[GitHub] [kafka] ableegoldman commented on pull request #8669: MINOR: consolidate processor context for active/standby
ableegoldman commented on pull request #8669: URL: https://github.com/apache/kafka/pull/8669#issuecomment-628915649 Call for review @cadonna @guozhangwang @vvcephei
[GitHub] [kafka] ableegoldman opened a new pull request #8669: MINOR: consolidate processor context for active/standby
ableegoldman opened a new pull request #8669: URL: https://github.com/apache/kafka/pull/8669 This is a prerequisite for [KAFKA-9501](https://github.com/apache/kafka/pull/8248/) and will also be useful for [KAFKA-9603](https://github.com/apache/kafka/pull/8661). There should be no logical changes here: the main difference is the removal of `StandbyContextImpl` in preparation for contexts to transition between active and standby. It also includes some minor cleanup, e.g. pulling the ReadOnly/ReadWrite decorators out into a separate file.
[jira] [Commented] (KAFKA-9993) Think about inheritance in the protocol generation framework
[ https://issues.apache.org/jira/browse/KAFKA-9993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107726#comment-17107726 ]
Boyang Chen commented on KAFKA-9993:
Thanks [~vvcephei]. An example is from the recent implementation for Raft: [https://github.com/confluentinc/kafka/pull/319/files#r424848803]. A couple of common fields, such as error code, leader id and later on cluster.id, are attached to all the generated RPCs like vote, fetch, begin/end quorum, etc., but you have to manually extract them one by one without a common ancestor. It would be good to consolidate them. Similar code for the group coordinator and txn coordinator could definitely be simplified as well, as those RPCs share a lot of common fields.
> Think about inheritance in the protocol generation framework
>
> Key: KAFKA-9993
> URL: https://issues.apache.org/jira/browse/KAFKA-9993
> Project: Kafka
> Issue Type: Improvement
> Reporter: Boyang Chen
> Priority: Major
>
> We have seen that there are a lot of common fields inside the request/response templates that could be extracted as a super class for auto-generated classes. For example, most responses contain a top-level error code. Currently, to build a service receiving multiple RPCs, the code template produces a lot of redundant error code extraction logic, which is far from ideal.
> What we want to discuss is whether to enable a general inheritance mechanism in this framework, what the trade-offs and added complexity are, and whether there is any workaround just to reduce the boilerplate.
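To make the request in KAFKA-9993 concrete, here is a sketch of what a shared ancestor for generated messages could enable. Everything here is hypothetical: the interfaces `ErrorCodeCarrier` and `LeaderAwareResponse` and the two stand-in response classes are not part of Kafka's generator output, and the sketch assumes Kafka's conventions that error code 0 means no error and a leader id of -1 means unknown.

```java
import java.util.List;

/** Hypothetical common ancestor for generated messages that carry a top-level error code. */
interface ErrorCodeCarrier {
    short errorCode();
}

/** Hypothetical ancestor for Raft-style responses that also carry a leader id. */
interface LeaderAwareResponse extends ErrorCodeCarrier {
    int leaderId();
}

// Hypothetical stand-ins for two different generated response classes.
final class VoteResponse implements LeaderAwareResponse {
    private final short errorCode;
    private final int leaderId;
    private final boolean voteGranted;
    VoteResponse(short errorCode, int leaderId, boolean voteGranted) {
        this.errorCode = errorCode;
        this.leaderId = leaderId;
        this.voteGranted = voteGranted;
    }
    public short errorCode() { return errorCode; }
    public int leaderId() { return leaderId; }
    boolean voteGranted() { return voteGranted; }
}

final class FetchResponse implements LeaderAwareResponse {
    private final short errorCode;
    private final int leaderId;
    private final long highWatermark;
    FetchResponse(short errorCode, int leaderId, long highWatermark) {
        this.errorCode = errorCode;
        this.leaderId = leaderId;
        this.highWatermark = highWatermark;
    }
    public short errorCode() { return errorCode; }
    public int leaderId() { return leaderId; }
    long highWatermark() { return highWatermark; }
}

final class ResponseHandling {
    private ResponseHandling() { }

    /** One code path for every response type instead of one extraction method per RPC. */
    static int firstKnownLeader(List<? extends LeaderAwareResponse> responses) {
        for (LeaderAwareResponse r : responses) {
            if (r.errorCode() == 0 && r.leaderId() >= 0) {
                return r.leaderId();
            }
        }
        return -1; // no successful response named a leader
    }
}
```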
[GitHub] [kafka] OneCricketeer commented on pull request #8489: KAFKA-9857:Failed to build image ducker-ak-openjdk-8 on arm
OneCricketeer commented on pull request #8489: URL: https://github.com/apache/kafka/pull/8489#issuecomment-628906219 Since Java 8 is EOL, why not upgrade to openjdk 11?
[GitHub] [kafka] mumrah commented on a change in pull request #8569: KIP-551: Expose disk read and write metrics
mumrah commented on a change in pull request #8569:
URL: https://github.com/apache/kafka/pull/8569#discussion_r425443889
## File path: core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala ##
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.nio.file.{Files, Paths}
+
+import org.apache.kafka.common.utils.Time
+import org.slf4j.Logger
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * Retrieves Linux /proc/self/io metrics.
+ */
+class LinuxIoMetricsCollector(procRoot: String, val time: Time, val logger: Logger) {
+  import LinuxIoMetricsCollector._
+  var lastUpdateMs = -1L
+  var cachedReadBytes = 0L
+  var cachedWriteBytes = 0L
+  val path = Paths.get(procRoot, "self", "io")
+
+  def readBytes(): Long = this.synchronized {
+    val curMs = time.milliseconds()
Review comment: minor nit: could move the time check to the updateValues method since that's where lastUpdateMs is set?
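The reviewer's suggestion is to keep the staleness check next to the code that sets `lastUpdateMs`. A Java sketch of that arrangement follows; the class name `IoMetricsCollector`, the injected `LongSupplier` clock and the one-second `UPDATE_INTERVAL_MS` are assumptions, not taken from PR #8569. It parses the `read_bytes` and `write_bytes` fields that Linux exposes in `/proc/self/io`.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.function.LongSupplier;

/**
 * Hypothetical Java rendering of the collector under review, with the
 * staleness check moved into updateValues() as the review comment suggests.
 */
final class IoMetricsCollector {
    private static final long UPDATE_INTERVAL_MS = 1000L; // assumed refresh interval

    private final Path path;
    private final LongSupplier clockMs;
    private long lastUpdateMs = -1L;
    private long cachedReadBytes = 0L;
    private long cachedWriteBytes = 0L;

    IoMetricsCollector(String procRoot, LongSupplier clockMs) {
        this.path = Paths.get(procRoot, "self", "io");
        this.clockMs = clockMs;
    }

    synchronized long readBytes() {
        updateValues();
        return cachedReadBytes;
    }

    synchronized long writeBytes() {
        updateValues();
        return cachedWriteBytes;
    }

    /** Re-reads the file only when the cached values are older than the interval. */
    private void updateValues() {
        long now = clockMs.getAsLong();
        if (lastUpdateMs >= 0 && now - lastUpdateMs < UPDATE_INTERVAL_MS) {
            return; // cached values are still fresh
        }
        try {
            for (String line : Files.readAllLines(path)) {
                // Fields look like "read_bytes: 4096"; "cancelled_write_bytes" does not match the prefixes.
                if (line.startsWith("read_bytes:")) {
                    cachedReadBytes = Long.parseLong(line.substring("read_bytes:".length()).trim());
                } else if (line.startsWith("write_bytes:")) {
                    cachedWriteBytes = Long.parseLong(line.substring("write_bytes:".length()).trim());
                }
            }
            lastUpdateMs = now;
        } catch (IOException | NumberFormatException e) {
            // Keep the previous values and retry on the next call (a real collector would log this).
        }
    }
}
```

Because both accessors go through `updateValues()`, the time check lives in exactly one place, which is the point of the nit.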
[GitHub] [kafka] rhauch commented on pull request #8618: KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions
rhauch commented on pull request #8618: URL: https://github.com/apache/kafka/pull/8618#issuecomment-628898153 ok to test
[GitHub] [kafka] d8tltanc removed a comment on pull request #8528: System tests should use --bootstrap-server rather than --zookeeper when testing new Kafka versions
d8tltanc removed a comment on pull request #8528: URL: https://github.com/apache/kafka/pull/8528#issuecomment-62030 https://jenkins.confluent.io/job/system-test-kafka-branch-builder/3931/ Link for the system tests ⬆️
[GitHub] [kafka] rhauch commented on pull request #8090: KAFKA-9537 - Cleanup error messages for abstract transformations
rhauch commented on pull request #8090: URL: https://github.com/apache/kafka/pull/8090#issuecomment-628895259 ok to test
[GitHub] [kafka] mjsax merged pull request #8665: KAFKA-9984 Should fail the subscription when pattern is empty
mjsax merged pull request #8665: URL: https://github.com/apache/kafka/pull/8665
[GitHub] [kafka] mjsax commented on pull request #8665: KAFKA-9984 Should fail the subscription when pattern is empty
mjsax commented on pull request #8665: URL: https://github.com/apache/kafka/pull/8665#issuecomment-628892193 Thanks for the fix @zhaohaidao! Merged to `trunk`.
[jira] [Resolved] (KAFKA-9984) Should fail the subscription when pattern is empty
[ https://issues.apache.org/jira/browse/KAFKA-9984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax resolved KAFKA-9984.
Fix Version/s: 2.6.0
Resolution: Fixed
> Should fail the subscription when pattern is empty
>
> Key: KAFKA-9984
> URL: https://issues.apache.org/jira/browse/KAFKA-9984
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Reporter: Boyang Chen
> Assignee: HaiyuanZhao
> Priority: Major
> Fix For: 2.6.0
>
> We have seen a case where the consumer subscribes to an empty string pattern:
> ```
> [Consumer ... ] Subscribed to pattern: ''
> ```
> which doesn't make any sense and usually indicates a configuration error. The `consumer.subscribe(pattern)` call should fail with an illegal argument exception in this case.
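The validation requested in KAFKA-9984 amounts to rejecting a null or empty pattern before subscribing. This is a sketch with a hypothetical helper, `SubscriptionValidation.requireNonEmptyPattern`; it is not necessarily the check merged in PR #8665. It relies on `Pattern.toString()` returning the source regular expression.

```java
import java.util.regex.Pattern;

final class SubscriptionValidation {
    private SubscriptionValidation() { }

    /**
     * Hypothetical guard: reject a null or empty subscription pattern up front,
     * so a misconfigured consumer fails fast instead of logging "Subscribed to pattern: ''".
     */
    static Pattern requireNonEmptyPattern(Pattern pattern) {
        if (pattern == null || pattern.toString().isEmpty()) {
            throw new IllegalArgumentException(
                "Topic pattern to subscribe to cannot be " + (pattern == null ? "null" : "empty"));
        }
        return pattern;
    }
}
```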
[GitHub] [kafka] rhauch commented on pull request #8654: KAFKA-9931: Implement KIP-605 to expand support for Connect worker internal topic configurations
rhauch commented on pull request #8654: URL: https://github.com/apache/kafka/pull/8654#issuecomment-628885132 I've added some integration tests for creating the internal topic, including verifying some existing functionality w/r/t the replication factor and number of partitions with various Kafka cluster sizes.
[jira] [Commented] (KAFKA-9676) Add test coverage for new ActiveTaskCreator and StandbyTaskCreator
[ https://issues.apache.org/jira/browse/KAFKA-9676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107672#comment-17107672 ]
Boyang Chen commented on KAFKA-9676:
The current trunk has logic coverage for StandbyTaskCreator in TaskManagerTest and StreamThreadTest. I feel we don't necessarily need a dedicated test class for it.
> Add test coverage for new ActiveTaskCreator and StandbyTaskCreator
>
> Key: KAFKA-9676
> URL: https://issues.apache.org/jira/browse/KAFKA-9676
> Project: Kafka
> Issue Type: Test
> Components: streams
> Reporter: Boyang Chen
> Priority: Major
> Labels: help-wanted, newbie
>
> The newly separated ActiveTaskCreator and StandbyTaskCreator have no unit test coverage. We should add corresponding tests.
[GitHub] [kafka] vvcephei commented on pull request #8662: HOTFIX: skip listOffsets request for newly created changelog topics
vvcephei commented on pull request #8662: URL: https://github.com/apache/kafka/pull/8662#issuecomment-628881057 Thanks, @ableegoldman !
[GitHub] [kafka] vvcephei merged pull request #8662: HOTFIX: skip listOffsets request for newly created changelog topics
vvcephei merged pull request #8662: URL: https://github.com/apache/kafka/pull/8662
[GitHub] [kafka] ableegoldman commented on pull request #8662: HOTFIX: skip listOffsets request for newly created changelog topics
ableegoldman commented on pull request #8662: URL: https://github.com/apache/kafka/pull/8662#issuecomment-628879075 Java 14 build passed, Java 8 didn't build due to:
```
11:29:13 # There is insufficient memory for the Java Runtime Environment to continue.
11:29:13 # Cannot create GC thread. Out of system resources.
```
[jira] [Created] (KAFKA-9999) Topic description should be triggered after each failed topic creation iteration
Boyang Chen created KAFKA-9999:
Summary: Topic description should be triggered after each failed topic creation iteration
Key: KAFKA-9999
URL: https://issues.apache.org/jira/browse/KAFKA-9999
Project: Kafka
Issue Type: Bug
Components: admin, streams
Affects Versions: 2.4.0
Reporter: Boyang Chen
We spotted a case in a system test failure where the topic already exists but the admin client still attempts to recreate it:
{code:java}
[2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic SmokeTest-KSTREAM-REDUCE-STATE-STORE-20-changelog. Topic is probably marked for deletion (number of partitions is unknown). Will retry to create this topic in 100 ms (to let broker finish async delete operation first). Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic 'SmokeTest-KSTREAM-REDUCE-STATE-STORE-20-changelog' already exists. (org.apache.kafka.streams.processor.internals.InternalTopicManager)
[2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic SmokeTest-uwin-cnt-changelog. Topic is probably marked for deletion (number of partitions is unknown). Will retry to create this topic in 100 ms (to let broker finish async delete operation first). Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic 'SmokeTest-uwin-cnt-changelog' already exists. (org.apache.kafka.streams.processor.internals.InternalTopicManager)
[2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic SmokeTest-cntByCnt-changelog. Topic is probably marked for deletion (number of partitions is unknown). Will retry to create this topic in 100 ms (to let broker finish async delete operation first). Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic 'SmokeTest-cntByCnt-changelog' already exists. (org.apache.kafka.streams.processor.internals.InternalTopicManager)
[2020-05-14 09:56:40,120] INFO stream-thread [main] Topics [SmokeTest-KSTREAM-REDUCE-STATE-STORE-20-changelog, SmokeTest-uwin-cnt-changelog, SmokeTest-cntByCnt-changelog] can not be made ready with 5 retries left (org.apache.kafka.streams.processor.internals.InternalTopicManager)
[2020-05-14 09:56:40,220] ERROR stream-thread [main] Could not create topics after 5 retries. This can happen if the Kafka cluster is temporary not available. You can increase admin client config `retries` to be resilient against this error. (org.apache.kafka.streams.processor.internals.InternalTopicManager)
[2020-05-14 09:56:40,221] ERROR stream-thread [SmokeTest-05374457-074b-4d33-bca0-8686465e8157-StreamThread-2] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors: (org.apache.kafka.streams.processor.internals.StreamThread)
org.apache.kafka.streams.errors.StreamsException: Could not create topics after 5 retries. This can happen if the Kafka cluster is temporary not available. You can increase admin client config `retries` to be resilient against this error.
    at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:171)
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareTopic(StreamsPartitionAssignor.java:1229)
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:588)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:548)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:650)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1300(AbstractCoordinator.java:111)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:572)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:555)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1026)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1006)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599)
    at
[jira] [Commented] (KAFKA-9989) StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task
[ https://issues.apache.org/jira/browse/KAFKA-9989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107623#comment-17107623 ]
Boyang Chen commented on KAFKA-9989:
Discussed with [~cadonna], and it seems to be caused by some transient bug in the KIP-441 assignor. Now the more general question is whether we should account for the scenario of an empty assignment at all in this test. I agree this helps catch assignor bugs like this one, but is this the right place for such coverage?
> StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task
>
> Key: KAFKA-9989
> URL: https://issues.apache.org/jira/browse/KAFKA-9989
> Project: Kafka
> Issue Type: Bug
> Components: streams, system tests
> Reporter: Boyang Chen
> Priority: Major
>
> System test StreamsUpgradeTest.test_metadata_upgrade could fail due to:
> "Never saw output 'processed [0-9]* records' on ubuntu@worker6"
> Taking a closer look, the rebalance happens but produces no task assignment. We should fix this problem by making the rebalance result part of the check, and skip the record processing validation when the assignment is empty.
[jira] [Assigned] (KAFKA-9989) StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task
[ https://issues.apache.org/jira/browse/KAFKA-9989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen reassigned KAFKA-9989: Assignee: (was: Boyang Chen)
[jira] [Created] (KAFKA-9998) KafkaProducer.close(timeout) still may block indefinitely
radai rosenblatt created KAFKA-9998:
Summary: KafkaProducer.close(timeout) still may block indefinitely
Key: KAFKA-9998
URL: https://issues.apache.org/jira/browse/KAFKA-9998
Project: Kafka
Issue Type: Bug
Affects Versions: 2.4.1
Reporter: radai rosenblatt
Looking at KafkaProducer.close(timeout), we have this:
{code:java}
private void close(Duration timeout, boolean swallowException) {
    long timeoutMs = timeout.toMillis();
    if (timeoutMs < 0)
        throw new IllegalArgumentException("The timeout cannot be negative.");
    log.info("Closing the Kafka producer with timeoutMillis = {} ms.", timeoutMs);
    // this will keep track of the first encountered exception
    AtomicReference<Throwable> firstException = new AtomicReference<>();
    boolean invokedFromCallback = Thread.currentThread() == this.ioThread;
    if (timeoutMs > 0) {
        if (invokedFromCallback) {
            log.warn("Overriding close timeout {} ms to 0 ms in order to prevent useless blocking due to self-join. " +
                    "This means you have incorrectly invoked close with a non-zero timeout from the producer call-back.",
                    timeoutMs);
        } else {
            // Try to close gracefully.
            if (this.sender != null)
                this.sender.initiateClose();
            if (this.ioThread != null) {
                try {
                    this.ioThread.join(timeoutMs); // <- GRACEFUL JOIN
                } catch (InterruptedException t) {
                    firstException.compareAndSet(null, new InterruptException(t));
                    log.error("Interrupted while joining ioThread", t);
                }
            }
        }
    }
    if (this.sender != null && this.ioThread != null && this.ioThread.isAlive()) {
        log.info("Proceeding to force close the producer since pending requests could not be completed " +
                "within timeout {} ms.", timeoutMs);
        this.sender.forceClose();
        // Only join the sender thread when not calling from callback.
        if (!invokedFromCallback) {
            try {
                this.ioThread.join(); // <- UNBOUNDED JOIN
            } catch (InterruptedException e) {
                firstException.compareAndSet(null, new InterruptException(e));
            }
        }
    }
    ...
}
{code}
Specifically, in our case the ioThread was running a (very) long-running user-provided callback, which was preventing the producer from closing within the given timeout. I think the second join() call should either be _VERY_ short (since we're already past the timeout at that stage) or should not happen at all.
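The ticket's proposed fix, bounding the second join, can be sketched as a generic helper over a plain `Thread`. The helper name `closeWithin` and the `forceCloseGraceMs` parameter are hypothetical, and the two `Runnable` hooks stand in for `sender.initiateClose()` and `sender.forceClose()`; this is not the change Kafka adopted. One detail worth noting is that `Thread.join(0)` waits forever, so a zero timeout must skip the join rather than pass 0 through.

```java
final class BoundedClose {
    private BoundedClose() { }

    /**
     * Hypothetical close helper: join gracefully within the timeout, force-close,
     * then wait at most a short grace period instead of joining without a bound.
     * Returns true if the thread has terminated.
     */
    static boolean closeWithin(Thread ioThread, Runnable initiateClose, Runnable forceClose,
                               long timeoutMs, long forceCloseGraceMs) throws InterruptedException {
        if (timeoutMs < 0 || forceCloseGraceMs < 0) {
            throw new IllegalArgumentException("timeouts cannot be negative");
        }
        initiateClose.run();
        if (timeoutMs > 0) {
            ioThread.join(timeoutMs); // graceful, bounded join; join(0) would block forever
        }
        if (ioThread.isAlive()) {
            forceClose.run();
            if (forceCloseGraceMs > 0) {
                // Bounded replacement for the unbounded join(): a callback stuck in
                // user code can no longer block close() indefinitely.
                ioThread.join(forceCloseGraceMs);
            }
        }
        return !ioThread.isAlive();
    }
}
```

The trade-off is that `close()` may now return while the I/O thread is still running, so a caller would need to treat a `false` result as "resources possibly not yet released".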
[GitHub] [kafka] vvcephei commented on a change in pull request #8667: KAFKA-9994: Handle task migrated inside corruption path
vvcephei commented on a change in pull request #8667:
URL: https://github.com/apache/kafka/pull/8667#discussion_r425386296
## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ##
@@ -555,28 +555,35 @@ void runLoop() {
             } catch (final TaskCorruptedException e) {
                 log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. " +
                          "Will close the task as dirty and re-create and bootstrap from scratch.", e);
-
-                taskManager.commit(
-                    taskManager.tasks()
-                        .values()
-                        .stream()
-                        .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
-                        .filter(t -> !e.corruptedTaskWithChangelogs().containsKey(t.id()))
-                        .collect(Collectors.toSet())
-                );
-                taskManager.handleCorruption(e.corruptedTaskWithChangelogs());
+                try {
+                    taskManager.commit(
+                        taskManager.tasks()
+                            .values()
+                            .stream()
+                            .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
+                            .filter(t -> !e.corruptedTaskWithChangelogs().containsKey(t.id()))
+                            .collect(Collectors.toSet())
+                    );
+                    taskManager.handleCorruption(e.corruptedTaskWithChangelogs());
+                } catch (final TaskMigratedException taskMigrated) {
+                    handleTaskMigrated(taskMigrated);
Review comment: Sounds legit. Thanks.
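The shape of the fix under review, catching a `TaskMigratedException` raised while recovering from task corruption instead of letting it escape the run loop, can be sketched with hypothetical stand-ins. `StubTaskCorruptedException`, `StubTaskMigratedException` and `RunLoopSketch` are illustrative names, not the real `StreamThread` types, and the three `Runnable` steps stand in for processing, committing the healthy tasks, and `handleCorruption`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the Streams exceptions named in the diff.
class StubTaskCorruptedException extends RuntimeException {
    StubTaskCorruptedException(String msg) { super(msg); }
}

class StubTaskMigratedException extends RuntimeException {
    StubTaskMigratedException(String msg) { super(msg); }
}

final class RunLoopSketch {
    final List<String> events = new ArrayList<>();

    /** One loop iteration: recovery from corruption may itself hit a task migration. */
    void runOnce(Runnable process, Runnable commitHealthyTasks, Runnable handleCorruption) {
        try {
            process.run();
        } catch (StubTaskCorruptedException e) {
            events.add("corrupted");
            try {
                // Commit the non-corrupted tasks, then rebuild the corrupted ones.
                commitHealthyTasks.run();
                handleCorruption.run();
            } catch (StubTaskMigratedException migrated) {
                // Without this inner catch the migration would escape the run loop.
                handleTaskMigrated(migrated);
            }
        } catch (StubTaskMigratedException e) {
            handleTaskMigrated(e);
        }
    }

    private void handleTaskMigrated(StubTaskMigratedException e) {
        events.add("migrated: " + e.getMessage());
    }
}
```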
[jira] [Commented] (KAFKA-9987) Improve sticky partition assignor algorithm
[ https://issues.apache.org/jira/browse/KAFKA-9987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107584#comment-17107584 ]

Hai Lin commented on KAFKA-9987:
--------------------------------

Thanks for putting this up. From the above algorithm, it looks like we will introduce a cap on the number of partitions assigned per rebalance. Can you highlight the difference from the existing algorithm a bit? Is the only difference the new C_f and C_c? So the existing one will assign everything to fill up all unfilled_members and all partitions, right?

Also, what does the time frame for the implementation look like? Will it come with some benchmarks evaluating the performance with a large consumer group (> 2k)? Will it come to 2.4 as a minor release? The reason we want to upgrade from 2.2 to 2.4 is to get better performance for large consumer groups (> 2k), and this (the stability of large consumer groups) has been a pain point for us for a while.

> Improve sticky partition assignor algorithm
> -------------------------------------------
>
>                 Key: KAFKA-9987
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9987
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>            Reporter: Sophie Blee-Goldman
>            Assignee: Sophie Blee-Goldman
>            Priority: Major
>
> In [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol] we added the new CooperativeStickyAssignor, which leverages the underlying sticky assignment algorithm of the existing StickyAssignor (moved to AbstractStickyAssignor). The algorithm is fairly complex as it tries to optimize stickiness while satisfying perfect balance _in the case where individual consumers may be subscribed to different subsets of the topics._ While it does a pretty good job at what it promises to do, it doesn't scale well with large numbers of consumers and partitions.
>
> To give a concrete example, users have reported that it takes 2.5 minutes for the assignment to complete with just 2100 consumers reading from 2100 partitions. Since partitions revoked during the first of two cooperative rebalances will remain unassigned until the end of the second rebalance, it's important for the rebalance to be as fast as possible. And since one of the primary improvements of the cooperative rebalancing protocol is a better scaling experience, the only OOTB cooperative assignor should not itself scale poorly.
>
> If we can constrain the problem a bit, we can simplify the algorithm greatly. In many cases the individual consumers won't be subscribed to some random subset of the total subscription; they will all be subscribed to the same set of topics and rely on the assignor to balance the partition workload.
>
> We can detect this case by checking the group's individual subscriptions and calling a more efficient assignment algorithm.
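For illustration, here is a minimal Java sketch of the fast path the ticket describes: detect that every member has the same subscription, then compute a balanced assignment that keeps previously owned partitions wherever the floor/ceiling quotas allow. It is not the algorithm in PR #8668; it assumes a single topic with partitions numbered 0..P-1, and `SameSubscriptionAssignorSketch`, `allSubscriptionsEqual` and `assign` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch, not Kafka's AbstractStickyAssignor or the code in PR #8668.
// Simplification: a single topic whose partitions are the ints 0..numPartitions-1.
class SameSubscriptionAssignorSketch {

    // The fast path applies only when every member has the same subscription.
    static boolean allSubscriptionsEqual(Map<String, Set<String>> subscriptions) {
        return new HashSet<>(subscriptions.values()).size() <= 1;
    }

    static Map<String, List<Integer>> assign(List<String> members,
                                             int numPartitions,
                                             Map<String, List<Integer>> previouslyOwned) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (members.isEmpty()) {
            return result;
        }
        int n = members.size();
        int minQuota = numPartitions / n;              // floor(P / N)
        int maxQuota = (numPartitions + n - 1) / n;    // ceil(P / N)
        int maxQuotaSlots = numPartitions % n;         // members allowed to end up with maxQuota
        int membersAtMax = 0;

        TreeSet<Integer> unassigned = new TreeSet<>();
        for (int p = 0; p < numPartitions; p++) {
            unassigned.add(p);
        }

        // Pass 1 (stickiness): keep previously owned partitions, up to the member's quota.
        for (String member : members) {
            List<Integer> kept = new ArrayList<>();
            for (int p : previouslyOwned.getOrDefault(member, List.of())) {
                boolean hasRoom = kept.size() < minQuota
                        || (kept.size() < maxQuota && membersAtMax < maxQuotaSlots);
                // remove() returns false if p was already claimed or no longer exists
                if (hasRoom && unassigned.remove(p)) {
                    kept.add(p);
                }
            }
            if (maxQuota > minQuota && kept.size() == maxQuota) {
                membersAtMax++;
            }
            result.put(member, kept);
        }

        // Pass 2 (balance): bring every member up to the floor quota.
        for (List<Integer> owned : result.values()) {
            while (owned.size() < minQuota) {
                owned.add(unassigned.pollFirst());
            }
        }
        // Pass 3: give each leftover partition to a member still below the ceiling.
        for (List<Integer> owned : result.values()) {
            if (!unassigned.isEmpty() && owned.size() < maxQuota) {
                owned.add(unassigned.pollFirst());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> previous = new HashMap<>();
        previous.put("c1", List.of(0, 1, 2, 3)); // c1 previously owned more than its share
        previous.put("c2", List.of(4));
        System.out.println(assign(List.of("c1", "c2", "c3"), 7, previous));
        // prints {c1=[0, 1, 2], c2=[4, 3], c3=[5, 6]}
    }
}
```

Every member ends with either floor(P/N) or ceil(P/N) partitions, and a member keeps its old partitions whenever that does not break balance. Because every member can take any partition in the same-subscription case, a few simple passes are enough.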
[jira] [Commented] (KAFKA-9987) Improve sticky partition assignor algorithm
[ https://issues.apache.org/jira/browse/KAFKA-9987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107585#comment-17107585 ]

Hai Lin commented on KAFKA-9987:
--------------------------------

Saw the PR, I will take a look.
[GitHub] [kafka] abbccdda commented on a change in pull request #8667: KAFKA-9994: Handle task migrated inside corruption path
abbccdda commented on a change in pull request #8667:
URL: https://github.com/apache/kafka/pull/8667#discussion_r425366150

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ##
@@ -555,28 +555,35 @@ void runLoop() {
             } catch (final TaskCorruptedException e) {
                 log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. " +
                              "Will close the task as dirty and re-create and bootstrap from scratch.", e);
-
-                taskManager.commit(
-                    taskManager.tasks()
-                        .values()
-                        .stream()
-                        .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
-                        .filter(t -> !e.corruptedTaskWithChangelogs().containsKey(t.id()))
-                        .collect(Collectors.toSet())
-                );
-                taskManager.handleCorruption(e.corruptedTaskWithChangelogs());
+                try {
+                    taskManager.commit(
+                        taskManager.tasks()
+                            .values()
+                            .stream()
+                            .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
+                            .filter(t -> !e.corruptedTaskWithChangelogs().containsKey(t.id()))
+                            .collect(Collectors.toSet())
+                    );
+                    taskManager.handleCorruption(e.corruptedTaskWithChangelogs());
+                } catch (final TaskMigratedException taskMigrated) {
+                    handleTaskMigrated(taskMigrated);

Review comment:
Good point. I was also wondering whether we should run the corruption logic no matter what. But if we hit a TaskMigratedException, `taskManager.handleLostAll` will close all the tasks dirty and wipe out their state, which seems like a superset of the work done by handleCorruption. If the commit fails, maybe we should just skip the corruption logic?
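To make the trade-off in this thread concrete, here is a self-contained Java sketch of the patched control flow. `TaskManagerLike`, `RecordingTaskManager` and the local `TaskMigratedException` class are hypothetical stand-ins rather than Kafka's internal types, and the catch block calls `handleLostAll()` directly on the assumption, taken from the review comment, that `handleTaskMigrated` ends up closing all tasks dirty.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the control flow in the diff; not Kafka's StreamThread/TaskManager.
class CorruptionPathSketch {

    // Local stand-in for org.apache.kafka.streams.errors.TaskMigratedException.
    static class TaskMigratedException extends RuntimeException {
        TaskMigratedException(String message) {
            super(message);
        }
    }

    // Hypothetical subset of task-manager operations used by the handler.
    interface TaskManagerLike {
        void commit(Set<String> healthyTaskIds);
        void handleCorruption(Set<String> corruptedTaskIds);
        void handleLostAll();
    }

    // Called after a TaskCorruptedException; a TaskMigratedException must not escape.
    static void onTaskCorrupted(TaskManagerLike taskManager,
                                Set<String> healthyTaskIds,
                                Set<String> corruptedTaskIds) {
        try {
            taskManager.commit(healthyTaskIds);
            taskManager.handleCorruption(corruptedTaskIds);
        } catch (final TaskMigratedException taskMigrated) {
            // Closing every task dirty also covers the corrupted ones,
            // so handleCorruption is skipped on this path.
            taskManager.handleLostAll();
        }
    }

    // Test double that records calls and can simulate a fenced producer during commit.
    static class RecordingTaskManager implements TaskManagerLike {
        final List<String> calls = new ArrayList<>();
        final boolean fenced;

        RecordingTaskManager(boolean fenced) {
            this.fenced = fenced;
        }

        @Override
        public void commit(Set<String> healthyTaskIds) {
            calls.add("commit");
            if (fenced) {
                throw new TaskMigratedException("Producer got fenced trying to send a record");
            }
        }

        @Override
        public void handleCorruption(Set<String> corruptedTaskIds) {
            calls.add("handleCorruption");
        }

        @Override
        public void handleLostAll() {
            calls.add("handleLostAll");
        }
    }

    public static void main(String[] args) {
        RecordingTaskManager healthy = new RecordingTaskManager(false);
        onTaskCorrupted(healthy, Set.of("0_0"), Set.of("1_1"));
        System.out.println(healthy.calls); // [commit, handleCorruption]

        RecordingTaskManager fenced = new RecordingTaskManager(true);
        onTaskCorrupted(fenced, Set.of("0_0"), Set.of("1_1"));
        System.out.println(fenced.calls);  // [commit, handleLostAll]
    }
}
```

In the fenced case the exception no longer escapes the handler, and `handleCorruption` is skipped because the lost-all path already covers the corrupted tasks.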
[GitHub] [kafka] mimaison merged pull request #8222: KAFKA-9650: include human readable units in ms and bytes configs
mimaison merged pull request #8222:
URL: https://github.com/apache/kafka/pull/8222
[GitHub] [kafka] ableegoldman opened a new pull request #8668: KAFKA-9987: optimize sticky assignment algorithm for same-subscription case
ableegoldman opened a new pull request #8668:
URL: https://github.com/apache/kafka/pull/8668

Motivation and pseudocode for the algorithm are in the ticket.

[WIP] Still need to finish writing tests and gather rough benchmark results.
[jira] [Created] (KAFKA-9997) upgrade log4j lib to address CVE-2020-9488
Emanuele Maccherani created KAFKA-9997:
------------------------------------------

             Summary: upgrade log4j lib to address CVE-2020-9488
                 Key: KAFKA-9997
                 URL: https://issues.apache.org/jira/browse/KAFKA-9997
             Project: Kafka
          Issue Type: Bug
          Components: packaging
    Affects Versions: 2.5.0
            Reporter: Emanuele Maccherani

The latest version of Kafka uses log4j 1.2.17, which is affected by CVE-2020-9488.
[GitHub] [kafka] vvcephei commented on a change in pull request #8667: KAFKA-9994: Handle task migrated inside corruption path
vvcephei commented on a change in pull request #8667:
URL: https://github.com/apache/kafka/pull/8667#discussion_r425348145

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ##
@@ -555,28 +555,35 @@ void runLoop() {
             } catch (final TaskCorruptedException e) {
                 log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. " +
                              "Will close the task as dirty and re-create and bootstrap from scratch.", e);
-
-                taskManager.commit(
-                    taskManager.tasks()
-                        .values()
-                        .stream()
-                        .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
-                        .filter(t -> !e.corruptedTaskWithChangelogs().containsKey(t.id()))
-                        .collect(Collectors.toSet())
-                );
-                taskManager.handleCorruption(e.corruptedTaskWithChangelogs());
+                try {
+                    taskManager.commit(
+                        taskManager.tasks()
+                            .values()
+                            .stream()
+                            .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
+                            .filter(t -> !e.corruptedTaskWithChangelogs().containsKey(t.id()))
+                            .collect(Collectors.toSet())
+                    );
+                    taskManager.handleCorruption(e.corruptedTaskWithChangelogs());
+                } catch (final TaskMigratedException taskMigrated) {
+                    handleTaskMigrated(taskMigrated);

Review comment:
Should we also handle the corrupted tasks here (before this line), so that they are already cleaned up before the next round? Or, alternatively, should we move `taskManager.handleCorruption(e.corruptedTaskWithChangelogs());` to before the attempted commit (it looks like it could be outside the try block as well)?
[jira] [Created] (KAFKA-9996) upgrade zookeeper to 3.5.8 to address security vulnerabilities
Emanuele Maccherani created KAFKA-9996:
------------------------------------------

             Summary: upgrade zookeeper to 3.5.8 to address security vulnerabilities
                 Key: KAFKA-9996
                 URL: https://issues.apache.org/jira/browse/KAFKA-9996
             Project: Kafka
          Issue Type: Bug
          Components: packaging
    Affects Versions: 2.5.0
            Reporter: Emanuele Maccherani

Kafka currently uses ZooKeeper 3.5.7, which is affected by CVE-2020-8840 and CVE-2020-9488. Both are resolved in 3.5.8.
[GitHub] [kafka] mjsax commented on pull request #8558: KAFKA-8611 / KIP-221 documentation
mjsax commented on pull request #8558:
URL: https://github.com/apache/kafka/pull/8558#issuecomment-628812878

Yes, on the voting thread I proposed also deprecating `through()` via KIP-221, and Guozhang replied. But it's your KIP and I don't want to "hijack" it :) I am willing to do the follow-up PR if you agree with the change (I can also update the KIP itself), but I won't do it without your consent :)
[GitHub] [kafka] mjsax commented on pull request #8662: HOTFIX: skip listOffsets request for newly created changelog topics
mjsax commented on pull request #8662:
URL: https://github.com/apache/kafka/pull/8662#issuecomment-62881

Retest this please.
[GitHub] [kafka] mjsax commented on pull request #8662: HOTFIX: skip listOffsets request for newly created changelog topics
mjsax commented on pull request #8662:
URL: https://github.com/apache/kafka/pull/8662#issuecomment-628810936

Retest this please.
[GitHub] [kafka] guozhangwang commented on a change in pull request #8662: HOTFIX: skip listOffsets request for newly created changelog topics
guozhangwang commented on a change in pull request #8662:
URL: https://github.com/apache/kafka/pull/8662#discussion_r425343094

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ##
@@ -1520,38 +1534,6 @@ private static void validateActiveTaskEncoding(final List partit
             }
         }
-    /**
-     * Internal helper function that creates a Kafka topic
-     *
-     * @param topicPartitions Map that contains the topic names to be created with the number of partitions
-     */
-    private void prepareTopic(final Map topicPartitions) {
-        log.debug("Starting to validate internal topics {} in partition assignor.", topicPartitions);
-        // first construct the topics to make ready
-        final Map topicsToMakeReady = new HashMap<>();
-
-        for (final InternalTopicConfig topic : topicPartitions.values()) {
-            final Optional numPartitions = topic.numberOfPartitions();
-            if (!numPartitions.isPresent()) {
-                throw new StreamsException(
-                    String.format("%sTopic [%s] number of partitions not defined",
-                                  logPrefix, topic.name())
-                );
-            }
-            if (!topic.hasEnforcedNumberOfPartitions()) {
-                topic.setNumberOfPartitions(numPartitions.get());
-            }
-            topicsToMakeReady.put(topic.name(), topic);
-        }

Review comment:
The logic does indeed seem redundant to me.
[jira] [Commented] (KAFKA-9676) Add test coverage for new ActiveTaskCreator and StandbyTaskCreator
[ https://issues.apache.org/jira/browse/KAFKA-9676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107523#comment-17107523 ]

Matthias J. Sax commented on KAFKA-9676:
----------------------------------------

[~bchen225242] There is still no `StandbyTaskCreatorTest` – IMHO we should keep this ticket open.

> Add test coverage for new ActiveTaskCreator and StandbyTaskCreator
> ------------------------------------------------------------------
>
>                 Key: KAFKA-9676
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9676
>             Project: Kafka
>          Issue Type: Test
>          Components: streams
>            Reporter: Boyang Chen
>            Priority: Major
>              Labels: help-wanted, newbie
>
> The newly separated ActiveTaskCreator and StandbyTaskCreator have no unit test coverage. We should add corresponding tests.
[GitHub] [kafka] vvcephei commented on a change in pull request #8662: HOTFIX: skip listOffsets request for newly created changelog topics
vvcephei commented on a change in pull request #8662:
URL: https://github.com/apache/kafka/pull/8662#discussion_r425330736

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java ##
@@ -169,6 +173,9 @@ public void makeReady(final Map topics) {
                 log.error(timeoutAndRetryError);
                 throw new StreamsException(timeoutAndRetryError);
             }
+            log.debug("Completed validating internal topics and created {}", newlyCreatedTopics);

Review comment:
Thanks! Will do. I just wanted to bounce the idea off you first, in case it was stupid.
[GitHub] [kafka] mjsax commented on pull request #8667: KAFKA-9994: Handle task migrated inside corruption path
mjsax commented on pull request #8667:
URL: https://github.com/apache/kafka/pull/8667#issuecomment-628795711

Retest this please.
[GitHub] [kafka] abbccdda opened a new pull request #8667: KAFKA-9994: Handle task migrated inside corruption path
abbccdda opened a new pull request #8667:
URL: https://github.com/apache/kafka/pull/8667

TaskMigratedException should always be non-fatal and caught within the run loop, so the corruption-handling path needs the same try-catch.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] ableegoldman commented on a change in pull request #8662: HOTFIX: skip listOffsets request for newly created changelog topics
ableegoldman commented on a change in pull request #8662:
URL: https://github.com/apache/kafka/pull/8662#discussion_r425314470

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java ##
@@ -169,6 +173,9 @@ public void makeReady(final Map topics) {
                 log.error(timeoutAndRetryError);
                 throw new StreamsException(timeoutAndRetryError);
             }
+            log.debug("Completed validating internal topics and created {}", newlyCreatedTopics);

Review comment:
I do agree it would be useful though. Feel free to create a ticket :P
[GitHub] [kafka] ableegoldman commented on a change in pull request #8662: HOTFIX: skip listOffsets request for newly created changelog topics
ableegoldman commented on a change in pull request #8662:
URL: https://github.com/apache/kafka/pull/8662#discussion_r425314263

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java ##
@@ -169,6 +173,9 @@ public void makeReady(final Map topics) {
                 log.error(timeoutAndRetryError);
                 throw new StreamsException(timeoutAndRetryError);
             }
+            log.debug("Completed validating internal topics and created {}", newlyCreatedTopics);

Review comment:
I think this race condition was particularly severe because we send the listOffsets request pretty much immediately after creating the topics, whereas whatever we do with the topic next won't happen until the rebalance has completed. AFAIK no users have ever reported subsequent operations failing after the first rebalance due to not-yet-fully-created topics, but it could have just slipped past us.
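The fix named in the PR title can be sketched as follows, assuming that a changelog topic created moments ago holds no records, so its end offset can be taken as 0 without asking the brokers. `SkipNewChangelogsSketch`, `Partition` and the `listOffsets` function parameter are hypothetical stand-ins for the real admin/consumer calls; this is not the code in #8662.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;

// Hypothetical sketch, not the code in PR #8662.
class SkipNewChangelogsSketch {

    // Minimal stand-in for org.apache.kafka.common.TopicPartition.
    static final class Partition {
        final String topic;
        final int partition;

        Partition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override public boolean equals(Object o) {
            return o instanceof Partition
                    && ((Partition) o).topic.equals(topic) && ((Partition) o).partition == partition;
        }

        @Override public int hashCode() { return Objects.hash(topic, partition); }

        @Override public String toString() { return topic + "-" + partition; }
    }

    // listOffsets stands in for the broker round trip that returns end offsets.
    static Map<Partition, Long> endOffsets(Collection<Partition> changelogPartitions,
                                           Set<String> newlyCreatedTopics,
                                           Function<Set<Partition>, Map<Partition, Long>> listOffsets) {
        Map<Partition, Long> result = new HashMap<>();
        Set<Partition> toQuery = new HashSet<>();
        for (Partition tp : changelogPartitions) {
            if (newlyCreatedTopics.contains(tp.topic)) {
                result.put(tp, 0L);   // assumption: a topic we just created holds no records yet
            } else {
                toQuery.add(tp);
            }
        }
        if (!toQuery.isEmpty()) {
            result.putAll(listOffsets.apply(toQuery)); // only pre-existing topics are queried
        }
        return result;
    }

    public static void main(String[] args) {
        Partition fresh = new Partition("app-store-changelog", 0);
        Partition existing = new Partition("app-other-changelog", 0);
        Map<Partition, Long> offsets = endOffsets(
                List.of(fresh, existing),
                Set.of("app-store-changelog"),
                requested -> {
                    Map<Partition, Long> reply = new HashMap<>();
                    for (Partition tp : requested) {
                        reply.put(tp, 42L);
                    }
                    return reply;
                });
        System.out.println(offsets.get(fresh) + " " + offsets.get(existing)); // prints 0 42
    }
}
```

Only pre-existing topics reach `listOffsets`, which removes the window in which the request races against topic creation.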
[GitHub] [kafka] mjsax commented on pull request #8665: KAFKA-9984 Should fail the subscription when pattern is empty
mjsax commented on pull request #8665:
URL: https://github.com/apache/kafka/pull/8665#issuecomment-628779775

Retest this please.
[jira] [Commented] (KAFKA-9993) Think about inheritance in the protocol generation framework
[ https://issues.apache.org/jira/browse/KAFKA-9993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107487#comment-17107487 ]

John Roesler commented on KAFKA-9993:
-------------------------------------

Thanks for the idea, Boyang. Can you clarify whether the problem is boilerplate code inside the auto-generated classes, or boilerplate code that we have to write when using the generated classes?
[jira] [Created] (KAFKA-9995) IllegalStateException when offsets not found
James Hay created KAFKA-9995:
--------------------------------

             Summary: IllegalStateException when offsets not found
                 Key: KAFKA-9995
                 URL: https://issues.apache.org/jira/browse/KAFKA-9995
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.4.1
            Reporter: James Hay

I have a recently upgraded Kafka Streams 2.4.1 application, and we have started seeing the application periodically crash due to the following error:
{code:java}
2020-05-14T16:53:03.839Z DEBUG <> [chat-c199e52b-9344-457c-a2ae-04a669a7e0fb-StreamThread-2] o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=chat-c199e52b-9344-457c-a2ae-04a669a7e0fb-StreamThread-2-consumer, groupId=chat] Fetching committed offsets for partitions: [private.chat.endpoint-0, public.chat.message-0]
2020-05-14T16:53:03.841Z INFO <> [chat-c199e52b-9344-457c-a2ae-04a669a7e0fb-StreamThread-2] o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=chat-c199e52b-9344-457c-a2ae-04a669a7e0fb-StreamThread-2-consumer, groupId=chat] Found no committed offset for partition private.chat.endpoint-0
2020-05-14T16:53:03.842Z ERROR <> [chat-c199e52b-9344-457c-a2ae-04a669a7e0fb-StreamThread-2] o.a.k.s.p.internals.StreamThread - stream-thread [chat-c199e52b-9344-457c-a2ae-04a669a7e0fb-StreamThread-2] Encountered the following error during processing:
java.lang.IllegalStateException: Offset limit should monotonically increase, but was reduced. New limit: 0. Previous limit: 857859
	at org.apache.kafka.streams.processor.internals.StandbyTask.updateOffsetLimits(StandbyTask.java:215)
	at org.apache.kafka.streams.processor.internals.StandbyTask.update(StandbyTask.java:181)
	at org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1048)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:825)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
{code}
Is this a known issue? What can cause offsets not to be found?

Other things to note:
* We have TOPOLOGY_OPTIMIZATION set to OPTIMIZE.
* We are only seeing this on two of our three environments (at the moment). The two environments where we see the problem have little traffic, and it only seems to affect machines that are, on the whole, idle. Our prod environment, which consumes regular events, shows no signs of the same problem.
* There is some evidence to suggest a pattern to the timing of this error. Although it is not always the case, 24 hours between errors is common.
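As a hedged illustration of how the reported exception can arise, the Java sketch below is a hypothetical reconstruction of a monotonicity guard, not the actual `StandbyTask.updateOffsetLimits` code. It assumes that when no committed offset is found (as logged for `private.chat.endpoint-0`), the new limit falls back to 0, which is lower than the previously recorded limit.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical reconstruction for illustration; not the actual StandbyTask code.
class OffsetLimitSketch {
    private final Map<String, Long> offsetLimits = new HashMap<>();

    // committedOffset == null models "Found no committed offset for partition ...".
    void updateOffsetLimit(String partition, Long committedOffset) {
        long newLimit = committedOffset == null ? 0L : committedOffset; // assumed fallback to 0
        Long previousLimit = offsetLimits.get(partition);
        if (previousLimit != null && newLimit < previousLimit) {
            throw new IllegalStateException("Offset limit should monotonically increase, but was reduced. "
                    + "New limit: " + newLimit + ". Previous limit: " + previousLimit);
        }
        offsetLimits.put(partition, newLimit);
    }

    public static void main(String[] args) {
        OffsetLimitSketch standby = new OffsetLimitSketch();
        standby.updateOffsetLimit("private.chat.endpoint-0", 857859L);
        try {
            standby.updateOffsetLimit("private.chat.endpoint-0", null);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running `main` prints a message matching the one in the report, which is consistent with (but does not prove) the missing committed offset being the trigger.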
[jira] [Updated] (KAFKA-9994) Catch TaskMigrated exception in task corruption code path
[ https://issues.apache.org/jira/browse/KAFKA-9994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Boyang Chen updated KAFKA-9994:
-------------------------------
    Description:
We have seen a case where the TaskMigrated exception gets thrown from taskManager.commit(). This should be prevented by proper catching.

Looking at the stack trace, the TaskMigratedException was thrown from the prepareCommit() call made by the commit inside the corrupted-task exception handler.
{code:java}
[2020-05-14T05:47:25-07:00] (streams-soak-trunk-eos_soak_i-0b5b559dda7970618_streamslog) [2020-05-14 12:47:25,635] ERROR [stream-soak-test-db8d1d42-5677-4a54-a0a1-89b7b0766493-StreamThread-1] stream-thread [stream-soak-test-db8d1d42-5677-4a54-a0a1-89b7b0766493-StreamThread-1] Encountered the following exception during processing and the thread is going to shut down: (org.apache.kafka.streams.processor.internals.StreamThread)
[2020-05-14T05:47:25-07:00] (streams-soak-trunk-eos_soak_i-0b5b559dda7970618_streamslog) org.apache.kafka.streams.errors.TaskMigratedException: Producer got fenced trying to send a record [stream-thread [stream-soak-test-db8d1d42-5677-4a54-a0a1-89b7b0766493-StreamThread-1] task [1_1]]; it means all tasks belonging to this thread should be migrated.
	at org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:216)
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:171)
	at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:69)
	at org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedWindowBytesStore.log(ChangeLoggingTimestampedWindowBytesStore.java:36)
	at org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:112)
	at org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:34)
	at org.apache.kafka.streams.state.internals.CachingWindowStore.putAndMaybeForward(CachingWindowStore.java:111)
	at org.apache.kafka.streams.state.internals.CachingWindowStore.lambda$initInternal$0(CachingWindowStore.java:91)
	at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
	at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:109)
	at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:124)
	at org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:296)
	at org.apache.kafka.streams.state.internals.WrappedStateStore.flush(WrappedStateStore.java:84)
	at org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$flush$4(MeteredWindowStore.java:200)
	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:804)
	at org.apache.kafka.streams.state.internals.MeteredWindowStore.flush(MeteredWindowStore.java:200)
	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:402)
	at org.apache.kafka.streams.processor.internals.StreamTask.prepareCommit(StreamTask.java:317)
	at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:753)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:573)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:517)
{code}

  was:
We have seen a case where the TaskMigrated exception gets thrown from taskManager.commit(). This should be prevented by proper catching.
[jira] [Created] (KAFKA-9994) Catch TaskMigrated exception in task corruption code path
Boyang Chen created KAFKA-9994:
----------------------------------

             Summary: Catch TaskMigrated exception in task corruption code path
                 Key: KAFKA-9994
                 URL: https://issues.apache.org/jira/browse/KAFKA-9994
             Project: Kafka
          Issue Type: Bug
          Components: streams
            Reporter: Boyang Chen

We have seen a case where the TaskMigrated exception gets thrown from taskManager.commit(). This should be prevented by proper catching.
[jira] [Created] (KAFKA-9993) Think about inheritance in the protocol generation framework
Boyang Chen created KAFKA-9993:
----------------------------------

             Summary: Think about inheritance in the protocol generation framework
                 Key: KAFKA-9993
                 URL: https://issues.apache.org/jira/browse/KAFKA-9993
             Project: Kafka
          Issue Type: Improvement
            Reporter: Boyang Chen

We have seen that there are a lot of common fields in the request/response templates that could be extracted into a super class for the auto-generated classes. For example, most responses contain a top-level error code. Currently, when building a service that receives multiple RPCs, the code templates lead to a lot of redundant error-code extraction logic, which is far from ideal.

What we want to discuss is whether to enable a general inheritance mechanism in this framework, what the trade-offs and added complexity would be, and whether there is any workaround that would at least reduce the boilerplate.
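One way the proposed super type could look, sketched in Java. `ResponseWithErrorCode`, `FetchLikeResponse`, `JoinLikeResponse` and `firstError` are hypothetical names, not part of Kafka's message generator; the point is only that a shared interface for the top-level error code would let a service handling several RPCs extract errors with one generic helper.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical sketch; none of these types exist in Kafka's generated message classes.
class CommonErrorCodeSketch {

    // A shared super type that generated responses with a top-level error code could implement.
    interface ResponseWithErrorCode {
        short errorCode();
    }

    // Stand-ins for two generated response classes.
    static final class FetchLikeResponse implements ResponseWithErrorCode {
        private final short errorCode;

        FetchLikeResponse(short errorCode) { this.errorCode = errorCode; }

        @Override public short errorCode() { return errorCode; }
    }

    static final class JoinLikeResponse implements ResponseWithErrorCode {
        private final short errorCode;
        private final String memberId;

        JoinLikeResponse(short errorCode, String memberId) {
            this.errorCode = errorCode;
            this.memberId = memberId;
        }

        @Override public short errorCode() { return errorCode; }

        String memberId() { return memberId; }
    }

    // One generic helper instead of per-RPC extraction logic; 0 is Kafka's "no error" code.
    static Optional<Short> firstError(List<? extends ResponseWithErrorCode> responses) {
        for (ResponseWithErrorCode response : responses) {
            if (response.errorCode() != 0) {
                return Optional.of(response.errorCode());
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<ResponseWithErrorCode> responses = List.of(
                new FetchLikeResponse((short) 0),
                new JoinLikeResponse((short) 25, "member-1"));
        System.out.println(firstError(responses)); // prints Optional[25]
    }
}
```

Using an interface rather than an abstract base class would leave generated classes free to extend something else, which may matter when weighing the complexity trade-off the ticket asks about.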
[GitHub] [kafka] mimaison commented on pull request #8222: KAFKA-9650: include human readable units in ms and bytes configs
mimaison commented on pull request #8222:
URL: https://github.com/apache/kafka/pull/8222#issuecomment-628745671

Tests passed locally and the generated docs looked good. Merging to trunk.
[GitHub] [kafka] jeffhuang26 commented on pull request #7898: KAFKA-9366: please consider upgrade log4j to log4j2 due to critical security problem CVE-2019-17571
jeffhuang26 commented on pull request #7898:
URL: https://github.com/apache/kafka/pull/7898#issuecomment-628732206

What is the timeline for merging this PR?
[GitHub] [kafka] mimaison commented on pull request #8222: KAFKA-9650: include human readable units in ms and bytes configs
mimaison commented on pull request #8222:
URL: https://github.com/apache/kafka/pull/8222#issuecomment-628718604

Retest this please
[jira] [Commented] (KAFKA-9992) EmbeddedKafkaCluster not working with kafka_2.13
[ https://issues.apache.org/jira/browse/KAFKA-9992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107394#comment-17107394 ] John Roesler commented on KAFKA-9992: - Hello Andras, Just to make sure I understand, is this just a problem with our test artifact? The test artifacts are not public APIs, and I would not recommend depending on them. If you want to submit a PR for your patch we can merge it, but I don’t think we can call this a bug. Note that there have been a few conversations in Jira and on the mailing list about actually creating a public EmbeddedKafkaCluster for testing. So far, no one has picked it up, though. If you’re interested in doing that, it would be appreciated, and I can help with the KIP process. My standard advice is to copy/paste the EmbeddedKafkaCluster into your own test module so that you don’t depend on upstream tests. > EmbeddedKafkaCluster not working with kafka_2.13 > > > Key: KAFKA-9992 > URL: https://issues.apache.org/jira/browse/KAFKA-9992 > Project: Kafka > Issue Type: Bug > Components: packaging, streams >Affects Versions: 2.4.1 >Reporter: Andras Katona >Priority: Major > > Kafka Streams artifact is depending on kafka_2.12 as of now, it is in the > [kafka-streams-2.4.1.pom|https://repo1.maven.org/maven2/org/apache/kafka/kafka-streams/2.4.1/kafka-streams-2.4.1.pom]: > {code} > > org.apache.kafka > kafka_2.12 > 2.4.1 > test > > {code} > But it is not hardcoded, whatever scala version was used to compile this > component before uploading, that will be present in the pom. 
> When I'm using these deps: > {code} > > org.apache.kafka > kafka-streams > 2.4.1 > test > test > > > org.apache.kafka > kafka_2.13 > 2.4.1 > test > test > > {code} > My test fails with the following exception (deleteTopicAndWait is called in > my @After method): > {noformat} > java.lang.NoSuchMethodError: > scala.collection.JavaConverters.setAsJavaSetConverter(Lscala/collection/Set;)Lscala/collection/convert/Decorators$AsJava; > at > org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster$TopicsDeletedCondition.conditionMet(EmbeddedKafkaCluster.java:316) > at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:370) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385) > at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:368) > at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:356) > at > org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteTopicsAndWait(EmbeddedKafkaCluster.java:266) > at > org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteTopicAndWait(EmbeddedKafkaCluster.java:221) > {noformat} > I modified kafka build locally to separate artifacts based on scala version > just like it is done with kafka core, and I pulled in kafka-streams_2.13 from > my local mvn repo and test was working again. > I was only trying with 2.4.1, but I'm assuming other versions are also > affected, please add the proper versions and proper components too (in case > it's not packaging).
[jira] [Commented] (KAFKA-9479) Describe consumer group --all-groups shows header for each entry
[ https://issues.apache.org/jira/browse/KAFKA-9479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107374#comment-17107374 ] Vince Mu commented on KAFKA-9479: PR created [https://github.com/apache/kafka/pull/8666] > Describe consumer group --all-groups shows header for each entry > > > Key: KAFKA-9479 > URL: https://issues.apache.org/jira/browse/KAFKA-9479 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: Jeff Kim >Priority: Major > Labels: newbie > > When using `bin/kafka-consumer-groups.sh --describe --state --all-groups`, we > print output like the following: > {code} > GROUP COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS > group1 localhost:9092 (3) range Stable 1 > > > GROUP COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS > group2 localhost:9092 (3) range Stable 1 > > > {code} > It would be nice if we did not show the header for every entry.
[GitHub] [kafka] VinceMu opened a new pull request #8666: KAFKA-9479 Describe consumer group --state --all-groups show header once
VinceMu opened a new pull request #8666: URL: https://github.com/apache/kafka/pull/8666 Used the [previous PR](https://github.com/apache/kafka/pull/8096) made by vetler as a starting point. Updated the `printState()` method in `ConsumerGroupCommand` to print the header only once when the options `--describe --state --all-groups` are set. Modified the `testDescribeAllExistingGroups` test in `DescribeConsumerGroupTest` to account for the header being printed only once; in this case, the number of lines equals the length of `DescribeTypes` + 1. ### Committer Checklist (excluded from commit message) - [x] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
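A minimal Java sketch of the "print the header once" behavior, for illustration only: the real `printState()` lives in the Scala `ConsumerGroupCommand`, and the `GroupState` class, the `formatStates` method, and the column widths below are hypothetical. With the header emitted once, N groups produce N + 1 lines, mirroring the "length + 1" line count the PR description mentions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: format `--describe --state --all-groups` output with a single header.
public class ConsumerGroupStatePrinter {

    // Hypothetical holder for one row of the state table.
    static final class GroupState {
        final String group;
        final String coordinator;
        final String assignmentStrategy;
        final String state;
        final int members;

        GroupState(String group, String coordinator, String assignmentStrategy,
                   String state, int members) {
            this.group = group;
            this.coordinator = coordinator;
            this.assignmentStrategy = assignmentStrategy;
            this.state = state;
            this.members = members;
        }
    }

    // Arbitrary column widths chosen for this sketch.
    private static final String ROW_FORMAT = "%-10s %-22s %-20s %-10s %s";

    // Emits the header exactly once, followed by one line per group.
    static List<String> formatStates(List<GroupState> states) {
        List<String> lines = new ArrayList<>();
        lines.add(String.format(ROW_FORMAT,
                "GROUP", "COORDINATOR (ID)", "ASSIGNMENT-STRATEGY", "STATE", "#MEMBERS"));
        for (GroupState s : states) {
            lines.add(String.format(ROW_FORMAT,
                    s.group, s.coordinator, s.assignmentStrategy, s.state, s.members));
        }
        return lines;
    }

    public static void main(String[] args) {
        List<GroupState> states = List.of(
                new GroupState("group1", "localhost:9092 (3)", "range", "Stable", 1),
                new GroupState("group2", "localhost:9092 (3)", "range", "Stable", 1));
        formatStates(states).forEach(System.out::println);
    }
}
```

Building the lines first and printing them afterwards keeps the "header once" property easy to assert on in a test.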
[GitHub] [kafka] rhauch commented on pull request #8654: KAFKA-9931: Implement KIP-605 to expand support for Connect worker internal topic configurations
rhauch commented on pull request #8654: URL: https://github.com/apache/kafka/pull/8654#issuecomment-628682601 BTW, I've improved the error message when Connect's `TopicAdmin` fails to create a topic because of an unknown topic setting:

> Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:297) org.apache.kafka.connect.errors.ConnectException: Unable to create topic(s) 'mm2-offsets.backup.internal': Unknown topic config name: topic.replication.factor

Here's what this looks like in the log message just before the herder exits:

```
[2020-05-14 09:36:22,348] ERROR [Worker clientId=connect-2, groupId=backup-mm2] Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:297)
org.apache.kafka.connect.errors.ConnectException: Unable to create topic(s) 'mm2-offsets.backup.internal': Unknown topic config name: topic.replication.factor
	at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:305)
	at org.apache.kafka.connect.storage.KafkaOffsetBackingStore$1.run(KafkaOffsetBackingStore.java:105)
	at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:128)
	at org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:115)
	at org.apache.kafka.connect.runtime.Worker.start(Worker.java:186)
	at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:123)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:284)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.InvalidConfigurationException: Unknown topic config name: topic.replication.factor
```
[GitHub] [kafka] bbejeck commented on pull request #6229: KAFKA-6786: Removing additional configs for StreamsBrokerDownResilienceTest
bbejeck commented on pull request #6229: URL: https://github.com/apache/kafka/pull/6229#issuecomment-628670456 @sh-abhi apologies for letting this slip. Can you rebase this PR? Then we can get this merged. Thanks.
[GitHub] [kafka] vvcephei merged pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance
vvcephei merged pull request #8588: URL: https://github.com/apache/kafka/pull/8588
[GitHub] [kafka] vvcephei commented on pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance
vvcephei commented on pull request #8588: URL: https://github.com/apache/kafka/pull/8588#issuecomment-628635918 The test failure was unrelated: kafka.api.PlaintextConsumerTest.testLowMaxFetchSizeForRequestAndPartition
[GitHub] [kafka] mmolimar commented on a change in pull request #8663: KAFKA-9985: Sink connector may exhaust broker when writing in DLQ
mmolimar commented on a change in pull request #8663: URL: https://github.com/apache/kafka/pull/8663#discussion_r425135341 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java ## @@ -96,6 +101,25 @@ public static void validate(Map props) { throw new ConfigException("Must configure one of " + SinkTask.TOPICS_CONFIG + " or " + SinkTask.TOPICS_REGEX_CONFIG); } + +if (hasDlqTopicConfig) { +String dlqTopic = props.get(DLQ_TOPIC_NAME_CONFIG).trim(); +if (hasTopicsConfig) { +List topics = parseTopicsList(props); +if (topics.contains(dlqTopic)) { +throw new ConfigException(DLQ_TOPIC_NAME_CONFIG + " has a topic name which is already in " + Review comment: There will be just one topic in the DLQ topic config. We could add it, but I'm not sure it's strictly necessary.
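For context, a minimal sketch of the kind of check under review: reject a configuration whose (single) DLQ topic is also one of the connector's input topics, presumably because the sink task would otherwise consume its own dead-letter records again. `validateDlqTopic` is a hypothetical helper, and `IllegalArgumentException` stands in for Kafka's `ConfigException` to keep the example self-contained.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch, not the code from the PR.
public class DlqTopicValidator {

    // Throws if the configured DLQ topic also appears in the list of consumed topics.
    static void validateDlqTopic(List<String> topics, String dlqTopicRaw) {
        if (dlqTopicRaw == null || dlqTopicRaw.trim().isEmpty()) {
            return; // no DLQ configured, nothing to check
        }
        String dlqTopic = dlqTopicRaw.trim();
        if (topics.contains(dlqTopic)) {
            // Stand-in for ConfigException in the real connector config.
            throw new IllegalArgumentException(
                    "DLQ topic '" + dlqTopic + "' is also one of the consumed topics " + topics);
        }
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList("orders", "payments");
        validateDlqTopic(topics, "orders-dlq"); // accepted
        try {
            validateDlqTopic(topics, " orders ");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Because the DLQ setting holds a single name, a plain `contains` on the parsed topics list is enough for the `topics` case; matching against `topics.regex` would need a separate check.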
[jira] [Updated] (KAFKA-9992) EmbeddedKafkaCluster not working with kafka_2.13
[ https://issues.apache.org/jira/browse/KAFKA-9992?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andras Katona updated KAFKA-9992: Summary: EmbeddedKafkaCluster not working with kafka_2.13 (was: EmbeddedKafka not working with kafka_2.13)
[jira] [Created] (KAFKA-9992) EmbeddedKafka not working with kafka_2.13
Andras Katona created KAFKA-9992:

Summary: EmbeddedKafka not working with kafka_2.13
Key: KAFKA-9992
URL: https://issues.apache.org/jira/browse/KAFKA-9992
Project: Kafka
Issue Type: Bug
Components: packaging, streams
Affects Versions: 2.4.1
Reporter: Andras Katona

The Kafka Streams artifact currently depends on kafka_2.12, as declared in [kafka-streams-2.4.1.pom|https://repo1.maven.org/maven2/org/apache/kafka/kafka-streams/2.4.1/kafka-streams-2.4.1.pom]:
{code}
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.12</artifactId>
  <version>2.4.1</version>
  <scope>test</scope>
</dependency>
{code}
This is not hardcoded, though: whichever Scala version was used to compile the component before uploading is the one that ends up in the pom.

When I use these dependencies:
{code}
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams</artifactId>
  <version>2.4.1</version>
  <classifier>test</classifier>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.13</artifactId>
  <version>2.4.1</version>
  <classifier>test</classifier>
  <scope>test</scope>
</dependency>
{code}
my test fails with the following exception (deleteTopicAndWait is called in my @After method):
{noformat}
java.lang.NoSuchMethodError: scala.collection.JavaConverters.setAsJavaSetConverter(Lscala/collection/Set;)Lscala/collection/convert/Decorators$AsJava;
	at org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster$TopicsDeletedCondition.conditionMet(EmbeddedKafkaCluster.java:316)
	at org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:370)
	at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
	at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
	at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:368)
	at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:356)
	at org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteTopicsAndWait(EmbeddedKafkaCluster.java:266)
	at org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteTopicAndWait(EmbeddedKafkaCluster.java:221)
{noformat}
I modified the Kafka build locally to produce separate artifacts per Scala version, just as is done for Kafka core. After pulling kafka-streams_2.13 from my local Maven repo, the test worked again.

I only tried 2.4.1, but I assume other versions are also affected. Please add the proper versions and components too (in case it's not packaging).
[GitHub] [kafka] mmolimar commented on a change in pull request #8663: KAFKA-9985: Sink connector may exhaust broker when writing in DLQ
mmolimar commented on a change in pull request #8663: URL: https://github.com/apache/kafka/pull/8663#discussion_r425115475 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java ## @@ -108,6 +132,20 @@ public static boolean hasTopicsRegexConfig(Map props) { return topicsRegexStr != null && !topicsRegexStr.trim().isEmpty(); } +public static boolean hasDlqTopicConfig(Map props) { +String dqlTopicStr = props.get(DLQ_TOPIC_NAME_CONFIG); +return dqlTopicStr != null && !dqlTopicStr.trim().isEmpty(); +} + +public static List parseTopicsList(Map props) { +List topics = (List) ConfigDef.parseType(TOPICS_CONFIG, props.get(TOPICS_CONFIG), Type.LIST); +return topics +.stream() +.filter(topic -> !topic.isEmpty()) +.distinct() Review comment: That's what I thought, but we would still end up with a topic name that is an empty string.
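The empty-string concern can be illustrated with a hypothetical stand-in for `parseTopicsList`. It splits the raw `topics` value on commas itself rather than calling `ConfigDef.parseType`, so its exact splitting rules are an assumption; the point is that inputs such as `a,,b` or a trailing comma yield empty entries, which the `filter` drops before `distinct` removes repeated names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for parseTopicsList; not Kafka's ConfigDef parsing.
public class TopicsListParser {

    static List<String> parseTopicsList(String topicsValue) {
        if (topicsValue == null || topicsValue.trim().isEmpty()) {
            return List.of();
        }
        // limit -1 keeps trailing empty strings, e.g. "a,b," -> ["a", "b", ""]
        return Arrays.stream(topicsValue.trim().split("\\s*,\\s*", -1))
                .filter(topic -> !topic.isEmpty()) // drop entries produced by ",," or a trailing ","
                .distinct()                         // "a,a" counts once
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(parseTopicsList("orders,,payments, orders,")); // [orders, payments]
    }
}
```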
[GitHub] [kafka] zhaohaidao commented on a change in pull request #8665: KAFKA-9984 Should fail the subscription when pattern is empty
zhaohaidao commented on a change in pull request #8665: URL: https://github.com/apache/kafka/pull/8665#discussion_r425097747 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ## @@ -1015,8 +1015,8 @@ public void subscribe(Collection topics) { @Override public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { maybeThrowInvalidGroupIdException(); -if (pattern == null) -throw new IllegalArgumentException("Topic pattern to subscribe to cannot be null"); +if (pattern == null || pattern.toString().equals("")) +throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + pattern == null ? "null" : "empty"); Review comment: Thanks, I have fixed this issue. Could you continue reviewing it?
[GitHub] [kafka] d8tltanc edited a comment on pull request #8528: System tests should use --bootstrap-server rather than --zookeeper when testing new Kafka versions
d8tltanc edited a comment on pull request #8528: URL: https://github.com/apache/kafka/pull/8528#issuecomment-628340161 Latest dev branch builds here (REMOVE_SYSTEM_4): https://jenkins.confluent.io/job/system-test-kafka-branch-builder/3944/
[GitHub] [kafka] showuon edited a comment on pull request #8622: MINOR: Update stream documentation
showuon edited a comment on pull request #8622: URL: https://github.com/apache/kafka/pull/8622#issuecomment-628563253 Hi @bbejeck, I appended more fixes for the streams documents. Please review when you have time. Thank you.
[GitHub] [kafka] showuon edited a comment on pull request #8622: MINOR: Update stream documentation
showuon edited a comment on pull request #8622: URL: https://github.com/apache/kafka/pull/8622#issuecomment-628563253 Hi @bbejeck, I appended more fixes for the streams documents that I found while reading them. Please review when you have time. Thank you.
[GitHub] [kafka] showuon commented on pull request #8622: MINOR: Update stream documentation
showuon commented on pull request #8622: URL: https://github.com/apache/kafka/pull/8622#issuecomment-628563253 Hi @bbejeck, I appended more fixes for the streams documents. Please review when you have time. Thank you.
[GitHub] [kafka] showuon commented on a change in pull request #8622: MINOR: Update stream documentation
showuon commented on a change in pull request #8622: URL: https://github.com/apache/kafka/pull/8622#discussion_r425055023 ## File path: docs/streams/developer-guide/write-streams.html ## @@ -208,7 +208,7 @@ Using Kafka Streams within your application code - Testing a Streams application + Testing a Streams application Review comment: Wrong HTML formatting: the opening/closing `` tag and `` tag were in the wrong places.
[GitHub] [kafka] showuon commented on a change in pull request #8622: MINOR: Update stream documentation
showuon commented on a change in pull request #8622: URL: https://github.com/apache/kafka/pull/8622#discussion_r425055442 ## File path: docs/streams/developer-guide/running-app.html ## @@ -51,7 +51,7 @@ Running Streams Applications You can run Java applications that use the Kafka Streams library without any additional configuration or requirements. Kafka Streams also provides the ability to receive notification of the various states of the application. The ability to monitor the runtime - status is discussed in the monitoring guide. + status is discussed in the monitoring guide. Review comment: Fix the broken link.
[GitHub] [kafka] showuon commented on a change in pull request #8622: MINOR: Update stream documentation
showuon commented on a change in pull request #8622: URL: https://github.com/apache/kafka/pull/8622#discussion_r425055023 ## File path: docs/streams/developer-guide/write-streams.html ## @@ -208,7 +208,7 @@ Using Kafka Streams within your application code - Testing a Streams application + Testing a Streams application Review comment: Wrong HTML formatting: the `` tag and `` tag were in the wrong places.
[GitHub] [kafka] showuon commented on a change in pull request #8622: MINOR: Update stream documentation
showuon commented on a change in pull request #8622: URL: https://github.com/apache/kafka/pull/8622#discussion_r425054711 ## File path: docs/streams/developer-guide/write-streams.html ## @@ -154,7 +154,7 @@ Using Kafka Streams within your application codeStream Partitions and Tasks and Threading Model. +For more information, see Stream Partitions and Tasks and Threading Model. Review comment: Fix the broken link by changing the incorrect `href="../architecture.html#streams-architecture-threads"` to `href="../architecture.html#streams_architecture_threads"` (the anchor with underscores).
[GitHub] [kafka] showuon commented on a change in pull request #8622: MINOR: Update stream documentation
showuon commented on a change in pull request #8622: URL: https://github.com/apache/kafka/pull/8622#discussion_r425054297 ## File path: docs/streams/developer-guide/dsl-api.html ## @@ -3738,7 +3738,7 @@ KTable-KTable Foreign-Key -Testing a Streams application +Testing a Streams application Review comment: Currently, the `TESTING A STREAMS APPLICATION` header is blue, unlike the other headers. It turns out this is because we put the `` tag inside ``, which applies the wrong CSS style. ![image](https://user-images.githubusercontent.com/43372967/81927117-b1d18780-9615-11ea-979b-db48c294e742.png)
[GitHub] [kafka] chia7712 commented on a change in pull request #8665: KAFKA-9984 Should fail the subscription when pattern is empty
chia7712 commented on a change in pull request #8665: URL: https://github.com/apache/kafka/pull/8665#discussion_r425001861 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ## @@ -1015,8 +1015,8 @@ public void subscribe(Collection topics) { @Override public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { maybeThrowInvalidGroupIdException(); -if (pattern == null) -throw new IllegalArgumentException("Topic pattern to subscribe to cannot be null"); +if (pattern == null || pattern.toString().equals("")) +throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + pattern == null ? "null" : "empty"); Review comment: The condition is always false if you don't add parentheses, because `+` binds more tightly than `==`:
```
throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ? "null" : "empty"));
```
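A small, self-contained demonstration of the precedence issue: because `+` binds more tightly than `==`, the unparenthesized expression compares the concatenated string (which is never `null`) with `null`, so the whole expression always evaluates to just `"empty"`. The class and method names are illustrative only.

```java
import java.util.regex.Pattern;

// Illustrative demo of the operator-precedence bug discussed above.
public class PatternMessageDemo {

    // Buggy form from the diff: parsed as ("..." + pattern) == null ? "null" : "empty"
    static String buggyMessage(Pattern pattern) {
        return "Topic pattern to subscribe to cannot be " + pattern == null ? "null" : "empty";
    }

    // Fixed form suggested in the review: the conditional is evaluated first.
    static String fixedMessage(Pattern pattern) {
        return "Topic pattern to subscribe to cannot be " + (pattern == null ? "null" : "empty");
    }

    public static void main(String[] args) {
        System.out.println(buggyMessage(null));                // empty
        System.out.println(fixedMessage(null));                // Topic pattern to subscribe to cannot be null
        System.out.println(fixedMessage(Pattern.compile(""))); // Topic pattern to subscribe to cannot be empty
    }
}
```

Note that `"..." + null` concatenates the text `"null"` rather than producing a null reference, which is why the buggy comparison can never be true.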
[jira] [Resolved] (KAFKA-7965) Flaky Test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
[ https://issues.apache.org/jira/browse/KAFKA-7965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-7965. Resolution: Fixed > Flaky Test > ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup > > > Key: KAFKA-7965 > URL: https://issues.apache.org/jira/browse/KAFKA-7965 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Affects Versions: 1.1.1, 2.2.0, 2.3.0 >Reporter: Matthias J. Sax >Assignee: David Jacot >Priority: Critical > Labels: flaky-test > Fix For: 2.6.0 > > > To get stable nightly builds for `2.2` release, I create tickets for all > observed test failures. > [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/21/] > {quote}java.lang.AssertionError: Received 0, expected at least 68 at > org.junit.Assert.fail(Assert.java:88) at > org.junit.Assert.assertTrue(Assert.java:41) at > kafka.api.ConsumerBounceTest.receiveAndCommit(ConsumerBounceTest.scala:557) > at > kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:320) > at > kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1$adapted(ConsumerBounceTest.scala:319) > at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) > at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at > kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319){quote}
[jira] [Commented] (KAFKA-7965) Flaky Test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
[ https://issues.apache.org/jira/browse/KAFKA-7965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107107#comment-17107107 ] David Jacot commented on KAFKA-7965: Closing it. Please re-open it if needed. > Flaky Test > ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup > > > Key: KAFKA-7965 > URL: https://issues.apache.org/jira/browse/KAFKA-7965 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Affects Versions: 1.1.1, 2.2.0, 2.3.0 >Reporter: Matthias J. Sax >Assignee: David Jacot >Priority: Critical > Labels: flaky-test > Fix For: 2.3.0 > > > To get stable nightly builds for `2.2` release, I create tickets for all > observed test failures. > [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/21/] > {quote}java.lang.AssertionError: Received 0, expected at least 68 at > org.junit.Assert.fail(Assert.java:88) at > org.junit.Assert.assertTrue(Assert.java:41) at > kafka.api.ConsumerBounceTest.receiveAndCommit(ConsumerBounceTest.scala:557) > at > kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:320) > at > kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1$adapted(ConsumerBounceTest.scala:319) > at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) > at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at > kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319){quote}
[jira] [Updated] (KAFKA-7965) Flaky Test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
[ https://issues.apache.org/jira/browse/KAFKA-7965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-7965: Fix Version/s: (was: 2.3.0) 2.6.0
[GitHub] [kafka] cadonna commented on a change in pull request #8254: KIP-557: Add Emit On Change Support
cadonna commented on a change in pull request #8254: URL: https://github.com/apache/kafka/pull/8254#discussion_r424984599 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java ## @@ -53,4 +56,48 @@ void initStoreSerde(final ProcessorContext context) { keySerde == null ? (Serde) context.keySerde() : keySerde, valueSerde == null ? new ValueAndTimestampSerde<>((Serde) context.valueSerde()) : valueSerde); } -} \ No newline at end of file + +public RawAndDeserializedValue getWithBinary(final K key) { +try { +return maybeMeasureLatency(() -> { +final byte[] serializedValue = wrapped().get(keyBytes(key)); +return new RawAndDeserializedValue(serializedValue, outerValue(serializedValue)); +}, time, getSensor); +} catch (final ProcessorStateException e) { +final String message = String.format(e.getMessage(), key); +throw new ProcessorStateException(message, e); +} +} + +public boolean putIfDifferentValues(final K key, +final ValueAndTimestamp newValue, +final byte[] oldSerializedValue) { +try { +return maybeMeasureLatency( +() -> { +final byte[] newSerializedValue = serdes.rawValue(newValue); +if (ValueAndTimestampSerializer.compareValuesAndCheckForIncreasingTimestamp(oldSerializedValue, newSerializedValue)) { +return false; +} else { +wrapped().put(keyBytes(key), newSerializedValue); Review comment: > Don't we still want to put the value in the store (even if we don't forward it on to the next context) if the values are the same but the timestamp is newer? If we just put the value in the store but did not forward it, then the store would actually be corrupted, because the local state would not be consistent with downstream anymore. The main point of this KIP was to neither store nor forward a record that has the same value but a newer timestamp.
[GitHub] [kafka] cadonna commented on a change in pull request #8254: KIP-557: Add Emit On Change Support
cadonna commented on a change in pull request #8254: URL: https://github.com/apache/kafka/pull/8254#discussion_r424984599 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java ## @@ -53,4 +56,48 @@ void initStoreSerde(final ProcessorContext context) { keySerde == null ? (Serde) context.keySerde() : keySerde, valueSerde == null ? new ValueAndTimestampSerde<>((Serde) context.valueSerde()) : valueSerde); } -} \ No newline at end of file + +public RawAndDeserializedValue getWithBinary(final K key) { +try { +return maybeMeasureLatency(() -> { +final byte[] serializedValue = wrapped().get(keyBytes(key)); +return new RawAndDeserializedValue(serializedValue, outerValue(serializedValue)); +}, time, getSensor); +} catch (final ProcessorStateException e) { +final String message = String.format(e.getMessage(), key); +throw new ProcessorStateException(message, e); +} +} + +public boolean putIfDifferentValues(final K key, +final ValueAndTimestamp newValue, +final byte[] oldSerializedValue) { +try { +return maybeMeasureLatency( +() -> { +final byte[] newSerializedValue = serdes.rawValue(newValue); +if (ValueAndTimestampSerializer.compareValuesAndCheckForIncreasingTimestamp(oldSerializedValue, newSerializedValue)) { +return false; +} else { +wrapped().put(keyBytes(key), newSerializedValue); Review comment: > Don't we still want to put the value in the store (even if we don't forward it on to the next context) if the values are the same but the timestamp is newer? If we just put the value in the store but did not forward it, then the store would actually be corrupted, because the local state would not be consistent with downstream anymore.
[GitHub] [kafka] cadonna commented on a change in pull request #8254: KIP-557: Add Emit On Change Support
cadonna commented on a change in pull request #8254:
URL: https://github.com/apache/kafka/pull/8254#discussion_r424971223

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java ##
@@ -53,4 +56,48 @@ void initStoreSerde(final ProcessorContext context) {
             keySerde == null ? (Serde) context.keySerde() : keySerde,
             valueSerde == null ? new ValueAndTimestampSerde<>((Serde) context.valueSerde()) : valueSerde);
     }
-}
\ No newline at end of file
+
+    public RawAndDeserializedValue getWithBinary(final K key) {
+        try {
+            return maybeMeasureLatency(() -> {
+                final byte[] serializedValue = wrapped().get(keyBytes(key));
+                return new RawAndDeserializedValue(serializedValue, outerValue(serializedValue));
+            }, time, getSensor);
+        } catch (final ProcessorStateException e) {
+            final String message = String.format(e.getMessage(), key);
+            throw new ProcessorStateException(message, e);
+        }
+    }
+
+    public boolean putIfDifferentValues(final K key,
+                                        final ValueAndTimestamp newValue,
+                                        final byte[] oldSerializedValue) {
+        try {
+            return maybeMeasureLatency(
+                () -> {
+                    final byte[] newSerializedValue = serdes.rawValue(newValue);
+                    if (ValueAndTimestampSerializer.compareValuesAndCheckForIncreasingTimestamp(oldSerializedValue, newSerializedValue)) {
+                        return false;
+                    } else {
+                        wrapped().put(keyBytes(key), newSerializedValue);

Review comment:
> Why, when the timestamp of the newer value is lower, do we want to put the new value into the store? Surely the store should have the value with the newer timestamp? Otherwise we could wind up with a corrupt store.

This behavior also existed before this PR. If an out-of-order record was encountered, a log message was written, but the record was nevertheless put into the state store (cf. https://github.com/apache/kafka/blob/7624e6247984223901aa34d7b7c2789c3e1d0c3f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java#L122).
The only thing that changed is that if the serialized value of the new record is equal to the serialized value of the old record, and the timestamp of the new record is equal or newer, we drop the record because it is an idempotent update. Could you elaborate on why a store should get corrupted because of this?

> would result in the table containing K: X, V: B, which is wrong.

As said above, this behavior should not have changed with this PR.
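The drop rule described in this reply (equal serialized value and an equal or newer timestamp) can be sketched at the byte level. This is a minimal sketch, not the implementation of `ValueAndTimestampSerializer.compareValuesAndCheckForIncreasingTimestamp`, whose body is not shown here. It assumes a serialized `ValueAndTimestamp` is an 8-byte big-endian timestamp followed by the raw value bytes; `IdempotentUpdateSketch`, `serialize`, and `isIdempotentUpdate` are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Sketch only. Assumption: a serialized ValueAndTimestamp is an 8-byte big-endian
// timestamp followed by the raw value bytes. All names here are hypothetical.
final class IdempotentUpdateSketch {

    static byte[] utf8(final String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    static byte[] serialize(final long timestamp, final byte[] rawValue) {
        return ByteBuffer.allocate(Long.BYTES + rawValue.length)
            .putLong(timestamp)
            .put(rawValue)
            .array();
    }

    // True if the new record may be dropped: identical value bytes and an equal or newer timestamp.
    static boolean isIdempotentUpdate(final byte[] oldSerialized, final byte[] newSerialized) {
        if (oldSerialized == null || newSerialized == null) {
            return false; // no previous value, or no new value: not treated as a no-op here
        }
        if (oldSerialized.length < Long.BYTES || oldSerialized.length != newSerialized.length) {
            return false; // malformed input, or value lengths differ so the values differ
        }
        final long oldTimestamp = ByteBuffer.wrap(oldSerialized).getLong();
        final long newTimestamp = ByteBuffer.wrap(newSerialized).getLong();
        if (newTimestamp < oldTimestamp) {
            return false; // older timestamp: not idempotent, so the record is still written
        }
        return Arrays.equals(
            Arrays.copyOfRange(oldSerialized, Long.BYTES, oldSerialized.length),
            Arrays.copyOfRange(newSerialized, Long.BYTES, newSerialized.length));
    }

    public static void main(final String[] args) {
        final byte[] a1 = serialize(1L, utf8("A"));
        final byte[] a3 = serialize(3L, utf8("A"));
        final byte[] b2 = serialize(2L, utf8("B"));
        System.out.println(isIdempotentUpdate(a1, a3)); // true: same value, newer timestamp
        System.out.println(isIdempotentUpdate(a3, a1)); // false: same value, older timestamp
        System.out.println(isIdempotentUpdate(a1, b2)); // false: different value
    }
}
```

Comparing the raw bytes avoids deserializing either value, which is presumably why the diff works on `byte[]`; note that this only detects equality of the serialized form, not semantic equality of the values.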
[GitHub] [kafka] cadonna commented on a change in pull request #8254: KIP-557: Add Emit On Change Support
cadonna commented on a change in pull request #8254:
URL: https://github.com/apache/kafka/pull/8254#discussion_r424971223

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java ##
@@ -53,4 +56,48 @@ void initStoreSerde(final ProcessorContext context) {
             keySerde == null ? (Serde) context.keySerde() : keySerde,
             valueSerde == null ? new ValueAndTimestampSerde<>((Serde) context.valueSerde()) : valueSerde);
     }
-}
\ No newline at end of file
+
+    public RawAndDeserializedValue getWithBinary(final K key) {
+        try {
+            return maybeMeasureLatency(() -> {
+                final byte[] serializedValue = wrapped().get(keyBytes(key));
+                return new RawAndDeserializedValue(serializedValue, outerValue(serializedValue));
+            }, time, getSensor);
+        } catch (final ProcessorStateException e) {
+            final String message = String.format(e.getMessage(), key);
+            throw new ProcessorStateException(message, e);
+        }
+    }
+
+    public boolean putIfDifferentValues(final K key,
+                                        final ValueAndTimestamp newValue,
+                                        final byte[] oldSerializedValue) {
+        try {
+            return maybeMeasureLatency(
+                () -> {
+                    final byte[] newSerializedValue = serdes.rawValue(newValue);
+                    if (ValueAndTimestampSerializer.compareValuesAndCheckForIncreasingTimestamp(oldSerializedValue, newSerializedValue)) {
+                        return false;
+                    } else {
+                        wrapped().put(keyBytes(key), newSerializedValue);

Review comment:
> Why, when the timestamp of the newer value is lower, do we want to put the new value into the store? Surely the store should have the value with the newer timestamp? Otherwise we could wind up with a corrupt store.

This behavior also existed before this PR. If an out-of-order record was encountered, a log message was written, but the record was nevertheless put into the state store (cf. https://github.com/apache/kafka/blob/7624e6247984223901aa34d7b7c2789c3e1d0c3f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java#L122).
The only thing that changed is that if the serialized value of the new record is equal to the serialized value of the old record, and the timestamp of the new record is equal or newer, we drop the record because it is an idempotent update. Could you elaborate on why a store should get corrupted because of this?
[GitHub] [kafka] rodesai commented on a change in pull request #8254: KIP-557: Add Emit On Change Support
rodesai commented on a change in pull request #8254:
URL: https://github.com/apache/kafka/pull/8254#discussion_r424941145

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java ##
@@ -53,4 +56,48 @@ void initStoreSerde(final ProcessorContext context) {
             keySerde == null ? (Serde) context.keySerde() : keySerde,
             valueSerde == null ? new ValueAndTimestampSerde<>((Serde) context.valueSerde()) : valueSerde);
     }
-}
\ No newline at end of file
+
+    public RawAndDeserializedValue getWithBinary(final K key) {
+        try {
+            return maybeMeasureLatency(() -> {
+                final byte[] serializedValue = wrapped().get(keyBytes(key));
+                return new RawAndDeserializedValue(serializedValue, outerValue(serializedValue));
+            }, time, getSensor);
+        } catch (final ProcessorStateException e) {
+            final String message = String.format(e.getMessage(), key);
+            throw new ProcessorStateException(message, e);
+        }
+    }
+
+    public boolean putIfDifferentValues(final K key,
+                                        final ValueAndTimestamp newValue,
+                                        final byte[] oldSerializedValue) {
+        try {
+            return maybeMeasureLatency(
+                () -> {
+                    final byte[] newSerializedValue = serdes.rawValue(newValue);
+                    if (ValueAndTimestampSerializer.compareValuesAndCheckForIncreasingTimestamp(oldSerializedValue, newSerializedValue)) {
+                        return false;
+                    } else {
+                        wrapped().put(keyBytes(key), newSerializedValue);

Review comment:
@ConcurrencyPractitioner @vvcephei I'm trying to understand this to debug some broken tests in ksql. A couple of questions:

When the timestamp of the newer value is lower (ignoring the value), why do we want to put the new value into the store? Surely the store should have the value with the newer timestamp? Otherwise we could wind up with a corrupt store.

Don't we still want to put the value in the store (even if we don't forward it on to the next context) if the values are the same but the timestamp is newer? Otherwise, if we get an out-of-order update with a different value but a timestamp in between the rows with the same value, we'd incorrectly put that value into the store. For example, the following updates:

TS: 1, K: X, V: A
TS: 3, K: X, V: A
TS: 2, K: X, V: B

would result in the table containing `K: X, V: B`, which is wrong.
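The three updates in this question can be replayed in a hypothetical single-key model (not Kafka Streams code; `OutOfOrderReplaySketch` and `replay` are made-up names). The two rules are paraphrased from this thread: "always put" writes and forwards every record, including out-of-order ones, as described for the behavior before the PR; "emit on change" additionally drops a record whose value equals the stored value and whose timestamp is equal or newer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-key replay (not Kafka Streams code) of the updates
// TS 1 -> A, TS 3 -> A, TS 2 -> B, under two paraphrased store-update rules.
final class OutOfOrderReplaySketch {

    // Returns the final stored "value@timestamp"; every forwarded record is appended to `forwarded`.
    static String replay(final boolean emitOnChange, final List<String> forwarded) {
        final long[] timestamps = {1L, 3L, 2L};
        final String[] values = {"A", "A", "B"};
        String storedValue = null;
        long storedTimestamp = Long.MIN_VALUE;
        for (int i = 0; i < timestamps.length; i++) {
            final boolean idempotent = emitOnChange
                && values[i].equals(storedValue)
                && timestamps[i] >= storedTimestamp;
            if (idempotent) {
                continue; // dropped: neither stored nor forwarded
            }
            // Out-of-order records are still written, in both rules.
            storedValue = values[i];
            storedTimestamp = timestamps[i];
            forwarded.add(values[i] + "@" + timestamps[i]);
        }
        return storedValue + "@" + storedTimestamp;
    }

    public static void main(final String[] args) {
        final List<String> alwaysPut = new ArrayList<>();
        final List<String> emitOnChange = new ArrayList<>();
        System.out.println("always put:     store=" + replay(false, alwaysPut) + " forwarded=" + alwaysPut);
        System.out.println("emit on change: store=" + replay(true, emitOnChange) + " forwarded=" + emitOnChange);
    }
}
```

In this model both rules end with the table holding `B@2`; the only difference is that emit on change does not forward `A@3`. That matches the point in the reply that the `K: X, V: B` outcome comes from writing out-of-order records, which predates the PR.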
[GitHub] [kafka] rodesai commented on a change in pull request #8254: KIP-557: Add Emit On Change Support
rodesai commented on a change in pull request #8254:
URL: https://github.com/apache/kafka/pull/8254#discussion_r424941145

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java ##
@@ -53,4 +56,48 @@ void initStoreSerde(final ProcessorContext context) {
             keySerde == null ? (Serde) context.keySerde() : keySerde,
             valueSerde == null ? new ValueAndTimestampSerde<>((Serde) context.valueSerde()) : valueSerde);
     }
-}
\ No newline at end of file
+
+    public RawAndDeserializedValue getWithBinary(final K key) {
+        try {
+            return maybeMeasureLatency(() -> {
+                final byte[] serializedValue = wrapped().get(keyBytes(key));
+                return new RawAndDeserializedValue(serializedValue, outerValue(serializedValue));
+            }, time, getSensor);
+        } catch (final ProcessorStateException e) {
+            final String message = String.format(e.getMessage(), key);
+            throw new ProcessorStateException(message, e);
+        }
+    }
+
+    public boolean putIfDifferentValues(final K key,
+                                        final ValueAndTimestamp newValue,
+                                        final byte[] oldSerializedValue) {
+        try {
+            return maybeMeasureLatency(
+                () -> {
+                    final byte[] newSerializedValue = serdes.rawValue(newValue);
+                    if (ValueAndTimestampSerializer.compareValuesAndCheckForIncreasingTimestamp(oldSerializedValue, newSerializedValue)) {
+                        return false;
+                    } else {
+                        wrapped().put(keyBytes(key), newSerializedValue);

Review comment:
@ConcurrencyPractitioner @vvcephei I'm trying to understand this to debug some broken tests in ksql. A couple of questions:

Why, when the timestamp of the newer value is lower, do we want to put the new value into the store? Surely the store should have the value with the newer timestamp? Otherwise we could wind up with a corrupt store.

Don't we still want to put the value in the store if the values are the same but the timestamp is newer? Otherwise, if we get an out-of-order update with a different value but a timestamp in between the rows with the same value, we'd incorrectly put that value into the store. For example, the following updates:

TS: 1, K: X, V: A
TS: 3, K: X, V: A
TS: 2, K: X, V: B

would result in the table containing `K: X, V: B`, which is wrong.
[GitHub] [kafka] lkokhreidze commented on pull request #8558: KAFKA-8611 / KIP-221 documentation
lkokhreidze commented on pull request #8558:
URL: https://github.com/apache/kafka/pull/8558#issuecomment-628419029

Hi @mjsax, I've rebased the branch. Do you mean the comment by Guozhang in the voting thread? If not, I missed it and can't find anything new in the DISCUSS thread. Can you point me to where it was asked?
[jira] [Resolved] (KAFKA-9676) Add test coverage for new ActiveTaskCreator and StandbyTaskCreator
[ https://issues.apache.org/jira/browse/KAFKA-9676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Boyang Chen resolved KAFKA-9676.
Resolution: Fixed

The current unit test coverage is pretty good now; closing the ticket.

> Add test coverage for new ActiveTaskCreator and StandbyTaskCreator
>
> Key: KAFKA-9676
> URL: https://issues.apache.org/jira/browse/KAFKA-9676
> Project: Kafka
> Issue Type: Test
> Components: streams
> Reporter: Boyang Chen
> Priority: Major
> Labels: help-wanted, newbie
>
> The newly separated ActiveTaskCreator and StandbyTaskCreator have no unit
> test coverage. We should add corresponding tests.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9989) StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task
[ https://issues.apache.org/jira/browse/KAFKA-9989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17106950#comment-17106950 ]

Boyang Chen commented on KAFKA-9989:

I didn't find that in the log [~ableegoldman].

> StreamsUpgradeTest.test_metadata_upgrade could not guarantee all processor gets assigned task
>
> Key: KAFKA-9989
> URL: https://issues.apache.org/jira/browse/KAFKA-9989
> Project: Kafka
> Issue Type: Bug
> Components: streams, system tests
> Reporter: Boyang Chen
> Assignee: Boyang Chen
> Priority: Major
>
> System test StreamsUpgradeTest.test_metadata_upgrade could fail due to:
> "Never saw output 'processed [0-9]* records' on ubuntu@worker6"
> On closer inspection, the rebalance happens but produces no task assignment.
> We should fix this problem by making the rebalance result part of the check,
> and skip the record-processing validation when the assignment is empty.
[GitHub] [kafka] abbccdda commented on a change in pull request #8395: Added doc for KIP-535 and updated it for KIP-562
abbccdda commented on a change in pull request #8395:
URL: https://github.com/apache/kafka/pull/8395#discussion_r404455662

## File path: docs/upgrade.html ##
@@ -39,7 +39,8 @@ Notable changes in 2
 https://github.com/apache/kafka/tree/2.5/examples;>examples folder. Check out https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics;>KIP-447 for the full details.
-Deprecated KafkaStreams.store(String, QueryableStoreType) and replaced it with KafkaStreams.store(StoreQueryParameters).
+Provided support to query stale stores(for high availability) and the stores belonging to a specific partition by deprecating KafkaStreams.store(String, QueryableStoreType) and replacing it with KafkaStreams.store(StoreQueryParameters).

Review comment:
![image](https://user-images.githubusercontent.com/5845561/78615480-809bb580-7826-11ea-9f59-2e7c3cf1a901.png)

Let's add a space in `stores(` so it reads `stores (`, and add a period after `KafkaStreams.allLocalStorePartitionLags()`.