[GitHub] [kafka] RamanVerma commented on a change in pull request #9364: KAFKA-10471 Mark broker crash during log loading as unclean shutdown
RamanVerma commented on a change in pull request #9364: URL: https://github.com/apache/kafka/pull/9364#discussion_r509921859

## File path: core/src/test/scala/unit/kafka/log/LogTest.scala

@@ -4429,9 +4485,10 @@ class LogTest {
    scheduler: Scheduler = mockTime.scheduler,
    time: Time = mockTime,
    maxProducerIdExpirationMs: Int = 60 * 60 * 1000,
-   producerIdExpirationCheckIntervalMs: Int = LogManager.ProducerIdExpirationCheckIntervalMs): Log = {
+   producerIdExpirationCheckIntervalMs: Int = LogManager.ProducerIdExpirationCheckIntervalMs,
+   lastShutdownClean: Boolean = true): Log = {

Review comment: Done

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] RamanVerma commented on a change in pull request #9364: KAFKA-10471 Mark broker crash during log loading as unclean shutdown
RamanVerma commented on a change in pull request #9364: URL: https://github.com/apache/kafka/pull/9364#discussion_r509921322

## File path: core/src/test/scala/unit/kafka/log/LogTest.scala

@@ -4447,9 +4504,10 @@ class LogTest {
    private def recoverAndCheck(config: LogConfig,
                                expectedKeys: Iterable[Long],
-                               expectDeletedFiles: Boolean = true): Log = {
+                               expectDeletedFiles: Boolean = true,

Review comment: Done

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-7718) Allow customized header inheritance for stateful operators in DSL
[ https://issues.apache.org/jira/browse/KAFKA-7718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17218801#comment-17218801 ] Suresh Achary commented on KAFKA-7718:
--------------------------------------
Hello, this is a very useful feature for many implementations. Just wanted to know whether there is any ETA for this feature. Thank you.

> Allow customized header inheritance for stateful operators in DSL
> ------------------------------------------------------------------
>
> Key: KAFKA-7718
> URL: https://issues.apache.org/jira/browse/KAFKA-7718
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Guozhang Wang
> Priority: Major
> Labels: needs-kip
>
> As a follow-up to
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-244%3A+Add+Record+Header+support+to+Kafka+Streams+Processor+API,
> we want to allow users to customize how record headers are inherited while
> traversing the topology at the DSL layer (at the lower-level Processor API
> layer, users are already capable of customizing and inheriting headers as
> they forward records to the next processor nodes).
> Today the headers are implicitly inherited throughout the topology without
> any modification within the Streams library. For stateless operators
> (filter, map, etc.) this default inheritance policy should be sufficient.
> For stateful operators, where multiple input records may generate a single
> record (i.e. an n:1 transformation rather than a 1:1 mapping), we only
> inherit from the triggering record, which looks like a "random" choice to
> users, and the other records' headers are lost.
> I'd propose we extend the DSL to allow users to customize the header
> inheritance policy for stateful operators, namely joins and aggregations.
> It would contain two parts:
> 1) On the DSL layer, I'd suggest we extend the `Joined` and `Grouped`
> control objects with an additional function that allows users to pass in a
> lambda (let's say it's called HeadersMerger, but the name is subject to
> discussion over the KIP) that takes two Headers objects and returns a
> single Headers object.
> 2) On the implementation layer, we need to actually store the headers in
> the materialized state store so that they can be retrieved along with the
> record by the join / aggregation processor. This changes the organization
> of the state store's value bytes and hence should be considered carefully.
> Then, when the join / aggregate processor is triggered, the Headers of both
> records will be retrieved (one from the triggering record, one read from
> the materialized state store) and passed to the HeadersMerger. Some
> low-hanging optimizations can be considered, e.g. if users have not
> overridden this interface, we can skip reading the headers from the other
> side entirely to save IO cost.
--
This message was sent by Atlassian Jira (v8.3.4#803005)
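For illustration, a minimal sketch of the kind of merge hook the ticket proposes. `HeadersMerger` is a hypothetical name from the discussion above, not an existing Kafka Streams API; only the `Headers`/`RecordHeaders` types are real:

```java
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;

public class HeadersMergerSketch {

    // Hypothetical hook: given the triggering record's headers and the
    // headers stored alongside the other record, produce the result headers.
    @FunctionalInterface
    public interface HeadersMerger {
        Headers merge(Headers triggeringHeaders, Headers storedHeaders);
    }

    // One plausible policy a user might pass in: keep headers from both sides.
    public static final HeadersMerger UNION = (triggering, stored) -> {
        final Headers merged = new RecordHeaders();
        for (final Header header : triggering) {
            merged.add(header);
        }
        for (final Header header : stored) {
            merged.add(header);
        }
        return merged;
    };
}
```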
[GitHub] [kafka] kkonstantine commented on pull request #9431: KAFKA-10426: Deadlock on session key update.
kkonstantine commented on pull request #9431: URL: https://github.com/apache/kafka/pull/9431#issuecomment-714248893 Thanks for checking, @xakassi. Merged all the way to 2.4. I updated the fix versions and closed the JIRA. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-10426) Deadlock in KafkaConfigBackingStore
[ https://issues.apache.org/jira/browse/KAFKA-10426?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis updated KAFKA-10426:
-------------------------------------------
Fix Version/s: 2.6.1
               2.5.2
               2.7.0
               2.4.2

> Deadlock in KafkaConfigBackingStore
> -----------------------------------
>
> Key: KAFKA-10426
> URL: https://issues.apache.org/jira/browse/KAFKA-10426
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 2.4.1, 2.6.0
> Reporter: Goltseva Taisiia
> Assignee: Goltseva Taisiia
> Priority: Critical
> Labels: pull-request-available
> Fix For: 2.4.2, 2.7.0, 2.5.2, 2.6.1
>
> Hi, guys!
> We faced the following deadlock:
>
> {code:java}
> KafkaBasedLog Work Thread - _streaming_service_config
> priority:5 - threadId:0x7f18ec22c000 - nativeId:0x950 - nativeId (decimal):2384 - state:BLOCKED
> stackTrace:
> java.lang.Thread.State: BLOCKED (on object monitor)
> at com.company.streaming.platform.kafka.DistributedHerder$ConfigUpdateListener.onSessionKeyUpdate(DistributedHerder.java:1586)
> - waiting to lock <0xe6136808> (a com.company.streaming.platform.kafka.DistributedHerder)
> at org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:707)
> - locked <0xd8c3be40> (a java.lang.Object)
> at org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:481)
> at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:264)
> at org.apache.kafka.connect.util.KafkaBasedLog.access$500(KafkaBasedLog.java:71)
> at org.apache.kafka.connect.util.KafkaBasedLog$WorkThread.run(KafkaBasedLog.java:337)
>
> CustomDistributedHerder-connect-1
> priority:5 - threadId:0x7f1a01e30800 - nativeId:0x93a - nativeId (decimal):2362 - state:BLOCKED
> stackTrace:
> java.lang.Thread.State: BLOCKED (on object monitor)
> at org.apache.kafka.connect.storage.KafkaConfigBackingStore.snapshot(KafkaConfigBackingStore.java:285)
> - waiting to lock <0xd8c3be40> (a java.lang.Object)
> at com.company.streaming.platform.kafka.DistributedHerder.updateConfigsWithIncrementalCooperative(DistributedHerder.java:514)
> - locked <0xe6136808> (a com.company.streaming.platform.kafka.DistributedHerder)
> at com.company.streaming.platform.kafka.DistributedHerder.tick(DistributedHerder.java:402)
> at com.company.streaming.platform.kafka.DistributedHerder.run(DistributedHerder.java:286)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748){code}
> DistributedHerder entered the synchronized method
> updateConfigsWithIncrementalCooperative() and called
> configBackingStore.snapshot(), which takes a lock on an internal object in
> the KafkaConfigBackingStore class.
>
> Meanwhile, KafkaConfigBackingStore's ConsumeCallback, inside the
> synchronized block on that internal object, got a SESSION_KEY record and
> called updateListener.onSessionKeyUpdate(), which takes a lock on the
> DistributedHerder.
>
> As far as I can see, the problem is here:
> [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L737]
>
> As I understand it, this call should be performed outside the synchronized block:
> {code:java}
> if (started)
>     updateListener.onSessionKeyUpdate(KafkaConfigBackingStore.this.sessionKey);{code}
>
> I'm going to make a PR.
--
This message was sent by Atlassian Jira (v8.3.4#803005)
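To make the proposed reordering concrete, here is a simplified model of the fix direction. Names mirror KafkaConfigBackingStore and DistributedHerder, but this is an illustrative sketch, not the actual Connect code: capture the decision while holding the lock, then invoke the herder callback after releasing it, so the two monitors are never held in opposite order.

```java
// Illustrative model of the deadlock fix described above; not the real
// KafkaConfigBackingStore implementation.
class ConfigStoreSketch {
    interface UpdateListener {
        void onSessionKeyUpdate(Object sessionKey);
    }

    private final Object lock = new Object();
    private final UpdateListener updateListener;
    private boolean started;
    private volatile Object sessionKey;

    ConfigStoreSketch(UpdateListener updateListener) {
        this.updateListener = updateListener;
    }

    void onSessionKeyRecord(Object newKey) {
        final boolean notify;
        synchronized (lock) {
            sessionKey = newKey; // mutate shared state under the lock
            notify = started;    // capture the decision under the lock
        }
        // Call out to the herder *after* releasing the lock, so this thread
        // never holds the store's monitor while waiting on the herder's.
        if (notify) {
            updateListener.onSessionKeyUpdate(newKey);
        }
    }
}
```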
[jira] [Comment Edited] (KAFKA-4669) KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception
[ https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17218213#comment-17218213 ] Mayur Patki edited comment on KAFKA-4669 at 10/22/20, 5:08 AM: --- Hi Team, [~rsivaram] , Encountering this issue on consumers as well - is there a workaround for this? spring-kafka consumer used spring-kafka version - 2.2.14.RELEASE kafka client version - kafka-client - 2.0.1 kafka cluster running on 1.1.1 Consumer exception - cause: {} - java.lang.IllegalStateException: Correlation id for response (400801) does not match request (400737), request header: RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=3, clientId=consumer-7, correlationId=400737) at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:853) at org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:638) at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:757) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:519) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:271) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242) at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1247) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:742) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.lang.Thread.run(Thread.java:748) was (Author: mayur_patki): Hi Team, Encountering this issue on consumers as well - is there a workaround for this? 
spring-kafka consumer used spring-kafka version - 2.2.14.RELEASE kafka client version - kafka-client - 2.0.1 kafka cluster running on 1.1.1 Consumer exception - cause: {} - java.lang.IllegalStateException: Correlation id for response (400801) does not match request (400737), request header: RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=3, clientId=consumer-7, correlationId=400737) at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:853) at org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:638) at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:757) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:519) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:271) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242) at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1247) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:742) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.lang.Thread.run(Thread.java:748) > KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws > exception > - > > Key: KAFKA-4669 > URL: https://issues.apache.org/jira/browse/KAFKA-4669 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 0.9.0.1 >Reporter: Cheng Ju >Assignee: Rajini Sivaram >Priority: Critical > Labels: reliability > Fix For: 0.11.0.1, 1.0.0 > > > There is no try catch in NetworkClient.handleCompletedReceives. If an > exception is thrown after inFlightRequests.completeNext(source), then the > corresponding RecordBatch's done will never get called, and > KafkaProducer.flush will hang on this RecordBatch. > I've checked 0.10 code and think this bug does exist in 0.10 versions. > A real case. First a correlateId not match exception happens: > 13 Jan 2017 17:08:24,059 ER
[GitHub] [kafka] vvcephei commented on pull request #9477: MINOR: TopologyTestDriver should not require dummy parameters
vvcephei commented on pull request #9477: URL: https://github.com/apache/kafka/pull/9477#issuecomment-714211840 Just fixed the build; re-running the tests now. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #9477: MINOR: TopologyTestDriver should not require dummy parameters
vvcephei commented on pull request #9477: URL: https://github.com/apache/kafka/pull/9477#issuecomment-714211742 Thanks, @mjsax ! I agree with creating a ticket for the follow-on work; I'll do that tomorrow. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #9477: MINOR: TopologyTestDriver should not require dummy parameters
vvcephei commented on a change in pull request #9477: URL: https://github.com/apache/kafka/pull/9477#discussion_r509869342 ## File path: streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java ## @@ -73,7 +73,6 @@ public void setup() { // setup test driver final Properties props = new Properties(); props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "maxAggregation"); Review comment: Yeah, you're looking at a state where I had partially applied the suggestion to drop the unnecessary configs. Bootstrap Server is always unnecessary with TTD, but applicationId sometimes winds up being necessary to keep tests from colliding on the state directory. Chasing these down is what got me hung up the first time around, so I think I'll just leave it as-is for now. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
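As a side note on the state-directory collisions mentioned above: one way to avoid them without a unique application.id is to give each test its own state directory. A sketch, assuming Kafka's test utilities are on the classpath (this is an assumption about the test setup, not what this PR does):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.test.TestUtils;

public class UniqueStateDirExample {
    public static Properties testProps() {
        final Properties props = new Properties();
        // A per-test temp directory keeps TopologyTestDriver instances from
        // colliding on the default state directory path.
        props.setProperty(StreamsConfig.STATE_DIR_CONFIG,
            TestUtils.tempDirectory().getAbsolutePath());
        return props;
    }
}
```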
[jira] [Commented] (KAFKA-10603) Re-design KStream.process() and K*.transform*() operations
[ https://issues.apache.org/jira/browse/KAFKA-10603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17218723#comment-17218723 ] Sagar Rao commented on KAFKA-10603:
-----------------------------------
Hey [~vvcephei], I know this task requires an understanding of the new changes in the KIP. Just curious: is it something that I can pick up?

> Re-design KStream.process() and K*.transform*() operations
> ----------------------------------------------------------
>
> Key: KAFKA-10603
> URL: https://issues.apache.org/jira/browse/KAFKA-10603
> Project: Kafka
> Issue Type: New Feature
> Reporter: John Roesler
> Priority: Major
> Labels: needs-kip
>
> After the implementation of KIP-478, we have the ability to reconsider all
> these APIs, and maybe just replace them with
> {code:java}
> // KStream
> KStream process(ProcessorSupplier)
> // KTable
> KTable process(ProcessorSupplier){code}
>
> but it needs more thought and a KIP for sure.
>
> This ticket probably supersedes
> https://issues.apache.org/jira/browse/KAFKA-8396
--
This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] jsancio commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests
jsancio commented on a change in pull request #9476: URL: https://github.com/apache/kafka/pull/9476#discussion_r509857105 ## File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java ## @@ -1729,126 +1695,156 @@ public void testLeaderGracefulShutdownTimeout() throws Exception { public void testFollowerGracefulShutdown() throws Exception { int otherNodeId = 1; int epoch = 5; +Set voters = Utils.mkSet(LOCAL_ID, otherNodeId); -Set voters = Utils.mkSet(localId, otherNodeId); - quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, otherNodeId, voters)); -KafkaRaftClient client = buildClient(voters); -assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, voters), quorumStateStore.readElectionState()); +RaftClientTestContext context = new RaftClientTestContext.Builder() +.updateQuorumStateStore(quorumStateStore -> { +assertDoesNotThrow(() -> { + quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, otherNodeId, voters)); +}); +}) +.build(voters); -client.poll(); +assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, voters), context.quorumStateStore.readElectionState()); + +context.client.poll(); int shutdownTimeoutMs = 5000; -CompletableFuture shutdownFuture = client.shutdown(shutdownTimeoutMs); -assertTrue(client.isRunning()); +CompletableFuture shutdownFuture = context.client.shutdown(shutdownTimeoutMs); +assertTrue(context.client.isRunning()); assertFalse(shutdownFuture.isDone()); -client.poll(); -assertFalse(client.isRunning()); +context.client.poll(); +assertFalse(context.client.isRunning()); assertTrue(shutdownFuture.isDone()); assertNull(shutdownFuture.get()); } @Test public void testGracefulShutdownSingleMemberQuorum() throws IOException { -KafkaRaftClient client = buildClient(Collections.singleton(localId)); +RaftClientTestContext context = RaftClientTestContext.build(Collections.singleton(LOCAL_ID)); + assertEquals(ElectionState.withElectedLeader( -1, localId, Collections.singleton(localId)), quorumStateStore.readElectionState()); -client.poll(); -assertEquals(0, channel.drainSendQueue().size()); +1, LOCAL_ID, Collections.singleton(LOCAL_ID)), context.quorumStateStore.readElectionState()); +context.client.poll(); +assertEquals(0, context.channel.drainSendQueue().size()); int shutdownTimeoutMs = 5000; -client.shutdown(shutdownTimeoutMs); -assertTrue(client.isRunning()); -client.poll(); -assertFalse(client.isRunning()); +context.client.shutdown(shutdownTimeoutMs); +assertTrue(context.client.isRunning()); +context.client.poll(); +assertFalse(context.client.isRunning()); } @Test public void testFollowerReplication() throws Exception { int otherNodeId = 1; int epoch = 5; -Set voters = Utils.mkSet(localId, otherNodeId); - quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, otherNodeId, voters)); -KafkaRaftClient client = buildClient(voters); -assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, voters), quorumStateStore.readElectionState()); +Set voters = Utils.mkSet(LOCAL_ID, otherNodeId); + +RaftClientTestContext context = new RaftClientTestContext.Builder() +.updateQuorumStateStore(quorumStateStore -> { +assertDoesNotThrow(() -> { + quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, otherNodeId, voters)); +}); +}) +.build(voters); + +assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, voters), context.quorumStateStore.readElectionState()); -pollUntilSend(client); +context.pollUntilSend(); -int fetchQuorumCorrelationId = 
assertSentFetchRequest(epoch, 0L, 0); +int fetchQuorumCorrelationId = context.assertSentFetchRequest(epoch, 0L, 0); Records records = MemoryRecords.withRecords(0L, CompressionType.NONE, 3, new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes())); FetchResponseData response = fetchResponse(epoch, otherNodeId, records, 0L, Errors.NONE); -deliverResponse(fetchQuorumCorrelationId, otherNodeId, response); +context.deliverResponse(fetchQuorumCorrelationId, otherNodeId, response); -client.poll(); -assertEquals(2L, log.endOffset().offset); -assertEquals(2L, log.lastFlushedOffset()); +context.client.poll(); +assertEquals(2L, context.log.endOffset
[GitHub] [kafka] jsancio commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests
jsancio commented on a change in pull request #9476: URL: https://github.com/apache/kafka/pull/9476#discussion_r509857027 ## File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java ## @@ -1729,126 +1695,156 @@ public void testLeaderGracefulShutdownTimeout() throws Exception { public void testFollowerGracefulShutdown() throws Exception { int otherNodeId = 1; int epoch = 5; +Set voters = Utils.mkSet(LOCAL_ID, otherNodeId); -Set voters = Utils.mkSet(localId, otherNodeId); - quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, otherNodeId, voters)); -KafkaRaftClient client = buildClient(voters); -assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, voters), quorumStateStore.readElectionState()); +RaftClientTestContext context = new RaftClientTestContext.Builder() +.updateQuorumStateStore(quorumStateStore -> { Review comment: Thanks for the suggestion. I implemented this. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] feyman2016 commented on pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted
feyman2016 commented on pull request #9270: URL: https://github.com/apache/kafka/pull/9270#issuecomment-714183165 @vvcephei Thanks for the help! FYI, I also tried to build locally with apache/trunk merged, and it succeeded. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-8124) Beginning offset is after the ending offset for topic partition
[ https://issues.apache.org/jira/browse/KAFKA-8124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17218694#comment-17218694 ] roamer_wu commented on KAFKA-8124: -- +1 spark2.4 with kafka2.1.0 > Beginning offset is after the ending offset for topic partition > --- > > Key: KAFKA-8124 > URL: https://issues.apache.org/jira/browse/KAFKA-8124 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 1.1.0 > Environment: OS : Rhel 7 > server : VM >Reporter: suseendramani >Priority: Major > > > We are getting this issue in production and Sparks consumer dying because of > Off Set issue. > We observed the following error in Kafka Broker ( that has problems) > -- > [2019-03-18 14:40:14,100] WARN Unable to reconnect to ZooKeeper service, > session 0x1692e9ff4410004 has expired (org.apache.zookeeper.ClientCnxn) > [2019-03-18 14:40:14,100] INFO Unable to reconnect to ZooKeeper service, > session 0x1692e9ff4410004 has expired, closing socket connection > (org.apache.zook > eeper.ClientCnxn) > --- > Error from other broker when talking to the problematic broker. > [2019-03-18 14:40:14,107] INFO [ReplicaFetcher replicaId=3, leaderId=5, > fetcherId=0] Error sending fetch request (sessionId=2127346653, > epoch=27048427) to > node 5: java.nio.channels.ClosedSelectorException. > (org.apache.kafka.clients.FetchSessionHandler) > > > > All topics were having replication factor of 3 and this issue happens when > one of the broker was having issues. We are using SCRAM authentication > (SHA-256) and SSL. > > Sparks Job died with the following error: > ERROR 2019-03-18 07:40:57,178 7924 org.apache.spark.executor.Executor > [Executor task launch worker for task 16] Exception in task 27.0 in stage 0.0 > (TID 16) > java.lang.AssertionError: assertion failed: Beginning offset 115204574 is > after the ending offset 115204516 for topic partition 37. You > either provided an invalid fromOffset, or the Kafka topic has been damaged > at scala.Predef$.assert(Predef.scala:170) > at org.apache.spark.streaming.kafka010.KafkaRDD.compute(KafkaRDD.scala:175) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:109) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > > --- > > please let me know if you need more details. 
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] jsancio commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests
jsancio commented on a change in pull request #9476: URL: https://github.com/apache/kafka/pull/9476#discussion_r509840834 ## File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java ## @@ -90,470 +76,480 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class KafkaRaftClientTest { -private static final TopicPartition METADATA_PARTITION = new TopicPartition("metadata", 0); - -private final int localId = 0; -private final int electionTimeoutMs = 1; -private final int electionBackoffMaxMs = 100; -private final int fetchTimeoutMs = 5; // fetch timeout is usually larger than election timeout -private final int retryBackoffMs = 50; -private final int requestTimeoutMs = 5000; -private final int fetchMaxWaitMs = 0; - -private final MockTime time = new MockTime(); -private final MockLog log = new MockLog(METADATA_PARTITION); -private final MockNetworkChannel channel = new MockNetworkChannel(); -private final Random random = Mockito.spy(new Random(1)); -private final QuorumStateStore quorumStateStore = new MockQuorumStateStore(); - -@AfterEach -public void cleanUp() throws IOException { -quorumStateStore.clear(); -} - -private InetSocketAddress mockAddress(int id) { -return new InetSocketAddress("localhost", 9990 + id); -} - -private KafkaRaftClient buildClient(Set voters) throws IOException { -return buildClient(voters, new Metrics(time)); -} - -private KafkaRaftClient buildClient(Set voters, Metrics metrics) throws IOException { -LogContext logContext = new LogContext(); -QuorumState quorum = new QuorumState(localId, voters, electionTimeoutMs, fetchTimeoutMs, -quorumStateStore, time, logContext, random); - -Map voterAddresses = voters.stream().collect(Collectors.toMap( -Function.identity(), -this::mockAddress -)); - -KafkaRaftClient client = new KafkaRaftClient(channel, log, quorum, time, metrics, -new MockFuturePurgatory<>(time), new MockFuturePurgatory<>(time), voterAddresses, -electionBackoffMaxMs, retryBackoffMs, requestTimeoutMs, fetchMaxWaitMs, logContext, random); - -client.initialize(); - -return client; -} - @Test public void testInitializeSingleMemberQuorum() throws IOException { -buildClient(Collections.singleton(localId)); -assertEquals(ElectionState.withElectedLeader(1, localId, Collections.singleton(localId)), -quorumStateStore.readElectionState()); +RaftClientTestContext context = RaftClientTestContext.build(Collections.singleton(LOCAL_ID)); +assertEquals( +ElectionState.withElectedLeader(1, LOCAL_ID, Collections.singleton(LOCAL_ID)), +context.quorumStateStore.readElectionState() +); } @Test public void testInitializeAsLeaderFromStateStoreSingleMemberQuorum() throws Exception { // Start off as leader. 
We should still bump the epoch after initialization int initialEpoch = 2; -Set voters = Collections.singleton(localId); - quorumStateStore.writeElectionState(ElectionState.withElectedLeader(initialEpoch, localId, voters)); - -KafkaRaftClient client = buildClient(voters); -assertEquals(1L, log.endOffset().offset); -assertEquals(initialEpoch + 1, log.lastFetchedEpoch()); -assertEquals(new LeaderAndEpoch(OptionalInt.of(localId), initialEpoch + 1), -client.currentLeaderAndEpoch()); -assertEquals(ElectionState.withElectedLeader(initialEpoch + 1, localId, voters), -quorumStateStore.readElectionState()); +Set voters = Collections.singleton(LOCAL_ID); +RaftClientTestContext context = new RaftClientTestContext.Builder() +.updateQuorumStateStore(quorumStateStore -> { +assertDoesNotThrow(() -> { +quorumStateStore.writeElectionState( +ElectionState.withElectedLeader(initialEpoch, LOCAL_ID, voters) +); +}); +}) +.build(voters); + +assertEquals(1L, context.log.endOffset().offset); +assertEquals(initialEpoch + 1, context.log.lastFetchedEpoch()); +assertEquals(new LeaderAndEpoch(OptionalInt.of(LOCAL_ID), initialEpoch + 1), +context.client.currentLeaderAndEpoch()); +assertEquals(ElectionState.withElectedLeader(initialEpoch + 1, LOCAL_ID, voters), +context.quorumStateStore.readElectionState()); } @Test public void testInitializeAsLeaderFromStateStore() throws Exception { -Set voters = Utils.mkSet(localId, 1); +Set voters = Utils.mkSet(LOCAL_ID, 1); int epoch = 2; -Mockito.doReturn(0).when(random).nextInt(electionTimeoutMs); - quorumStateStore.wr
[GitHub] [kafka] jsancio commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests
jsancio commented on a change in pull request #9476: URL: https://github.com/apache/kafka/pull/9476#discussion_r509840265 ## File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java ## @@ -90,470 +76,480 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class KafkaRaftClientTest { -private static final TopicPartition METADATA_PARTITION = new TopicPartition("metadata", 0); - -private final int localId = 0; -private final int electionTimeoutMs = 1; -private final int electionBackoffMaxMs = 100; -private final int fetchTimeoutMs = 5; // fetch timeout is usually larger than election timeout -private final int retryBackoffMs = 50; -private final int requestTimeoutMs = 5000; -private final int fetchMaxWaitMs = 0; - -private final MockTime time = new MockTime(); -private final MockLog log = new MockLog(METADATA_PARTITION); -private final MockNetworkChannel channel = new MockNetworkChannel(); -private final Random random = Mockito.spy(new Random(1)); -private final QuorumStateStore quorumStateStore = new MockQuorumStateStore(); - -@AfterEach -public void cleanUp() throws IOException { -quorumStateStore.clear(); -} - -private InetSocketAddress mockAddress(int id) { -return new InetSocketAddress("localhost", 9990 + id); -} - -private KafkaRaftClient buildClient(Set voters) throws IOException { -return buildClient(voters, new Metrics(time)); -} - -private KafkaRaftClient buildClient(Set voters, Metrics metrics) throws IOException { -LogContext logContext = new LogContext(); -QuorumState quorum = new QuorumState(localId, voters, electionTimeoutMs, fetchTimeoutMs, -quorumStateStore, time, logContext, random); - -Map voterAddresses = voters.stream().collect(Collectors.toMap( -Function.identity(), -this::mockAddress -)); - -KafkaRaftClient client = new KafkaRaftClient(channel, log, quorum, time, metrics, -new MockFuturePurgatory<>(time), new MockFuturePurgatory<>(time), voterAddresses, -electionBackoffMaxMs, retryBackoffMs, requestTimeoutMs, fetchMaxWaitMs, logContext, random); - -client.initialize(); - -return client; -} - @Test public void testInitializeSingleMemberQuorum() throws IOException { -buildClient(Collections.singleton(localId)); -assertEquals(ElectionState.withElectedLeader(1, localId, Collections.singleton(localId)), -quorumStateStore.readElectionState()); +RaftClientTestContext context = RaftClientTestContext.build(Collections.singleton(LOCAL_ID)); +assertEquals( +ElectionState.withElectedLeader(1, LOCAL_ID, Collections.singleton(LOCAL_ID)), +context.quorumStateStore.readElectionState() +); } @Test public void testInitializeAsLeaderFromStateStoreSingleMemberQuorum() throws Exception { // Start off as leader. 
We should still bump the epoch after initialization int initialEpoch = 2; -Set voters = Collections.singleton(localId); - quorumStateStore.writeElectionState(ElectionState.withElectedLeader(initialEpoch, localId, voters)); - -KafkaRaftClient client = buildClient(voters); -assertEquals(1L, log.endOffset().offset); -assertEquals(initialEpoch + 1, log.lastFetchedEpoch()); -assertEquals(new LeaderAndEpoch(OptionalInt.of(localId), initialEpoch + 1), -client.currentLeaderAndEpoch()); -assertEquals(ElectionState.withElectedLeader(initialEpoch + 1, localId, voters), -quorumStateStore.readElectionState()); +Set voters = Collections.singleton(LOCAL_ID); +RaftClientTestContext context = new RaftClientTestContext.Builder() +.updateQuorumStateStore(quorumStateStore -> { +assertDoesNotThrow(() -> { +quorumStateStore.writeElectionState( +ElectionState.withElectedLeader(initialEpoch, LOCAL_ID, voters) +); +}); +}) +.build(voters); + +assertEquals(1L, context.log.endOffset().offset); +assertEquals(initialEpoch + 1, context.log.lastFetchedEpoch()); +assertEquals(new LeaderAndEpoch(OptionalInt.of(LOCAL_ID), initialEpoch + 1), +context.client.currentLeaderAndEpoch()); +assertEquals(ElectionState.withElectedLeader(initialEpoch + 1, LOCAL_ID, voters), +context.quorumStateStore.readElectionState()); } @Test public void testInitializeAsLeaderFromStateStore() throws Exception { -Set voters = Utils.mkSet(localId, 1); +Set voters = Utils.mkSet(LOCAL_ID, 1); int epoch = 2; -Mockito.doReturn(0).when(random).nextInt(electionTimeoutMs); - quorumStateStore.wr
[jira] [Assigned] (KAFKA-10053) Update Document for new feature that Allow HTTP Response Headers to be Configured for Kafka Connect
[ https://issues.apache.org/jira/browse/KAFKA-10053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Hamill reassigned KAFKA-10053:
-----------------------------------
Assignee: Joel Hamill

> Update Document for new feature that Allow HTTP Response Headers to be
> Configured for Kafka Connect
> ----------------------------------------------------------------------
>
> Key: KAFKA-10053
> URL: https://issues.apache.org/jira/browse/KAFKA-10053
> Project: Kafka
> Issue Type: Improvement
> Components: KafkaConnect
> Reporter: Jeff Huang
> Assignee: Joel Hamill
> Priority: Major
>
> We need to update the AK documentation for this new feature; the details
> are in the following KIP:
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP+577%3A+Allow+HTTP+Response+Headers+to+be+Configured+for+Kafka+Connect]
--
This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-10053) Update Document for new feature that Allow HTTP Response Headers to be Configured for Kafka Connect
[ https://issues.apache.org/jira/browse/KAFKA-10053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Hamill reassigned KAFKA-10053:
-----------------------------------
Assignee: (was: Joel Hamill)

> Update Document for new feature that Allow HTTP Response Headers to be
> Configured for Kafka Connect
> ----------------------------------------------------------------------
>
> Key: KAFKA-10053
> URL: https://issues.apache.org/jira/browse/KAFKA-10053
> Project: Kafka
> Issue Type: Improvement
> Components: KafkaConnect
> Reporter: Jeff Huang
> Priority: Major
>
> We need to update the AK documentation for this new feature; the details
> are in the following KIP:
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP+577%3A+Allow+HTTP+Response+Headers+to+be+Configured+for+Kafka+Connect]
--
This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] jsancio commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests
jsancio commented on a change in pull request #9476: URL: https://github.com/apache/kafka/pull/9476#discussion_r509787211 ## File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java ## @@ -1536,67 +1522,70 @@ public void testObserverLeaderRediscoveryAfterRequestTimeout() throws Exception int otherNodeId = 2; int epoch = 5; Set voters = Utils.mkSet(leaderId, otherNodeId); -KafkaRaftClient client = buildClient(voters); -discoverLeaderAsObserver(client, voters, leaderId, epoch); -pollUntilSend(client); -RaftRequest.Outbound fetchRequest1 = assertSentFetchRequest(); +RaftClientTestContext context = RaftClientTestContext.build(voters); + +context.discoverLeaderAsObserver(voters, leaderId, epoch); + +context.pollUntilSend(); +RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest(); assertEquals(leaderId, fetchRequest1.destinationId()); -assertFetchRequestData(fetchRequest1, epoch, 0L, 0); +RaftClientTestContext.assertFetchRequestData(fetchRequest1, epoch, 0L, 0); -time.sleep(requestTimeoutMs); -pollUntilSend(client); +context.time.sleep(REQUEST_TIMEOUT_MS); +context.pollUntilSend(); // We should retry the Fetch against the other voter since the original // voter connection will be backing off. -RaftRequest.Outbound fetchRequest2 = assertSentFetchRequest(); +RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest(); assertNotEquals(leaderId, fetchRequest2.destinationId()); assertTrue(voters.contains(fetchRequest2.destinationId())); -assertFetchRequestData(fetchRequest2, epoch, 0L, 0); +RaftClientTestContext.assertFetchRequestData(fetchRequest2, epoch, 0L, 0); -deliverResponse(fetchRequest2.correlationId, fetchRequest2.destinationId(), +context.deliverResponse(fetchRequest2.correlationId, fetchRequest2.destinationId(), fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.FENCED_LEADER_EPOCH)); -client.poll(); +context.client.poll(); -assertEquals(ElectionState.withElectedLeader(epoch, leaderId, voters), quorumStateStore.readElectionState()); +assertEquals(ElectionState.withElectedLeader(epoch, leaderId, voters), context.quorumStateStore.readElectionState()); } @Test public void testLeaderGracefulShutdown() throws Exception { int otherNodeId = 1; -Set voters = Utils.mkSet(localId, otherNodeId); int epoch = 1; -KafkaRaftClient client = initializeAsLeader(voters, epoch); +Set voters = Utils.mkSet(LOCAL_ID, otherNodeId); + +RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(voters, epoch); // Now shutdown int shutdownTimeoutMs = 5000; -CompletableFuture shutdownFuture = client.shutdown(shutdownTimeoutMs); +CompletableFuture shutdownFuture = context.client.shutdown(shutdownTimeoutMs); // We should still be running until we have had a chance to send EndQuorumEpoch -assertTrue(client.isShuttingDown()); -assertTrue(client.isRunning()); +assertTrue(context.client.isShuttingDown()); Review comment: Yeah. Let me play around with this. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #9477: MINOR: TopologyTestDriver should not require dummy parameters
mjsax commented on pull request #9477: URL: https://github.com/apache/kafka/pull/9477#issuecomment-713939417 Checkstyle failed: ``` [ant:checkstyle] [ERROR] /home/jenkins/workspace/Kafka_kafka-pr_PR-9477@2/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java:44:8: Unused import - java.util.Properties. [UnusedImports] ``` No objections to merge as-is -- for the KIP, it's too late anyway for 2.7. Let us just create a ticket -- seems like a new "newbie" task to add the new constructor and finish the cleanup of our tests. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #9477: MINOR: TopologyTestDriver should not require dummy parameters
mjsax commented on a change in pull request #9477: URL: https://github.com/apache/kafka/pull/9477#discussion_r509783380 ## File path: streams/src/test/java/org/apache/kafka/streams/TopologyTest.java ## @@ -340,7 +340,6 @@ public void shouldThrowOnUnassignedStateStoreAccess() { final String badNodeName = "badGuy"; final Properties config = new Properties(); -config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1"); config.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); Review comment: can be removed? (seems on many other tests, too) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #9477: MINOR: TopologyTestDriver should not require dummy parameters
mjsax commented on a change in pull request #9477: URL: https://github.com/apache/kafka/pull/9477#discussion_r509783306 ## File path: streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java ## @@ -73,7 +73,6 @@ public void setup() { // setup test driver final Properties props = new Properties(); props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "maxAggregation"); Review comment: can be removed? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests
hachikuji commented on a change in pull request #9476: URL: https://github.com/apache/kafka/pull/9476#discussion_r509760723 ## File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java ## @@ -1729,126 +1695,156 @@ public void testLeaderGracefulShutdownTimeout() throws Exception { public void testFollowerGracefulShutdown() throws Exception { int otherNodeId = 1; int epoch = 5; +Set voters = Utils.mkSet(LOCAL_ID, otherNodeId); -Set voters = Utils.mkSet(localId, otherNodeId); - quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, otherNodeId, voters)); -KafkaRaftClient client = buildClient(voters); -assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, voters), quorumStateStore.readElectionState()); +RaftClientTestContext context = new RaftClientTestContext.Builder() +.updateQuorumStateStore(quorumStateStore -> { Review comment: I think nearly every call to `updateQuorumStateStore` is just writing an initial state. Seems like we can introduce a more direct option to the builder. By the way, one of the annoyances is needing to provide `voters` through the initial state and through `build` below. Since we always need `voters`, maybe we can provide it in the builder constructor. That would allow us to add helpers to construct the state. For example, we could turn this into: ```java new RaftClientTestContext.Builder(voters) .initializeAsFollower(epoch, otherNodeId) .build() ``` Similarly, we could probably do state assertions in the test context as well and save the need to always pass through `voters` (e.g. we could have `context.assertFollower(epoch, leaderId)` instead of the cumbersome `assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, voters), context.quorumStateStore.readElectionState())`). ## File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java ## @@ -1729,126 +1695,156 @@ public void testLeaderGracefulShutdownTimeout() throws Exception { public void testFollowerGracefulShutdown() throws Exception { int otherNodeId = 1; int epoch = 5; +Set voters = Utils.mkSet(LOCAL_ID, otherNodeId); -Set voters = Utils.mkSet(localId, otherNodeId); - quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, otherNodeId, voters)); -KafkaRaftClient client = buildClient(voters); -assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, voters), quorumStateStore.readElectionState()); +RaftClientTestContext context = new RaftClientTestContext.Builder() +.updateQuorumStateStore(quorumStateStore -> { Review comment: I think just about every call to `updateQuorumStateStore` is just writing an initial state. Seems like we can introduce a more direct option to the builder. By the way, one of the annoyances is needing to provide `voters` through the initial state and through `build` below. Since we always need `voters`, maybe we can provide it in the builder constructor. That would allow us to add helpers to construct the state. 
For example, we could turn this into: ```java new RaftClientTestContext.Builder(voters) .initializeAsFollower(epoch, otherNodeId) .build() ``` ## File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java ## @@ -1729,126 +1695,156 @@ public void testLeaderGracefulShutdownTimeout() throws Exception { public void testFollowerGracefulShutdown() throws Exception { int otherNodeId = 1; int epoch = 5; +Set voters = Utils.mkSet(LOCAL_ID, otherNodeId); -Set voters = Utils.mkSet(localId, otherNodeId); - quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, otherNodeId, voters)); -KafkaRaftClient client = buildClient(voters); -assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, voters), quorumStateStore.readElectionState()); +RaftClientTestContext context = new RaftClientTestContext.Builder() +.updateQuorumStateStore(quorumStateStore -> { +assertDoesNotThrow(() -> { + quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, otherNodeId, voters)); +}); +}) +.build(voters); -client.poll(); +assertEquals(ElectionState.withElectedLeader(epoch, otherNodeId, voters), context.quorumStateStore.readElectionState()); + +context.client.poll(); int shutdownTimeoutMs = 5000; -CompletableFuture shutdownFuture = client.shutdown(shutdownTimeoutMs); -assertTrue(client.isRunning()); +CompletableFuture shutdownFuture = context.client.shutdown(shutdownTimeoutMs); +assertTrue(context.client.isRunning());
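A rough sketch of what the suggested builder and assertion helpers could look like. This is a hypothetical shape following the review comment above, not the code in this PR; it assumes the raft module's ElectionState and QuorumStateStore on the test classpath:

```java
import java.io.IOException;
import java.util.Set;

import org.apache.kafka.raft.ElectionState;
import org.apache.kafka.raft.QuorumStateStore;

import static org.junit.jupiter.api.Assertions.assertEquals;

// Hypothetical helpers following the review suggestion above.
public class RaftClientTestContextSketch {
    private final Set<Integer> voters;
    private final QuorumStateStore quorumStateStore;

    private RaftClientTestContextSketch(Set<Integer> voters, QuorumStateStore store) {
        this.voters = voters;
        this.quorumStateStore = store;
    }

    public static class Builder {
        private final Set<Integer> voters;
        private ElectionState initialState;

        public Builder(Set<Integer> voters) {
            this.voters = voters;
        }

        // Seed the election state directly instead of exposing the raw
        // QuorumStateStore to every test.
        public Builder initializeAsFollower(int epoch, int leaderId) {
            this.initialState = ElectionState.withElectedLeader(epoch, leaderId, voters);
            return this;
        }

        // build() would write initialState to the store and construct the
        // KafkaRaftClient, as the existing buildClient(...) helper does.
    }

    // Replaces the repeated readElectionState() assertions in the tests.
    public void assertFollower(int epoch, int leaderId) throws IOException {
        assertEquals(
            ElectionState.withElectedLeader(epoch, leaderId, voters),
            quorumStateStore.readElectionState()
        );
    }
}
```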
[GitHub] [kafka] vvcephei commented on pull request #9477: MINOR: TopologyTestDriver should not require dummy parameters
vvcephei commented on pull request #9477: URL: https://github.com/apache/kafka/pull/9477#issuecomment-713906551 Hey @mjsax and @abbccdda, sorry about this, but I've had to open a new PR instead of just updating https://github.com/apache/kafka/pull/9052 in place. I just took the current state of that other PR and rebased it on trunk (and resolved the conflicts). I'd like to get this quality-of-life improvement into 2.7, but I didn't have time to take care of your suggestions for follow-on work. For clarity, I have migrated some, but not all, internal usages of TopologyTestDriver so they no longer need the appId/bootstrap. I also didn't do a follow-on KIP to offer new constructors that don't take `Properties` at all. Are you still ok with merging this as-is? (cc @bbejeck) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei closed pull request #9052: MINOR: TopologyTestDriver should not require dummy parameters
vvcephei closed pull request #9052: URL: https://github.com/apache/kafka/pull/9052 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #9052: MINOR: TopologyTestDriver should not require dummy parameters
vvcephei commented on pull request #9052: URL: https://github.com/apache/kafka/pull/9052#issuecomment-713905061 Closing in favor of https://github.com/apache/kafka/pull/9477, since I've recycled my fork since opening this PR and can no longer update the PR. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei opened a new pull request #9477: MINOR: TopologyTestDriver should not require dummy parameters
vvcephei opened a new pull request #9477: URL: https://github.com/apache/kafka/pull/9477 TopologyTestDriver comes with a paper cut: it passes through the config requirement that application.id and bootstrap.servers must be configured, even though these configs are not required in the context of TopologyTestDriver specifically. This change relaxes the requirement. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
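Concretely, after this change a test should be able to construct the driver with an empty config. A minimal sketch (topic names and the trivial topology are illustrative):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;

public class NoDummyConfigExample {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");
        final Topology topology = builder.build();

        // Previously, both of these dummy entries were mandatory:
        //   props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy");
        //   props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        // With this PR, an empty Properties object should suffice.
        final Properties props = new Properties();
        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
            // create TestInputTopic / TestOutputTopic instances and pipe records as usual
        }
    }
}
```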
[GitHub] [kafka] hachikuji commented on a change in pull request #9418: KAFKA-10601; Add support for append linger to Raft implementation
hachikuji commented on a change in pull request #9418: URL: https://github.com/apache/kafka/pull/9418#discussion_r509740150 ## File path: core/src/main/scala/kafka/common/RecordValidationException.scala ## @@ -23,5 +23,6 @@ import org.apache.kafka.common.requests.ProduceResponse.RecordError import scala.collection.Seq class RecordValidationException(val invalidException: ApiException, -val recordErrors: Seq[RecordError]) extends RuntimeException { +val recordErrors: Seq[RecordError]) + extends RuntimeException(invalidException) { Review comment: Yeah, doesn't need to be done here. I just noticed the trace was missing when debugging a failure. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio opened a new pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests
jsancio opened a new pull request #9476: URL: https://github.com/apache/kafka/pull/9476 There is a lot of functionality in KafkaRaftClientTest that is useful for writing other tests. Refactor that functionality into another class that can be reused in other tests. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9418: KAFKA-10601; Add support for append linger to Raft implementation
hachikuji commented on a change in pull request #9418: URL: https://github.com/apache/kafka/pull/9418#discussion_r509732236 ## File path: core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala ## @@ -216,11 +216,9 @@ class KafkaNetworkChannel(time: Time, endpoints.put(id, node) } - def postInboundRequest(header: RequestHeader, - request: AbstractRequest, - onResponseReceived: ResponseHandler): Unit = { + def postInboundRequest(request: AbstractRequest, onResponseReceived: ResponseHandler): Unit = { Review comment: This was actually a bug. The implementation was treating `correlationId` as unique across all connections, which of course was wrong. My fix was just to overwrite the `correlationId` from the header with one that could be unique, but obviously this loses traceability through the Raft layer. If it's ok with you, I'd like to address this problem more generally in a follow-up. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9418: KAFKA-10601; Add support for append linger to Raft implementation
hachikuji commented on a change in pull request #9418: URL: https://github.com/apache/kafka/pull/9418#discussion_r509726250 ## File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java ## @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.internals; + +import org.apache.kafka.common.protocol.DataOutputStreamWritable; +import org.apache.kafka.common.record.AbstractRecords; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.DefaultRecord; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.raft.RecordSerde; + +import java.io.DataOutputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +public class BatchBuilder { Review comment: I am not sure I follow. Are you suggesting making `BatchBuilder` a nested class? I think what it's doing is complex enough that I wanted a separate class that could be tested in isolation. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9418: KAFKA-10601; Add support for append linger to Raft implementation
hachikuji commented on a change in pull request #9418: URL: https://github.com/apache/kafka/pull/9418#discussion_r509723127 ## File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java ## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.internals; + +import org.apache.kafka.common.memory.MemoryPool; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.apache.kafka.raft.RecordSerde; + +import java.io.Closeable; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.locks.ReentrantLock; + +/** + * TODO: Also flush after minimum size limit is reached? + */ +public class BatchAccumulator implements Closeable { +private final int epoch; +private final Time time; +private final Timer lingerTimer; +private final int lingerMs; +private final int maxBatchSize; +private final CompressionType compressionType; +private final MemoryPool memoryPool; +private final ReentrantLock lock; +private final RecordSerde serde; + +private long nextOffset; +private BatchBuilder currentBatch; +private List> completed; + +public BatchAccumulator( +int epoch, +long baseOffset, +int lingerMs, +int maxBatchSize, +MemoryPool memoryPool, +Time time, +CompressionType compressionType, +RecordSerde serde +) { +this.epoch = epoch; +this.lingerMs = lingerMs; +this.maxBatchSize = maxBatchSize; +this.memoryPool = memoryPool; +this.time = time; +this.lingerTimer = time.timer(lingerMs); +this.compressionType = compressionType; +this.serde = serde; +this.nextOffset = baseOffset; +this.completed = new ArrayList<>(); +this.lock = new ReentrantLock(); +} + +/** + * Append a list of records into an atomic batch. We guarantee all records + * are included in the same underlying record batch so that either all of + * the records become committed or none of them do. + * + * @param epoch the expected leader epoch + * @param records the list of records to include in a batch + * @return the offset of the last message or {@link Long#MAX_VALUE} if the epoch + * does not match + */ +public Long append(int epoch, List records) { +if (epoch != this.epoch) { +// If the epoch does not match, then the state machine probably +// has not gotten the notification about the latest epoch change. +// In this case, ignore the append and return a large offset value +// which will never be committed. 
+return Long.MAX_VALUE; +} + +Object serdeContext = serde.newWriteContext(); +int batchSize = 0; +for (T record : records) { +batchSize += serde.recordSize(record, serdeContext); +} + +if (batchSize > maxBatchSize) { +throw new IllegalArgumentException("The total size of " + records + " is " + batchSize + +", which exceeds the maximum allowed batch size of " + maxBatchSize); +} + +lock.lock(); +try { +BatchBuilder batch = maybeAllocateBatch(batchSize); +if (batch == null) { +return null; +} + +if (isEmpty()) { +lingerTimer.update(); +lingerTimer.reset(lingerMs); +} + +for (T record : records) { +batch.appendRecord(record, serdeContext); +nextOffset += 1; +} + +return nextOffset - 1; +} finally { +lock.unlock(); +} +} + +private BatchBuilder maybeAllocateBatch(int batchSize) { +if (currentBatch == null) { +startNewBatch(); +} else if (!currentBatch.hasRoomFor(batchSize)) { +completeCurrent
[GitHub] [kafka] hachikuji commented on a change in pull request #9418: KAFKA-10601; Add support for append linger to Raft implementation
hachikuji commented on a change in pull request #9418: URL: https://github.com/apache/kafka/pull/9418#discussion_r509719785 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -1605,100 +1708,18 @@ public void poll() throws IOException { } } -private void failPendingAppends(KafkaException exception) { -for (UnwrittenAppend unwrittenAppend : unwrittenAppends) { -unwrittenAppend.fail(exception); -} -unwrittenAppends.clear(); -} - -private void pollPendingAppends(LeaderState state, long currentTimeMs) { -int numAppends = 0; -int maxNumAppends = unwrittenAppends.size(); - -while (!unwrittenAppends.isEmpty() && numAppends < maxNumAppends) { -final UnwrittenAppend unwrittenAppend = unwrittenAppends.poll(); - -if (unwrittenAppend.future.isDone()) -continue; - -if (unwrittenAppend.isTimedOut(currentTimeMs)) { -unwrittenAppend.fail(new TimeoutException("Request timeout " + unwrittenAppend.requestTimeoutMs -+ " expired before the records could be appended to the log")); -} else { -int epoch = quorum.epoch(); -LogAppendInfo info = appendAsLeader(unwrittenAppend.records); -OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(info.lastOffset, epoch); -long numRecords = info.lastOffset - info.firstOffset + 1; -logger.debug("Completed write of {} records at {}", numRecords, offsetAndEpoch); - -if (unwrittenAppend.ackMode == AckMode.LEADER) { -unwrittenAppend.complete(offsetAndEpoch); -} else if (unwrittenAppend.ackMode == AckMode.QUORUM) { -CompletableFuture future = appendPurgatory.await( -LogOffset.awaitCommitted(offsetAndEpoch.offset), -unwrittenAppend.requestTimeoutMs); - -future.whenComplete((completionTimeMs, exception) -> { -if (exception != null) { -logger.error("Failed to commit append at {} due to {}", offsetAndEpoch, exception); - -unwrittenAppend.fail(exception); -} else { -long elapsedTime = Math.max(0, completionTimeMs - currentTimeMs); -double elapsedTimePerRecord = (double) elapsedTime / numRecords; - kafkaRaftMetrics.updateCommitLatency(elapsedTimePerRecord, currentTimeMs); -unwrittenAppend.complete(offsetAndEpoch); - -logger.debug("Completed commit of {} records at {}", numRecords, offsetAndEpoch); -} -}); -} -} - -numAppends++; -} - -if (numAppends > 0) { -flushLeaderLog(state, currentTimeMs); -} -} - -/** - * Append a set of records to the log. Successful completion of the future indicates a success of - * the append, with the uncommitted base offset and epoch. 
- * - * @param records The records to write to the log - * @param ackMode The commit mode for the appended records - * @param timeoutMs The maximum time to wait for the append operation to complete (including - * any time needed for replication) - * @return The uncommitted base offset and epoch of the appended records - */ @Override -public CompletableFuture append( -Records records, -AckMode ackMode, -long timeoutMs -) { -if (records.sizeInBytes() == 0) -throw new IllegalArgumentException("Attempt to append empty record set"); - -if (shutdown.get() != null) -throw new IllegalStateException("Cannot append records while we are shutting down"); - -if (quorum.isObserver()) -throw new IllegalStateException("Illegal attempt to write to an observer"); - -CompletableFuture future = new CompletableFuture<>(); -UnwrittenAppend unwrittenAppend = new UnwrittenAppend( -records, time.milliseconds(), timeoutMs, ackMode, future); +public Long scheduleAppend(int epoch, List records) { +BatchAccumulator accumulator = this.accumulator; +if (accumulator == null) { +return Long.MAX_VALUE; Review comment: Yeah, see my comment above about the handling of `Long.MAX_VALUE`. This is an attempt to reduce the error handling in the state machine. The model that we are working toward here is the following: 1) the state machine gets notified that the node has become leader in some epoch 2) the state machine can schedule appends with this epoch and it will get back the expected append offset 3) the state machine treats
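Based on the convention described above, a hedged sketch of how a state machine caller is expected to interpret the return value of `scheduleAppend` (simplified; the real listener API differs):

```scala
// Sketch only: the three outcomes of scheduleAppend under the model above.
def onScheduleAppendResult(offset: java.lang.Long): Unit = {
  if (offset == null) {
    // No batch could be allocated right now; back off and retry the append.
  } else if (offset == Long.MaxValue) {
    // Stale epoch: the append was ignored. Long.MaxValue can never be
    // committed, so state tracked at this offset is safely discarded once
    // the state machine observes the epoch change.
  } else {
    // Track uncommitted state at `offset` until handleCommit confirms it.
  }
}
```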
[GitHub] [kafka] hachikuji commented on a change in pull request #9418: KAFKA-10601; Add support for append linger to Raft implementation
hachikuji commented on a change in pull request #9418: URL: https://github.com/apache/kafka/pull/9418#discussion_r509714949 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -1443,15 +1485,79 @@ private void pollShutdown(GracefulShutdown shutdown) throws IOException { } } +private void appendBatch( +LeaderState state, +BatchAccumulator.CompletedBatch batch, +long appendTimeMs +) { +try { +List records = batch.records; +int epoch = state.epoch(); + +LogAppendInfo info = appendAsLeader(batch.data); +OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(info.lastOffset, epoch); +CompletableFuture future = appendPurgatory.await( +LogOffset.awaitCommitted(offsetAndEpoch.offset), +Integer.MAX_VALUE +); + +future.whenComplete((commitTimeMs, exception) -> { +int numRecords = batch.records.size(); +if (exception != null) { +logger.debug("Failed to commit {} records at {}", numRecords, offsetAndEpoch, exception); +} else { +long elapsedTime = Math.max(0, commitTimeMs - appendTimeMs); +double elapsedTimePerRecord = (double) elapsedTime / numRecords; +kafkaRaftMetrics.updateCommitLatency(elapsedTimePerRecord, appendTimeMs); +logger.debug("Completed commit of {} records at {}", numRecords, offsetAndEpoch); +listener.handleCommit(epoch, info.lastOffset, records); +} +}); +} finally { +batch.release(); +} +} + +private long maybeAppendBatches( +LeaderState state, +long currentTimeMs +) { +long timeUntilFlush = accumulator.timeUntilFlush(currentTimeMs); +if (timeUntilFlush <= 0) { +List> batches = accumulator.flush(); +Iterator> iterator = batches.iterator(); + +try { +while (iterator.hasNext()) { +BatchAccumulator.CompletedBatch batch = iterator.next(); +appendBatch(state, batch, currentTimeMs); +} +flushLeaderLog(state, currentTimeMs); Review comment: Yes, I agree with you. Of course it is ok if unflushed data gets replicated. The main thing we need to protect is incrementing the high watermark. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
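The invariant being agreed on here can be stated concretely: replicating unflushed data is harmless, but the high watermark must never advance past data that is not yet durable. A schematic sketch, with hypothetical method names rather than the actual Kafka API:

```scala
// Schematic only: the ordering that protects the high watermark.
object FlushOrderingSketch {
  def appendAsLeaderUnflushed(batch: Array[Byte]): Unit = () // followers may already fetch this
  def flushLog(): Unit = ()                                  // durability point
  def maybeIncrementHighWatermark(): Unit = ()               // safe only after the flush

  def appendAndMaybeCommit(batches: Seq[Array[Byte]]): Unit = {
    batches.foreach(appendAsLeaderUnflushed)
    flushLog()
    maybeIncrementHighWatermark()
  }
}
```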
[GitHub] [kafka] hachikuji commented on a change in pull request #9418: KAFKA-10601; Add support for append linger to Raft implementation
hachikuji commented on a change in pull request #9418: URL: https://github.com/apache/kafka/pull/9418#discussion_r509711302 ## File path: core/src/main/scala/kafka/tools/TestRaftServer.scala ## @@ -272,7 +291,79 @@ class TestRaftServer(val config: KafkaConfig) extends Logging { ) } - class RaftIoThread(client: KafkaRaftClient) extends ShutdownableThread("raft-io-thread") { + class RaftWorkloadGenerator( +client: KafkaRaftClient[Array[Byte]], +time: Time, +brokerId: Int, +recordsPerSec: Int, +recordSize: Int + ) extends ShutdownableThread(name = "raft-workload-generator") with RaftClient.Listener[Array[Byte]] { + +private val stats = new WriteStats(time, printIntervalMs = 5000) +private val payload = new Array[Byte](recordSize) +private val pendingAppends = new util.ArrayDeque[PendingAppend]() + +private var latestLeaderAndEpoch = new LeaderAndEpoch(OptionalInt.empty, 0) +private var isLeader = false +private var throttler: ThroughputThrottler = _ +private var recordCount = 0 + +override def doWork(): Unit = { + if (latestLeaderAndEpoch != client.currentLeaderAndEpoch()) { +latestLeaderAndEpoch = client.currentLeaderAndEpoch() +isLeader = latestLeaderAndEpoch.leaderId.orElse(-1) == brokerId +if (isLeader) { + pendingAppends.clear() Review comment: You are right that the appends may still be committed, but in this patch, the `handleCommit` API is only invoked for appends within the current epoch. I thought it seemed simpler for now to just reset state at the start of the epoch. We can be more clever in the future once `handleCommit` is extended to account for replication. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics
abbccdda commented on a change in pull request #9103: URL: https://github.com/apache/kafka/pull/9103#discussion_r509670268 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -126,6 +125,31 @@ class KafkaApis(val requestChannel: RequestChannel, info("Shutdown complete.") } + private def maybeForward(request: RequestChannel.Request, + handler: RequestChannel.Request => Unit): Unit = { +if (request.envelopeContext.isDefined && request.principalSerde.isEmpty) { + sendErrorResponseMaybeThrottle(request, Errors.PRINCIPAL_DESERIALIZATION_FAILURE.exception()) +} else if (request.envelopeContext.isDefined && + (!request.context.fromPrivilegedListener || + !authorize(request.envelopeContext.get.brokerContext, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) +) { + // If the designated forwarding request is not coming from a privileged listener, or + // it fails CLUSTER_ACTION permission, we would fail the authorization. + sendErrorResponseMaybeThrottle(request, Errors.BROKER_AUTHORIZATION_FAILURE.exception()) +} else if (request.envelopeContext.isDefined && !controller.isActive) { + sendErrorResponseMaybeThrottle(request, Errors.NOT_CONTROLLER.exception()) +} else if (!controller.isActive && couldDoRedirection(request)) { + redirectionManager.forwardRequest(sendResponseMaybeThrottle, request) +} else { + // When IBP is smaller than 2.8 or the principal serde is undefined, forwarding is not supported, + // therefore requests are handled directly. + handler(request) +} + } + + private def couldDoRedirection(request: RequestChannel.Request): Boolean = Review comment: Sounds good. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics
abbccdda commented on a change in pull request #9103: URL: https://github.com/apache/kafka/pull/9103#discussion_r509645115 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -974,8 +973,39 @@ private[kafka] class Processor(val id: Int, val context = new RequestContext(header, connectionId, channel.socketAddress, channel.principal, listenerName, securityProtocol, channel.channelMetadataRegistry.clientInformation, isPrivilegedListener) -val req = new RequestChannel.Request(processor = id, context = context, - startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics) + +val principalSerde = Option(channel.principalSerde.orElse(null)) Review comment: I had a try, but it seems Java's `Optional` doesn't have an `asScala` conversion This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
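For reference, the conversions that do exist: `asScala` comes from the collection converters and is not defined for `Optional`, which is why the patch routes through `null`. A small sketch (the `toScala` variant assumes Scala 2.13, which Kafka only cross-builds against):

```scala
import java.util.Optional

object OptionalConversionSketch extends App {
  val javaOpt: Optional[String] = Optional.of("principal-serde")

  // Works on any Scala version Kafka builds with: route through null.
  val viaNull: Option[String] = Option(javaOpt.orElse(null))
  println(viaNull)

  // Scala 2.13+ only: scala.jdk.OptionConverters adds .toScala (not .asScala).
  // import scala.jdk.OptionConverters._
  // val viaConverter: Option[String] = javaOpt.toScala
}
```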
[GitHub] [kafka] jolshan removed a comment on pull request #9472: MINOR: Add Jenkinsfile to 2.3
jolshan removed a comment on pull request #9472: URL: https://github.com/apache/kafka/pull/9472#issuecomment-713842825 Woo hoo! Tests all pass. LGTM This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on pull request #9472: MINOR: Add Jenkinsfile to 2.3
jolshan commented on pull request #9472: URL: https://github.com/apache/kafka/pull/9472#issuecomment-713842825 Woo hoo! Tests all pass. LGTM This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9418: KAFKA-10601; Add support for append linger to Raft implementation
hachikuji commented on a change in pull request #9418: URL: https://github.com/apache/kafka/pull/9418#discussion_r509636357 ## File path: core/src/main/scala/kafka/tools/TestRaftServer.scala ## @@ -19,37 +19,44 @@ package kafka.tools import java.io.File import java.nio.file.Files -import java.util.concurrent.CountDownLatch -import java.util.{Properties, Random} +import java.util +import java.util.concurrent.{CountDownLatch, TimeUnit} +import java.util.{Collections, OptionalInt, Random} -import joptsimple.OptionParser +import com.yammer.metrics.core.MetricName +import joptsimple.OptionException import kafka.log.{Log, LogConfig, LogManager} import kafka.network.SocketServer import kafka.raft.{KafkaFuturePurgatory, KafkaMetadataLog, KafkaNetworkChannel} import kafka.security.CredentialProvider import kafka.server.{BrokerTopicStats, KafkaConfig, KafkaRequestHandlerPool, KafkaServer, LogDirFailureChannel} +import kafka.tools.TestRaftServer.{ByteArraySerde, PendingAppend, ThroughputThrottler, WriteStats} Review comment: Moved the import inside `TestRaftServer`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9418: KAFKA-10601; Add support for append linger to Raft implementation
hachikuji commented on a change in pull request #9418: URL: https://github.com/apache/kafka/pull/9418#discussion_r509633527 ## File path: core/src/main/scala/kafka/tools/TestRaftServer.scala ## @@ -272,7 +291,79 @@ class TestRaftServer(val config: KafkaConfig) extends Logging { ) } - class RaftIoThread(client: KafkaRaftClient) extends ShutdownableThread("raft-io-thread") { + class RaftWorkloadGenerator( +client: KafkaRaftClient[Array[Byte]], +time: Time, +brokerId: Int, +recordsPerSec: Int, +recordSize: Int + ) extends ShutdownableThread(name = "raft-workload-generator") with RaftClient.Listener[Array[Byte]] { + +private val stats = new WriteStats(time, printIntervalMs = 5000) +private val payload = new Array[Byte](recordSize) +private val pendingAppends = new util.ArrayDeque[PendingAppend]() + +private var latestLeaderAndEpoch = new LeaderAndEpoch(OptionalInt.empty, 0) +private var isLeader = false +private var throttler: ThroughputThrottler = _ +private var recordCount = 0 + +override def doWork(): Unit = { + if (latestLeaderAndEpoch != client.currentLeaderAndEpoch()) { +latestLeaderAndEpoch = client.currentLeaderAndEpoch() +isLeader = latestLeaderAndEpoch.leaderId.orElse(-1) == brokerId +if (isLeader) { + pendingAppends.clear() + throttler = new ThroughputThrottler(time, recordsPerSec) + recordCount = 0 +} + } + + if (isLeader) { +recordCount += 1 + +val startTimeMs = time.milliseconds() +val sendTimeMs = if (throttler.maybeThrottle(recordCount, startTimeMs)) { + time.milliseconds() +} else { + startTimeMs +} + +val offset = client.scheduleAppend(latestLeaderAndEpoch.epoch, Collections.singletonList(payload)) +if (offset == null || offset == Long.MaxValue) { + time.sleep(10) +} else { + pendingAppends.offer(PendingAppend(latestLeaderAndEpoch.epoch, offset, sendTimeMs)) +} + } else { +time.sleep(500) + } +} + +override def handleCommit(epoch: Int, lastOffset: Long, records: util.List[Array[Byte]]): Unit = { + var offset = lastOffset - records.size() + 1 + val currentTimeMs = time.milliseconds() + + for (record <- records.asScala) { +val pendingAppend = pendingAppends.poll() +if (pendingAppend.epoch != epoch || pendingAppend.offset!= offset) { + warn(s"Expected next commit at offset ${pendingAppend.offset}, " + Review comment: That's true. In a follow-up, the `handleCommit` API will be expanded a bit to cover appends through replication as well, but for now, I think we can raise an error. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9418: KAFKA-10601; Add support for append linger to Raft implementation
hachikuji commented on a change in pull request #9418: URL: https://github.com/apache/kafka/pull/9418#discussion_r509620795 ## File path: core/src/main/scala/kafka/tools/TestRaftServer.scala ## @@ -272,7 +291,79 @@ class TestRaftServer(val config: KafkaConfig) extends Logging { ) } - class RaftIoThread(client: KafkaRaftClient) extends ShutdownableThread("raft-io-thread") { + class RaftWorkloadGenerator( +client: KafkaRaftClient[Array[Byte]], +time: Time, +brokerId: Int, +recordsPerSec: Int, +recordSize: Int + ) extends ShutdownableThread(name = "raft-workload-generator") with RaftClient.Listener[Array[Byte]] { + +private val stats = new WriteStats(time, printIntervalMs = 5000) +private val payload = new Array[Byte](recordSize) +private val pendingAppends = new util.ArrayDeque[PendingAppend]() + +private var latestLeaderAndEpoch = new LeaderAndEpoch(OptionalInt.empty, 0) +private var isLeader = false +private var throttler: ThroughputThrottler = _ +private var recordCount = 0 + +override def doWork(): Unit = { + if (latestLeaderAndEpoch != client.currentLeaderAndEpoch()) { +latestLeaderAndEpoch = client.currentLeaderAndEpoch() +isLeader = latestLeaderAndEpoch.leaderId.orElse(-1) == brokerId +if (isLeader) { + pendingAppends.clear() + throttler = new ThroughputThrottler(time, recordsPerSec) + recordCount = 0 +} + } + + if (isLeader) { +recordCount += 1 + +val startTimeMs = time.milliseconds() +val sendTimeMs = if (throttler.maybeThrottle(recordCount, startTimeMs)) { + time.milliseconds() +} else { + startTimeMs +} + +val offset = client.scheduleAppend(latestLeaderAndEpoch.epoch, Collections.singletonList(payload)) +if (offset == null || offset == Long.MaxValue) { Review comment: I will try to document this better, but `Long.MaxValue` is how we decided to handle the case where the epoch in `scheduleAppend` does not match the current epoch. This can happen because the raft epoch is updated asynchronously and there is no way to ensure the state machine has seen the latest value. The expectation is that the state machine will update its uncommitted state with an offset that can never become committed. After it observes the epoch change, this uncommitted state will be discarded. Note that although I added the explicit check here, it is not technically necessary. Let me consider removing it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9418: KAFKA-10601; Add support for append linger to Raft implementation
hachikuji commented on a change in pull request #9418: URL: https://github.com/apache/kafka/pull/9418#discussion_r509615681 ## File path: core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala ## @@ -56,73 +47,8 @@ class TestRaftRequestHandler( | ApiKeys.END_QUORUM_EPOCH | ApiKeys.FETCH => val requestBody = request.body[AbstractRequest] - networkChannel.postInboundRequest( -request.header, -requestBody, -response => sendResponse(request, Some(response))) - -case ApiKeys.API_VERSIONS => Review comment: I felt it was too difficult to approximate a controller workload using one or more producers because we can only handle one request at a time. So I created a separate workload generator which executes on the leader and I removed all of this somewhat hacky handling logic which allowed us to use a producer. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9418: KAFKA-10601; Add support for append linger to Raft implementation
hachikuji commented on a change in pull request #9418: URL: https://github.com/apache/kafka/pull/9418#discussion_r509603110 ## File path: clients/src/main/java/org/apache/kafka/common/protocol/DataOutputWritable.java ## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.protocol; + +import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.Utils; + +import java.io.DataOutput; +import java.io.IOException; +import java.nio.ByteBuffer; + +public class DataOutputWritable implements Writable { Review comment: Fair enough. We have no current need for `DataOutputWritable`, so I will remove this. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji merged pull request #9406: KAFKA-10520; Ensure transactional producers poll if leastLoadedNode not available with max.in.flight=1
hachikuji merged pull request #9406: URL: https://github.com/apache/kafka/pull/9406 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on pull request #9406: KAFKA-10520; Ensure transactional producers poll if leastLoadedNode not available with max.in.flight=1
hachikuji commented on pull request #9406: URL: https://github.com/apache/kafka/pull/9406#issuecomment-713807844 Merging to trunk and 2.7. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10625) Add Union to Connect Schema
Ryan created KAFKA-10625: Summary: Add Union to Connect Schema Key: KAFKA-10625 URL: https://issues.apache.org/jira/browse/KAFKA-10625 Project: Kafka Issue Type: Improvement Components: KafkaConnect Affects Versions: 2.6.0 Reporter: Ryan There currently is no Union type for the Kafka Connect Schema/SchemaBuilder. When using Kafka Connect to produce messages intended to be converted to AVRO, a converter-specific workaround must be employed to generate a union. https://stackoverflow.com/questions/64468907/does-kafka-connect-support-unions -- This message was sent by Atlassian Jira (v8.3.4#803005)
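For context, a sketch of the kind of converter-specific workaround the linked discussion describes: a struct carrying one optional field per union member, with exactly one field populated per record. The schema name shown is an assumption drawn from that discussion about what an Avro converter may recognize, not a documented Connect API:

{code:scala}
import org.apache.kafka.connect.data.{Schema, SchemaBuilder}

object UnionWorkaroundSketch {
  // One optional field per union member; producers populate exactly one.
  // The schema name is what a converter may key on to emit a real Avro
  // union (an assumption based on the linked discussion).
  val unionLike: Schema = SchemaBuilder.struct()
    .name("io.confluent.connect.avro.Union")
    .field("string", Schema.OPTIONAL_STRING_SCHEMA)
    .field("int", Schema.OPTIONAL_INT32_SCHEMA)
    .optional()
    .build()
}
{code}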
[jira] [Updated] (KAFKA-10139) [Easy] Add operational guide for failure recovery
[ https://issues.apache.org/jira/browse/KAFKA-10139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-10139: - Summary: [Easy] Add operational guide for failure recovery (was: Add operational guide for failure recovery) > [Easy] Add operational guide for failure recovery > - > > Key: KAFKA-10139 > URL: https://issues.apache.org/jira/browse/KAFKA-10139 > Project: Kafka > Issue Type: Sub-task > Components: core >Reporter: Boyang Chen >Priority: Major > > In the first released version, we should include an operation manual to the > feature versioning failure cases, such as: > 1. broker crash due to violation of feature versioning > 2. ZK data corruption (rare) > We need to ensure this work gets reflected in the AK documentation after the > implementation and thorough testings are done. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10622) [Medium] Implement support for feature version deprecation
[ https://issues.apache.org/jira/browse/KAFKA-10622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-10622: - Summary: [Medium] Implement support for feature version deprecation (was: Implement support for feature version deprecation) > [Medium] Implement support for feature version deprecation > -- > > Key: KAFKA-10622 > URL: https://issues.apache.org/jira/browse/KAFKA-10622 > Project: Kafka > Issue Type: Sub-task >Reporter: Kowshik Prakasam >Priority: Minor > > This Jira tracks the implementation of feature version deprecation support > for KIP-584. > The feature version deprecation is future work > ([link|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-Featureversiondeprecation]). > We didn’t find a need to implement it immediately as part of AK 2.7 release > for KIP-584. The reason is that we don’t have features defined yet as part of > AK 2.7 release and it’ll be a long time (years) before we start to deprecate > feature versions. So there is no immediate need to implement the support. > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10621) [Easy] Implement advanced CLI tool for feature versioning system
[ https://issues.apache.org/jira/browse/KAFKA-10621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-10621: - Summary: [Easy] Implement advanced CLI tool for feature versioning system (was: Implement advanced CLI tool for feature versioning system) > [Easy] Implement advanced CLI tool for feature versioning system > > > Key: KAFKA-10621 > URL: https://issues.apache.org/jira/browse/KAFKA-10621 > Project: Kafka > Issue Type: Sub-task >Reporter: Kowshik Prakasam >Priority: Minor > > Implement advanced CLI tool capabilities for the feature versioning system > providing the facilities as explained in [this > section|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-AdvancedCLItoolusage] > of KIP-584. The implementation needs to be done in > [FeatureCommand|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/FeatureCommand.scala] > class. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10624) [Easy] FeatureZNodeStatus should use sealed trait instead of Enumeration
[ https://issues.apache.org/jira/browse/KAFKA-10624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-10624: - Summary: [Easy] FeatureZNodeStatus should use sealed trait instead of Enumeration (was: FeatureZNodeStatus should use sealed trait instead of Enumeration) > [Easy] FeatureZNodeStatus should use sealed trait instead of Enumeration > > > Key: KAFKA-10624 > URL: https://issues.apache.org/jira/browse/KAFKA-10624 > Project: Kafka > Issue Type: Sub-task >Reporter: Kowshik Prakasam >Priority: Minor > > In Scala, we prefer sealed traits over Enumeration since the former gives you > exhaustiveness checking. With Scala Enumeration, you don't get a warning if > you add a new value that is not handled in a given pattern match. > This Jira tracks refactoring enum > [FeatureZNodeStatus|https://github.com/apache/kafka/blob/fb4f297207ef62f71e4a6d2d0dac75752933043d/core/src/main/scala/kafka/zk/ZkData.scala#L801] > from an enum to a sealed trait. -- This message was sent by Atlassian Jira (v8.3.4#803005)
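To make the exhaustiveness argument concrete, a small sketch (the status values are illustrative, not necessarily the actual members of FeatureZNodeStatus):

{code:scala}
sealed trait FeatureZNodeStatus
case object Enabled extends FeatureZNodeStatus
case object Disabled extends FeatureZNodeStatus

object ExhaustivenessSketch {
  // The compiler warns "match may not be exhaustive" because Disabled is
  // unhandled; a scala.Enumeration value would compile with no warning.
  def describe(status: FeatureZNodeStatus): String = status match {
    case Enabled => "enabled"
  }
}
{code}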
[jira] [Updated] (KAFKA-10623) [Easy] Refactor code to avoid discovery conflicts for classes:{Supported|Finalized}VersionRange
[ https://issues.apache.org/jira/browse/KAFKA-10623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-10623: - Summary: [Easy] Refactor code to avoid discovery conflicts for classes:{Supported|Finalized}VersionRange (was: Refactor code to avoid discovery conflicts for classes:{Supported|Finalized}VersionRange) > [Easy] Refactor code to avoid discovery conflicts for > classes:{Supported|Finalized}VersionRange > --- > > Key: KAFKA-10623 > URL: https://issues.apache.org/jira/browse/KAFKA-10623 > Project: Kafka > Issue Type: Sub-task >Reporter: Kowshik Prakasam >Priority: Minor > > This Jira suggests changing few existing class names to avoid class discovery > conflicts. Particularly the following classes: > {code:java} > org.apache.kafka.clients.admin.{Supported|Finalized}VersionRange{code} > conflict with > > > {code:java} > org.apache.kafka.common.feature.{Supported|Finalized}VersionRange{code} > The former is internal facing, while the latter is external facing (since it > is used in the Admin#describeFeatures API). So, the internal facing classes > can be renamed suitably. Possible alternative naming suggestions: > > > {code:java} > org.apache.kafka.clients.admin.{Supported|Finalized}Versions > {code} > {code:java} > org.apache.kafka.clients.admin.Broker{Supported|Finalized}Versions > {code} > {code:java} > org.apache.kafka.clients.admin.Broker{Supported|Finalized}VersionRange{code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10624) FeatureZNodeStatus should use sealed trait instead of Enumeration
Kowshik Prakasam created KAFKA-10624: Summary: FeatureZNodeStatus should use sealed trait instead of Enumeration Key: KAFKA-10624 URL: https://issues.apache.org/jira/browse/KAFKA-10624 Project: Kafka Issue Type: Sub-task Reporter: Kowshik Prakasam In Scala, we prefer sealed traits over Enumeration since the former gives you exhaustiveness checking. With Scala Enumeration, you don't get a warning if you add a new value that is not handled in a given pattern match. This Jira tracks refactoring [FeatureZNodeStatus|https://github.com/apache/kafka/blob/fb4f297207ef62f71e4a6d2d0dac75752933043d/core/src/main/scala/kafka/zk/ZkData.scala#L801] from an Enumeration to a sealed trait. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] hachikuji commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics
hachikuji commented on a change in pull request #9103: URL: https://github.com/apache/kafka/pull/9103#discussion_r509540617 ## File path: core/src/main/scala/kafka/network/RequestChannel.scala ## @@ -94,19 +104,60 @@ object RequestChannel extends Logging { @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None val session = Session(context.principal, context.clientAddress) + private val bodyAndSize: RequestAndSize = context.parseRequest(buffer) def header: RequestHeader = context.header def sizeOfBodyInBytes: Int = bodyAndSize.size -//most request types are parsed entirely into objects at this point. for those we can release the underlying buffer. -//some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference -//to the buffer. for those requests we cannot release the buffer early, but only when request processing is done. +def buildResponse(abstractResponse: AbstractResponse, + error: Errors): Send = + if (envelopeContext.isDefined) { Review comment: nit: use `match` ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -974,8 +973,39 @@ private[kafka] class Processor(val id: Int, val context = new RequestContext(header, connectionId, channel.socketAddress, channel.principal, listenerName, securityProtocol, channel.channelMetadataRegistry.clientInformation, isPrivilegedListener) -val req = new RequestChannel.Request(processor = id, context = context, - startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics) + +val principalSerde = Option(channel.principalSerde.orElse(null)) +val req = +if (header.apiKey == ApiKeys.ENVELOPE) { Review comment: nit: this is misaligned. It might be better to pull the body here into a separate method (e.g. `parseEnvelopeRequest`) ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -126,6 +125,31 @@ class KafkaApis(val requestChannel: RequestChannel, info("Shutdown complete.") } + private def maybeForward(request: RequestChannel.Request, + handler: RequestChannel.Request => Unit): Unit = { +if (request.envelopeContext.isDefined && request.principalSerde.isEmpty) { + sendErrorResponseMaybeThrottle(request, Errors.PRINCIPAL_DESERIALIZATION_FAILURE.exception()) +} else if (request.envelopeContext.isDefined && + (!request.context.fromPrivilegedListener || + !authorize(request.envelopeContext.get.brokerContext, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) +) { + // If the designated forwarding request is not coming from a privileged listener, or + // it fails CLUSTER_ACTION permission, we would fail the authorization. + sendErrorResponseMaybeThrottle(request, Errors.BROKER_AUTHORIZATION_FAILURE.exception()) Review comment: As mentioned above, you can see the rest of the cases in this class where we check CLUSTER_ACTION and they all return `CLUSTER_AUTHORIZATION_FAILURE`. ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -153,7 +177,7 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request) case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request) case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request) -case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request) +case ApiKeys.CREATE_TOPICS => maybeForward(request, handleCreateTopicsRequest) Review comment: We should have a check at the beginning of `handle` to restrict the "forwardable" APIs. 
## File path: core/src/main/scala/kafka/network/RequestChannel.scala ## @@ -94,19 +104,60 @@ object RequestChannel extends Logging { @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None val session = Session(context.principal, context.clientAddress) + private val bodyAndSize: RequestAndSize = context.parseRequest(buffer) def header: RequestHeader = context.header def sizeOfBodyInBytes: Int = bodyAndSize.size -//most request types are parsed entirely into objects at this point. for those we can release the underlying buffer. -//some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference -//to the buffer. for those requests we cannot release the buffer early, but only when request processing is done. +def buildResponse(abstractResponse: AbstractResponse, + error: Errors): Send = Review comment: nit: add braces to all of these methods. Even though they are not required, braces make it easier to see the scope ##
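A hedged sketch of the `match`-based rewrite suggested in the first nit above, assuming the surrounding members of KafkaApis (condition logic copied from the quoted chain; not the final patch):

```scala
// Sketch only: the isDefined/get chain restructured as a pattern match.
request.envelopeContext match {
  case Some(_) if request.principalSerde.isEmpty =>
    sendErrorResponseMaybeThrottle(request, Errors.PRINCIPAL_DESERIALIZATION_FAILURE.exception())
  case Some(envelopeContext) if !request.context.fromPrivilegedListener ||
      !authorize(envelopeContext.brokerContext, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME) =>
    sendErrorResponseMaybeThrottle(request, Errors.BROKER_AUTHORIZATION_FAILURE.exception())
  case Some(_) if !controller.isActive =>
    sendErrorResponseMaybeThrottle(request, Errors.NOT_CONTROLLER.exception())
  case _ if !controller.isActive && couldDoRedirection(request) =>
    redirectionManager.forwardRequest(sendResponseMaybeThrottle, request)
  case _ =>
    handler(request)
}
```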
[jira] [Created] (KAFKA-10623) Refactor code to avoid discovery conflicts for classes:{Supported|Finalized}VersionRange
Kowshik Prakasam created KAFKA-10623: Summary: Refactor code to avoid discovery conflicts for classes:{Supported|Finalized}VersionRange Key: KAFKA-10623 URL: https://issues.apache.org/jira/browse/KAFKA-10623 Project: Kafka Issue Type: Sub-task Reporter: Kowshik Prakasam This Jira suggests changing a few existing class names to avoid class discovery conflicts. In particular, the following classes: {code:java} org.apache.kafka.clients.admin.{Supported|Finalized}VersionRange{code} conflict with {code:java} org.apache.kafka.common.feature.{Supported|Finalized}VersionRange{code} The former is internal-facing, while the latter is external-facing (since it is used in the Admin#describeFeatures API). So, the internal-facing classes can be renamed suitably. Possible alternative naming suggestions: {code:java} org.apache.kafka.clients.admin.{Supported|Finalized}Versions {code} {code:java} org.apache.kafka.clients.admin.Broker{Supported|Finalized}Versions {code} {code:java} org.apache.kafka.clients.admin.Broker{Supported|Finalized}VersionRange{code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mjsax commented on pull request #9000: KAFKA-10036 Improve handling and documentation of Suppliers
mjsax commented on pull request #9000: URL: https://github.com/apache/kafka/pull/9000#issuecomment-713781998 @soarez Sorry, but you will need to rebase your PR to get https://github.com/apache/kafka/commit/2db67db8e1329cb2e047322cff81d97ff98b4328 -- otherwise, Jenkins will fail. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10622) Implement support for feature version deprecation
Kowshik Prakasam created KAFKA-10622: Summary: Implement support for feature version deprecation Key: KAFKA-10622 URL: https://issues.apache.org/jira/browse/KAFKA-10622 Project: Kafka Issue Type: Sub-task Reporter: Kowshik Prakasam This Jira tracks the implementation of feature version deprecation support for KIP-584. The feature version deprecation is future work ([link|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-Featureversiondeprecation]). We didn’t find a need to implement it immediately as part of AK 2.7 release for KIP-584. The reason is that we don’t have features defined yet as part of AK 2.7 release and it’ll be a long time (years) before we start to deprecate feature versions. So there is no immediate need to implement the support. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10621) Implement advanced CLI tool for feature versioning system
[ https://issues.apache.org/jira/browse/KAFKA-10621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-10621: - Description: Implement advanced CLI tool capabilities for the feature versioning system providing the facilities as explained in [this section|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-AdvancedCLItoolusage] of KIP-584. The implementation needs to be done in [FeatureCommand|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/FeatureCommand.scala] class. (was: Implement advanced CLI tool capabilities for the feature versioning system providing the facilities as explained in [[this section|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-AdvancedCLItoolusage]|#KIP584:Versioningschemeforfeatures-AdvancedCLItoolusage]] of KIP-584. The implementation needs to be done in [FeatureCommand|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/FeatureCommand.scala] class. ) > Implement advanced CLI tool for feature versioning system > - > > Key: KAFKA-10621 > URL: https://issues.apache.org/jira/browse/KAFKA-10621 > Project: Kafka > Issue Type: Sub-task >Reporter: Kowshik Prakasam >Priority: Minor > > Implement advanced CLI tool capabilities for the feature versioning system > providing the facilities as explained in [this > section|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-AdvancedCLItoolusage] > of KIP-584. The implementation needs to be done in > [FeatureCommand|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/FeatureCommand.scala] > class. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10621) Implement advanced CLI tool for feature versioning system
[ https://issues.apache.org/jira/browse/KAFKA-10621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-10621: - Description: Implement advanced CLI tool capabilities for the feature versioning system providing the facilities as explained in [this section|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-AdvancedCLItoolusage]] of KIP-584. The implementation needs to be done in [FeatureCommand|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/FeatureCommand.scala] class. (was: Implement advanced CLI tool capabilities for the feature versioning system providing the facilities as explained in this section of KIP-584: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-AdvancedCLItoolusage] . The implementation needs to be done in [FeatureCommand|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/FeatureCommand.scala] class. ) > Implement advanced CLI tool for feature versioning system > - > > Key: KAFKA-10621 > URL: https://issues.apache.org/jira/browse/KAFKA-10621 > Project: Kafka > Issue Type: Sub-task >Reporter: Kowshik Prakasam >Priority: Minor > > Implement advanced CLI tool capabilities for the feature versioning system > providing the facilities as explained in [this > section|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-AdvancedCLItoolusage]] > of KIP-584. The implementation needs to be done in > [FeatureCommand|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/FeatureCommand.scala] > class. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10621) Implement advanced CLI tool for feature versioning system
[ https://issues.apache.org/jira/browse/KAFKA-10621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kowshik Prakasam updated KAFKA-10621: - Description: Implement advanced CLI tool capabilities for the feature versioning system providing the facilities as explained in [[this section|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-AdvancedCLItoolusage]|#KIP584:Versioningschemeforfeatures-AdvancedCLItoolusage]] of KIP-584. The implementation needs to be done in [FeatureCommand|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/FeatureCommand.scala] class. (was: Implement advanced CLI tool capabilities for the feature versioning system providing the facilities as explained in [this section|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-AdvancedCLItoolusage]] of KIP-584. The implementation needs to be done in [FeatureCommand|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/FeatureCommand.scala] class. ) > Implement advanced CLI tool for feature versioning system > - > > Key: KAFKA-10621 > URL: https://issues.apache.org/jira/browse/KAFKA-10621 > Project: Kafka > Issue Type: Sub-task >Reporter: Kowshik Prakasam >Priority: Minor > > Implement advanced CLI tool capabilities for the feature versioning system > providing the facilities as explained in [[this > section|https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-AdvancedCLItoolusage]|#KIP584:Versioningschemeforfeatures-AdvancedCLItoolusage]] > of KIP-584. The implementation needs to be done in > [FeatureCommand|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/FeatureCommand.scala] > class. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10621) Implement advanced CLI tool for feature versioning system
Kowshik Prakasam created KAFKA-10621: Summary: Implement advanced CLI tool for feature versioning system Key: KAFKA-10621 URL: https://issues.apache.org/jira/browse/KAFKA-10621 Project: Kafka Issue Type: Sub-task Reporter: Kowshik Prakasam Implement advanced CLI tool capabilities for the feature versioning system providing the facilities as explained in this section of KIP-584: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-AdvancedCLItoolusage] . The implementation needs to be done in [FeatureCommand|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/FeatureCommand.scala] class. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mimaison commented on pull request #9468: KAFKA-10454 / (2.6) Update copartitionSourceGroups when optimization algorithm is triggered
mimaison commented on pull request #9468: URL: https://github.com/apache/kafka/pull/9468#issuecomment-713772503 @bbejeck Yes we can include this into 2.6 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted
vvcephei commented on pull request #9270: URL: https://github.com/apache/kafka/pull/9270#issuecomment-713766035 By the way, it was true that trunk was broken. Fixed by: https://github.com/apache/kafka/commit/2db67db8e1329cb2e047322cff81d97ff98b4328 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #9475: MINOR: Add Jenkinsfile to 2.1
vvcephei commented on pull request #9475: URL: https://github.com/apache/kafka/pull/9475#issuecomment-713759548 See also #9471 and #9472 and #9474 @ijuma @jolshan @mumrah , are any of you able to review this as well? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei edited a comment on pull request #9475: MINOR: Add Jenkinsfile to 2.1
vvcephei edited a comment on pull request #9475: URL: https://github.com/apache/kafka/pull/9475#issuecomment-713759548 This is the last one. See also #9471 and #9472 and #9474 @ijuma @jolshan @mumrah , are any of you able to review this as well? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei opened a new pull request #9475: MINOR: Add Jenkinsfile to 2.1
vvcephei opened a new pull request #9475: URL: https://github.com/apache/kafka/pull/9475 Add a Jenkinsfile for the 2.1 branch so PRs can be built ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #9474: MINOR: Add Jenkinsfile to 2.2
vvcephei commented on pull request #9474: URL: https://github.com/apache/kafka/pull/9474#issuecomment-713758455 See also #9471 and #9472 @ijuma @jolshan @mumrah , are any of you able to review this as well? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-9381) Javadocs + Scaladocs not published on maven central
[ https://issues.apache.org/jira/browse/KAFKA-9381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17218458#comment-17218458 ] Bill Bejeck edited comment on KAFKA-9381 at 10/21/20, 5:59 PM: --- Since this is a long-standing issue, I'm going to remove the blocker tag. I'm taking a look at getting this fixed in this release, so I have picked up the ticket. was (Author: bbejeck): Since this is a long-standing issue, I'm going to remove the blocker tag. > Javadocs + Scaladocs not published on maven central > --- > > Key: KAFKA-9381 > URL: https://issues.apache.org/jira/browse/KAFKA-9381 > Project: Kafka > Issue Type: Bug > Components: documentation, streams >Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, > 2.2.0, 2.1.1, 2.3.0, 2.2.1, 2.2.2, 2.4.0, 2.3.1, 2.5.0, 2.4.1 >Reporter: Julien Jean Paul Sirocchi >Assignee: Bill Bejeck >Priority: Critical > Fix For: 2.8.0 > > > As per title, empty (aside for MANIFEST, LICENCE and NOTICE) > javadocs/scaladocs jars on central for any version (kafka nor scala), e.g. > [http://repo1.maven.org/maven2/org/apache/kafka/kafka-streams-scala_2.12/2.3.1/] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] jolshan commented on pull request #9472: MINOR: Add Jenkinsfile to 2.3
jolshan commented on pull request #9472: URL: https://github.com/apache/kafka/pull/9472#issuecomment-713754418 The versions look correct to me. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei opened a new pull request #9474: MINOR: Add Jenkinsfile to 2.2
vvcephei opened a new pull request #9474: URL: https://github.com/apache/kafka/pull/9474 Add a Jenkinsfile for the 2.2 branch so PRs can be built ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #9472: MINOR: Add Jenkinsfile to 2.3
vvcephei commented on pull request #9472: URL: https://github.com/apache/kafka/pull/9472#issuecomment-713749192 See also #9471 @ijuma @jolshan @mumrah , are any of you able to review this as well? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9382: KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response
hachikuji commented on a change in pull request #9382: URL: https://github.com/apache/kafka/pull/9382#discussion_r509482332 ## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ## @@ -461,8 +510,9 @@ abstract class AbstractFetcherThread(name: String, val maybeTruncationComplete = fetchOffsets.get(topicPartition) match { case Some(offsetTruncationState) => val state = if (offsetTruncationState.truncationCompleted) Fetching else Truncating +// Resetting `lastFetchedEpoch` since we are truncating and don't expect diverging epoch in the next fetch Review comment: This is a little unclear to me. I guess it is safe to reset `lastFetchedEpoch` as long as we reinitialize it after the next leader change. On the other hand, it seems safer to always retain the value. ## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ## @@ -426,21 +454,42 @@ abstract class AbstractFetcherThread(name: String, warn(s"Partition $topicPartition marked as failed") } - def addPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch]): Set[TopicPartition] = { + /** + * Returns initial partition fetch state based on current state and the provided `initialFetchState`. + * From IBP 2.7 onwards, we can rely on truncation based on diverging data returned in fetch responses. + * For older versions, we can skip the truncation step iff the leader epoch matches the existing epoch. + */ + private def partitionFetchState(tp: TopicPartition, initialFetchState: InitialFetchState, currentState: PartitionFetchState): PartitionFetchState = { +if (isTruncationOnFetchSupported && initialFetchState.initOffset >= 0 && initialFetchState.lastFetchedEpoch.nonEmpty) { + if (currentState == null) { +return PartitionFetchState(initialFetchState.initOffset, None, initialFetchState.currentLeaderEpoch, + state = Fetching, initialFetchState.lastFetchedEpoch) + } + // If we are in `Fetching` state can continue to fetch regardless of current leader epoch and truncate + // if necessary based on diverging epochs returned by the leader. If we are currently in Truncating state, + // fall through and handle based on current epoch. + if (currentState.state == Fetching) { +return currentState Review comment: Is it not possible that the `InitialFetchState` has a bump to the current leader epoch? We will still need the latest epoch in order to continue fetching. ## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ## @@ -341,11 +352,18 @@ abstract class AbstractFetcherThread(name: String, // ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data if (validBytes > 0 && partitionStates.contains(topicPartition)) { // Update partitionStates only if there is no exception during processPartitionData -val newFetchState = PartitionFetchState(nextOffset, Some(lag), currentFetchState.currentLeaderEpoch, state = Fetching) +val newFetchState = PartitionFetchState(nextOffset, Some(lag), + currentFetchState.currentLeaderEpoch, state = Fetching, + Some(currentFetchState.currentLeaderEpoch)) Review comment: This doesn't seem right. The last fetched epoch is supposed to represent the epoch of the last fetched batch. The fetcher could be fetching the data from an older epoch here. 
## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ## @@ -461,8 +510,9 @@ abstract class AbstractFetcherThread(name: String, val maybeTruncationComplete = fetchOffsets.get(topicPartition) match { case Some(offsetTruncationState) => val state = if (offsetTruncationState.truncationCompleted) Fetching else Truncating Review comment: Do we need to adjust this? I think we want to remain in the `Fetching` state if truncation detection is through `Fetch`. ## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ## @@ -629,7 +680,9 @@ abstract class AbstractFetcherThread(name: String, val initialLag = leaderEndOffset - offsetToFetch fetcherLagStats.getAndMaybePut(topicPartition).lag = initialLag - PartitionFetchState(offsetToFetch, Some(initialLag), currentLeaderEpoch, state = Fetching) + // We don't expect diverging epochs from the next fetch request, so we reset `lastFetchedEpoch` Review comment: Again it seems safe to keep `lastFetchedEpoch` in sync with the local log. If we have done a full truncation above, then `lastFetchedEpoch` will be `None`, but otherwise it seems like we should set it. ## File path: core/src/main/scala/kafka/server/R
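To make the last review point concrete, here is a minimal, self-contained sketch of the state handling under discussion. Every type and helper in it is a simplified stand-in rather than Kafka's actual `AbstractFetcherThread` code; the point it illustrates is that `lastFetchedEpoch` should come from the epoch carried by the last appended batch, which can be older than the fetcher's current leader epoch while the follower catches up.

```java
// Simplified stand-ins, not Kafka's classes: illustrates that lastFetchedEpoch
// must track the epoch of the fetched data, not the current leader epoch.
import java.util.Optional;

final class FetchStateSketch {
    enum State { FETCHING, TRUNCATING }

    record Batch(long lastOffset, int partitionLeaderEpoch) {}

    record PartitionFetchState(long fetchOffset, int currentLeaderEpoch,
                               State state, Optional<Integer> lastFetchedEpoch) {}

    // On successful processing of fetched data, advance the offset and record
    // the epoch of the last appended batch, which may lag currentLeaderEpoch.
    static PartitionFetchState onFetchSuccess(PartitionFetchState current, Batch lastBatch) {
        return new PartitionFetchState(
            lastBatch.lastOffset() + 1,
            current.currentLeaderEpoch(),                    // leader epoch is unchanged
            State.FETCHING,
            Optional.of(lastBatch.partitionLeaderEpoch()));  // epoch of the data itself
    }
}
```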
[jira] [Commented] (KAFKA-9381) Javadocs + Scaladocs not published on maven central
[ https://issues.apache.org/jira/browse/KAFKA-9381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17218458#comment-17218458 ] Bill Bejeck commented on KAFKA-9381: Since this is a long-standing issue, I'm going to remove the blocker tag. > Javadocs + Scaladocs not published on maven central > --- > > Key: KAFKA-9381 > URL: https://issues.apache.org/jira/browse/KAFKA-9381 > Project: Kafka > Issue Type: Bug > Components: documentation, streams >Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, > 2.2.0, 2.1.1, 2.3.0, 2.2.1, 2.2.2, 2.4.0, 2.3.1, 2.5.0, 2.4.1 >Reporter: Julien Jean Paul Sirocchi >Assignee: Bill Bejeck >Priority: Blocker > Fix For: 2.7.0 > > > As per title, empty (aside for MANIFEST, LICENCE and NOTICE) > javadocs/scaladocs jars on central for any version (kafka nor scala), e.g. > [http://repo1.maven.org/maven2/org/apache/kafka/kafka-streams-scala_2.12/2.3.1/] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9381) Javadocs + Scaladocs not published on maven central
[ https://issues.apache.org/jira/browse/KAFKA-9381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-9381: --- Priority: Critical (was: Blocker) > Javadocs + Scaladocs not published on maven central > --- > > Key: KAFKA-9381 > URL: https://issues.apache.org/jira/browse/KAFKA-9381 > Project: Kafka > Issue Type: Bug > Components: documentation, streams >Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, > 2.2.0, 2.1.1, 2.3.0, 2.2.1, 2.2.2, 2.4.0, 2.3.1, 2.5.0, 2.4.1 >Reporter: Julien Jean Paul Sirocchi >Assignee: Bill Bejeck >Priority: Critical > Fix For: 2.7.0 > > > As per title, empty (aside for MANIFEST, LICENCE and NOTICE) > javadocs/scaladocs jars on central for any version (kafka nor scala), e.g. > [http://repo1.maven.org/maven2/org/apache/kafka/kafka-streams-scala_2.12/2.3.1/] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9381) Javadocs + Scaladocs not published on maven central
[ https://issues.apache.org/jira/browse/KAFKA-9381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-9381: --- Fix Version/s: (was: 2.7.0) 2.8.0 > Javadocs + Scaladocs not published on maven central > --- > > Key: KAFKA-9381 > URL: https://issues.apache.org/jira/browse/KAFKA-9381 > Project: Kafka > Issue Type: Bug > Components: documentation, streams >Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, > 2.2.0, 2.1.1, 2.3.0, 2.2.1, 2.2.2, 2.4.0, 2.3.1, 2.5.0, 2.4.1 >Reporter: Julien Jean Paul Sirocchi >Assignee: Bill Bejeck >Priority: Critical > Fix For: 2.8.0 > > > As per title, empty (aside for MANIFEST, LICENCE and NOTICE) > javadocs/scaladocs jars on central for any version (kafka nor scala), e.g. > [http://repo1.maven.org/maven2/org/apache/kafka/kafka-streams-scala_2.12/2.3.1/] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-9381) Javadocs + Scaladocs not published on maven central
[ https://issues.apache.org/jira/browse/KAFKA-9381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck reassigned KAFKA-9381: -- Assignee: Bill Bejeck (was: Randall Hauch) > Javadocs + Scaladocs not published on maven central > --- > > Key: KAFKA-9381 > URL: https://issues.apache.org/jira/browse/KAFKA-9381 > Project: Kafka > Issue Type: Bug > Components: documentation, streams >Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, > 2.2.0, 2.1.1, 2.3.0, 2.2.1, 2.2.2, 2.4.0, 2.3.1, 2.5.0, 2.4.1 >Reporter: Julien Jean Paul Sirocchi >Assignee: Bill Bejeck >Priority: Blocker > Fix For: 2.7.0 > > > As per title, empty (aside for MANIFEST, LICENCE and NOTICE) > javadocs/scaladocs jars on central for any version (kafka nor scala), e.g. > [http://repo1.maven.org/maven2/org/apache/kafka/kafka-streams-scala_2.12/2.3.1/] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] dajac commented on a change in pull request #9406: KAFKA-10520; Ensure transactional producers poll if leastLoadedNode not available with max.in.flight=1
dajac commented on a change in pull request #9406: URL: https://github.com/apache/kafka/pull/9406#discussion_r509492493 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ## @@ -444,10 +444,25 @@ private boolean maybeSendAndPollTransactionalRequest() { AbstractRequest.Builder requestBuilder = nextRequestHandler.requestBuilder(); Node targetNode = null; try { -targetNode = awaitNodeReady(nextRequestHandler.coordinatorType()); -if (targetNode == null) { +FindCoordinatorRequest.CoordinatorType coordinatorType = nextRequestHandler.coordinatorType(); +targetNode = coordinatorType != null ? +transactionManager.coordinator(coordinatorType) : +client.leastLoadedNode(time.milliseconds()); +if (targetNode != null) { +if (!awaitNodeReady(targetNode, coordinatorType)) { +log.trace("Target node {} not ready within request timeout, will retry when node is ready.", targetNode); +maybeFindCoordinatorAndRetry(nextRequestHandler); +return true; +} +} else if (coordinatorType != null) { +log.trace("Coordinator not known for {}, will retry {} after finding coordinator.", coordinatorType, requestBuilder.apiKey()); maybeFindCoordinatorAndRetry(nextRequestHandler); return true; +} else { +log.trace("No nodes available to send requests, will poll and retry until a node is ready."); +transactionManager.retry(nextRequestHandler); +client.poll(retryBackoffMs, time.milliseconds()); +return true; Review comment: @rajinisivaram Yeah, I do agree. Polling seems sufficient in this case. Thanks for the clarification. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-9910) Implement new transaction timed out error
[ https://issues.apache.org/jira/browse/KAFKA-9910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17218450#comment-17218450 ] Bill Bejeck commented on KAFKA-9910: This is a sub-task for KIP-588, which is not going in 2.7, so I'm going to move the fix version to 2.8.0 as part of the 2.7.0 release process. > Implement new transaction timed out error > - > > Key: KAFKA-9910 > URL: https://issues.apache.org/jira/browse/KAFKA-9910 > Project: Kafka > Issue Type: Sub-task > Components: clients, core >Reporter: Boyang Chen >Assignee: HaiyuanZhao >Priority: Major > Fix For: 2.7.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9910) Implement new transaction timed out error
[ https://issues.apache.org/jira/browse/KAFKA-9910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-9910: --- Fix Version/s: (was: 2.7.0) 2.8.0 > Implement new transaction timed out error > - > > Key: KAFKA-9910 > URL: https://issues.apache.org/jira/browse/KAFKA-9910 > Project: Kafka > Issue Type: Sub-task > Components: clients, core >Reporter: Boyang Chen >Assignee: HaiyuanZhao >Priority: Major > Fix For: 2.8.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9803) Allow producers to recover gracefully from transaction timeouts
[ https://issues.apache.org/jira/browse/KAFKA-9803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-9803: --- Fix Version/s: (was: 2.7.0) 2.8.0 > Allow producers to recover gracefully from transaction timeouts > --- > > Key: KAFKA-9803 > URL: https://issues.apache.org/jira/browse/KAFKA-9803 > Project: Kafka > Issue Type: Improvement > Components: producer , streams >Reporter: Jason Gustafson >Assignee: Boyang Chen >Priority: Major > Labels: needs-kip > Fix For: 2.8.0 > > > Transaction timeouts are detected by the transaction coordinator. When the > coordinator detects a timeout, it bumps the producer epoch and aborts the > transaction. The epoch bump is necessary in order to prevent the current > producer from being able to begin writing to a new transaction which was not > started through the coordinator. > Transactions may also be aborted if a new producer with the same > `transactional.id` starts up. Similarly this results in an epoch bump. > Currently the coordinator does not distinguish these two cases. Both will end > up as a `ProducerFencedException`, which means the producer needs to shut > itself down. > We can improve this with the new APIs from KIP-360. When the coordinator > times out a transaction, it can remember that fact and allow the existing > producer to claim the bumped epoch and continue. Roughly the logic would work > like this: > 1. When a transaction times out, set lastProducerEpoch to the current epoch > and do the normal bump. > 2. Any transactional requests from the old epoch result in a new > TRANSACTION_TIMED_OUT error code, which is propagated to the application. > 3. The producer recovers by sending InitProducerId with the current epoch. > The coordinator returns the bumped epoch. > One issue that needs to be addressed is how to handle INVALID_PRODUCER_EPOCH > from Produce requests. Partition leaders will not generally know if a bumped > epoch was the result of a timed out transaction or a fenced producer. > Possibly the producer can treat these errors as abortable when they come from > Produce responses. In that case, the user would try to abort the transaction > and then we can see if it was due to a timeout or otherwise. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9803) Allow producers to recover gracefully from transaction timeouts
[ https://issues.apache.org/jira/browse/KAFKA-9803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17218446#comment-17218446 ] Bill Bejeck commented on KAFKA-9803: As discussed with [~bchen225242] offline, looks like KIP-588 won't make 2.7. As part of the 2.7.0 release process, I'm going to move the fix version to 2.8.0 > Allow producers to recover gracefully from transaction timeouts > --- > > Key: KAFKA-9803 > URL: https://issues.apache.org/jira/browse/KAFKA-9803 > Project: Kafka > Issue Type: Improvement > Components: producer , streams >Reporter: Jason Gustafson >Assignee: Boyang Chen >Priority: Major > Labels: needs-kip > Fix For: 2.7.0 > > > Transaction timeouts are detected by the transaction coordinator. When the > coordinator detects a timeout, it bumps the producer epoch and aborts the > transaction. The epoch bump is necessary in order to prevent the current > producer from being able to begin writing to a new transaction which was not > started through the coordinator. > Transactions may also be aborted if a new producer with the same > `transactional.id` starts up. Similarly this results in an epoch bump. > Currently the coordinator does not distinguish these two cases. Both will end > up as a `ProducerFencedException`, which means the producer needs to shut > itself down. > We can improve this with the new APIs from KIP-360. When the coordinator > times out a transaction, it can remember that fact and allow the existing > producer to claim the bumped epoch and continue. Roughly the logic would work > like this: > 1. When a transaction times out, set lastProducerEpoch to the current epoch > and do the normal bump. > 2. Any transactional requests from the old epoch result in a new > TRANSACTION_TIMED_OUT error code, which is propagated to the application. > 3. The producer recovers by sending InitProducerId with the current epoch. > The coordinator returns the bumped epoch. > One issue that needs to be addressed is how to handle INVALID_PRODUCER_EPOCH > from Produce requests. Partition leaders will not generally know if a bumped > epoch was the result of a timed out transaction or a fenced producer. > Possibly the producer can treat these errors as abortable when they come from > Produce responses. In that case, the user would try to abort the transaction > and then we can see if it was due to a timeout or otherwise. -- This message was sent by Atlassian Jira (v8.3.4#803005)
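The recovery flow described in the ticket can be summarized in a small sketch. This is a hedged illustration only, with hypothetical names rather than the transaction coordinator's real code; KIP-588 defines the actual protocol changes.

```java
// Illustrative sketch of the three-step flow from the ticket description.
final class TxnTimeoutSketch {
    enum Error { NONE, TRANSACTION_TIMED_OUT, PRODUCER_FENCED }

    private short producerEpoch = 5;
    private short lastProducerEpoch = -1; // epoch invalidated by a timeout, if any

    // Step 1: on timeout, remember the old epoch and do the normal bump.
    void onTransactionTimeout() {
        lastProducerEpoch = producerEpoch;
        producerEpoch++;
    }

    // Step 2: requests from the timed-out epoch get a dedicated, abortable
    // error instead of the fatal PRODUCER_FENCED.
    Error checkTxnRequest(short requestEpoch) {
        if (requestEpoch == producerEpoch) return Error.NONE;
        if (requestEpoch == lastProducerEpoch) return Error.TRANSACTION_TIMED_OUT;
        return Error.PRODUCER_FENCED;
    }

    // Step 3: the same producer reclaims the bumped epoch via InitProducerId.
    short initProducerId(short requestEpoch) {
        if (requestEpoch == lastProducerEpoch || requestEpoch == producerEpoch) {
            lastProducerEpoch = -1;
            return producerEpoch;
        }
        throw new IllegalStateException("producer was fenced by a new instance");
    }
}
```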
[GitHub] [kafka] jolshan opened a new pull request #9473: KAFKA-10545: Create topic IDs in ZooKeeper and Controller
jolshan opened a new pull request #9473: URL: https://github.com/apache/kafka/pull/9473 Topic IDs must be created for all new topics and all existing topics that do not yet have a topic ID. In ZooKeeper, the ID is written to the TopicZNode, and in the controller, it is stored in a map. This is a preliminary change before the second part of KAFKA-10545, which will propagate these IDs to brokers. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
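As a rough illustration of the bookkeeping this PR describes, the sketch below assigns an ID to any topic that lacks one, caches it in a controller-side map, and persists it alongside the topic's znode. The names, the znode payload, and the use of `java.util.UUID` are assumptions made for the example, not the PR's actual code.

```java
// Hypothetical sketch: topic-ID assignment cached in the controller and
// persisted with the TopicZNode. Not the PR's implementation.
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

final class TopicIdSketch {
    private final Map<String, UUID> topicIds = new ConcurrentHashMap<>();

    UUID getOrCreate(String topic) {
        return topicIds.computeIfAbsent(topic, t -> {
            UUID id = UUID.randomUUID();
            persistToTopicZNode(t, id); // written alongside the topic's assignment data
            return id;
        });
    }

    private void persistToTopicZNode(String topic, UUID id) {
        // Placeholder for a ZooKeeper write of the enriched znode payload.
        System.out.printf("set /brokers/topics/%s -> {\"topic_id\": \"%s\", ...}%n", topic, id);
    }
}
```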
[GitHub] [kafka] bbejeck commented on pull request #9468: KAFKA-10454 / (2.6) Update copartitionSourceGroups when optimization algorithm is triggered
bbejeck commented on pull request #9468: URL: https://github.com/apache/kafka/pull/9468#issuecomment-713730656 Hi @mimaison, do you think we can get this into 2.6.1? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #9000: KAFKA-10036 Improve handling and documentation of Suppliers
mjsax commented on pull request #9000: URL: https://github.com/apache/kafka/pull/9000#issuecomment-713726509 Retest this please. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10579) Flaky test connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy
[ https://issues.apache.org/jira/browse/KAFKA-10579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17218420#comment-17218420 ] Tom Bentley commented on KAFKA-10579: - Looking at the code it seems to me this might be a lack of thread safety in Reflections, and so might be addressed by https://github.com/ronmamo/reflections/issues/281 once a version containing that fix is released. > Flaky test > connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy > > > Key: KAFKA-10579 > URL: https://issues.apache.org/jira/browse/KAFKA-10579 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: A. Sophie Blee-Goldman >Priority: Major > Labels: flaky-test > > > {{java.lang.NullPointerException > at > java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936) > at org.reflections.Store.getAllIncluding(Store.java:82) > at org.reflections.Store.getAll(Store.java:93) > at org.reflections.Reflections.getSubTypesOf(Reflections.java:404) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:355) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:340) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:216) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:209) > at > org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:61) > at > org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:91) > at > org.apache.kafka.connect.util.clusters.WorkerHandle.start(WorkerHandle.java:50) > at > org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.addWorker(EmbeddedConnectCluster.java:167) > at > org.apache.kafka.connect.integration.InternalTopicsIntegrationTest.testStartWhenInternalTopicsCreatedManuallyWithCompactForBrokersDefaultCleanupPolicy(InternalTopicsIntegrationTest.java:260)}} > https://github.com/apache/kafka/pull/9280/checks?check_run_id=1214776222 -- This message was sent by Atlassian Jira (v8.3.4#803005)
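For readers unfamiliar with this failure mode, the sketch below shows one common shape of such a race; it is a hedged illustration, not the actual Reflections internals. A reader thread that observes a store another thread is still populating can pick up a `null` and fail with an NPE deep inside the map code, matching the flavor of the stack trace above.

```java
// Illustrative race: concurrent workers share an index store that is mutated
// during scanning, so a concurrent read can observe missing state.
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class RacyScanSketch {
    private final Map<String, Map<String, Set<String>>> storeMap = new ConcurrentHashMap<>();

    // Writer thread: populates per-index maps while scanning plugin paths.
    void scan(String index, String key, Set<String> values) {
        storeMap.computeIfAbsent(index, i -> new ConcurrentHashMap<>()).put(key, values);
    }

    // Reader thread: if it runs mid-scan, get(index) can return null, and the
    // subsequent dereference throws a NullPointerException.
    Set<String> getAll(String index, String key) {
        return storeMap.get(index).get(key); // NPE if `index` is not populated yet
    }
}
```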
[GitHub] [kafka] hachikuji commented on a change in pull request #9406: KAFKA-10520; Ensure transactional producers poll if leastLoadedNode not available with max.in.flight=1
hachikuji commented on a change in pull request #9406: URL: https://github.com/apache/kafka/pull/9406#discussion_r509456514 ## File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java ## @@ -2667,4 +2760,43 @@ private void assertFutureFailure(Future future, Class ex } } +private void createMockClientWithMaxFlightOneMetadataPending() { +client = new MockClient(time, metadata) { Review comment: Wonder if we should consider adding max inflight behavior directly to `MockClient`. Seems like a notable difference from `NetworkClient`. ## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ## @@ -444,10 +444,25 @@ private boolean maybeSendAndPollTransactionalRequest() { AbstractRequest.Builder requestBuilder = nextRequestHandler.requestBuilder(); Node targetNode = null; try { -targetNode = awaitNodeReady(nextRequestHandler.coordinatorType()); -if (targetNode == null) { +FindCoordinatorRequest.CoordinatorType coordinatorType = nextRequestHandler.coordinatorType(); +targetNode = coordinatorType != null ? +transactionManager.coordinator(coordinatorType) : +client.leastLoadedNode(time.milliseconds()); +if (targetNode != null) { +if (!awaitNodeReady(targetNode, coordinatorType)) { +log.trace("Target node {} not ready within request timeout, will retry when node is ready.", targetNode); +maybeFindCoordinatorAndRetry(nextRequestHandler); +return true; +} +} else if (coordinatorType != null) { +log.trace("Coordinator not known for {}, will retry {} after finding coordinator.", coordinatorType, requestBuilder.apiKey()); maybeFindCoordinatorAndRetry(nextRequestHandler); return true; +} else { +log.trace("No nodes available to send requests, will poll and retry until a node is ready."); +transactionManager.retry(nextRequestHandler); +client.poll(retryBackoffMs, time.milliseconds()); +return true; Review comment: I agree polling seems sufficient. We will still have an opportunity to refresh metadata if the current connection fails for some reason. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
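On the `MockClient` point, one possible shape of that behavior is sketched below: an anonymous subclass (as in the quoted test helper) that refuses readiness while a request is outstanding, approximating `max.in.flight.requests.per.connection=1`. This is a fragment meant to slot into the test helper, and it assumes the `KafkaClient` methods `isReady(Node, long)` and `inFlightRequestCount(String)` behave as they did around this version; treat it as illustrative rather than a drop-in patch.

```java
// Fragment for the test helper quoted above: emulate a single in-flight slot
// per connection by gating readiness on the in-flight count.
MockClient client = new MockClient(time, metadata) {
    @Override
    public boolean isReady(Node node, long now) {
        // With one request outstanding, the node is not ready for another send.
        return inFlightRequestCount(node.idString()) == 0 && super.isReady(node, now);
    }
};
```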
[GitHub] [kafka] jolshan commented on pull request #9471: MINOR: Add Jenkinsfile to 2.6
jolshan commented on pull request #9471: URL: https://github.com/apache/kafka/pull/9471#issuecomment-713724055 The configurations look right to me. Might want to get one more pair of eyes on this and make sure the tests run. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei opened a new pull request #9472: MINOR: Add Jenkinsfile to 2.3
vvcephei opened a new pull request #9472: URL: https://github.com/apache/kafka/pull/9472 Add a Jenkinsfile for the 2.3 branch so PRs can be built ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #9471: MINOR: Add Jenkinsfile to 2.6
vvcephei commented on pull request #9471: URL: https://github.com/apache/kafka/pull/9471#issuecomment-713719243 @ijuma @jolshan @mumrah, are any of you able to review this? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei opened a new pull request #9471: MINOR: Add Jenkinsfile to 2.6
vvcephei opened a new pull request #9471: URL: https://github.com/apache/kafka/pull/9471 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #9446: MINOR: distinguish between missing source topics and internal assignment errors
ableegoldman commented on pull request #9446: URL: https://github.com/apache/kafka/pull/9446#issuecomment-713709721 Cherry-picked to 2.7 (had to fix up a test in StreamsPartitionAssignorTest that was relying on the new `ReferenceContainer`, which is only in trunk) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-10618) Add UUID class, use in protocols
[ https://issues.apache.org/jira/browse/KAFKA-10618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan resolved KAFKA-10618. Resolution: Fixed [https://github.com/apache/kafka/pull/9454/files] was merged. > Add UUID class, use in protocols > > > Key: KAFKA-10618 > URL: https://issues.apache.org/jira/browse/KAFKA-10618 > Project: Kafka > Issue Type: Sub-task >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Major > > Before implementing topic IDs, a public UUID class must be created and used > in protocols -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman commented on pull request #9446: MINOR: distinguish between missing source topics and internal assignment errors
ableegoldman commented on pull request #9446: URL: https://github.com/apache/kafka/pull/9446#issuecomment-713685350 Merged to trunk This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman merged pull request #9446: MINOR: distinguish between missing source topics and internal assignment errors
ableegoldman merged pull request #9446: URL: https://github.com/apache/kafka/pull/9446 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on pull request #9467: KAFKA-10515: Properly initialize nullable Serdes with default values
mimaison commented on pull request #9467: URL: https://github.com/apache/kafka/pull/9467#issuecomment-713678526 @vvcephei Yes, feel free to merge in 2.6 once it's ready This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup
vvcephei commented on pull request #9414: URL: https://github.com/apache/kafka/pull/9414#issuecomment-713664978 Just for reference, I don't see those failing tests on trunk (67bc4f08feb50ac135a4d8e1d469747102aad3a6). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted
vvcephei commented on pull request #9270: URL: https://github.com/apache/kafka/pull/9270#issuecomment-713663316 FYI, I just got a successful build of this locally with master merged in. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] tombentley commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order
tombentley commented on pull request #9441: URL: https://github.com/apache/kafka/pull/9441#issuecomment-713663078 @guozhangwang, @hachikuji I was thinking... solving it as I have now (keeping track of the epoch) doesn't address another potential problem case: when the tenure as leader is very short, the background task to load the state can run after the background task to unload the state for the same epoch. Obviously in that case the load should not be done (I guess it could result in a broker not returning NOT_COORDINATOR when it should, based on incorrect `ownedPartitions` state). We could track a high-watermark coordinator epoch and guard the load with a check (as well as the unload). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
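The high-watermark guard being discussed might look roughly like the sketch below (hypothetical names, not the PR's code): an unload records the highest coordinator epoch it has seen, and a stale load task scheduled for that epoch or an earlier one is skipped.

```java
// Illustrative epoch fencing for out-of-order load/unload background tasks.
final class CoordinatorEpochGuard {
    private int unloadHighWatermark = -1;

    synchronized void onUnload(int coordinatorEpoch) {
        unloadHighWatermark = Math.max(unloadHighWatermark, coordinatorEpoch);
    }

    // A load submitted at epoch E must not run once an unload at epoch >= E
    // has executed, covering the short-tenure case where the tasks run in the
    // wrong order within the same epoch.
    synchronized boolean shouldLoad(int coordinatorEpoch) {
        return coordinatorEpoch > unloadHighWatermark;
    }
}
```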
[GitHub] [kafka] vvcephei commented on pull request #9467: KAFKA-10515: Properly initialize nullable Serdes with default values
vvcephei commented on pull request #9467: URL: https://github.com/apache/kafka/pull/9467#issuecomment-713659242 ... I'm not sure how to get Jenkins to test this PR. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org