[jira] [Created] (KAFKA-16141) StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails consistently in 3.7
Stanislav Kozlovski created KAFKA-16141:
---
Summary: StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails consistently in 3.7
Key: KAFKA-16141
URL: https://issues.apache.org/jira/browse/KAFKA-16141
Project: Kafka
Issue Type: Test
Affects Versions: 3.7.0
Reporter: Stanislav Kozlovski

{code:java}
kafkatest.tests.streams.streams_standby_replica_test.StreamsStandbyTask#test_standby_tasks_rebalanceArguments:{ “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false}
TimeoutError("Did expect to read 'ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]' from ubuntu@worker26")
Traceback (most recent call last):
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", line 184, in _do_run
    data = self.run_test()
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", line 262, in run_test
    return self.test_context.function(self.test)
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", line 433, in wrapper
    return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_standby_replica_test.py", line 79, in test_standby_tasks_rebalance
    self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]", processor_1.STDOUT_FILE)
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/base_streams_test.py", line 96, in wait_for_verification
    err_msg="Did expect to read '%s' from %s" % (message, processor.node.account))
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py", line 58, in wait_until
    raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from last_exception
ducktape.errors.TimeoutError: Did expect to read 'ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]' from ubuntu@worker26
{code}

-- This message was sent by Atlassian Jira (v8.20.10#820010)
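The expected string `ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]` contains a character class, so it reads as a regular expression rather than a literal. The sketch below assumes the verification behaves like a per-line regex search over the processor's stdout (an assumption; the helper's implementation is not shown in this report) and illustrates which output lines would satisfy it.

```python
import re

# Pattern copied from the failing assertion; [1-3] matches one digit: 1, 2 or 3.
EXPECTED = "ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]"


def output_matches(lines, pattern=EXPECTED):
    """Return True if any output line contains a match for the pattern.

    Hypothetical helper: it assumes a regex search per line, similar in
    spirit to `grep -E`, and is not the actual kafkatest implementation.
    """
    regex = re.compile(pattern)
    return any(regex.search(line) for line in lines)
```

For example, a processor that prints `ACTIVE_TASKS:2 STANDBY_TASKS:3` satisfies the check, while one that keeps reporting `ACTIVE_TASKS:2 STANDBY_TASKS:0` until the timeout produces exactly the `TimeoutError` above.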
[jira] [Created] (KAFKA-16140) zookeeper_migration_test#TestMigration#test_reconcile_kraft_to_zk system test fails consistently on 3.7
Stanislav Kozlovski created KAFKA-16140:
---
Summary: zookeeper_migration_test#TestMigration#test_reconcile_kraft_to_zk system test fails consistently on 3.7
Key: KAFKA-16140
URL: https://issues.apache.org/jira/browse/KAFKA-16140
Project: Kafka
Issue Type: Test
Affects Versions: 3.7.0
Reporter: Stanislav Kozlovski

{code:java}
kafkatest.tests.core.zookeeper_migration_test.TestMigration#test_reconcile_kraft_to_zk
AssertionError('Did not see expected INFO log after migration')
Traceback (most recent call last):
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", line 184, in _do_run
    data = self.run_test()
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", line 262, in run_test
    return self.test_context.function(self.test)
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/core/zookeeper_migration_test.py", line 367, in test_reconcile_kraft_to_zk
    assert saw_expected_log, "Did not see expected INFO log after migration"
AssertionError: Did not see expected INFO log after migration
{code}
[jira] [Created] (KAFKA-16139) StreamsUpgradeTest fails consistently in 3.7.0
Stanislav Kozlovski created KAFKA-16139:
---
Summary: StreamsUpgradeTest fails consistently in 3.7.0
Key: KAFKA-16139
URL: https://issues.apache.org/jira/browse/KAFKA-16139
Project: Kafka
Issue Type: Test
Affects Versions: 3.7.0
Reporter: Stanislav Kozlovski

h1. kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest#test_rolling_upgrade_with_2_bouncesArguments:\{ “from_version”: “3.5.1”, “to_version”: “3.7.0-SNAPSHOT”}

{{TimeoutError('Could not detect Kafka Streams version 3.7.0-SNAPSHOT on ubuntu@worker2')}}
[jira] [Created] (KAFKA-16138) QuotaTest system test fails consistently in 3.7
Stanislav Kozlovski created KAFKA-16138:
---
Summary: QuotaTest system test fails consistently in 3.7
Key: KAFKA-16138
URL: https://issues.apache.org/jira/browse/KAFKA-16138
Project: Kafka
Issue Type: Test
Affects Versions: 3.7.0
Reporter: Stanislav Kozlovski

As mentioned in [https://hackmd.io/@hOneAGCrSmKSpL8VF-1HWQ/HyRgRJmta#kafkatesttestsclientquota_testQuotaTesttest_quotaArguments-%E2%80%9Coverride_quota%E2%80%9D-true-%E2%80%9Cquota_type%E2%80%9D-%E2%80%9Cuser%E2%80%9D,], the test fails consistently:
{code:java}
ValueError('max() arg is an empty sequence')
Traceback (most recent call last):
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", line 184, in _do_run
    data = self.run_test()
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", line 262, in run_test
    return self.test_context.function(self.test)
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", line 433, in wrapper
    return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/quota_test.py", line 169, in test_quota
    success, msg = self.validate(self.kafka, producer, consumer)
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/quota_test.py", line 197, in validate
    metric.value for k, metrics in producer.metrics(group='producer-metrics', name='outgoing-byte-rate', client_id=producer.client_id) for metric in metrics
ValueError: max() arg is an empty sequence
{code}
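The `ValueError` is what Python's built-in `max()` raises when the generator it consumes yields nothing, which suggests `producer.metrics(...)` returned no `outgoing-byte-rate` samples for that client. A minimal sketch, assuming the metrics arrive as `(key, [metric, ...])` pairs and using a hypothetical `Metric` stand-in, shows how the empty case can be detected explicitly instead of surfacing as an opaque `max()` error. It is not the fix applied in Kafka.

```python
from collections import namedtuple

# Hypothetical stand-in for a metric object; only .value is used.
Metric = namedtuple("Metric", ["value"])


def max_metric_value(metric_groups):
    """Return the largest metric value, or None when nothing was collected.

    metric_groups is assumed to be an iterable of (key, [Metric, ...]) pairs,
    mirroring the `for k, metrics in producer.metrics(...)` comprehension in
    the traceback.
    """
    values = [metric.value for _, metrics in metric_groups for metric in metrics]
    # Plain max(values) raises ValueError on an empty list; default= returns None.
    return max(values, default=None)
```

A caller could then fail with a clearer message (for example, that no `outgoing-byte-rate` metric was collected) when the result is `None`.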
[jira] [Created] (KAFKA-16136) CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated() is very flaky
Stanislav Kozlovski created KAFKA-16136:
---
Summary: CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated() is very flaky
Key: KAFKA-16136
URL: https://issues.apache.org/jira/browse/KAFKA-16136
Project: Kafka
Issue Type: Test
Reporter: Stanislav Kozlovski

The test failed 3 builds in a row (with different JDK versions) on the 3.7 release branch while verifying the release. Locally, it passed.
[jira] [Created] (KAFKA-16135) kafka.api.PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe(String, String).quorum=kraft+kip848.groupProtocol=consumer is flaky
Stanislav Kozlovski created KAFKA-16135:
---
Summary: kafka.api.PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe(String, String).quorum=kraft+kip848.groupProtocol=consumer is flaky
Key: KAFKA-16135
URL: https://issues.apache.org/jira/browse/KAFKA-16135
Project: Kafka
Issue Type: Test
Reporter: Stanislav Kozlovski

The test kafka.api.PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe(String, String).quorum=kraft+kip848.groupProtocol=consumer is incredibly flaky: it failed 3 builds in a row for the 3.7 release candidate, with a different JDK version each time. Locally, it also fails often and requires a few retries to pass.
[jira] [Created] (KAFKA-16134) kafka.api.PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe(String, String).quorum=kraft+kip848.groupProtocol=consumer is flaky
Stanislav Kozlovski created KAFKA-16134:
---
Summary: kafka.api.PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe(String, String).quorum=kraft+kip848.groupProtocol=consumer is flaky
Key: KAFKA-16134
URL: https://issues.apache.org/jira/browse/KAFKA-16134
Project: Kafka
Issue Type: Test
Reporter: Stanislav Kozlovski

The following test is very flaky. It failed 3 times consecutively in Jenkins runs for the 3.7 release candidate.

kafka.api.PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe(String, String).quorum=kraft+kip848.groupProtocol=consumer
[jira] [Resolved] (KAFKA-16046) Stream Stream Joins fail after restoration with deserialization exceptions
[ https://issues.apache.org/jira/browse/KAFKA-16046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stanislav Kozlovski resolved KAFKA-16046.
---
Resolution: Fixed

> Stream Stream Joins fail after restoration with deserialization exceptions
>
> Key: KAFKA-16046
> URL: https://issues.apache.org/jira/browse/KAFKA-16046
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.7.0
> Reporter: Almog Gavra
> Assignee: Almog Gavra
> Priority: Blocker
> Labels: streams
> Fix For: 3.7.0
>
>
> Before KIP-954, the `KStreamImplJoin` class would always create non-timestamped persistent windowed stores. After that KIP, the default was changed to create timestamped stores. This wasn't compatible because, during restoration, timestamped stores have their changelog values transformed to prepend the timestamp to the value. This caused serialization errors when trying to read from the store because the deserializers did not expect the timestamp to be prepended.
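To make the incompatibility concrete, here is a byte-level sketch. It assumes a timestamped store value is laid out as an 8-byte big-endian timestamp followed by the original value bytes; the helper names are hypothetical, and Kafka Streams' actual serdes are Java classes. A deserializer written for the non-timestamped layout then receives eight unexpected leading bytes.

```python
import struct
from typing import Tuple


def to_timestamped(raw_value: bytes, timestamp_ms: int) -> bytes:
    """Prepend an 8-byte big-endian timestamp to serialized value bytes."""
    return struct.pack(">q", timestamp_ms) + raw_value


def from_timestamped(stored: bytes) -> Tuple[int, bytes]:
    """Split stored bytes back into (timestamp, raw value bytes)."""
    (timestamp_ms,) = struct.unpack(">q", stored[:8])
    return timestamp_ms, stored[8:]


def plain_utf8_deserializer(data: bytes) -> str:
    """A deserializer that expects only the raw value, with no timestamp prefix."""
    return data.decode("utf-8")
```

With `stored = to_timestamped(b"left-value", 1700000000000)`, `from_timestamped(stored)` recovers the original pair, whereas `plain_utf8_deserializer(stored)` raises `UnicodeDecodeError`, because the prefix contains the byte `0x8b`, which is not valid UTF-8 in that position.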
[jira] [Resolved] (KAFKA-12679) Rebalancing a restoring or running task may cause directory livelocking with newly created task
[ https://issues.apache.org/jira/browse/KAFKA-12679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stanislav Kozlovski resolved KAFKA-12679.
---
Resolution: Fixed

Marking this as done per Lucas' comment that it is solved.

> Rebalancing a restoring or running task may cause directory livelocking with newly created task
>
> Key: KAFKA-12679
> URL: https://issues.apache.org/jira/browse/KAFKA-12679
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.6.1
> Environment: Broker and client version 2.6.1
> Multi-node broker cluster
> Multi-node, auto scaling streams app instances
> Reporter: Peter Nahas
> Assignee: Lucas Brutschy
> Priority: Major
> Fix For: 3.7.0
>
> Attachments: Backoff-between-directory-lock-attempts.patch
>
>
> If a task that uses a state store is in the restoring or running state and the task gets rebalanced to a separate thread on the same instance, the newly created task will attempt to lock the state store directory while the first thread is still using it. This is normal and expected behavior when the first thread is not yet aware of the rebalance. However, the newly created task is effectively running a while loop with no backoff, waiting to lock the directory:
> # TaskManager tells the task to restore in `tryToCompleteRestoration`
> # The task attempts to lock the directory
> # The lock attempt fails and throws an `org.apache.kafka.streams.errors.LockException`
> # TaskManager catches the exception, stops further processing on the task and reports that not all tasks have restored
> # The StreamThread `runLoop` continues to run.
>
> I've seen some documentation indicate that there is supposed to be a backoff when this condition occurs, but there does not appear to be any in the code. As a result, if this goes on for long enough, the lock loop may dominate CPU usage in the process and starve the old stream thread's task processing.
>
> When in this state, DEBUG-level logging for TaskManager produces a steady stream of messages like the following:
> {noformat}
> 2021-03-30 20:59:51,098 DEBUG --- [StreamThread-10] o.a.k.s.p.i.TaskManager : stream-thread [StreamThread-10] Could not initialize 0_34 due to the following exception; will retry
> org.apache.kafka.streams.errors.LockException: stream-thread [StreamThread-10] standby-task [0_34] Failed to lock the state directory for task 0_34
> {noformat}
>
> I've attached a git-formatted patch to resolve the issue. It simply detects the scenario and sleeps for the backoff time in the appropriate StreamThread.
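The remedy the reporter describes (sleep for a backoff period instead of retrying immediately) can be sketched as follows. This is an illustration only, not the attached patch or the change that was eventually merged: `try_lock` and the fixed `backoff_s` are assumptions, and in the scenario above the retries happen across iterations of the StreamThread run loop rather than inside one blocking call.

```python
import time


class LockException(Exception):
    """Stand-in for org.apache.kafka.streams.errors.LockException."""


def lock_with_backoff(try_lock, backoff_s, max_attempts, sleep=time.sleep):
    """Call try_lock() until it succeeds, sleeping backoff_s between failures.

    try_lock is a hypothetical callable that raises LockException while
    another thread still holds the task's state directory. Returns the
    number of attempts used; re-raises after max_attempts failures.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            try_lock()
            return attempt
        except LockException:
            if attempt == max_attempts:
                raise
            # Back off instead of spinning, so the thread that still owns
            # the directory gets CPU time to finish and release it.
            sleep(backoff_s)
```

Injecting `sleep` keeps the retry policy testable without real delays.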
[jira] [Resolved] (KAFKA-15147) Measure pending and outstanding Remote Segment operations
[ https://issues.apache.org/jira/browse/KAFKA-15147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stanislav Kozlovski resolved KAFKA-15147.
---
Resolution: Fixed

> Measure pending and outstanding Remote Segment operations
>
> Key: KAFKA-15147
> URL: https://issues.apache.org/jira/browse/KAFKA-15147
> Project: Kafka
> Issue Type: Improvement
> Components: core
> Reporter: Jorge Esteban Quilcate Otoya
> Assignee: Christo Lolov
> Priority: Major
> Labels: tiered-storage
> Fix For: 3.7.0
>
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-963%3A+Upload+and+delete+lag+metrics+in+Tiered+Storage
>
> KAFKA-15833: RemoteCopyLagBytes
> KAFKA-16002: RemoteCopyLagSegments, RemoteDeleteLagBytes, RemoteDeleteLagSegments
> KAFKA-16013: ExpiresPerSec
> KAFKA-16014: RemoteLogSizeComputationTime, RemoteLogSizeBytes, RemoteLogMetadataCount
> KAFKA-15158: RemoteDeleteRequestsPerSec, RemoteDeleteErrorsPerSec, BuildRemoteLogAuxStateRequestsPerSec, BuildRemoteLogAuxStateErrorsPerSec
>
> Remote Log Segment operations (copy/delete) are executed by the Remote Storage Manager and recorded by the Remote Log Metadata Manager (e.g. the default TopicBasedRLMM writes state changes of remote log segments to an internal Kafka topic).
> As executions run, fail, and retry, it will be important to know how many operations are pending and outstanding over time in order to alert operators.
> The number of pending operations alone is not enough to alert on, as its value can oscillate close to zero. An additional condition (running time > threshold) needs to apply to consider an operation outstanding.
> Proposal:
> RemoteLogManager could be extended with 2 concurrent maps (pendingSegmentCopies, pendingSegmentDeletes) of type `Map[Uuid, Long]` that record, per segment ID, the time its operation started, and based on this expose 2 metrics per operation:
> * pendingSegmentCopies: gauge of the number of entries in the pendingSegmentCopies map
> * outstandingSegmentCopies: loop over pending ops, and if now - startedTime > timeout, then outstanding++ (maybe on debug level?)
> Is this a valuable metric to add to Tiered Storage, or is it better solved in a custom RLMM implementation?
> Also, does it require a KIP?
> Thanks!
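The proposal (a map from segment ID to start time, a pending gauge, and an outstanding count based on a timeout) can be sketched in a few lines. Names are hypothetical and this is not RemoteLogManager's code; any hashable ID stands in for the `Uuid` keys, and an injectable clock replaces wall-clock time.

```python
import threading
import time


class PendingOperationTracker:
    """Tracks in-flight remote segment operations by start time."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._started = {}  # segment id -> start time, like the proposed Map[Uuid, Long]
        self._lock = threading.Lock()

    def start(self, segment_id):
        with self._lock:
            self._started[segment_id] = self._clock()

    def finish(self, segment_id):
        # Called on success or final failure; unknown ids are ignored.
        with self._lock:
            self._started.pop(segment_id, None)

    def pending(self):
        """Gauge value: operations currently in flight."""
        with self._lock:
            return len(self._started)

    def outstanding(self, timeout_s):
        """Operations whose running time exceeds timeout_s."""
        now = self._clock()
        with self._lock:
            return sum(1 for t in self._started.values() if now - t > timeout_s)
```

One tracker instance per operation type (copies, deletes) yields the two metrics per operation that the proposal describes.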
[jira] [Resolved] (KAFKA-15327) Client consumer should commit offsets on close
[ https://issues.apache.org/jira/browse/KAFKA-15327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stanislav Kozlovski resolved KAFKA-15327.
---
Resolution: Fixed

Resolving since this was merged.

> Client consumer should commit offsets on close
>
> Key: KAFKA-15327
> URL: https://issues.apache.org/jira/browse/KAFKA-15327
> Project: Kafka
> Issue Type: Sub-task
> Components: clients, consumer
> Reporter: Lianet Magrans
> Assignee: Philip Nee
> Priority: Major
> Labels: kip-848, kip-848-client-support, kip-848-preview
> Fix For: 3.7.0
>
>
> In the current implementation of the KafkaConsumer, the ConsumerCoordinator commits offsets before the consumer is closed, with a call to maybeAutoCommitOffsetsSync(timer).
> The async consumer should provide the same behaviour and commit offsets on close.
> This fix should allow the following integration tests (defined in PlaintextConsumerTest) to run successfully:
> * testAutoCommitOnClose
> * testAutoCommitOnCloseAfterWakeup
[jira] [Resolved] (KAFKA-15780) Wait for consistent kraft metadata when creating topics in tests
[ https://issues.apache.org/jira/browse/KAFKA-15780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stanislav Kozlovski resolved KAFKA-15780.
---
Resolution: Fixed

Resolving since this was merged. Nice work!

> Wait for consistent kraft metadata when creating topics in tests
>
> Key: KAFKA-15780
> URL: https://issues.apache.org/jira/browse/KAFKA-15780
> Project: Kafka
> Issue Type: Test
> Reporter: David Mao
> Assignee: David Mao
> Priority: Minor
> Fix For: 3.7.0
>
>
> Tests occasionally flake when not retrying on stale metadata in KRaft mode.
> I suspect the root cause is that TestUtils.createTopicWithAdmin waits for partitions to be present in the metadata cache but does not wait for the metadata to be fully published to the broker.
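A sketch of the stronger wait the report suggests: instead of returning once partitions appear in one metadata cache, poll every broker until each one reports all partitions. `brokers_metadata` is a hypothetical list of per-broker lookups, and the helper is illustrative only; Kafka's actual `TestUtils` helpers are Scala and have different signatures.

```python
import time


def wait_for_topic_on_all_brokers(brokers_metadata, topic, num_partitions,
                                  timeout_s=15.0, poll_s=0.1,
                                  clock=time.monotonic, sleep=time.sleep):
    """Block until every broker's metadata view lists all partitions of topic.

    brokers_metadata is a hypothetical list of zero-argument callables, one
    per broker, each returning {topic: set_of_partition_ids} as that broker
    currently sees it. Raises TimeoutError if the views do not converge.
    """
    expected = set(range(num_partitions))
    deadline = clock() + timeout_s
    while True:
        # Require every broker's view to contain all expected partitions.
        if all(view().get(topic, set()) >= expected for view in brokers_metadata):
            return
        if clock() >= deadline:
            raise TimeoutError(f"topic {topic} not visible on all brokers within {timeout_s}s")
        sleep(poll_s)
```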
[jira] [Resolved] (KAFKA-15817) Avoid reconnecting to the same IP address if multiple addresses are available
[ https://issues.apache.org/jira/browse/KAFKA-15817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stanislav Kozlovski resolved KAFKA-15817.
---
Resolution: Fixed

Resolving since this was merged. Good job!

> Avoid reconnecting to the same IP address if multiple addresses are available
>
> Key: KAFKA-15817
> URL: https://issues.apache.org/jira/browse/KAFKA-15817
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 3.3.2, 3.4.1, 3.6.0, 3.5.1
> Reporter: Bob Barrett
> Assignee: Bob Barrett
> Priority: Major
> Fix For: 3.7.0
>
>
> In https://issues.apache.org/jira/browse/KAFKA-12193, we changed the DNS resolution behavior for clients to re-resolve DNS after disconnecting from a broker, rather than waiting until we had iterated over all addresses from a given resolution. This is useful when the IP addresses have changed between the connection and the disconnection.
> However, this behavior change means that clients could reconnect immediately to the same IP they just disconnected from, if the IPs have not changed. In cases where the disconnection happened because that IP was unhealthy (for example, a load balancer with instances in multiple availability zones where one zone is unhealthy, or an intermediate component in the network path going through a rolling restart), this delays the client's successful reconnection. To address this, clients should remember the IP they just disconnected from and skip it when reconnecting, as long as the host name resolved to multiple addresses.
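The proposed client behavior reduces to a small selection rule. The sketch below assumes resolved addresses are plain strings, picks the first remaining address (the ordering is an assumption), and falls back to the same address when DNS returns only one. The function name is hypothetical and this is not the Kafka client's (Java) code.

```python
def choose_address(resolved, last_failed=None):
    """Pick an address to connect to, avoiding the one just disconnected from.

    If DNS returned several addresses, skip last_failed; if it returned only
    one, there is no alternative, so that address is reused.
    """
    if not resolved:
        raise ValueError("no addresses resolved")
    if len(resolved) > 1 and last_failed in resolved:
        candidates = [addr for addr in resolved if addr != last_failed]
        return candidates[0]
    return resolved[0]
```

For example, after disconnecting from `10.0.0.1` with `["10.0.0.1", "10.0.0.2"]` resolved, the client would try `10.0.0.2` next.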
[jira] [Resolved] (KAFKA-16007) ZK migrations can be slow for large clusters
[ https://issues.apache.org/jira/browse/KAFKA-16007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stanislav Kozlovski resolved KAFKA-16007.
---
Resolution: Fixed

Closing since it's merged.

> ZK migrations can be slow for large clusters
>
> Key: KAFKA-16007
> URL: https://issues.apache.org/jira/browse/KAFKA-16007
> Project: Kafka
> Issue Type: Improvement
> Components: controller, kraft
> Reporter: David Arthur
> Assignee: David Arthur
> Priority: Minor
> Fix For: 3.7.0, 3.6.2
>
>
> On a large cluster with many single-partition topics, the ZK to KRaft migration took over half an hour:
> {code}
> [KRaftMigrationDriver id=9990] Completed migration of metadata from ZooKeeper to KRaft. 157396 records were generated in 2245862 ms across 67132 batches. The record types were {TOPIC_RECORD=66282, PARTITION_RECORD=72067, CONFIG_RECORD=17116, PRODUCER_IDS_RECORD=1, ACCESS_CONTROL_ENTRY_RECORD=1930}. The current metadata offset is now 332267 with an epoch of 19. Saw 36 brokers in the migrated metadata [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35].
> {code}
> This is a result of how we generate batches of records when traversing the ZK tree. Since we now use metadata transactions for the migration, we can re-batch these without any consistency problems.
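For scale, the log shows 157,396 records spread across 67,132 batches, roughly 2.3 records per batch. Re-batching means regrouping the same ordered record stream into fewer, larger batches. A minimal sketch, assuming record order must be preserved and that batch boundaries carry no meaning inside the migration transaction (the report's stated justification); the batch size is a hypothetical parameter.

```python
from itertools import chain


def rebatch(batches, max_batch_size):
    """Flatten many small record batches and regroup them into larger ones.

    Record order is preserved; every output batch except possibly the last
    holds exactly max_batch_size records.
    """
    if max_batch_size < 1:
        raise ValueError("max_batch_size must be positive")
    out, current = [], []
    for record in chain.from_iterable(batches):
        current.append(record)
        if len(current) == max_batch_size:
            out.append(current)
            current = []
    if current:
        out.append(current)
    return out
```

With a hypothetical limit of 1,000 records per batch, the same 157,396 records would fit in 158 batches.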
[jira] [Resolved] (KAFKA-15818) Implement max poll interval
[ https://issues.apache.org/jira/browse/KAFKA-15818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stanislav Kozlovski resolved KAFKA-15818.
---
Resolution: Fixed

> Implement max poll interval
>
> Key: KAFKA-15818
> URL: https://issues.apache.org/jira/browse/KAFKA-15818
> Project: Kafka
> Issue Type: Sub-task
> Components: clients, consumer
> Reporter: Philip Nee
> Assignee: Philip Nee
> Priority: Blocker
> Labels: consumer-threading-refactor, kip-848-client-support, kip-848-e2e, kip-848-preview
> Fix For: 3.7.0
>
>
> The consumer needs to be polled at intervals shorter than max.poll.interval.ms; otherwise, the consumer should try to leave the group. Currently, we send an acknowledgment event to the network thread on each poll. The event only triggers an update of the auto-commit state; we also need to update the poll timer so that the consumer can leave the group when the timer expires.
>
> The current logic looks like this:
> {code:java}
> if (heartbeat.pollTimeoutExpired(now)) {
>     // the poll timeout has expired, which means that the foreground thread has stalled
>     // in between calls to poll().
>     log.warn("consumer poll timeout has expired. This means the time between subsequent calls to poll() " +
>         "was longer than the configured max.poll.interval.ms, which typically implies that " +
>         "the poll loop is spending too much time processing messages. You can address this " +
>         "either by increasing max.poll.interval.ms or by reducing the maximum size of batches " +
>         "returned in poll() with max.poll.records.");
>     maybeLeaveGroup("consumer poll timeout has expired.");
> }
> {code}
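The mechanism being described, a timer reset on every `poll()` and checked by the background thread, can be sketched with an injectable clock. The class and method names are hypothetical; in the Java client the equivalent check is `heartbeat.pollTimeoutExpired(now)` in the snippet above. Treating the deadline itself as expiry (`>=`) is an assumption.

```python
import time


def _monotonic_ms():
    return time.monotonic() * 1000.0


class PollTimer:
    """Tracks time since the last poll() against max.poll.interval.ms."""

    def __init__(self, max_poll_interval_ms, clock_ms=_monotonic_ms):
        self.max_poll_interval_ms = max_poll_interval_ms
        self._clock_ms = clock_ms
        self._last_poll_ms = clock_ms()

    def on_poll(self):
        # Application thread: each poll() resets the timer (the "poll event"
        # that the background thread needs to receive).
        self._last_poll_ms = self._clock_ms()

    def expired(self):
        # Background thread: if True, the consumer should try to leave the group.
        return self._clock_ms() - self._last_poll_ms >= self.max_poll_interval_ms
```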
[jira] [Resolved] (KAFKA-16026) AsyncConsumer does not send a poll event to the background thread
[ https://issues.apache.org/jira/browse/KAFKA-16026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stanislav Kozlovski resolved KAFKA-16026.
---
Resolution: Fixed

https://github.com/apache/kafka/pull/15035

> AsyncConsumer does not send a poll event to the background thread
>
> Key: KAFKA-16026
> URL: https://issues.apache.org/jira/browse/KAFKA-16026
> Project: Kafka
> Issue Type: Sub-task
> Components: clients, consumer
> Reporter: Philip Nee
> Assignee: Philip Nee
> Priority: Blocker
> Labels: consumer-threading-refactor
> Fix For: 3.7.0
>
>
> Consumer poll does not send a poll event to the background thread to:
> # trigger autocommit
> # reset the max poll interval timer
[jira] [Resolved] (KAFKA-14175) KRaft Upgrades Part 2
[ https://issues.apache.org/jira/browse/KAFKA-14175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stanislav Kozlovski resolved KAFKA-14175.
---
Resolution: Won't Fix

> KRaft Upgrades Part 2
>
> Key: KAFKA-14175
> URL: https://issues.apache.org/jira/browse/KAFKA-14175
> Project: Kafka
> Issue Type: New Feature
> Reporter: David Arthur
> Assignee: David Arthur
> Priority: Major
>
> This is the parent issue for KIP-778 tasks which were not completed for the 3.3 release.
[jira] [Created] (KAFKA-14121) AlterPartitionReassignments API should allow callers to specify the option of preserving the replication factor
Stanislav Kozlovski created KAFKA-14121:
---
Summary: AlterPartitionReassignments API should allow callers to specify the option of preserving the replication factor
Key: KAFKA-14121
URL: https://issues.apache.org/jira/browse/KAFKA-14121
Project: Kafka
Issue Type: New Feature
Reporter: Stanislav Kozlovski

Using Kafka's public APIs to get metadata about the non-reassigning replicas of a topic is unreliable and prone to race conditions. If a person or a system relies on the provided metadata, it can end up unintentionally increasing the replication factor of a partition. It would be useful to have some sort of guardrail against this happening inadvertently.
[jira] [Resolved] (KAFKA-8406) kafka-topics throws wrong error on invalid configuration with bootstrap-server and alter config
[ https://issues.apache.org/jira/browse/KAFKA-8406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stanislav Kozlovski resolved KAFKA-8406.
---
Fix Version/s: 2.4.0
Resolution: Fixed

> kafka-topics throws wrong error on invalid configuration with bootstrap-server and alter config
>
> Key: KAFKA-8406
> URL: https://issues.apache.org/jira/browse/KAFKA-8406
> Project: Kafka
> Issue Type: Improvement
> Reporter: Stanislav Kozlovski
> Assignee: Stanislav Kozlovski
> Priority: Minor
> Fix For: 2.4.0
>
>
> Running
> {code:java}
> ./kafka-topics --bootstrap-server --alter --config retention.ms=360 --topic topic{code}
> results in
> {code:java}
> Missing required argument "[partitions]"{code}
> Running
> {code:java}
> ./kafka-topics --bootstrap-server --alter --config retention.ms=360 --topic topic --partitions 25{code}
> results in
> {code:java}
> Option combination "[bootstrap-server],[config]" can't be used with option "[alter]"{code}
> For clarity, the tool should report the latter error (the invalid option combination) right away.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12555) Log reason for rolling a segment
Stanislav Kozlovski created KAFKA-12555:
---
Summary: Log reason for rolling a segment
Key: KAFKA-12555
URL: https://issues.apache.org/jira/browse/KAFKA-12555
Project: Kafka
Issue Type: Improvement
Reporter: Stanislav Kozlovski

It would be useful for diagnostic purposes to log the reason why a log segment was rolled (https://github.com/apache/kafka/blob/e840b03a026ddb9a67a15a164d877545130d6e17/core/src/main/scala/kafka/log/Log.scala#L2069).
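One way to capture the reason is to have the roll check return a description instead of a boolean. The sketch below is loosely modeled on the usual roll triggers (segment size, segment age relative to `segment.ms`, a full offset or time index); the function and parameter names are hypothetical, the real check also involves roll jitter and offset-overflow conditions, and this is not the code at the linked line.

```python
from typing import Optional


def roll_reason(segment_bytes, incoming_bytes, max_segment_bytes,
                segment_age_ms, max_segment_ms,
                offset_index_full=False, time_index_full=False) -> Optional[str]:
    """Return why the active segment should roll, or None if it should not."""
    if segment_bytes + incoming_bytes > max_segment_bytes:
        return (f"size: {segment_bytes} + {incoming_bytes} bytes would exceed "
                f"segment.bytes={max_segment_bytes}")
    # An empty segment is not rolled purely because of its age.
    if segment_bytes > 0 and segment_age_ms > max_segment_ms:
        return f"age: {segment_age_ms} ms exceeds segment.ms={max_segment_ms}"
    if offset_index_full:
        return "offset index is full"
    if time_index_full:
        return "time index is full"
    return None
```

A caller could log the returned string at the point where the new segment is created whenever the result is not `None`.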
[jira] [Created] (KAFKA-10510) Reassigning partitions should not allow increasing RF of a partition unless configured with it
Stanislav Kozlovski created KAFKA-10510:
---
Summary: Reassigning partitions should not allow increasing RF of a partition unless configured with it
Key: KAFKA-10510
URL: https://issues.apache.org/jira/browse/KAFKA-10510
Project: Kafka
Issue Type: Improvement
Reporter: Stanislav Kozlovski

Kafka should have some validation in place against increasing the RF of a partition through a reassignment. Otherwise, users could shoot themselves in the foot by increasing the RF of a topic (reassigning its partitions to extra replicas) and then having newly created partitions use a lower replication factor (the configured one). Our tools should ideally detect when RF is increasing inconsistently with the config and issue a separate command to change the config.
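A sketch of the check a tool could run before submitting a reassignment, assuming the caller knows the partition's current replicas and the replication factor the topic is configured with. All names are hypothetical, and the `allow_rf_change` flag is an assumed stand-in for an explicit opt-in.

```python
def validate_reassignment(current_replicas, target_replicas, configured_rf,
                          allow_rf_change=False):
    """Reject a reassignment that silently raises a partition's replication factor.

    Returns the target RF if the reassignment is acceptable; raises
    ValueError otherwise.
    """
    if len(set(target_replicas)) != len(target_replicas):
        raise ValueError(f"duplicate broker ids in target replicas {target_replicas}")
    current_rf, target_rf = len(current_replicas), len(target_replicas)
    # Moving replicas without changing their count is always fine; growing the
    # RF past the configured value requires an explicit opt-in.
    if target_rf > current_rf and target_rf != configured_rf and not allow_rf_change:
        raise ValueError(
            f"reassignment raises RF from {current_rf} to {target_rf}, but the "
            f"configured RF is {configured_rf}; update the config first")
    return target_rf
```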
[jira] [Resolved] (KAFKA-10353) Trogdor - Fix RoundTripWorker to not fail when the topic it's trying to create already exists
[ https://issues.apache.org/jira/browse/KAFKA-10353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stanislav Kozlovski resolved KAFKA-10353.
---
Resolution: Duplicate

> Trogdor - Fix RoundTripWorker to not fail when the topic it's trying to create already exists
>
> Key: KAFKA-10353
> URL: https://issues.apache.org/jira/browse/KAFKA-10353
> Project: Kafka
> Issue Type: Bug
> Reporter: Stanislav Kozlovski
> Priority: Major
>
> Trogdor's RoundTripWorker calls WorkerUtils#createTopics with the failOnCreate flag set to true, which makes the code throw an exception if the topic already exists.
> [https://github.com/apache/kafka/blob/28b7d8e21656649fb09b09f9bacfe865b0ca133c/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java#L149]
> This is prone to race conditions when multiple workers are scheduled to start at the same time: only one will succeed in creating the topic and running the test, while the rest will end up with a fatal error.
> This has also been seen in the RoundTripFaultTest system test, where the CreateTopics request reaches Kafka but a network exception makes Trogdor retry it; the retry then hits a TopicAlreadyExists exception, failing the test.
[jira] [Created] (KAFKA-10353) Trogdor - Fix RoundTripWorker to not fail when the topic it's trying to create already exists
Stanislav Kozlovski created KAFKA-10353:
---
Summary: Trogdor - Fix RoundTripWorker to not fail when the topic it's trying to create already exists
Key: KAFKA-10353
URL: https://issues.apache.org/jira/browse/KAFKA-10353
Project: Kafka
Issue Type: Bug
Reporter: Stanislav Kozlovski

Trogdor's RoundTripWorker calls WorkerUtils#createTopics with the failOnCreate flag set to true, which makes the code throw an exception if the topic already exists.
[https://github.com/apache/kafka/blob/28b7d8e21656649fb09b09f9bacfe865b0ca133c/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java#L149]
This is prone to race conditions when multiple workers are scheduled to start at the same time: only one will succeed in creating the topic and running the test, while the rest will end up with a fatal error.
This has also been seen in the RoundTripFaultTest system test, where the CreateTopics request reaches Kafka but a network exception makes Trogdor retry it; the retry then hits a TopicAlreadyExists exception, failing the test.
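Both failure modes described here (concurrent workers racing to create the same topic, and a retried CreateTopics request whose first attempt already succeeded) go away if "topic already exists" is treated as success. A minimal sketch with a hypothetical `create_topic` callable; it does not reproduce `WorkerUtils#createTopics`.

```python
class TopicExistsError(Exception):
    """Stand-in for Kafka's TopicExistsException."""


def create_topic_tolerant(create_topic, name, fail_on_existing=False):
    """Create a topic, treating "already exists" as success by default.

    create_topic is a hypothetical callable that raises TopicExistsError if
    the topic is already present. Returns True if this call created the
    topic, False if another worker (or an earlier attempt) already had.
    """
    try:
        create_topic(name)
        return True
    except TopicExistsError:
        if fail_on_existing:
            raise
        return False
```

A stricter variant could also verify that the existing topic's partition count matches what the worker expects before proceeding.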
[jira] [Resolved] (KAFKA-10301) Partition#remoteReplicasMap can be empty in certain race conditions
[ https://issues.apache.org/jira/browse/KAFKA-10301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stanislav Kozlovski resolved KAFKA-10301.
---
Resolution: Fixed

> Partition#remoteReplicasMap can be empty in certain race conditions
>
> Key: KAFKA-10301
> URL: https://issues.apache.org/jira/browse/KAFKA-10301
> Project: Kafka
> Issue Type: Bug
> Reporter: Stanislav Kozlovski
> Assignee: Stanislav Kozlovski
> Priority: Blocker
>
>
> In Partition#updateAssignmentAndIsr, we would previously update the `partition#remoteReplicasMap` by adding the new replicas to the map and then removing the old ones ([source|https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657]).
> During a recent refactoring, we changed it to first clear the map and then add all the replicas to it ([source|https://github.com/apache/kafka/blob/2.6/core/src/main/scala/kafka/cluster/Partition.scala#L663]).
> While this is done in a write lock (`inWriteLock(leaderIsrUpdateLock)`), not all callers that access the map structure use a lock. Some examples:
> - Partition#updateFollowerFetchState
> - DelayedDeleteRecords#tryComplete
> - Partition#getReplicaOrException, which is called without a lock in `checkEnoughReplicasReachOffset`, itself called by DelayedProduce. I think this can fail a `ReplicaManager#appendRecords` call.
> While we want to polish the code to make these sorts of race conditions harder (or impossible) to introduce, it seems safest to revert to the previous behavior given the timelines for the 2.6 release. Jira https://issues.apache.org/jira/browse/KAFKA-10302 tracks further modifications to the code.
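The difference between the two update orders can be shown by recording every intermediate state that a reader without the lock could observe. This is a single-threaded simulation with hypothetical names, not Kafka's Scala code: each mutation is followed by a snapshot, standing in for a concurrent read landing at that moment.

```python
def clear_then_add(current, new_ids):
    """Refactored order: clear the map, then add the new replicas."""
    state, seen = dict(current), []
    state.clear()
    seen.append(set(state))  # a reader arriving here sees an empty map
    for rid in new_ids:
        state.setdefault(rid, f"replica-{rid}")
        seen.append(set(state))
    return seen


def add_then_remove(current, new_ids):
    """Previous order: add the new replicas, then remove the stale ones."""
    state, seen = dict(current), []
    for rid in new_ids:
        # setdefault keeps the existing replica object for retained ids.
        state.setdefault(rid, f"replica-{rid}")
        seen.append(set(state))
    for rid in [r for r in state if r not in new_ids]:
        del state[rid]
        seen.append(set(state))
    return seen
```

For `current = {1: ..., 2: ...}` and `new_ids = [2, 3]`, `clear_then_add` passes through an empty map, while every state recorded by `add_then_remove` is non-empty, provided the new assignment is non-empty.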
[jira] [Created] (KAFKA-10302) Ensure thread-safe access to Partition#remoteReplicasMap
Stanislav Kozlovski created KAFKA-10302:
---
Summary: Ensure thread-safe access to Partition#remoteReplicasMap
Key: KAFKA-10302
URL: https://issues.apache.org/jira/browse/KAFKA-10302
Project: Kafka
Issue Type: Bug
Reporter: Stanislav Kozlovski

A recent Jira (https://issues.apache.org/jira/browse/KAFKA-10301) exposed how easy it is to introduce nasty race conditions with the Partition#remoteReplicasMap data structure. It is a concurrent map that is modified inside a write lock, but it is not always accessed through that lock. Therefore, callers can observe an intermediate state of the map, for instance while the replica assignment for a given partition is being updated. It would be good to ensure thread-safe access to the data structure in a way that makes it harder to introduce such regressions in the future.
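One common way to get this property is copy-on-write: writers build a complete new map under the lock and publish it with a single reference swap, so lock-free readers observe either the old or the new assignment. This is a sketch of that general technique with hypothetical names, not the change made for this ticket; it relies on reference assignment being atomic (true for attribute writes under CPython's GIL, and for a volatile field or AtomicReference in Java).

```python
import threading
from types import MappingProxyType


class ReplicaRegistry:
    """Copy-on-write replica map: readers never see a half-updated assignment."""

    def __init__(self, replicas=None):
        self._write_lock = threading.Lock()
        self._replicas = MappingProxyType(dict(replicas or {}))

    def snapshot(self):
        # Lock-free read of the currently published, read-only mapping.
        return self._replicas

    def update_assignment(self, new_ids, make_replica):
        with self._write_lock:
            old = self._replicas
            # Build the complete new mapping privately, reusing existing replica objects.
            updated = {rid: old[rid] if rid in old else make_replica(rid) for rid in new_ids}
            # Publish with one reference assignment; readers see old or new, never a mix.
            self._replicas = MappingProxyType(updated)
```

Because snapshots are read-only views of maps that are never mutated after publication, a reader holding an old snapshot keeps a consistent view even while a writer publishes a new one.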
[jira] [Created] (KAFKA-10301) RemoteReplicasMap can be empty in certain race conditions
Stanislav Kozlovski created KAFKA-10301: --- Summary: RemoteReplicasMap can be empty in certain race conditions Key: KAFKA-10301 URL: https://issues.apache.org/jira/browse/KAFKA-10301 Project: Kafka Issue Type: Bug Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski In Partition#updateAssignmentAndIsr, we would previously update the `partition#remoteReplicasMap` by adding the new replicas to the map and then removing the old ones ([source|https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657]). During a recent refactoring, we changed it to first clear the map and then add all the replicas to it ([source|https://github.com/apache/kafka/blob/2.6/core/src/main/scala/kafka/cluster/Partition.scala#L663]). While this is done in a write lock (`inWriteLock(leaderIsrUpdateLock)`), not all callers that access the map structure use a lock. Some examples: - Partition#updateFollowerFetchState - DelayedDeleteRecords#tryComplete - Partition#getReplicaOrException - called in `checkEnoughReplicasReachOffset` without a lock, which itself is called by DelayedProduce. I think this can fail a `ReplicaManager#appendRecords` call. While we want to polish the code to make these sorts of race conditions harder (or impossible) to introduce, it sounds safest to revert to the previous behavior given the timeline for the 2.6 release. Jira X tracks that. -- This message was sent by Atlassian Jira (v8.3.4#803005)
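To see why the update order matters for callers that read without the lock, the simplified model below contrasts the two orders on a `ConcurrentHashMap`. It is not the actual `Partition#updateAssignmentAndIsr` code: the method names are hypothetical, and the `observer` callback stands in for a lock-free reader that happens to run between the two steps.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Simplified model of the two update orders; not Kafka code.
class AssignmentUpdateOrder {
    // Refactored order: clear, then add. A reader between the steps sees an empty map.
    static void clearThenAdd(Map<Integer, String> map, List<Integer> ids,
                             Consumer<Map<Integer, String>> observer) {
        map.clear();
        observer.accept(map); // intermediate state visible to lock-free readers
        for (int id : ids) map.putIfAbsent(id, "replica-" + id);
    }

    // Previous order: add the new replicas first, then remove the ones no longer assigned.
    static void addThenRemove(Map<Integer, String> map, List<Integer> ids,
                              Consumer<Map<Integer, String>> observer) {
        for (int id : ids) map.putIfAbsent(id, "replica-" + id);
        observer.accept(map); // intermediate state: a superset of the old and new replicas
        Set<Integer> keep = new HashSet<>(ids);
        map.keySet().retainAll(keep); // removal through the key-set view removes map entries
    }
}
```

With the add-then-remove order a concurrent reader may briefly see a replica that is about to be removed, but it never sees an empty map, which is the failure mode the ticket describes.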
[jira] [Resolved] (KAFKA-8723) flaky test LeaderElectionCommandTest#testAllTopicPartition
[ https://issues.apache.org/jira/browse/KAFKA-8723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski resolved KAFKA-8723. Resolution: Fixed > flaky test LeaderElectionCommandTest#testAllTopicPartition > -- > > Key: KAFKA-8723 > URL: https://issues.apache.org/jira/browse/KAFKA-8723 > Project: Kafka > Issue Type: Bug >Reporter: Boyang Chen >Assignee: Stanislav Kozlovski >Priority: Major > > [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/23737/console] > > {code} 15:52:26 kafka.admin.LeaderElectionCommandTest > testAllTopicPartition STARTED 15:53:08 kafka.admin.LeaderElectionCommandTest.testAllTopicPartition failed, log available in /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk8-scala2.11@2/core/build/reports/testOutput/kafka.admin.LeaderElectionCommandTest.testAllTopicPartition.test.stdout 15:53:08 kafka.admin.LeaderElectionCommandTest > testAllTopicPartition FAILED 15:53:08 kafka.common.AdminCommandFailedException: Timeout waiting for election results 15:53:08 at kafka.admin.LeaderElectionCommand$.electLeaders(LeaderElectionCommand.scala:133) 15:53:08 at kafka.admin.LeaderElectionCommand$.run(LeaderElectionCommand.scala:88) 15:53:08 at kafka.admin.LeaderElectionCommand$.main(LeaderElectionCommand.scala:41) 15:53:08 at kafka.admin.LeaderElectionCommandTest$$anonfun$testAllTopicPartition$1.apply(LeaderElectionCommandTest.scala:91) 15:53:08 at kafka.admin.LeaderElectionCommandTest$$anonfun$testAllTopicPartition$1.apply(LeaderElectionCommandTest.scala:74) 15:53:08 at kafka.utils.TestUtils$.resource(TestUtils.scala:1588) 15:53:08 at kafka.admin.LeaderElectionCommandTest.testAllTopicPartition(LeaderElectionCommandTest.scala:74) 15:53:08 Caused by: 15:53:08 org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout. {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9866) Do not attempt to elect preferred leader replicas which are outside ISR
Stanislav Kozlovski created KAFKA-9866: -- Summary: Do not attempt to elect preferred leader replicas which are outside ISR Key: KAFKA-9866 URL: https://issues.apache.org/jira/browse/KAFKA-9866 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski The controller automatically triggers a preferred leader election every N minutes. It tries to elect the preferred leaders of all partitions without doing basic checks, such as whether the preferred leader is in sync. This leads to a multitude of confusing errors: {code:java} April 14th 2020, 17:01:11.015 [Controller id=0] Partition TOPIC-9 failed to complete preferred replica leader election to 1. Leader is still 0{code} {code:java} April 14th 2020, 17:01:11.002 [Controller id=0] Error completing replica leader election (PREFERRED) for partition TOPIC-9 kafka.common.StateChangeFailedException: Failed to elect leader for partition TOPIC-9 under strategy PreferredReplicaPartitionLeaderElectionStrategy {code} It would be better if the controller filtered out such elections without attempting them at all, and perhaps logged an aggregate INFO-level message instead. -- This message was sent by Atlassian Jira (v8.3.4#803005)
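A minimal sketch of the proposed filtering, assuming a simplified, hypothetical `PartitionState` (partition name, replicas in preference order, current leader, ISR) rather than the controller's real data structures. It keeps only partitions whose preferred replica is not already the leader and is both in the ISR and alive, and collects the skipped partitions so they can be logged in one aggregate message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Illustrative only; not the KafkaController implementation.
class PreferredElectionFilter {
    // Hypothetical, simplified view of one partition's state.
    static final class PartitionState {
        final String name;
        final List<Integer> replicas; // assignment order; the first entry is the preferred leader
        final int leader;
        final Set<Integer> isr;

        PartitionState(String name, List<Integer> replicas, int leader, Set<Integer> isr) {
            this.name = name;
            this.replicas = replicas;
            this.leader = leader;
            this.isr = isr;
        }

        int preferredLeader() {
            return replicas.get(0);
        }
    }

    // Returns the partitions worth attempting. Partitions whose preferred replica is
    // outside the ISR or not alive are added to `skipped` instead of being attempted.
    static List<PartitionState> electable(List<PartitionState> candidates,
                                          Set<Integer> liveBrokers,
                                          List<String> skipped) {
        List<PartitionState> result = new ArrayList<>();
        for (PartitionState p : candidates) {
            int preferred = p.preferredLeader();
            if (preferred == p.leader) {
                continue; // already led by the preferred replica, nothing to do
            }
            if (p.isr.contains(preferred) && liveBrokers.contains(preferred)) {
                result.add(p);
            } else {
                skipped.add(p.name);
            }
        }
        return result;
    }
}
```

The caller could then emit a single INFO line listing `skipped` instead of one error per failed election.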
[jira] [Created] (KAFKA-9617) Replica Fetcher can mark partition as failed when max.message.bytes is changed
Stanislav Kozlovski created KAFKA-9617: -- Summary: Replica Fetcher can mark partition as failed when max.message.bytes is changed Key: KAFKA-9617 URL: https://issues.apache.org/jira/browse/KAFKA-9617 Project: Kafka Issue Type: Bug Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski There is a race condition when the dynamic max.message.bytes config is changed for a topic: a follower replica can replicate a message that is over the new size limit after it processes the config change. When this happens, the replica fetcher catches the unexpected exception, marks the partition as failed, and stops replicating it. {code:java} 06:38:46.596 Processing override for entityPath: topics/partition-1 with config: Map(max.message.bytes -> 512) 06:38:46.597 [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Unexpected error occurred while processing data for partition partition-1 at offset 20964 org.apache.kafka.common.errors.RecordTooLargeException: The record batch size in the append to partition-1 is 3349 bytes which exceeds the maximum configured value of 512. {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9589) LogValidatorTest#testLogAppendTimeNonCompressedV2 is not executed and does not pass
Stanislav Kozlovski created KAFKA-9589: -- Summary: LogValidatorTest#testLogAppendTimeNonCompressedV2 is not executed and does not pass Key: KAFKA-9589 URL: https://issues.apache.org/jira/browse/KAFKA-9589 Project: Kafka Issue Type: Bug Reporter: Stanislav Kozlovski The LogValidatorTest#testLogAppendTimeNonCompressedV2 test does not execute because it's missing a '@Test' annotation. When executed locally, it fails with the following error: {code:java} java.lang.AssertionError: The offset of max timestamp should be 0 Expected :0 Actual :2 {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9094) Validate the replicas for partition reassignments triggered through the /admin/reassign_partitions zNode
Stanislav Kozlovski created KAFKA-9094: -- Summary: Validate the replicas for partition reassignments triggered through the /admin/reassign_partitions zNode Key: KAFKA-9094 URL: https://issues.apache.org/jira/browse/KAFKA-9094 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski As was mentioned by [~jsancio] in [https://github.com/apache/kafka/pull/7574#discussion_r337621762], it would make sense to apply the same replica validation we apply to the KIP-455 reassignments API. Namely, validate that the replicas: * are not empty (e.g. []) * are not negative (e.g. [1,2,-1]) * are not brokers that are outside the cluster or otherwise unhealthy (e.g. not in the /brokers zNode) The last (liveness) validation is open for discussion, as we are re-evaluating whether we want to enforce it for the API. -- This message was sent by Atlassian Jira (v8.3.4#803005)
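The three checks listed above can be sketched as a single validation function. This is illustrative only: the method name, the error strings, and passing the registered broker ids as a plain `Set<Integer>` (standing in for the contents of the /brokers zNode) are assumptions, not the controller's actual code. The liveness check sits behind a flag because the ticket leaves it open.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical validator; not Kafka's reassignment code.
class ReassignmentReplicaValidator {
    // Returns a description of the first problem found, or empty if the list passes.
    static Optional<String> validate(List<Integer> replicas, Set<Integer> registeredBrokers,
                                     boolean checkLiveness) {
        if (replicas.isEmpty()) {
            return Optional.of("replica list is empty");
        }
        for (int id : replicas) {
            if (id < 0) {
                return Optional.of("replica list contains negative broker id " + id);
            }
        }
        if (checkLiveness) {
            for (int id : replicas) {
                if (!registeredBrokers.contains(id)) {
                    return Optional.of("broker " + id + " is not registered in the cluster");
                }
            }
        }
        return Optional.empty();
    }
}
```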
[jira] [Created] (KAFKA-9023) Producer NETWORK_EXCEPTION response should log more information
Stanislav Kozlovski created KAFKA-9023: -- Summary: Producer NETWORK_EXCEPTION response should log more information Key: KAFKA-9023 URL: https://issues.apache.org/jira/browse/KAFKA-9023 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski When diagnosing network issues, it is useful to have a clear picture of which client disconnected from which broker at what time. Currently, when the producer receives a NETWORK_EXCEPTION in its responses, it logs the following: {code:java} [Producer clientId=] Received invalid metadata error in produce request on partition due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now {code} It would be good if we also logged which broker/host the dropped connection was to. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-8967) Flaky test kafka.api.SaslSslAdminClientIntegrationTest.testCreateTopicsResponseMetadataAndConfig
Stanislav Kozlovski created KAFKA-8967: -- Summary: Flaky test kafka.api.SaslSslAdminClientIntegrationTest.testCreateTopicsResponseMetadataAndConfig Key: KAFKA-8967 URL: https://issues.apache.org/jira/browse/KAFKA-8967 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski {code:java} java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) at kafka.api.SaslSslAdminClientIntegrationTest.testCreateTopicsResponseMetadataAndConfig(SaslSslAdminClientIntegrationTest.scala:452) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288) at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.{code} Failed in [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/25374] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-8923) Revisit OnlineReplica state change in reassignments
Stanislav Kozlovski created KAFKA-8923: -- Summary: Revisit OnlineReplica state change in reassignments Key: KAFKA-8923 URL: https://issues.apache.org/jira/browse/KAFKA-8923 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski In the replica state machine, when switching a partition to an OnlineReplica, we send a LeaderAndIsr request only if the partition is present in the `partitionLeadershipInfo` structure. This happens when we switch states during a partition reassignment. It does not happen when a partition is created for the first time, as it is not yet present in `partitionLeadershipInfo` at that point. This is a bit weird, because an OnlineReplica implies that the replica is alive, not necessarily that it is in the ISR. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-8894) Flaky org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromFileAfterResetWithoutIntermediateUserTopic
Stanislav Kozlovski created KAFKA-8894: -- Summary: Flaky org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromFileAfterResetWithoutIntermediateUserTopic Key: KAFKA-8894 URL: https://issues.apache.org/jira/browse/KAFKA-8894 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski {code:java} java.lang.AssertionError: Condition not met within timeout 3. Topics are not expected after 3 milli seconds. at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:376) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:353) at org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.waitForRemainingTopics(EmbeddedKafkaCluster.java:298) at org.apache.kafka.streams.integration.AbstractResetIntegrationTest.assertInternalTopicsGotDeleted(AbstractResetIntegrationTest.java:589) at org.apache.kafka.streams.integration.AbstractResetIntegrationTest.testReprocessingFromFileAfterResetWithoutIntermediateUserTopic(AbstractResetIntegrationTest.java:399) at org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromFileAfterResetWithoutIntermediateUserTopic(ResetIntegrationTest.java:82) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) {code} [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/24733/testReport/junit/org.apache.kafka.streams.integration/ResetIntegrationTest/testReprocessingFromFileAfterResetWithoutIntermediateUserTopic/] -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (KAFKA-8895) Flaky org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromDateTimeAfterResetWithoutIntermediateUserTopic
Stanislav Kozlovski created KAFKA-8895: -- Summary: Flaky org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromDateTimeAfterResetWithoutIntermediateUserTopic Key: KAFKA-8895 URL: https://issues.apache.org/jira/browse/KAFKA-8895 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski {code:java} java.lang.AssertionError: Condition not met within timeout 3. Topics are not expected after 3 milli seconds. at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:376) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:353) at org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.waitForRemainingTopics(EmbeddedKafkaCluster.java:298) at org.apache.kafka.streams.integration.AbstractResetIntegrationTest.assertInternalTopicsGotDeleted(AbstractResetIntegrationTest.java:589) {code} [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/24733/testReport/junit/org.apache.kafka.streams.integration/ResetIntegrationTest/testReprocessingFromDateTimeAfterResetWithoutIntermediateUserTopic/] -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (KAFKA-8893) Flaky ResetIntegrationTest. testReprocessingFromScratchAfterResetWithIntermediateUserTopic
Stanislav Kozlovski created KAFKA-8893: -- Summary: Flaky ResetIntegrationTest. testReprocessingFromScratchAfterResetWithIntermediateUserTopic Key: KAFKA-8893 URL: https://issues.apache.org/jira/browse/KAFKA-8893 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski {code:java} java.lang.AssertionError: Condition not met within timeout 6. Did not receive all 10 records from topic outputTopic at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:376) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:353) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:506) {code} [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/24733/testReport/junit/org.apache.kafka.streams.integration/ResetIntegrationTest/testReprocessingFromScratchAfterResetWithIntermediateUserTopic/] -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Resolved] (KAFKA-6304) The controller should allow updating the partition reassignment for the partitions being reassigned
[ https://issues.apache.org/jira/browse/KAFKA-6304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski resolved KAFKA-6304. Resolution: Duplicate KAFKA-8345 and [KIP-455|https://cwiki.apache.org/confluence/display/KAFKA/KIP-455%3A+Create+an+Administrative+API+for+Replica+Reassignment] will fix this issue. > The controller should allow updating the partition reassignment for the > partitions being reassigned > --- > > Key: KAFKA-6304 > URL: https://issues.apache.org/jira/browse/KAFKA-6304 > Project: Kafka > Issue Type: Improvement > Components: controller, core >Affects Versions: 1.0.0 >Reporter: Jiangjie Qin >Priority: Major > > Currently the controller will not process the partition reassignment of a > partition if the partition is already being reassigned. > The issue is that if there is a broker failure during the partition > reassignment, the partition reassignment may never finish, and users may > want to cancel it. However, the controller will > refuse to do that unless the user manually deletes the partition reassignment ZK > path, forces a controller switch, and then issues the revert command. This is > pretty involved. It seems reasonable for the controller to replace the > ongoing stuck reassignment with the updated partition > assignment. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (KAFKA-8873) Implement timeout for Alter/List PartitionReassignment APIs
Stanislav Kozlovski created KAFKA-8873: -- Summary: Implement timeout for Alter/List PartitionReassignment APIs Key: KAFKA-8873 URL: https://issues.apache.org/jira/browse/KAFKA-8873 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski In the initial implementation of KIP-455, we decided ([https://github.com/apache/kafka/pull/7128#issuecomment-528099402]) to delegate the implementation of the timeout functionality to a separate task. This is partly because the change is not trivial and partly because other controller RPCs would also benefit from timeout functionality. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (KAFKA-8779) Fix flaky tests introduced by dynamic log levels
Stanislav Kozlovski created KAFKA-8779: -- Summary: Fix flaky tests introduced by dynamic log levels Key: KAFKA-8779 URL: https://issues.apache.org/jira/browse/KAFKA-8779 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski https://issues.apache.org/jira/browse/KAFKA-7800 introduced a bunch of flaky tests in the AdminClientIntegrationTest. These tests are failing very frequently. Significant work was done in the initial PR to ensure the tests aren't flaky, but it apparently was not sufficient. We should ignore the tests in order to get green builds until we fix them. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (KAFKA-8778) Trogdor - do not expose the full spec JSON in list endpoint (/coordinator/tasks)
Stanislav Kozlovski created KAFKA-8778: -- Summary: Trogdor - do not expose the full spec JSON in list endpoint (/coordinator/tasks) Key: KAFKA-8778 URL: https://issues.apache.org/jira/browse/KAFKA-8778 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski As mentioned in KAFKA-8777, the list tasks Trogdor endpoint `/coordinator/tasks` can get very big. It would be useful if we didn't expose the full spec definition in that endpoint, but instead had every spec define a simpler JSON representation that the list endpoint shows. If users want the full JSON, they can use the detail endpoint `/coordinator/tasks/` -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (KAFKA-8777) Trogdor - Add GET /coordinator/tasks/ids endpoint
Stanislav Kozlovski created KAFKA-8777: -- Summary: Trogdor - Add GET /coordinator/tasks/ids endpoint Key: KAFKA-8777 URL: https://issues.apache.org/jira/browse/KAFKA-8777 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski When scheduling a non-trivial Trogdor workload (e.g. 15 agents and 150 tasks), it is difficult to diagnose all the tasks that are running. The output of `GET /coordinator/tasks` can get very large: 28.6MB in my case. This is mainly due to running a lot of ProduceBenchSpec tasks which are subscribed to a lot of topics. I think we should have a simpler endpoint which exposes the IDs of all running tasks. This would let us get the IDs needed to query specific tasks via `GET /coordinator/tasks?taskId=,` or `GET /coordinator/tasks/` -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (KAFKA-8751) Trogdor - Add endpoint that deletes all tasks
Stanislav Kozlovski created KAFKA-8751: -- Summary: Trogdor - Add endpoint that deletes all tasks Key: KAFKA-8751 URL: https://issues.apache.org/jira/browse/KAFKA-8751 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski It is very useful to have a single endpoint which clears all of Trogdor's state. During testing, we sometimes want to start from a clean slate and ensure all tasks are finished. The easiest way to do this is to delete all of them, ensuring all have stopped. Currently, we need to resort to calling the API N times, where N is the number of (running) tasks present. It would be way cooler if we had a single endpoint to do all of this for us, something like: `DELETE /coordinator/tasks/` -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (KAFKA-8725) Improve LogCleaner error handling when failing to grab the filthiest log
Stanislav Kozlovski created KAFKA-8725: -- Summary: Improve LogCleaner error handling when failing to grab the filthiest log Key: KAFKA-8725 URL: https://issues.apache.org/jira/browse/KAFKA-8725 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski https://issues.apache.org/jira/browse/KAFKA-7215 improved error handling in the log cleaner with the goal of not having the whole thread die when an exception happens, but instead marking the partition that caused it as uncleanable and continuing to clean the error-free partitions. Unfortunately, the current code can still bubble up an exception and cause the thread to die when an error happens before we can grab the filthiest log and start cleaning it. At that point, we don't have a clear reference to the log that caused the exception and choose to throw an IllegalStateException: [https://github.com/apache/kafka/blob/39bcc8447c906506d63b8df156cf90174bbb8b78/core/src/main/scala/kafka/log/LogCleaner.scala#L346] (as seen in https://issues.apache.org/jira/browse/KAFKA-8724). Essentially, exceptions in `grabFilthiestCompactedLog` still cause the thread to die. This can be further improved by identifying which log caused the exception in the aforementioned function. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
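The improvement can be pictured as evaluating each candidate log inside its own try/catch while selecting the filthiest one, so that a failure is attributed to a specific log instead of escaping and killing the thread. This is a sketch under stated assumptions: the generic `grabFilthiest` helper, the `dirtyRatio` function and the `uncleanable` set are hypothetical stand-ins, not the real `LogCleanerManager` types.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.ToDoubleFunction;

// Hypothetical helper; not Kafka's LogCleanerManager.
class FilthiestLogPicker {
    // Picks the candidate with the highest dirty ratio. A candidate whose ratio cannot be
    // computed is recorded as uncleanable and skipped, instead of failing the whole selection.
    static <T> Optional<T> grabFilthiest(List<T> candidates, ToDoubleFunction<T> dirtyRatio,
                                         Set<T> uncleanable) {
        T best = null;
        double bestRatio = Double.NEGATIVE_INFINITY;
        for (T log : candidates) {
            if (uncleanable.contains(log)) continue;
            double ratio;
            try {
                ratio = dirtyRatio.applyAsDouble(log);
            } catch (RuntimeException e) {
                uncleanable.add(log); // we now know exactly which log failed
                continue;
            }
            if (ratio > bestRatio) {
                best = log;
                bestRatio = ratio;
            }
        }
        return Optional.ofNullable(best);
    }
}
```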
[jira] [Resolved] (KAFKA-8183) Trogdor - ProduceBench should retry on UnknownTopicOrPartitionException during topic creation
[ https://issues.apache.org/jira/browse/KAFKA-8183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski resolved KAFKA-8183. Resolution: Fixed > Trogdor - ProduceBench should retry on UnknownTopicOrPartitionException > during topic creation > - > > Key: KAFKA-8183 > URL: https://issues.apache.org/jira/browse/KAFKA-8183 > Project: Kafka > Issue Type: Improvement >Reporter: Stanislav Kozlovski >Assignee: Stanislav Kozlovski >Priority: Minor > > There exists a race condition in the Trogdor produce bench worker code where > `WorkerUtils#createTopics()` [notices the topic > exists|https://github.com/apache/kafka/blob/4824dc994d7fc56b7540b643a78aadb4bdd0f14d/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java#L159] > yet when it goes on to verify the topics, the DescribeTopics call throws an > `UnknownTopicOrPartitionException`. > We should add sufficient retries such that this does not fail the task. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (KAFKA-8632) Retry replica deletion if it fails during partition reassignment
Stanislav Kozlovski created KAFKA-8632: -- Summary: Retry replica deletion if it fails during partition reassignment Key: KAFKA-8632 URL: https://issues.apache.org/jira/browse/KAFKA-8632 Project: Kafka Issue Type: Bug Reporter: Stanislav Kozlovski An old TODO comment from 2014 ([https://github.com/apache/kafka/blob/57903be49665566260160a6f1f995409ec48c9bc/core/src/main/scala/kafka/controller/KafkaController.scala#L807]) suggests that we support retries in the cases where an old replica's deletion fails after a partition reassignment succeeds. For more context, the comment was added as part of KAFKA-330 (https://issues.apache.org/jira/browse/KAFKA-330) and was reviewed in Apache's reviewboard - [https://reviews.apache.org/r/17460|https://reviews.apache.org/r/17460/#issue-summary] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8607) Reduce AdminClient Metadata request rate when invalid node id is given
Stanislav Kozlovski created KAFKA-8607: -- Summary: Reduce AdminClient Metadata request rate when invalid node id is given Key: KAFKA-8607 URL: https://issues.apache.org/jira/browse/KAFKA-8607 Project: Kafka Issue Type: Task Affects Versions: 2.3.0, 2.2.0, 2.1.0, 2.0.0 Reporter: Stanislav Kozlovski While testing KAFKA-7800 (KIP-412), we were playing around with the config command CLI and [noticed that it hangs for a very long time|https://github.com/apache/kafka/pull/6903#discussion_r297434016] when given an invalid broker id. After investigating a bit more, I noticed that we endlessly retry metadata updates. Locally, my AdminClient issued 78 requests in 10 seconds, an average rate of 7.8 requests/sec. The call times out after 2 minutes by default, by which point we end up sending 1149 requests. This respects the "retry.backoff.ms" config, but it may be better to have some sort of exponential backoff to ease the needless load on the cluster. It is unlikely for this to be a high-impact change, but it sounds worth it to have the protection. Orchestration systems like Kubernetes make it easier for a user to mass-deploy a wrong config and inadvertently DDoS their cluster via metadata requests. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
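For reference, exponential backoff with jitter can be computed as below. This is a generic sketch, not the AdminClient's code: the 100 ms initial delay matches the documented default of `retry.backoff.ms`, while the 1000 ms cap, the doubling factor and the +/-20% jitter are illustrative assumptions.

```java
import java.util.concurrent.ThreadLocalRandom;

// Generic exponential backoff with jitter; a sketch, not the AdminClient implementation.
class ExponentialBackoff {
    private final long initialMs;
    private final long maxMs;
    private final double jitter; // 0.2 means the delay varies by up to +/-20%

    ExponentialBackoff(long initialMs, long maxMs, double jitter) {
        this.initialMs = initialMs;
        this.maxMs = maxMs;
        this.jitter = jitter;
    }

    // Delay before retry number `attempts` (0-based) without jitter: initialMs * 2^attempts, capped at maxMs.
    long baseBackoffMs(int attempts) {
        double exp = initialMs * Math.pow(2, attempts);
        return (long) Math.min(exp, maxMs);
    }

    // Randomizes the delay so that many misconfigured clients do not retry in lockstep.
    long backoffMs(int attempts) {
        long base = baseBackoffMs(attempts);
        if (jitter <= 0) {
            return base;
        }
        double factor = 1.0 + ThreadLocalRandom.current().nextDouble(-jitter, jitter);
        return (long) (base * factor);
    }
}
```

With these values the un-jittered delays are 100, 200, 400 and 800 ms, then 1000 ms for every later retry, so a call that keeps failing for the 2-minute default timeout would send roughly 120 metadata requests rather than the 1149 reported in the ticket (ignoring request round-trip time).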
[jira] [Created] (KAFKA-8528) Trogdor - Expose Trogdor-specific JMX metrics
Stanislav Kozlovski created KAFKA-8528: -- Summary: Trogdor - Expose Trogdor-specific JMX metrics Key: KAFKA-8528 URL: https://issues.apache.org/jira/browse/KAFKA-8528 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski When running a long-lived Trogdor cluster, we care about various information regarding the health of the cluster, including the number of active agents, the number of RUNNING tasks, the number of PENDING tasks, and potentially others. Currently, in order to get some of this basic information, the user needs to query the REST API. This is manual and tedious work. We should implement some of these metrics in the Trogdor Coordinator so users can monitor them in a properly set-up dashboard. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
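As a sketch of what such metrics could look like on the JMX side, the class below exposes a few read-only integer gauges using only the JDK (`javax.management.DynamicMBean` registered on the platform MBeanServer). The gauge names and the `kafka.trogdor:type=ExampleCoordinator` object name are hypothetical, and a real implementation would more likely go through Kafka's own metrics library and its JMX reporter, so treat this as an illustration only.

```java
import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.Attribute;
import javax.management.AttributeList;
import javax.management.AttributeNotFoundException;
import javax.management.DynamicMBean;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Hypothetical read-only gauges for a Trogdor-like coordinator; not Trogdor's actual code.
class CoordinatorGauges implements DynamicMBean {
    private final Map<String, AtomicInteger> gauges = new ConcurrentHashMap<>();

    CoordinatorGauges(String... names) {
        for (String n : names) gauges.put(n, new AtomicInteger());
    }

    // Updated by the coordinator whenever agent or task state changes.
    void set(String name, int value) {
        gauges.get(name).set(value);
    }

    @Override
    public Object getAttribute(String name) throws AttributeNotFoundException {
        AtomicInteger g = gauges.get(name);
        if (g == null) throw new AttributeNotFoundException(name);
        return g.get();
    }

    @Override
    public void setAttribute(Attribute attribute) {
        throw new UnsupportedOperationException("gauges are read-only");
    }

    @Override
    public AttributeList getAttributes(String[] names) {
        AttributeList list = new AttributeList();
        for (String n : names) {
            AtomicInteger g = gauges.get(n);
            if (g != null) list.add(new Attribute(n, g.get()));
        }
        return list;
    }

    @Override
    public AttributeList setAttributes(AttributeList attributes) {
        return new AttributeList(); // nothing is writable
    }

    @Override
    public Object invoke(String actionName, Object[] params, String[] signature) {
        throw new UnsupportedOperationException("no operations");
    }

    @Override
    public MBeanInfo getMBeanInfo() {
        MBeanAttributeInfo[] attrs = gauges.keySet().stream()
            .sorted()
            .map(n -> new MBeanAttributeInfo(n, "int", n, true, false, false))
            .toArray(MBeanAttributeInfo[]::new);
        return new MBeanInfo(getClass().getName(), "Coordinator gauges", attrs, null, null, null);
    }

    // Registers the gauges with the platform MBeanServer under the given object name.
    static CoordinatorGauges register(String objectName, String... names) throws Exception {
        CoordinatorGauges gauges = new CoordinatorGauges(names);
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        server.registerMBean(gauges, new ObjectName(objectName));
        return gauges;
    }
}
```

Once registered, the attributes show up under that object name in JConsole or any JMX-based exporter feeding a dashboard.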
[jira] [Resolved] (KAFKA-7597) Trogdor - Support transactions in ProduceBenchWorker
[ https://issues.apache.org/jira/browse/KAFKA-7597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski resolved KAFKA-7597. Resolution: Fixed > Trogdor - Support transactions in ProduceBenchWorker > > > Key: KAFKA-7597 > URL: https://issues.apache.org/jira/browse/KAFKA-7597 > Project: Kafka > Issue Type: Improvement >Reporter: Stanislav Kozlovski >Assignee: Stanislav Kozlovski >Priority: Minor > > The `ProduceBenchWorker` allows you to schedule a benchmark of a Kafka > Producer. > It would prove useful if we supported transactions in this producer, so as to > allow benchmarks with transactions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8470) State change logs should not be in TRACE level
Stanislav Kozlovski created KAFKA-8470: -- Summary: State change logs should not be in TRACE level Key: KAFKA-8470 URL: https://issues.apache.org/jira/browse/KAFKA-8470 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski The StateChange logger in Kafka should not be logging its state changes at TRACE level. We consider these changes very useful in debugging, and we additionally configure that logger to log at TRACE level by default. Since we consider it important enough to give it its own logger with a separate log level, why don't we change those logs to INFO and have the logger use the defaults? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8467) Trogdor - Add option to delete topic in RoundTripWorker
Stanislav Kozlovski created KAFKA-8467: -- Summary: Trogdor - Add option to delete topic in RoundTripWorker Key: KAFKA-8467 URL: https://issues.apache.org/jira/browse/KAFKA-8467 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski Trogdor's RoundTripWorker verifies that each message that is produced to the cluster gets consumed. It automatically creates the topics we want to use, but it does not delete them at the end. As topic deletion is an essential feature of Kafka, we should enable testing that too. Experience shows that we still have some bugs around that code path, e.g. a recently uncovered memory leak (https://issues.apache.org/jira/browse/KAFKA-8448). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8406) kafka-topics throws wrong error on invalid configuration with bootstrap-server and alter config
Stanislav Kozlovski created KAFKA-8406: -- Summary: kafka-topics throws wrong error on invalid configuration with bootstrap-server and alter config Key: KAFKA-8406 URL: https://issues.apache.org/jira/browse/KAFKA-8406 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski Running {code:java} ./kafka-topics --bootstrap-server --alter --config retention.ms=360 --topic topic{code} Results in {code:java} Missing required argument "[partitions]"{code} Running {code:java} ./kafka-topics --bootstrap-server --alter --config retention.ms=360 --topic topic --partitions 25{code} Results in {code:java} Option combination "[bootstrap-server],[config]" can't be used with option "[alter]"{code} For better clarity, we should just throw the last error outright. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
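One way to get the clearer error is to check incompatible option combinations before checking for missing required arguments. The sketch below only shows that ordering: the option names and error strings come from the ticket, but the two rules are a simplified, hypothetical subset rather than `TopicCommand`'s real option parsing.

```java
import java.util.Optional;
import java.util.Set;

// Hypothetical validation order; not TopicCommand's actual implementation.
class TopicCommandChecks {
    // Checks run in order and the first failure is reported.
    static Optional<String> validate(Set<String> options) {
        // 1. Incompatible combinations first, so the root cause is reported.
        if (options.contains("alter") && options.contains("bootstrap-server")
                && options.contains("config")) {
            return Optional.of("Option combination \"[bootstrap-server],[config]\" can't be used with option \"[alter]\"");
        }
        // 2. Only then complain about missing arguments.
        if (options.contains("alter") && options.contains("bootstrap-server")
                && !options.contains("partitions")) {
            return Optional.of("Missing required argument \"[partitions]\"");
        }
        return Optional.empty();
    }
}
```

With this ordering, the first command in the ticket (without `--partitions`) would already report the option-combination error instead of the misleading missing-argument one.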
[jira] [Created] (KAFKA-8271) Trogdor - Log Jetty requests at DEBUG level
Stanislav Kozlovski created KAFKA-8271: -- Summary: Trogdor - Log Jetty requests at DEBUG level Key: KAFKA-8271 URL: https://issues.apache.org/jira/browse/KAFKA-8271 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski When running Trogdor, as with any system, it is important to keep track of the logs it produces. Trogdor agents can log useful information at the INFO level, depending on the worker being run on them. The problem is that when you enable INFO level logs for the Agent, Jetty logs all the requests sent to the Agent at INFO. This results in a lot of useless logs like {code:java} [21/Apr/2019:11:46:52 +] "GET /agent/status HTTP/1.1" 200 1171 2{code} This can obscure the useful logs, especially when inspecting logs from a decent-sized Trogdor cluster. It would be great if we could set these logs at DEBUG level. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8183) Trogdor - ProduceBench should retry on UnknownTopicOrPartitionException during topic creation
Stanislav Kozlovski created KAFKA-8183: -- Summary: Trogdor - ProduceBench should retry on UnknownTopicOrPartitionException during topic creation Key: KAFKA-8183 URL: https://issues.apache.org/jira/browse/KAFKA-8183 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski There exists a race condition in the Trogdor produce bench worker code where `WorkerUtils#createTopics()` [notices the topic exists|https://github.com/apache/kafka/blob/4824dc994d7fc56b7540b643a78aadb4bdd0f14d/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java#L159] yet when it goes on to verify the topics, the DescribeTopics call throws an `UnknownTopicOrPartitionException`. We should add sufficient retries such that this does not fail the task. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
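A minimal sketch of the suggested retry, assuming the describe step can be wrapped in a `Supplier`: only the unknown-topic case is retried, with a bounded number of attempts and a fixed backoff. To stay self-contained it uses a local stand-in exception class rather than `org.apache.kafka.common.errors.UnknownTopicOrPartitionException`; the helper name, attempt count and backoff are illustrative assumptions, not the actual `WorkerUtils` change.

```java
import java.util.function.Supplier;

// Minimal retry sketch. UnknownTopicStandInException is a local stand-in for
// org.apache.kafka.common.errors.UnknownTopicOrPartitionException, so the example
// compiles without kafka-clients on the classpath.
final class DescribeRetrySketch {

    static final class UnknownTopicStandInException extends RuntimeException {
        UnknownTopicStandInException(String message) { super(message); }
    }

    /**
     * Runs {@code describe} up to {@code maxAttempts} times, sleeping {@code backoffMs}
     * between attempts. Only UnknownTopicStandInException is retried; anything else propagates.
     */
    static <T> T retryOnUnknownTopic(Supplier<T> describe, int maxAttempts, long backoffMs) {
        for (int attempt = 1; ; attempt++) {
            try {
                return describe.get();
            } catch (UnknownTopicStandInException e) {
                if (attempt >= maxAttempts) {
                    throw e; // metadata never converged: fail the task as before
                }
                sleep(backoffMs);
            }
        }
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while retrying", ie);
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated DescribeTopics call that fails twice before the new topic becomes visible.
        String result = retryOnUnknownTopic(() -> {
            if (++calls[0] < 3) throw new UnknownTopicStandInException("topic not visible yet");
            return "described";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Bounding the attempts keeps a genuinely missing topic from hanging the task forever.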
[jira] [Created] (KAFKA-8176) Revisit BaseConsumerTest inheritance
Stanislav Kozlovski created KAFKA-8176: -- Summary: Revisit BaseConsumerTest inheritance Key: KAFKA-8176 URL: https://issues.apache.org/jira/browse/KAFKA-8176 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski As discussed here ([https://github.com/apache/kafka/pull/6238#discussion_r256542009]), we have redundant test cases being run because `ConsumerBounceTest` inherits from `BaseConsumerTest`. We should revisit this, potentially by creating an intermediate class to inherit from or by splitting the shared logic into a separate utility class. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8102) Trogdor - Add Produce workload transaction generator by interval
Stanislav Kozlovski created KAFKA-8102: -- Summary: Trogdor - Add Produce workload transaction generator by interval Key: KAFKA-8102 URL: https://issues.apache.org/jira/browse/KAFKA-8102 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski Trogdor's specification for produce worker workloads ([ProduceBenchSpec|https://github.com/apache/kafka/blob/trunk/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java]) supports configuring a transactional producer using a class that implements the `TransactionGenerator` interface. The existing [UniformTransactionsGenerator|https://github.com/apache/kafka/blob/trunk/tools/src/main/java/org/apache/kafka/trogdor/workload/UniformTransactionsGenerator.java] triggers a transaction every N records. It would be useful to have a generator which triggers a transaction on a time interval, e.g. every 100 milliseconds. This is how Kafka Streams configures its own [EOS semantics by default|https://github.com/apache/kafka/blob/8e975400711b0ea64bf4a00c8c551e448ab48416/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java#L140]. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
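A rough sketch of what an interval-based generator could look like, assuming the generator is asked for its next action before each record is sent. The `TxnAction` enum and `TxnGenerator` interface are local stand-ins modeled on the description of `TransactionGenerator` above, not its real declaration, and the injectable clock exists only to make the timing testable.

```java
import java.util.function.LongSupplier;

/** Hypothetical generator: commits the open transaction once intervalMs has elapsed. */
final class IntervalTxnGeneratorSketch implements TxnGenerator {
    private final long intervalMs;
    private final LongSupplier clock; // e.g. System::currentTimeMillis in production
    private boolean inTransaction = false;
    private long transactionStartMs;

    IntervalTxnGeneratorSketch(long intervalMs, LongSupplier clock) {
        this.intervalMs = intervalMs;
        this.clock = clock;
    }

    @Override
    public TxnAction nextAction() {
        long now = clock.getAsLong();
        if (!inTransaction) {
            // No open transaction: start one and remember when it began.
            inTransaction = true;
            transactionStartMs = now;
            return TxnAction.BEGIN_TRANSACTION;
        }
        if (now - transactionStartMs >= intervalMs) {
            // Interval elapsed: ask the caller to commit.
            inTransaction = false;
            return TxnAction.COMMIT_TRANSACTION;
        }
        return TxnAction.NO_OP;
    }

    public static void main(String[] args) {
        TxnGenerator generator = new IntervalTxnGeneratorSketch(100, System::currentTimeMillis);
        System.out.println(generator.nextAction()); // BEGIN_TRANSACTION on the first call
    }
}

// Local stand-ins modeled on the description of Trogdor's TransactionGenerator;
// the real interface may be declared differently.
interface TxnGenerator {
    /** Asked before each record is produced. */
    TxnAction nextAction();
}

enum TxnAction { BEGIN_TRANSACTION, COMMIT_TRANSACTION, NO_OP }
```

Unlike a count-based generator, the commit cadence here depends only on wall-clock time, so low-throughput workloads still commit regularly.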
[jira] [Resolved] (KAFKA-8036) Log dir reassignment on followers fails with FileNotFoundException for the leader epoch cache on leader election
[ https://issues.apache.org/jira/browse/KAFKA-8036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski resolved KAFKA-8036. Resolution: Fixed I was wrong. After writing a test to verify the behavior ([https://github.com/stanislavkozlovski/kafka/commit/b93a866e2318c47ff7538ddecd2fa38f29abd188]), I found that KAFKA-7897 fixed this issue across all versions it affected. > Log dir reassignment on followers fails with FileNotFoundException for the > leader epoch cache on leader election > > > Key: KAFKA-8036 > URL: https://issues.apache.org/jira/browse/KAFKA-8036 > Project: Kafka > Issue Type: Improvement >Affects Versions: 1.1.1, 2.0.1 >Reporter: Stanislav Kozlovski >Assignee: Stanislav Kozlovski >Priority: Major > > When changing a partition's log directories for a follower broker, we move > all the data related to that partition to the other log dir (as per > [KIP-113|https://cwiki.apache.org/confluence/display/KAFKA/KIP-113:+Support+replicas+movement+between+log+directories]). > On a successful move, we rename the original directory by adding a suffix > consisting of a UUID and `-delete` (e.g. `test_log_dir-0` would be renamed to > `test_log_dir-0.32e77c96939140f9a56a49b75ad8ec8d-delete`). > We copy every log file and [initialize a new leader epoch file > cache|https://github.com/apache/kafka/blob/0d56f1413557adabc736cae2dffcdc56a620403e/core/src/main/scala/kafka/log/Log.scala#L768]. > The problem is that we do not update the associated `Replica` class' leader > epoch cache - it still points to the old `LeaderEpochFileCache` instance. > This results in a FileNotFoundException when the broker is [elected as a > leader for the > partition|https://github.com/apache/kafka/blob/255f4a6effdc71c273691859cd26c4138acad778/core/src/main/scala/kafka/cluster/Partition.scala#L312].
> This has the unintended side effect of marking the log directory as offline, > resulting in all partitions from that log directory becoming unavailable for > the specific broker. > h2. > h2. Exception and logs > I reproduced this locally by running two brokers. The steps to reproduce: > {code:java} > Create partition replicated across two brokers (A, B) with leader A > Move partition leadership to B > Alter log dirs on A > Move partition leadership back to A{code} > This results in a log directory structure on broker B similar to this: > {code:java} > ├── new_dir > │ ├── cleaner-offset-checkpoint > │ ├── log-start-offset-checkpoint > │ ├── meta.properties > │ ├── recovery-point-offset-checkpoint > │ ├── replication-offset-checkpoint > │ └── test_log_dir-0 > │ ├── .index > │ ├── .log > │ ├── .timeindex > │ └── leader-epoch-checkpoint > └── old_dir > ├── cleaner-offset-checkpoint > ├── log-start-offset-checkpoint > ├── meta.properties > ├── recovery-point-offset-checkpoint > ├── replication-offset-checkpoint > └── test_log_dir-0.32e77c96939140f9a56a49b75ad8ec8d-delete > ├── .index > ├── .log > ├── .timeindex > ├── 0009.snapshot > └── leader-epoch-checkpoint > {code} > > > {code:java} > [2019-03-04 15:36:56,854] INFO [Partition test_log_dir-0 broker=0] > test_log_dir-0 starts at Leader Epoch 3 from offset 9. Previous Leader Epoch > was: 2 (kafka.cluster.Partition) [2019-03-04 15:36:56,855] WARN > [LeaderEpochCache test_log_dir-0] New epoch entry EpochEntry(epoch=3, > startOffset=9) caused truncation of conflicting entries > ListBuffer(EpochEntry(epoch=1, startOffset=9)). Cache now contains 2 entries. 
> (kafka.server.epoch.LeaderEpochFileCache) [2019-03-04 15:36:56,857] ERROR > Error while writing to checkpoint file > /logs/old_dir/test_log_dir-0/leader-epoch-checkpoint > (kafka.server.LogDirFailureChannel) java.io.FileNotFoundException: > /logs/old_dir/test_log_dir-0/leader-epoch-checkpoint.tmp (No such file or > directory) at java.base/java.io.FileOutputStream.open0(Native Method) at > java.base/java.io.FileOutputStream.open(FileOutputStream.java:299) at > java.base/java.io.FileOutputStream.(FileOutputStream.java:238) at > java.base/java.io.FileOutputStream.(FileOutputStream.java:188) at > kafka.server.checkpoints.CheckpointFile.liftedTree1$1(CheckpointFile.scala:52) > at kafka.server.checkpoints.CheckpointFile.write(CheckpointFile.scala:50) at > kafka.server.checkpoints.LeaderEpochCheckpointFile.write(LeaderEpochCheckpointFile.scala:64) > at > kafka.server.epoch.LeaderEpochFileCache.kafka$server$epoch$LeaderEpochFileCache$$flush(LeaderEpochFileCache.scala:219) > at >
[jira] [Created] (KAFKA-8036) Log dir reassignment on followers fails with FileNotFoundException for the leader epoch cache on leader election
Stanislav Kozlovski created KAFKA-8036: -- Summary: Log dir reassignment on followers fails with FileNotFoundException for the leader epoch cache on leader election Key: KAFKA-8036 URL: https://issues.apache.org/jira/browse/KAFKA-8036 Project: Kafka Issue Type: Improvement Affects Versions: 2.0.1, 1.1.0, 1.0.2 Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski When changing a partition's log directories for a follower broker, we move all the data related to that partition to the other log dir (as per [KIP-113|https://cwiki.apache.org/confluence/display/KAFKA/KIP-113:+Support+replicas+movement+between+log+directories]). On a successful move, we rename the original directory by adding a suffix consisting of a UUID and `-delete` (e.g. `test_log_dir-0` would be renamed to `test_log_dir-0.32e77c96939140f9a56a49b75ad8ec8d-delete`). We copy every log file and [initialize a new leader epoch file cache|https://github.com/apache/kafka/blob/0d56f1413557adabc736cae2dffcdc56a620403e/core/src/main/scala/kafka/log/Log.scala#L768]. The problem is that we do not update the associated `Replica` class' leader epoch cache - it still points to the old `LeaderEpochFileCache` instance. This results in a FileNotFoundException when the broker is [elected as a leader for the partition|https://github.com/apache/kafka/blob/255f4a6effdc71c273691859cd26c4138acad778/core/src/main/scala/kafka/cluster/Partition.scala#L312]. This has the unintended side effect of marking the log directory as offline, resulting in all partitions from that log directory becoming unavailable for the specific broker. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-8016) Race condition resulting in IllegalStateException inside Consumer Heartbeat thread when consumer joins group
[ https://issues.apache.org/jira/browse/KAFKA-8016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski resolved KAFKA-8016. Resolution: Duplicate This is a symptom of https://issues.apache.org/jira/browse/KAFKA-7831 > Race condition resulting in IllegalStateException inside Consumer Heartbeat > thread when consumer joins group > > > Key: KAFKA-8016 > URL: https://issues.apache.org/jira/browse/KAFKA-8016 > Project: Kafka > Issue Type: Improvement >Reporter: Stanislav Kozlovski >Assignee: Stanislav Kozlovski >Priority: Major > > I think the consumer heartbeat thread has a possibility for a race condition > that can crash it. > I have seen the following client exception after a consumer group rebalance: > {code:java} > INFO Fetcher Resetting offset for partition _ to offset 32110985. > INFO Fetcher Resetting offset for partition _ to offset 32108462. > java.lang.IllegalStateException: No current assignment for partition X > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:259) > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:264) > at > org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetIfNeeded(Fetcher.java:562) > at > org.apache.kafka.clients.consumer.internals.Fetcher.access$2100(Fetcher.java:93) > at > org.apache.kafka.clients.consumer.internals.Fetcher$2.onSuccess(Fetcher.java:589) > at > org.apache.kafka.clients.consumer.internals.Fetcher$2.onSuccess(Fetcher.java:577) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) > at > org.apache.kafka.clients.consumer.internals.Fetcher.handleListOffsetResponse(Fetcher.java:784) > at > org.apache.kafka.clients.consumer.internals.Fetcher.access$2300(Fetcher.java:93) > at > 
org.apache.kafka.clients.consumer.internals.Fetcher$4.onSuccess(Fetcher.java:704) > at > org.apache.kafka.clients.consumer.internals.Fetcher$4.onSuccess(Fetcher.java:699) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:563) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:300) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:948) > {code} > The logs also had this message in a close timeframe: > {code:java} > INFO ConsumerCoordinator Revoking previously assigned partitions [X, > ...]{code} > > After investigating, I see that there might be a race condition: > > [Updating the fetch > positions|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L2213] > in the client [involves sending a `ListOffsetsRequest` request to the > broker|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L603]. 
> It is possible for the Heartbeat thread to initiate the code that handles > the response in its run > loop([1|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1036]->[2|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L247]) > > updateFetchPositions() is called from the public methods > `Consumer#position()` and `Consumer#poll()`. > The problem is that > [onJoinPrepare|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L479] > may mutate the `subscriptions` variable while the offset response handling > by the heartbeat thread takes place. This results in `subscriptions.seek()` > throwing an IllegalStateException. -- This message was sent by
[jira] [Created] (KAFKA-8016) Race condition resulting in IllegalStateException inside Consumer Heartbeat thread when consumer joins group
Stanislav Kozlovski created KAFKA-8016: -- Summary: Race condition resulting in IllegalStateException inside Consumer Heartbeat thread when consumer joins group Key: KAFKA-8016 URL: https://issues.apache.org/jira/browse/KAFKA-8016 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski I have seen the following client exception after a consumer group rebalance: {code:java} INFO Fetcher Resetting offset for partition _ to offset 32110985. INFO Fetcher Resetting offset for partition _ to offset 32108462. java.lang.IllegalStateException: No current assignment for partition X at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:259) at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:264) at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetIfNeeded(Fetcher.java:562) at org.apache.kafka.clients.consumer.internals.Fetcher.access$2100(Fetcher.java:93) at org.apache.kafka.clients.consumer.internals.Fetcher$2.onSuccess(Fetcher.java:589) at org.apache.kafka.clients.consumer.internals.Fetcher$2.onSuccess(Fetcher.java:577) at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) at org.apache.kafka.clients.consumer.internals.Fetcher.handleListOffsetResponse(Fetcher.java:784) at org.apache.kafka.clients.consumer.internals.Fetcher.access$2300(Fetcher.java:93) at org.apache.kafka.clients.consumer.internals.Fetcher$4.onSuccess(Fetcher.java:704) at org.apache.kafka.clients.consumer.internals.Fetcher$4.onSuccess(Fetcher.java:699) at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) at 
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:563) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:300) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:948) {code} The logs also had this message in a close timeframe: {code:java} INFO ConsumerCoordinator Revoking previously assigned partitions [X, ...]{code} After investigating, I see that there might be a race condition: [Updating the fetch positions|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L2213] in the client [involves sending a `ListOffsetsRequest` request to the broker|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L603]. It is possible for the Heartbeat thread to handle the response here: [1|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1036]->[2|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L247]. The fetch positions are updated whenever either `Consumer#position()` or `Consumer#poll()` gets called.
The problem is that [onJoinPrepare|https://github.com/apache/kafka/blob/dfae20eceed67924c7122698a2c2f8b50a339133/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L479] may mutate the `subscriptions` variable while the heartbeat thread is handling the offset response. This results in `subscriptions.seek()` throwing an IllegalStateException. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
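A single-threaded, hypothetical model of the interleaving described above: once the assignment is cleared (standing in for what `onJoinPrepare` does to `subscriptions`), a late `seek()` for the revoked partition throws the same `IllegalStateException` message seen in the stack trace. The guarded `seekIfAssigned` variant is an illustrative assumption of one way to drop stale offset responses; it is not the real `SubscriptionState` API and not the actual fix tracked in KAFKA-7831.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, single-threaded model of the interleaving. It is NOT the real
// SubscriptionState: partitions are plain strings and the stored value is the
// fetch position (-1 meaning "not yet known").
final class RevocationRaceSketch {
    private final Map<String, Long> positions = new HashMap<>();

    void assign(String partition) { positions.put(partition, -1L); }

    /** Stands in for onJoinPrepare revoking the current assignment. */
    void revokeAll() { positions.clear(); }

    /** Unguarded seek: fails like the stack trace if the partition was revoked. */
    void seek(String partition, long offset) {
        if (!positions.containsKey(partition)) {
            throw new IllegalStateException("No current assignment for partition " + partition);
        }
        positions.put(partition, offset);
    }

    /**
     * Guarded variant: drops offset responses for revoked partitions. In a real
     * multi-threaded client, this check and the update would have to be atomic
     * with respect to revocation.
     */
    boolean seekIfAssigned(String partition, long offset) {
        if (!positions.containsKey(partition)) {
            return false;
        }
        positions.put(partition, offset);
        return true;
    }

    public static void main(String[] args) {
        RevocationRaceSketch subscriptions = new RevocationRaceSketch();
        subscriptions.assign("X");
        subscriptions.revokeAll();                 // the rebalance revokes first...
        try {
            subscriptions.seek("X", 32110985L);    // ...then the ListOffsets response is handled
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        System.out.println(subscriptions.seekIfAssigned("X", 32110985L)); // false
    }
}
```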
[jira] [Resolved] (KAFKA-7940) Flaky Test CustomQuotaCallbackTest#testCustomQuotaCallback
[ https://issues.apache.org/jira/browse/KAFKA-7940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski resolved KAFKA-7940. Resolution: Fixed Reviewer: Rajini Sivaram Fixed with https://github.com/apache/kafka/pull/6330#event-2169171736 > Flaky Test CustomQuotaCallbackTest#testCustomQuotaCallback > -- > > Key: KAFKA-7940 > URL: https://issues.apache.org/jira/browse/KAFKA-7940 > Project: Kafka > Issue Type: Bug > Components: core, unit tests >Affects Versions: 2.2.0 >Reporter: Matthias J. Sax >Assignee: Stanislav Kozlovski >Priority: Critical > Labels: flaky-test > Fix For: 2.3.0, 2.2.1 > > > To get stable nightly builds for `2.2` release, I create tickets for all > observed test failures. > [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/14/] > {quote}java.lang.AssertionError: Too many quotaLimit calls Map(PRODUCE -> 1, > FETCH -> 1, REQUEST -> 4) at org.junit.Assert.fail(Assert.java:88) at > org.junit.Assert.assertTrue(Assert.java:41) at > kafka.api.CustomQuotaCallbackTest.testCustomQuotaCallback(CustomQuotaCallbackTest.scala:105){quote} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7992) Add a server start time metric
Stanislav Kozlovski created KAFKA-7992: -- Summary: Add a server start time metric Key: KAFKA-7992 URL: https://issues.apache.org/jira/browse/KAFKA-7992 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski KIP: KIP-436 As with any software system, observability into its health is critical. With many deployment platforms (be they custom-built or open-source), tasks like restarting a misbehaving server in a cluster are completely automated. With Kafka, monitoring systems have no definitive source of truth to gauge when a server/client has been started. They are left to use either arbitrary Kafka-specific metrics as a heuristic or the JVM RuntimeMXBean's StartTime, which is not exactly indicative of when the application itself started. It would be useful to have a metric exposing when the Kafka server started. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
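A minimal sketch of the distinction drawn above, assuming the server records its own timestamp once startup completes. The JVM start time comes from the real `RuntimeMXBean#getStartTime()`, while `serverStartTimeMs` is a hypothetical value a gauge-style metric could report. Class and method names are illustrative only; the metric's real name and semantics are whatever KIP-436 settles on.

```java
import java.lang.management.ManagementFactory;

// Hypothetical sketch: the server records its own start time when startup completes,
// instead of monitoring systems inferring it from the JVM start time.
final class ServerStartTimeSketch {
    private final long jvmStartTimeMs = ManagementFactory.getRuntimeMXBean().getStartTime();
    private volatile long serverStartTimeMs = -1L; // -1 until startup() has finished

    void startup() {
        // Real startup work (log loading, network layer, etc.) would happen here and
        // may finish long after the JVM itself started.
        serverStartTimeMs = System.currentTimeMillis();
    }

    long jvmStartTimeMs() { return jvmStartTimeMs; }

    /** The value a gauge-style "start time" metric could expose. */
    long serverStartTimeMs() { return serverStartTimeMs; }

    public static void main(String[] args) {
        ServerStartTimeSketch server = new ServerStartTimeSketch();
        server.startup();
        System.out.println("JVM started at " + server.jvmStartTimeMs()
                + ", server started at " + server.serverStartTimeMs());
    }
}
```

The sentinel value lets a monitoring system distinguish "still starting up" from "started at time T".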
[jira] [Created] (KAFKA-7984) Do not rebuild leader epochs on segments that do not support it
Stanislav Kozlovski created KAFKA-7984: -- Summary: Do not rebuild leader epochs on segments that do not support it Key: KAFKA-7984 URL: https://issues.apache.org/jira/browse/KAFKA-7984 Project: Kafka Issue Type: Bug Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski h3. Preface https://issues.apache.org/jira/browse/KAFKA-7897 (logs would store some leader epochs even if they did not support them - this is essentially a regression from https://issues.apache.org/jira/browse/KAFKA-7415) https://issues.apache.org/jira/browse/KAFKA-7959 If users are running Kafka with https://issues.apache.org/jira/browse/KAFKA-7415 merged in, chances are they have sparsely-populated leader epoch cache files. KAFKA-7897's implementation unintentionally handled this case by deleting those leader epoch cache files for versions 2.1+. For earlier versions, KAFKA-7959 fixes that. In any case, as it currently stands, a broker started up with a message format of `0.10.0` will have those leader epoch cache files deleted. h3. Problem We have logic [that rebuilds these leader epoch cache files|https://github.com/apache/kafka/blob/217f45ed554b34d5221e1dd3db76e4be892661cf/core/src/main/scala/kafka/log/Log.scala#L614] when recovering segments that do not have a clean shutdown file. It goes over the record batches and rebuilds the leader epoch cache. KAFKA-7959's implementation guards against this by checking that the `log.message.format.version` supports it, *but* that fix is only merged for versions *below 2.1*. Moreover, the case where `log.message.format.version >= 0.11` *is not handled*.
If a broker has the following log segment file: {code:java} offset 0, format v2, epoch 1 offset 1, format v2, epoch 1 offset 2, format v1, no epoch offset 3, format v1, no epoch {code} and gets upgraded to a new log message format that supports it, the rebuild of any logs that had an unclean shutdown will populate the leader epoch cache again, potentially resulting in the issue described in KAFKA-7897 One potential simple way to solve this is to clear the accumulated leader epoch cache when encountering a batch with no epoch upon segment rebuilding. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
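A minimal sketch of the proposed fix, assuming a simplified batch model in which a batch either carries a leader epoch or uses a `NO_EPOCH` sentinel (a local constant here): the rebuild clears everything accumulated so far whenever it meets a batch without an epoch, so the example segment above ends with an empty cache instead of a sparse one. `Batch`, `EpochEntry` and `rebuild` are hypothetical names, not the classes in `Log.scala`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of rebuilding a leader epoch cache while recovering a segment.
final class EpochRebuildSketch {
    static final int NO_EPOCH = -1; // local sentinel: batch (pre-v2 format) carries no leader epoch

    static final class Batch {
        final long baseOffset;
        final int epoch;
        Batch(long baseOffset, int epoch) { this.baseOffset = baseOffset; this.epoch = epoch; }
    }

    static final class EpochEntry {
        final int epoch;
        final long startOffset;
        EpochEntry(int epoch, long startOffset) { this.epoch = epoch; this.startOffset = startOffset; }
    }

    static List<EpochEntry> rebuild(List<Batch> batches) {
        List<EpochEntry> cache = new ArrayList<>();
        for (Batch batch : batches) {
            if (batch.epoch == NO_EPOCH) {
                // Proposed fix: drop the entries accumulated so far instead of
                // keeping a sparsely populated cache.
                cache.clear();
                continue;
            }
            // Record the start offset of each new (higher) epoch only.
            if (cache.isEmpty() || cache.get(cache.size() - 1).epoch < batch.epoch) {
                cache.add(new EpochEntry(batch.epoch, batch.baseOffset));
            }
        }
        return cache;
    }

    public static void main(String[] args) {
        // The segment from the description: offsets 0-1 in format v2 (epoch 1), 2-3 in v1.
        List<Batch> segment = List.of(new Batch(0, 1), new Batch(1, 1),
                new Batch(2, NO_EPOCH), new Batch(3, NO_EPOCH));
        System.out.println(rebuild(segment).size()); // 0
    }
}
```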
[jira] [Resolved] (KAFKA-7968) Delete leader epoch cache files with old message format versions
[ https://issues.apache.org/jira/browse/KAFKA-7968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski resolved KAFKA-7968. Resolution: Duplicate This is a duplicate of https://issues.apache.org/jira/browse/KAFKA-7959. Apologies for not realizing that was opened > Delete leader epoch cache files with old message format versions > > > Key: KAFKA-7968 > URL: https://issues.apache.org/jira/browse/KAFKA-7968 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.0.1 >Reporter: Stanislav Kozlovski >Assignee: Stanislav Kozlovski >Priority: Major > > [KAFKA-7897 (Invalid use of epoch cache with old message format > versions)|https://issues.apache.org/jira/browse/KAFKA-7897] fixed a critical > bug where replica followers would inadequately use their leader epoch cache > for truncating their logs upon becoming a follower. [The root of the > issue|https://issues.apache.org/jira/browse/KAFKA-7897?focusedCommentId=16761049=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16761049] > was that a regression in KAFKA-7415 caused the leader epoch cache to be > populated upon becoming a follower, even if the message format was older. > KAFKA-7897 fixed that problem by not updating the leader epoch cache if the > message format does not support it. It was merged all the way back to 1.1 but > due to significant branch divergence, the patches for 2.0 and below were > simplified. As said in the commit: > Note this is a simplified fix than what was merged to trunk in #6232 since > the branches have diverged significantly. Rather than removing the epoch > cache file, we guard usage of the cache with the record version. > This results in the same bug being hit at a different time. When the message > format gets upgraded to support the leader epoch cache, brokers start to make > use of it. Due to the previous problem, we still have the sparsely populated > epoch cache file present. 
This results in the same large truncations we saw > in KAFKA-7897. > The key difference is that the patches for 2.1 and trunk *deleted* the > non-empty leader epoch cache files if the log message format did not support > it. > We should update the earlier versions to do the same thing. That way, users > that have upgraded to 2.0.1 but are still using old message formats/protocol > will have their epochs cleaned up on the first roll that upgrades the > `inter.broker.protocol.version` -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7968) Delete leader epoch cache files with old message format versions
Stanislav Kozlovski created KAFKA-7968: -- Summary: Delete leader epoch cache files with old message format versions Key: KAFKA-7968 URL: https://issues.apache.org/jira/browse/KAFKA-7968 Project: Kafka Issue Type: Bug Affects Versions: 2.0.1 Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski [KAFKA-7897 (Invalid use of epoch cache with old message format versions)|https://issues.apache.org/jira/browse/KAFKA-7897] fixed a critical bug where replica followers would inadequately use their leader epoch cache for truncating their logs upon becoming a follower. [The root of the issue|https://issues.apache.org/jira/browse/KAFKA-7897?focusedCommentId=16761049=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16761049] was that a regression in KAFKA-7415 caused the leader epoch cache to be populated upon becoming a follower, even if the message format was older. KAFKA-7897 fixed that problem by not updating the leader epoch cache if the message format does not support it. It was merged all the way back to 1.1 but due to significant branch divergence, the patches for 2.0 and below were simplified. As said in the commit: Note this is a simplified fix than what was merged to trunk in #6232 since the branches have diverged significantly. Rather than removing the epoch cache file, we guard usage of the cache with the record version. This results in the same bug being hit at a different time. When the message format gets upgraded to support the leader epoch cache, brokers start to make use of it. Due to the previous problem, we still have the sparsely populated epoch cache file present. This results in the same large truncations we saw in KAFKA-7897. The key difference is that the patches for 2.1 and trunk *deleted* the non-empty leader epoch cache files if the log message format did not support it. We should update the earlier versions to do the same thing. 
That way, users that have upgraded to 2.0.1 but are still using old message formats/protocol will have their epochs cleaned up on the first roll that upgrades the `inter.broker.protocol.version` -- This message was sent by Atlassian JIRA (v7.6.3#76005)
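A minimal sketch of the 2.1+ behavior described above, under the simplifying assumption that the message format is reduced to a record version number (record format v2, introduced in 0.11, being the first that carries leader epochs). If the version predates v2, the `leader-epoch-checkpoint` file in the partition directory is deleted rather than merely ignored. The class and method names are hypothetical; the file name matches the directory listing shown in the KAFKA-8036 report.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch: drop the leader epoch checkpoint when the log's message
// format cannot carry leader epochs, instead of only guarding its usage.
final class EpochCacheCleanupSketch {
    static final String CHECKPOINT_FILE = "leader-epoch-checkpoint";
    // Simplification: the message format is reduced to a record version number;
    // record format v2 (introduced in 0.11) is the first that stores leader epochs.
    static final int MIN_EPOCH_RECORD_VERSION = 2;

    /** Returns true if a checkpoint file was deleted. */
    static boolean maybeDeleteEpochCache(Path partitionDir, int recordVersion) {
        if (recordVersion >= MIN_EPOCH_RECORD_VERSION) {
            return false; // format supports epochs: keep the cache
        }
        try {
            return Files.deleteIfExists(partitionDir.resolve(CHECKPOINT_FILE));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("test_log_dir-0");
        Files.createFile(dir.resolve(CHECKPOINT_FILE));
        System.out.println(maybeDeleteEpochCache(dir, 1)); // true: v1 cannot carry epochs
    }
}
```

Deleting the file means a later message format upgrade starts from an empty cache instead of the sparsely populated one.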
[jira] [Created] (KAFKA-7910) Document retention.ms behavior with record timestamp
Stanislav Kozlovski created KAFKA-7910: -- Summary: Document retention.ms behavior with record timestamp Key: KAFKA-7910 URL: https://issues.apache.org/jira/browse/KAFKA-7910 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski It is intuitive to believe that `log.retention.ms` starts applying once a log file is closed. The documentation says: > This configuration controls the maximum time we will retain a log before we will discard old log segments to free up space if we are using the "delete" retention policy. Yet, the actual behavior is that we take the largest timestamp in that segment file ([https://github.com/apache/kafka/blob/4cdbb3e5c19142d118f0f3999dd3e21deccb3643/core/src/main/scala/kafka/log/Log.scala#L1246]) and apply `retention.ms` on top of it. This means that if Kafka is configured with `log.message.timestamp.type=CreateTime` (as it is by default), any records that have a future timestamp set by the producer will not get deleted when the initial intuition (and documentation) of `log.retention.ms` suggests. We should document how `retention.ms` interacts with the record timestamp. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
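A simplified model of the behavior described above, assuming a segment becomes eligible for deletion once `retention.ms` has elapsed since its largest record timestamp. This is a sketch, not the code at the linked `Log.scala` line, and whether the comparison is strict is an assumption. It shows why a single record with a future `CreateTime` timestamp keeps the whole segment alive.

```java
// Simplified model of time-based retention; not the actual Log.scala code.
// Whether the comparison is strict (>) is an assumption of this sketch.
final class RetentionSketch {
    static final long DAY_MS = 24L * 60 * 60 * 1000;

    /** A segment may be deleted once retentionMs has passed since its LARGEST record timestamp. */
    static boolean eligibleForDeletion(long nowMs, long largestTimestampMs, long retentionMs) {
        return nowMs - largestTimestampMs > retentionMs;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long retentionMs = 7 * DAY_MS;
        // Newest record in the segment is 8 days old: deletable.
        System.out.println(eligibleForDeletion(now, now - 8 * DAY_MS, retentionMs)); // true
        // One producer stamped a record 30 days in the future (CreateTime): the
        // segment is kept until roughly 37 days from now.
        System.out.println(eligibleForDeletion(now, now + 30 * DAY_MS, retentionMs)); // false
    }
}
```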
[jira] [Created] (KAFKA-7893) Refactor ConsumerBounceTest.scala
Stanislav Kozlovski created KAFKA-7893: -- Summary: Refactor ConsumerBounceTest.scala Key: KAFKA-7893 URL: https://issues.apache.org/jira/browse/KAFKA-7893 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski The [ConsumerBounceTest|https://github.com/apache/kafka/blob/trunk/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala] has logic to create consumers and ensure that they are in the same consumer group. There is code that already does this in `BaseConsumerTest` and `PlaintextConsumerTest` - it would be better if `ConsumerBounceTest` reused that functionality. As part of [https://github.com/apache/kafka/pull/6163], it is worth investigating whether the newly-introduced test cases there could be covered by unit tests rather than integration tests. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7819) Trogdor - Improve RoundTripWorker
Stanislav Kozlovski created KAFKA-7819: -- Summary: Trogdor - Improve RoundTripWorker Key: KAFKA-7819 URL: https://issues.apache.org/jira/browse/KAFKA-7819 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski Trogdor's RoundTripWorker task has a couple of shortcomings: * The consumer group ID is hardcoded and consumers use `KafkaConsumer#assign()`: [https://github.com/apache/kafka/blob/12947f4f944955240fd14ce8b75fab5464ea6808/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java#L314] This leaves you unable to run two separate instances of this worker on the same partition in the same cluster, as the consumers would overwrite each other's commits. It's probably better to include the task ID in the consumer group ID * The task spec's `maxMessages` [is an integer|https://github.com/apache/kafka/blob/12947f4f944955240fd14ce8b75fab5464ea6808/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java#L39], leaving you unable to schedule long-running tasks * The consumer and producer are not closed gracefully when the task ends -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7800) Extend Admin API to support dynamic application log levels
Stanislav Kozlovski created KAFKA-7800: -- Summary: Extend Admin API to support dynamic application log levels Key: KAFKA-7800 URL: https://issues.apache.org/jira/browse/KAFKA-7800 Project: Kafka Issue Type: New Feature Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski [https://cwiki.apache.org/confluence/display/KAFKA/KIP-412%3A+Extend+Admin+API+to+support+dynamic+application+log+levels] Logging is a critical part of any system's infrastructure. It is the most direct way of observing what is happening with a system. In the case of issues, it helps us diagnose the problem quickly, which in turn helps lower the [MTTR|http://enterprisedevops.org/article/devops-metric-mean-time-to-recovery-mttr-definition-and-reasoning]. Kafka supports application logging via the log4j library and outputs messages in various log levels (TRACE, DEBUG, INFO, WARN, ERROR). Log4j is a rich library that supports fine-grained logging configurations (e.g. use INFO-level logging in {{kafka.server.ReplicaManager}} and DEBUG-level logging in {{kafka.server.KafkaApis}}). This is statically configurable through the [log4j.properties|https://github.com/apache/kafka/blob/trunk/config/log4j.properties] file, which gets read once at broker start-up. A problem with this static configuration is that we cannot alter the log levels when a problem arises. It is severely impractical to edit a properties file and restart all brokers in order to gain visibility into a problem taking place in production. It would be very useful to support dynamically altering the log levels at runtime without needing to restart the Kafka process. Log4j itself supports dynamically altering the log levels programmatically, and Kafka exposes a [JMX API|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/utils/Log4jController.scala] that lets you alter them. This allows users to change the log levels via a GUI (jconsole) or a CLI (jmxterm) that uses JMX.
There is one problem with changing log levels through JMX that we hope to address, and that is *Ease of Use*:
* Establishing a connection - Connecting to a remote process via JMX requires configuring and exposing multiple JMX ports to the outside world. This is a burden on users, as most production deployments may sit behind layers of firewalls and have policies against opening ports. This makes opening the ports and connections in the middle of an incident even more burdensome.
* Security - JMX and the tools around it support authentication and authorization, but it is an additional hassle to set up credentials for another system.
* Manual process - Changing the whole cluster's log level requires manually connecting to each broker. In big deployments, this is severely impractical and forces users to build tooling around it.

h4. Proposition
Ideally, Kafka would support dynamically changing log levels and address all of the aforementioned concerns out of the box. We propose extending the IncrementalAlterConfigs/DescribeConfigs Admin APIs with functionality for dynamically altering the broker's log level. This approach would also pave the way for even finer-grained logging logic (e.g. log DEBUG level only for a certain topic) and would allow us to leverage the existing *AlterConfigPolicy* for custom user-defined validation of log-level changes. These log-level changes will be *temporary* and reverted on broker restart - we will not persist them anywhere.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
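To make the proposed semantics concrete (incremental SET/DELETE changes per logger, validated before they are applied, and held only in memory so that a restart reverts them), here is a minimal JDK-only sketch. The class `LoggerLevelOverrides`, its `Op` enum and its methods are hypothetical and are not Kafka code; for simplicity the sketch also ignores log4j's logger hierarchy and falls back straight to a single static root level.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch (not Kafka code): runtime log-level overrides per logger,
// applied as incremental SET/DELETE operations and kept only in memory,
// so a process restart falls back to the statically configured levels.
class LoggerLevelOverrides {
    enum Op { SET, DELETE }

    private static final Set<String> VALID_LEVELS =
        new HashSet<>(Arrays.asList("TRACE", "DEBUG", "INFO", "WARN", "ERROR"));

    private final String staticRootLevel;
    private final Map<String, String> overrides = new HashMap<>();

    LoggerLevelOverrides(String staticRootLevel) {
        this.staticRootLevel = staticRootLevel;
    }

    // Validates before mutating, so a rejected request leaves no partial change.
    void apply(String logger, Op op, String level) {
        if (op == Op.SET) {
            if (level == null || !VALID_LEVELS.contains(level)) {
                throw new IllegalArgumentException("Invalid log level: " + level);
            }
            overrides.put(logger, level);
        } else {
            overrides.remove(logger); // DELETE: revert to the statically configured level
        }
    }

    // Simplification: ignores log4j's logger hierarchy and only falls back to the root level.
    String effectiveLevel(String logger) {
        String override = overrides.get(logger);
        return override != null ? override : staticRootLevel;
    }

    // Read-only view of the current runtime overrides (what a describe call could return).
    Map<String, String> describeOverrides() {
        return Collections.unmodifiableMap(overrides);
    }
}
```

For example, a SET of DEBUG on `kafka.server.KafkaApis` leaves `kafka.server.ReplicaManager` at the static INFO level, a SET with an unknown level name is rejected without changing anything, and a DELETE restores the static level. Keeping the overrides in a plain in-memory map is what makes them temporary: nothing is written back to log4j.properties.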
[jira] [Created] (KAFKA-7790) Trogdor - Does not time out tasks in time
Stanislav Kozlovski created KAFKA-7790: -- Summary: Trogdor - Does not time out tasks in time Key: KAFKA-7790 URL: https://issues.apache.org/jira/browse/KAFKA-7790 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski All Trogdor task specifications have a defined `startMs` and `durationMs`. Under conditions of task failure and restarts, it is intuitive to assume that a task would not be re-run after a certain time period. Let's illustrate the issue with an example:
{code:java}
startMs = 12PM; durationMs = 1hour;
# 12:02 - Coordinator schedules a task to run on agent-0
# 12:45 - agent-0 process dies. Coordinator's heartbeats to agent-0 fail.
# 12:47 - agent-0 comes back up. Coordinator's heartbeats pass and it re-schedules tasks that are not running in agent-0
# 13:20 - agent-0 process dies.
# 13:22 - agent-0 comes back up. Coordinator re-schedules task
{code}
This can result in an endless loop of task rescheduling. If there are more tasks scheduled on agent-0 (e.g. a task scheduled to start every hour), we can end up in a scenario where we overwhelm the agent with tasks that we would rather have dropped.
h2. Changes
We propose that the Trogdor Coordinator does not re-schedule a task if, at the time of re-scheduling, the current time has already reached the task's start time plus its duration. More specifically:
{code:java}
if (currentTimeMs < startTimeMs + durationTimeMs) {
    scheduleTask();
} else {
    failTask();
}
{code}
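The proposed check can be sketched as a small pure function. `RescheduleDecision` and `Action` are illustrative names, not Trogdor classes. The comparison is written as `nowMs - startMs < durationMs` so that an enormous `durationMs` cannot overflow the sum; for realistic epoch timestamps this is equivalent to the rule above.

```java
// Hypothetical sketch of the proposed Coordinator rule: a task may be (re)scheduled
// only while the current time is still before startMs + durationMs.
class RescheduleDecision {
    enum Action { SCHEDULE, FAIL }

    static Action decide(long nowMs, long startMs, long durationMs) {
        // Same as nowMs < startMs + durationMs for realistic timestamps, but written as a
        // subtraction so a very large durationMs (e.g. Long.MAX_VALUE) cannot overflow.
        return (nowMs - startMs < durationMs) ? Action.SCHEDULE : Action.FAIL;
    }
}
```

Applied to the timeline above (start at 12:00, one-hour duration), the 12:47 restart would still reschedule the task, while the 13:22 restart would fail it instead of starting yet another run.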
[jira] [Resolved] (KAFKA-7618) Trogdor - Fix /tasks endpoint parameters
[ https://issues.apache.org/jira/browse/KAFKA-7618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski resolved KAFKA-7618. Resolution: Fixed > Trogdor - Fix /tasks endpoint parameters > > > Key: KAFKA-7618 > URL: https://issues.apache.org/jira/browse/KAFKA-7618 > Project: Kafka > Issue Type: Bug >Reporter: Stanislav Kozlovski >Assignee: Stanislav Kozlovski >Priority: Minor > > A Trogdor Coordinator's `/tasks` endpoint returns the status of all tasks. It > supports arguments to filter the tasks, like `firstStartMs`, `lastStartMs`, > `firstEndMs`, `lastEndMs`. > These arguments denote milliseconds since the Unix epoch. > There is currently a bug where the endpoint parses the arguments as integers, > whereas they should be longs (a current Unix millisecond timestamp does not > fit into an integer). > This results in API calls failing with an HTTP 500 error: > {code:java} > curl -v -L -G -d "firstStartMs=1542028764787" localhost:8889/coordinator/tasks > * Trying ::1... > * TCP_NODELAY set > * Connected to localhost (::1) port 8889 (#0) > > GET /coordinator/tasks?firstStartMs=154202876478 HTTP/1.1 > > Host: localhost:8889 > > User-Agent: curl/7.54.0 > > Accept: */* > > > < HTTP/1.1 500 Internal Server Error > < Date: Mon, 12 Nov 2018 13:28:59 GMT > < Content-Type: application/json > < Content-Length: 43 > < Server: Jetty(9.4.12.v20180830) > < > * Connection #0 to host localhost left intact{code}
[jira] [Resolved] (KAFKA-7564) Trogdor - Expose single task details from Trogdor Coordinator
[ https://issues.apache.org/jira/browse/KAFKA-7564?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski resolved KAFKA-7564. Resolution: Fixed > Trogdor - Expose single task details from Trogdor Coordinator > - > > Key: KAFKA-7564 > URL: https://issues.apache.org/jira/browse/KAFKA-7564 > Project: Kafka > Issue Type: Improvement >Reporter: Stanislav Kozlovski >Assignee: Stanislav Kozlovski >Priority: Minor > > Currently, the only way to get the results of tasks run in Trogdor is > to list all of them via the "--show-tasks" CLI command: > {code:java} > ./bin/trogdor.sh client --show-tasks localhost:8889 Got coordinator tasks: > { "tasks":{ "produce_bench_20462":{ "state":"DONE", > "spec":{ > "class":"org.apache.kafka.trogdor.workload.ProduceBenchSpec", > "startMs":0, "durationMs":1000, > "producerNode":"node0", "bootstrapServers":"localhost:9092", > "targetMessagesPerSec":10, "maxMessages":100, > "keyGenerator":{ "type":"sequential", > "size":4, "startOffset":0 }, > "valueGenerator":{ "type":"constant", > "size":512, > "value":"AAA=" > }, "totalTopics":10, "activeTopics":5, > "topicPrefix":"foo", "replicationFactor":1, > "classLoader":{ }, "numPartitions":1 }, > "startedMs":1523552769850, "doneMs":1523552780878, > "cancelled":false, "status":{ "totalSent":500, > "averageLatencyMs":4.972, "p50LatencyMs":4, > "p95LatencyMs":6, "p99LatencyMs":12 } } } } > {code} > This can prove inefficient and annoying if the Trogdor Coordinator is > long-running and we only want the results of a specific task. > The current REST endpoint ("/tasks") for listing tasks supports filtering > by StartTimeMs/EndTimeMs and by specific TaskIDs, but it would be > cleaner to have a dedicated endpoint for fetching a single task. That > endpoint would also return a 404 when no task is found, instead > of the empty response the /tasks endpoint returns. 
> I propose we expose a new "/tasks/:id" endpoint and a new CLI command > "--show-task TASK_ID".
[jira] [Resolved] (KAFKA-7514) Trogdor - Support Multiple Threads in ConsumeBenchWorker
[ https://issues.apache.org/jira/browse/KAFKA-7514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski resolved KAFKA-7514. Resolution: Fixed > Trogdor - Support Multiple Threads in ConsumeBenchWorker > > > Key: KAFKA-7514 > URL: https://issues.apache.org/jira/browse/KAFKA-7514 > Project: Kafka > Issue Type: Improvement >Reporter: Stanislav Kozlovski >Assignee: Stanislav Kozlovski >Priority: Minor > > Trogdor's ConsumeBenchWorker currently uses only two threads - one for the > StatusUpdater: > {code:java} > this.statusUpdaterFuture = executor.scheduleAtFixedRate( > new StatusUpdater(latencyHistogram, messageSizeHistogram), 1, 1, > TimeUnit.MINUTES); > {code} > and one for the consumer task itself: > {code:java} > executor.submit(new ConsumeMessages(partitions)); > {code} > A sample ConsumeBenchSpec specification in JSON looks like this: > {code:java} > { > "class": "org.apache.kafka.trogdor.workload.ConsumeBenchSpec", > "durationMs": 1000, > "consumerNode": "node0", > "bootstrapServers": "localhost:9092", > "maxMessages": 100, > "activeTopics": { > "foo[1-3]": { > "numPartitions": 3, > "replicationFactor": 1 > } > } > } > {code} > > > h2. Motivation > This does not make the best use of machines with multiple cores. It would be > useful if there was a way to configure the ConsumeBenchSpec to use multiple > threads and spawn multiple consumers. This would also allow the > ConsumeBenchWorker to achieve higher throughput through parallelism. > > h2. Proposal > Add a new `consumerCount` property to the ConsumeBenchSpec, allowing you to > run multiple consumers in parallel. > h2. Changes > By default, it will have a value of 1. > `activeTopics` will still be defined in the same way. They will be evenly > assigned to the consumers in a round-robin fashion.
> For example, if we have this configuration: > {code:java} > { > "class": "org.apache.kafka.trogdor.workload.ConsumeBenchSpec", > "durationMs": 1000, > "consumerNode": "node0", > "bootstrapServers": "localhost:9092", > "maxMessages": 100, > "consumerCount": 2, > "activeTopics": { > "foo[1-4]": { > "numPartitions": 4, > "replicationFactor": 1 > } > } > }{code} > consumer 1 will be assigned partitions [foo1, foo3], > consumer 2 will be assigned partitions [foo2, foo4], > and the ConsumeBenchWorker will spawn 4 threads in total in the executor (2 > for every consumer). > > The `maxMessages` and `targetMessagesPerSec` will be counted independently > for every consumer. > h3. Status > The format of the worker's status will be updated as well. > A ConsumeBenchWorker shows the following status when queried with > `./bin/trogdor.sh client --show-tasks localhost:8889`: > > {code:java} > "tasks" : { "consume_bench_19938" : { "state" : "DONE", "spec" : { "class" : > "org.apache.kafka.trogdor.workload.ConsumeBenchSpec", > ... > "status" : { "totalMessagesReceived" : 190, "totalBytesReceived" : 98040, > "averageMessageSizeBytes" : 516, "averageLatencyMs" : 449.0, "p50LatencyMs" : > 449, "p95LatencyMs" : 449, "p99LatencyMs" : 449 } },{code} > We will change it to show the status of every separate consumer and the topic > partitions it was assigned to: > {code:java} > "tasks" : { > "consume_bench_19938" : > { > "state" : "DONE", > "spec" : { "class" : "org.apache.kafka.trogdor.workload.ConsumeBenchSpec", > ... } > ... 
> "status":{ > "consumer-1":{ > "assignedPartitions":[ > "foo1", > "foo3" > ], > "totalMessagesReceived":190, > "totalBytesReceived":98040, > "averageMessageSizeBytes":516, > "averageLatencyMs":449.0, > "p50LatencyMs":449, > "p95LatencyMs":449, > "p99LatencyMs":449 > }, > "consumer-2":{ > "assignedPartitions":[ > "foo2", > "foo4" > ], > "totalMessagesReceived":190, > "totalBytesReceived":98040, > "averageMessageSizeBytes":516, > "averageLatencyMs":449.0, > "p50LatencyMs":449, > "p95LatencyMs":449, > "p99LatencyMs":449 > } > } > },{code} > > > h2. Backwards Compatibility > This change should be mostly backwards-compatible. If `consumerCount` > is not passed, only one consumer will be created and the round-robin > assignor will assign every partition to it. > The only change will be in the format of the reported status. Even with one > consumer, we will still show a status similar to: > {code:java} > "status":{ > "consumer-1":{ > "assignedPartitions":[ > "foo1", > "foo3" > ], >
[jira] [Resolved] (KAFKA-7515) Trogdor - Add Consumer Group Benchmark Specification
[ https://issues.apache.org/jira/browse/KAFKA-7515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski resolved KAFKA-7515. Resolution: Fixed > Trogdor - Add Consumer Group Benchmark Specification > > > Key: KAFKA-7515 > URL: https://issues.apache.org/jira/browse/KAFKA-7515 > Project: Kafka > Issue Type: Improvement >Reporter: Stanislav Kozlovski >Assignee: Stanislav Kozlovski >Priority: Minor > > Trogdor's `ConsumeBenchWorker` and `ConsumeBenchSpec` currently take > specific topic partitions and assign a consumer to them > ([https://github.com/apache/kafka/blob/509dd95ebbf03681ea680a84b8436814ba3e7541/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java#L125]). > It is useful to have functionality that supports consumer group usage, since > most Kafka consumers in practice subscribe to topics, not specific > partitions. Using the `subscribe()` API will also make use of the consumer > group, allowing for more flexible benchmark tests (e.g. consuming a topic > while creating a new partition). > This will also allow for benchmarking more real-life scenarios, like spinning > up multiple consumers in a consumer group by spawning multiple Trogdor > agents (or multiple consumers in one agent if > https://issues.apache.org/jira/browse/KAFKA-7514 gets accepted).
[jira] [Created] (KAFKA-7717) Enable security configs in kafka.tools.EndToEndLatency
Stanislav Kozlovski created KAFKA-7717: -- Summary: Enable security configs in kafka.tools.EndToEndLatency Key: KAFKA-7717 URL: https://issues.apache.org/jira/browse/KAFKA-7717 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski The [end-to-end latency tool|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/EndToEndLatency.scala] does not support security configurations for authenticating to a secured broker. It only accepts `bootstrap.servers`, rendering it useless against SASL-secured clusters.
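One possible shape for this, sketched under assumptions: the tool could accept an optional properties file holding the extra client settings (for example `security.protocol`, `sasl.mechanism` and `sasl.jaas.config`) and merge it with the bootstrap servers it already takes. `ClientConfigMerge` is hypothetical and says nothing about how the tool's arguments are actually parsed; the file contents are passed as a string here to keep the sketch self-contained.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical sketch (not the actual tool): merge user-supplied client settings,
// such as security.protocol and sasl.* entries, with the bootstrap servers the tool already takes.
class ClientConfigMerge {
    static Properties build(String bootstrapServers, String extraPropertiesText) {
        Properties props = new Properties();
        if (extraPropertiesText != null) {
            try {
                // Standard java.util.Properties "key=value" format, one entry per line.
                props.load(new StringReader(extraPropertiesText));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        // The explicitly passed bootstrap servers win over any value in the file.
        props.setProperty("bootstrap.servers", bootstrapServers);
        return props;
    }
}
```

The resulting `Properties` object could then be handed to both the producer and the consumer the tool creates, so a single file covers authentication for both clients.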
[jira] [Created] (KAFKA-7675) Trogdor CLI - Support filter arguments in the '--show-tasks' command
Stanislav Kozlovski created KAFKA-7675: -- Summary: Trogdor CLI - Support filter arguments in the '--show-tasks' command Key: KAFKA-7675 URL: https://issues.apache.org/jira/browse/KAFKA-7675 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski CoordinatorClient's --show-tasks currently does not take any parameters. The request it sends behind the scenes has 6 fields that enable filtering of the output: 'firstStartMs', 'lastStartMs', 'firstEndMs', 'lastEndMs', 'state' and 'taskIds'. All of them should be exposed so that users can filter appropriately.
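A minimal sketch of how the CLI could turn optional filter flags into the query string of its GET request to `/coordinator/tasks`. `ShowTasksQuery` is a hypothetical builder, and encoding several task IDs as repeated `taskIds` parameters is an assumption about the endpoint, not something this issue specifies.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: collect only the filters the user actually passed and
// render them as a URL query string for GET /coordinator/tasks.
class ShowTasksQuery {
    private final Map<String, String> params = new LinkedHashMap<>();
    private final List<String> taskIds = new ArrayList<>();

    ShowTasksQuery firstStartMs(long ms) { params.put("firstStartMs", Long.toString(ms)); return this; }
    ShowTasksQuery lastStartMs(long ms) { params.put("lastStartMs", Long.toString(ms)); return this; }
    ShowTasksQuery firstEndMs(long ms) { params.put("firstEndMs", Long.toString(ms)); return this; }
    ShowTasksQuery lastEndMs(long ms) { params.put("lastEndMs", Long.toString(ms)); return this; }
    ShowTasksQuery state(String state) { params.put("state", state); return this; }
    ShowTasksQuery taskId(String id) { taskIds.add(id); return this; }

    String toQueryString() {
        List<String> parts = new ArrayList<>();
        for (Map.Entry<String, String> e : params.entrySet()) {
            parts.add(e.getKey() + "=" + encode(e.getValue()));
        }
        // Assumption: multiple task IDs are sent as repeated taskIds parameters.
        for (String id : taskIds) {
            parts.add("taskIds=" + encode(id));
        }
        return parts.isEmpty() ? "" : "?" + String.join("&", parts);
    }

    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always supported by the JDK
        }
    }
}
```

With no flags the query string stays empty, so the request is identical to what `--show-tasks` sends today.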
[jira] [Created] (KAFKA-7670) Fix flaky test - KafkaAdminClientTest.testUnreachableBootstrapServer
Stanislav Kozlovski created KAFKA-7670: -- Summary: Fix flaky test - KafkaAdminClientTest.testUnreachableBootstrapServer Key: KAFKA-7670 URL: https://issues.apache.org/jira/browse/KAFKA-7670 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski It fails locally around once every 30 runs with:
{code:java}
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262)
    at org.apache.kafka.clients.admin.KafkaAdminClientTest.testUnreachableBootstrapServer(KafkaAdminClientTest.java:277)
    at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
    at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
{code}
[jira] [Created] (KAFKA-7638) Trogdor - Support mass task creation endpoint
Stanislav Kozlovski created KAFKA-7638: -- Summary: Trogdor - Support mass task creation endpoint Key: KAFKA-7638 URL: https://issues.apache.org/jira/browse/KAFKA-7638 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski Trogdor supports the creation of tasks via the `coordinator/tasks/create` endpoint, which currently accepts only one task. Since Trogdor supports scheduling multiple jobs to execute at a certain time (via the `startMs` parameter that every task specification has), it makes sense to support creating multiple tasks in a single request. Users might want to leverage the scheduler to, say, create 100 tasks. In the current model, they would need to issue 100 requests, which is inefficient.
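To illustrate the motivation, here is a client-side sketch that generates 100 task specifications with staggered `startMs` values and groups them into as few create requests as possible. `TaskSpec` is a pared-down, hypothetical stand-in for a real Trogdor spec, and `maxPerRequest` is a hypothetical per-request limit; the issue does not define a request format for batch creation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: schedule many tasks up front and send them in batches.
class BatchTaskCreation {
    // Minimal stand-in for a Trogdor task specification (only the scheduling fields).
    static final class TaskSpec {
        final String id;
        final long startMs;
        final long durationMs;

        TaskSpec(String id, long startMs, long durationMs) {
            this.id = id;
            this.startMs = startMs;
            this.durationMs = durationMs;
        }
    }

    // One task per interval, e.g. one every hour starting at firstStartMs.
    static List<TaskSpec> staggered(String prefix, int count, long firstStartMs,
                                    long intervalMs, long durationMs) {
        List<TaskSpec> specs = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            specs.add(new TaskSpec(prefix + "_" + i, firstStartMs + i * intervalMs, durationMs));
        }
        return specs;
    }

    // Groups specs into request-sized batches; maxPerRequest is a hypothetical limit.
    static List<List<TaskSpec>> toRequests(List<TaskSpec> specs, int maxPerRequest) {
        if (maxPerRequest <= 0) {
            throw new IllegalArgumentException("maxPerRequest must be positive");
        }
        List<List<TaskSpec>> requests = new ArrayList<>();
        for (int from = 0; from < specs.size(); from += maxPerRequest) {
            int to = Math.min(from + maxPerRequest, specs.size());
            requests.add(Collections.unmodifiableList(new ArrayList<>(specs.subList(from, to))));
        }
        return requests;
    }
}
```

With a batch endpoint and no limit below 100, the 100 tasks go out in a single request; with a hypothetical limit of 30 they would need 4 requests instead of 100.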
[jira] [Created] (KAFKA-7619) Trogdor - Allow filtering tasks by state in /coordinator/tasks endpoint
Stanislav Kozlovski created KAFKA-7619: -- Summary: Trogdor - Allow filtering tasks by state in /coordinator/tasks endpoint Key: KAFKA-7619 URL: https://issues.apache.org/jira/browse/KAFKA-7619 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski A Trogdor Coordinator's `/tasks` endpoint returns the status of all tasks. It supports arguments to filter the tasks, like `firstStartMs`, `lastStartMs`, `firstEndMs`, `lastEndMs`. These arguments denote milliseconds since the Unix epoch. It would be useful to support filtering by the state of the task. We currently have no way of getting every `RUNNING`, `STOPPED`, or `PENDING` task short of manually filtering through everything returned by `/coordinator/tasks`.
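A minimal sketch of what server-side filtering by state could look like, assuming the Coordinator has a map from task ID to state name. `TaskStateFilter` is hypothetical, and matching state names case-insensitively is a design choice of the sketch rather than part of the proposal. A missing `state` argument returns every task, matching the endpoint's current behaviour.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch: filter a taskId -> state view by an optional state parameter.
class TaskStateFilter {
    static Map<String, String> filterByState(Map<String, String> taskStates, String state) {
        if (state == null) {
            // No filter requested: return (a copy of) every task, as the endpoint does today.
            return new LinkedHashMap<>(taskStates);
        }
        return taskStates.entrySet().stream()
            .filter(e -> e.getValue().equalsIgnoreCase(state))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
                (a, b) -> a, LinkedHashMap::new)); // keep the original task order
    }
}
```

Filtering on the Coordinator keeps the response small for long-running Coordinators, instead of shipping every task to the client and filtering there.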
[jira] [Created] (KAFKA-7618) Trogdor - Fix /tasks endpoint parameters
Stanislav Kozlovski created KAFKA-7618: -- Summary: Trogdor - Fix /tasks endpoint parameters Key: KAFKA-7618 URL: https://issues.apache.org/jira/browse/KAFKA-7618 Project: Kafka Issue Type: Bug Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski A Trogdor Coordinator's `/tasks` endpoint returns the status of all tasks. It supports arguments to filter the tasks, like `firstStartMs`, `lastStartMs`, `firstEndMs`, `lastEndMs`. These arguments denote milliseconds since the Unix epoch. There is currently a bug where the endpoint parses the arguments as integers, whereas they should be longs (a current Unix millisecond timestamp does not fit into an integer). This results in API calls failing with an HTTP 500 error:
{code:java}
curl -v -L -G -d "firstStartMs=1542028764787" localhost:8889/coordinator/tasks
* Trying ::1...
* TCP_NODELAY set
* Connected to localhost (::1) port 8889 (#0)
> GET /coordinator/tasks?firstStartMs=154202876478 HTTP/1.1
> Host: localhost:8889
> User-Agent: curl/7.54.0
> Accept: */*
>
< HTTP/1.1 500 Internal Server Error
< Date: Mon, 12 Nov 2018 13:28:59 GMT
< Content-Type: application/json
< Content-Length: 43
< Server: Jetty(9.4.12.v20180830)
<
* Connection #0 to host localhost left intact
{code}
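The root cause is easy to reproduce with plain JDK parsing: the timestamp from the request above is larger than `Integer.MAX_VALUE` (2147483647), so parsing it as an `int` throws `NumberFormatException`, while parsing it as a `long` succeeds. `EpochMillisParsing` is an illustrative class; how the REST layer turns the failed conversion into the 500 response shown above is not covered here.

```java
// Illustration of the bug: epoch-millisecond timestamps overflow a Java int.
class EpochMillisParsing {
    // Returns false when the value cannot be parsed as an int
    // (out of range, as for 1542028764787, or not a number at all).
    static boolean fitsInInt(String value) {
        try {
            Integer.parseInt(value);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    // The fix described above: treat the filter arguments as longs.
    static long parseEpochMillis(String value) {
        return Long.parseLong(value);
    }

    public static void main(String[] args) {
        String firstStartMs = "1542028764787";
        System.out.println(fitsInInt(firstStartMs));        // false
        System.out.println(parseEpochMillis(firstStartMs)); // 1542028764787
    }
}
```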
[jira] [Created] (KAFKA-7599) Trogdor - Allow configuration for not throttling Benchmark Workers and expose messages per second
Stanislav Kozlovski created KAFKA-7599: -- Summary: Trogdor - Allow configuration for not throttling Benchmark Workers and expose messages per second Key: KAFKA-7599 URL: https://issues.apache.org/jira/browse/KAFKA-7599 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski In Trogdor, the ConsumeBench, ProduceBench and RoundTrip workers all take an argument called "targetMessagesPerSec". That argument acts as an upper bound on the number of messages that the worker can consume/produce per second. It would be useful to support an unbounded rate. Currently, if the `targetMessagesPerSec` field is not present in the request, the RoundTripWorker will raise an exception, whereas the ConsumeBench and ProduceBench workers will behave as if they had `targetMessagesPerSec=10`. I propose we allow an unbounded `targetMessagesPerSec` if the field is not present. Further, it would be very useful if some of these workers reported the `messagesPerSecond` rate at which they have been producing/consuming. Even now, giving the worker a `targetMessagesPerSec` does not guarantee that the worker will reach that rate. There is no easy way of knowing how the worker performed - you have to subtract the status field `startedMs` from `doneMs` to get the total duration of the task, convert it to seconds and then divide the `maxMessages` field by that duration.
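Both proposals can be sketched in a few lines. `WorkerRate` is hypothetical: `messagesPerSec` divides a message count by the elapsed seconds between `startedMs` and `doneMs`, and `shouldThrottle` treats a missing (`null`) `targetMessagesPerSec` as unbounded, as proposed above.

```java
// Hypothetical helpers (not Trogdor code) for the two proposals above.
class WorkerRate {
    // Throughput from status fields: messages divided by elapsed seconds.
    static double messagesPerSec(long startedMs, long doneMs, long messages) {
        long elapsedMs = doneMs - startedMs;
        if (elapsedMs <= 0) {
            throw new IllegalArgumentException("doneMs must be later than startedMs");
        }
        return messages / (elapsedMs / 1000.0);
    }

    // Proposed semantics: a missing targetMessagesPerSec (null) means "never throttle";
    // otherwise throttle once more messages were sent than the target allows so far.
    static boolean shouldThrottle(long sentSoFar, long elapsedMs, Integer targetMessagesPerSec) {
        if (targetMessagesPerSec == null) {
            return false;
        }
        double allowedSoFar = targetMessagesPerSec * (elapsedMs / 1000.0);
        return sentSoFar > allowedSoFar;
    }
}
```

Using the ProduceBench status from KAFKA-7564 (`startedMs` 1523552769850, `doneMs` 1523552780878, `totalSent` 500), the task ran for 11.028 seconds, which works out to roughly 45.3 messages per second.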
[jira] [Created] (KAFKA-7597) Trogdor - Support transactions in ProduceBenchWorker
Stanislav Kozlovski created KAFKA-7597: -- Summary: Trogdor - Support transactions in ProduceBenchWorker Key: KAFKA-7597 URL: https://issues.apache.org/jira/browse/KAFKA-7597 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski The `ProduceBenchWorker` allows you to schedule a benchmark of a Kafka Producer. It would be useful to support transactions in this producer, so that we can benchmark transactional workloads.
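A rough sketch of how a benchmark loop could group its sends into transactions of a fixed size. The real `KafkaProducer` exposes `initTransactions()`, `beginTransaction()`, `send(...)` and `commitTransaction()` (and requires a `transactional.id`), but to stay runnable without a broker this sketch uses a hypothetical `TxnProducer` interface with matching method names and a recording fake. The `messagesPerTransaction` knob is likewise an assumption, not an existing spec field.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: wrap a produce benchmark's sends in fixed-size transactions.
class TransactionalProduceLoop {
    // Stand-in for the transactional subset of a producer (not the real client API types).
    interface TxnProducer {
        void initTransactions();
        void beginTransaction();
        void send(String value);
        void commitTransaction();
    }

    // Sends maxMessages values, committing after every messagesPerTransaction sends;
    // returns the number of committed transactions.
    static int run(TxnProducer producer, int maxMessages, int messagesPerTransaction) {
        if (messagesPerTransaction <= 0) {
            throw new IllegalArgumentException("messagesPerTransaction must be positive");
        }
        producer.initTransactions();
        int commits = 0;
        int inTxn = 0;
        for (int i = 0; i < maxMessages; i++) {
            if (inTxn == 0) {
                producer.beginTransaction();
            }
            producer.send("message-" + i);
            inTxn++;
            if (inTxn == messagesPerTransaction) {
                producer.commitTransaction();
                commits++;
                inTxn = 0;
            }
        }
        if (inTxn > 0) { // commit the final, partially filled transaction
            producer.commitTransaction();
            commits++;
        }
        return commits;
    }

    // Records calls so the loop can be exercised without a broker.
    static final class RecordingProducer implements TxnProducer {
        final List<String> calls = new ArrayList<>();
        public void initTransactions() { calls.add("init"); }
        public void beginTransaction() { calls.add("begin"); }
        public void send(String value) { calls.add("send"); }
        public void commitTransaction() { calls.add("commit"); }
    }
}
```

With 10 messages and 4 messages per transaction, the loop commits three transactions (4, 4 and 2 messages); a benchmark would then time each commit alongside the per-message latencies it already records.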
[jira] [Created] (KAFKA-7579) System Test Failure - security_test.SecurityTest.test_client_ssl_endpoint_validation_failure
Stanislav Kozlovski created KAFKA-7579: -- Summary: System Test Failure - security_test.SecurityTest.test_client_ssl_endpoint_validation_failure Key: KAFKA-7579 URL: https://issues.apache.org/jira/browse/KAFKA-7579 Project: Kafka Issue Type: Bug Affects Versions: 2.0.1 Reporter: Stanislav Kozlovski The security_test.SecurityTest.test_client_ssl_endpoint_validation_failure test with security_protocol=SSL fails:
{code:java}
SESSION REPORT (ALL TESTS)
ducktape version: 0.7.1
session_id: 2018-10-31--002
run time: 2 minutes 12.452 seconds
tests run: 2
passed: 1
failed: 1
ignored: 0

test_id: kafkatest.tests.core.security_test.SecurityTest.test_client_ssl_endpoint_validation_failure.interbroker_security_protocol=PLAINTEXT.security_protocol=SSL
status: FAIL
run time: 1 minute 2.149 seconds

Node ducker@ducker05: did not stop within the specified timeout of 15 seconds
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/ducktape/tests/runner_client.py", line 132, in run
    data = self.run_test()
  File "/usr/local/lib/python2.7/dist-packages/ducktape/tests/runner_client.py", line 185, in run_test
    return self.test_context.function(self.test)
  File "/usr/local/lib/python2.7/dist-packages/ducktape/mark/_mark.py", line 324, in wrapper
    return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/opt/kafka-dev/tests/kafkatest/tests/core/security_test.py", line 114, in test_client_ssl_endpoint_validation_failure
    self.consumer.stop()
  File "/usr/local/lib/python2.7/dist-packages/ducktape/services/background_thread.py", line 80, in stop
    super(BackgroundThreadService, self).stop()
  File "/usr/local/lib/python2.7/dist-packages/ducktape/services/service.py", line 278, in stop
    self.stop_node(node)
  File "/opt/kafka-dev/tests/kafkatest/services/console_consumer.py", line 254, in stop_node
    (str(node.account), str(self.stop_timeout_sec))
AssertionError: Node ducker@ducker05: did not stop within the specified timeout of 15 seconds

test_id: kafkatest.tests.core.security_test.SecurityTest.test_client_ssl_endpoint_validation_failure.interbroker_security_protocol=SSL.security_protocol=PLAINTEXT
status: PASS
run time: 1 minute 10.144 seconds

ducker-ak test failed
{code}
[jira] [Created] (KAFKA-7564) Trogdor - Expose single task details from Trogdor Coordinator
Stanislav Kozlovski created KAFKA-7564: -- Summary: Trogdor - Expose single task details from Trogdor Coordinator Key: KAFKA-7564 URL: https://issues.apache.org/jira/browse/KAFKA-7564 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski Currently, the only way to get the results of tasks run in Trogdor is to list all of them via the "--show-tasks" CLI command:
{code:java}
./bin/trogdor.sh client --show-tasks localhost:8889
Got coordinator tasks: {
  "tasks":{
    "produce_bench_20462":{
      "state":"DONE",
      "spec":{
        "class":"org.apache.kafka.trogdor.workload.ProduceBenchSpec",
        "startMs":0,
        "durationMs":1000,
        "producerNode":"node0",
        "bootstrapServers":"localhost:9092",
        "targetMessagesPerSec":10,
        "maxMessages":100,
        "keyGenerator":{
          "type":"sequential",
          "size":4,
          "startOffset":0
        },
        "valueGenerator":{
          "type":"constant",
          "size":512,
          "value":"AAA="
        },
        "totalTopics":10,
        "activeTopics":5,
        "topicPrefix":"foo",
        "replicationFactor":1,
        "classLoader":{ },
        "numPartitions":1
      },
      "startedMs":1523552769850,
      "doneMs":1523552780878,
      "cancelled":false,
      "status":{
        "totalSent":500,
        "averageLatencyMs":4.972,
        "p50LatencyMs":4,
        "p95LatencyMs":6,
        "p99LatencyMs":12
      }
    }
  }
}
{code}
This can prove inefficient and annoying if the Trogdor Coordinator is long-running and we only want the results of a specific task. The current REST endpoint ("/tasks") for listing tasks supports filtering by StartTimeMs/EndTimeMs and by specific TaskIDs, but it would be cleaner to have a dedicated endpoint for fetching a single task. That endpoint would also return a 404 when no task is found, instead of the empty response the /tasks endpoint returns. I propose we expose a new "/tasks/:id" endpoint and a new CLI command "--show-task TASK_ID".
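A minimal sketch of the proposed `/tasks/:id` semantics, assuming the Coordinator can look a task up by ID: return the task's JSON with status 200, or a 404 when the ID is unknown. `SingleTaskLookup` and its `Response` type are hypothetical and independent of any particular REST framework.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: single-task lookup that distinguishes "found" from "not found".
class SingleTaskLookup {
    static final class Response {
        final int status;
        final String body;

        Response(int status, String body) {
            this.status = status;
            this.body = body;
        }
    }

    private final Map<String, String> taskJsonById;

    SingleTaskLookup(Map<String, String> taskJsonById) {
        this.taskJsonById = new HashMap<>(taskJsonById);
    }

    Response getTask(String taskId) {
        String json = taskJsonById.get(taskId);
        if (json == null) {
            // Unlike the /tasks listing, an unknown ID is an explicit error, not an empty result.
            return new Response(404, "{\"error\":\"task not found\"}");
        }
        return new Response(200, json);
    }
}
```

A `--show-task TASK_ID` CLI command could then print the body on a 200 and report a missing task on a 404, without ever downloading the full task list.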
[jira] [Created] (KAFKA-7561) Console Consumer - system test fails
Stanislav Kozlovski created KAFKA-7561: -- Summary: Console Consumer - system test fails Key: KAFKA-7561 URL: https://issues.apache.org/jira/browse/KAFKA-7561 Project: Kafka Issue Type: Bug Affects Versions: 2.1.0 Reporter: Stanislav Kozlovski The test under `kafkatest.sanity_checks.test_console_consumer.ConsoleConsumerTest.test_lifecycle` fails when I run it locally. 7 versions of the test failed for me, and they all had a similar error message: {code:java} AssertionError: Node ducker@ducker11: did not stop within the specified timeout of 15 seconds {code}
[jira] [Created] (KAFKA-7560) Client Quota - system test failure
Stanislav Kozlovski created KAFKA-7560: -- Summary: Client Quota - system test failure Key: KAFKA-7560 URL: https://issues.apache.org/jira/browse/KAFKA-7560 Project: Kafka Issue Type: Bug Affects Versions: 2.1.0 Reporter: Stanislav Kozlovski The test under `kafkatest.tests.client.quota_test.QuotaTest.test_quota` fails when I run it locally. It produces the following error message:
{code:java}
File "/opt/kafka-dev/tests/kafkatest/tests/client/quota_test.py", line 196, in validate
    metric.value for k, metrics in producer.metrics(group='producer-metrics', name='outgoing-byte-rate', client_id=producer.client_id) for metric in metrics
ValueError: max() arg is an empty sequence
{code}
I assume the test cannot find the metric it is searching for, so `max()` is called on an empty sequence.
[jira] [Created] (KAFKA-7559) ConnectStandaloneFileTest system tests do not pass
Stanislav Kozlovski created KAFKA-7559: -- Summary: ConnectStandaloneFileTest system tests do not pass Key: KAFKA-7559 URL: https://issues.apache.org/jira/browse/KAFKA-7559 Project: Kafka Issue Type: Improvement Affects Versions: 2.1.0 Reporter: Stanislav Kozlovski Both tests `test_skip_and_log_to_dlq` and `test_file_source_and_sink` under `kafkatest.tests.connect.connect_test.ConnectStandaloneFileTest` fail with error messages similar to: "TimeoutError: Kafka Connect failed to start on node: ducker@ducker04 in condition mode: LISTEN"
[jira] [Created] (KAFKA-7528) Make Min and Max metrics' default value consistent with each other
Stanislav Kozlovski created KAFKA-7528: -- Summary: Make Min and Max metrics' default value consistent with each other Key: KAFKA-7528 URL: https://issues.apache.org/jira/browse/KAFKA-7528 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski Fix For: 2.2.0 KIP-386: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-386%3A+Make+Min+metrics%27+default+value+consistent+with+Max+metrics]
[jira] [Created] (KAFKA-7517) Add a minimum retention.bytes config value
Stanislav Kozlovski created KAFKA-7517: -- Summary: Add a minimum retention.bytes config value Key: KAFKA-7517 URL: https://issues.apache.org/jira/browse/KAFKA-7517 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski Some configs, like `log.retention.bytes`, make no sense with a value of 0 - every log has a size of 0 upon creation, and therefore every log would be deleted in this case. It would be useful to have some sort of guard, however limited, to help users avoid shooting themselves in the foot so easily (whether through manual misconfiguration or some external tool, e.g. a k8s ConfigMap).
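A minimal sketch of such a guard, assuming that `-1` keeps its existing meaning of no size limit for `log.retention.bytes` and that any other value must reach some minimum. The validator class and the `minBytes` threshold are hypothetical; the issue does not propose a specific minimum.

```java
// Hypothetical validator: -1 still means "no size-based retention"; any other value
// must be at least minBytes, which rejects the accidental 0 described above.
class RetentionBytesValidator {
    static final long UNLIMITED = -1L;

    static void validate(String name, long value, long minBytes) {
        if (value == UNLIMITED) {
            return;
        }
        if (value < minBytes) {
            throw new IllegalArgumentException(name + " must be -1 (unlimited) or at least "
                + minBytes + " bytes, but was " + value);
        }
    }
}
```

Rejecting the value at configuration time, rather than at deletion time, means a bad ConfigMap fails loudly before any data is removed.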
[jira] [Created] (KAFKA-7515) Trogdor - Add Consumer Group Benchmark Specification
Stanislav Kozlovski created KAFKA-7515: -- Summary: Trogdor - Add Consumer Group Benchmark Specification Key: KAFKA-7515 URL: https://issues.apache.org/jira/browse/KAFKA-7515 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski Trogdor's `ConsumeBenchWorker` and `ConsumeBenchSpec` currently take specific topic partitions and assign a consumer to them ([https://github.com/apache/kafka/blob/509dd95ebbf03681ea680a84b8436814ba3e7541/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java#L125]). It is useful to have functionality that supports consumer group usage, since most Kafka consumers in practice use consumer groups. This will allow for benchmarking more real-life scenarios, like spinning up multiple consumers in a consumer group by spawning multiple Trogdor agents (or multiple consumers in one agent if https://issues.apache.org/jira/browse/KAFKA-7514 gets accepted).
[jira] [Created] (KAFKA-7514) Trogdor - Support Multiple Threads in ConsumeBenchWorker
Stanislav Kozlovski created KAFKA-7514: -- Summary: Trogdor - Support Multiple Threads in ConsumeBenchWorker Key: KAFKA-7514 URL: https://issues.apache.org/jira/browse/KAFKA-7514 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski Trogdor's ConsumeBenchWorker currently uses only two threads - one for the StatusUpdater:
{code:java}
this.statusUpdaterFuture = executor.scheduleAtFixedRate(
    new StatusUpdater(latencyHistogram, messageSizeHistogram), 1, 1, TimeUnit.MINUTES);
{code}
and one for the consumer task itself:
{code:java}
executor.submit(new ConsumeMessages(partitions));
{code}
A sample ConsumeBenchSpec specification in JSON looks like this:
{code:java}
{
  "class": "org.apache.kafka.trogdor.workload.ConsumeBenchSpec",
  "durationMs": 1000,
  "consumerNode": "node0",
  "bootstrapServers": "localhost:9092",
  "maxMessages": 100,
  "activeTopics": {
    "foo[1-3]": {
      "numPartitions": 3,
      "replicationFactor": 1
    }
  }
}
{code}
h2. Motivation
This does not make the best use of machines with multiple cores. It would be useful if there was a way to configure the ConsumeBenchSpec to use multiple threads and spawn multiple consumers. This would also allow the ConsumeBenchWorker to achieve higher throughput through parallelism.
h2. Proposal
Add a new `consumerThreads` property to the ConsumeBenchSpec, allowing you to run multiple consumers in parallel.
h2. Changes
By default, it will have a value of 1. `activeTopics` will still be defined in the same way. They will be evenly assigned to the consumers in a round-robin fashion.
For example, given this configuration:
{code:java}
{
  "class": "org.apache.kafka.trogdor.workload.ConsumeBenchSpec",
  "durationMs": 1000,
  "consumerNode": "node0",
  "bootstrapServers": "localhost:9092",
  "maxMessages": 100,
  "consumerThreads": 2,
  "activeTopics": {
    "foo[1-4]": {
      "numPartitions": 4,
      "replicationFactor": 1
    }
  }
}
{code}
Consumer thread 1 will be assigned partitions [foo1, foo3], consumer thread 2 will be assigned partitions [foo2, foo4], and the ConsumeBenchWorker will spawn 4 threads in total in the executor (2 for every consumer).
h3. Status
The way the worker's status is reported will be updated as well. A ConsumeBenchWorker currently shows the following status when queried with `./bin/trogdor.sh client --show-tasks localhost:8889`:
{code:java}
"tasks" : {
  "consume_bench_19938" : {
    "state" : "DONE",
    "spec" : {
      "class" : "org.apache.kafka.trogdor.workload.ConsumeBenchSpec",
      ...
    "status" : {
      "totalMessagesReceived" : 190,
      "totalBytesReceived" : 98040,
      "averageMessageSizeBytes" : 516,
      "averageLatencyMs" : 449.0,
      "p50LatencyMs" : 449,
      "p95LatencyMs" : 449,
      "p99LatencyMs" : 449
    }
  },
{code}
We will change it to show the status of every separate consumer and the topic partitions it was assigned:
{code:java}
"tasks" : {
  "consume_bench_19938" : {
    "state" : "DONE",
    "spec" : {
      "class" : "org.apache.kafka.trogdor.workload.ConsumeBenchSpec",
      ...
    "status" : {
      "consumer-1": {
        "assignedPartitions": ["foo1", "foo3"],
        "totalMessagesReceived" : 190,
        "totalBytesReceived" : 98040,
        "averageMessageSizeBytes" : 516,
        "averageLatencyMs" : 449.0,
        "p50LatencyMs" : 449,
        "p95LatencyMs" : 449,
        "p99LatencyMs" : 449
      },
      "consumer-2": {
        "assignedPartitions": ["foo2", "foo4"],
        "totalMessagesReceived" : 190,
        "totalBytesReceived" : 98040,
        "averageMessageSizeBytes" : 516,
        "averageLatencyMs" : 449.0,
        "p50LatencyMs" : 449,
        "p95LatencyMs" : 449,
        "p99LatencyMs" : 449
      }
    }
  },
{code}
h2. Backwards Compatibility
This change should be mostly backwards-compatible.
If `consumerThreads` is not passed, only one consumer will be created and the round-robin assignor will assign every partition to it. The only change will be in the format of the reported status: even with one consumer, we will show a status similar to
{code:java}
"status" : {
  "consumer-1": {
    "assignedPartitions": ["foo1", "foo3"],
    "totalMessagesReceived" : 190,
    "totalBytesReceived" : 98040,
    "averageMessageSizeBytes" : 516,
    "averageLatencyMs" : 449.0,
    "p50LatencyMs" : 449,
    "p95LatencyMs" : 449,
    "p99LatencyMs" : 449
  }
}
{code}
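The round-robin distribution proposed in KAFKA-7514 can be sketched as follows. This is a minimal sketch, not Trogdor's implementation: the class and method names are hypothetical, the simple `name[lo-hi]` expansion only stands in for Trogdor's own range syntax, and, following the issue's example, the expanded names are the units handed out to consumers. The result is keyed like the proposed per-consumer status (`consumer-1`, `consumer-2`, ...).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch of distributing activeTopics across consumerThreads. */
final class RoundRobinSketch {
    // Matches names like "foo[1-4]": prefix, then an inclusive numeric range.
    private static final Pattern RANGE = Pattern.compile("(.*)\\[(\\d+)-(\\d+)\\]");

    /** Expands "foo[1-4]" into [foo1, foo2, foo3, foo4]; other names pass through unchanged. */
    static List<String> expand(String name) {
        List<String> out = new ArrayList<>();
        Matcher m = RANGE.matcher(name);
        if (!m.matches()) {
            out.add(name);
            return out;
        }
        int lo = Integer.parseInt(m.group(2));
        int hi = Integer.parseInt(m.group(3));
        for (int i = lo; i <= hi; i++) {
            out.add(m.group(1) + i);
        }
        return out;
    }

    /** Gives item i to consumer (i % consumerThreads) + 1, preserving input order. */
    static Map<String, List<String>> assign(List<String> items, int consumerThreads) {
        if (consumerThreads < 1) {
            throw new IllegalArgumentException("consumerThreads must be >= 1");
        }
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (int c = 1; c <= consumerThreads; c++) {
            assignment.put("consumer-" + c, new ArrayList<>());
        }
        for (int i = 0; i < items.size(); i++) {
            assignment.get("consumer-" + (i % consumerThreads + 1)).add(items.get(i));
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> topics = expand("foo[1-4]");
        System.out.println(assign(topics, 2)); // {consumer-1=[foo1, foo3], consumer-2=[foo2, foo4]}
        System.out.println(assign(topics, 1)); // {consumer-1=[foo1, foo2, foo3, foo4]}
    }
}
```

With the default of one thread, every expanded name lands on `consumer-1`, which matches the backwards-compatibility behaviour described above; only the shape of the reported status changes.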
[jira] [Created] (KAFKA-7478) Reduce OAuthBearerLoginModule verbosity
Stanislav Kozlovski created KAFKA-7478: -- Summary: Reduce OAuthBearerLoginModule verbosity Key: KAFKA-7478 URL: https://issues.apache.org/jira/browse/KAFKA-7478 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski
The OAuthBearerLoginModule is quite verbose by default and fills the logs with too much information. It would be nice to reduce the verbosity by default and let users opt in to these debug-friendly messages:
{code:java}
[INFO] 2018-10-03 16:58:11,986 [qtp1137078855-1798] org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule login - Login succeeded; invoke commit() to commit it; current committed token count=0
[INFO] 2018-10-03 16:58:11,986 [qtp1137078855-1798] org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule commit - Committing my token; current committed token count = 0
[INFO] 2018-10-03 16:58:11,986 [qtp1137078855-1798] org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule commit - Done committing my token; committed token count is now 1
[INFO] 2018-10-03 16:58:11,986 [qtp1137078855-1798] org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin login - Successfully logged in.
{code}
[jira] [Created] (KAFKA-7266) Fix MetricsTest test flakiness
Stanislav Kozlovski created KAFKA-7266: -- Summary: Fix MetricsTest test flakiness Key: KAFKA-7266 URL: https://issues.apache.org/jira/browse/KAFKA-7266 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski Fix For: 2.0.1, 2.1.0
The test `kafka.api.MetricsTest.testMetrics` has been failing intermittently in Kafka builds (recent example: https://github.com/apache/kafka/pull/5436#issuecomment-409683955). The failure is in the `MessageConversionsTimeMs` metric assertion:
{code}
java.lang.AssertionError: Message conversion time not recorded 0.0
{code}
Previous work (https://github.com/apache/kafka/pull/4681) reduced the flakiness of the test, but it still fails occasionally.
h3. Solution
On my machine, the test failed 5 times out of 25 runs. I suspect the fix is to increase the record batch size so that the conversion takes more than 1 ms and is therefore recorded by the metric. Increasing the maximum batch size from 1 MB to 8 MB reduced local failures to 1 in 100 runs. Setting it to 16 MB seems to fix the problem: I have run the test 300 times with a 16 MB batch size and have not seen a failure.
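The reasoning in KAFKA-7266 is that a conversion faster than the metric's resolution goes unrecorded. A minimal illustration, assuming the elapsed time is truncated to whole milliseconds before it is recorded (the issue does not show the broker's exact conversion code, and the durations below are made-up values, not measurements):

```java
import java.util.concurrent.TimeUnit;

/**
 * Illustration only: assumes the conversion duration is truncated to whole
 * milliseconds before being recorded, so anything under 1 ms is stored as 0.
 */
final class SubMillisecondTruncation {

    static long recordedMs(long elapsedNanos) {
        // TimeUnit conversion truncates toward zero, like integer division.
        return TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
    }

    public static void main(String[] args) {
        long fastConversion = TimeUnit.MICROSECONDS.toNanos(700);  // hypothetical 0.7 ms conversion
        long slowConversion = TimeUnit.MICROSECONDS.toNanos(2500); // hypothetical 2.5 ms conversion
        System.out.println(recordedMs(fastConversion)); // 0
        System.out.println(recordedMs(slowConversion)); // 2
    }
}
```

Under that assumption, a larger batch helps because it pushes the conversion time past the 1 ms boundary, so the recorded value is non-zero and the assertion passes.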
[jira] [Created] (KAFKA-7215) Improve LogCleaner behavior on error
Stanislav Kozlovski created KAFKA-7215: -- Summary: Improve LogCleaner behavior on error Key: KAFKA-7215 URL: https://issues.apache.org/jira/browse/KAFKA-7215 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski For more detailed information see [KIP-346|https://cwiki.apache.org/confluence/display/KAFKA/KIP-346+-+Improve+LogCleaner+behavior+on+error]
[jira] [Created] (KAFKA-7169) Add support for Custom SASL extensions in OAuth authentication
Stanislav Kozlovski created KAFKA-7169: -- Summary: Add support for Custom SASL extensions in OAuth authentication Key: KAFKA-7169 URL: https://issues.apache.org/jira/browse/KAFKA-7169 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski KIP: here