[GitHub] [kafka] kkonstantine commented on a change in pull request #8204: KAFKA-9633: Ensure ConfigProviders are closed
kkonstantine commented on a change in pull request #8204: URL: https://github.com/apache/kafka/pull/8204#discussion_r510631763 ## File path: clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java ## @@ -483,6 +483,7 @@ public void logUnused() { resolvedOriginals.putAll(result.data()); } } +providers.values().forEach(x -> Utils.closeQuietly(x, "config provider")); Review comment: Good observations @jherico . I believe the latter approach, of closing all the instantiated providers if an exception occurs in both cases, would be the most straightforward fix. If you'd be interested in submitting a fix, that would be very welcome! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
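A minimal sketch of the approach suggested in the review comment above: if instantiating one provider fails, close every provider that was already created before rethrowing. The class `ProviderCleanupSketch`, the method `instantiateAll`, and the local `closeQuietly` helper are hypothetical stand-ins written for illustration; they are not the code in `AbstractConfig` or `Utils`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

final class ProviderCleanupSketch {

    /** Closes a resource and swallows any failure (hypothetical stand-in for a "close quietly" helper). */
    static void closeQuietly(AutoCloseable closeable, String name) {
        if (closeable == null) {
            return;
        }
        try {
            closeable.close();
        } catch (Exception e) {
            System.err.println("Failed to close " + name + ": " + e);
        }
    }

    /**
     * Instantiates every provider in order. If any factory throws, the providers
     * created so far are closed before the exception propagates to the caller.
     */
    static Map<String, AutoCloseable> instantiateAll(Map<String, Supplier<AutoCloseable>> factories) {
        Map<String, AutoCloseable> created = new LinkedHashMap<>();
        try {
            for (Map.Entry<String, Supplier<AutoCloseable>> entry : factories.entrySet()) {
                created.put(entry.getKey(), entry.getValue().get());
            }
            return created;
        } catch (RuntimeException e) {
            created.values().forEach(p -> closeQuietly(p, "config provider"));
            throw e;
        }
    }

    /** Runs the failure scenario and returns the names of the providers that were closed. */
    static List<String> demo() {
        List<String> closed = new ArrayList<>();
        Map<String, Supplier<AutoCloseable>> factories = new LinkedHashMap<>();
        factories.put("file", () -> () -> closed.add("file"));
        factories.put("vault", () -> () -> closed.add("vault"));
        factories.put("broken", () -> {
            throw new IllegalStateException("cannot instantiate");
        });
        try {
            instantiateAll(factories);
        } catch (IllegalStateException expected) {
            // The two providers created before the failure have already been closed.
        }
        return closed;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Keeping the cleanup in a single `catch` block means the same path covers a failure at any point during instantiation, which is the property the comment asks for.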
[GitHub] [kafka] d8tltanc opened a new pull request #9485: [WIP] KAKFA-10619: Producer will enable EOS by default
d8tltanc opened a new pull request #9485: URL: https://github.com/apache/kafka/pull/9485
[GitHub] [kafka] kotharironak edited a comment on pull request #8043: KAFKA-6793: Unnecessary warning log message
kotharironak edited a comment on pull request #8043: URL: https://github.com/apache/kafka/pull/8043#issuecomment-714917967 we are also observing quite a few warnings - https://github.com/hypertrace/pinot/issues/26
[GitHub] [kafka] kotharironak commented on pull request #8043: KAFKA-6793: Unnecessary warning log message
kotharironak commented on pull request #8043: URL: https://github.com/apache/kafka/pull/8043#issuecomment-714917967 We are also observing quite a few warnings: https://github.com/hypertrace/pinot/issues/26 Could you please point to the KIP link for this issue?
[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics
abbccdda commented on a change in pull request #9103: URL: https://github.com/apache/kafka/pull/9103#discussion_r510568706 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -126,11 +125,44 @@ class KafkaApis(val requestChannel: RequestChannel, info("Shutdown complete.") } + private def checkForwarding(request: RequestChannel.Request): Unit = { +if (!request.header.apiKey.forwardable && request.envelopeContext.isDefined) { + throw new IllegalStateException("Given RPC " + request.header.apiKey + " does not support forwarding.") +} + } + + private def maybeForward(request: RequestChannel.Request, + handler: RequestChannel.Request => Unit): Unit = { +if (request.envelopeContext.isDefined && request.principalSerde.isEmpty) { + sendErrorResponseMaybeThrottle(request, Errors.PRINCIPAL_DESERIALIZATION_FAILURE.exception()) +} else if (request.envelopeContext.isDefined && + (!request.context.fromPrivilegedListener || + !authorize(request.envelopeContext.get.brokerContext, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) +) { + // If the designated forwarding request is not coming from a privileged listener, or + // it fails CLUSTER_ACTION permission, we would fail the authorization. + sendErrorResponseMaybeThrottle(request, Errors.CLUSTER_AUTHORIZATION_FAILED.exception()) Review comment: The question would be how the forwarding broker should do the error handling for auth & principal serde exceptions. To me we should get a vanilla error response with `UNKNOWN_SERVER_ERROR` and get back to the original client? Besides that, I think we could add a differentiation here to avoid passing the serde-type errors to the client. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
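One way to read the differentiation proposed above, sketched under assumptions: errors that only make sense between the forwarding broker and the controller are replaced with `UNKNOWN_SERVER_ERROR` before anything is returned to the original client, while request-level errors pass through unchanged. The enum `Error` below is a local, hypothetical subset defined only for this example (it is not Kafka's `Errors` class), and `clientFacing` is an illustrative name.

```java
final class EnvelopeErrorMappingSketch {

    /** Local, illustrative subset of error codes; not the real Kafka Errors enum. */
    enum Error {
        NONE,
        TOPIC_ALREADY_EXISTS,
        CLUSTER_AUTHORIZATION_FAILED,
        PRINCIPAL_DESERIALIZATION_FAILURE,
        UNKNOWN_SERVER_ERROR
    }

    /**
     * Maps an error seen by the forwarding broker to the error reported to the
     * original client. Forwarding-specific failures are hidden behind a generic
     * server error; everything else is returned as-is.
     */
    static Error clientFacing(Error fromController) {
        switch (fromController) {
            case CLUSTER_AUTHORIZATION_FAILED:
            case PRINCIPAL_DESERIALIZATION_FAILURE:
                return Error.UNKNOWN_SERVER_ERROR;
            default:
                return fromController;
        }
    }

    public static void main(String[] args) {
        for (Error e : Error.values()) {
            System.out.println(e + " -> " + clientFacing(e));
        }
    }
}
```

Whether cluster authorization failures should be hidden this way is exactly the open question in the thread, so the mapping above is one possible outcome rather than a settled design.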
[GitHub] [kafka] feyman2016 edited a comment on pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted
feyman2016 edited a comment on pull request #9270: URL: https://github.com/apache/kafka/pull/9270#issuecomment-714871862 Thanks a lot for the review and merge @abbccdda @vvcephei!
[GitHub] [kafka] feyman2016 commented on pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted
feyman2016 commented on pull request #9270: URL: https://github.com/apache/kafka/pull/9270#issuecomment-714871862 Thanks a lot for the help @abbccdda @vvcephei!
[GitHub] [kafka] hachikuji commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics
hachikuji commented on a change in pull request #9103: URL: https://github.com/apache/kafka/pull/9103#discussion_r510552124 ## File path: core/src/main/scala/kafka/network/RequestChannel.scala ## @@ -94,19 +104,63 @@ object RequestChannel extends Logging { @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None val session = Session(context.principal, context.clientAddress) + private val bodyAndSize: RequestAndSize = context.parseRequest(buffer) def header: RequestHeader = context.header def sizeOfBodyInBytes: Int = bodyAndSize.size -//most request types are parsed entirely into objects at this point. for those we can release the underlying buffer. -//some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference -//to the buffer. for those requests we cannot release the buffer early, but only when request processing is done. +// most request types are parsed entirely into objects at this point. for those we can release the underlying buffer. +// some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference +// to the buffer. for those requests we cannot release the buffer early, but only when request processing is done. if (!header.apiKey.requiresDelayedAllocation) { releaseBuffer() } -def requestDesc(details: Boolean): String = s"$header -- ${loggableRequest.toString(details)}" +def buildResponse(abstractResponse: AbstractResponse, + error: Errors): Send = { + envelopeContext match { +case Some(envelopeContext) => + val envelopeResponse = new EnvelopeResponse( +abstractResponse.throttleTimeMs(), Review comment: Quotas are one aspect of this work that need more consideration. What we don't want is for the inter-broker channel to get affected by the individual client throttle, which is what will happen with the current patch. 
What I'd suggest for now is that we allow the broker to track client quotas and pass back the throttle value in the underlying response, but we set the envelope throttle time to 0 and ensure that the inter-broker channel does not get throttled. For this, I think we will need to change the logic in `KafkaApis.sendResponseMaybeThrottle`. If it is a forwarded request, we still need to check `maybeRecordAndGetThrottleTimeMs`, but we can skip the call to `ClientQuotaManager.throttle`. When the response is received on the forwarding broker, we will need to apply the throttle, which I think the patch already handles. One challenging aspect is how this will affect quota metrics. Currently, quota/throttling metrics are relatively simple because they are recorded separately by each broker. However, here the controller is the one that is tracking the throttling for the client across multiple inbound connections from multiple brokers. This means that the broker that is applying a throttle for a forwarded request may not have actually observed a quota violation. Other than causing some reporting confusion, I am not sure whether there are any other consequences to this. cc @apovzner @rajinisivaram
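The suggestion above can be sketched as a small decision function: the quota is always recorded and the resulting throttle time is reported in the underlying response, but for a forwarded request the envelope throttle time is 0 and the inter-broker channel is left alone. `ForwardedThrottleSketch`, `Decision`, and `decide` are hypothetical names for illustration only; the real change would live in `KafkaApis.sendResponseMaybeThrottle`, whose actual code is not reproduced here.

```java
final class ForwardedThrottleSketch {

    /** Hypothetical result type: what the handling broker reports and what it does to the channel. */
    static final class Decision {
        final int requestThrottleMs;   // throttle time carried in the underlying response
        final int envelopeThrottleMs;  // throttle time carried on the envelope itself
        final boolean throttleChannel; // whether the handling broker throttles the connection

        Decision(int requestThrottleMs, int envelopeThrottleMs, boolean throttleChannel) {
            this.requestThrottleMs = requestThrottleMs;
            this.envelopeThrottleMs = envelopeThrottleMs;
            this.throttleChannel = throttleChannel;
        }
    }

    /**
     * @param forwarded          true if the request arrived inside an envelope from another broker
     * @param recordedThrottleMs throttle time computed when the request was recorded against the quota
     */
    static Decision decide(boolean forwarded, int recordedThrottleMs) {
        if (forwarded) {
            // Report the client's throttle time, but never throttle the inter-broker channel.
            return new Decision(recordedThrottleMs, 0, false);
        }
        // Direct client request: throttle the client's own connection when the quota is violated.
        return new Decision(recordedThrottleMs, 0, recordedThrottleMs > 0);
    }

    public static void main(String[] args) {
        Decision d = decide(true, 500);
        System.out.println(d.requestThrottleMs + " " + d.envelopeThrottleMs + " " + d.throttleChannel);
    }
}
```

In this sketch the forwarding broker would read `requestThrottleMs` from the unwrapped response and apply the throttle to the original client connection, which matches the behaviour the comment says the patch already handles.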
[GitHub] [kafka] hachikuji commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics
hachikuji commented on a change in pull request #9103: URL: https://github.com/apache/kafka/pull/9103#discussion_r510552124 ## File path: core/src/main/scala/kafka/network/RequestChannel.scala ## @@ -94,19 +104,63 @@ object RequestChannel extends Logging { @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None val session = Session(context.principal, context.clientAddress) + private val bodyAndSize: RequestAndSize = context.parseRequest(buffer) def header: RequestHeader = context.header def sizeOfBodyInBytes: Int = bodyAndSize.size -//most request types are parsed entirely into objects at this point. for those we can release the underlying buffer. -//some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference -//to the buffer. for those requests we cannot release the buffer early, but only when request processing is done. +// most request types are parsed entirely into objects at this point. for those we can release the underlying buffer. +// some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference +// to the buffer. for those requests we cannot release the buffer early, but only when request processing is done. if (!header.apiKey.requiresDelayedAllocation) { releaseBuffer() } -def requestDesc(details: Boolean): String = s"$header -- ${loggableRequest.toString(details)}" +def buildResponse(abstractResponse: AbstractResponse, + error: Errors): Send = { + envelopeContext match { +case Some(envelopeContext) => + val envelopeResponse = new EnvelopeResponse( +abstractResponse.throttleTimeMs(), Review comment: Quotas are one aspect of this work that need more consideration. What we don't want is for the inter-broker channel to get affected by the individual client throttle, which is what will happen with the current patch. 
What I'd suggest for now is that we allow the broker to track client quotas and pass back the throttle value in the underlying response, but we set the envelope throttle time to 0 and ensure that the channel does not get throttled. For this, I think we will need to change the logic in `KafkaApis.sendResponseMaybeThrottle`. If it is a forwarded request, we still need to check `maybeRecordAndGetThrottleTimeMs`, but we can skip the call to `ClientQuotaManager.throttle`. When the response is received on the forwarding broker, we will need to apply the throttle, which I think the patch already handles. One challenging aspect is how this will affect quota metrics. Currently, quota/throttling metrics are relatively simple because they are recorded separately by each broker. However, here the controller is the one that is tracking the throttling for the client across multiple inbound connections from multiple brokers. This means that the broker that is applying a throttle for a forwarded request may not have actually observed a quota violation. Other than causing some reporting confusion, I am not sure whether there are any other consequences to this. 
cc @apovzner @rajinisivaram ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -126,11 +125,44 @@ class KafkaApis(val requestChannel: RequestChannel, info("Shutdown complete.") } + private def checkForwarding(request: RequestChannel.Request): Unit = { +if (!request.header.apiKey.forwardable && request.envelopeContext.isDefined) { + throw new IllegalStateException("Given RPC " + request.header.apiKey + " does not support forwarding.") +} + } + + private def maybeForward(request: RequestChannel.Request, + handler: RequestChannel.Request => Unit): Unit = { +if (request.envelopeContext.isDefined && request.principalSerde.isEmpty) { + sendErrorResponseMaybeThrottle(request, Errors.PRINCIPAL_DESERIALIZATION_FAILURE.exception()) +} else if (request.envelopeContext.isDefined && + (!request.context.fromPrivilegedListener || + !authorize(request.envelopeContext.get.brokerContext, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) +) { + // If the designated forwarding request is not coming from a privileged listener, or + // it fails CLUSTER_ACTION permission, we would fail the authorization. + sendErrorResponseMaybeThrottle(request, Errors.CLUSTER_AUTHORIZATION_FAILED.exception()) Review comment: One challenge we have here is that there are two levels of errors. The current patch seems to conflate the two, which makes it confusing. I think we need a structure which allows us to separate the errors possible at the envelope level and those possible at the request level. What I'm thinking is this: 1. For cluster auth and principal serde errors, we should return the envelope error and null
[GitHub] [kafka] dengziming commented on pull request #7862: KAFKA-9246:Update Heartbeat timeout when ConsumerCoordinator commit offset
dengziming commented on pull request #7862: URL: https://github.com/apache/kafka/pull/7862#issuecomment-714856288 @abbccdda @hachikuji , Hi, PTAL, thank you.
[GitHub] [kafka] hachikuji merged pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests
hachikuji merged pull request #9476: URL: https://github.com/apache/kafka/pull/9476
[GitHub] [kafka] hachikuji commented on pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests
hachikuji commented on pull request #9476: URL: https://github.com/apache/kafka/pull/9476#issuecomment-714846028 The build failure appears to be unrelated.
```
[2020-10-23T01:08:22.296Z] [ERROR] Failed to execute goal org.apache.maven.plugins:maven-archetype-plugin:3.2.0:generate (default-cli) on project standalone-pom: A Maven project already exists in the directory /home/jenkins/workspace/Kafka_kafka-pr_PR-9476/streams/quickstart/test-streams-archetype/streams.examples -> [Help 1]
[2020-10-23T01:08:22.296Z] [ERROR]
[2020-10-23T01:08:22.296Z] [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[2020-10-23T01:08:22.297Z] [ERROR] Re-run Maven using the -X switch to enable full debug logging.
[2020-10-23T01:08:22.297Z] [ERROR]
[2020-10-23T01:08:22.297Z] [ERROR] For more information about the errors and possible solutions, please read the following articles:
[2020-10-23T01:08:22.298Z] [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
```
I will merge to trunk.
[GitHub] [kafka] hachikuji merged pull request #9484: MINOR: Update raft/README.md and minor RaftConfig tweaks
hachikuji merged pull request #9484: URL: https://github.com/apache/kafka/pull/9484
[GitHub] [kafka] hachikuji commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics
hachikuji commented on a change in pull request #9103: URL: https://github.com/apache/kafka/pull/9103#discussion_r510536894 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -1005,6 +1013,36 @@ private[kafka] class Processor(val id: Int, selector.clearCompletedReceives() } + private def parseEnvelopeRequest(receive: NetworkReceive, + nowNanos: Long, + connectionId: String, + context: RequestContext, + principalSerde: Option[KafkaPrincipalSerde]) = { +val envelopeRequest = context.parseRequest(receive.payload).request.asInstanceOf[EnvelopeRequest] + +val originalHeader = RequestHeader.parse(envelopeRequest.requestData) +// Leave the principal null here is ok since we will fail the request during Kafka API handling. +val originalPrincipal = if (principalSerde.isDefined) + principalSerde.get.deserialize(envelopeRequest.principalData) +else + null + +// The forwarding broker and the active controller should have the same DNS resolution, and we need +// to have the original client address for authentication purpose. +val originalClientAddress = InetAddress.getByName(envelopeRequest.clientHostName) Review comment: I was thinking a little bit about this and trying to decide if the envelope request should have a more literal representation of the client ip address. The way it is working right now, it looks like the following: 1) Use `Socket.getInetAddress` to populate `RequestContext.clientAddress`. 2) Use `InetAddress.getHostName` to populate the `clientHostName` field in the envelope request. This will do a reverse dns lookup based on the IP address from 1). 3) Now we send `clientHostName` over the wire. It gets unpacked here by doing a dns lookup to get to the `InetAddress` object. So it seems we should be skipping the dns translation and just using the IP address from 1). The `InetAddress` class gives us `getAddress` and `getHostAddress`. 
The first provides the raw byte representation of the IP address, while the latter provides a textual representation. I am thinking we should use `getAddress` and let this field be represented as bytes. What do you think?
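To illustrate the point about skipping DNS translation, here is a small sketch using only the standard `java.net.InetAddress` API: `getAddress` yields the raw bytes, and `InetAddress.getByAddress` rebuilds the address from those bytes without any name lookup. The class name `ClientAddressBytesSketch` and its helper methods are hypothetical and only stand in for the envelope field discussed above.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

final class ClientAddressBytesSketch {

    /** Sender side: take the raw IP bytes rather than a host name. */
    static byte[] encode(InetAddress clientAddress) {
        return clientAddress.getAddress();
    }

    /** Receiver side: rebuild the address from raw bytes; no DNS lookup is performed. */
    static InetAddress decode(byte[] raw) throws UnknownHostException {
        return InetAddress.getByAddress(raw);
    }

    /** Encodes and decodes an address given as raw bytes and returns its textual form. */
    static String roundTrip(byte[] raw) {
        try {
            InetAddress original = InetAddress.getByAddress(raw);
            return decode(encode(original)).getHostAddress();
        } catch (UnknownHostException e) {
            // Thrown only when the byte array has an illegal length (not 4 or 16 bytes).
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(new byte[] {(byte) 192, (byte) 168, 1, 10}));
    }
}
```

Because neither direction touches a resolver, the two brokers no longer need to agree on DNS resolution for the client address to survive the trip.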
[GitHub] [kafka] hachikuji commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics
hachikuji commented on a change in pull request #9103: URL: https://github.com/apache/kafka/pull/9103#discussion_r510533897

## File path: core/src/main/scala/kafka/network/SocketServer.scala ##
@@ -974,8 +973,39 @@ private[kafka] class Processor(val id: Int,
 val context = new RequestContext(header, connectionId, channel.socketAddress, channel.principal, listenerName, securityProtocol, channel.channelMetadataRegistry.clientInformation, isPrivilegedListener)
-val req = new RequestChannel.Request(processor = id, context = context,
- startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics)
+
+val principalSerde = Option(channel.principalSerde.orElse(null))

Review comment: You probably need the following:
```scala
import scala.compat.java8.OptionConverters._
```
[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines
hachikuji commented on a change in pull request #9482: URL: https://github.com/apache/kafka/pull/9482#discussion_r510513954 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -206,30 +234,77 @@ private void updateLeaderEndOffsetAndTimestamp( final LogOffsetMetadata endOffsetMetadata = log.endOffset(); if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) { -updateHighWatermark(state, currentTimeMs); +onUpdateLeaderHighWatermark(state, currentTimeMs); } -LogOffset endOffset = new LogOffset(endOffsetMetadata.offset, Isolation.UNCOMMITTED); -fetchPurgatory.maybeComplete(endOffset, currentTimeMs); +fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs); } -private void updateHighWatermark( -EpochState state, +private void onUpdateLeaderHighWatermark( +LeaderState state, long currentTimeMs ) { state.highWatermark().ifPresent(highWatermark -> { -logger.debug("High watermark updated to {}", highWatermark); +logger.debug("Leader high watermark updated to {}", highWatermark); log.updateHighWatermark(highWatermark); - -LogOffset offset = new LogOffset(highWatermark.offset, Isolation.COMMITTED); -appendPurgatory.maybeComplete(offset, currentTimeMs); -fetchPurgatory.maybeComplete(offset, currentTimeMs); +appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs); +maybeFireHandleCommit(highWatermark.offset); }); } -@Override -public LeaderAndEpoch currentLeaderAndEpoch() { -return quorum.leaderAndEpoch(); +private void maybeFireHandleCommit(long highWatermark) { +maybeFireHandleCommit(listenerContexts, highWatermark); +} + +private void maybeFireHandleCommit(List listenerContexts, long highWatermark) { +// TODO: When there are multiple listeners, we can cache reads to save some work +for (ListenerContext listenerContext : listenerContexts) { +OptionalLong nextExpectedOffsetOpt = listenerContext.nextExpectedOffset(); +if (!nextExpectedOffsetOpt.isPresent()) { +return; +} + +long nextExpectedOffset = 
nextExpectedOffsetOpt.getAsLong(); +if (nextExpectedOffset < highWatermark) { +LogFetchInfo readInfo = log.read(nextExpectedOffset, Isolation.COMMITTED); +listenerContext.fireHandleCommit(nextExpectedOffset, readInfo.records); +} +} +} + +private void maybeFireHandleCommit(long baseOffset, int epoch, List records) { +for (ListenerContext listenerContext : listenerContexts) { +OptionalLong nextExpectedOffsetOpt = listenerContext.nextExpectedOffset(); +if (!nextExpectedOffsetOpt.isPresent()) { +return; +} + +long nextExpectedOffset = nextExpectedOffsetOpt.getAsLong(); +if (nextExpectedOffset == baseOffset) { +listenerContext.fireHandleCommit(baseOffset, epoch, records); +} +} +} + +private void maybeFireHandleClaim(LeaderState state) { +for (ListenerContext listenerContext : listenerContexts) { +int leaderEpoch = state.epoch(); + +// We can fire `handleClaim` as soon as the listener has caught +// up to the start of the leader epoch. This guarantees that the +// state machine has seen the full committed state before it becomes +// leader and begins writing to the log. Review comment: I guess let's keep this option in our back pocket for now. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10636) Bypass log validation for writes to raft log
Jason Gustafson created KAFKA-10636: --- Summary: Bypass log validation for writes to raft log Key: KAFKA-10636 URL: https://issues.apache.org/jira/browse/KAFKA-10636 Project: Kafka Issue Type: Sub-task Reporter: Jason Gustafson The raft leader is responsible for creating the records written to the log (including assigning offsets and the epoch), so we can consider bypassing the validation done in `LogValidator`. This lets us skip potentially expensive decompression and the unnecessary recomputation of the CRC. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10635) Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers
Peeraya Maetasatidsuk created KAFKA-10635: - Summary: Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers Key: KAFKA-10635 URL: https://issues.apache.org/jira/browse/KAFKA-10635 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.5.1 Reporter: Peeraya Maetasatidsuk We are upgrading our brokers to version 2.5.1 (from 2.3.1) by performing a rolling restart of the brokers after installing the new version. After the restarts we notice one of our streams app (client version 2.4.1) fails with OutOfOrderSequenceException: {code:java} ERROR [2020-10-13 22:52:21,400] [com.aaa.bbb.ExceptionHandler] Unexpected error. Record: a_record, destination topic: topic-name-Aggregation-repartition org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker received an out of order sequence number. ERROR [2020-10-13 22:52:21,413] [org.apache.kafka.streams.processor.internals.AssignedTasks] stream-thread [topic-name-StreamThread-1] Failed to commit stream task 1_39 due to the following error: org.apache.kafka.streams.errors.StreamsException: task [1_39] Abort sending since an error caught with a previous record (timestamp 1602654659000) to topic topic-name-Aggregation-repartition due to org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker received an out of order sequence number.at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:144) at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52) at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:204) at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1348) at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:230) at 
org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:196) at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:730) at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:716) at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:674) at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:596) at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:798) at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561) at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244) at java.base/java.lang.Thread.run(Thread.java:834)Caused by: org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker received an out of order sequence number. {code} We see a corresponding error on the broker side: {code:java} [2020-10-13 22:52:21,398] ERROR [ReplicaManager broker=137636348] Error processing append operation on partition topic-name-Aggregation-repartition-52 (kafka.server.ReplicaManager)org.apache.kafka.common.errors.OutOfOrderSequenceException: Out of order sequence number for producerId 2819098 at offset 1156041 in partition topic-name-Aggregation-repartition-52: 29 (incoming seq. number), -1 (current end sequence number) {code} We are able to reproduce this many times and it happens regardless of whether the broker shutdown (at restart) is clean or unclean. However, when we rollback the broker version to 2.3.1 from 2.5.1 and perform similar rolling restarts, we don't see this error on the streams application at all. This is blocking us from upgrading our broker version. 
[GitHub] [kafka] ning2008wisc commented on a change in pull request #9224: KAFKA-10304: refactor MM2 integration tests
ning2008wisc commented on a change in pull request #9224: URL: https://github.com/apache/kafka/pull/9224#discussion_r510500535 ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTest.java ## @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.mirror.integration; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.mirror.MirrorCheckpointConnector; +import org.apache.kafka.connect.mirror.MirrorClient; +import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector; +import org.apache.kafka.connect.mirror.MirrorMakerConfig; +import org.apache.kafka.connect.mirror.MirrorSourceConnector; +import org.apache.kafka.connect.mirror.utils.ThreadedConsumer; +import org.apache.kafka.connect.mirror.utils.ThreadedProducer; +import org.apache.kafka.test.IntegrationTest; +import org.junit.Before; +import org.junit.After; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.Collections; +import java.util.Map; +import java.util.HashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.apache.kafka.test.TestUtils.waitForCondition; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNotNull; + +/** + * Tests MM2 replication and failover/failback logic. + * + * MM2 is configured with active/active replication between two Kafka clusters. Tests validate that + * records sent to either cluster arrive at the other cluster. Then, a consumer group is migrated from + * one cluster to the other and back. Tests validate that consumer offsets are translated and replicated + * between clusters during this failover and failback. 
+ */ +@Category(IntegrationTest.class) +public class MirrorConnectorsIntegrationTest extends MirrorConnectorsIntegrationBaseTest { + +private static final Logger log = LoggerFactory.getLogger(MirrorConnectorsIntegrationTest.class); + +private static final List CONNECTOR_LIST = +Arrays.asList(MirrorSourceConnector.class, MirrorCheckpointConnector.class, MirrorHeartbeatConnector.class); + +@Before +public void setup() throws InterruptedException { Review comment: in `MirrorConnectorsIntegrationSSLTest`, we put ssl config into `backupBrokerProps`, so the setup() of both `MirrorConnectorsIntegrationTest` and `MirrorConnectorsIntegrationSSLTest` will be slightly different This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ning2008wisc commented on a change in pull request #9224: KAFKA-10304: refactor MM2 integration tests
ning2008wisc commented on a change in pull request #9224: URL: https://github.com/apache/kafka/pull/9224#discussion_r510499941 ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationSSLTest.java ## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.mirror.integration; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.network.Mode; +import org.apache.kafka.connect.mirror.MirrorCheckpointConnector; +import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector; +import org.apache.kafka.connect.mirror.MirrorMakerConfig; +import org.apache.kafka.connect.mirror.MirrorSourceConnector; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestSslUtils; +import org.apache.kafka.test.TestUtils; +import kafka.server.KafkaConfig$; + +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.junit.Before; +import org.junit.After; +import org.junit.Test; + +/** + * Tests MM2 replication with SSL enabled at backup kafka cluster + */ +@Category(IntegrationTest.class) +public class MirrorConnectorsIntegrationSSLTest extends MirrorConnectorsIntegrationBaseTest { + +private static final Logger log = LoggerFactory.getLogger(MirrorConnectorsIntegrationSSLTest.class); + +private static final List CONNECTOR_LIST = +Arrays.asList(MirrorSourceConnector.class, MirrorCheckpointConnector.class, MirrorHeartbeatConnector.class); + +@Before +public void setup() throws InterruptedException { +try { +Map sslConfig = TestSslUtils.createSslConfig(false, true, Mode.SERVER, TestUtils.tempFile(), "testCert"); +backupBrokerProps.put(KafkaConfig$.MODULE$.ListenersProp(), "SSL://localhost:0"); + backupBrokerProps.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(), "SSL"); +backupBrokerProps.putAll(sslConfig); +} catch (final Exception e) { +throw new RuntimeException(e); +} +startClusters(); +} + +@After +public void close() { Review comment: if move to base class, will close() still be called at demolition stage of test? 
[GitHub] [kafka] jsancio commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests
jsancio commented on a change in pull request #9476: URL: https://github.com/apache/kafka/pull/9476#discussion_r510498433 ## File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java ## @@ -278,13 +265,13 @@ public void testEndQuorumStartsNewElectionAfterBackoffIfReceivedFromVotedCandida .withVotedCandidate(epoch, otherNodeId) Review comment: Makes sense. I implemented this suggestion.
[GitHub] [kafka] ning2008wisc commented on a change in pull request #9224: KAFKA-10304: refactor MM2 integration tests
ning2008wisc commented on a change in pull request #9224: URL: https://github.com/apache/kafka/pull/9224#discussion_r510494608 ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java ## @@ -0,0 +1,423 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.mirror.integration; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.DescribeConfigsResult; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.config.types.Password; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.mirror.MirrorMakerConfig; +import org.apache.kafka.connect.mirror.SourceAndTarget; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster; +import org.apache.kafka.test.IntegrationTest; +import org.junit.experimental.categories.Category; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.concurrent.TimeUnit; + +import static org.apache.kafka.connect.mirror.TestUtils.expectedRecords; +import static org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.CONNECTOR_SETUP_DURATION_MS; 
+import static org.apache.kafka.test.TestUtils.waitForCondition; +import kafka.server.KafkaConfig$; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; +//import org.junit.After; + +/** + * Common Test functions for MM2 integration tests + */ +@Category(IntegrationTest.class) +public class MirrorConnectorsIntegrationBaseTest { +private static final Logger log = LoggerFactory.getLogger(MirrorConnectorsIntegrationBaseTest.class); + +protected static final int NUM_RECORDS_PER_PARTITION = 10; +public static final int NUM_PARTITIONS = 10; +protected static final int NUM_RECORDS_PRODUCED = NUM_PARTITIONS * NUM_RECORDS_PER_PARTITION; +protected static final int RECORD_TRANSFER_DURATION_MS = 30_000; +protected static final int CHECKPOINT_DURATION_MS = 20_000; +protected static final int RECORD_CONSUME_DURATION_MS = 20_000; +protected static final int OFFSET_SYNC_DURATION_MS = 30_000; +protected static final int NUM_WORKERS = 3; + + +protected Map mm2Props; +protected MirrorMakerConfig mm2Config; +protected EmbeddedConnectCluster primary; +protected EmbeddedConnectCluster backup; + +private final AtomicBoolean exited = new AtomicBoolean(false); +private Properties primaryBrokerProps = new Properties(); +protected Properties backupBrokerProps = new Properties(); Review comment: because `backupBrokerProps` is referred in `MirrorConnectorsIntegrationSSLTest` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests
hachikuji commented on a change in pull request #9476:
URL: https://github.com/apache/kafka/pull/9476#discussion_r510488902

## File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java ##
@@ -278,13 +265,13 @@ public void testEndQuorumStartsNewElectionAfterBackoffIfReceivedFromVotedCandida
 .withVotedCandidate(epoch, otherNodeId)

Review comment:
The pattern I had in mind was a little different. I was thinking something like this:

```java
int localId = 0;
int otherNodeId = 1;
int epoch = 2;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);

RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
    .withVotedCandidate(epoch, otherNodeId)
    .build();
```

Then we don't have the awkwardness of the partial reliance on the static `LOCAL_ID`. I like this better because the ids have to be explicitly declared in each test case, which makes it easier to follow.
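The builder shape proposed in the review comment above can be sketched in isolation. This is a minimal, hedged illustration only: `TestContextSketch` and its fields are hypothetical stand-ins and do not reproduce the real `RaftClientTestContext`. The one point it shows is that the local id and voter set are passed to the builder explicitly, so no static `LOCAL_ID` is involved.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.OptionalInt;
import java.util.Set;

// Hypothetical stand-in for a test context; not the real Kafka class.
final class TestContextSketch {
    final int localId;
    final Set<Integer> voters;
    final int epoch;
    final OptionalInt votedId;

    private TestContextSketch(Builder builder) {
        this.localId = builder.localId;
        this.voters = Collections.unmodifiableSet(new HashSet<>(builder.voters));
        this.epoch = builder.epoch;
        this.votedId = builder.votedId;
    }

    static final class Builder {
        private final int localId;
        private final Set<Integer> voters;
        private int epoch = 0;
        private OptionalInt votedId = OptionalInt.empty();

        // Every test states its own ids here instead of relying on a shared constant.
        Builder(int localId, Set<Integer> voters) {
            this.localId = localId;
            this.voters = voters;
        }

        // Records that the local node has voted for candidateId in the given epoch.
        Builder withVotedCandidate(int epoch, int candidateId) {
            this.epoch = epoch;
            this.votedId = OptionalInt.of(candidateId);
            return this;
        }

        TestContextSketch build() {
            return new TestContextSketch(this);
        }
    }

    public static void main(String[] args) {
        int localId = 0;
        int otherNodeId = 1;
        int epoch = 2;
        Set<Integer> voters = new HashSet<>(Arrays.asList(localId, otherNodeId));

        TestContextSketch context = new Builder(localId, voters)
            .withVotedCandidate(epoch, otherNodeId)
            .build();

        System.out.println("node " + context.localId + " voted for "
            + context.votedId.getAsInt() + " in epoch " + context.epoch);
    }
}
```

Because each test constructs its own `Builder`, a later test can use a different local id without touching shared state.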
[GitHub] [kafka] ning2008wisc commented on a change in pull request #9224: KAFKA-10304: refactor MM2 integration tests
ning2008wisc commented on a change in pull request #9224: URL: https://github.com/apache/kafka/pull/9224#discussion_r510486514 ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationSSLTest.java ## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.mirror.integration; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.network.Mode; +import org.apache.kafka.connect.mirror.MirrorCheckpointConnector; +import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector; +import org.apache.kafka.connect.mirror.MirrorMakerConfig; +import org.apache.kafka.connect.mirror.MirrorSourceConnector; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestSslUtils; +import org.apache.kafka.test.TestUtils; +import kafka.server.KafkaConfig$; + +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.junit.Before; +import org.junit.After; +import org.junit.Test; + +/** + * Tests MM2 replication with SSL enabled at backup kafka cluster + */ +@Category(IntegrationTest.class) +public class MirrorConnectorsIntegrationSSLTest extends MirrorConnectorsIntegrationBaseTest { + +private static final Logger log = LoggerFactory.getLogger(MirrorConnectorsIntegrationSSLTest.class); + +private static final List CONNECTOR_LIST = +Arrays.asList(MirrorSourceConnector.class, MirrorCheckpointConnector.class, MirrorHeartbeatConnector.class); + +@Before +public void setup() throws InterruptedException { +try { +Map sslConfig = TestSslUtils.createSslConfig(false, true, Mode.SERVER, TestUtils.tempFile(), "testCert"); +backupBrokerProps.put(KafkaConfig$.MODULE$.ListenersProp(), "SSL://localhost:0"); + backupBrokerProps.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(), "SSL"); +backupBrokerProps.putAll(sslConfig); +} catch (final Exception e) { +throw new RuntimeException(e); Review comment: referred to the example at 
https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java#L58 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bbejeck merged pull request #9483: MINOR: Update docs to point to next release add notable features for 2.7
bbejeck merged pull request #9483: URL: https://github.com/apache/kafka/pull/9483
[GitHub] [kafka] bbejeck commented on pull request #9483: MINOR: Update docs to point to next release add notable features for 2.7
bbejeck commented on pull request #9483: URL: https://github.com/apache/kafka/pull/9483#issuecomment-714784672 only html changes, so merging now.
[GitHub] [kafka] ijuma opened a new pull request #9484: MINOR: Update raft/README.md and minor RaftConfig tweaks
ijuma opened a new pull request #9484:
URL: https://github.com/apache/kafka/pull/9484

* Replace quorum.bootstrap.servers and quorum.bootstrap.voters with quorum.voters.
* Remove seemingly unused `verbose` config.
* Use constant to avoid unnecessary repeated concatenation.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] vvcephei commented on pull request #9471: MINOR: Add Jenkinsfile to 2.6
vvcephei commented on pull request #9471:
URL: https://github.com/apache/kafka/pull/9471#issuecomment-714776458

So far, I have not been able to reproduce it. I even re-ran the exact same command as Jenkins:

```
[john@arcturus kafka]$ javac -version
javac 11.0.8
[john@arcturus kafka]$ ./gradlew -PscalaVersion=2.13 unitTest integrationTest --profile --no-daemon --continue -PtestLoggingEvents=started,passed,skipped,failed -PignoreFailures=true -PmaxParallelForks=2 -PmaxTestRetries=1 -PmaxTestRetryFailures=5
...
BUILD SUCCESSFUL in 1h 20m 15s
144 actionable tasks: 59 executed, 85 up-to-date

See the profiling report at: file:///home/confluent/kafka/build/reports/profile/profile-2020-10-22-14-21-43.html
A fine-grained performance profile is available: use the --scan option.
```

Not sure where to go from here...
[GitHub] [kafka] bbejeck opened a new pull request #9483: MINOR: Update docs to point to next release add notable features for 2.7
bbejeck opened a new pull request #9483: URL: https://github.com/apache/kafka/pull/9483 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines
jsancio commented on a change in pull request #9482: URL: https://github.com/apache/kafka/pull/9482#discussion_r510469742 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -206,30 +234,77 @@ private void updateLeaderEndOffsetAndTimestamp( final LogOffsetMetadata endOffsetMetadata = log.endOffset(); if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) { -updateHighWatermark(state, currentTimeMs); +onUpdateLeaderHighWatermark(state, currentTimeMs); } -LogOffset endOffset = new LogOffset(endOffsetMetadata.offset, Isolation.UNCOMMITTED); -fetchPurgatory.maybeComplete(endOffset, currentTimeMs); +fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs); } -private void updateHighWatermark( -EpochState state, +private void onUpdateLeaderHighWatermark( +LeaderState state, long currentTimeMs ) { state.highWatermark().ifPresent(highWatermark -> { -logger.debug("High watermark updated to {}", highWatermark); +logger.debug("Leader high watermark updated to {}", highWatermark); log.updateHighWatermark(highWatermark); - -LogOffset offset = new LogOffset(highWatermark.offset, Isolation.COMMITTED); -appendPurgatory.maybeComplete(offset, currentTimeMs); -fetchPurgatory.maybeComplete(offset, currentTimeMs); +appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs); +maybeFireHandleCommit(highWatermark.offset); }); } -@Override -public LeaderAndEpoch currentLeaderAndEpoch() { -return quorum.leaderAndEpoch(); +private void maybeFireHandleCommit(long highWatermark) { +maybeFireHandleCommit(listenerContexts, highWatermark); +} + +private void maybeFireHandleCommit(List listenerContexts, long highWatermark) { +// TODO: When there are multiple listeners, we can cache reads to save some work +for (ListenerContext listenerContext : listenerContexts) { +OptionalLong nextExpectedOffsetOpt = listenerContext.nextExpectedOffset(); +if (!nextExpectedOffsetOpt.isPresent()) { +return; +} + +long nextExpectedOffset = 
nextExpectedOffsetOpt.getAsLong(); +if (nextExpectedOffset < highWatermark) { +LogFetchInfo readInfo = log.read(nextExpectedOffset, Isolation.COMMITTED); +listenerContext.fireHandleCommit(nextExpectedOffset, readInfo.records); +} +} +} + +private void maybeFireHandleCommit(long baseOffset, int epoch, List records) { +for (ListenerContext listenerContext : listenerContexts) { +OptionalLong nextExpectedOffsetOpt = listenerContext.nextExpectedOffset(); +if (!nextExpectedOffsetOpt.isPresent()) { +return; +} + +long nextExpectedOffset = nextExpectedOffsetOpt.getAsLong(); +if (nextExpectedOffset == baseOffset) { +listenerContext.fireHandleCommit(baseOffset, epoch, records); +} +} +} + +private void maybeFireHandleClaim(LeaderState state) { +for (ListenerContext listenerContext : listenerContexts) { +int leaderEpoch = state.epoch(); + +// We can fire `handleClaim` as soon as the listener has caught +// up to the start of the leader epoch. This guarantees that the +// state machine has seen the full committed state before it becomes +// leader and begins writing to the log. Review comment: Yeah. I was thinking of the same thing, "hold the requests in purgatory". But like you said, maybe this optimization is not worth the added complexity. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines
hachikuji commented on a change in pull request #9482: URL: https://github.com/apache/kafka/pull/9482#discussion_r510455713 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -206,30 +234,77 @@ private void updateLeaderEndOffsetAndTimestamp( final LogOffsetMetadata endOffsetMetadata = log.endOffset(); if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) { -updateHighWatermark(state, currentTimeMs); +onUpdateLeaderHighWatermark(state, currentTimeMs); } -LogOffset endOffset = new LogOffset(endOffsetMetadata.offset, Isolation.UNCOMMITTED); -fetchPurgatory.maybeComplete(endOffset, currentTimeMs); +fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs); } -private void updateHighWatermark( -EpochState state, +private void onUpdateLeaderHighWatermark( +LeaderState state, long currentTimeMs ) { state.highWatermark().ifPresent(highWatermark -> { -logger.debug("High watermark updated to {}", highWatermark); +logger.debug("Leader high watermark updated to {}", highWatermark); log.updateHighWatermark(highWatermark); - -LogOffset offset = new LogOffset(highWatermark.offset, Isolation.COMMITTED); -appendPurgatory.maybeComplete(offset, currentTimeMs); -fetchPurgatory.maybeComplete(offset, currentTimeMs); +appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs); +maybeFireHandleCommit(highWatermark.offset); }); } -@Override -public LeaderAndEpoch currentLeaderAndEpoch() { -return quorum.leaderAndEpoch(); +private void maybeFireHandleCommit(long highWatermark) { +maybeFireHandleCommit(listenerContexts, highWatermark); +} + +private void maybeFireHandleCommit(List listenerContexts, long highWatermark) { +// TODO: When there are multiple listeners, we can cache reads to save some work +for (ListenerContext listenerContext : listenerContexts) { +OptionalLong nextExpectedOffsetOpt = listenerContext.nextExpectedOffset(); +if (!nextExpectedOffsetOpt.isPresent()) { +return; +} + +long nextExpectedOffset = 
nextExpectedOffsetOpt.getAsLong(); +if (nextExpectedOffset < highWatermark) { +LogFetchInfo readInfo = log.read(nextExpectedOffset, Isolation.COMMITTED); +listenerContext.fireHandleCommit(nextExpectedOffset, readInfo.records); +} +} +} + +private void maybeFireHandleCommit(long baseOffset, int epoch, List records) { +for (ListenerContext listenerContext : listenerContexts) { +OptionalLong nextExpectedOffsetOpt = listenerContext.nextExpectedOffset(); +if (!nextExpectedOffsetOpt.isPresent()) { +return; +} + +long nextExpectedOffset = nextExpectedOffsetOpt.getAsLong(); +if (nextExpectedOffset == baseOffset) { +listenerContext.fireHandleCommit(baseOffset, epoch, records); +} +} +} + +private void maybeFireHandleClaim(LeaderState state) { +for (ListenerContext listenerContext : listenerContexts) { +int leaderEpoch = state.epoch(); + +// We can fire `handleClaim` as soon as the listener has caught +// up to the start of the leader epoch. This guarantees that the +// state machine has seen the full committed state before it becomes +// leader and begins writing to the log. Review comment: I thought a little about it. Right now the state machine has just two states: 1) i am not a leader, and 2) i am a leader and have caught up with all committed data from previous epochs. An alternative design is to fire `handleClaim` immediately and provide the starting offset of the leader epoch. Then the controller can wait until its state machine has caught up to that offset before starting to write data. In the end, I decided not to do it because it adds a third state and I did not expect the controller would be able to do anything useful in the additional state. The point about heartbeats is interesting, but even that seems tricky since the controller would not know if a broker had been fenced until it has caught up. I think the only thing the controller could do is hold the requests in purgatory, which might be better than letting them retry, but not sure it's worth it. 
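The two-state design described in the comment above (not leader, or leader and caught up) can be illustrated with a small gate that delays the claim callback until the listener has reached the start of the leader epoch. This is a rough sketch under stated assumptions: `ClaimGateSketch`, `onBecomeLeader`, and `onCommitDelivered` are hypothetical names chosen for illustration and are not part of `KafkaRaftClient`; only the idea of firing `handleClaim` once the listener has caught up is taken from the discussion.

```java
import java.util.OptionalLong;

// Hypothetical sketch; names do not mirror KafkaRaftClient internals.
final class ClaimGateSketch {
    interface Listener {
        void handleClaim(int epoch);
    }

    private final Listener listener;
    private OptionalLong pendingEpochStartOffset = OptionalLong.empty();
    private int pendingEpoch = -1;
    private long nextExpectedOffset = 0L;

    ClaimGateSketch(Listener listener) {
        this.listener = listener;
    }

    // Assumed hook: invoked when this node becomes leader. epochStartOffset is
    // the offset at which the new leader epoch begins in the log.
    void onBecomeLeader(int epoch, long epochStartOffset) {
        pendingEpoch = epoch;
        pendingEpochStartOffset = OptionalLong.of(epochStartOffset);
        maybeFireHandleClaim();
    }

    // Assumed hook: invoked after committed records have been delivered to the
    // listener; nextOffset is the next offset the listener expects to see.
    void onCommitDelivered(long nextOffset) {
        nextExpectedOffset = nextOffset;
        maybeFireHandleClaim();
    }

    // Fires the claim at most once per election, and only after the listener
    // has seen all committed data from before the leader epoch.
    private void maybeFireHandleClaim() {
        if (pendingEpochStartOffset.isPresent()
                && nextExpectedOffset >= pendingEpochStartOffset.getAsLong()) {
            pendingEpochStartOffset = OptionalLong.empty();
            listener.handleClaim(pendingEpoch);
        }
    }

    public static void main(String[] args) {
        ClaimGateSketch gate = new ClaimGateSketch(
            epoch -> System.out.println("claimed epoch " + epoch));
        gate.onBecomeLeader(3, 10L);   // listener is still behind, nothing fires
        gate.onCommitDelivered(10L);   // listener caught up, claim fires
    }
}
```

The alternative mentioned in the comment, firing the claim immediately together with the epoch start offset, would move this gating logic into the state machine itself and introduce the third "leader but not caught up" state.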
[GitHub] [kafka] jsancio commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests
jsancio commented on a change in pull request #9476: URL: https://github.com/apache/kafka/pull/9476#discussion_r510449508 ## File path: raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java ## @@ -0,0 +1,648 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.raft; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.BeginQuorumEpochRequestData; +import org.apache.kafka.common.message.BeginQuorumEpochResponseData; +import org.apache.kafka.common.message.DescribeQuorumResponseData.ReplicaState; +import org.apache.kafka.common.message.DescribeQuorumResponseData; +import org.apache.kafka.common.message.EndQuorumEpochRequestData; +import org.apache.kafka.common.message.EndQuorumEpochResponseData; +import org.apache.kafka.common.message.FetchRequestData; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.LeaderChangeMessage.Voter; +import org.apache.kafka.common.message.LeaderChangeMessage; +import org.apache.kafka.common.message.VoteRequestData; +import org.apache.kafka.common.message.VoteResponseData; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.ControlRecordUtils; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.SimpleRecord; +import 
org.apache.kafka.common.requests.BeginQuorumEpochResponse; +import org.apache.kafka.common.requests.DescribeQuorumResponse; +import org.apache.kafka.common.requests.VoteResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.test.TestUtils; +import org.mockito.Mockito; +import static org.apache.kafka.raft.RaftUtil.hasValidTopicPartition; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +final class RaftClientTestContext { +private static final int FETCH_MAX_WAIT_MS = 0; + +static final TopicPartition METADATA_PARTITION = new TopicPartition("metadata", 0); +static final int LOCAL_ID = 0; Review comment: Okay. I made it possible to easily add support of this in the future without breaking the existing tests. We can make this changeable in the `Builder` as we need it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests
jsancio commented on a change in pull request #9476: URL: https://github.com/apache/kafka/pull/9476#discussion_r510448694 ## File path: raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java ## @@ -0,0 +1,648 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.raft; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.BeginQuorumEpochRequestData; +import org.apache.kafka.common.message.BeginQuorumEpochResponseData; +import org.apache.kafka.common.message.DescribeQuorumResponseData.ReplicaState; +import org.apache.kafka.common.message.DescribeQuorumResponseData; +import org.apache.kafka.common.message.EndQuorumEpochRequestData; +import org.apache.kafka.common.message.EndQuorumEpochResponseData; +import org.apache.kafka.common.message.FetchRequestData; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.LeaderChangeMessage.Voter; +import org.apache.kafka.common.message.LeaderChangeMessage; +import org.apache.kafka.common.message.VoteRequestData; +import org.apache.kafka.common.message.VoteResponseData; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.ControlRecordUtils; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.SimpleRecord; +import 
org.apache.kafka.common.requests.BeginQuorumEpochResponse; +import org.apache.kafka.common.requests.DescribeQuorumResponse; +import org.apache.kafka.common.requests.VoteResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.test.TestUtils; +import org.mockito.Mockito; +import static org.apache.kafka.raft.RaftUtil.hasValidTopicPartition; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +final class RaftClientTestContext { +private static final int FETCH_MAX_WAIT_MS = 0; + +static final TopicPartition METADATA_PARTITION = new TopicPartition("metadata", 0); +static final int LOCAL_ID = 0; + +static final int ELECTION_BACKOFF_MAX_MS = 100; +static final int ELECTION_TIMEOUT_MS = 1; +// fetch timeout is usually larger than election timeout +static final int FETCH_TIMEOUT_MS = 5; +static final int REQUEST_TIMEOUT_MS = 5000; +static final int RETRY_BACKOFF_MS = 50; + +private final QuorumStateStore quorumStateStore; +private final Random random; + +final KafkaRaftClient client; +final Metrics metrics; +final MockLog log; +final MockNetworkChannel channel; +final MockTime time; +final Set voters; + +public static final class Builder { +private final QuorumStateStore quorumStateStore = new MockQuorumStateStore(); +private final Random random = Mockito.spy(new Random(1)); +private final MockLog log = new MockLog(METADATA_PARTITION); +private final Set voters; + +Builder(Set voters) { +this.voters = voters; +} + +Builder withElectedLeader(int epoch, int leaderId) throws IOException { + quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, leaderId, voters)); +return this; +}
[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines
hachikuji commented on a change in pull request #9482: URL: https://github.com/apache/kafka/pull/9482#discussion_r510438318 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -206,30 +234,77 @@ private void updateLeaderEndOffsetAndTimestamp( final LogOffsetMetadata endOffsetMetadata = log.endOffset(); if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) { -updateHighWatermark(state, currentTimeMs); +onUpdateLeaderHighWatermark(state, currentTimeMs); } -LogOffset endOffset = new LogOffset(endOffsetMetadata.offset, Isolation.UNCOMMITTED); -fetchPurgatory.maybeComplete(endOffset, currentTimeMs); +fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs); } -private void updateHighWatermark( -EpochState state, +private void onUpdateLeaderHighWatermark( +LeaderState state, long currentTimeMs ) { state.highWatermark().ifPresent(highWatermark -> { -logger.debug("High watermark updated to {}", highWatermark); +logger.debug("Leader high watermark updated to {}", highWatermark); log.updateHighWatermark(highWatermark); - -LogOffset offset = new LogOffset(highWatermark.offset, Isolation.COMMITTED); -appendPurgatory.maybeComplete(offset, currentTimeMs); -fetchPurgatory.maybeComplete(offset, currentTimeMs); +appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs); +maybeFireHandleCommit(highWatermark.offset); }); } -@Override -public LeaderAndEpoch currentLeaderAndEpoch() { -return quorum.leaderAndEpoch(); +private void maybeFireHandleCommit(long highWatermark) { +maybeFireHandleCommit(listenerContexts, highWatermark); +} + +private void maybeFireHandleCommit(List listenerContexts, long highWatermark) { +// TODO: When there are multiple listeners, we can cache reads to save some work +for (ListenerContext listenerContext : listenerContexts) { +OptionalLong nextExpectedOffsetOpt = listenerContext.nextExpectedOffset(); +if (!nextExpectedOffsetOpt.isPresent()) { +return; Review comment: Good catch. 
I think that I forgot to update this when moving from a single listener to multiple listeners.
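The difference between `return` and `continue` discussed in this thread can be sketched in isolation. This is a minimal illustration, not the code from the pull request: `FakeListener`, `notifyWithReturn`, and `notifyWithContinue` are hypothetical names, and an empty `OptionalLong` is assumed to mean "this listener is not ready for more data".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;

final class ListenerLoopSketch {
    /** Hypothetical stand-in for a listener context; not the class from the PR. */
    static final class FakeListener {
        final String name;
        final OptionalLong nextExpectedOffset;

        FakeListener(String name, OptionalLong nextExpectedOffset) {
            this.name = name;
            this.nextExpectedOffset = nextExpectedOffset;
        }
    }

    /** Early return: the first listener that is not ready stops the whole loop. */
    static List<String> notifyWithReturn(List<FakeListener> listeners, long highWatermark) {
        List<String> notified = new ArrayList<>();
        for (FakeListener listener : listeners) {
            if (!listener.nextExpectedOffset.isPresent()) {
                return notified; // listeners after this one are never visited
            }
            if (listener.nextExpectedOffset.getAsLong() < highWatermark) {
                notified.add(listener.name);
            }
        }
        return notified;
    }

    /** ifPresent acts like continue: an unready listener is skipped, the rest are visited. */
    static List<String> notifyWithContinue(List<FakeListener> listeners, long highWatermark) {
        List<String> notified = new ArrayList<>();
        for (FakeListener listener : listeners) {
            listener.nextExpectedOffset.ifPresent(next -> {
                if (next < highWatermark) {
                    notified.add(listener.name);
                }
            });
        }
        return notified;
    }

    public static void main(String[] args) {
        List<FakeListener> listeners = Arrays.asList(
            new FakeListener("a", OptionalLong.empty()),
            new FakeListener("b", OptionalLong.of(0L))
        );
        System.out.println(notifyWithReturn(listeners, 5L));
        System.out.println(notifyWithContinue(listeners, 5L));
    }
}
```

With a single listener the two variants behave the same, which is consistent with the explanation that the `return` was left over from the single-listener version.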
[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines
hachikuji commented on a change in pull request #9482: URL: https://github.com/apache/kafka/pull/9482#discussion_r510437477 ## File path: raft/src/main/java/org/apache/kafka/raft/QuorumState.java ## @@ -369,6 +379,17 @@ public void transitionToLeader(long epochStartOffset) throws IOException { if (!candidateState.isVoteGranted()) throw new IllegalStateException("Cannot become leader without majority votes granted"); +// Note that the leader does not retain the high watermark that was known +// in the previous state. The purpose of this is to protect the monotonicity Review comment: It was intended to refer to the behavior of not retaining the high watermark from the previous sentence. I will attempt to clarify.
[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines
hachikuji commented on a change in pull request #9482: URL: https://github.com/apache/kafka/pull/9482#discussion_r510433916 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -1757,35 +1809,86 @@ public void complete() { } } -private static class UnwrittenAppend { -private final Records records; -private final long createTimeMs; -private final long requestTimeoutMs; -private final AckMode ackMode; -private final CompletableFuture future; +private final class ListenerContext implements CloseListener> { +private final RaftClient.Listener listener; +private BatchReader lastSent = null; +private long lastAckedOffset = 0; +private int claimedEpoch = 0; + +private ListenerContext(Listener listener) { +this.listener = listener; +} + +/** + * Get the last acked offset, which is one greater than the offset of the + * last record which was acked by the state machine. + */ +public synchronized long lastAckedOffset() { +return lastAckedOffset; +} + +/** + * Get the next expected offset, which might be larger than the last acked + * offset if there are inflight batches which have not been acked yet. + * Note that when fetching from disk, we may not know the last offset of + * inflight data until it has been processed by the state machine. In this case, + * we delay sending additional data until the state machine has read to the + * end and the last offset is determined. Review comment: When catching up from the log, yes. However, I have implemented an optimization for writes from the leader. We save the original batch in memory so that it can be sent back to the state machine after the write is committed. In this case, we know the last offset of the batch, so we can have multiple inflight batches sent to the controller. This is nice because it means the elected controller will not have to read from disk. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines
hachikuji commented on a change in pull request #9482: URL: https://github.com/apache/kafka/pull/9482#discussion_r510432303 ## File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java ## @@ -16,57 +16,75 @@ */ package org.apache.kafka.raft; -import org.apache.kafka.common.record.Records; - import java.io.IOException; +import java.util.List; import java.util.concurrent.CompletableFuture; -public interface RaftClient { +public interface RaftClient { + +interface Listener { +/** + * Callback which is invoked for all records committed to the log. + * It is the responsibility of the caller to invoke {@link BatchReader#close()} + * after consuming the reader. + * + * Note that there is not a one-to-one correspondence between writes through + * {@link #scheduleAppend(int, List)} and this callback. The Raft implementation + * is free to batch together the records from multiple append calls provided + * that batch boundaries are respected. This means that each batch specified + * through {@link #scheduleAppend(int, List)} is guaranteed to be a subset of + * a batch provided by the {@link BatchReader}. + * + * @param reader reader instance which must be iterated + */ +void handleCommit(BatchReader reader); + +/** + * Invoked after this node has become a leader. This is only called after + * all commits up to the start of the leader's epoch have been sent to + * {@link #handleCommit(BatchReader)}. + * + * After becoming a leader, the client is eligible to write to the log + * using {@link #scheduleAppend(int, List)}. + * + * @param epoch the claimed leader epoch + */ +default void handleClaim(int epoch) {} + +/** + * Invoked after a leader has stepped down. This callback may or may not + * fire before the next leader has been elected. + */ +default void handleResign() {} +} /** - * Initialize the client. This should only be called once and it must be - * called before any of the other APIs can be invoked. + * Initialize the client. 
This should only be called once on startup. * * @throws IOException For any IO errors during initialization */ void initialize() throws IOException; /** - * Append a new entry to the log. The client must be in the leader state to - * accept an append: it is up to the state machine implementation - * to ensure this using {@link #currentLeaderAndEpoch()}. - * - * TODO: One improvement we can make here is to allow the caller to specify - * the current leader epoch in the record set. That would ensure that each - * leader change must be "observed" by the state machine before new appends - * are accepted. - * - * @param records The records to append to the log - * @param timeoutMs Maximum time to wait for the append to complete - * @return A future containing the last offset and epoch of the appended records (if successful) - */ -CompletableFuture append(Records records, AckMode ackMode, long timeoutMs); - -/** - * Read a set of records from the log. Note that it is the responsibility of the state machine - * to filter control records added by the Raft client itself. - * - * If the fetch offset is no longer valid, then the future will be completed exceptionally - * with a {@link LogTruncationException}. + * Register a listener to get commit/leader notifications. * - * @param position The position to fetch from - * @param isolation The isolation level to apply to the read - * @param maxWaitTimeMs The maximum time to wait for new data to become available before completion - * @return The record set, which may be empty if fetching from the end of the log + * @param listener the listener */ -CompletableFuture read(OffsetAndEpoch position, Isolation isolation, long maxWaitTimeMs); +void register(Listener listener); /** - * Get the current leader (if known) and the current epoch. + * Append a list of records to the log. The write will be scheduled for some time + * in the future. 
There is no guarantee that appended records will be written to + * the log and eventually committed. However, it is guaranteed that if any of the + * records become committed, then all of them will be. * - * @return Current leader and epoch information + * @param epoch the current leader epoch + * @param records the list of records to append + * @return the offset within the current epoch that the log entries will be appended, + * or null if the leader was unable to accept the write (e.g. due to memory + * being reached). */ -LeaderAndEpoch
[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines
hachikuji commented on a change in pull request #9482: URL: https://github.com/apache/kafka/pull/9482#discussion_r510426714 ## File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java ## @@ -16,57 +16,75 @@ */ package org.apache.kafka.raft; -import org.apache.kafka.common.record.Records; - import java.io.IOException; +import java.util.List; import java.util.concurrent.CompletableFuture; -public interface RaftClient { +public interface RaftClient { + +interface Listener { +/** + * Callback which is invoked for all records committed to the log. + * It is the responsibility of the caller to invoke {@link BatchReader#close()} + * after consuming the reader. + * + * Note that there is not a one-to-one correspondence between writes through + * {@link #scheduleAppend(int, List)} and this callback. The Raft implementation + * is free to batch together the records from multiple append calls provided + * that batch boundaries are respected. This means that each batch specified + * through {@link #scheduleAppend(int, List)} is guaranteed to be a subset of + * a batch provided by the {@link BatchReader}. + * + * @param reader reader instance which must be iterated + */ +void handleCommit(BatchReader reader); + +/** + * Invoked after this node has become a leader. This is only called after + * all commits up to the start of the leader's epoch have been sent to + * {@link #handleCommit(BatchReader)}. + * + * After becoming a leader, the client is eligible to write to the log + * using {@link #scheduleAppend(int, List)}. + * + * @param epoch the claimed leader epoch + */ +default void handleClaim(int epoch) {} + +/** + * Invoked after a leader has stepped down. This callback may or may not + * fire before the next leader has been elected. + */ +default void handleResign() {} +} /** - * Initialize the client. This should only be called once and it must be - * called before any of the other APIs can be invoked. + * Initialize the client. 
This should only be called once on startup. * * @throws IOException For any IO errors during initialization */ void initialize() throws IOException; /** - * Append a new entry to the log. The client must be in the leader state to - * accept an append: it is up to the state machine implementation - * to ensure this using {@link #currentLeaderAndEpoch()}. - * - * TODO: One improvement we can make here is to allow the caller to specify - * the current leader epoch in the record set. That would ensure that each - * leader change must be "observed" by the state machine before new appends - * are accepted. - * - * @param records The records to append to the log - * @param timeoutMs Maximum time to wait for the append to complete - * @return A future containing the last offset and epoch of the appended records (if successful) - */ -CompletableFuture append(Records records, AckMode ackMode, long timeoutMs); - -/** - * Read a set of records from the log. Note that it is the responsibility of the state machine - * to filter control records added by the Raft client itself. - * - * If the fetch offset is no longer valid, then the future will be completed exceptionally - * with a {@link LogTruncationException}. + * Register a listener to get commit/leader notifications. * - * @param position The position to fetch from - * @param isolation The isolation level to apply to the read - * @param maxWaitTimeMs The maximum time to wait for new data to become available before completion - * @return The record set, which may be empty if fetching from the end of the log + * @param listener the listener */ -CompletableFuture read(OffsetAndEpoch position, Isolation isolation, long maxWaitTimeMs); +void register(Listener listener); /** - * Get the current leader (if known) and the current epoch. + * Append a list of records to the log. The write will be scheduled for some time + * in the future. 
There is no guarantee that appended records will be written to + * the log and eventually committed. However, it is guaranteed that if any of the + * records become committed, then all of them will be. * - * @return Current leader and epoch information + * @param epoch the current leader epoch + * @param records the list of records to append + * @return the offset within the current epoch that the log entries will be appended, + * or null if the leader was unable to accept the write (e.g. due to memory + * being reached). Review comment: Agreed. I added this here:
[GitHub] [kafka] ijuma commented on pull request #9469: MINOR: replace FetchRequest.TopicAndPartitionData by Map.Entry
ijuma commented on pull request #9469: URL: https://github.com/apache/kafka/pull/9469#issuecomment-714728901 Thanks for the PR. Is this an improvement? It seems to make the code harder to read.
[jira] [Created] (KAFKA-10634) LeaderChangeMessage should include the leader as one of the voters
Jose Armando Garcia Sancio created KAFKA-10634: -- Summary: LeaderChangeMessage should include the leader as one of the voters Key: KAFKA-10634 URL: https://issues.apache.org/jira/browse/KAFKA-10634 Project: Kafka Issue Type: Sub-task Reporter: Jose Armando Garcia Sancio

When a leader is elected, the leader writes a `LeaderChangeMessage` to the replicated log. This message is defined in KIP-595 as:

```
{
  "type": "data",
  "name": "LeaderChangeMessage",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    {"name": "LeaderId", "type": "int32", "versions": "0+",
     "about": "The ID of the newly elected leader"},
    {"name": "VotedIds", "type": "[]int32", "versions": "0+",
     "about": "The IDs of the voters who voted for the current leader"}
  ]
}
```

The current implementation doesn't include the `LeaderId` in the set of `VotedIds`. The protocol guarantees that the leader must have voted for itself. -- This message was sent by Atlassian Jira (v8.3.4#803005)
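The invariant requested in this issue can be sketched as a small helper. This is only an illustration under assumptions, not the actual Kafka change: `votedIdsIncludingLeader` and its parameters are hypothetical names, and the set of granting voters is assumed to arrive as a plain collection of node IDs.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;

final class LeaderChangeVotersSketch {
    /**
     * Build the voted-ID set for a leader change record so that it always
     * contains the leader itself, since the leader must have voted for itself.
     * Hypothetical helper; not part of the Kafka code base.
     */
    static Set<Integer> votedIdsIncludingLeader(int leaderId, Collection<Integer> grantingVoters) {
        Set<Integer> votedIds = new TreeSet<>(grantingVoters);
        votedIds.add(leaderId); // no-op when the leader is already listed
        return votedIds;
    }

    public static void main(String[] args) {
        System.out.println(votedIdsIncludingLeader(0, Arrays.asList(2, 1)));
    }
}
```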
[GitHub] [kafka] jsancio commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests
jsancio commented on a change in pull request #9476: URL: https://github.com/apache/kafka/pull/9476#discussion_r510421964 ## File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java ## @@ -90,470 +76,480 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class KafkaRaftClientTest { -private static final TopicPartition METADATA_PARTITION = new TopicPartition("metadata", 0); - -private final int localId = 0; -private final int electionTimeoutMs = 1; -private final int electionBackoffMaxMs = 100; -private final int fetchTimeoutMs = 5; // fetch timeout is usually larger than election timeout -private final int retryBackoffMs = 50; -private final int requestTimeoutMs = 5000; -private final int fetchMaxWaitMs = 0; - -private final MockTime time = new MockTime(); -private final MockLog log = new MockLog(METADATA_PARTITION); -private final MockNetworkChannel channel = new MockNetworkChannel(); -private final Random random = Mockito.spy(new Random(1)); -private final QuorumStateStore quorumStateStore = new MockQuorumStateStore(); - -@AfterEach -public void cleanUp() throws IOException { -quorumStateStore.clear(); -} - -private InetSocketAddress mockAddress(int id) { -return new InetSocketAddress("localhost", 9990 + id); -} - -private KafkaRaftClient buildClient(Set voters) throws IOException { -return buildClient(voters, new Metrics(time)); -} - -private KafkaRaftClient buildClient(Set voters, Metrics metrics) throws IOException { -LogContext logContext = new LogContext(); -QuorumState quorum = new QuorumState(localId, voters, electionTimeoutMs, fetchTimeoutMs, -quorumStateStore, time, logContext, random); - -Map voterAddresses = voters.stream().collect(Collectors.toMap( -Function.identity(), -this::mockAddress -)); - -KafkaRaftClient client = new KafkaRaftClient(channel, log, quorum, time, metrics, -new MockFuturePurgatory<>(time), new MockFuturePurgatory<>(time), voterAddresses, -electionBackoffMaxMs, retryBackoffMs, requestTimeoutMs, 
fetchMaxWaitMs, logContext, random); - -client.initialize(); - -return client; -} - @Test public void testInitializeSingleMemberQuorum() throws IOException { -buildClient(Collections.singleton(localId)); -assertEquals(ElectionState.withElectedLeader(1, localId, Collections.singleton(localId)), -quorumStateStore.readElectionState()); +RaftClientTestContext context = RaftClientTestContext.build(Collections.singleton(LOCAL_ID)); +assertEquals( +ElectionState.withElectedLeader(1, LOCAL_ID, Collections.singleton(LOCAL_ID)), +context.quorumStateStore.readElectionState() +); } @Test public void testInitializeAsLeaderFromStateStoreSingleMemberQuorum() throws Exception { // Start off as leader. We should still bump the epoch after initialization int initialEpoch = 2; -Set voters = Collections.singleton(localId); - quorumStateStore.writeElectionState(ElectionState.withElectedLeader(initialEpoch, localId, voters)); - -KafkaRaftClient client = buildClient(voters); -assertEquals(1L, log.endOffset().offset); -assertEquals(initialEpoch + 1, log.lastFetchedEpoch()); -assertEquals(new LeaderAndEpoch(OptionalInt.of(localId), initialEpoch + 1), -client.currentLeaderAndEpoch()); -assertEquals(ElectionState.withElectedLeader(initialEpoch + 1, localId, voters), -quorumStateStore.readElectionState()); +Set voters = Collections.singleton(LOCAL_ID); +RaftClientTestContext context = new RaftClientTestContext.Builder() +.updateQuorumStateStore(quorumStateStore -> { +assertDoesNotThrow(() -> { +quorumStateStore.writeElectionState( +ElectionState.withElectedLeader(initialEpoch, LOCAL_ID, voters) +); +}); +}) +.build(voters); + +assertEquals(1L, context.log.endOffset().offset); +assertEquals(initialEpoch + 1, context.log.lastFetchedEpoch()); +assertEquals(new LeaderAndEpoch(OptionalInt.of(LOCAL_ID), initialEpoch + 1), +context.client.currentLeaderAndEpoch()); +assertEquals(ElectionState.withElectedLeader(initialEpoch + 1, LOCAL_ID, voters), +context.quorumStateStore.readElectionState()); } 
@Test public void testInitializeAsLeaderFromStateStore() throws Exception { -Set voters = Utils.mkSet(localId, 1); +Set voters = Utils.mkSet(LOCAL_ID, 1); int epoch = 2; -Mockito.doReturn(0).when(random).nextInt(electionTimeoutMs); -
[jira] [Commented] (KAFKA-10633) Constant probing rebalances in Streams 2.6
[ https://issues.apache.org/jira/browse/KAFKA-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17219307#comment-17219307 ] Bradley Peterson commented on KAFKA-10633: -- /cc [~vvcephei] > Constant probing rebalances in Streams 2.6 > -- > > Key: KAFKA-10633 > URL: https://issues.apache.org/jira/browse/KAFKA-10633 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Bradley Peterson >Priority: Major > Attachments: Discover 2020-10-21T23 34 03.867Z - 2020-10-21T23 44 > 46.409Z.csv > > > We are seeing a few issues with the new rebalancing behavior in Streams 2.6. > This ticket is for constant probing rebalances on one StreamThread, but I'll > mention the other issues, as they may be related. > First, when we redeploy the application we see tasks being moved, even though > the task assignment was stable before redeploying. We would expect to see > tasks assigned back to the same instances and no movement. The application is > in EC2, with persistent EBS volumes, and we use static group membership to > avoid rebalancing. To redeploy the app we terminate all EC2 instances. The > new instances will reattach the EBS volumes and use the same group member id. > After redeploying, we sometimes see the group leader go into a tight probing > rebalance loop. This doesn't happen immediately, it could be several hours > later. Because the redeploy caused task movement, we see expected probing > rebalances every 10 minutes. But, then one thread will go into a tight loop > logging messages like "Triggering the followup rebalance scheduled for > 1603323868771 ms.", handling the partition assignment (which doesn't change), > then "Requested to schedule probing rebalance for 1603323868771 ms." This > repeats several times a second until the app is restarted again. I'll attach > a log export from one such incident. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] jsancio commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines
jsancio commented on a change in pull request #9482: URL: https://github.com/apache/kafka/pull/9482#discussion_r510385486 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -206,30 +234,77 @@ private void updateLeaderEndOffsetAndTimestamp( final LogOffsetMetadata endOffsetMetadata = log.endOffset(); if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) { -updateHighWatermark(state, currentTimeMs); +onUpdateLeaderHighWatermark(state, currentTimeMs); } -LogOffset endOffset = new LogOffset(endOffsetMetadata.offset, Isolation.UNCOMMITTED); -fetchPurgatory.maybeComplete(endOffset, currentTimeMs); +fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs); } -private void updateHighWatermark( -EpochState state, +private void onUpdateLeaderHighWatermark( +LeaderState state, long currentTimeMs ) { state.highWatermark().ifPresent(highWatermark -> { -logger.debug("High watermark updated to {}", highWatermark); +logger.debug("Leader high watermark updated to {}", highWatermark); log.updateHighWatermark(highWatermark); - -LogOffset offset = new LogOffset(highWatermark.offset, Isolation.COMMITTED); -appendPurgatory.maybeComplete(offset, currentTimeMs); -fetchPurgatory.maybeComplete(offset, currentTimeMs); +appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs); +maybeFireHandleCommit(highWatermark.offset); }); } -@Override -public LeaderAndEpoch currentLeaderAndEpoch() { -return quorum.leaderAndEpoch(); +private void maybeFireHandleCommit(long highWatermark) { +maybeFireHandleCommit(listenerContexts, highWatermark); +} + +private void maybeFireHandleCommit(List listenerContexts, long highWatermark) { +// TODO: When there are multiple listeners, we can cache reads to save some work Review comment: I would remove this comment. We can file an issue and fix it if this becomes a performance issue.
## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -206,30 +234,77 @@ private void updateLeaderEndOffsetAndTimestamp( final LogOffsetMetadata endOffsetMetadata = log.endOffset(); if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) { -updateHighWatermark(state, currentTimeMs); +onUpdateLeaderHighWatermark(state, currentTimeMs); } -LogOffset endOffset = new LogOffset(endOffsetMetadata.offset, Isolation.UNCOMMITTED); -fetchPurgatory.maybeComplete(endOffset, currentTimeMs); +fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs); } -private void updateHighWatermark( -EpochState state, +private void onUpdateLeaderHighWatermark( +LeaderState state, long currentTimeMs ) { state.highWatermark().ifPresent(highWatermark -> { -logger.debug("High watermark updated to {}", highWatermark); +logger.debug("Leader high watermark updated to {}", highWatermark); log.updateHighWatermark(highWatermark); - -LogOffset offset = new LogOffset(highWatermark.offset, Isolation.COMMITTED); -appendPurgatory.maybeComplete(offset, currentTimeMs); -fetchPurgatory.maybeComplete(offset, currentTimeMs); +appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs); +maybeFireHandleCommit(highWatermark.offset); }); } -@Override -public LeaderAndEpoch currentLeaderAndEpoch() { -return quorum.leaderAndEpoch(); +private void maybeFireHandleCommit(long highWatermark) { +maybeFireHandleCommit(listenerContexts, highWatermark); +} + +private void maybeFireHandleCommit(List listenerContexts, long highWatermark) { +// TODO: When there are multiple listeners, we can cache reads to save some work +for (ListenerContext listenerContext : listenerContexts) { +OptionalLong nextExpectedOffsetOpt = listenerContext.nextExpectedOffset(); +if (!nextExpectedOffsetOpt.isPresent()) { +return; Review comment: Why a `return`? Did you mean to use `continue`? If this is supposed to be a `continue`, then maybe we can use `Optional.ifPresent`.
The same comment applies to one of the other `maybeFireHandleCommit` overloads. ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -975,12 +1029,9 @@ private boolean handleFetchResponse( log.truncateToEndOffset(divergingOffsetAndEpoch).ifPresent(truncationOffset -> { logger.info("Truncated to offset {} from Fetch response from leader {}",
[GitHub] [kafka] jherico commented on a change in pull request #8204: KAFKA-9633: Ensure ConfigProviders are closed
jherico commented on a change in pull request #8204: URL: https://github.com/apache/kafka/pull/8204#discussion_r510410362 ## File path: clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java ## @@ -483,6 +483,7 @@ public void logUnused() { resolvedOriginals.putAll(result.data()); } } +providers.values().forEach(x -> Utils.closeQuietly(x, "config provider")); Review comment: If an exception occurs between line 477 and line 486, the close function won't be called. The solution would be either to create a type for `providers` that extends `Map` and implements `AutoCloseable`, or simply to put an explicit `try`/`finally` block here to ensure that the close function is called in every case. That also implies that `instantiateConfigProviders` should be modified so that if an exception is thrown from inside it, any previously opened `ConfigProvider` instances are closed.
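The `try`/`finally` approach suggested in this comment can be sketched roughly as follows. This is a self-contained illustration under assumptions, not the `AbstractConfig` code: `FakeProvider`, `instantiate`, `resolveAll`, and the local `closeQuietly` are hypothetical stand-ins, and a failure during instantiation is simulated by an empty provider name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class CloseProvidersSketch {
    /** Hypothetical stand-in for a config provider; records its name when closed. */
    static final class FakeProvider implements AutoCloseable {
        private final String name;
        private final List<String> closedLog;

        FakeProvider(String name, List<String> closedLog) {
            this.name = name;
            this.closedLog = closedLog;
        }

        @Override
        public void close() {
            closedLog.add(name);
        }
    }

    /** Close without propagating failures, so one bad close cannot mask another error. */
    static void closeQuietly(AutoCloseable closeable) {
        try {
            closeable.close();
        } catch (Exception e) {
            // Intentionally ignored in this sketch.
        }
    }

    /** If creating one provider fails, close the providers created so far, then rethrow. */
    static Map<String, FakeProvider> instantiate(List<String> names, List<String> closedLog) {
        Map<String, FakeProvider> providers = new LinkedHashMap<>();
        try {
            for (String name : names) {
                if (name.isEmpty()) {
                    throw new IllegalArgumentException("simulated instantiation failure");
                }
                providers.put(name, new FakeProvider(name, closedLog));
            }
            return providers;
        } catch (RuntimeException e) {
            providers.values().forEach(CloseProvidersSketch::closeQuietly);
            throw e;
        }
    }

    /** Use the providers inside try/finally so they are closed on every path. */
    static void resolveAll(List<String> names, List<String> closedLog, boolean failDuringResolve) {
        Map<String, FakeProvider> providers = instantiate(names, closedLog);
        try {
            if (failDuringResolve) {
                throw new IllegalStateException("simulated lookup failure");
            }
        } finally {
            providers.values().forEach(CloseProvidersSketch::closeQuietly);
        }
    }

    public static void main(String[] args) {
        List<String> closedLog = new ArrayList<>();
        try {
            resolveAll(Arrays.asList("file", "vault"), closedLog, true);
        } catch (IllegalStateException expected) {
            System.out.println("closed after failure: " + closedLog);
        }
    }
}
```

The alternative mentioned in the comment, a `Map` subtype that implements `AutoCloseable`, would let the caller use try-with-resources instead of an explicit `finally`.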
[GitHub] [kafka] vvcephei commented on pull request #9479: KAFKA-10631: Handle ProducerFencedException on offset commit
vvcephei commented on pull request #9479: URL: https://github.com/apache/kafka/pull/9479#issuecomment-714715589 Cherry-picked to 2.7 (cc @bbejeck)
[GitHub] [kafka] vvcephei merged pull request #9479: KAFKA-10631: Handle ProducerFencedException on offset commit
vvcephei merged pull request #9479: URL: https://github.com/apache/kafka/pull/9479
[jira] [Created] (KAFKA-10633) Constant probing rebalances in Streams 2.6
Bradley Peterson created KAFKA-10633: Summary: Constant probing rebalances in Streams 2.6 Key: KAFKA-10633 URL: https://issues.apache.org/jira/browse/KAFKA-10633 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.6.0 Reporter: Bradley Peterson Attachments: Discover 2020-10-21T23 34 03.867Z - 2020-10-21T23 44 46.409Z.csv We are seeing a few issues with the new rebalancing behavior in Streams 2.6. This ticket is for constant probing rebalances on one StreamThread, but I'll mention the other issues, as they may be related. First, when we redeploy the application we see tasks being moved, even though the task assignment was stable before redeploying. We would expect to see tasks assigned back to the same instances and no movement. The application is in EC2, with persistent EBS volumes, and we use static group membership to avoid rebalancing. To redeploy the app we terminate all EC2 instances. The new instances will reattach the EBS volumes and use the same group member id. After redeploying, we sometimes see the group leader go into a tight probing rebalance loop. This doesn't happen immediately; it can start several hours later. Because the redeploy caused task movement, we see the expected probing rebalances every 10 minutes. But then one thread will go into a tight loop, logging messages like "Triggering the followup rebalance scheduled for 1603323868771 ms.", handling the partition assignment (which doesn't change), and then logging "Requested to schedule probing rebalance for 1603323868771 ms." This repeats several times a second until the app is restarted again. I'll attach a log export from one such incident.
[GitHub] [kafka] vvcephei commented on pull request #9479: KAFKA-10631: Handle ProducerFencedException on offset commit
vvcephei commented on pull request #9479: URL: https://github.com/apache/kafka/pull/9479#issuecomment-714708637 LGTM! Thanks, @cadonna.
[GitHub] [kafka] hachikuji commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests
hachikuji commented on a change in pull request #9476: URL: https://github.com/apache/kafka/pull/9476#discussion_r510387310 ## File path: raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java ## @@ -0,0 +1,648 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.raft; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.BeginQuorumEpochRequestData; +import org.apache.kafka.common.message.BeginQuorumEpochResponseData; +import org.apache.kafka.common.message.DescribeQuorumResponseData.ReplicaState; +import org.apache.kafka.common.message.DescribeQuorumResponseData; +import org.apache.kafka.common.message.EndQuorumEpochRequestData; +import org.apache.kafka.common.message.EndQuorumEpochResponseData; +import org.apache.kafka.common.message.FetchRequestData; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.LeaderChangeMessage.Voter; +import org.apache.kafka.common.message.LeaderChangeMessage; +import org.apache.kafka.common.message.VoteRequestData; +import org.apache.kafka.common.message.VoteResponseData; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.ControlRecordUtils; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.SimpleRecord; +import 
org.apache.kafka.common.requests.BeginQuorumEpochResponse; +import org.apache.kafka.common.requests.DescribeQuorumResponse; +import org.apache.kafka.common.requests.VoteResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.test.TestUtils; +import org.mockito.Mockito; +import static org.apache.kafka.raft.RaftUtil.hasValidTopicPartition; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +final class RaftClientTestContext { +private static final int FETCH_MAX_WAIT_MS = 0; + +static final TopicPartition METADATA_PARTITION = new TopicPartition("metadata", 0); +static final int LOCAL_ID = 0; Review comment: I'm somewhat inclined to add the local id to the builder rather than making it constant. It makes the builder a bit more self-contained. On a similar note, it would be nice to push the other static config values into the builder as well. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
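The suggestion in the review comment above, moving the local id and other static values into the builder, might look roughly like the following. This is a hedged sketch only: `TestContextBuilderSketch`, its `Context` and `Builder` classes, and the `withLocalId` / `withFetchMaxWaitMs` methods are hypothetical names, and the defaults simply mirror the constants quoted in the diff (`LOCAL_ID = 0`, `FETCH_MAX_WAIT_MS = 0`).

```java
// Hypothetical sketch; not the RaftClientTestContext from the PR.
final class TestContextBuilderSketch {
    // Immutable result of the builder; each test gets its own values.
    static final class Context {
        final int localId;
        final int fetchMaxWaitMs;

        Context(int localId, int fetchMaxWaitMs) {
            this.localId = localId;
            this.fetchMaxWaitMs = fetchMaxWaitMs;
        }
    }

    // The builder owns the defaults that were previously static constants,
    // so a test can override them without touching shared state.
    static final class Builder {
        private int localId = 0;
        private int fetchMaxWaitMs = 0;

        Builder withLocalId(int localId) {
            this.localId = localId;
            return this;
        }

        Builder withFetchMaxWaitMs(int fetchMaxWaitMs) {
            this.fetchMaxWaitMs = fetchMaxWaitMs;
            return this;
        }

        Context build() {
            return new Context(localId, fetchMaxWaitMs);
        }
    }
}
```

A test that needs a non-default node id would then write `new TestContextBuilderSketch.Builder().withLocalId(7).build()` and leave the other values at their defaults.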
[GitHub] [kafka] hachikuji commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests
hachikuji commented on a change in pull request #9476: URL: https://github.com/apache/kafka/pull/9476#discussion_r510387310 ## File path: raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java ## @@ -0,0 +1,648 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.raft; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.BeginQuorumEpochRequestData; +import org.apache.kafka.common.message.BeginQuorumEpochResponseData; +import org.apache.kafka.common.message.DescribeQuorumResponseData.ReplicaState; +import org.apache.kafka.common.message.DescribeQuorumResponseData; +import org.apache.kafka.common.message.EndQuorumEpochRequestData; +import org.apache.kafka.common.message.EndQuorumEpochResponseData; +import org.apache.kafka.common.message.FetchRequestData; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.LeaderChangeMessage.Voter; +import org.apache.kafka.common.message.LeaderChangeMessage; +import org.apache.kafka.common.message.VoteRequestData; +import org.apache.kafka.common.message.VoteResponseData; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.ControlRecordUtils; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.SimpleRecord; +import 
org.apache.kafka.common.requests.BeginQuorumEpochResponse; +import org.apache.kafka.common.requests.DescribeQuorumResponse; +import org.apache.kafka.common.requests.VoteResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.test.TestUtils; +import org.mockito.Mockito; +import static org.apache.kafka.raft.RaftUtil.hasValidTopicPartition; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +final class RaftClientTestContext { +private static final int FETCH_MAX_WAIT_MS = 0; + +static final TopicPartition METADATA_PARTITION = new TopicPartition("metadata", 0); +static final int LOCAL_ID = 0; Review comment: I'm somewhat inclined to add the local id to the builder rather than making it constant. It makes the builder a bit more self-contained. On a similar note, it would be nice to push these state config values into the builder as well. ## File path: raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java ## @@ -0,0 +1,648 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the
[jira] [Commented] (KAFKA-10592) system tests not running after python3 merge
[ https://issues.apache.org/jira/browse/KAFKA-10592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17219284#comment-17219284 ] Nikolay Izhikov commented on KAFKA-10592: - Hello [~rndgstn], [~omkreddy] I fixed the vagrant instructions and setup. I tested it locally and it works. Please take a look at my changes: https://github.com/apache/kafka/pull/9480 > system tests not running after python3 merge > > > Key: KAFKA-10592 > URL: https://issues.apache.org/jira/browse/KAFKA-10592 > Project: Kafka > Issue Type: Task > Components: system tests >Reporter: Ron Dagostino >Assignee: Nikolay Izhikov >Priority: Major > > We are seeing these errors on system tests due to the python3 merge: > {noformat} > [ERROR:2020-10-08 21:03:51,341]: Failed to import > kafkatest.sanity_checks.test_performance_services, which may indicate a > broken test that cannot be loaded: ImportError: No module named server > [ERROR:2020-10-08 21:03:51,351]: Failed to import > kafkatest.benchmarks.core.benchmark_test, which may indicate a broken test > that cannot be loaded: ImportError: No module named server > [ERROR:2020-10-08 21:03:51,501]: Failed to import > kafkatest.tests.core.throttling_test, which may indicate a broken test that > cannot be loaded: ImportError: No module named server > [ERROR:2020-10-08 21:03:51,598]: Failed to import > kafkatest.tests.client.quota_test, which may indicate a broken test that > cannot be loaded: ImportError: No module named server > {noformat} > I ran one of the system tests at the commit prior to the python3 merge > ([https://github.com/apache/kafka/commit/40a23cc0c2e1efa8632f59b093672221a3c03c36]) > and it ran fine: > [http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-10-09--001.1602255415--rondagostino--rtd_just_before_python3_merge--40a23cc0c/report.html] > I ran the exact same test file at the next commit – the python3 commit at > 
[https://github.com/apache/kafka/commit/4e65030e055104a7526e85b563a11890c61d6ddf] > – and it failed with the import error. The test results show no report.html > file because nothing ran: > [http://testing.confluent.io/confluent-kafka-system-test-results/?prefix=2020-10-09--001.1602251990--apache--trunk--7947c18b5/] > Not sure when this began because I do see these tests running successfully > during the development process as documented in > https://issues.apache.org/jira/browse/KAFKA-10402 (`tests run: 684` as > recently as 9/20 in that ticket). But the PR build (rebased onto latest > trunk) showed the above import errors and only 606 tests run. I assume those > 4 files mentioned include 78 tests. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mjsax commented on pull request #9000: KAFKA-10036 Improve handling and documentation of Suppliers
mjsax commented on pull request #9000: URL: https://github.com/apache/kafka/pull/9000#issuecomment-714688801 The build did run, but failed with a compile error. Maybe something went wrong with the rebase you did? ``` /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-9000@2/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java:209: error: cannot find symbol new KTableTransformValues<>(parent, new SingletonNoOpValueTransformer<>(), null).enableSendingOldValues(true); ```
[GitHub] [kafka] jolshan commented on pull request #9471: MINOR: Add Jenkinsfile to 2.6
jolshan commented on pull request #9471: URL: https://github.com/apache/kafka/pull/9471#issuecomment-714684198 @vvcephei Yeah. I was worried that might be happening. Good to check.
[GitHub] [kafka] hachikuji opened a new pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines
hachikuji opened a new pull request #9482: URL: https://github.com/apache/kafka/pull/9482 In #9418, we add a listener to the `RaftClient` interface. In that patch, we used it only to send commit notifications for writes from the leader. In this PR, we extend the `handleCommit` API to accept all committed data and we remove the pull-based `read` API. Additionally, we add two new callbacks to the listener interface in order to notify the state machine when the raft client has claimed or resigned leadership. Note this patch builds on top of #9418. Once merged, I will rebase this patch and remove draft status. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Resolved] (KAFKA-9999) Internal topic creation failure should be non-fatal and trigger explicit rebalance
[ https://issues.apache.org/jira/browse/KAFKA-?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-. Resolution: Won't Fix > Internal topic creation failure should be non-fatal and trigger explicit > rebalance > --- > > Key: KAFKA- > URL: https://issues.apache.org/jira/browse/KAFKA- > Project: Kafka > Issue Type: Bug > Components: admin, streams >Affects Versions: 2.4.0 >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > We spotted a case in system test failure where the topic already exists but > the admin client still attempts to recreate it: > > {code:java} > [2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic > SmokeTest-KSTREAM-REDUCE-STATE-STORE-20-changelog. Topic is probably > marked for deletion (number of partitions is unknown). > Will retry to create this topic in 100 ms (to let broker finish async delete > operation first). > Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic > 'SmokeTest-KSTREAM-REDUCE-STATE-STORE-20-changelog' already exists. > (org.apache.kafka.streams.processor.internals.InternalTopicManager) > [2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic > SmokeTest-uwin-cnt-changelog. Topic is probably marked for deletion (number > of partitions is unknown). > Will retry to create this topic in 100 ms (to let broker finish async delete > operation first). > Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic > 'SmokeTest-uwin-cnt-changelog' already exists. > (org.apache.kafka.streams.processor.internals.InternalTopicManager) > [2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic > SmokeTest-cntByCnt-changelog. Topic is probably marked for deletion (number > of partitions is unknown). > Will retry to create this topic in 100 ms (to let broker finish async delete > operation first). 
> Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic > 'SmokeTest-cntByCnt-changelog' already exists. > (org.apache.kafka.streams.processor.internals.InternalTopicManager) > [2020-05-14 09:56:40,120] INFO stream-thread [main] Topics > [SmokeTest-KSTREAM-REDUCE-STATE-STORE-20-changelog, > SmokeTest-uwin-cnt-changelog, SmokeTest-cntByCnt-changelog] can not be made > ready with 5 retries left > (org.apache.kafka.streams.processor.internals.InternalTopicManager) > [2020-05-14 09:56:40,220] ERROR stream-thread [main] Could not create topics > after 5 retries. This can happen if the Kafka cluster is temporary not > available. You can increase admin client config `retries` to be resilient > against this error. > (org.apache.kafka.streams.processor.internals.InternalTopicManager) > [2020-05-14 09:56:40,221] ERROR stream-thread > [SmokeTest-05374457-074b-4d33-bca0-8686465e8157-StreamThread-2] Encountered > the following unexpected Kafka exception during processing, this usually > indicate Streams internal errors: > (org.apache.kafka.streams.processor.internals.StreamThread) > org.apache.kafka.streams.errors.StreamsException: Could not create topics > after 5 retries. This can happen if the Kafka cluster is temporary not > available. You can increase admin client config `retries` to be resilient > against this error. 
> at > org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:171) > at > org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareTopic(StreamsPartitionAssignor.java:1229) > at > org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:588) > > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:548) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:650) > > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1300(AbstractCoordinator.java:111) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:572) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:555) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1026) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1006) > at >
[jira] [Created] (KAFKA-10632) Raft client should push all committed data to listeners
Jason Gustafson created KAFKA-10632: --- Summary: Raft client should push all committed data to listeners Key: KAFKA-10632 URL: https://issues.apache.org/jira/browse/KAFKA-10632 Project: Kafka Issue Type: Sub-task Reporter: Jason Gustafson Assignee: Jason Gustafson We would like to move to a push model for sending committed data to the state machine. This simplifies the state machine a bit since it does not need to track its own position and poll for new data. It also allows the raft layer to ensure that all committed data up to the start of a leader epoch has been sent before allowing the state machine to begin sending writes. Finally, it allows us to take advantage of optimizations. For example, we can avoid re-reading writes that have been sent to the leader; instead, we can retain the data in memory and push it to the state machine after it becomes committed.
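The push model described in the ticket above can be sketched in a few lines. This is an illustration under assumptions, not the `RaftClient` API: `PushModelSketch`, `CommitListener`, `ReplicatedLog`, and the `handleCommit(long, List<String>)` signature are hypothetical, records are plain strings, and the high watermark is treated as an exclusive end offset.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of push-based delivery of committed records.
final class PushModelSketch {
    // The state machine only implements a callback; it does not track a read position.
    interface CommitListener {
        void handleCommit(long baseOffset, List<String> records);
    }

    static final class ReplicatedLog {
        private final List<String> records = new ArrayList<>();
        private final List<CommitListener> listeners = new ArrayList<>();
        private long highWatermark = 0L; // exclusive: offsets below this are committed

        void register(CommitListener listener) {
            listeners.add(listener);
        }

        void append(String record) {
            records.add(record);
        }

        // When the high watermark advances, push exactly the newly committed
        // records to every listener. The data is already in memory, so nothing
        // has to be re-read by the state machine.
        void updateHighWatermark(long newHighWatermark) {
            if (newHighWatermark <= highWatermark) {
                return; // nothing newly committed
            }
            if (newHighWatermark > records.size()) {
                throw new IllegalArgumentException("High watermark beyond log end");
            }
            int from = Math.toIntExact(highWatermark);
            int to = Math.toIntExact(newHighWatermark);
            List<String> committed = new ArrayList<>(records.subList(from, to));
            long baseOffset = highWatermark;
            highWatermark = newHighWatermark;
            for (CommitListener listener : listeners) {
                listener.handleCommit(baseOffset, committed);
            }
        }
    }
}
```

In this sketch the log owns the delivery position, which is the simplification the ticket describes: the listener receives each committed record once, in order, without polling.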
[GitHub] [kafka] vvcephei commented on pull request #9471: MINOR: Add Jenkinsfile to 2.6
vvcephei commented on pull request #9471: URL: https://github.com/apache/kafka/pull/9471#issuecomment-714674113 On second thought, it looks like there's legitimately a test that hangs on 2.6 in java 11. I was looking before at the whole log, which I didn't realize shows all the builds and tests in one file. Here are the logs for just the java 11 build on both runs: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-9471/1/execution/node/85/log/ https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-9471/2/execution/node/86/log/ It clearly gets stuck about 35 minutes in. I'm attempting a local repro.
[GitHub] [kafka] vvcephei commented on pull request #9481: KAFKA-10284: Disable static membership test in 2.4
vvcephei commented on pull request #9481: URL: https://github.com/apache/kafka/pull/9481#issuecomment-714670340 Hey @abbccdda, @feyman2016, what do you think about this, as opposed to spending more time doing a backport for 2.4?
[GitHub] [kafka] vvcephei opened a new pull request #9481: KAFKA-10284: Disable static membership test in 2.4
vvcephei opened a new pull request #9481: URL: https://github.com/apache/kafka/pull/9481 This test was fixed in https://github.com/apache/kafka/pull/9270 for 2.5+, but the code in 2.4 is too different to have a clean backport. Rather than risk introducing a worse bug in 2.4, and also because the probability of a new bugfix release for 2.4 seems low, I'm proposing just to disable this test in the 2.4 branch. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] vvcephei commented on pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted
vvcephei commented on pull request #9270: URL: https://github.com/apache/kafka/pull/9270#issuecomment-714669017 Cherry-picked to 2.5
[jira] [Assigned] (KAFKA-10631) ProducerFencedException is not Handled on Offest Commit
[ https://issues.apache.org/jira/browse/KAFKA-10631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck reassigned KAFKA-10631: --- Assignee: Bruno Cadonna > ProducerFencedException is not Handled on Offest Commit > --- > > Key: KAFKA-10631 > URL: https://issues.apache.org/jira/browse/KAFKA-10631 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.7.0 >Reporter: Bruno Cadonna >Assignee: Bruno Cadonna >Priority: Blocker > Fix For: 2.7.0 > > > The transaction manager does currently not handle producer fenced errors > returned from a offset commit request. > We found this bug because we encountered the following exception in our soak > cluster: > {code:java} > org.apache.kafka.streams.errors.StreamsException: Error encountered trying to > commit a transaction [stream-thread [i-037c09b3c48522d8d-StreamThread-3] task > [0_0]] > at > org.apache.kafka.streams.processor.internals.StreamsProducer.commitTransaction(StreamsProducer.java:256) > at > org.apache.kafka.streams.processor.internals.TaskManager.commitOffsetsOrTransaction(TaskManager.java:1050) > at > org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1013) > at > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:886) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:677) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512) > [2020-10-22T04:09:54+02:00] > (streams-soak-2-7-eos-alpha_soak_i-037c09b3c48522d8d_streamslog) Caused by: > org.apache.kafka.common.KafkaException: Unexpected error in > TxnOffsetCommitResponse: There is a newer producer with the same > transactionalId which fences the current one. 
> at > org.apache.kafka.clients.producer.internals.TransactionManager$TxnOffsetCommitHandler.handleResponse(TransactionManager.java:1726) > at > org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1278) > at > org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) > at > org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:584) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:576) > at > org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:415) > at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) > at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10631) ProducerFencedException is not Handled on Offest Commit
[ https://issues.apache.org/jira/browse/KAFKA-10631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-10631: Fix Version/s: 2.7.0 > ProducerFencedException is not Handled on Offest Commit > --- > > Key: KAFKA-10631 > URL: https://issues.apache.org/jira/browse/KAFKA-10631 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.7.0 >Reporter: Bruno Cadonna >Priority: Blocker > Fix For: 2.7.0 > > > The transaction manager does currently not handle producer fenced errors > returned from a offset commit request. > We found this bug because we encountered the following exception in our soak > cluster: > {code:java} > org.apache.kafka.streams.errors.StreamsException: Error encountered trying to > commit a transaction [stream-thread [i-037c09b3c48522d8d-StreamThread-3] task > [0_0]] > at > org.apache.kafka.streams.processor.internals.StreamsProducer.commitTransaction(StreamsProducer.java:256) > at > org.apache.kafka.streams.processor.internals.TaskManager.commitOffsetsOrTransaction(TaskManager.java:1050) > at > org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1013) > at > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:886) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:677) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512) > [2020-10-22T04:09:54+02:00] > (streams-soak-2-7-eos-alpha_soak_i-037c09b3c48522d8d_streamslog) Caused by: > org.apache.kafka.common.KafkaException: Unexpected error in > TxnOffsetCommitResponse: There is a newer producer with the same > transactionalId which fences the current one. 
> at > org.apache.kafka.clients.producer.internals.TransactionManager$TxnOffsetCommitHandler.handleResponse(TransactionManager.java:1726) > at > org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1278) > at > org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) > at > org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:584) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:576) > at > org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:415) > at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) > at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9929) Support reverse iterator on WindowStore
[ https://issues.apache.org/jira/browse/KAFKA-9929?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-9929: --- Fix Version/s: 2.7.0 > Support reverse iterator on WindowStore > --- > > Key: KAFKA-9929 > URL: https://issues.apache.org/jira/browse/KAFKA-9929 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Jorge Esteban Quilcate Otoya >Assignee: Jorge Esteban Quilcate Otoya >Priority: Major > Labels: needs-kip > Fix For: 2.7.0 > > > Currently, WindowStore fetch operations return an iterator sorted from > earliest to latest result: > ``` > * For each key, the iterator guarantees ordering of windows, starting from > the oldest/earliest > * available window to the newest/latest window. > ``` > > We have a use-case where traces are stored in a WindowStore > and use Kafka Streams to create a materialized view of traces. A query > request comes with a time range (e.g. now-1h, now) and want to return the > most recent results, i.e. fetch from this period of time, iterate and pattern > match latest/most recent traces, and if enough results, then reply without > moving further on the iterator. > Same store is used to search for previous traces. In this case, it search a > key for the last day, if found traces, we would also like to iterate from the > most recent. > RocksDb seems to support iterating backward and forward: > [https://github.com/facebook/rocksdb/wiki/Iterator#iterating-upper-bound-and-lower-bound] > > For reference: This in some way extracts some bits from this previous issue: > https://issues.apache.org/jira/browse/KAFKA-4212: > > > The {{RocksDBWindowsStore}}, a {{WindowsStore}}, can expire items via > > segment dropping, but it stores multiple items per key, based on their > > timestamp. But this store can be repurposed as a cache by fetching the > > items in reverse chronological order and returning the first item found. 
> > Would like to know if there is any impediment on RocksDb or WindowStore to > support this. > Adding an argument to reverse in current fetch methods would be great: > ``` > WindowStore.fetch(from,to,Direction.BACKWARD|FORWARD) > ``` -- This message was sent by Atlassian Jira (v8.3.4#803005)
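To make the request in KAFKA-9929 concrete, the following is a minimal, self-contained sketch of a fetch that takes a direction argument. It is not the Kafka Streams `WindowStore` API: the class `ReverseWindowFetchSketch`, its `Direction` enum, and the `limit` parameter are hypothetical names, and a `TreeMap` stands in for the RocksDB-backed store.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical illustration only; none of these names come from Kafka Streams.
final class ReverseWindowFetchSketch {
    enum Direction { FORWARD, BACKWARD }

    // Window start timestamp -> stored value, for a single key.
    private final NavigableMap<Long, String> windows = new TreeMap<>();

    void put(long windowStart, String value) {
        windows.put(windowStart, value);
    }

    // Returns at most `limit` values whose window start lies in [from, to],
    // oldest first for FORWARD and newest first for BACKWARD.
    List<String> fetch(long from, long to, Direction direction, int limit) {
        NavigableMap<Long, String> range = windows.subMap(from, true, to, true);
        if (direction == Direction.BACKWARD) {
            range = range.descendingMap();
        }
        List<String> result = new ArrayList<>();
        for (String value : range.values()) {
            if (result.size() == limit) {
                break; // enough results: stop without walking the rest of the range
            }
            result.add(value);
        }
        return result;
    }

    public static void main(String[] args) {
        ReverseWindowFetchSketch store = new ReverseWindowFetchSketch();
        store.put(100L, "trace-a");
        store.put(200L, "trace-b");
        store.put(300L, "trace-c");
        // Prints [trace-c, trace-b]
        System.out.println(store.fetch(0L, 300L, Direction.BACKWARD, 2));
    }
}
```

With a backward iteration the "most recent traces first, stop early" use case from the description reads only as many entries as it returns, instead of scanning the whole time range from the oldest window.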
[GitHub] [kafka] jsancio commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests
jsancio commented on a change in pull request #9476: URL: https://github.com/apache/kafka/pull/9476#discussion_r510331249 ## File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java ## @@ -1536,67 +1522,70 @@ public void testObserverLeaderRediscoveryAfterRequestTimeout() throws Exception int otherNodeId = 2; int epoch = 5; Set voters = Utils.mkSet(leaderId, otherNodeId); -KafkaRaftClient client = buildClient(voters); -discoverLeaderAsObserver(client, voters, leaderId, epoch); -pollUntilSend(client); -RaftRequest.Outbound fetchRequest1 = assertSentFetchRequest(); +RaftClientTestContext context = RaftClientTestContext.build(voters); + +context.discoverLeaderAsObserver(voters, leaderId, epoch); + +context.pollUntilSend(); +RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest(); assertEquals(leaderId, fetchRequest1.destinationId()); -assertFetchRequestData(fetchRequest1, epoch, 0L, 0); +RaftClientTestContext.assertFetchRequestData(fetchRequest1, epoch, 0L, 0); -time.sleep(requestTimeoutMs); -pollUntilSend(client); +context.time.sleep(REQUEST_TIMEOUT_MS); +context.pollUntilSend(); // We should retry the Fetch against the other voter since the original // voter connection will be backing off. 
-RaftRequest.Outbound fetchRequest2 = assertSentFetchRequest(); +RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest(); assertNotEquals(leaderId, fetchRequest2.destinationId()); assertTrue(voters.contains(fetchRequest2.destinationId())); -assertFetchRequestData(fetchRequest2, epoch, 0L, 0); +RaftClientTestContext.assertFetchRequestData(fetchRequest2, epoch, 0L, 0); -deliverResponse(fetchRequest2.correlationId, fetchRequest2.destinationId(), +context.deliverResponse(fetchRequest2.correlationId, fetchRequest2.destinationId(), fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.FENCED_LEADER_EPOCH)); -client.poll(); +context.client.poll(); -assertEquals(ElectionState.withElectedLeader(epoch, leaderId, voters), quorumStateStore.readElectionState()); +assertEquals(ElectionState.withElectedLeader(epoch, leaderId, voters), context.quorumStateStore.readElectionState()); } @Test public void testLeaderGracefulShutdown() throws Exception { int otherNodeId = 1; -Set voters = Utils.mkSet(localId, otherNodeId); int epoch = 1; -KafkaRaftClient client = initializeAsLeader(voters, epoch); +Set voters = Utils.mkSet(LOCAL_ID, otherNodeId); + +RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(voters, epoch); // Now shutdown int shutdownTimeoutMs = 5000; -CompletableFuture shutdownFuture = client.shutdown(shutdownTimeoutMs); +CompletableFuture shutdownFuture = context.client.shutdown(shutdownTimeoutMs); // We should still be running until we have had a chance to send EndQuorumEpoch -assertTrue(client.isShuttingDown()); -assertTrue(client.isRunning()); +assertTrue(context.client.isShuttingDown()); Review comment: I thought about this last night and hacked together some solutions, but I wasn't very pleased with the result. Let's explore this improvement in a future PR.
[GitHub] [kafka] thake commented on pull request #9467: KAFKA-10515: Properly initialize nullable Serdes with default values
thake commented on pull request #9467: URL: https://github.com/apache/kafka/pull/9467#issuecomment-714640590 Ready for review
[jira] [Updated] (KAFKA-10631) ProducerFencedException is not Handled on Offest Commit
[ https://issues.apache.org/jira/browse/KAFKA-10631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna updated KAFKA-10631: -- Description: The transaction manager does currently not handle producer fenced errors returned from a offset commit request. We found this bug because we encountered the following exception in our soak cluster: {code:java} org.apache.kafka.streams.errors.StreamsException: Error encountered trying to commit a transaction [stream-thread [i-037c09b3c48522d8d-StreamThread-3] task [0_0]] at org.apache.kafka.streams.processor.internals.StreamsProducer.commitTransaction(StreamsProducer.java:256) at org.apache.kafka.streams.processor.internals.TaskManager.commitOffsetsOrTransaction(TaskManager.java:1050) at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1013) at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:886) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:677) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512) [2020-10-22T04:09:54+02:00] (streams-soak-2-7-eos-alpha_soak_i-037c09b3c48522d8d_streamslog) Caused by: org.apache.kafka.common.KafkaException: Unexpected error in TxnOffsetCommitResponse: There is a newer producer with the same transactionalId which fences the current one. 
at org.apache.kafka.clients.producer.internals.TransactionManager$TxnOffsetCommitHandler.handleResponse(TransactionManager.java:1726) at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1278) at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:584) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:576) at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:415) at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at java.lang.Thread.run(Thread.java:748) {code} was: The transaction manager does currently not handle producer fenced errors returned from a offset commit request. We found this bug because we saw the following exception in our soak cluster: {code:java} org.apache.kafka.streams.errors.StreamsException: Error encountered trying to commit a transaction [stream-thread [i-037c09b3c48522d8d-StreamThread-3] task [0_0]] at org.apache.kafka.streams.processor.internals.StreamsProducer.commitTransaction(StreamsProducer.java:256) at org.apache.kafka.streams.processor.internals.TaskManager.commitOffsetsOrTransaction(TaskManager.java:1050) at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1013) at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:886) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:677) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512) [2020-10-22T04:09:54+02:00] (streams-soak-2-7-eos-alpha_soak_i-037c09b3c48522d8d_streamslog) Caused by: org.apache.kafka.common.KafkaException: 
Unexpected error in TxnOffsetCommitResponse: There is a newer producer with the same transactionalId which fences the current one. at org.apache.kafka.clients.producer.internals.TransactionManager$TxnOffsetCommitHandler.handleResponse(TransactionManager.java:1726) at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1278) at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:584) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:576) at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:415) at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at java.lang.Thread.run(Thread.java:748) {code} > ProducerFencedException is not Handled on Offest Commit > --- > > Key: KAFKA-10631 > URL:
[jira] [Created] (KAFKA-10631) ProducerFencedException is not Handled on Offest Commit
Bruno Cadonna created KAFKA-10631: - Summary: ProducerFencedException is not Handled on Offest Commit Key: KAFKA-10631 URL: https://issues.apache.org/jira/browse/KAFKA-10631 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 2.7.0 Reporter: Bruno Cadonna The transaction manager does currently not handle producer fenced errors returned from a offset commit request. We found this bug because we saw the following exception in our soak cluster: {code:java} org.apache.kafka.streams.errors.StreamsException: Error encountered trying to commit a transaction [stream-thread [i-037c09b3c48522d8d-StreamThread-3] task [0_0]] at org.apache.kafka.streams.processor.internals.StreamsProducer.commitTransaction(StreamsProducer.java:256) at org.apache.kafka.streams.processor.internals.TaskManager.commitOffsetsOrTransaction(TaskManager.java:1050) at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1013) at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:886) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:677) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512) [2020-10-22T04:09:54+02:00] (streams-soak-2-7-eos-alpha_soak_i-037c09b3c48522d8d_streamslog) Caused by: org.apache.kafka.common.KafkaException: Unexpected error in TxnOffsetCommitResponse: There is a newer producer with the same transactionalId which fences the current one. 
at org.apache.kafka.clients.producer.internals.TransactionManager$TxnOffsetCommitHandler.handleResponse(TransactionManager.java:1726) at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1278) at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:584) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:576) at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:415) at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at java.lang.Thread.run(Thread.java:748) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] soarez commented on pull request #9000: KAFKA-10036 Improve handling and documentation of Suppliers
soarez commented on pull request #9000: URL: https://github.com/apache/kafka/pull/9000#issuecomment-714636553 @mjsax rebased and fixed an error. Can we try running the tests again?
[GitHub] [kafka] thake commented on pull request #9467: KAFKA-10515: Properly initialize nullable Serdes with default values
thake commented on pull request #9467: URL: https://github.com/apache/kafka/pull/9467#issuecomment-714632784 @vvcephei Sorry for the confusion, creating a new commit now.
[GitHub] [kafka] thake commented on pull request #9478: Fixed unit test mocks and incorrect required context serdes.
thake commented on pull request #9478: URL: https://github.com/apache/kafka/pull/9478#issuecomment-714630476 You were right. Somehow I thought that the tests were right and that keySerde and valueSerde are nullable. It would really help to have a null-safe API description using annotations. Closing this PR because it addresses a non-issue :)
[GitHub] [kafka] thake closed pull request #9478: Fixed unit test mocks and incorrect required context serdes.
thake closed pull request #9478: URL: https://github.com/apache/kafka/pull/9478
[GitHub] [kafka] apurvam commented on pull request #9479: Handle ProducerFencedException on offset commit
apurvam commented on pull request #9479: URL: https://github.com/apache/kafka/pull/9479#issuecomment-714628995 I can't believe this bug has been lurking for so long.
[GitHub] [kafka] vvcephei commented on pull request #9472: MINOR: Add Jenkinsfile to 2.3
vvcephei commented on pull request #9472: URL: https://github.com/apache/kafka/pull/9472#issuecomment-714624598 Thanks for the reviews!
[GitHub] [kafka] vvcephei merged pull request #9472: MINOR: Add Jenkinsfile to 2.3
vvcephei merged pull request #9472: URL: https://github.com/apache/kafka/pull/9472
[GitHub] [kafka] vvcephei merged pull request #9474: MINOR: Add Jenkinsfile to 2.2
vvcephei merged pull request #9474: URL: https://github.com/apache/kafka/pull/9474
[GitHub] [kafka] vvcephei commented on pull request #9475: MINOR: Add Jenkinsfile to 2.1
vvcephei commented on pull request #9475: URL: https://github.com/apache/kafka/pull/9475#issuecomment-714623937 Thanks for the reviews, all!
[GitHub] [kafka] vvcephei merged pull request #9475: MINOR: Add Jenkinsfile to 2.1
vvcephei merged pull request #9475: URL: https://github.com/apache/kafka/pull/9475
[GitHub] [kafka] vvcephei commented on pull request #9474: MINOR: Add Jenkinsfile to 2.2
vvcephei commented on pull request #9474: URL: https://github.com/apache/kafka/pull/9474#issuecomment-714624277 Thanks for the reviews!
[GitHub] [kafka] nizhikov opened a new pull request #9480: [WIP] KAFKA-10592: Fix vagrant for a system tests with python3
nizhikov opened a new pull request #9480: URL: https://github.com/apache/kafka/pull/9480 Fix Vagrant for system tests with Python 3. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] joshuagrisham commented on pull request #9470: Add recursive support to Connect Cast and ReplaceField transforms, and support for casting complex types to either a native or JSON stri
joshuagrisham commented on pull request #9470: URL: https://github.com/apache/kafka/pull/9470#issuecomment-714608520 I saw that all of the checks have failed, but when I look in the log the failures are all related only to `checkstyle`. If I get some time tomorrow I will try to address all of the code style / standards issues that failed the check and push a new commit, just to rule out any potential problem from that. Stay tuned!
[jira] [Commented] (KAFKA-10406) Table data doesn't wrap around in fixed-width columns and gets hidden
[ https://issues.apache.org/jira/browse/KAFKA-10406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17219137#comment-17219137 ] Mickael Maison commented on KAFKA-10406: [~vvcephei] I see you merged https://github.com/apache/kafka-site/pull/295. Does that PR completely resolve this JIRA or is there some work left? > Table data doesn't wrap around in fixed-width columns and gets hidden > - > > Key: KAFKA-10406 > URL: https://issues.apache.org/jira/browse/KAFKA-10406 > Project: Kafka > Issue Type: Improvement > Components: docs >Reporter: Sanjay Ravikumar >Priority: Trivial > Labels: documentation > Fix For: 2.6.0, 2.5.1 > > > The Kafka config-steams docs includes a table for "[Optional Configuration > Parameters|[https://kafka.apache.org/26/documentation/streams/developer-guide/config-streams#optional-configuration-parameters]];. > The first ("Parameter Name") and last ("Default Value") columns text doesn't > wrap around when the text length exceeds the width of the columns. Because of > this, some of the text gets hidden from the view. > This is the case with both 2.6.0 as well as 2.5.1 versions and may be others > as well. This needs to be handled similar to how the text in third > ("Description") column is handled where the text wraps around and all the > data is visible. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] cadonna opened a new pull request #9479: Handle ProducerFencedException on offset commit
cadonna opened a new pull request #9479: URL: https://github.com/apache/kafka/pull/9479 The transaction manager currently does not handle producer fenced errors returned from an offset commit request. This PR adds handling for these errors. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
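As a rough illustration of the kind of change this PR describes, the sketch below maps the error code of an offset-commit response to an outcome. It is not the actual `TransactionManager` code: the class `TxnOffsetCommitErrorSketch`, both enums, and their constants are hypothetical. The assumption is that, without an explicit branch, a fenced error would fall through to the catch-all branch, which would be consistent with the "Unexpected error in TxnOffsetCommitResponse" message reported in KAFKA-10631.

```java
// Hypothetical sketch; the names below are illustrative and are not taken
// from the Kafka client implementation.
final class TxnOffsetCommitErrorSketch {
    enum ErrorCode { NONE, COORDINATOR_LOAD_IN_PROGRESS, PRODUCER_FENCED, UNKNOWN_SERVER_ERROR }

    enum Outcome { COMPLETE, RETRY, FATAL_PRODUCER_FENCED, FATAL_UNEXPECTED }

    // Decides what to do with the error code returned for an offset commit.
    static Outcome handle(ErrorCode error) {
        switch (error) {
            case NONE:
                return Outcome.COMPLETE;
            case COORDINATOR_LOAD_IN_PROGRESS:
                return Outcome.RETRY; // transient condition: try the request again
            case PRODUCER_FENCED:
                // Explicit branch for fencing, so the caller can react to it
                // as a fenced producer rather than as an unknown failure.
                return Outcome.FATAL_PRODUCER_FENCED;
            default:
                // Anything not handled above is reported as unexpected.
                return Outcome.FATAL_UNEXPECTED;
        }
    }

    public static void main(String[] args) {
        System.out.println(handle(ErrorCode.PRODUCER_FENCED));
    }
}
```

The point of a dedicated branch is that callers such as Kafka Streams can tell "this producer was fenced" apart from a generic failure and pick the matching recovery path.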
[jira] [Commented] (KAFKA-10554) Perform follower truncation based on epoch offsets returned in Fetch response
[ https://issues.apache.org/jira/browse/KAFKA-10554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17219111#comment-17219111 ] Bill Bejeck commented on KAFKA-10554: - Since this is not a blocker, and we've hit code freeze, I'm going to move the fix version of this ticket to 2.8 as part of the 2.7.0 release process. Should this be incorrect, please discuss on the [DISCUSS] Apache Kafka 2.7.0 release email thread. > Perform follower truncation based on epoch offsets returned in Fetch response > - > > Key: KAFKA-10554 > URL: https://issues.apache.org/jira/browse/KAFKA-10554 > Project: Kafka > Issue Type: Task > Components: replication >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 2.7.0 > > > KAFKA-10435 updated fetch protocol for KIP-595 to return diverging epoch and > offset as part of fetch response. We can use this to truncate logs in > followers while processing fetch responses. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10554) Perform follower truncation based on epoch offsets returned in Fetch response
[ https://issues.apache.org/jira/browse/KAFKA-10554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-10554: Fix Version/s: (was: 2.7.0) 2.8.0 > Perform follower truncation based on epoch offsets returned in Fetch response > - > > Key: KAFKA-10554 > URL: https://issues.apache.org/jira/browse/KAFKA-10554 > Project: Kafka > Issue Type: Task > Components: replication >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 2.8.0 > > > KAFKA-10435 updated fetch protocol for KIP-595 to return diverging epoch and > offset as part of fetch response. We can use this to truncate logs in > followers while processing fetch responses. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-7575) 'Error while writing to checkpoint file' Issue
[ https://issues.apache.org/jira/browse/KAFKA-7575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17219108#comment-17219108 ] Ivan Gonzalez commented on KAFKA-7575: -- Hi, we are running a cluster of 3 Kafka servers (Kafka version 2.3). Today one of the nodes went down because of this issue. The good news is that this is the first time it has happened in 2 years; however, we have no way to prevent it. OS: Red Hat Enterprise Linux Server Release 7.5 > 'Error while writing to checkpoint file' Issue > -- > > Key: KAFKA-7575 > URL: https://issues.apache.org/jira/browse/KAFKA-7575 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 1.1.1 > Environment: Windows 10, Kafka 1.1.1 >Reporter: Dasun Nirmitha >Priority: Major > Attachments: Dry run error.rar > > > I'm currently testing a Java Kafka producer application coded to retrieve a > db value from a local mysql db and produce to a single topic. Locally I've > got a Zookeeper server and a Kafka single broker running. > My issue is I need to produce this from the Kafka producer each second, and > that works for around 2 hours until broker throws an 'Error while writing to > checkpoint file' and shuts down. Producing with a 1 minute interval works > with no issues but unfortunately I need the produce interval to be 1 second. > I have attached a rar containing screenshots of the Errors thrown from the > Broker and my application. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10284) Group membership update due to static member rejoin should be persisted
[ https://issues.apache.org/jira/browse/KAFKA-10284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10284: - Fix Version/s: 2.6.1 > Group membership update due to static member rejoin should be persisted > --- > > Key: KAFKA-10284 > URL: https://issues.apache.org/jira/browse/KAFKA-10284 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0 >Reporter: Boyang Chen >Assignee: feyman >Priority: Critical > Labels: help-wanted > Fix For: 2.7.0, 2.6.1 > > Attachments: How to reproduce the issue in KAFKA-10284.md > > > For known static members rejoin, we would update its corresponding member.id > without triggering a new rebalance. This serves the purpose for avoiding > unnecessary rebalance for static membership, as well as fencing purpose if > some still uses the old member.id. > The bug is that we don't actually persist the membership update, so if no > upcoming rebalance gets triggered, this new member.id information will get > lost during group coordinator immigration, thus bringing up the zombie member > identity. > The bug find credit goes to [~hachikuji] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] vvcephei commented on pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted
vvcephei commented on pull request #9270: URL: https://github.com/apache/kafka/pull/9270#issuecomment-714574329 Cherry-picked to 2.6 (cc @mimaison )
[jira] [Updated] (KAFKA-10201) Update codebase to use more inclusive terms
[ https://issues.apache.org/jira/browse/KAFKA-10201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-10201: Fix Version/s: (was: 2.7.0) 2.8.0 > Update codebase to use more inclusive terms > --- > > Key: KAFKA-10201 > URL: https://issues.apache.org/jira/browse/KAFKA-10201 > Project: Kafka > Issue Type: Improvement >Reporter: Xavier Léauté >Priority: Major > Fix For: 2.8.0 > > > see the corresponding KIP > https://cwiki.apache.org/confluence/display/KAFKA/KIP-629:+Use+racially+neutral+terms+in+our+codebase -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10201) Update codebase to use more inclusive terms
[ https://issues.apache.org/jira/browse/KAFKA-10201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17219093#comment-17219093 ] Bill Bejeck commented on KAFKA-10201: - [~xvrl] , Since we've hit code freeze on 10/21, I'm going to set the fix version to 2.8 as part of the 2.7.0 release process. > Update codebase to use more inclusive terms > --- > > Key: KAFKA-10201 > URL: https://issues.apache.org/jira/browse/KAFKA-10201 > Project: Kafka > Issue Type: Improvement >Reporter: Xavier Léauté >Priority: Major > Fix For: 2.7.0 > > > see the corresponding KIP > https://cwiki.apache.org/confluence/display/KAFKA/KIP-629:+Use+racially+neutral+terms+in+our+codebase -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] vvcephei commented on pull request #9471: MINOR: Add Jenkinsfile to 2.6
vvcephei commented on pull request #9471: URL: https://github.com/apache/kafka/pull/9471#issuecomment-714569139 The tests for 8 and 14 passed, but 11 timed out. Weirdly, the build log (https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-9471/1/console) actually shows that the tests passed after just under 2 hours, and then it looks like it got hung up "recording the results". Re-running now.
[jira] [Resolved] (KAFKA-10284) Group membership update due to static member rejoin should be persisted
[ https://issues.apache.org/jira/browse/KAFKA-10284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-10284. -- Resolution: Fixed > Group membership update due to static member rejoin should be persisted > --- > > Key: KAFKA-10284 > URL: https://issues.apache.org/jira/browse/KAFKA-10284 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0 >Reporter: Boyang Chen >Assignee: feyman >Priority: Critical > Labels: help-wanted > Fix For: 2.7.0 > > Attachments: How to reproduce the issue in KAFKA-10284.md > > > For known static members rejoin, we would update its corresponding member.id > without triggering a new rebalance. This serves the purpose for avoiding > unnecessary rebalance for static membership, as well as fencing purpose if > some still uses the old member.id. > The bug is that we don't actually persist the membership update, so if no > upcoming rebalance gets triggered, this new member.id information will get > lost during group coordinator immigration, thus bringing up the zombie member > identity. > The bug find credit goes to [~hachikuji] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] vvcephei commented on pull request #9478: Fixed unit test mocks and incorrect required context serdes.
vvcephei commented on pull request #9478: URL: https://github.com/apache/kafka/pull/9478#issuecomment-714565433 Sorry, this is getting complicated. I've just realized that this PR is specifically addressing some internal test failures that came up in the 2.6 port: https://github.com/apache/kafka/pull/9467 . If that's the sole motivation, then I think we can close this PR and instead just fix the tests in https://github.com/apache/kafka/pull/9467 What do you think, @thake ?
[GitHub] [kafka] vvcephei commented on pull request #9467: KAFKA-10515: Properly initialize nullable Serdes with default values
vvcephei commented on pull request #9467: URL: https://github.com/apache/kafka/pull/9467#issuecomment-714564666 Ahh, I already commented on https://github.com/apache/kafka/pull/9478, but I think now I see the motivation for that PR. If it was indeed just because there are internal tests that need new expectations now, we should just add the expectations instead of twisting the "main" code into handling an unexpected condition. By the way, I'm sure it's not obvious, but the fundamental reason for my taking this position is that in production code, the context serdes can never be null. There is a "default default" of ByteArraySerde even if you don't configure a "default" serde at all.
[GitHub] [kafka] vvcephei commented on pull request #9478: Fixed unit test mocks and incorrect required context serdes.
vvcephei commented on pull request #9478: URL: https://github.com/apache/kafka/pull/9478#issuecomment-714562224 Thanks for this, @thake! I'm wondering if this is the right direction to go here. In a real Streams application, as well as in MockProcessorContext and TopologyTestDriver, the context serdes can never be null, so it seems to be a problem only for EasyMock tests. It's not that "easy" to begin with to set up an EasyMock of a context for testing Streams components because the set of required interactions is both non-trivial and an internal detail. That's why we offer the MockProcessorContext, which provides default behavior that should be suitable for most tests and still lets you capture and assert the things you'd need to assert. Have I misunderstood the problem here? Thanks, -John
[GitHub] [kafka] vvcephei commented on pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted
vvcephei commented on pull request #9270: URL: https://github.com/apache/kafka/pull/9270#issuecomment-714559804
[jira] [Updated] (KAFKA-10284) Group membership update due to static member rejoin should be persisted
[ https://issues.apache.org/jira/browse/KAFKA-10284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

John Roesler updated KAFKA-10284:
-
Fix Version/s: 2.7.0

> Group membership update due to static member rejoin should be persisted
> ---
>
> Key: KAFKA-10284
> URL: https://issues.apache.org/jira/browse/KAFKA-10284
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0
> Reporter: Boyang Chen
> Assignee: feyman
> Priority: Critical
> Labels: help-wanted
> Fix For: 2.7.0
>
> Attachments: How to reproduce the issue in KAFKA-10284.md
>
>
> When a known static member rejoins, we update its corresponding member.id
> without triggering a new rebalance. This avoids unnecessary rebalances for
> static membership, and it also fences any client that still uses the old
> member.id.
> The bug is that we don't actually persist the membership update, so if no
> subsequent rebalance is triggered, the new member.id information is lost
> during group coordinator migration, which brings back the zombie member
> identity.
> Credit for finding the bug goes to [~hachikuji]
[GitHub] [kafka] vvcephei merged pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted
vvcephei merged pull request #9270: URL: https://github.com/apache/kafka/pull/9270
[GitHub] [kafka] vvcephei commented on pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted
vvcephei commented on pull request #9270: URL: https://github.com/apache/kafka/pull/9270#issuecomment-714551566 The test failure was unrelated: Build / JDK 15 / org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
[GitHub] [kafka] vvcephei commented on pull request #9477: MINOR: TopologyTestDriver should not require dummy parameters
vvcephei commented on pull request #9477: URL: https://github.com/apache/kafka/pull/9477#issuecomment-714550389 Ok, here are the follow-up tickets I filed:
* https://issues.apache.org/jira/browse/KAFKA-10628
* https://issues.apache.org/jira/browse/KAFKA-10629
* https://issues.apache.org/jira/browse/KAFKA-10630
[jira] [Created] (KAFKA-10630) State Directory config could be improved
John Roesler created KAFKA-10630:

Summary: State Directory config could be improved
Key: KAFKA-10630
URL: https://issues.apache.org/jira/browse/KAFKA-10630
Project: Kafka
Issue Type: Task
Components: streams
Reporter: John Roesler

During https://github.com/apache/kafka/pull/9477, I noticed that many tests wind up providing a state directory config purely to ensure a unique temp directory for the test. Since TopologyTestDriver and MockProcessorContext tests are typically unit tests, it would be more convenient to initialize those components with their own unique temp state directory, following the universal pattern from such tests:

{code:java}
props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
{code}

Note that this literal setting is not ideal, since it actually creates a directory regardless of whether the application needs one. Instead, we should create a new TestUtils method to lazily generate a temp directory _name_ and then register a shutdown handler to delete it if it exists. Then, Streams would only create the directory if it actually needs persistence.

Also, the default value for that config is not platform independent. It is simply "/tmp/kafka-streams". Perhaps instead we should set the default to something like "unset" or "" or "none". Then, instead of reading the property directly, when Streams actually needs the state directory, it could log a warning that the state directory config is not set and call the platform-independent Java API for creating a temporary directory.
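The two ideas in this ticket could be sketched roughly as follows. This is only an illustration under stated assumptions, not the change that was made in Kafka: the class `LazyTempDir` and its methods `uniqueName`, `deleteIfExists`, and `resolveStateDir` are hypothetical names invented here, and the warning log mentioned in the ticket is left as a comment. Only JDK APIs are used.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.UUID;
import java.util.stream.Stream;

// Hypothetical helper for illustration; not part of Kafka's TestUtils.
final class LazyTempDir {
    private LazyTempDir() { }

    // Generates a unique directory *name* under java.io.tmpdir without creating it,
    // and registers a shutdown hook that deletes the directory only if it exists by then.
    static Path uniqueName(String prefix) {
        Path dir = Paths.get(System.getProperty("java.io.tmpdir"), prefix + UUID.randomUUID());
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                deleteIfExists(dir);
            } catch (UncheckedIOException ignored) {
                // Best-effort cleanup at JVM exit.
            }
        }));
        return dir;
    }

    // Recursively deletes the directory if present; does nothing otherwise.
    static void deleteIfExists(Path dir) {
        if (!Files.exists(dir)) {
            return;
        }
        try (Stream<Path> walk = Files.walk(dir)) {
            // Reverse order visits children before their parent directories.
            walk.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.deleteIfExists(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Assumed fallback: when the configured value is null or empty ("unset"),
    // create a temp directory through the platform-independent JDK API.
    static Path resolveStateDir(String configured) throws IOException {
        if (configured == null || configured.isEmpty()) {
            // A real implementation would log a warning here, as the ticket suggests.
            return Files.createTempDirectory("kafka-streams");
        }
        return Paths.get(configured);
    }
}
```

With a helper like this, a test could pass `LazyTempDir.uniqueName("kafka-streams-test-").toString()` as the state directory value, and no directory would appear on disk unless the component under test actually writes state.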