[GitHub] [kafka] kkonstantine commented on a change in pull request #8204: KAFKA-9633: Ensure ConfigProviders are closed

2020-10-22 Thread GitBox


kkonstantine commented on a change in pull request #8204:
URL: https://github.com/apache/kafka/pull/8204#discussion_r510631763



##
File path: clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
##
@@ -483,6 +483,7 @@ public void logUnused() {
 resolvedOriginals.putAll(result.data());
 }
 }
+providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));

Review comment:
   Good observations, @jherico. I believe the latter approach, closing all the instantiated providers if an exception occurs in both cases, would be the most straightforward fix.
   
   If you'd be interested in submitting a fix, that would be very welcome!
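   For reference, a minimal sketch of that shape, assuming a hypothetical `resolveIndirectValues` helper standing in for the resolution loop in `AbstractConfig` (an illustration only, not the actual patch):
   
   ```java
   // Minimal sketch: close every instantiated ConfigProvider whether resolution
   // succeeds or throws, so no provider instance leaks on the exception path.
   import java.util.Map;
   import org.apache.kafka.common.config.provider.ConfigProvider;
   import org.apache.kafka.common.utils.Utils;

   final class ProviderCloseSketch {
       static Map<String, String> resolveAndClose(Map<String, ConfigProvider> providers,
                                                  Map<String, String> indirectVariables) {
           try {
               return resolveIndirectValues(providers, indirectVariables);
           } finally {
               // Runs on success and on exception.
               providers.values().forEach(p -> Utils.closeQuietly(p, "config provider"));
           }
       }

       // Hypothetical stand-in for AbstractConfig's variable substitution logic.
       private static Map<String, String> resolveIndirectValues(Map<String, ConfigProvider> providers,
                                                                Map<String, String> indirectVariables) {
           return indirectVariables;
       }
   }
   ```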





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc opened a new pull request #9485: [WIP] KAFKA-10619: Producer will enable EOS by default

2020-10-22 Thread GitBox


d8tltanc opened a new pull request #9485:
URL: https://github.com/apache/kafka/pull/9485


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kotharironak edited a comment on pull request #8043: KAFKA-6793: Unnecessary warning log message

2020-10-22 Thread GitBox


kotharironak edited a comment on pull request #8043:
URL: https://github.com/apache/kafka/pull/8043#issuecomment-714917967


   We are also observing quite a few warnings: https://github.com/hypertrace/pinot/issues/26
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kotharironak commented on pull request #8043: KAFKA-6793: Unnecessary warning log message

2020-10-22 Thread GitBox


kotharironak commented on pull request #8043:
URL: https://github.com/apache/kafka/pull/8043#issuecomment-714917967


   We are also observing quite a few warnings: https://github.com/hypertrace/pinot/issues/26
   Could you please point to the KIP link for this issue?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

2020-10-22 Thread GitBox


abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r510568706



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -126,11 +125,44 @@ class KafkaApis(val requestChannel: RequestChannel,
 info("Shutdown complete.")
   }
 
+  private def checkForwarding(request: RequestChannel.Request): Unit = {
+    if (!request.header.apiKey.forwardable && request.envelopeContext.isDefined) {
+      throw new IllegalStateException("Given RPC " + request.header.apiKey + " does not support forwarding.")
+    }
+  }
+
+  private def maybeForward(request: RequestChannel.Request,
+                           handler: RequestChannel.Request => Unit): Unit = {
+    if (request.envelopeContext.isDefined && request.principalSerde.isEmpty) {
+      sendErrorResponseMaybeThrottle(request, Errors.PRINCIPAL_DESERIALIZATION_FAILURE.exception())
+    } else if (request.envelopeContext.isDefined &&
+      (!request.context.fromPrivilegedListener ||
+      !authorize(request.envelopeContext.get.brokerContext, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME))
+    ) {
+      // If the designated forwarding request is not coming from a privileged listener, or
+      // it fails CLUSTER_ACTION permission, we would fail the authorization.
+      sendErrorResponseMaybeThrottle(request, Errors.CLUSTER_AUTHORIZATION_FAILED.exception())

Review comment:
   The question would be how the forwarding broker should handle errors for auth and principal serde exceptions. To me, we should return a vanilla error response with `UNKNOWN_SERVER_ERROR` to the original client. Besides that, I think we could add a differentiation here to avoid passing the serde-type errors to the client.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] feyman2016 edited a comment on pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted

2020-10-22 Thread GitBox


feyman2016 edited a comment on pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#issuecomment-714871862


   Thanks a lot for the review and merge @abbccdda @vvcephei!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] feyman2016 commented on pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted

2020-10-22 Thread GitBox


feyman2016 commented on pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#issuecomment-714871862


   Thanks a lot for the help @abbccdda @vvcephei!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

2020-10-22 Thread GitBox


hachikuji commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r510552124



##
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##
@@ -94,19 +104,63 @@ object RequestChannel extends Logging {
 @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None
 
 val session = Session(context.principal, context.clientAddress)
+
 private val bodyAndSize: RequestAndSize = context.parseRequest(buffer)
 
 def header: RequestHeader = context.header
 def sizeOfBodyInBytes: Int = bodyAndSize.size
 
-//most request types are parsed entirely into objects at this point. for 
those we can release the underlying buffer.
-//some (like produce, or any time the schema contains fields of types 
BYTES or NULLABLE_BYTES) retain a reference
-//to the buffer. for those requests we cannot release the buffer early, 
but only when request processing is done.
+// most request types are parsed entirely into objects at this point. for 
those we can release the underlying buffer.
+// some (like produce, or any time the schema contains fields of types 
BYTES or NULLABLE_BYTES) retain a reference
+// to the buffer. for those requests we cannot release the buffer early, 
but only when request processing is done.
 if (!header.apiKey.requiresDelayedAllocation) {
   releaseBuffer()
 }
 
-def requestDesc(details: Boolean): String = s"$header -- 
${loggableRequest.toString(details)}"
+def buildResponse(abstractResponse: AbstractResponse,
+  error: Errors): Send = {
+  envelopeContext match {
+case Some(envelopeContext) =>
+  val envelopeResponse = new EnvelopeResponse(
+abstractResponse.throttleTimeMs(),

Review comment:
   Quotas are one aspect of this work that need more consideration. What we 
don't want is for the inter-broker channel to get affected by the individual 
client throttle, which is what will happen with the current patch. What I'd 
suggest for now is that we allow the broker to track client quotas and pass 
back the throttle value in the underlying response, but we set the envelope 
throttle time to 0 and ensure that the inter-broker channel does not get 
throttled. 
   
   For this, I think we will need to change the logic in 
`KafkaApis.sendResponseMaybeThrottle`. If it is a forwarded request, we still 
need to check `maybeRecordAndGetThrottleTimeMs`, but we can skip the call to 
`ClientQuotaManager.throttle`. When the response is received on the forwarding 
broker, we will need to apply the throttle, which I think the patch already 
handles.
   
   One challenging aspect is how this will affect quota metrics. Currently 
quota/throttling metrics are relatively simple because they are recorded 
separately by each broker. However, here the controller is the one that is 
tracking the throttling for the client across multiple inbound connections from 
multiple brokers. This means that the broker that is applying a throttle for a 
forwarded request may not have actually observed a quota violation. Other than 
causing some reporting confusion, I am not sure whether there are any other 
consequences to this.
   
   cc @apovzner @rajinisivaram 
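   As a rough illustration of that split (hypothetical Java types, not the actual `KafkaApis` or quota manager code): the client throttle is recorded and carried inside the wrapped response, while the envelope's own throttle time stays at zero so the inter-broker channel is never paused.
   
   ```java
   // Hypothetical types: the controller records the client's quota sample and carries
   // the resulting throttle in the wrapped response, while the envelope itself always
   // reports zero throttle so the inter-broker channel is never paused. The forwarding
   // broker applies the inner throttle when relaying the response to the client.
   final class ForwardedThrottleSketch {

       static final class InnerResponse {
           final int throttleTimeMs;
           final byte[] payload;
           InnerResponse(int throttleTimeMs, byte[] payload) {
               this.throttleTimeMs = throttleTimeMs;
               this.payload = payload;
           }
       }

       static final class EnvelopeResponse {
           final int throttleTimeMs;    // throttle seen by the inter-broker channel
           final InnerResponse wrapped; // response relayed back to the original client
           EnvelopeResponse(int throttleTimeMs, InnerResponse wrapped) {
               this.throttleTimeMs = throttleTimeMs;
               this.wrapped = wrapped;
           }
       }

       interface ClientQuotaRecorder {
           // Stand-in for maybeRecordAndGetThrottleTimeMs: record usage, return the throttle.
           int maybeRecordAndGetThrottleTimeMs(String clientId, long nowMs);
       }

       static EnvelopeResponse buildForwardedResponse(ClientQuotaRecorder quotas,
                                                      String clientId,
                                                      byte[] innerPayload,
                                                      long nowMs) {
           // Record the quota sample and compute the client throttle, but do not
           // throttle this (controller-side) connection.
           int clientThrottleMs = quotas.maybeRecordAndGetThrottleTimeMs(clientId, nowMs);
           InnerResponse inner = new InnerResponse(clientThrottleMs, innerPayload);
           // Envelope throttle stays 0.
           return new EnvelopeResponse(0, inner);
       }
   }
   ```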





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

2020-10-22 Thread GitBox


hachikuji commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r510552124



##
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##
@@ -94,19 +104,63 @@ object RequestChannel extends Logging {
 @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None
 
 val session = Session(context.principal, context.clientAddress)
+
 private val bodyAndSize: RequestAndSize = context.parseRequest(buffer)
 
 def header: RequestHeader = context.header
 def sizeOfBodyInBytes: Int = bodyAndSize.size
 
-//most request types are parsed entirely into objects at this point. for 
those we can release the underlying buffer.
-//some (like produce, or any time the schema contains fields of types 
BYTES or NULLABLE_BYTES) retain a reference
-//to the buffer. for those requests we cannot release the buffer early, 
but only when request processing is done.
+// most request types are parsed entirely into objects at this point. for 
those we can release the underlying buffer.
+// some (like produce, or any time the schema contains fields of types 
BYTES or NULLABLE_BYTES) retain a reference
+// to the buffer. for those requests we cannot release the buffer early, 
but only when request processing is done.
 if (!header.apiKey.requiresDelayedAllocation) {
   releaseBuffer()
 }
 
-def requestDesc(details: Boolean): String = s"$header -- 
${loggableRequest.toString(details)}"
+def buildResponse(abstractResponse: AbstractResponse,
+  error: Errors): Send = {
+  envelopeContext match {
+case Some(envelopeContext) =>
+  val envelopeResponse = new EnvelopeResponse(
+abstractResponse.throttleTimeMs(),

Review comment:
   Quotas are one aspect of this work that need more consideration. What we 
don't want is for the inter-broker channel to get affected by the individual 
client throttle, which is what will happen with the current patch. What I'd 
suggest for now is that we allow the broker to track client quotas and pass 
back the throttle value in the underlying response, but we set the envelope 
throttle time to 0 and ensure that the channel does not get throttled. 
   
   For this, I think we will need to change the logic in 
`KafkaApis.sendResponseMaybeThrottle`. If it is a forwarded request, we still 
need to check `maybeRecordAndGetThrottleTimeMs`, but we can skip the call to 
`ClientQuotaManager.throttle`. When the response is received on the forwarding 
broker, we will need to apply the throttle, which I think the patch already 
handles.
   
   One challenging aspect is how this will affect quota metrics. Currently 
quota/throttling metrics are relatively simple because they are recorded 
separately by each broker. However, here the controller is the one that is 
tracking the throttling for the client across multiple inbound connections from 
multiple brokers. This means that the broker that is applying a throttle for a 
forwarded request may not have actually observed a quota violation. Other than 
causing some reporting confusion, I am not sure whether there are any other 
consequences to this.
   
   cc @apovzner @rajinisivaram 

##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -126,11 +125,44 @@ class KafkaApis(val requestChannel: RequestChannel,
 info("Shutdown complete.")
   }
 
+  private def checkForwarding(request: RequestChannel.Request): Unit = {
+    if (!request.header.apiKey.forwardable && request.envelopeContext.isDefined) {
+      throw new IllegalStateException("Given RPC " + request.header.apiKey + " does not support forwarding.")
+    }
+  }
+
+  private def maybeForward(request: RequestChannel.Request,
+                           handler: RequestChannel.Request => Unit): Unit = {
+    if (request.envelopeContext.isDefined && request.principalSerde.isEmpty) {
+      sendErrorResponseMaybeThrottle(request, Errors.PRINCIPAL_DESERIALIZATION_FAILURE.exception())
+    } else if (request.envelopeContext.isDefined &&
+      (!request.context.fromPrivilegedListener ||
+      !authorize(request.envelopeContext.get.brokerContext, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME))
+    ) {
+      // If the designated forwarding request is not coming from a privileged listener, or
+      // it fails CLUSTER_ACTION permission, we would fail the authorization.
+      sendErrorResponseMaybeThrottle(request, Errors.CLUSTER_AUTHORIZATION_FAILED.exception())

Review comment:
   One challenge we have here is that there are two levels of errors. The 
current patch seems to conflate the two, which makes it confusing. I think we 
need a structure which allows us to separate the errors possible at the 
envelope level and those possible at the request level. What I'm thinking is 
this:
   
   1. For cluster auth and principal serde errors, we should return the 
envelope error and null 

[GitHub] [kafka] dengziming commented on pull request #7862: KAFKA-9246:Update Heartbeat timeout when ConsumerCoordinator commit offset

2020-10-22 Thread GitBox


dengziming commented on pull request #7862:
URL: https://github.com/apache/kafka/pull/7862#issuecomment-714856288


   @abbccdda @hachikuji , Hi, PTAL, thank you.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji merged pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests

2020-10-22 Thread GitBox


hachikuji merged pull request #9476:
URL: https://github.com/apache/kafka/pull/9476


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests

2020-10-22 Thread GitBox


hachikuji commented on pull request #9476:
URL: https://github.com/apache/kafka/pull/9476#issuecomment-714846028


   The build failure appears to be unrelated. 
   ```
   
   [2020-10-23T01:08:22.296Z] [ERROR] Failed to execute goal 
org.apache.maven.plugins:maven-archetype-plugin:3.2.0:generate (default-cli) on 
project standalone-pom: A Maven project already exists in the directory 
/home/jenkins/workspace/Kafka_kafka-pr_PR-9476/streams/quickstart/test-streams-archetype/streams.examples
 -> [Help 1]
   [2020-10-23T01:08:22.296Z] [ERROR] 
   [2020-10-23T01:08:22.296Z] [ERROR] To see the full stack trace of the 
errors, re-run Maven with the -e switch.
   [2020-10-23T01:08:22.297Z] [ERROR] Re-run Maven using the -X switch to 
enable full debug logging.
   [2020-10-23T01:08:22.297Z] [ERROR] 
   [2020-10-23T01:08:22.297Z] [ERROR] For more information about the errors and 
possible solutions, please read the following articles:
   [2020-10-23T01:08:22.298Z] [ERROR] [Help 1] 
http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
   ```
   
   I will merge to trunk.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji merged pull request #9484: MINOR: Update raft/README.md and minor RaftConfig tweaks

2020-10-22 Thread GitBox


hachikuji merged pull request #9484:
URL: https://github.com/apache/kafka/pull/9484


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

2020-10-22 Thread GitBox


hachikuji commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r510536894



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -1005,6 +1013,36 @@ private[kafka] class Processor(val id: Int,
 selector.clearCompletedReceives()
   }
 
+  private def parseEnvelopeRequest(receive: NetworkReceive,
+                                   nowNanos: Long,
+                                   connectionId: String,
+                                   context: RequestContext,
+                                   principalSerde: Option[KafkaPrincipalSerde]) = {
+    val envelopeRequest = context.parseRequest(receive.payload).request.asInstanceOf[EnvelopeRequest]
+
+    val originalHeader = RequestHeader.parse(envelopeRequest.requestData)
+    // Leaving the principal null here is ok since we will fail the request during Kafka API handling.
+    val originalPrincipal = if (principalSerde.isDefined)
+      principalSerde.get.deserialize(envelopeRequest.principalData)
+    else
+      null
+
+    // The forwarding broker and the active controller should have the same DNS resolution, and we need
+    // to have the original client address for authentication purposes.
+    val originalClientAddress = InetAddress.getByName(envelopeRequest.clientHostName)

Review comment:
   I was thinking a little bit about this and trying to decide whether the envelope request should carry a more literal representation of the client IP address. The way it works right now looks like the following:
   
   1) Use `Socket.getInetAddress` to populate `RequestContext.clientAddress`.
   2) Use `InetAddress.getHostName` to populate the `clientHostName` field in the envelope request. This does a reverse DNS lookup based on the IP address from 1).
   3) Now we send `clientHostName` over the wire. It gets unpacked here by doing a DNS lookup to get back to the `InetAddress` object.
   
   So it seems we should skip the DNS translation and just use the IP address from 1). The `InetAddress` class gives us `getAddress` and `getHostAddress`. The first provides the raw byte representation of the IP address, while the latter provides a textual representation. I am thinking we should use `getAddress` and let this field be represented as bytes. What do you think?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

2020-10-22 Thread GitBox


hachikuji commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r510533897



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -974,8 +973,39 @@ private[kafka] class Processor(val id: Int,
     val context = new RequestContext(header, connectionId, channel.socketAddress,
       channel.principal, listenerName, securityProtocol,
       channel.channelMetadataRegistry.clientInformation, isPrivilegedListener)
-    val req = new RequestChannel.Request(processor = id, context = context,
-      startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics)
+
+    val principalSerde = Option(channel.principalSerde.orElse(null))

Review comment:
   You probably need the following:
   ```scala
   import scala.compat.java8.OptionConverters._
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-22 Thread GitBox


hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r510513954



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -206,30 +234,77 @@ private void updateLeaderEndOffsetAndTimestamp(
 final LogOffsetMetadata endOffsetMetadata = log.endOffset();
 
 if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) {
-updateHighWatermark(state, currentTimeMs);
+onUpdateLeaderHighWatermark(state, currentTimeMs);
 }
 
-LogOffset endOffset = new LogOffset(endOffsetMetadata.offset, 
Isolation.UNCOMMITTED);
-fetchPurgatory.maybeComplete(endOffset, currentTimeMs);
+fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs);
 }
 
-private void updateHighWatermark(
-EpochState state,
+private void onUpdateLeaderHighWatermark(
+LeaderState state,
 long currentTimeMs
 ) {
 state.highWatermark().ifPresent(highWatermark -> {
-logger.debug("High watermark updated to {}", highWatermark);
+logger.debug("Leader high watermark updated to {}", highWatermark);
 log.updateHighWatermark(highWatermark);
-
-LogOffset offset = new LogOffset(highWatermark.offset, 
Isolation.COMMITTED);
-appendPurgatory.maybeComplete(offset, currentTimeMs);
-fetchPurgatory.maybeComplete(offset, currentTimeMs);
+appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs);
+maybeFireHandleCommit(highWatermark.offset);
 });
 }
 
-@Override
-public LeaderAndEpoch currentLeaderAndEpoch() {
-return quorum.leaderAndEpoch();
+private void maybeFireHandleCommit(long highWatermark) {
+maybeFireHandleCommit(listenerContexts, highWatermark);
+}
+
+private void maybeFireHandleCommit(List listenerContexts, 
long highWatermark) {
+// TODO: When there are multiple listeners, we can cache reads to save 
some work
+for (ListenerContext listenerContext : listenerContexts) {
+OptionalLong nextExpectedOffsetOpt = 
listenerContext.nextExpectedOffset();
+if (!nextExpectedOffsetOpt.isPresent()) {
+return;
+}
+
+long nextExpectedOffset = nextExpectedOffsetOpt.getAsLong();
+if (nextExpectedOffset < highWatermark) {
+LogFetchInfo readInfo = log.read(nextExpectedOffset, 
Isolation.COMMITTED);
+listenerContext.fireHandleCommit(nextExpectedOffset, 
readInfo.records);
+}
+}
+}
+
+private void maybeFireHandleCommit(long baseOffset, int epoch, List 
records) {
+for (ListenerContext listenerContext : listenerContexts) {
+OptionalLong nextExpectedOffsetOpt = 
listenerContext.nextExpectedOffset();
+if (!nextExpectedOffsetOpt.isPresent()) {
+return;
+}
+
+long nextExpectedOffset = nextExpectedOffsetOpt.getAsLong();
+if (nextExpectedOffset == baseOffset) {
+listenerContext.fireHandleCommit(baseOffset, epoch, records);
+}
+}
+}
+
+private void maybeFireHandleClaim(LeaderState state) {
+for (ListenerContext listenerContext : listenerContexts) {
+int leaderEpoch = state.epoch();
+
+// We can fire `handleClaim` as soon as the listener has caught
+// up to the start of the leader epoch. This guarantees that the
+// state machine has seen the full committed state before it 
becomes
+// leader and begins writing to the log.

Review comment:
   I guess let's keep this option in our back pocket for now.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10636) Bypass log validation for writes to raft log

2020-10-22 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-10636:
---

 Summary: Bypass log validation for writes to raft log
 Key: KAFKA-10636
 URL: https://issues.apache.org/jira/browse/KAFKA-10636
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jason Gustafson


The raft leader is responsible for creating the records written to the log 
(including assigning offsets and the epoch), so we can consider bypassing the 
validation done in `LogValidator`. This lets us skip potentially expensive 
decompression and the unnecessary recomputation of the CRC.
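
As a rough illustration of the per-batch work this would skip (a generic sketch, not Kafka's LogValidator), validation recomputes a CRC32C checksum over the batch payload and compares it to the stored value:

{code:java}
// Generic sketch of CRC recomputation over a batch payload.
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32C;

final class CrcRevalidationSketch {
    static boolean checksumMatches(byte[] batchPayload, long storedCrc) {
        CRC32C crc = new CRC32C();
        crc.update(batchPayload, 0, batchPayload.length);
        return crc.getValue() == storedCrc; // a mismatch would mean a corrupt batch
    }

    public static void main(String[] args) {
        byte[] payload = "record batch bytes".getBytes(StandardCharsets.UTF_8);
        CRC32C crc = new CRC32C();
        crc.update(payload, 0, payload.length);
        System.out.println("valid = " + checksumMatches(payload, crc.getValue()));
    }
}
{code}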



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10635) Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers

2020-10-22 Thread Peeraya Maetasatidsuk (Jira)
Peeraya Maetasatidsuk created KAFKA-10635:
-

 Summary: Streams application fails with 
OutOfOrderSequenceException after rolling restarts of brokers
 Key: KAFKA-10635
 URL: https://issues.apache.org/jira/browse/KAFKA-10635
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.5.1
Reporter: Peeraya Maetasatidsuk


We are upgrading our brokers to version 2.5.1 (from 2.3.1) by performing a 
rolling restart of the brokers after installing the new version. After the 
restarts we notice one of our streams app (client version 2.4.1) fails with 
OutOfOrderSequenceException:

 
{code:java}
ERROR [2020-10-13 22:52:21,400] [com.aaa.bbb.ExceptionHandler] Unexpected 
error. Record: a_record, destination topic: topic-name-Aggregation-repartition 
org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker received 
an out of order sequence number.
ERROR [2020-10-13 22:52:21,413] 
[org.apache.kafka.streams.processor.internals.AssignedTasks] stream-thread 
[topic-name-StreamThread-1] Failed to commit stream task 1_39 due to the 
following error: org.apache.kafka.streams.errors.StreamsException: task [1_39] 
Abort sending since an error caught with a previous record (timestamp 
1602654659000) to topic topic-name-Aggregation-repartition due to 
org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker received 
an out of order sequence number.at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:144)
at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52)
at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:204)
at 
org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1348)
at 
org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:230)
at 
org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:196)
at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:730)   
 at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:716)   
 at 
org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:674)
at 
org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:596)
at 
org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)   
 at 
org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:798)
at 
org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) 
   at 
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)  
  at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335) 
   at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)   
 at java.base/java.lang.Thread.run(Thread.java:834)Caused by: 
org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker received 
an out of order sequence number.
{code}
We see a corresponding error on the broker side:
{code:java}
[2020-10-13 22:52:21,398] ERROR [ReplicaManager broker=137636348] Error 
processing append operation on partition topic-name-Aggregation-repartition-52  
(kafka.server.ReplicaManager)org.apache.kafka.common.errors.OutOfOrderSequenceException:
 Out of order sequence number for producerId 2819098 at offset 1156041 in 
partition topic-name-Aggregation-repartition-52: 29 (incoming seq. number), -1 
(current end sequence number)
{code}
We are able to reproduce this many times, and it happens regardless of whether 
the broker shutdown (at restart) is clean or unclean. However, when we roll back 
the broker version from 2.5.1 to 2.3.1 and perform similar rolling restarts, we 
don't see this error on the streams application at all. This is blocking us 
from upgrading our broker version.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ning2008wisc commented on a change in pull request #9224: KAFKA-10304: refactor MM2 integration tests

2020-10-22 Thread GitBox


ning2008wisc commented on a change in pull request #9224:
URL: https://github.com/apache/kafka/pull/9224#discussion_r510500535



##
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTest.java
##
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.integration;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.mirror.MirrorCheckpointConnector;
+import org.apache.kafka.connect.mirror.MirrorClient;
+import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector;
+import org.apache.kafka.connect.mirror.MirrorMakerConfig;
+import org.apache.kafka.connect.mirror.MirrorSourceConnector;
+import org.apache.kafka.connect.mirror.utils.ThreadedConsumer;
+import org.apache.kafka.connect.mirror.utils.ThreadedProducer;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.Before;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests MM2 replication and failover/failback logic.
+ *
+ * MM2 is configured with active/active replication between two Kafka 
clusters. Tests validate that
+ * records sent to either cluster arrive at the other cluster. Then, a 
consumer group is migrated from
+ * one cluster to the other and back. Tests validate that consumer offsets are 
translated and replicated
+ * between clusters during this failover and failback.
+ */
+@Category(IntegrationTest.class)
+public class MirrorConnectorsIntegrationTest extends 
MirrorConnectorsIntegrationBaseTest {
+
+private static final Logger log = 
LoggerFactory.getLogger(MirrorConnectorsIntegrationTest.class);
+
+private static final List CONNECTOR_LIST = 
+Arrays.asList(MirrorSourceConnector.class, 
MirrorCheckpointConnector.class, MirrorHeartbeatConnector.class);
+
+@Before
+public void setup() throws InterruptedException {

Review comment:
   In `MirrorConnectorsIntegrationSSLTest`, we put the SSL config into `backupBrokerProps`, so the `setup()` of `MirrorConnectorsIntegrationTest` and `MirrorConnectorsIntegrationSSLTest` will be slightly different.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ning2008wisc commented on a change in pull request #9224: KAFKA-10304: refactor MM2 integration tests

2020-10-22 Thread GitBox


ning2008wisc commented on a change in pull request #9224:
URL: https://github.com/apache/kafka/pull/9224#discussion_r510499941



##
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationSSLTest.java
##
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.integration;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.connect.mirror.MirrorCheckpointConnector;
+import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector;
+import org.apache.kafka.connect.mirror.MirrorMakerConfig;
+import org.apache.kafka.connect.mirror.MirrorSourceConnector;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestSslUtils;
+import org.apache.kafka.test.TestUtils;
+import kafka.server.KafkaConfig$;
+
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Before;
+import org.junit.After;
+import org.junit.Test;
+
+/**
+ * Tests MM2 replication with SSL enabled at backup kafka cluster
+ */
+@Category(IntegrationTest.class)
+public class MirrorConnectorsIntegrationSSLTest extends 
MirrorConnectorsIntegrationBaseTest {
+
+private static final Logger log = 
LoggerFactory.getLogger(MirrorConnectorsIntegrationSSLTest.class);
+
+private static final List CONNECTOR_LIST = 
+Arrays.asList(MirrorSourceConnector.class, 
MirrorCheckpointConnector.class, MirrorHeartbeatConnector.class);
+
+@Before
+public void setup() throws InterruptedException {
+try {
+Map sslConfig = 
TestSslUtils.createSslConfig(false, true, Mode.SERVER, TestUtils.tempFile(), 
"testCert");
+backupBrokerProps.put(KafkaConfig$.MODULE$.ListenersProp(), 
"SSL://localhost:0");
+
backupBrokerProps.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(), 
"SSL");
+backupBrokerProps.putAll(sslConfig);
+} catch (final Exception e) {
+throw new RuntimeException(e);
+}
+startClusters();
+}
+
+@After
+public void close() {

Review comment:
   If we move it to the base class, will `close()` still be called during the teardown stage of the test?
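   For reference, a tiny JUnit 4 sketch (class names made up) showing that an `@After` method declared in a base class still runs during teardown of tests defined in a subclass:
   
   ```java
   // JUnit invokes superclass @After methods after any subclass @After methods,
   // so a close() defined only in the base class still runs for subclass tests.
   import org.junit.After;
   import org.junit.Before;
   import org.junit.Test;

   public class BaseTeardownSketch {
       @Before
       public void startClusters() {
           System.out.println("base @Before");
       }

       @After
       public void close() {
           System.out.println("base @After (teardown)");
       }

       // Nested only to keep the sketch in one file; in the PR this would be the
       // SSL subclass in its own file.
       public static class SslVariantSketch extends BaseTeardownSketch {
           @Test
           public void replicates() {
               System.out.println("subclass test body");
           }
       }
   }
   ```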





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests

2020-10-22 Thread GitBox


jsancio commented on a change in pull request #9476:
URL: https://github.com/apache/kafka/pull/9476#discussion_r510498433



##
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##
@@ -278,13 +265,13 @@ public void 
testEndQuorumStartsNewElectionAfterBackoffIfReceivedFromVotedCandida
 .withVotedCandidate(epoch, otherNodeId)

Review comment:
   Makes sense. I implemented this suggestion.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ning2008wisc commented on a change in pull request #9224: KAFKA-10304: refactor MM2 integration tests

2020-10-22 Thread GitBox


ning2008wisc commented on a change in pull request #9224:
URL: https://github.com/apache/kafka/pull/9224#discussion_r510494608



##
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
##
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.DescribeConfigsResult;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.mirror.MirrorMakerConfig;
+import org.apache.kafka.connect.mirror.SourceAndTarget;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.experimental.categories.Category;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.connect.mirror.TestUtils.expectedRecords;
+import static 
org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.CONNECTOR_SETUP_DURATION_MS;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import kafka.server.KafkaConfig$;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+//import org.junit.After;
+
+/**
+ * Common Test functions for MM2 integration tests
+ */
+@Category(IntegrationTest.class)
+public class MirrorConnectorsIntegrationBaseTest {
+private static final Logger log = 
LoggerFactory.getLogger(MirrorConnectorsIntegrationBaseTest.class);
+
+protected static final int NUM_RECORDS_PER_PARTITION = 10;
+public static final int NUM_PARTITIONS = 10;
+protected static final int NUM_RECORDS_PRODUCED = NUM_PARTITIONS * 
NUM_RECORDS_PER_PARTITION;
+protected static final int RECORD_TRANSFER_DURATION_MS = 30_000;
+protected static final int CHECKPOINT_DURATION_MS = 20_000;
+protected static final int RECORD_CONSUME_DURATION_MS = 20_000;
+protected static final int OFFSET_SYNC_DURATION_MS = 30_000;
+protected static final int NUM_WORKERS = 3;
+
+
+protected Map mm2Props;
+protected MirrorMakerConfig mm2Config; 
+protected EmbeddedConnectCluster primary;
+protected EmbeddedConnectCluster backup;
+
+private final AtomicBoolean exited = new AtomicBoolean(false);
+private Properties primaryBrokerProps = new Properties();
+protected Properties backupBrokerProps = new Properties();

Review comment:
   Because `backupBrokerProps` is referenced in `MirrorConnectorsIntegrationSSLTest`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests

2020-10-22 Thread GitBox


hachikuji commented on a change in pull request #9476:
URL: https://github.com/apache/kafka/pull/9476#discussion_r510488902



##
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##
@@ -278,13 +265,13 @@ public void 
testEndQuorumStartsNewElectionAfterBackoffIfReceivedFromVotedCandida
 .withVotedCandidate(epoch, otherNodeId)

Review comment:
   The pattern I had in mind was a little different. I was thinking 
something like this:
   
   ```java
   int localId = 0;
   int otherNodeId = 1;
   int epoch = 2;
   Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
   
   RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
 .withVotedCandidate(epoch, otherNodeId)
 .build()
   ```
   
   Then we don't have the awkwardness of the partial reliance on the static 
`LOCAL_ID`. I like this better because the ids have to be explicitly declared 
in each test case, which makes it easier to follow.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ning2008wisc commented on a change in pull request #9224: KAFKA-10304: refactor MM2 integration tests

2020-10-22 Thread GitBox


ning2008wisc commented on a change in pull request #9224:
URL: https://github.com/apache/kafka/pull/9224#discussion_r510486514



##
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationSSLTest.java
##
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.integration;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.connect.mirror.MirrorCheckpointConnector;
+import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector;
+import org.apache.kafka.connect.mirror.MirrorMakerConfig;
+import org.apache.kafka.connect.mirror.MirrorSourceConnector;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestSslUtils;
+import org.apache.kafka.test.TestUtils;
+import kafka.server.KafkaConfig$;
+
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Before;
+import org.junit.After;
+import org.junit.Test;
+
+/**
+ * Tests MM2 replication with SSL enabled at backup kafka cluster
+ */
+@Category(IntegrationTest.class)
+public class MirrorConnectorsIntegrationSSLTest extends 
MirrorConnectorsIntegrationBaseTest {
+
+private static final Logger log = 
LoggerFactory.getLogger(MirrorConnectorsIntegrationSSLTest.class);
+
+private static final List CONNECTOR_LIST = 
+Arrays.asList(MirrorSourceConnector.class, 
MirrorCheckpointConnector.class, MirrorHeartbeatConnector.class);
+
+@Before
+public void setup() throws InterruptedException {
+try {
+Map sslConfig = 
TestSslUtils.createSslConfig(false, true, Mode.SERVER, TestUtils.tempFile(), 
"testCert");
+backupBrokerProps.put(KafkaConfig$.MODULE$.ListenersProp(), 
"SSL://localhost:0");
+
backupBrokerProps.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(), 
"SSL");
+backupBrokerProps.putAll(sslConfig);
+} catch (final Exception e) {
+throw new RuntimeException(e);

Review comment:
   referred to the example at 
https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java#L58





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck merged pull request #9483: MINOR: Update docs to point to next release add notable features for 2.7

2020-10-22 Thread GitBox


bbejeck merged pull request #9483:
URL: https://github.com/apache/kafka/pull/9483


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on pull request #9483: MINOR: Update docs to point to next release add notable features for 2.7

2020-10-22 Thread GitBox


bbejeck commented on pull request #9483:
URL: https://github.com/apache/kafka/pull/9483#issuecomment-714784672


   only html changes, so merging now.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma opened a new pull request #9484: MINOR: Update raft/README.md and minor RaftConfig tweaks

2020-10-22 Thread GitBox


ijuma opened a new pull request #9484:
URL: https://github.com/apache/kafka/pull/9484


   * Replace quorum.bootstrap.servers and quorum.bootstrap.voters with
   quorum.voters.
   * Remove seemingly unused `verbose` config.
   * Use constant to avoid unnecessary repeated concatenation.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9471: MINOR: Add Jenkinsfile to 2.6

2020-10-22 Thread GitBox


vvcephei commented on pull request #9471:
URL: https://github.com/apache/kafka/pull/9471#issuecomment-714776458


   So far, I have not been able to reproduce it. I even re-ran the exact same 
command as Jenkins:
   
   ```
   [john@arcturus kafka]$ javac -version
   javac 11.0.8
   
   [john@arcturus kafka]$ ./gradlew -PscalaVersion=2.13 unitTest 
integrationTest --profile --no-daemon --continue 
-PtestLoggingEvents=started,passed,skipped,failed -PignoreFailures=true 
-PmaxParallelForks=2 -PmaxTestRetries=1 -PmaxTestRetryFailures=5
   ...
   BUILD SUCCESSFUL in 1h 20m 15s
   144 actionable tasks: 59 executed, 85 up-to-date
   
   See the profiling report at: 
file:///home/confluent/kafka/build/reports/profile/profile-2020-10-22-14-21-43.html
   A fine-grained performance profile is available: use the --scan option.
   ```
   
   Not sure where to go from here...



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck opened a new pull request #9483: MINOR: Update docs to point to next release add notable features for 2.7

2020-10-22 Thread GitBox


bbejeck opened a new pull request #9483:
URL: https://github.com/apache/kafka/pull/9483


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-22 Thread GitBox


jsancio commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r510469742



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -206,30 +234,77 @@ private void updateLeaderEndOffsetAndTimestamp(
 final LogOffsetMetadata endOffsetMetadata = log.endOffset();
 
 if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) {
-updateHighWatermark(state, currentTimeMs);
+onUpdateLeaderHighWatermark(state, currentTimeMs);
 }
 
-LogOffset endOffset = new LogOffset(endOffsetMetadata.offset, 
Isolation.UNCOMMITTED);
-fetchPurgatory.maybeComplete(endOffset, currentTimeMs);
+fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs);
 }
 
-private void updateHighWatermark(
-EpochState state,
+private void onUpdateLeaderHighWatermark(
+LeaderState state,
 long currentTimeMs
 ) {
 state.highWatermark().ifPresent(highWatermark -> {
-logger.debug("High watermark updated to {}", highWatermark);
+logger.debug("Leader high watermark updated to {}", highWatermark);
 log.updateHighWatermark(highWatermark);
-
-LogOffset offset = new LogOffset(highWatermark.offset, 
Isolation.COMMITTED);
-appendPurgatory.maybeComplete(offset, currentTimeMs);
-fetchPurgatory.maybeComplete(offset, currentTimeMs);
+appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs);
+maybeFireHandleCommit(highWatermark.offset);
 });
 }
 
-@Override
-public LeaderAndEpoch currentLeaderAndEpoch() {
-return quorum.leaderAndEpoch();
+private void maybeFireHandleCommit(long highWatermark) {
+maybeFireHandleCommit(listenerContexts, highWatermark);
+}
+
+private void maybeFireHandleCommit(List listenerContexts, 
long highWatermark) {
+// TODO: When there are multiple listeners, we can cache reads to save 
some work
+for (ListenerContext listenerContext : listenerContexts) {
+OptionalLong nextExpectedOffsetOpt = 
listenerContext.nextExpectedOffset();
+if (!nextExpectedOffsetOpt.isPresent()) {
+return;
+}
+
+long nextExpectedOffset = nextExpectedOffsetOpt.getAsLong();
+if (nextExpectedOffset < highWatermark) {
+LogFetchInfo readInfo = log.read(nextExpectedOffset, 
Isolation.COMMITTED);
+listenerContext.fireHandleCommit(nextExpectedOffset, 
readInfo.records);
+}
+}
+}
+
+private void maybeFireHandleCommit(long baseOffset, int epoch, List 
records) {
+for (ListenerContext listenerContext : listenerContexts) {
+OptionalLong nextExpectedOffsetOpt = 
listenerContext.nextExpectedOffset();
+if (!nextExpectedOffsetOpt.isPresent()) {
+return;
+}
+
+long nextExpectedOffset = nextExpectedOffsetOpt.getAsLong();
+if (nextExpectedOffset == baseOffset) {
+listenerContext.fireHandleCommit(baseOffset, epoch, records);
+}
+}
+}
+
+private void maybeFireHandleClaim(LeaderState state) {
+for (ListenerContext listenerContext : listenerContexts) {
+int leaderEpoch = state.epoch();
+
+// We can fire `handleClaim` as soon as the listener has caught
+// up to the start of the leader epoch. This guarantees that the
+// state machine has seen the full committed state before it 
becomes
+// leader and begins writing to the log.

Review comment:
   Yeah. I was thinking of the same thing, "hold the requests in 
purgatory". But like you said, maybe this optimization is not worth the added 
complexity.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-22 Thread GitBox


hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r510455713



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -206,30 +234,77 @@ private void updateLeaderEndOffsetAndTimestamp(
 final LogOffsetMetadata endOffsetMetadata = log.endOffset();
 
 if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) {
-updateHighWatermark(state, currentTimeMs);
+onUpdateLeaderHighWatermark(state, currentTimeMs);
 }
 
-LogOffset endOffset = new LogOffset(endOffsetMetadata.offset, 
Isolation.UNCOMMITTED);
-fetchPurgatory.maybeComplete(endOffset, currentTimeMs);
+fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs);
 }
 
-private void updateHighWatermark(
-EpochState state,
+private void onUpdateLeaderHighWatermark(
+LeaderState state,
 long currentTimeMs
 ) {
 state.highWatermark().ifPresent(highWatermark -> {
-logger.debug("High watermark updated to {}", highWatermark);
+logger.debug("Leader high watermark updated to {}", highWatermark);
 log.updateHighWatermark(highWatermark);
-
-LogOffset offset = new LogOffset(highWatermark.offset, 
Isolation.COMMITTED);
-appendPurgatory.maybeComplete(offset, currentTimeMs);
-fetchPurgatory.maybeComplete(offset, currentTimeMs);
+appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs);
+maybeFireHandleCommit(highWatermark.offset);
 });
 }
 
-@Override
-public LeaderAndEpoch currentLeaderAndEpoch() {
-return quorum.leaderAndEpoch();
+private void maybeFireHandleCommit(long highWatermark) {
+maybeFireHandleCommit(listenerContexts, highWatermark);
+}
+
+private void maybeFireHandleCommit(List listenerContexts, 
long highWatermark) {
+// TODO: When there are multiple listeners, we can cache reads to save 
some work
+for (ListenerContext listenerContext : listenerContexts) {
+OptionalLong nextExpectedOffsetOpt = 
listenerContext.nextExpectedOffset();
+if (!nextExpectedOffsetOpt.isPresent()) {
+return;
+}
+
+long nextExpectedOffset = nextExpectedOffsetOpt.getAsLong();
+if (nextExpectedOffset < highWatermark) {
+LogFetchInfo readInfo = log.read(nextExpectedOffset, 
Isolation.COMMITTED);
+listenerContext.fireHandleCommit(nextExpectedOffset, 
readInfo.records);
+}
+}
+}
+
+private void maybeFireHandleCommit(long baseOffset, int epoch, List 
records) {
+for (ListenerContext listenerContext : listenerContexts) {
+OptionalLong nextExpectedOffsetOpt = 
listenerContext.nextExpectedOffset();
+if (!nextExpectedOffsetOpt.isPresent()) {
+return;
+}
+
+long nextExpectedOffset = nextExpectedOffsetOpt.getAsLong();
+if (nextExpectedOffset == baseOffset) {
+listenerContext.fireHandleCommit(baseOffset, epoch, records);
+}
+}
+}
+
+private void maybeFireHandleClaim(LeaderState state) {
+for (ListenerContext listenerContext : listenerContexts) {
+int leaderEpoch = state.epoch();
+
+// We can fire `handleClaim` as soon as the listener has caught
+// up to the start of the leader epoch. This guarantees that the
+// state machine has seen the full committed state before it 
becomes
+// leader and begins writing to the log.

Review comment:
   I thought a little about it. Right now the state machine has just two states:
1) I am not a leader, and 2) I am a leader and have caught up with all committed
data from previous epochs. An alternative design is to fire `handleClaim`
immediately and provide the starting offset of the leader epoch. Then the
controller could wait until its state machine has caught up to that offset before
starting to write data. In the end, I decided not to do it because it adds a third
state, and I did not expect the controller to be able to do anything useful in that
additional state. The point about heartbeats is interesting, but even that seems
tricky, since the controller would not know whether a broker had been fenced until
it has caught up. I think the only thing the controller could do is hold the
requests in purgatory, which might be better than letting them retry, but I'm not
sure it's worth it.
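
   To make the two-state idea concrete, here is a minimal, self-contained sketch
(the `Listener` interface below is only a local stand-in that mirrors the shape of
`RaftClient.Listener` from this PR, not the real API):
   ```java
import java.util.List;

// Local stand-in types; only the control flow matters here.
final class TwoStateListenerSketch {
    interface Listener<T> {
        void handleCommit(long baseOffset, List<T> records);
        default void handleClaim(int epoch) {}
        default void handleResign() {}
    }

    static final class ControllerListener<T> implements Listener<T> {
        private long appliedEndOffset = 0;
        private volatile boolean isLeader = false;

        @Override
        public void handleCommit(long baseOffset, List<T> records) {
            // Apply committed records in order; "caught up" simply means every
            // commit delivered so far has been applied.
            appliedEndOffset = baseOffset + records.size();
        }

        @Override
        public void handleClaim(int epoch) {
            // Fired only after all commits up to the start of the epoch were
            // delivered, so no third "claimed but still catching up" state is
            // needed before the controller starts writing.
            isLeader = true;
        }

        @Override
        public void handleResign() {
            isLeader = false;
        }

        long appliedEndOffset() {
            return appliedEndOffset;
        }

        boolean canWrite() {
            return isLeader;
        }
    }
}
   ```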





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests

2020-10-22 Thread GitBox


jsancio commented on a change in pull request #9476:
URL: https://github.com/apache/kafka/pull/9476#discussion_r510449508



##
File path: raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
##
@@ -0,0 +1,648 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
+import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
+import org.apache.kafka.common.message.DescribeQuorumResponseData.ReplicaState;
+import org.apache.kafka.common.message.DescribeQuorumResponseData;
+import org.apache.kafka.common.message.EndQuorumEpochRequestData;
+import org.apache.kafka.common.message.EndQuorumEpochResponseData;
+import org.apache.kafka.common.message.FetchRequestData;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
+import org.apache.kafka.common.message.LeaderChangeMessage;
+import org.apache.kafka.common.message.VoteRequestData;
+import org.apache.kafka.common.message.VoteResponseData;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.ControlRecordUtils;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.requests.BeginQuorumEpochResponse;
+import org.apache.kafka.common.requests.DescribeQuorumResponse;
+import org.apache.kafka.common.requests.VoteResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestUtils;
+import org.mockito.Mockito;
+import static org.apache.kafka.raft.RaftUtil.hasValidTopicPartition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+final class RaftClientTestContext {
+private static final int FETCH_MAX_WAIT_MS = 0;
+
+static final TopicPartition METADATA_PARTITION = new 
TopicPartition("metadata", 0);
+static final int LOCAL_ID = 0;

Review comment:
   Okay. I made it possible to easily add support for this in the future
without breaking the existing tests.
   
   We can make this changeable in the `Builder` as we need it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests

2020-10-22 Thread GitBox


jsancio commented on a change in pull request #9476:
URL: https://github.com/apache/kafka/pull/9476#discussion_r510448694



##
File path: raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
##
@@ -0,0 +1,648 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
+import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
+import org.apache.kafka.common.message.DescribeQuorumResponseData.ReplicaState;
+import org.apache.kafka.common.message.DescribeQuorumResponseData;
+import org.apache.kafka.common.message.EndQuorumEpochRequestData;
+import org.apache.kafka.common.message.EndQuorumEpochResponseData;
+import org.apache.kafka.common.message.FetchRequestData;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
+import org.apache.kafka.common.message.LeaderChangeMessage;
+import org.apache.kafka.common.message.VoteRequestData;
+import org.apache.kafka.common.message.VoteResponseData;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.ControlRecordUtils;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.requests.BeginQuorumEpochResponse;
+import org.apache.kafka.common.requests.DescribeQuorumResponse;
+import org.apache.kafka.common.requests.VoteResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestUtils;
+import org.mockito.Mockito;
+import static org.apache.kafka.raft.RaftUtil.hasValidTopicPartition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+final class RaftClientTestContext {
+private static final int FETCH_MAX_WAIT_MS = 0;
+
+static final TopicPartition METADATA_PARTITION = new 
TopicPartition("metadata", 0);
+static final int LOCAL_ID = 0;
+
+static final int ELECTION_BACKOFF_MAX_MS = 100;
+static final int ELECTION_TIMEOUT_MS = 1;
+// fetch timeout is usually larger than election timeout
+static final int FETCH_TIMEOUT_MS = 5;
+static final int REQUEST_TIMEOUT_MS = 5000;
+static final int RETRY_BACKOFF_MS = 50;
+
+private final QuorumStateStore quorumStateStore;
+private final Random random;
+
+final KafkaRaftClient client;
+final Metrics metrics;
+final MockLog log;
+final MockNetworkChannel channel;
+final MockTime time;
+final Set voters;
+
+public static final class Builder {
+private final QuorumStateStore quorumStateStore = new 
MockQuorumStateStore();
+private final Random random = Mockito.spy(new Random(1));
+private final MockLog log = new MockLog(METADATA_PARTITION);
+private final Set voters;
+
+Builder(Set voters) {
+this.voters = voters;
+}
+
+Builder withElectedLeader(int epoch, int leaderId) throws IOException {
+
quorumStateStore.writeElectionState(ElectionState.withElectedLeader(epoch, 
leaderId, voters));
+return this;
+}

[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-22 Thread GitBox


hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r510438318



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -206,30 +234,77 @@ private void updateLeaderEndOffsetAndTimestamp(
 final LogOffsetMetadata endOffsetMetadata = log.endOffset();
 
 if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) {
-updateHighWatermark(state, currentTimeMs);
+onUpdateLeaderHighWatermark(state, currentTimeMs);
 }
 
-LogOffset endOffset = new LogOffset(endOffsetMetadata.offset, 
Isolation.UNCOMMITTED);
-fetchPurgatory.maybeComplete(endOffset, currentTimeMs);
+fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs);
 }
 
-private void updateHighWatermark(
-EpochState state,
+private void onUpdateLeaderHighWatermark(
+LeaderState state,
 long currentTimeMs
 ) {
 state.highWatermark().ifPresent(highWatermark -> {
-logger.debug("High watermark updated to {}", highWatermark);
+logger.debug("Leader high watermark updated to {}", highWatermark);
 log.updateHighWatermark(highWatermark);
-
-LogOffset offset = new LogOffset(highWatermark.offset, 
Isolation.COMMITTED);
-appendPurgatory.maybeComplete(offset, currentTimeMs);
-fetchPurgatory.maybeComplete(offset, currentTimeMs);
+appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs);
+maybeFireHandleCommit(highWatermark.offset);
 });
 }
 
-@Override
-public LeaderAndEpoch currentLeaderAndEpoch() {
-return quorum.leaderAndEpoch();
+private void maybeFireHandleCommit(long highWatermark) {
+maybeFireHandleCommit(listenerContexts, highWatermark);
+}
+
+private void maybeFireHandleCommit(List listenerContexts, 
long highWatermark) {
+// TODO: When there are multiple listeners, we can cache reads to save 
some work
+for (ListenerContext listenerContext : listenerContexts) {
+OptionalLong nextExpectedOffsetOpt = 
listenerContext.nextExpectedOffset();
+if (!nextExpectedOffsetOpt.isPresent()) {
+return;

Review comment:
   Good catch. I think that I forgot to update this when moving from a 
single listener to multiple listeners.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-22 Thread GitBox


hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r510437477



##
File path: raft/src/main/java/org/apache/kafka/raft/QuorumState.java
##
@@ -369,6 +379,17 @@ public void transitionToLeader(long epochStartOffset) 
throws IOException {
 if (!candidateState.isVoteGranted())
 throw new IllegalStateException("Cannot become leader without 
majority votes granted");
 
+// Note that the leader does not retain the high watermark that was 
known
+// in the previous state. The purpose of this is to protect the 
monotonicity

Review comment:
   It was intended to refer to the behavior described in the previous sentence of
not retaining the high watermark. I will attempt to clarify.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-22 Thread GitBox


hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r510433916



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -1757,35 +1809,86 @@ public void complete() {
 }
 }
 
-private static class UnwrittenAppend {
-private final Records records;
-private final long createTimeMs;
-private final long requestTimeoutMs;
-private final AckMode ackMode;
-private final CompletableFuture future;
+private final class ListenerContext implements 
CloseListener> {
+private final RaftClient.Listener listener;
+private BatchReader lastSent = null;
+private long lastAckedOffset = 0;
+private int claimedEpoch = 0;
+
+private ListenerContext(Listener listener) {
+this.listener = listener;
+}
+
+/**
+ * Get the last acked offset, which is one greater than the offset of 
the
+ * last record which was acked by the state machine.
+ */
+public synchronized long lastAckedOffset() {
+return lastAckedOffset;
+}
+
+/**
+ * Get the next expected offset, which might be larger than the last 
acked
+ * offset if there are inflight batches which have not been acked yet.
+ * Note that when fetching from disk, we may not know the last offset 
of
+ * inflight data until it has been processed by the state machine. In 
this case,
+ * we delay sending additional data until the state machine has read 
to the
+ * end and the last offset is determined.

Review comment:
   When catching up from the log, yes. However, I have implemented an 
optimization for writes from the leader. We save the original batch in memory 
so that it can be sent back to the state machine after the write is committed. 
In this case, we know the last offset of the batch, so we can have multiple 
inflight batches sent to the controller. This is nice because it means the 
elected controller will not have to read from disk.
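
   A rough, self-contained sketch of that in-memory cache (names such as
`PendingAppendCache` and `CommitCallback` are invented for illustration; the real
logic lives in the listener context):
   ```java
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Leader-side cache of appended batches keyed by base offset, so committed
// batches can be handed back to the listener without re-reading the log.
final class PendingAppendCache<T> {
    interface CommitCallback<R> {
        void accept(long baseOffset, List<R> records);
    }

    private final NavigableMap<Long, List<T>> pendingBatches = new TreeMap<>();

    void onAppendAccepted(long baseOffset, List<T> records) {
        pendingBatches.put(baseOffset, records); // keep the in-memory copy
    }

    // Simplification: treat a batch as committed once its base offset is below
    // the high watermark; the real code tracks the batch's last offset.
    void onHighWatermarkAdvanced(long highWatermark, CommitCallback<T> committed) {
        Iterator<Map.Entry<Long, List<T>>> it = pendingBatches.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<T>> entry = it.next();
            if (entry.getKey() >= highWatermark) {
                break; // ordered map, so later batches are not committed either
            }
            committed.accept(entry.getKey(), entry.getValue());
            it.remove(); // no disk read was needed for this batch
        }
    }
}
   ```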





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-22 Thread GitBox


hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r510432303



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##
@@ -16,57 +16,75 @@
  */
 package org.apache.kafka.raft;
 
-import org.apache.kafka.common.record.Records;
-
 import java.io.IOException;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
-public interface RaftClient {
+public interface RaftClient {
+
+interface Listener {
+/**
+ * Callback which is invoked for all records committed to the log.
+ * It is the responsibility of the caller to invoke {@link 
BatchReader#close()}
+ * after consuming the reader.
+ *
+ * Note that there is not a one-to-one correspondence between writes 
through
+ * {@link #scheduleAppend(int, List)} and this callback. The Raft 
implementation
+ * is free to batch together the records from multiple append calls 
provided
+ * that batch boundaries are respected. This means that each batch 
specified
+ * through {@link #scheduleAppend(int, List)} is guaranteed to be a 
subset of
+ * a batch provided by the {@link BatchReader}.
+ *
+ * @param reader reader instance which must be iterated
+ */
+void handleCommit(BatchReader reader);
+
+/**
+ * Invoked after this node has become a leader. This is only called 
after
+ * all commits up to the start of the leader's epoch have been sent to
+ * {@link #handleCommit(BatchReader)}.
+ *
+ * After becoming a leader, the client is eligible to write to the log
+ * using {@link #scheduleAppend(int, List)}.
+ *
+ * @param epoch the claimed leader epoch
+ */
+default void handleClaim(int epoch) {}
+
+/**
+ * Invoked after a leader has stepped down. This callback may or may 
not
+ * fire before the next leader has been elected.
+ */
+default void handleResign() {}
+}
 
 /**
- * Initialize the client. This should only be called once and it must be
- * called before any of the other APIs can be invoked.
+ * Initialize the client. This should only be called once on startup.
  *
  * @throws IOException For any IO errors during initialization
  */
 void initialize() throws IOException;
 
 /**
- * Append a new entry to the log. The client must be in the leader state to
- * accept an append: it is up to the state machine implementation
- * to ensure this using {@link #currentLeaderAndEpoch()}.
- *
- * TODO: One improvement we can make here is to allow the caller to specify
- * the current leader epoch in the record set. That would ensure that each
- * leader change must be "observed" by the state machine before new appends
- * are accepted.
- *
- * @param records The records to append to the log
- * @param timeoutMs Maximum time to wait for the append to complete
- * @return A future containing the last offset and epoch of the appended 
records (if successful)
- */
-CompletableFuture append(Records records, AckMode ackMode, 
long timeoutMs);
-
-/**
- * Read a set of records from the log. Note that it is the responsibility 
of the state machine
- * to filter control records added by the Raft client itself.
- *
- * If the fetch offset is no longer valid, then the future will be 
completed exceptionally
- * with a {@link LogTruncationException}.
+ * Register a listener to get commit/leader notifications.
  *
- * @param position The position to fetch from
- * @param isolation The isolation level to apply to the read
- * @param maxWaitTimeMs The maximum time to wait for new data to become 
available before completion
- * @return The record set, which may be empty if fetching from the end of 
the log
+ * @param listener the listener
  */
-CompletableFuture read(OffsetAndEpoch position, Isolation 
isolation, long maxWaitTimeMs);
+void register(Listener listener);
 
 /**
- * Get the current leader (if known) and the current epoch.
+ * Append a list of records to the log. The write will be scheduled for 
some time
+ * in the future. There is no guarantee that appended records will be 
written to
+ * the log and eventually committed. However, it is guaranteed that if any 
of the
+ * records become committed, then all of them will be.
  *
- * @return Current leader and epoch information
+ * @param epoch the current leader epoch
+ * @param records the list of records to append
+ * @return the offset within the current epoch that the log entries will 
be appended,
+ * or null if the leader was unable to accept the write (e.g. due 
to memory
+ * being reached).
  */
-LeaderAndEpoch 

[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-22 Thread GitBox


hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r510426714



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##
@@ -16,57 +16,75 @@
  */
 package org.apache.kafka.raft;
 
-import org.apache.kafka.common.record.Records;
-
 import java.io.IOException;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
-public interface RaftClient {
+public interface RaftClient {
+
+interface Listener {
+/**
+ * Callback which is invoked for all records committed to the log.
+ * It is the responsibility of the caller to invoke {@link 
BatchReader#close()}
+ * after consuming the reader.
+ *
+ * Note that there is not a one-to-one correspondence between writes 
through
+ * {@link #scheduleAppend(int, List)} and this callback. The Raft 
implementation
+ * is free to batch together the records from multiple append calls 
provided
+ * that batch boundaries are respected. This means that each batch 
specified
+ * through {@link #scheduleAppend(int, List)} is guaranteed to be a 
subset of
+ * a batch provided by the {@link BatchReader}.
+ *
+ * @param reader reader instance which must be iterated
+ */
+void handleCommit(BatchReader reader);
+
+/**
+ * Invoked after this node has become a leader. This is only called 
after
+ * all commits up to the start of the leader's epoch have been sent to
+ * {@link #handleCommit(BatchReader)}.
+ *
+ * After becoming a leader, the client is eligible to write to the log
+ * using {@link #scheduleAppend(int, List)}.
+ *
+ * @param epoch the claimed leader epoch
+ */
+default void handleClaim(int epoch) {}
+
+/**
+ * Invoked after a leader has stepped down. This callback may or may 
not
+ * fire before the next leader has been elected.
+ */
+default void handleResign() {}
+}
 
 /**
- * Initialize the client. This should only be called once and it must be
- * called before any of the other APIs can be invoked.
+ * Initialize the client. This should only be called once on startup.
  *
  * @throws IOException For any IO errors during initialization
  */
 void initialize() throws IOException;
 
 /**
- * Append a new entry to the log. The client must be in the leader state to
- * accept an append: it is up to the state machine implementation
- * to ensure this using {@link #currentLeaderAndEpoch()}.
- *
- * TODO: One improvement we can make here is to allow the caller to specify
- * the current leader epoch in the record set. That would ensure that each
- * leader change must be "observed" by the state machine before new appends
- * are accepted.
- *
- * @param records The records to append to the log
- * @param timeoutMs Maximum time to wait for the append to complete
- * @return A future containing the last offset and epoch of the appended 
records (if successful)
- */
-CompletableFuture append(Records records, AckMode ackMode, 
long timeoutMs);
-
-/**
- * Read a set of records from the log. Note that it is the responsibility 
of the state machine
- * to filter control records added by the Raft client itself.
- *
- * If the fetch offset is no longer valid, then the future will be 
completed exceptionally
- * with a {@link LogTruncationException}.
+ * Register a listener to get commit/leader notifications.
  *
- * @param position The position to fetch from
- * @param isolation The isolation level to apply to the read
- * @param maxWaitTimeMs The maximum time to wait for new data to become 
available before completion
- * @return The record set, which may be empty if fetching from the end of 
the log
+ * @param listener the listener
  */
-CompletableFuture read(OffsetAndEpoch position, Isolation 
isolation, long maxWaitTimeMs);
+void register(Listener listener);
 
 /**
- * Get the current leader (if known) and the current epoch.
+ * Append a list of records to the log. The write will be scheduled for 
some time
+ * in the future. There is no guarantee that appended records will be 
written to
+ * the log and eventually committed. However, it is guaranteed that if any 
of the
+ * records become committed, then all of them will be.
  *
- * @return Current leader and epoch information
+ * @param epoch the current leader epoch
+ * @param records the list of records to append
+ * @return the offset within the current epoch that the log entries will 
be appended,
+ * or null if the leader was unable to accept the write (e.g. due 
to memory
+ * being reached).

Review comment:
   Agreed. I added this here: 

[GitHub] [kafka] ijuma commented on pull request #9469: MINOR: replace FetchRequest.TopicAndPartitionData by Map.Entry

2020-10-22 Thread GitBox


ijuma commented on pull request #9469:
URL: https://github.com/apache/kafka/pull/9469#issuecomment-714728901


   Thanks for the PR. Is this an improvement? It seems to make the code harder 
to read.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10634) LeaderChangeMessage should include the leader as one of the voters

2020-10-22 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-10634:
--

 Summary: LeaderChangeMessage should include the leader as one of 
the voters
 Key: KAFKA-10634
 URL: https://issues.apache.org/jira/browse/KAFKA-10634
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jose Armando Garcia Sancio


When a leader is elected, the leader writes a `LeaderChangeMessage` to the
replicated log. This message is defined in KIP-595 as:

```
{
  "type": "data",
  "name": "LeaderChangeMessage",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    {"name": "LeaderId", "type": "int32", "versions": "0+",
     "about": "The ID of the newly elected leader"},
    {"name": "VotedIds", "type": "[]int32", "versions": "0+",
     "about": "The IDs of the voters who voted for the current leader"}
  ]
}
```

The current implementation doesn't include the LeaderId in the set of VotedIds.
In the protocol it is guaranteed that the leader must have voted for itself.
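
A minimal sketch of the intended invariant (the helper class below is illustrative
only, not the actual KafkaRaftClient code): whatever ends up in the VotedIds field
must contain the leader itself.

```java
import java.util.HashSet;
import java.util.Set;

// Illustration of the invariant requested by this ticket: the voter ids written
// into the LeaderChangeMessage always include the newly elected leader.
final class LeaderChangeVoters {
    static Set<Integer> votedIds(int leaderId, Set<Integer> grantingVoters) {
        Set<Integer> votedIds = new HashSet<>(grantingVoters);
        votedIds.add(leaderId); // the leader is guaranteed to have voted for itself
        return votedIds;
    }
}
```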



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jsancio commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests

2020-10-22 Thread GitBox


jsancio commented on a change in pull request #9476:
URL: https://github.com/apache/kafka/pull/9476#discussion_r510421964



##
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##
@@ -90,470 +76,480 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class KafkaRaftClientTest {
-private static final TopicPartition METADATA_PARTITION = new 
TopicPartition("metadata", 0);
-
-private final int localId = 0;
-private final int electionTimeoutMs = 1;
-private final int electionBackoffMaxMs = 100;
-private final int fetchTimeoutMs = 5;   // fetch timeout is usually 
larger than election timeout
-private final int retryBackoffMs = 50;
-private final int requestTimeoutMs = 5000;
-private final int fetchMaxWaitMs = 0;
-
-private final MockTime time = new MockTime();
-private final MockLog log = new MockLog(METADATA_PARTITION);
-private final MockNetworkChannel channel = new MockNetworkChannel();
-private final Random random = Mockito.spy(new Random(1));
-private final QuorumStateStore quorumStateStore = new 
MockQuorumStateStore();
-
-@AfterEach
-public void cleanUp() throws IOException {
-quorumStateStore.clear();
-}
-
-private InetSocketAddress mockAddress(int id) {
-return new InetSocketAddress("localhost", 9990 + id);
-}
-
-private KafkaRaftClient buildClient(Set voters) throws 
IOException {
-return buildClient(voters, new Metrics(time));
-}
-
-private KafkaRaftClient buildClient(Set voters, Metrics metrics) 
throws IOException {
-LogContext logContext = new LogContext();
-QuorumState quorum = new QuorumState(localId, voters, 
electionTimeoutMs, fetchTimeoutMs,
-quorumStateStore, time, logContext, random);
-
-Map voterAddresses = 
voters.stream().collect(Collectors.toMap(
-Function.identity(),
-this::mockAddress
-));
-
-KafkaRaftClient client = new KafkaRaftClient(channel, log, quorum, 
time, metrics,
-new MockFuturePurgatory<>(time), new MockFuturePurgatory<>(time), 
voterAddresses,
-electionBackoffMaxMs, retryBackoffMs, requestTimeoutMs, 
fetchMaxWaitMs, logContext, random);
-
-client.initialize();
-
-return client;
-}
-
 @Test
 public void testInitializeSingleMemberQuorum() throws IOException {
-buildClient(Collections.singleton(localId));
-assertEquals(ElectionState.withElectedLeader(1, localId, 
Collections.singleton(localId)),
-quorumStateStore.readElectionState());
+RaftClientTestContext context = 
RaftClientTestContext.build(Collections.singleton(LOCAL_ID));
+assertEquals(
+ElectionState.withElectedLeader(1, LOCAL_ID, 
Collections.singleton(LOCAL_ID)),
+context.quorumStateStore.readElectionState()
+);
 }
 
 @Test
 public void testInitializeAsLeaderFromStateStoreSingleMemberQuorum() 
throws Exception {
 // Start off as leader. We should still bump the epoch after 
initialization
 
 int initialEpoch = 2;
-Set voters = Collections.singleton(localId);
-
quorumStateStore.writeElectionState(ElectionState.withElectedLeader(initialEpoch,
 localId, voters));
-
-KafkaRaftClient client = buildClient(voters);
-assertEquals(1L, log.endOffset().offset);
-assertEquals(initialEpoch + 1, log.lastFetchedEpoch());
-assertEquals(new LeaderAndEpoch(OptionalInt.of(localId), initialEpoch 
+ 1),
-client.currentLeaderAndEpoch());
-assertEquals(ElectionState.withElectedLeader(initialEpoch + 1, 
localId, voters),
-quorumStateStore.readElectionState());
+Set voters = Collections.singleton(LOCAL_ID);
+RaftClientTestContext context = new RaftClientTestContext.Builder()
+.updateQuorumStateStore(quorumStateStore -> {
+assertDoesNotThrow(() -> {
+quorumStateStore.writeElectionState(
+ElectionState.withElectedLeader(initialEpoch, 
LOCAL_ID, voters)
+);
+});
+})
+.build(voters);
+
+assertEquals(1L, context.log.endOffset().offset);
+assertEquals(initialEpoch + 1, context.log.lastFetchedEpoch());
+assertEquals(new LeaderAndEpoch(OptionalInt.of(LOCAL_ID), initialEpoch 
+ 1),
+context.client.currentLeaderAndEpoch());
+assertEquals(ElectionState.withElectedLeader(initialEpoch + 1, 
LOCAL_ID, voters),
+context.quorumStateStore.readElectionState());
 }
 
 @Test
 public void testInitializeAsLeaderFromStateStore() throws Exception {
-Set voters = Utils.mkSet(localId, 1);
+Set voters = Utils.mkSet(LOCAL_ID, 1);
 int epoch = 2;
 
-Mockito.doReturn(0).when(random).nextInt(electionTimeoutMs);
-

[jira] [Commented] (KAFKA-10633) Constant probing rebalances in Streams 2.6

2020-10-22 Thread Bradley Peterson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17219307#comment-17219307
 ] 

Bradley Peterson commented on KAFKA-10633:
--

/cc [~vvcephei]

> Constant probing rebalances in Streams 2.6
> --
>
> Key: KAFKA-10633
> URL: https://issues.apache.org/jira/browse/KAFKA-10633
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Bradley Peterson
>Priority: Major
> Attachments: Discover 2020-10-21T23 34 03.867Z - 2020-10-21T23 44 
> 46.409Z.csv
>
>
> We are seeing a few issues with the new rebalancing behavior in Streams 2.6. 
> This ticket is for constant probing rebalances on one StreamThread, but I'll 
> mention the other issues, as they may be related.
> First, when we redeploy the application we see tasks being moved, even though 
> the task assignment was stable before redeploying. We would expect to see 
> tasks assigned back to the same instances and no movement. The application is 
> in EC2, with persistent EBS volumes, and we use static group membership to 
> avoid rebalancing. To redeploy the app we terminate all EC2 instances. The 
> new instances will reattach the EBS volumes and use the same group member id.
> After redeploying, we sometimes see the group leader go into a tight probing 
> rebalance loop. This doesn't happen immediately, it could be several hours 
> later. Because the redeploy caused task movement, we see expected probing 
> rebalances every 10 minutes. But, then one thread will go into a tight loop 
> logging messages like "Triggering the followup rebalance scheduled for 
> 1603323868771 ms.", handling the partition assignment (which doesn't change), 
> then "Requested to schedule probing rebalance for 1603323868771 ms." This 
> repeats several times a second until the app is restarted again. I'll attach 
> a log export from one such incident.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jsancio commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-22 Thread GitBox


jsancio commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r510385486



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -206,30 +234,77 @@ private void updateLeaderEndOffsetAndTimestamp(
 final LogOffsetMetadata endOffsetMetadata = log.endOffset();
 
 if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) {
-updateHighWatermark(state, currentTimeMs);
+onUpdateLeaderHighWatermark(state, currentTimeMs);
 }
 
-LogOffset endOffset = new LogOffset(endOffsetMetadata.offset, 
Isolation.UNCOMMITTED);
-fetchPurgatory.maybeComplete(endOffset, currentTimeMs);
+fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs);
 }
 
-private void updateHighWatermark(
-EpochState state,
+private void onUpdateLeaderHighWatermark(
+LeaderState state,
 long currentTimeMs
 ) {
 state.highWatermark().ifPresent(highWatermark -> {
-logger.debug("High watermark updated to {}", highWatermark);
+logger.debug("Leader high watermark updated to {}", highWatermark);
 log.updateHighWatermark(highWatermark);
-
-LogOffset offset = new LogOffset(highWatermark.offset, 
Isolation.COMMITTED);
-appendPurgatory.maybeComplete(offset, currentTimeMs);
-fetchPurgatory.maybeComplete(offset, currentTimeMs);
+appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs);
+maybeFireHandleCommit(highWatermark.offset);
 });
 }
 
-@Override
-public LeaderAndEpoch currentLeaderAndEpoch() {
-return quorum.leaderAndEpoch();
+private void maybeFireHandleCommit(long highWatermark) {
+maybeFireHandleCommit(listenerContexts, highWatermark);
+}
+
+private void maybeFireHandleCommit(List listenerContexts, 
long highWatermark) {
+// TODO: When there are multiple listeners, we can cache reads to save 
some work

Review comment:
   I would remove this comment. We can file an issue and fix it if this
becomes a performance issue.

##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -206,30 +234,77 @@ private void updateLeaderEndOffsetAndTimestamp(
 final LogOffsetMetadata endOffsetMetadata = log.endOffset();
 
 if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) {
-updateHighWatermark(state, currentTimeMs);
+onUpdateLeaderHighWatermark(state, currentTimeMs);
 }
 
-LogOffset endOffset = new LogOffset(endOffsetMetadata.offset, 
Isolation.UNCOMMITTED);
-fetchPurgatory.maybeComplete(endOffset, currentTimeMs);
+fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs);
 }
 
-private void updateHighWatermark(
-EpochState state,
+private void onUpdateLeaderHighWatermark(
+LeaderState state,
 long currentTimeMs
 ) {
 state.highWatermark().ifPresent(highWatermark -> {
-logger.debug("High watermark updated to {}", highWatermark);
+logger.debug("Leader high watermark updated to {}", highWatermark);
 log.updateHighWatermark(highWatermark);
-
-LogOffset offset = new LogOffset(highWatermark.offset, 
Isolation.COMMITTED);
-appendPurgatory.maybeComplete(offset, currentTimeMs);
-fetchPurgatory.maybeComplete(offset, currentTimeMs);
+appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs);
+maybeFireHandleCommit(highWatermark.offset);
 });
 }
 
-@Override
-public LeaderAndEpoch currentLeaderAndEpoch() {
-return quorum.leaderAndEpoch();
+private void maybeFireHandleCommit(long highWatermark) {
+maybeFireHandleCommit(listenerContexts, highWatermark);
+}
+
+private void maybeFireHandleCommit(List listenerContexts, 
long highWatermark) {
+// TODO: When there are multiple listeners, we can cache reads to save 
some work
+for (ListenerContext listenerContext : listenerContexts) {
+OptionalLong nextExpectedOffsetOpt = 
listenerContext.nextExpectedOffset();
+if (!nextExpectedOffsetOpt.isPresent()) {
+return;

Review comment:
   Why a `return`? Did you mean to use `continue`? If this is supposed to be
a `continue`, then maybe we can use `Optional.ifPresent`.
   
   The same comment applies to one of the other overloaded `maybeFireHandleCommit` methods.
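
   For reference, a self-contained sketch contrasting the two options (the `Ctx`
interface below is a stand-in for `ListenerContext`, not the real type):
   ```java
import java.util.List;
import java.util.OptionalLong;

// Simplified illustration of the two control-flow options discussed above.
final class ContinueVsIfPresent {
    interface Ctx {
        OptionalLong nextExpectedOffset();
        void fireHandleCommit(long offset);
    }

    // Option 1: skip only the listener that has nothing to read yet.
    static void withContinue(List<Ctx> contexts, long highWatermark) {
        for (Ctx ctx : contexts) {
            OptionalLong next = ctx.nextExpectedOffset();
            if (!next.isPresent()) {
                continue; // do not bail out of the whole loop
            }
            if (next.getAsLong() < highWatermark) {
                ctx.fireHandleCommit(next.getAsLong());
            }
        }
    }

    // Option 2: the same behaviour expressed with OptionalLong.ifPresent.
    static void withIfPresent(List<Ctx> contexts, long highWatermark) {
        for (Ctx ctx : contexts) {
            ctx.nextExpectedOffset().ifPresent(next -> {
                if (next < highWatermark) {
                    ctx.fireHandleCommit(next);
                }
            });
        }
    }
}
   ```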

##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -975,12 +1029,9 @@ private boolean handleFetchResponse(
 
log.truncateToEndOffset(divergingOffsetAndEpoch).ifPresent(truncationOffset -> {
 logger.info("Truncated to offset {} from Fetch response 
from leader {}",
 

[GitHub] [kafka] jherico commented on a change in pull request #8204: KAFKA-9633: Ensure ConfigProviders are closed

2020-10-22 Thread GitBox


jherico commented on a change in pull request #8204:
URL: https://github.com/apache/kafka/pull/8204#discussion_r510410362



##
File path: 
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
##
@@ -483,6 +483,7 @@ public void logUnused() {
 resolvedOriginals.putAll(result.data());
 }
 }
+providers.values().forEach(x -> Utils.closeQuietly(x, "config 
provider"));

Review comment:
   If an exception occurs between line 477 and line 486, then the close
function won't be called. The solution would be to either create a type for
`providers` that extends `Map` and implements `AutoCloseable`, or to simply put
an explicit `try`/`finally` block here to ensure that the close function is
called in every case. That also implies that `instantiateConfigProviders`
should be modified so that if an exception is thrown from inside it, any
previously opened `ConfigProvider` instances are closed.
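
   A rough sketch of that second part, assuming the providers are instantiated one
by one (the helper below is illustrative, not the actual `AbstractConfig` code;
`Utils.closeQuietly` and `ConfigProvider` are the real Kafka types):
   ```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.utils.Utils;

// Sketch: if creating one provider fails, close the providers created so far
// before rethrowing, so nothing is leaked.
final class ConfigProviderFactory {
    static Map<String, ConfigProvider> instantiateAll(
            Map<String, Class<? extends ConfigProvider>> specs) {
        Map<String, ConfigProvider> providers = new HashMap<>();
        try {
            for (Map.Entry<String, Class<? extends ConfigProvider>> entry : specs.entrySet()) {
                providers.put(entry.getKey(),
                    entry.getValue().getDeclaredConstructor().newInstance());
            }
            return providers;
        } catch (Exception e) {
            // Close whatever was instantiated before the failure.
            providers.values().forEach(p -> Utils.closeQuietly(p, "config provider"));
            throw new RuntimeException("Failed to instantiate config providers", e);
        }
    }
}
   ```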





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9479: KAFKA-10631: Handle ProducerFencedException on offset commit

2020-10-22 Thread GitBox


vvcephei commented on pull request #9479:
URL: https://github.com/apache/kafka/pull/9479#issuecomment-714715589


   Cherry-picked to 2.7 (cc @bbejeck )



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei merged pull request #9479: KAFKA-10631: Handle ProducerFencedException on offset commit

2020-10-22 Thread GitBox


vvcephei merged pull request #9479:
URL: https://github.com/apache/kafka/pull/9479


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10633) Constant probing rebalances in Streams 2.6

2020-10-22 Thread Bradley Peterson (Jira)
Bradley Peterson created KAFKA-10633:


 Summary: Constant probing rebalances in Streams 2.6
 Key: KAFKA-10633
 URL: https://issues.apache.org/jira/browse/KAFKA-10633
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.6.0
Reporter: Bradley Peterson
 Attachments: Discover 2020-10-21T23 34 03.867Z - 2020-10-21T23 44 
46.409Z.csv

We are seeing a few issues with the new rebalancing behavior in Streams 2.6. 
This ticket is for constant probing rebalances on one StreamThread, but I'll 
mention the other issues, as they may be related.

First, when we redeploy the application we see tasks being moved, even though 
the task assignment was stable before redeploying. We would expect to see tasks 
assigned back to the same instances and no movement. The application is in EC2, 
with persistent EBS volumes, and we use static group membership to avoid 
rebalancing. To redeploy the app we terminate all EC2 instances. The new 
instances will reattach the EBS volumes and use the same group member id.

After redeploying, we sometimes see the group leader go into a tight probing 
rebalance loop. This doesn't happen immediately, it could be several hours 
later. Because the redeploy caused task movement, we see expected probing 
rebalances every 10 minutes. But, then one thread will go into a tight loop 
logging messages like "Triggering the followup rebalance scheduled for 
1603323868771 ms.", handling the partition assignment (which doesn't change), 
then "Requested to schedule probing rebalance for 1603323868771 ms." This 
repeats several times a second until the app is restarted again. I'll attach a 
log export from one such incident.





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on pull request #9479: KAFKA-10631: Handle ProducerFencedException on offset commit

2020-10-22 Thread GitBox


vvcephei commented on pull request #9479:
URL: https://github.com/apache/kafka/pull/9479#issuecomment-714708637


   LGTM! Thanks, @cadonna .



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests

2020-10-22 Thread GitBox


hachikuji commented on a change in pull request #9476:
URL: https://github.com/apache/kafka/pull/9476#discussion_r510387310



##
File path: raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
##
@@ -0,0 +1,648 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
+import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
+import org.apache.kafka.common.message.DescribeQuorumResponseData.ReplicaState;
+import org.apache.kafka.common.message.DescribeQuorumResponseData;
+import org.apache.kafka.common.message.EndQuorumEpochRequestData;
+import org.apache.kafka.common.message.EndQuorumEpochResponseData;
+import org.apache.kafka.common.message.FetchRequestData;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
+import org.apache.kafka.common.message.LeaderChangeMessage;
+import org.apache.kafka.common.message.VoteRequestData;
+import org.apache.kafka.common.message.VoteResponseData;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.ControlRecordUtils;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.requests.BeginQuorumEpochResponse;
+import org.apache.kafka.common.requests.DescribeQuorumResponse;
+import org.apache.kafka.common.requests.VoteResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestUtils;
+import org.mockito.Mockito;
+import static org.apache.kafka.raft.RaftUtil.hasValidTopicPartition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+final class RaftClientTestContext {
+private static final int FETCH_MAX_WAIT_MS = 0;
+
+static final TopicPartition METADATA_PARTITION = new 
TopicPartition("metadata", 0);
+static final int LOCAL_ID = 0;

Review comment:
   I'm somewhat inclined to add the local id to the builder rather than 
making it constant. It makes the builder a bit more self-contained. 
   
   On a similar note, it would be nice to push the other static config values 
into the builder as well.
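
   Something along these lines, perhaps (a trimmed-down, self-contained sketch;
`withLocalId` and friends are hypothetical methods, not existing
`RaftClientTestContext` builder API):
   ```java
// Illustrative only: localId and the timing constants become per-test settings
// instead of static finals.
final class TestContextBuilderSketch {
    private int localId = 0;
    private int electionTimeoutMs = 10_000;
    private int requestTimeoutMs = 5_000;

    TestContextBuilderSketch withLocalId(int localId) {
        this.localId = localId;
        return this;
    }

    TestContextBuilderSketch withElectionTimeoutMs(int electionTimeoutMs) {
        this.electionTimeoutMs = electionTimeoutMs;
        return this;
    }

    TestContextBuilderSketch withRequestTimeoutMs(int requestTimeoutMs) {
        this.requestTimeoutMs = requestTimeoutMs;
        return this;
    }

    String describe() {
        return "localId=" + localId
            + ", electionTimeoutMs=" + electionTimeoutMs
            + ", requestTimeoutMs=" + requestTimeoutMs;
    }
}
   ```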





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests

2020-10-22 Thread GitBox


hachikuji commented on a change in pull request #9476:
URL: https://github.com/apache/kafka/pull/9476#discussion_r510387310



##
File path: raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
##
@@ -0,0 +1,648 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
+import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
+import org.apache.kafka.common.message.DescribeQuorumResponseData.ReplicaState;
+import org.apache.kafka.common.message.DescribeQuorumResponseData;
+import org.apache.kafka.common.message.EndQuorumEpochRequestData;
+import org.apache.kafka.common.message.EndQuorumEpochResponseData;
+import org.apache.kafka.common.message.FetchRequestData;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
+import org.apache.kafka.common.message.LeaderChangeMessage;
+import org.apache.kafka.common.message.VoteRequestData;
+import org.apache.kafka.common.message.VoteResponseData;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.ControlRecordUtils;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.requests.BeginQuorumEpochResponse;
+import org.apache.kafka.common.requests.DescribeQuorumResponse;
+import org.apache.kafka.common.requests.VoteResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestUtils;
+import org.mockito.Mockito;
+import static org.apache.kafka.raft.RaftUtil.hasValidTopicPartition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+final class RaftClientTestContext {
+private static final int FETCH_MAX_WAIT_MS = 0;
+
+static final TopicPartition METADATA_PARTITION = new 
TopicPartition("metadata", 0);
+static final int LOCAL_ID = 0;

Review comment:
   I'm somewhat inclined to add the local id to the builder rather than 
making it constant. It makes the builder a bit more self-contained. 
   
   On a similar note, it would be nice to push these static config values into
the builder as well.

##
File path: raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
##
@@ -0,0 +1,648 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the 

[jira] [Commented] (KAFKA-10592) system tests not running after python3 merge

2020-10-22 Thread Nikolay Izhikov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17219284#comment-17219284
 ] 

Nikolay Izhikov commented on KAFKA-10592:
-

Hello [~rndgstn], [~omkreddy]

I fixed the Vagrant instructions and setup.
Tested it locally and it works.
Please, take a look at my changes

https://github.com/apache/kafka/pull/9480

> system tests not running after python3 merge
> 
>
> Key: KAFKA-10592
> URL: https://issues.apache.org/jira/browse/KAFKA-10592
> Project: Kafka
>  Issue Type: Task
>  Components: system tests
>Reporter: Ron Dagostino
>Assignee: Nikolay Izhikov
>Priority: Major
>
> We are seeing these errors on system tests due to the python3 merge: 
> {noformat}
> [ERROR:2020-10-08 21:03:51,341]: Failed to import 
> kafkatest.sanity_checks.test_performance_services, which may indicate a 
> broken test that cannot be loaded: ImportError: No module named server
>  [ERROR:2020-10-08 21:03:51,351]: Failed to import 
> kafkatest.benchmarks.core.benchmark_test, which may indicate a broken test 
> that cannot be loaded: ImportError: No module named server
>  [ERROR:2020-10-08 21:03:51,501]: Failed to import 
> kafkatest.tests.core.throttling_test, which may indicate a broken test that 
> cannot be loaded: ImportError: No module named server
>  [ERROR:2020-10-08 21:03:51,598]: Failed to import 
> kafkatest.tests.client.quota_test, which may indicate a broken test that 
> cannot be loaded: ImportError: No module named server
>  {noformat}
> I ran one of the system tests at the commit prior to the python3 merge 
> ([https://github.com/apache/kafka/commit/40a23cc0c2e1efa8632f59b093672221a3c03c36])
>  and it ran fine:
> [http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-10-09--001.1602255415--rondagostino--rtd_just_before_python3_merge--40a23cc0c/report.html]
> I ran the exact same test file at the next commit – the python3 commit at 
> [https://github.com/apache/kafka/commit/4e65030e055104a7526e85b563a11890c61d6ddf]
>  – and it failed with the import error. The test results show no report.html 
> file because nothing ran: 
> [http://testing.confluent.io/confluent-kafka-system-test-results/?prefix=2020-10-09--001.1602251990--apache--trunk--7947c18b5/]
> Not sure when this began because I do see these tests running successfully 
> during the development process as documented in 
> https://issues.apache.org/jira/browse/KAFKA-10402 (`tests run: 684` as 
> recently as 9/20 in that ticket). But the PR build (rebased onto latest 
> trunk) showed the above import errors and only 606 tests run. I assume those 
> 4 files mentioned include 78 tests.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on pull request #9000: KAFKA-10036 Improve handling and documentation of Suppliers

2020-10-22 Thread GitBox


mjsax commented on pull request #9000:
URL: https://github.com/apache/kafka/pull/9000#issuecomment-714688801


   The build did run, but failed with a compile error. Maybe something went wrong 
with the rebase you did?
   ```
   
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-9000@2/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java:209:
 error: cannot find symbol
 new KTableTransformValues<>(parent, new 
SingletonNoOpValueTransformer<>(), null).enableSendingOldValues(true);
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on pull request #9471: MINOR: Add Jenkinsfile to 2.6

2020-10-22 Thread GitBox


jolshan commented on pull request #9471:
URL: https://github.com/apache/kafka/pull/9471#issuecomment-714684198


   @vvcephei Yeah. I was worried that might be happening. Good to check.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji opened a new pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-22 Thread GitBox


hachikuji opened a new pull request #9482:
URL: https://github.com/apache/kafka/pull/9482


   In #9418, we add a listener to the `RaftClient` interface. In that patch, we 
used it only to send commit notifications for writes from the leader. In this 
PR, we extend the `handleCommit` API to accept all committed data and we remove 
the pull-based `read` API. Additionally, we add two new callbacks to the 
listener interface in order to notify the state machine when the raft client 
has claimed or resigned leadership.
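   
   A rough sketch of what such a push-based listener could look like (method names and signatures below are illustrative guesses based on this description, not necessarily the exact interface in the patch):
   
   ```java
   import java.util.List;
   
   // Illustrative only: a listener with commit, claim and resign callbacks.
   public interface Listener<T> {
       // Invoked for every batch of records once it is known to be committed.
       void handleCommit(int epoch, long lastOffset, List<T> records);
   
       // Invoked once this node has claimed leadership for the given epoch and
       // all previously committed data has been delivered through handleCommit.
       void handleClaim(int epoch);
   
       // Invoked when this node resigns or otherwise loses leadership.
       void handleResign();
   }
   ```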
   
   Note this patch builds on top of #9418. Once merged, I will rebase this 
patch and remove draft status.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-9999) Internal topic creation failure should be non-fatal and trigger explicit rebalance

2020-10-22 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-.

Resolution: Won't Fix

> Internal topic creation failure should be non-fatal and trigger explicit 
> rebalance 
> ---
>
> Key: KAFKA-
> URL: https://issues.apache.org/jira/browse/KAFKA-
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, streams
>Affects Versions: 2.4.0
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> We spotted a case in a system test failure where the topic already exists but 
> the admin client still attempts to recreate it:
>  
> {code:java}
> [2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic 
> SmokeTest-KSTREAM-REDUCE-STATE-STORE-20-changelog. Topic is probably 
> marked for deletion (number of partitions is unknown).
> Will retry to create this topic in 100 ms (to let broker finish async delete 
> operation first).
> Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic 
> 'SmokeTest-KSTREAM-REDUCE-STATE-STORE-20-changelog' already exists. 
> (org.apache.kafka.streams.processor.internals.InternalTopicManager)
> [2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic 
> SmokeTest-uwin-cnt-changelog. Topic is probably marked for deletion (number 
> of partitions is unknown).
> Will retry to create this topic in 100 ms (to let broker finish async delete 
> operation first).
> Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic 
> 'SmokeTest-uwin-cnt-changelog' already exists. 
> (org.apache.kafka.streams.processor.internals.InternalTopicManager) 
> [2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic 
> SmokeTest-cntByCnt-changelog. Topic is probably marked for deletion (number 
> of partitions is unknown).
> Will retry to create this topic in 100 ms (to let broker finish async delete 
> operation first).
> Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic 
> 'SmokeTest-cntByCnt-changelog' already exists. 
> (org.apache.kafka.streams.processor.internals.InternalTopicManager)
> [2020-05-14 09:56:40,120] INFO stream-thread [main] Topics 
> [SmokeTest-KSTREAM-REDUCE-STATE-STORE-20-changelog, 
> SmokeTest-uwin-cnt-changelog, SmokeTest-cntByCnt-changelog] can not be made 
> ready with 5 retries left 
> (org.apache.kafka.streams.processor.internals.InternalTopicManager)
> [2020-05-14 09:56:40,220] ERROR stream-thread [main] Could not create topics 
> after 5 retries. This can happen if the Kafka cluster is temporary not 
> available. You can increase admin client config `retries` to be resilient 
> against this error. 
> (org.apache.kafka.streams.processor.internals.InternalTopicManager)
> [2020-05-14 09:56:40,221] ERROR stream-thread 
> [SmokeTest-05374457-074b-4d33-bca0-8686465e8157-StreamThread-2] Encountered 
> the following unexpected Kafka exception during processing, this usually 
> indicate Streams internal errors: 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> org.apache.kafka.streams.errors.StreamsException: Could not create topics 
> after 5 retries. This can happen if the Kafka cluster is temporary not 
> available. You can increase admin client config `retries` to be resilient 
> against this error.
>         at 
> org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:171)
>         at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareTopic(StreamsPartitionAssignor.java:1229)
>         at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:588)
>  
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:548)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:650)
>  
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1300(AbstractCoordinator.java:111)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:572)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:555)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1026)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1006)
>         at 
> 

[jira] [Created] (KAFKA-10632) Raft client should push all committed data to listeners

2020-10-22 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-10632:
---

 Summary: Raft client should push all committed data to listeners
 Key: KAFKA-10632
 URL: https://issues.apache.org/jira/browse/KAFKA-10632
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jason Gustafson
Assignee: Jason Gustafson


We would like to move to a push model for sending committed data to the state 
machine. This simplifies the state machine a bit since it does not need to 
track its own position and poll for new data. It also allows the raft layer to 
ensure that all committed data up to the state of a leader epoch has been sent 
before allowing the state machine to begin sending writes. Finally, it allows 
us to take advantage of optimizations. For example, we can save the need to 
re-read writes that have been sent to the leader; instead, we can retain the 
data in memory and push it to the state machine after it becomes committed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on pull request #9471: MINOR: Add Jenkinsfile to 2.6

2020-10-22 Thread GitBox


vvcephei commented on pull request #9471:
URL: https://github.com/apache/kafka/pull/9471#issuecomment-714674113


   On second thought, it looks like there's legitimately a test that hangs on 
2.6 in java 11.
   
   I was looking before at the whole log, which I didn't realize shows all the 
builds and tests in one file.
   
   Here are the logs for just the java 11 build on both runs:
   
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-9471/1/execution/node/85/log/
   
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-9471/2/execution/node/86/log/
   
   It clearly gets stuck about 35 minutes in. I'm attempting a local repro.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9481: KAFKA-10284: Disable static membership test in 2.4

2020-10-22 Thread GitBox


vvcephei commented on pull request #9481:
URL: https://github.com/apache/kafka/pull/9481#issuecomment-714670340


   Hey @abbccdda , @feyman2016 , what do you think about this, as opposed to 
spending more time doing a backport for 2.4?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei opened a new pull request #9481: KAFKA-10284: Disable static membership test in 2.4

2020-10-22 Thread GitBox


vvcephei opened a new pull request #9481:
URL: https://github.com/apache/kafka/pull/9481


   This test was fixed in https://github.com/apache/kafka/pull/9270
   for 2.5+, but the code in 2.4 is too different to have a clean backport.
   Rather than risk introducing a worse bug in 2.4, and also because the
   probability of a new bugfix release for 2.4 seems low, I'm proposing
   just to disable this test in the 2.4 branch.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted

2020-10-22 Thread GitBox


vvcephei commented on pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#issuecomment-714669017


   Cherry-picked to 2.5



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-10631) ProducerFencedException is not Handled on Offest Commit

2020-10-22 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck reassigned KAFKA-10631:
---

Assignee: Bruno Cadonna

> ProducerFencedException is not Handled on Offest Commit
> ---
>
> Key: KAFKA-10631
> URL: https://issues.apache.org/jira/browse/KAFKA-10631
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.7.0
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Blocker
> Fix For: 2.7.0
>
>
> The transaction manager currently does not handle producer-fenced errors 
> returned from an offset commit request.
> We found this bug because we encountered the following exception in our soak 
> cluster:
> {code:java}
> org.apache.kafka.streams.errors.StreamsException: Error encountered trying to 
> commit a transaction [stream-thread [i-037c09b3c48522d8d-StreamThread-3] task 
> [0_0]]
> at 
> org.apache.kafka.streams.processor.internals.StreamsProducer.commitTransaction(StreamsProducer.java:256)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.commitOffsetsOrTransaction(TaskManager.java:1050)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1013)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:886)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:677)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)
> [2020-10-22T04:09:54+02:00] 
> (streams-soak-2-7-eos-alpha_soak_i-037c09b3c48522d8d_streamslog) Caused by: 
> org.apache.kafka.common.KafkaException: Unexpected error in 
> TxnOffsetCommitResponse: There is a newer producer with the same 
> transactionalId which fences the current one.
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager$TxnOffsetCommitHandler.handleResponse(TransactionManager.java:1726)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1278)
> at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
> at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:584)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:576)
> at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:415)
> at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313)
> at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
> at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10631) ProducerFencedException is not Handled on Offest Commit

2020-10-22 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-10631:

Fix Version/s: 2.7.0

> ProducerFencedException is not Handled on Offest Commit
> ---
>
> Key: KAFKA-10631
> URL: https://issues.apache.org/jira/browse/KAFKA-10631
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.7.0
>Reporter: Bruno Cadonna
>Priority: Blocker
> Fix For: 2.7.0
>
>
> The transaction manager currently does not handle producer-fenced errors 
> returned from an offset commit request.
> We found this bug because we encountered the following exception in our soak 
> cluster:
> {code:java}
> org.apache.kafka.streams.errors.StreamsException: Error encountered trying to 
> commit a transaction [stream-thread [i-037c09b3c48522d8d-StreamThread-3] task 
> [0_0]]
> at 
> org.apache.kafka.streams.processor.internals.StreamsProducer.commitTransaction(StreamsProducer.java:256)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.commitOffsetsOrTransaction(TaskManager.java:1050)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1013)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:886)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:677)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)
> [2020-10-22T04:09:54+02:00] 
> (streams-soak-2-7-eos-alpha_soak_i-037c09b3c48522d8d_streamslog) Caused by: 
> org.apache.kafka.common.KafkaException: Unexpected error in 
> TxnOffsetCommitResponse: There is a newer producer with the same 
> transactionalId which fences the current one.
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager$TxnOffsetCommitHandler.handleResponse(TransactionManager.java:1726)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1278)
> at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
> at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:584)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:576)
> at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:415)
> at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313)
> at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
> at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9929) Support reverse iterator on WindowStore

2020-10-22 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9929?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-9929:
---
Fix Version/s: 2.7.0

> Support reverse iterator on WindowStore
> ---
>
> Key: KAFKA-9929
> URL: https://issues.apache.org/jira/browse/KAFKA-9929
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.7.0
>
>
> Currently, WindowStore fetch operations return an iterator sorted from 
> earliest to latest result:
> ```
> * For each key, the iterator guarantees ordering of windows, starting from 
> the oldest/earliest
> * available window to the newest/latest window.
> ```
>  
> We have a use-case where traces are stored in a WindowStore 
> and use Kafka Streams to create a materialized view of traces. A query 
> request comes with a time range (e.g. now-1h, now) and want to return the 
> most recent results, i.e. fetch from this period of time, iterate and pattern 
> match latest/most recent traces, and if enough results, then reply without 
> moving further on the iterator.
> Same store is used to search for previous traces. In this case, it search a 
> key for the last day, if found traces, we would also like to iterate from the 
> most recent.
> RocksDb seems to support iterating backward and forward: 
> [https://github.com/facebook/rocksdb/wiki/Iterator#iterating-upper-bound-and-lower-bound]
>  
> For reference: This in some way extracts some bits from this previous issue: 
> https://issues.apache.org/jira/browse/KAFKA-4212:
>  
> > The {{RocksDBWindowsStore}}, a {{WindowsStore}}, can expire items via 
> > segment dropping, but it stores multiple items per key, based on their 
> > timestamp. But this store can be repurposed as a cache by fetching the 
> > items in reverse chronological order and returning the first item found.
>  
> Would like to know if there is any impediment on RocksDb or  WindowStore to 
> support this.
> Adding an argument to reverse in current fetch methods would be great:
> ```
> WindowStore.fetch(from,to,Direction.BACKWARD|FORWARD)
> ```
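
For illustration, a query under the proposed API could iterate newest-first and stop early once it has enough matches. The {{Direction}} flag below is the ticket's own proposal (not an existing Kafka Streams type), and the store, key and result names are placeholders:

{code:java}
// Hypothetical usage of the proposed backward fetch; Direction.BACKWARD mirrors
// the sketch quoted above and does not exist in the current API.
Instant to = Instant.now();
Instant from = to.minus(Duration.ofHours(1));
try (WindowStoreIterator<Trace> iter = traceStore.fetch(traceId, from, to, Direction.BACKWARD)) {
    while (iter.hasNext() && results.size() < limit) {
        results.add(iter.next().value);   // newest windows are returned first
    }
}
{code}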



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jsancio commented on a change in pull request #9476: MINOR: Refactor RaftClientTest to be used by other tests

2020-10-22 Thread GitBox


jsancio commented on a change in pull request #9476:
URL: https://github.com/apache/kafka/pull/9476#discussion_r510331249



##
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##
@@ -1536,67 +1522,70 @@ public void testObserverLeaderRediscoveryAfterRequestTimeout() throws Exception
         int otherNodeId = 2;
         int epoch = 5;
         Set<Integer> voters = Utils.mkSet(leaderId, otherNodeId);
-        KafkaRaftClient client = buildClient(voters);
-        discoverLeaderAsObserver(client, voters, leaderId, epoch);
 
-        pollUntilSend(client);
-        RaftRequest.Outbound fetchRequest1 = assertSentFetchRequest();
+        RaftClientTestContext context = RaftClientTestContext.build(voters);
+
+        context.discoverLeaderAsObserver(voters, leaderId, epoch);
+
+        context.pollUntilSend();
+        RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest();
         assertEquals(leaderId, fetchRequest1.destinationId());
-        assertFetchRequestData(fetchRequest1, epoch, 0L, 0);
+        RaftClientTestContext.assertFetchRequestData(fetchRequest1, epoch, 0L, 0);
 
-        time.sleep(requestTimeoutMs);
-        pollUntilSend(client);
+        context.time.sleep(REQUEST_TIMEOUT_MS);
+        context.pollUntilSend();
 
         // We should retry the Fetch against the other voter since the original
         // voter connection will be backing off.
-        RaftRequest.Outbound fetchRequest2 = assertSentFetchRequest();
+        RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest();
         assertNotEquals(leaderId, fetchRequest2.destinationId());
         assertTrue(voters.contains(fetchRequest2.destinationId()));
-        assertFetchRequestData(fetchRequest2, epoch, 0L, 0);
+        RaftClientTestContext.assertFetchRequestData(fetchRequest2, epoch, 0L, 0);
 
-        deliverResponse(fetchRequest2.correlationId, fetchRequest2.destinationId(),
+        context.deliverResponse(fetchRequest2.correlationId, fetchRequest2.destinationId(),
             fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.FENCED_LEADER_EPOCH));
-        client.poll();
+        context.client.poll();
 
-        assertEquals(ElectionState.withElectedLeader(epoch, leaderId, voters), quorumStateStore.readElectionState());
+        assertEquals(ElectionState.withElectedLeader(epoch, leaderId, voters), context.quorumStateStore.readElectionState());
     }
 
     @Test
     public void testLeaderGracefulShutdown() throws Exception {
         int otherNodeId = 1;
-        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
         int epoch = 1;
-        KafkaRaftClient client = initializeAsLeader(voters, epoch);
+        Set<Integer> voters = Utils.mkSet(LOCAL_ID, otherNodeId);
+
+        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(voters, epoch);
 
         // Now shutdown
         int shutdownTimeoutMs = 5000;
-        CompletableFuture<Void> shutdownFuture = client.shutdown(shutdownTimeoutMs);
+        CompletableFuture<Void> shutdownFuture = context.client.shutdown(shutdownTimeoutMs);
 
         // We should still be running until we have had a chance to send EndQuorumEpoch
-        assertTrue(client.isShuttingDown());
-        assertTrue(client.isRunning());
+        assertTrue(context.client.isShuttingDown());

Review comment:
   I thought about this last night and hacked together some solutions. I wasn't very 
pleased with the result. Let's explore this improvement in a future PR.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] thake commented on pull request #9467: KAFKA-10515: Properly initialize nullable Serdes with default values

2020-10-22 Thread GitBox


thake commented on pull request #9467:
URL: https://github.com/apache/kafka/pull/9467#issuecomment-714640590


   Ready for review



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10631) ProducerFencedException is not Handled on Offest Commit

2020-10-22 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-10631:
--
Description: 
The transaction manager currently does not handle producer-fenced errors 
returned from an offset commit request.

We found this bug because we encountered the following exception in our soak 
cluster:
{code:java}
org.apache.kafka.streams.errors.StreamsException: Error encountered trying to 
commit a transaction [stream-thread [i-037c09b3c48522d8d-StreamThread-3] task 
[0_0]]
at 
org.apache.kafka.streams.processor.internals.StreamsProducer.commitTransaction(StreamsProducer.java:256)
at 
org.apache.kafka.streams.processor.internals.TaskManager.commitOffsetsOrTransaction(TaskManager.java:1050)
at 
org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1013)
at 
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:886)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:677)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)
[2020-10-22T04:09:54+02:00] 
(streams-soak-2-7-eos-alpha_soak_i-037c09b3c48522d8d_streamslog) Caused by: 
org.apache.kafka.common.KafkaException: Unexpected error in 
TxnOffsetCommitResponse: There is a newer producer with the same 
transactionalId which fences the current one.
at 
org.apache.kafka.clients.producer.internals.TransactionManager$TxnOffsetCommitHandler.handleResponse(TransactionManager.java:1726)
at 
org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1278)
at 
org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at 
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:584)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:576)
at 
org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:415)
at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
at java.lang.Thread.run(Thread.java:748)
{code}

  was:
The transaction manager currently does not handle producer-fenced errors 
returned from an offset commit request.

We found this bug because we saw the following exception in our soak cluster:

{code:java}
org.apache.kafka.streams.errors.StreamsException: Error encountered trying to 
commit a transaction [stream-thread [i-037c09b3c48522d8d-StreamThread-3] task 
[0_0]]
at 
org.apache.kafka.streams.processor.internals.StreamsProducer.commitTransaction(StreamsProducer.java:256)
at 
org.apache.kafka.streams.processor.internals.TaskManager.commitOffsetsOrTransaction(TaskManager.java:1050)
at 
org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1013)
at 
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:886)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:677)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)
[2020-10-22T04:09:54+02:00] 
(streams-soak-2-7-eos-alpha_soak_i-037c09b3c48522d8d_streamslog) Caused by: 
org.apache.kafka.common.KafkaException: Unexpected error in 
TxnOffsetCommitResponse: There is a newer producer with the same 
transactionalId which fences the current one.
at 
org.apache.kafka.clients.producer.internals.TransactionManager$TxnOffsetCommitHandler.handleResponse(TransactionManager.java:1726)
at 
org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1278)
at 
org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at 
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:584)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:576)
at 
org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:415)
at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
at java.lang.Thread.run(Thread.java:748)
{code}


> ProducerFencedException is not Handled on Offest Commit
> ---
>
> Key: KAFKA-10631
> URL: 

[jira] [Created] (KAFKA-10631) ProducerFencedException is not Handled on Offest Commit

2020-10-22 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-10631:
-

 Summary: ProducerFencedException is not Handled on Offest Commit
 Key: KAFKA-10631
 URL: https://issues.apache.org/jira/browse/KAFKA-10631
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.7.0
Reporter: Bruno Cadonna


The transaction manager currently does not handle producer-fenced errors 
returned from an offset commit request.

We found this bug because we saw the following exception in our soak cluster:

{code:java}
org.apache.kafka.streams.errors.StreamsException: Error encountered trying to 
commit a transaction [stream-thread [i-037c09b3c48522d8d-StreamThread-3] task 
[0_0]]
at 
org.apache.kafka.streams.processor.internals.StreamsProducer.commitTransaction(StreamsProducer.java:256)
at 
org.apache.kafka.streams.processor.internals.TaskManager.commitOffsetsOrTransaction(TaskManager.java:1050)
at 
org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1013)
at 
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:886)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:677)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)
[2020-10-22T04:09:54+02:00] 
(streams-soak-2-7-eos-alpha_soak_i-037c09b3c48522d8d_streamslog) Caused by: 
org.apache.kafka.common.KafkaException: Unexpected error in 
TxnOffsetCommitResponse: There is a newer producer with the same 
transactionalId which fences the current one.
at 
org.apache.kafka.clients.producer.internals.TransactionManager$TxnOffsetCommitHandler.handleResponse(TransactionManager.java:1726)
at 
org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1278)
at 
org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at 
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:584)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:576)
at 
org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:415)
at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
at java.lang.Thread.run(Thread.java:748)
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] soarez commented on pull request #9000: KAFKA-10036 Improve handling and documentation of Suppliers

2020-10-22 Thread GitBox


soarez commented on pull request #9000:
URL: https://github.com/apache/kafka/pull/9000#issuecomment-714636553


   @mjsax rebased and fixed an error. Can we try running the tests again?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] thake commented on pull request #9467: KAFKA-10515: Properly initialize nullable Serdes with default values

2020-10-22 Thread GitBox


thake commented on pull request #9467:
URL: https://github.com/apache/kafka/pull/9467#issuecomment-714632784


   @vvcephei Sorry for the confusion, creating a new commit now.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] thake commented on pull request #9478: Fixed unit test mocks and incorrect required context serdes.

2020-10-22 Thread GitBox


thake commented on pull request #9478:
URL: https://github.com/apache/kafka/pull/9478#issuecomment-714630476


   You were right. Somehow I thought that the tests were right and that 
keySerde and valueSerde are nullable. It would really help to have a null-safe 
API description using annotations.
   
   Closing this PR because it addresses a non-issue :)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] thake closed pull request #9478: Fixed unit test mocks and incorrect required context serdes.

2020-10-22 Thread GitBox


thake closed pull request #9478:
URL: https://github.com/apache/kafka/pull/9478


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] apurvam commented on pull request #9479: Handle ProducerFencedException on offset commit

2020-10-22 Thread GitBox


apurvam commented on pull request #9479:
URL: https://github.com/apache/kafka/pull/9479#issuecomment-714628995


   I can't believe this bug has been lurking for so long. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9472: MINOR: Add Jenkinsfile to 2.3

2020-10-22 Thread GitBox


vvcephei commented on pull request #9472:
URL: https://github.com/apache/kafka/pull/9472#issuecomment-714624598


   Thanks for the reviews!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei merged pull request #9472: MINOR: Add Jenkinsfile to 2.3

2020-10-22 Thread GitBox


vvcephei merged pull request #9472:
URL: https://github.com/apache/kafka/pull/9472


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei merged pull request #9474: MINOR: Add Jenkinsfile to 2.2

2020-10-22 Thread GitBox


vvcephei merged pull request #9474:
URL: https://github.com/apache/kafka/pull/9474


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9475: MINOR: Add Jenkinsfile to 2.1

2020-10-22 Thread GitBox


vvcephei commented on pull request #9475:
URL: https://github.com/apache/kafka/pull/9475#issuecomment-714623937


   Thanks for the reviews, all!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei merged pull request #9475: MINOR: Add Jenkinsfile to 2.1

2020-10-22 Thread GitBox


vvcephei merged pull request #9475:
URL: https://github.com/apache/kafka/pull/9475


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9474: MINOR: Add Jenkinsfile to 2.2

2020-10-22 Thread GitBox


vvcephei commented on pull request #9474:
URL: https://github.com/apache/kafka/pull/9474#issuecomment-714624277


   Thanks for the reviews!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] nizhikov opened a new pull request #9480: [WIP] KAFKA-10592: Fix vagrant for a system tests with python3

2020-10-22 Thread GitBox


nizhikov opened a new pull request #9480:
URL: https://github.com/apache/kafka/pull/9480


   Fix Vagrant for system tests with python3.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] joshuagrisham commented on pull request #9470: Add recursive support to Connect Cast and ReplaceField transforms, and support for casting complex types to either a native or JSON stri

2020-10-22 Thread GitBox


joshuagrisham commented on pull request #9470:
URL: https://github.com/apache/kafka/pull/9470#issuecomment-714608520


   I saw that all of the checks have failed, but when I look in the log they 
are all related only to `checkstyle`. If I get some time tomorrow I will try 
to address all of the code style / standards issues that failed the check and 
push a new commit, just to remove any potential issue from that one.
   Stay tuned!  



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10406) Table data doesn't wrap around in fixed-width columns and gets hidden

2020-10-22 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17219137#comment-17219137
 ] 

Mickael Maison commented on KAFKA-10406:


[~vvcephei] I see you merged https://github.com/apache/kafka-site/pull/295. 
Does that PR completely resolve this JIRA or is there some work left?

> Table data doesn't wrap around in fixed-width columns and gets hidden
> -
>
> Key: KAFKA-10406
> URL: https://issues.apache.org/jira/browse/KAFKA-10406
> Project: Kafka
>  Issue Type: Improvement
>  Components: docs
>Reporter: Sanjay Ravikumar
>Priority: Trivial
>  Labels: documentation
> Fix For: 2.6.0, 2.5.1
>
>
> The Kafka config-steams docs includes a table for "[Optional Configuration 
> Parameters|[https://kafka.apache.org/26/documentation/streams/developer-guide/config-streams#optional-configuration-parameters]];.
>  The first ("Parameter Name") and last ("Default Value") columns text doesn't 
> wrap around when the text length exceeds the width of the columns. Because of 
> this, some of the text gets hidden from the view.
> This is the case with both 2.6.0 as well as 2.5.1 versions and may be others 
> as well. This needs to be handled similar to how the text in third 
> ("Description") column is handled where the text wraps around and all the 
> data is visible.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cadonna opened a new pull request #9479: Handle ProducerFencedException on offset commit

2020-10-22 Thread GitBox


cadonna opened a new pull request #9479:
URL: https://github.com/apache/kafka/pull/9479


   The transaction manager currently does not handle producer-fenced errors 
returned from an offset commit request.
   
   This PR adds handling for these producer-fenced errors.
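   
   A minimal sketch of the kind of change involved (the error constants and the `fatalError` helper below are assumptions for illustration; the actual handler in `TransactionManager` may differ, and whether this should be fatal or abortable is part of the review):
   
   ```java
   // Sketch only: recognize the fenced-producer error from a TxnOffsetCommitResponse
   // instead of falling through to the generic "Unexpected error" KafkaException.
   if (error == Errors.INVALID_PRODUCER_EPOCH || error == Errors.PRODUCER_FENCED) {
       fatalError(new ProducerFencedException(
           "There is a newer producer with the same transactionalId which fences the current one."));
   } else {
       // existing handling for retriable and coordinator errors ...
   }
   ```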
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10554) Perform follower truncation based on epoch offsets returned in Fetch response

2020-10-22 Thread Bill Bejeck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17219111#comment-17219111
 ] 

Bill Bejeck commented on KAFKA-10554:
-

Since this is not a blocker, and we've hit code freeze, I'm going to move the 
fix version of this ticket to 2.8 as part of the 2.7.0 release process.  Should 
this be incorrect, please discuss on the [DISCUSS] Apache Kafka 2.7.0 release 
email thread.

 

> Perform follower truncation based on epoch offsets returned in Fetch response
> -
>
> Key: KAFKA-10554
> URL: https://issues.apache.org/jira/browse/KAFKA-10554
> Project: Kafka
>  Issue Type: Task
>  Components: replication
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.7.0
>
>
> KAFKA-10435 updated fetch protocol for KIP-595 to return diverging epoch and 
> offset as part of fetch response. We can use this to truncate logs in 
> followers while processing fetch responses.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10554) Perform follower truncation based on epoch offsets returned in Fetch response

2020-10-22 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-10554:

Fix Version/s: (was: 2.7.0)
   2.8.0

> Perform follower truncation based on epoch offsets returned in Fetch response
> -
>
> Key: KAFKA-10554
> URL: https://issues.apache.org/jira/browse/KAFKA-10554
> Project: Kafka
>  Issue Type: Task
>  Components: replication
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.8.0
>
>
> KAFKA-10435 updated fetch protocol for KIP-595 to return diverging epoch and 
> offset as part of fetch response. We can use this to truncate logs in 
> followers while processing fetch responses.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7575) 'Error while writing to checkpoint file' Issue

2020-10-22 Thread Ivan Gonzalez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17219108#comment-17219108
 ] 

Ivan Gonzalez commented on KAFKA-7575:
--

Hi, we are running a cluster of 3 Kafka servers (Kafka version 2.3). Today one 
of the nodes went down because of this issue. The good news is that this is the 
first time it has happened in 2 years; however, we have no way to prevent it. 

OS: Red Hat Enterprise Linux Server Release 7.5

> 'Error while writing to checkpoint file' Issue
> --
>
> Key: KAFKA-7575
> URL: https://issues.apache.org/jira/browse/KAFKA-7575
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 1.1.1
> Environment: Windows 10, Kafka 1.1.1
>Reporter: Dasun Nirmitha
>Priority: Major
> Attachments: Dry run error.rar
>
>
> I'm currently testing a Java Kafka producer application coded to retrieve a 
> db value from a local mysql db and produce to a single topic. Locally I've 
> got a Zookeeper server and a Kafka single broker running.
> My issue is I need to produce this from the Kafka producer each second, and 
> that works for around 2 hours until broker throws an 'Error while writing to 
> checkpoint file' and shuts down. Producing with a 1 minute interval works 
> with no issues but unfortunately I need the produce interval to be 1 second.
> I have attached a rar containing screenshots of the Errors thrown from the 
> Broker and my application.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10284) Group membership update due to static member rejoin should be persisted

2020-10-22 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10284:
-
Fix Version/s: 2.6.1

> Group membership update due to static member rejoin should be persisted
> ---
>
> Key: KAFKA-10284
> URL: https://issues.apache.org/jira/browse/KAFKA-10284
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0
>Reporter: Boyang Chen
>Assignee: feyman
>Priority: Critical
>  Labels: help-wanted
> Fix For: 2.7.0, 2.6.1
>
> Attachments: How to reproduce the issue in KAFKA-10284.md
>
>
> For known static members rejoin, we would update its corresponding member.id 
> without triggering a new rebalance. This serves the purpose for avoiding 
> unnecessary rebalance for static membership, as well as fencing purpose if 
> some still uses the old member.id. 
> The bug is that we don't actually persist the membership update, so if no 
> upcoming rebalance gets triggered, this new member.id information will get 
> lost during group coordinator immigration, thus bringing up the zombie member 
> identity.
> The bug find credit goes to [~hachikuji] 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted

2020-10-22 Thread GitBox


vvcephei commented on pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#issuecomment-714574329


   Cherry-picked to 2.6 (cc @mimaison )



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10201) Update codebase to use more inclusive terms

2020-10-22 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-10201:

Fix Version/s: (was: 2.7.0)
   2.8.0

> Update codebase to use more inclusive terms
> ---
>
> Key: KAFKA-10201
> URL: https://issues.apache.org/jira/browse/KAFKA-10201
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Xavier Léauté
>Priority: Major
> Fix For: 2.8.0
>
>
> see the corresponding KIP 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-629:+Use+racially+neutral+terms+in+our+codebase



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10201) Update codebase to use more inclusive terms

2020-10-22 Thread Bill Bejeck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17219093#comment-17219093
 ] 

Bill Bejeck commented on KAFKA-10201:
-

[~xvrl] , Since we've hit code freeze on 10/21, I'm going to set the fix 
version to 2.8 as part of the 2.7.0 release process.

> Update codebase to use more inclusive terms
> ---
>
> Key: KAFKA-10201
> URL: https://issues.apache.org/jira/browse/KAFKA-10201
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Xavier Léauté
>Priority: Major
> Fix For: 2.7.0
>
>
> see the corresponding KIP 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-629:+Use+racially+neutral+terms+in+our+codebase



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on pull request #9471: MINOR: Add Jenkinsfile to 2.6

2020-10-22 Thread GitBox


vvcephei commented on pull request #9471:
URL: https://github.com/apache/kafka/pull/9471#issuecomment-714569139


   The tests for JDK 8 and 14 passed, but JDK 11 timed out. Weirdly, the build log 
(https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-9471/1/console) 
actually shows that the tests passed after just under 2 hours, and then it 
looks like it got hung up "recording the results".
   
   Re-running now.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-10284) Group membership update due to static member rejoin should be persisted

2020-10-22 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-10284.
--
Resolution: Fixed

> Group membership update due to static member rejoin should be persisted
> ---
>
> Key: KAFKA-10284
> URL: https://issues.apache.org/jira/browse/KAFKA-10284
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0
>Reporter: Boyang Chen
>Assignee: feyman
>Priority: Critical
>  Labels: help-wanted
> Fix For: 2.7.0
>
> Attachments: How to reproduce the issue in KAFKA-10284.md
>
>
> For known static members rejoin, we would update its corresponding member.id 
> without triggering a new rebalance. This serves the purpose for avoiding 
> unnecessary rebalance for static membership, as well as fencing purpose if 
> some still uses the old member.id. 
> The bug is that we don't actually persist the membership update, so if no 
> upcoming rebalance gets triggered, this new member.id information will get 
> lost during group coordinator immigration, thus bringing up the zombie member 
> identity.
> The bug find credit goes to [~hachikuji] 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on pull request #9478: Fixed unit test mocks and incorrect required context serdes.

2020-10-22 Thread GitBox


vvcephei commented on pull request #9478:
URL: https://github.com/apache/kafka/pull/9478#issuecomment-714565433


   Sorry, this is getting complicated. I've just realized that this PR is 
specifically addressing some internal test failures that came up in the 2.6 
port: https://github.com/apache/kafka/pull/9467 .
   
   If that's the sole motivation, then I think we can close this PR and instead 
just fix the tests in https://github.com/apache/kafka/pull/9467
   
   What do you think, @thake ?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9467: KAFKA-10515: Properly initialize nullable Serdes with default values

2020-10-22 Thread GitBox


vvcephei commented on pull request #9467:
URL: https://github.com/apache/kafka/pull/9467#issuecomment-714564666


   Ahh, I already commented on https://github.com/apache/kafka/pull/9478, but I 
think now I see the motivation for that PR.
   
   If it was indeed just because there are internal tests that need new 
expectations now, we should just add the expectations instead of twisting the 
"main" code into handling an unexpected condition. By the way, I'm sure it's 
not obvious, but the fundamental reason for my taking this position is that in 
production code, the context serdes can never be null. There is a "default 
default" of ByteArraySerde even if you don't configure a "default" serde at all.
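   
   For anyone following along, a minimal sketch of where those defaults come from in an application config (nothing specific to this PR; the serde choices are just examples):
   
   ```java
   import java.util.Properties;
   import org.apache.kafka.common.serialization.Serdes;
   import org.apache.kafka.streams.StreamsConfig;
   
   Properties props = new Properties();
   // If these two lines are omitted, Streams falls back to the byte-array serde
   // mentioned above, so the context serdes are still non-null in production.
   props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
   props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
   ```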



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9478: Fixed unit test mocks and incorrect required context serdes.

2020-10-22 Thread GitBox


vvcephei commented on pull request #9478:
URL: https://github.com/apache/kafka/pull/9478#issuecomment-714562224


   Thanks for this, @thake!
   
   I'm wondering if this is the right direction to go here. In a real Streams 
application, as well as in MockProcessorContext and TopologyTestDriver, the 
context serdes can never be null, so it seems to be a problem only for EasyMock 
tests.
   
   It's not that "easy" to begin with to set up an EasyMock of a context for 
testing Streams components because the set of required interactions is both 
non-trivial and an internal detail. That's why we offer the 
MockProcessorContext, which provides default behavior that should be suitable 
for most tests and still lets you capture and assert the things you'd need to 
assert.
   
   Have I misunderstood the problem here?
   
   Thanks,
   -John
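
   As a minimal sketch of that approach (UppercaseProcessor, the test class 
name, and the JUnit 4 assertions are hypothetical, not code from this PR), a 
unit test can hand MockProcessorContext ordinary StreamsConfig properties and 
then assert on the records the mock captures:

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.MockProcessorContext;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

public class UppercaseProcessorTest {

    // Hypothetical processor under test, included only so the sketch compiles.
    static class UppercaseProcessor extends AbstractProcessor<String, String> {
        @Override
        public void process(final String key, final String value) {
            context().forward(key, value.toUpperCase());
        }
    }

    @Test
    public void shouldForwardUppercasedValue() {
        final Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "mock-context-test");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        final MockProcessorContext context = new MockProcessorContext(config);
        final UppercaseProcessor processor = new UppercaseProcessor();
        processor.init(context);

        processor.process("k", "hello");

        // The mock records everything forwarded by the processor.
        assertEquals("HELLO", context.forwarded().get(0).keyValue().value);
    }
}
```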



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted

2020-10-22 Thread GitBox


vvcephei commented on pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#issuecomment-714559804







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10284) Group membership update due to static member rejoin should be persisted

2020-10-22 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10284:
-
Fix Version/s: 2.7.0

> Group membership update due to static member rejoin should be persisted
> ---
>
> Key: KAFKA-10284
> URL: https://issues.apache.org/jira/browse/KAFKA-10284
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0
>Reporter: Boyang Chen
>Assignee: feyman
>Priority: Critical
>  Labels: help-wanted
> Fix For: 2.7.0
>
> Attachments: How to reproduce the issue in KAFKA-10284.md
>
>
> When a known static member rejoins, we update its corresponding member.id 
> without triggering a new rebalance. This avoids unnecessary rebalances for 
> static membership and also acts as fencing against anything that still uses 
> the old member.id. 
> The bug is that we don't actually persist the membership update, so if no 
> rebalance is triggered afterwards, the new member.id is lost during group 
> coordinator immigration, bringing the zombie member identity back.
> Credit for finding the bug goes to [~hachikuji] 
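
For context, a minimal sketch (topic name, group ids, and broker address are 
placeholders) of the configuration that opts a consumer into static membership, 
which is the scenario this bug affects:

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class StaticMemberExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        // A stable group.instance.id makes this consumer a static member: on rejoin,
        // the coordinator updates its member.id in place instead of rebalancing,
        // which is the code path affected by this bug.
        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "example-instance-1");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic"));
            consumer.poll(Duration.ofMillis(100));
        }
    }
}
{code}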



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei merged pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted

2020-10-22 Thread GitBox


vvcephei merged pull request #9270:
URL: https://github.com/apache/kafka/pull/9270


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted

2020-10-22 Thread GitBox


vvcephei commented on pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#issuecomment-714551566


   The test failure was unrelated: Build / JDK 15 / 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9477: MINOR: TopologyTestDriver should not require dummy parameters

2020-10-22 Thread GitBox


vvcephei commented on pull request #9477:
URL: https://github.com/apache/kafka/pull/9477#issuecomment-714550389


   Ok, here are the follow-up tickets I filed:
   * https://issues.apache.org/jira/browse/KAFKA-10628
   * https://issues.apache.org/jira/browse/KAFKA-10629
   * https://issues.apache.org/jira/browse/KAFKA-10630



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10630) State Directory config could be improved

2020-10-22 Thread John Roesler (Jira)
John Roesler created KAFKA-10630:


 Summary: State Directory config could be improved
 Key: KAFKA-10630
 URL: https://issues.apache.org/jira/browse/KAFKA-10630
 Project: Kafka
  Issue Type: Task
  Components: streams
Reporter: John Roesler


During [https://github.com/apache/kafka/pull/9477], I noticed that many tests 
wind up providing a state directory config purely to ensure a unique temp 
directory for the test. Since TopologyTestDriver and MockProcessorContext tests 
are typically unit tests, it would be more convenient to initialize those 
components with their own unique temp state directory, following the universal 
pattern from such tests:
{code:java}
props.setProperty(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getAbsolutePath()); {code}
Note that this literal setting is not ideal, since it actually creates a 
directory regardless of whether the application needs one. Instead, we should 
create a new TestUtil method to lazily generate a temp directory _name_ and 
then register a shutdown handler to delete it if it exists. Then, Streams would 
only create the directory if it actually needs persistence.
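
A rough sketch of such a helper, assuming a hypothetical LazyTempStateDir class 
rather than an existing TestUtils method:

{code:java}
import java.io.File;
import java.util.UUID;

public final class LazyTempStateDir {

    // Returns a unique directory *name* under java.io.tmpdir without creating it,
    // and registers a shutdown hook that cleans up only if something was created there.
    public static String tempDirectoryName() {
        final File dir = new File(System.getProperty("java.io.tmpdir"),
                                  "kafka-streams-test-" + UUID.randomUUID());
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            if (dir.exists()) {
                deleteRecursively(dir);
            }
        }));
        return dir.getAbsolutePath();
    }

    private static void deleteRecursively(final File file) {
        final File[] children = file.listFiles();
        if (children != null) {
            for (final File child : children) {
                deleteRecursively(child);
            }
        }
        file.delete();
    }
}
{code}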

Also, the default value for that config is not platform-independent: it is 
simply "/tmp/kafka-streams". Perhaps instead we should set the default to 
something like "unset" or "" or "none". Then, instead of reading the property 
directly, when Streams actually needs the state directory it could log a 
warning that the state directory config is not set and call the 
platform-independent Java API for creating a temporary directory.
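
A rough sketch of that fallback, assuming a hypothetical StateDirResolver helper 
and an "unset" sentinel that does not exist in StreamsConfig today:

{code:java}
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public final class StateDirResolver {

    // Assumed sentinel meaning "the user did not set state.dir"; not an existing constant.
    private static final String UNSET = "unset";

    public static Path resolveStateDir(final String configuredStateDir) {
        if (configuredStateDir == null || configuredStateDir.isEmpty() || UNSET.equals(configuredStateDir)) {
            try {
                // This is where Streams would warn that state.dir is not set
                // before falling back to a platform-independent temp directory.
                return Files.createTempDirectory("kafka-streams");
            } catch (final IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return Paths.get(configuredStateDir);
    }
}
{code}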



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

