[GitHub] [kafka] highluck commented on a change in pull request #9105: MINOR: closable object Memory leak prevention

2020-08-03 Thread GitBox


highluck commented on a change in pull request #9105:
URL: https://github.com/apache/kafka/pull/9105#discussion_r464215307



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -197,8 +197,8 @@ synchronized boolean lock(final TaskId taskId) throws IOException {
 
         final FileChannel channel;
 
-        try {
-            channel = getOrCreateFileChannel(taskId, lockFile.toPath());
+        try (final FileChannel fileChannel = getOrCreateFileChannel(taskId, lockFile.toPath())) {

Review comment:
   Thanks for the review.
   I was mistaken about this.
   
   Since it is a reference, it does not seem to need to be closed separately.
   
   The code has been updated.
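
   For context, a minimal sketch (not the Kafka code under review) of why a FileChannel used for locking cannot simply be closed by try-with-resources: closing the channel also releases any FileLock obtained from it, so the channel has to stay open (and be cached for reuse) until the lock is explicitly released. The helper name and usage below are illustrative assumptions.
   
   ```java
   import java.io.IOException;
   import java.nio.channels.FileChannel;
   import java.nio.channels.FileLock;
   import java.nio.file.Path;
   import java.nio.file.StandardOpenOption;
   
   // Sketch only: acquire a file lock and keep the channel open while the lock is held.
   final class LockSketch {
       // Returns the lock, or null if another process already holds it.
       // The caller must keep the channel open until the lock is released.
       static FileLock tryLock(final Path lockFile) throws IOException {
           final FileChannel channel = FileChannel.open(lockFile,
               StandardOpenOption.CREATE, StandardOpenOption.WRITE);
           final FileLock lock = channel.tryLock();
           if (lock == null) {
               channel.close(); // close only when the lock was NOT obtained
           }
           return lock;
       }
   }
   ```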





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] highluck commented on pull request #9105: MINOR: closable object Memory leak prevention

2020-08-03 Thread GitBox


highluck commented on pull request #9105:
URL: https://github.com/apache/kafka/pull/9105#issuecomment-667844194


   @abbccdda 
   
   Please review again thank you :)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] albert02lowis commented on pull request #9108: KAFKA-9273: Extract testShouldAutoShutdownOnIncompleteMetadata from S…

2020-08-03 Thread GitBox


albert02lowis commented on pull request #9108:
URL: https://github.com/apache/kafka/pull/9108#issuecomment-667850415


   Hi @abbccdda, sure thing. I have added 2 more commits to this PR, and the unit and integration tests also look good locally.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] highluck commented on a change in pull request #9105: MINOR: closable object Memory leak prevention

2020-08-03 Thread GitBox


highluck commented on a change in pull request #9105:
URL: https://github.com/apache/kafka/pull/9105#discussion_r464215307



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -197,8 +197,8 @@ synchronized boolean lock(final TaskId taskId) throws IOException {
 
         final FileChannel channel;
 
-        try {
-            channel = getOrCreateFileChannel(taskId, lockFile.toPath());
+        try (final FileChannel fileChannel = getOrCreateFileChannel(taskId, lockFile.toPath())) {

Review comment:
   Thanks for the review.
   I was mistaken about this.
   
   The code has been updated.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-7540) Flaky Test ConsumerBounceTest#testClose

2020-08-03 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17169776#comment-17169776
 ] 

Chia-Ping Tsai commented on KAFKA-7540:
---

[~hachikuji] Could I take over this issue? It is flaky on my Jenkins :(

> Flaky Test ConsumerBounceTest#testClose
> ---
>
> Key: KAFKA-7540
> URL: https://issues.apache.org/jira/browse/KAFKA-7540
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 2.2.0
>Reporter: John Roesler
>Assignee: Jason Gustafson
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.7.0, 2.6.1
>
>
> Observed on Java 8: 
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/17314/testReport/junit/kafka.api/ConsumerBounceTest/testClose/]
>  
> Stacktrace:
> {noformat}
> java.lang.ArrayIndexOutOfBoundsException: -1
>   at 
> kafka.integration.KafkaServerTestHarness.killBroker(KafkaServerTestHarness.scala:146)
>   at 
> kafka.api.ConsumerBounceTest.checkCloseWithCoordinatorFailure(ConsumerBounceTest.scala:238)
>   at kafka.api.ConsumerBounceTest.testClose(ConsumerBounceTest.scala:211)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:66)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>   at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>   at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>   at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:117)
>   at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.internal.dispatch

[GitHub] [kafka] highluck removed a comment on pull request #9105: MINOR: closable object Memory leak prevention

2020-08-03 Thread GitBox


highluck removed a comment on pull request #9105:
URL: https://github.com/apache/kafka/pull/9105#issuecomment-667844194


   @abbccdda 
   
   Please review again thank you :)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] highluck commented on a change in pull request #9105: MINOR: closable object Memory leak prevention

2020-08-03 Thread GitBox


highluck commented on a change in pull request #9105:
URL: https://github.com/apache/kafka/pull/9105#discussion_r464215307



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -197,8 +197,8 @@ synchronized boolean lock(final TaskId taskId) throws IOException {
 
         final FileChannel channel;
 
-        try {
-            channel = getOrCreateFileChannel(taskId, lockFile.toPath());
+        try (final FileChannel fileChannel = getOrCreateFileChannel(taskId, lockFile.toPath())) {

Review comment:
   Thanks for the review.
   I was mistaken about this.
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] highluck closed pull request #9105: MINOR: closable object Memory leak prevention

2020-08-03 Thread GitBox


highluck closed pull request #9105:
URL: https://github.com/apache/kafka/pull/9105


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] highluck commented on pull request #9105: MINOR: closable object Memory leak prevention

2020-08-03 Thread GitBox


highluck commented on pull request #9105:
URL: https://github.com/apache/kafka/pull/9105#issuecomment-667867206


   @abbccdda 
   Thanks for the review.
   I was mistaken about this.
   
   Closing the issue.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #9103: Add redirection for (Incremental)AlterConfig

2020-08-03 Thread GitBox


dajac commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r464241946



##
File path: clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
##
@@ -34,6 +34,8 @@
 private final boolean expectResponse;
 private final int requestTimeoutMs;
 private final RequestCompletionHandler callback;
+private final String initialPrincipalName;
+private final String initialClientId;

Review comment:
   Could we use Optional for these two as they are not always provided?
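
   For illustration, a hedged sketch of the suggestion (not the actual Kafka change): wrap the two nullable fields in Optional so callers cannot forget they may be absent. The class and accessor names here are assumptions.
   
   ```java
   import java.util.Optional;
   
   // Sketch of optional redirection metadata carried on a request object.
   final class ForwardedRequestMetadata {
       private final Optional<String> initialPrincipalName;
       private final Optional<String> initialClientId;
   
       ForwardedRequestMetadata(final String initialPrincipalName, final String initialClientId) {
           this.initialPrincipalName = Optional.ofNullable(initialPrincipalName);
           this.initialClientId = Optional.ofNullable(initialClientId);
       }
   
       Optional<String> initialPrincipalName() {
           return initialPrincipalName;
       }
   
       Optional<String> initialClientId() {
           return initialClientId;
       }
   }
   ```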

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsUtil.java
##
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
+
+import java.util.Collection;
+import java.util.Map;
+
+public class AlterConfigsUtil {
+
+    public static IncrementalAlterConfigsRequestData generateIncrementalRequestData(final Map<ConfigResource, Collection<AlterConfigOp>> configs,
+                                                                                    final boolean validateOnly) {
+        return generateIncrementalRequestData(configs.keySet(), configs, validateOnly);
+    }
+
+    public static IncrementalAlterConfigsRequestData generateIncrementalRequestData(final Collection<ConfigResource> resources,
+                                                                                    final Map<ConfigResource, Collection<AlterConfigOp>> configs,
+                                                                                    final boolean validateOnly) {
+        IncrementalAlterConfigsRequestData data = new IncrementalAlterConfigsRequestData()
+                                                      .setValidateOnly(validateOnly);
+        for (ConfigResource resource : resources) {
+            IncrementalAlterConfigsRequestData.AlterableConfigCollection alterableConfigSet =
+                new IncrementalAlterConfigsRequestData.AlterableConfigCollection();
+            for (AlterConfigOp configEntry : configs.get(resource))
+                alterableConfigSet.add(new IncrementalAlterConfigsRequestData.AlterableConfig()
+                                           .setName(configEntry.configEntry().name())
+                                           .setValue(configEntry.configEntry().value())
+                                           .setConfigOperation(configEntry.opType().id()));
+            IncrementalAlterConfigsRequestData.AlterConfigsResource alterConfigsResource = new IncrementalAlterConfigsRequestData.AlterConfigsResource();
+            alterConfigsResource.setResourceType(resource.type().id())
+                .setResourceName(resource.name()).setConfigs(alterableConfigSet);
+            data.resources().add(alterConfigsResource);
+

Review comment:
   nit: empty line could be removed.

##
File path: clients/src/main/resources/common/message/RequestHeader.json
##
@@ -37,6 +37,12 @@
 // Since the client is sending the ApiVersionsRequest in order to discover 
what
 // versions are supported, the client does not know the best version to 
use.
 { "name": "ClientId", "type": "string", "versions": "1+", 
"nullableVersions": "1+", "ignorable": true,
-  "flexibleVersions": "none", "about": "The client ID string." }
+  "flexibleVersions": "none", "about": "The client ID string." },
+{ "name": "InitialPrincipalName", "type": "string", "tag": 0, 
"taggedVersions": "2+",
+  "nullableVersions": "2+", "default": "null", "ignorable": true,
+  "about": "Optional value of the initial principal name when the request 
is redirected by a broker, for audit logging purpose." },

Review comment:
   Actually, we will also use it for quota. I think that we could say that 
both `InitialPrincipalName` and `InitialClientId` will be used for logging and 
quota purposes.

##
File path: core/src/main/scala/kafka/common/InterBrokerSendThread.scala
##
@@ -145,7 +147,9 @@ abstract class InterBrokerSendThread(name: String,
 
 case class RequestAndCompletionHandler(destination: Node,
   

[jira] [Updated] (KAFKA-10334) Transactions not working properly

2020-08-03 Thread Luis Araujo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luis Araujo updated KAFKA-10334:

Component/s: clients

> Transactions not working properly
> -
>
> Key: KAFKA-10334
> URL: https://issues.apache.org/jira/browse/KAFKA-10334
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 2.1.0, 2.3.0
>Reporter: Luis Araujo
>Priority: Major
>
> I'm using transactions provided by Kafka Producer API in a Scala project 
> built with SBT. The dependency used in the project is: 
> "org.apache.kafka" % "kafka-clients" % "2.1.0"
> I followed the documentation and I was expecting that transactions fail when 
> I call .commitTransaction if some problem is raised when sending a message 
> like it's described in the documentation: 
> [https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
> ]
> Unfortunately, when testing this behaviour using a message larger than the 
> size accepted by the Kafka broker/cluster, the transactions are not working 
> properly.
> I tested with a 3 Kafka broker cluster with 1MB message max size (default 
> value):
> - when the message has 1MB, the transaction is aborted and an exception is 
> raised when calling commitTransaction()
> - when the message is bigger than 1MB, the transaction is completed 
> successfully without the message being written. no exception is thrown.
> As an example, this means that when I produce 9 messages with 1 KB and 1 
> message with 1.1MB in the same transaction, the transaction is completed but 
> only 9 messages are written to the Kafka cluster.
> I tested this behaviour with Kafka version 2.1.0 and 2.3.0 in both Kafka 
> cluster and Kafka Producer API.
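
For reference, the transactional-producer pattern the report is testing against, as a hedged sketch (bootstrap address, topic, serializers, and transactional.id are placeholder assumptions): the documented expectation is that a failed send surfaces from send()/commitTransaction(), after which the transaction is aborted.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;

public class TxnSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("transactional.id", "demo-txn"); // placeholder transactional id

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            try {
                producer.beginTransaction();
                producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
                // An oversized record should fail the send and make commitTransaction() throw.
                producer.commitTransaction();
            } catch (KafkaException e) {
                producer.abortTransaction(); // roll back when any send or the commit fails
            }
        }
    }
}
```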



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10334) Transactions not working properly

2020-08-03 Thread Luis Araujo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luis Araujo updated KAFKA-10334:

Component/s: documentation

> Transactions not working properly
> -
>
> Key: KAFKA-10334
> URL: https://issues.apache.org/jira/browse/KAFKA-10334
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, documentation, producer 
>Affects Versions: 2.1.0, 2.3.0
>Reporter: Luis Araujo
>Priority: Major
>
> I'm using transactions provided by Kafka Producer API in a Scala project 
> built with SBT. The dependency used in the project is: 
> "org.apache.kafka" % "kafka-clients" % "2.1.0"
> I followed the documentation and I was expecting that transactions fail when 
> I call .commitTransaction if some problem is raised when sending a message 
> like it's described in the documentation: 
> [https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
> ]
> Unfortunately, when testing this behaviour using a message larger than the 
> size accepted by the Kafka broker/cluster, the transactions are not working 
> properly.
> I tested with a 3 Kafka broker cluster with 1MB message max size (default 
> value):
> - when the message has 1MB, the transaction is aborted and an exception is 
> raised when calling commitTransaction()
> - when the message is bigger than 1MB, the transaction is completed 
> successfully without the message being written. no exception is thrown.
> As an example, this means that when I produce 9 messages with 1 KB and 1 
> message with 1.1MB in the same transaction, the transaction is completed but 
> only 9 messages are written to the Kafka cluster.
> I tested this behaviour with Kafka version 2.1.0 and 2.3.0 in both Kafka 
> cluster and Kafka Producer API.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10334) Transactions not working properly

2020-08-03 Thread Luis Araujo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luis Araujo updated KAFKA-10334:

Component/s: (was: documentation)

> Transactions not working properly
> -
>
> Key: KAFKA-10334
> URL: https://issues.apache.org/jira/browse/KAFKA-10334
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 2.1.0, 2.3.0
>Reporter: Luis Araujo
>Priority: Major
>
> I'm using transactions provided by Kafka Producer API in a Scala project 
> built with SBT. The dependency used in the project is: 
> "org.apache.kafka" % "kafka-clients" % "2.1.0"
> I followed the documentation and I was expecting that transactions fail when 
> I call .commitTransaction if some problem is raised when sending a message 
> like it's described in the documentation: 
> [https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
> ]
> Unfortunately, when testing this behaviour using a message larger than the 
> size accepted by the Kafka broker/cluster, the transactions are not working 
> properly.
> I tested with a 3 Kafka broker cluster with 1MB message max size (default 
> value):
> - when the message has 1MB, the transaction is aborted and an exception is 
> raised when calling commitTransaction()
> - when the message is bigger than 1MB, the transaction is completed 
> successfully without the message being written. no exception is thrown.
> As an example, this means that when I produce 9 messages with 1 KB and 1 
> message with 1.1MB in the same transaction, the transaction is completed but 
> only 9 messages are written to the Kafka cluster.
> I tested this behaviour with Kafka version 2.1.0 and 2.3.0 in both Kafka 
> cluster and Kafka Producer API.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rajinisivaram merged pull request #9092: KAFKA-10163; Define `controller_mutation_rate` as a Double instead of a Long

2020-08-03 Thread GitBox


rajinisivaram merged pull request #9092:
URL: https://github.com/apache/kafka/pull/9092


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on pull request #9092: KAFKA-10163; Define `controller_mutation_rate` as a Double instead of a Long

2020-08-03 Thread GitBox


rajinisivaram commented on pull request #9092:
URL: https://github.com/apache/kafka/pull/9092#issuecomment-667923327


   @dajac Thanks for the PR, test failures not related, merging to trunk.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10338) Support PEM format for SSL certificates and private key

2020-08-03 Thread Rajini Sivaram (Jira)
Rajini Sivaram created KAFKA-10338:
--

 Summary: Support PEM format for SSL certificates and private key
 Key: KAFKA-10338
 URL: https://issues.apache.org/jira/browse/KAFKA-10338
 Project: Kafka
  Issue Type: New Feature
  Components: security
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram


We currently support only file-based JKS/PKCS12 format for SSL key stores and 
trust stores. It will be good to add support for PEM as configuration values 
that fits better with config externalization.
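
To make the proposal concrete, a purely hypothetical sketch of PEM material supplied as configuration values rather than keystore file paths; the property names below are illustrative assumptions, not existing Kafka configs, and the real names would be defined by the eventual KIP.

```java
import java.util.Properties;

// Hypothetical illustration: SSL key and trust material embedded directly in config values.
final class PemConfigSketch {
    static Properties sslProps(final String certChainPem, final String privateKeyPem, final String caCertsPem) {
        final Properties props = new Properties();
        props.put("ssl.keystore.type", "PEM");                     // assumed property name
        props.put("ssl.keystore.certificate.chain", certChainPem); // assumed property name
        props.put("ssl.keystore.key", privateKeyPem);              // assumed property name
        props.put("ssl.truststore.type", "PEM");                   // assumed property name
        props.put("ssl.truststore.certificates", caCertsPem);      // assumed property name
        return props;
    }
}
```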



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] amitbhoraniya opened a new pull request #9113: Fix trogdor coordinator client

2020-08-03 Thread GitBox


amitbhoraniya opened a new pull request #9113:
URL: https://github.com/apache/kafka/pull/9113


   The Trogdor coordinator client is not able to shut down a remote coordinator; this PR fixes that issue.
   
   ### Committer Checklist (excluded from commit message)
   - [x] Verify design and implementation 
   - [x] Verify test coverage and CI build status
   - [x] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dongjinleekr commented on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2

2020-08-03 Thread GitBox


dongjinleekr commented on pull request #7898:
URL: https://github.com/apache/kafka/pull/7898#issuecomment-668034540


   Finally, it is finished! :congratulations: All features and tests are now successfully migrated to the log4j2 API and pass cleanly!
   
   I have changed the PR title and am preparing a KIP. Stay tuned! :smiley:
   
   cc/ @ijuma @OneCricketeer @jeffhuang26 @tombentley @jpechane 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9110: MINOR: Ensure a reason is logged for every segment deletion

2020-08-03 Thread GitBox


kowshik commented on a change in pull request #9110:
URL: https://github.com/apache/kafka/pull/9110#discussion_r464012842



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -2227,14 +2210,17 @@ class Log(@volatile private var _dir: File,
* @param segments The log segments to schedule for deletion
* @param asyncDelete Whether the segment files should be deleted 
asynchronously
*/
-  private def removeAndDeleteSegments(segments: Iterable[LogSegment], 
asyncDelete: Boolean): Unit = {
+  private def removeAndDeleteSegments(segments: Iterable[LogSegment],
+  asyncDelete: Boolean,
+  reason: SegmentDeletionReason): Unit = {
 if (segments.nonEmpty) {
   lock synchronized {
 // As most callers hold an iterator into the `segments` collection and 
`removeAndDeleteSegment` mutates it by
 // removing the deleted segment, we should force materialization of 
the iterator here, so that results of the
 // iteration remain valid and deterministic.
 val toDelete = segments.toList
 toDelete.foreach { segment =>
+  info(s"${reason.reasonString(this, segment)}")

Review comment:
   @dhruvilshah3 Sounds good!
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on pull request #9096: MINOR: Add comments to constrainedAssign and generalAssign method

2020-08-03 Thread GitBox


abbccdda commented on pull request #9096:
URL: https://github.com/apache/kafka/pull/9096#issuecomment-668113921


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #8936: KAFKA-10207: Fixed padded timeindex causing premature data deletion

2020-08-03 Thread GitBox


junrao commented on a change in pull request #8936:
URL: https://github.com/apache/kafka/pull/8936#discussion_r463885952



##
File path: core/src/main/scala/kafka/log/LogSegment.scala
##
@@ -60,11 +60,34 @@ class LogSegment private[log] (val log: FileRecords,
val baseOffset: Long,
val indexIntervalBytes: Int,
val rollJitterMs: Long,
-   val time: Time) extends Logging {
+   val time: Time,
+   val segmentRecovery: LogSegment => Int) extends 
Logging {
 
-  def offsetIndex: OffsetIndex = lazyOffsetIndex.get
+  def loadIndexWithRecovery[T <: AbstractIndex](lazyIndex: LazyIndex[T]): T = {

Review comment:
   Could this be private?

##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -652,6 +653,12 @@ class Log(@volatile private var _dir: File,
 bytesTruncated
   }
 
+  def segmentRecovery(): LogSegment => Int = {
+(segment: LogSegment) => {
+  recoverSegment(segment, None)

Review comment:
   Log.recoverSegment() updates the producerState. If we are just 
rebuilding the indexes, we don't need to change producerState. Perhaps we could 
introduce a new method LogSegment.rebuildIndex() that does the same index 
rebuilding logic as LogSegment.recovery(), but without touching the 
producerState and leaderEpoch. If we encounter CorruptRecordException or 
InvalidRecordException, we reset the recovery point and fail the broker.

##
File path: core/src/main/scala/kafka/log/LazyIndex.scala
##
@@ -52,6 +52,13 @@ class LazyIndex[T <: AbstractIndex] private (@volatile 
private var indexWrapper:
 
   def file: File = indexWrapper.file
 
+  def isLoaded: Boolean = {
+indexWrapper match {
+  case indexValue: IndexValue[T] => true

Review comment:
   Since indexValue is not used, we could just do `case _: IndexValue[T]`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9110: MINOR: Ensure a reason is logged for all segment deletion operations

2020-08-03 Thread GitBox


hachikuji commented on a change in pull request #9110:
URL: https://github.com/apache/kafka/pull/9110#discussion_r464545190



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -2227,13 +2210,16 @@ class Log(@volatile private var _dir: File,
* @param segments The log segments to schedule for deletion
* @param asyncDelete Whether the segment files should be deleted 
asynchronously
*/
-  private def removeAndDeleteSegments(segments: Iterable[LogSegment], 
asyncDelete: Boolean): Unit = {
+  private def removeAndDeleteSegments(segments: Iterable[LogSegment],
+  asyncDelete: Boolean,
+  reason: SegmentDeletionReason): Unit = {
 if (segments.nonEmpty) {
   lock synchronized {
 // As most callers hold an iterator into the `segments` collection and 
`removeAndDeleteSegment` mutates it by
 // removing the deleted segment, we should force materialization of 
the iterator here, so that results of the
 // iteration remain valid and deterministic.
 val toDelete = segments.toList
+info(s"${reason.reasonString(this, toDelete)}")

Review comment:
   A little annoying to need to pass through segments just to be added to 
each log message individually. Maybe we could do it like this instead
   ```scala
   info(s"Deleting segments due to ${reason.reasonString(this)}: $toDelete")
   ```

##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -2686,3 +2670,50 @@ object LogMetricNames {
 List(NumLogSegments, LogStartOffset, LogEndOffset, Size)
   }
 }
+
+sealed trait SegmentDeletionReason {
+  def reasonString(log: Log, toDelete: Iterable[LogSegment]): String
+}
+
+case object RetentionMsBreachDeletion extends SegmentDeletionReason {

Review comment:
   nit: is it necessary to add `Deletion` to all of these? Maybe only 
`LogDeletion` needs it since it is referring to deletion of the log itself.

##
File path: core/src/main/scala/kafka/log/LogSegment.scala
##
@@ -413,7 +413,7 @@ class LogSegment private[log] (val log: FileRecords,
   override def toString: String = "LogSegment(baseOffset=" + baseOffset +
 ", size=" + size +
 ", lastModifiedTime=" + lastModified +
-", largestTime=" + largestTimestamp +
+", largestRecordTimestamp=" + largestRecordTimestamp +

Review comment:
   I'm ok with the change. I think it's better to reflect the underlying 
fields directly and redundant information just adds noise to the logs.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac opened a new pull request #9114: KAFKA-10162; Make the rate based quota behave more like a Token Bucket (KIP-599, Part III)

2020-08-03 Thread GitBox


dajac opened a new pull request #9114:
URL: https://github.com/apache/kafka/pull/9114


   Based on the discussion in https://github.com/apache/kafka/pull/9072, I have put together an alternative approach. It does the following:
   * Instead of changing the implementation of the Rate to behave like a Token Bucket, it uses two different metrics: the regular Rate and a new Token Bucket. The latter is used to enforce the quota.
   
   The code can be improved and refactored; I just wanted to get it out quickly to gather feedback on the approach.
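
   For readers unfamiliar with the mechanism, a self-contained, illustrative token-bucket sketch (not the metrics-based implementation in this PR; all names are hypothetical): the bucket refills at the quota rate up to a burst cap, a request may drive the balance negative once, and further requests are rejected until the balance is repaid.
   
   ```java
   // Illustrative token bucket: admit a burst, then throttle until the debt is repaid.
   final class TokenBucketSketch {
       private final double quota;  // tokens added per second
       private final double burst;  // maximum accumulated credit
       private double tokens;
       private long lastMs;
   
       TokenBucketSketch(final double quota, final double burstSeconds, final long nowMs) {
           this.quota = quota;
           this.burst = quota * burstSeconds;
           this.tokens = this.burst;
           this.lastMs = nowMs;
       }
   
       // Returns true if an operation of the given cost is admitted at time nowMs.
       synchronized boolean tryAcquire(final double cost, final long nowMs) {
           tokens = Math.min(burst, tokens + quota * (nowMs - lastMs) / 1000.0);
           lastMs = nowMs;
           if (tokens < 0) {
               return false; // still repaying an earlier burst
           }
           tokens -= cost;   // the balance may go negative once, which throttles later calls
           return true;
       }
   }
   ```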
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics

2020-08-03 Thread Ning Zhang (Jira)
Ning Zhang created KAFKA-10339:
--

 Summary: MirrorMaker2 Exactly-once Semantics
 Key: KAFKA-10339
 URL: https://issues.apache.org/jira/browse/KAFKA-10339
 Project: Kafka
  Issue Type: New Feature
  Components: KafkaConnect
Reporter: Ning Zhang
Assignee: Ning Zhang


MirrorMaker2 is currently implemented on Kafka Connect Framework, more 
specifically the Source Connector / Task, which do not provide exactly-once 
semantics (EOS) out-of-the-box, as discussed in 
https://github.com/confluentinc/kafka-connect-jdbc/issues/461,  
https://github.com/apache/kafka/pull/5553, 
https://issues.apache.org/jira/browse/KAFKA-6080  and 
https://issues.apache.org/jira/browse/KAFKA-3821. Therefore MirrorMaker2 
currently does not provide EOS.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dhruvilshah3 commented on pull request #9110: MINOR: Ensure a reason is logged for all segment deletion operations

2020-08-03 Thread GitBox


dhruvilshah3 commented on pull request #9110:
URL: https://github.com/apache/kafka/pull/9110#issuecomment-668151967


   Thanks for the reviews @kowshik, @hachikuji, @ijuma. I have addressed the 
comments.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics

2020-08-03 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-10339:
---
Labels: needs-kip  (was: )

> MirrorMaker2 Exactly-once Semantics
> ---
>
> Key: KAFKA-10339
> URL: https://issues.apache.org/jira/browse/KAFKA-10339
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Major
>  Labels: needs-kip
>
> MirrorMaker2 is currently implemented on Kafka Connect Framework, more 
> specifically the Source Connector / Task, which do not provide exactly-once 
> semantics (EOS) out-of-the-box, as discussed in 
> https://github.com/confluentinc/kafka-connect-jdbc/issues/461,  
> https://github.com/apache/kafka/pull/5553, 
> https://issues.apache.org/jira/browse/KAFKA-6080  and 
> https://issues.apache.org/jira/browse/KAFKA-3821. Therefore MirrorMaker2 
> currently does not provide EOS.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dhruvilshah3 edited a comment on pull request #9110: MINOR: Ensure a reason is logged for all segment deletion operations

2020-08-03 Thread GitBox


dhruvilshah3 edited a comment on pull request #9110:
URL: https://github.com/apache/kafka/pull/9110#issuecomment-668151967


   Thanks for the reviews @kowshik, @hachikuji, @ijuma. I have addressed the 
comments. Let me know what you think.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics

2020-08-03 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17170231#comment-17170231
 ] 

Ning Zhang commented on KAFKA-10339:


[~ryannedolan] [~mimaison] Here is the KIP I would like to get your thoughts 
on, thanks so much for any feedback or input
https://cwiki.apache.org/confluence/display/KAFKA/KIP-653%3A+MirrorMaker2+Exactly-once+Semantics

> MirrorMaker2 Exactly-once Semantics
> ---
>
> Key: KAFKA-10339
> URL: https://issues.apache.org/jira/browse/KAFKA-10339
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Major
>  Labels: needs-kip
>
> MirrorMaker2 is currently implemented on Kafka Connect Framework, more 
> specifically the Source Connector / Task, which do not provide exactly-once 
> semantics (EOS) out-of-the-box, as discussed in 
> https://github.com/confluentinc/kafka-connect-jdbc/issues/461,  
> https://github.com/apache/kafka/pull/5553, 
> https://issues.apache.org/jira/browse/KAFKA-6080  and 
> https://issues.apache.org/jira/browse/KAFKA-3821. Therefore MirrorMaker2 
> currently does not provide EOS.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma commented on pull request #9110: MINOR: Ensure a reason is logged for all segment deletion operations

2020-08-03 Thread GitBox


ijuma commented on pull request #9110:
URL: https://github.com/apache/kafka/pull/9110#issuecomment-668162389


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9103: Add redirection for (Incremental)AlterConfig

2020-08-03 Thread GitBox


abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r464567004



##
File path: clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
##
@@ -34,6 +34,8 @@
 private final boolean expectResponse;
 private final int requestTimeoutMs;
 private final RequestCompletionHandler callback;
+private final String initialPrincipalName;
+private final String initialClientId;

Review comment:
   It is not necessary as we don't check nulls for these fields.

##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -2453,34 +2455,98 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleAlterConfigsRequest(request: RequestChannel.Request): Unit = {
 val alterConfigsRequest = request.body[AlterConfigsRequest]
-val (authorizedResources, unauthorizedResources) = 
alterConfigsRequest.configs.asScala.toMap.partition { case (resource, _) =>
+val requestResources = alterConfigsRequest.configs.asScala.toMap
+
+val (authorizedResources, unauthorizedResources) = 
requestResources.partition { case (resource, _) =>
   resource.`type` match {
 case ConfigResource.Type.BROKER_LOGGER =>
-  throw new InvalidRequestException(s"AlterConfigs is deprecated and 
does not support the resource type ${ConfigResource.Type.BROKER_LOGGER}")
+  throw new InvalidRequestException(
+s"AlterConfigs is deprecated and does not support the resource 
type ${ConfigResource.Type.BROKER_LOGGER}")
 case ConfigResource.Type.BROKER =>
   authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)
 case ConfigResource.Type.TOPIC =>
   authorize(request.context, ALTER_CONFIGS, TOPIC, resource.name)
 case rt => throw new InvalidRequestException(s"Unexpected resource 
type $rt")
   }
 }
-val authorizedResult = adminManager.alterConfigs(authorizedResources, 
alterConfigsRequest.validateOnly)
-val unauthorizedResult = unauthorizedResources.keys.map { resource =>
-  resource -> configsAuthorizationApiError(resource)
+
+def sendResponseCallback(results: Map[ConfigResource, ApiError]): Unit = {
+  sendResponseMaybeThrottle(request, requestThrottleMs =>
+new AlterConfigsResponse(results.asJava, requestThrottleMs))
 }
-def responseCallback(requestThrottleMs: Int): AlterConfigsResponse = {
-  val data = new AlterConfigsResponseData()
-.setThrottleTimeMs(requestThrottleMs)
-  (authorizedResult ++ unauthorizedResult).foreach{ case (resource, error) 
=>
-data.responses().add(new AlterConfigsResourceResponse()
-  .setErrorCode(error.error.code)
-  .setErrorMessage(error.message)
-  .setResourceName(resource.name)
-  .setResourceType(resource.`type`.id))
+
+def notControllerResponse(): Unit = {
+  val errorResult = requestResources.keys.map {
+resource => resource -> new ApiError(Errors.NOT_CONTROLLER, null)
+  }.toMap
+
+  sendResponseCallback(errorResult)
+}
+
+if (isForwardingRequest(request)) {
+  if (!controller.isActive) {
+notControllerResponse()
+  } else {
+val authorizedResult = adminManager.alterConfigs(
+  authorizedResources, alterConfigsRequest.validateOnly)
+// For forwarding requests, the authentication failure is not caused by
+// the original client, but by the broker.
+val unauthorizedResult = unauthorizedResources.keys.map { resource =>
+  resource -> new ApiError(Errors.BROKER_AUTHORIZATION_FAILURE, null)
+}
+
+sendResponseCallback(authorizedResult ++ unauthorizedResult)
+  }
+} else if (!controller.isActive && config.redirectionEnabled) {
+  val redirectRequestBuilder = new AlterConfigsRequest.Builder(
+authorizedResources.asJava, alterConfigsRequest.validateOnly())
+
+  brokerToControllerChannelManager.sendRequest(redirectRequestBuilder,
+new ForwardedAlterConfigsRequestCompletionHandler(request,
+  unauthorizedResources.keys.map { resource =>
+resource -> configsAuthorizationApiError(resource)
+  }.toMap),
+request.header.initialPrincipalName,
+request.header.initialClientId)
+} else {
+  // When IBP is low, we would just handle the config request, as admin 
client doesn't know
+  // how to find the controller.
+  val authorizedResult = adminManager.alterConfigs(
+authorizedResources, alterConfigsRequest.validateOnly)
+  val unauthorizedResult = unauthorizedResources.keys.map { resource =>
+resource -> configsAuthorizationApiError(resource)
   }
-  new AlterConfigsResponse(data)
+
+  sendResponseCallback(authorizedResult ++ unauthorizedResult)
+}
+  }
+
+  private def isForwardingRequest(request: RequestChannel.Request): Boolean = {
+request.header.initialPrincipalNa

[jira] [Created] (KAFKA-10340) Source connectors should report error when trying to producer records to non-existent topics instead of hanging forever

2020-08-03 Thread Arjun Satish (Jira)
Arjun Satish created KAFKA-10340:


 Summary: Source connectors should report error when trying to 
producer records to non-existent topics instead of hanging forever
 Key: KAFKA-10340
 URL: https://issues.apache.org/jira/browse/KAFKA-10340
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Arjun Satish


Currently, a source connector will blindly attempt to write a record to a Kafka 
topic. When the topic does not exist, its creation is controlled by ACLs and 
the {{auto.create.topics.enable}} config on the brokers. When auto.create is 
disabled, the producer.send() call on the Connect worker will hang indefinitely 
(due to the "infinite retries" configuration for said producer). In production 
setups, the config is usually disabled, and the source connector simply appears 
to hang and not produce any output. 

It is desirable to either log an info or an error message (or inform the user 
somehow) that the connector is simply stuck waiting for the destination topic 
to be created. When the worker has permissions to inspect the broker settings, 
it can use the {{listTopics}} and {{describeConfigs}} API in AdminClient to 
check if the topic exists, the broker can {{auto.create.topics.enable}} topics, 
and if these cases do not exist, either throw an error.

With the recently merged 
[KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics],
 this becomes even more specific a corner case: when topic creation settings 
are enabled, the worker should handle the corner case where topic creation is 
disabled, {{auto.create.topics.enable}} is set to false and topic does not 
exist.
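
A hedged sketch of the pre-check described above (not Connect worker code; the helper name, broker id handling, and error type are assumptions): look the topic up via listTopics, and if it is missing, consult the broker's auto.create.topics.enable setting before deciding whether to fail fast instead of hanging.

```java
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

// Sketch only: fail fast when the destination topic is missing and will not be auto-created.
final class TopicPrecheckSketch {
    static void ensureWritable(final Admin admin, final String topic, final String brokerId)
            throws ExecutionException, InterruptedException {
        if (admin.listTopics().names().get().contains(topic)) {
            return; // topic exists, nothing to check
        }
        final ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, brokerId);
        final ConfigEntry autoCreate = admin.describeConfigs(Collections.singleton(broker))
            .all().get().get(broker).get("auto.create.topics.enable");
        if (autoCreate == null || !Boolean.parseBoolean(autoCreate.value())) {
            throw new IllegalStateException(
                "Topic " + topic + " does not exist and brokers will not auto-create it");
        }
    }
}
```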



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10340) Source connectors should report error when trying to producer records to non-existent topics instead of hanging forever

2020-08-03 Thread Arjun Satish (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arjun Satish updated KAFKA-10340:
-
Description: 
Currently, a source connector will blindly attempt to write a record to a Kafka 
topic. When the topic does not exist, its creation is controlled by the 
{{auto.create.topics.enable}} config on the brokers. When auto.create is 
disabled, the producer.send() call on the Connect worker will hang indefinitely 
(due to the "infinite retries" configuration for said producer). In production 
setups, where this config is usually disabled, the source connector simply 
appears to hang and not produce any output.

It is desirable to either log an info or an error message (or inform the user 
somehow) that the connector is simply stuck waiting for the destination topic 
to be created. When the worker has permissions to inspect the broker settings, 
it can use the {{listTopics}} and {{describeConfigs}} API in AdminClient to 
check if the topic exists, the broker can {{auto.create.topics.enable}} topics, 
and if these cases do not exist, either throw an error.

With the recently merged 
[KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics],
 this becomes even more specific a corner case: when topic creation settings 
are enabled, the worker should handle the corner case where topic creation is 
disabled, {{auto.create.topics.enable}} is set to false and topic does not 
exist.

  was:
Currently, a source connector will blindly attempt to write a record to a Kafka 
topic. When the topic does not exist, its creation is controlled by ACLs and 
the {{auto.create.topics.enable}} config on the brokers. When auto.create is 
disabled, the producer.send() call on the Connect worker will hang indefinitely 
(due to the "infinite retries" configuration for said producer). In production 
setups, the config is usually disabled, and the source connector simply appears 
to hang and not produce any output. 

It is desirable to either log an info or an error message (or inform the user 
somehow) that the connector is simply stuck waiting for the destination topic 
to be created. When the worker has permissions to inspect the broker settings, 
it can use the {{listTopics}} and {{describeConfigs}} API in AdminClient to 
check if the topic exists, the broker can {{auto.create.topics.enable}} topics, 
and if these cases do not exist, either throw an error.

With the recently merged 
[KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics],
 this becomes even more specific a corner case: when topic creation settings 
are enabled, the worker should handle the corner case where topic creation is 
disabled, {{auto.create.topics.enable}} is set to false and topic does not 
exist.


> Source connectors should report error when trying to producer records to 
> non-existent topics instead of hanging forever
> ---
>
> Key: KAFKA-10340
> URL: https://issues.apache.org/jira/browse/KAFKA-10340
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Arjun Satish
>Priority: Major
>
> Currently, a source connector will blindly attempt to write a record to a 
> Kafka topic. When the topic does not exist, its creation is controlled by the 
> {{auto.create.topics.enable}} config on the brokers. When auto.create is 
> disabled, the producer.send() call on the Connect worker will hang 
> indefinitely (due to the "infinite retries" configuration for said producer). 
> In production setups, where this config is usually disabled, the source 
> connector simply appears to hang and not produce any output.
> It is desirable to either log an info or an error message (or inform the user 
> somehow) that the connector is simply stuck waiting for the destination topic 
> to be created. When the worker has permissions to inspect the broker 
> settings, it can use the {{listTopics}} and {{describeConfigs}} API in 
> AdminClient to check if the topic exists, the broker can 
> {{auto.create.topics.enable}} topics, and if these cases do not exist, either 
> throw an error.
> With the recently merged 
> [KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics],
>  this becomes even more specific a corner case: when topic creation settings 
> are enabled, the worker should handle the corner case where topic creation is 
> disabled, {{auto.create.topics.enable}} is set to false and topic does not 
> exist.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

[jira] [Updated] (KAFKA-10340) Source connectors should report error when trying to producer records to non-existent topics instead of hanging forever

2020-08-03 Thread Arjun Satish (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arjun Satish updated KAFKA-10340:
-
Description: 
Currently, a source connector will blindly attempt to write a record to a Kafka 
topic. When the topic does not exist, its creation is controlled by the 
{{auto.create.topics.enable}} config on the brokers. When auto.create is 
disabled, the producer.send() call on the Connect worker will hang indefinitely 
(due to the "infinite retries" configuration for said producer). In setups 
where this config is usually disabled, the source connector simply appears to 
hang and not produce any output.

It is desirable to either log an info or an error message (or inform the user 
somehow) that the connector is simply stuck waiting for the destination topic 
to be created. When the worker has permissions to inspect the broker settings, 
it can use the {{listTopics}} and {{describeConfigs}} API in AdminClient to 
check if the topic exists, the broker can {{auto.create.topics.enable}} topics, 
and if these cases do not exist, either throw an error.

With the recently merged 
[KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics],
 this becomes even more specific a corner case: when topic creation settings 
are enabled, the worker should handle the corner case where topic creation is 
disabled, {{auto.create.topics.enable}} is set to false and topic does not 
exist.

  was:
Currently, a source connector will blindly attempt to write a record to a Kafka 
topic. When the topic does not exist, its creation is controlled by the 
{{auto.create.topics.enable}} config on the brokers. When auto.create is 
disabled, the producer.send() call on the Connect worker will hang indefinitely 
(due to the "infinite retries" configuration for said producer). In production 
setups, where this config is usually disabled, the source connector simply 
appears to hang and not produce any output.

It is desirable to either log an info or an error message (or inform the user 
somehow) that the connector is simply stuck waiting for the destination topic 
to be created. When the worker has permissions to inspect the broker settings, 
it can use the {{listTopics}} and {{describeConfigs}} API in AdminClient to 
check if the topic exists, the broker can {{auto.create.topics.enable}} topics, 
and if these cases do not exist, either throw an error.

With the recently merged 
[KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics],
 this becomes even more specific a corner case: when topic creation settings 
are enabled, the worker should handle the corner case where topic creation is 
disabled, {{auto.create.topics.enable}} is set to false and topic does not 
exist.


> Source connectors should report error when trying to producer records to 
> non-existent topics instead of hanging forever
> ---
>
> Key: KAFKA-10340
> URL: https://issues.apache.org/jira/browse/KAFKA-10340
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Arjun Satish
>Priority: Major
>
> Currently, a source connector will blindly attempt to write a record to a 
> Kafka topic. When the topic does not exist, its creation is controlled by the 
> {{auto.create.topics.enable}} config on the brokers. When auto.create is 
> disabled, the producer.send() call on the Connect worker will hang 
> indefinitely (due to the "infinite retries" configuration for said producer). 
> In setups where this config is usually disabled, the source connector simply 
> appears to hang and not produce any output.
> It is desirable to either log an info or an error message (or inform the user 
> somehow) that the connector is simply stuck waiting for the destination topic 
> to be created. When the worker has permissions to inspect the broker 
> settings, it can use the {{listTopics}} and {{describeConfigs}} API in 
> AdminClient to check if the topic exists, the broker can 
> {{auto.create.topics.enable}} topics, and if these cases do not exist, either 
> throw an error.
> With the recently merged 
> [KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics],
>  this becomes even more specific a corner case: when topic creation settings 
> are enabled, the worker should handle the corner case where topic creation is 
> disabled, {{auto.create.topics.enable}} is set to false and topic does not 
> exist.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] abbccdda commented on pull request #9096: MINOR: Add comments to constrainedAssign and generalAssign method

2020-08-03 Thread GitBox


abbccdda commented on pull request #9096:
URL: https://github.com/apache/kafka/pull/9096#issuecomment-668181513


   Only flaky test failures.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda merged pull request #9096: MINOR: Add comments to constrainedAssign and generalAssign method

2020-08-03 Thread GitBox


abbccdda merged pull request #9096:
URL: https://github.com/apache/kafka/pull/9096


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10340) Source connectors should report error when trying to produce records to non-existent topics instead of hanging forever

2020-08-03 Thread Arjun Satish (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arjun Satish updated KAFKA-10340:
-
Summary: Source connectors should report error when trying to produce 
records to non-existent topics instead of hanging forever  (was: Source 
connectors should report error when trying to producer records to non-existent 
topics instead of hanging forever)

> Source connectors should report error when trying to produce records to 
> non-existent topics instead of hanging forever
> --
>
> Key: KAFKA-10340
> URL: https://issues.apache.org/jira/browse/KAFKA-10340
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Arjun Satish
>Priority: Major
>
> Currently, a source connector will blindly attempt to write a record to a 
> Kafka topic. When the topic does not exist, its creation is controlled by the 
> {{auto.create.topics.enable}} config on the brokers. When auto.create is 
> disabled, the producer.send() call on the Connect worker will hang 
> indefinitely (due to the "infinite retries" configuration for said producer). 
> In setups where this config is usually disabled, the source connector simply 
> appears to hang and not produce any output.
> It is desirable to either log an info or an error message (or inform the user 
> somehow) that the connector is simply stuck waiting for the destination topic 
> to be created. When the worker has permissions to inspect the broker 
> settings, it can use the {{listTopics}} and {{describeConfigs}} APIs in 
> AdminClient to check whether the topic exists or whether the broker has 
> {{auto.create.topics.enable}} set to true, and if neither is the case, 
> throw an error.
> With the recently merged 
> [KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics],
>  this becomes an even more specific corner case: even with topic creation 
> settings available, the worker should handle the case where topic creation is 
> disabled, {{auto.create.topics.enable}} is set to false, and the topic does not 
> exist.
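
To make the suggested check concrete, here is a minimal, hypothetical Java sketch of how a 
worker with sufficient permissions could probe for the topic and the broker-side 
{{auto.create.topics.enable}} setting via AdminClient. The topic name, broker id, and error 
handling are illustrative assumptions, not part of the actual Connect worker code:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.Config;
    import org.apache.kafka.common.config.ConfigResource;

    public class TopicExistenceCheck {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (Admin admin = Admin.create(props)) {
                String topic = "my-source-topic"; // illustrative topic name
                boolean topicExists = admin.listTopics().names().get().contains(topic);

                // "0" is an illustrative broker id; any live broker can answer this
                ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
                Config brokerConfig = admin.describeConfigs(Collections.singleton(broker))
                        .all().get().get(broker);
                boolean autoCreate = Boolean.parseBoolean(
                        brokerConfig.get("auto.create.topics.enable").value());

                if (!topicExists && !autoCreate) {
                    throw new IllegalStateException("Topic " + topic
                            + " does not exist and brokers will not auto-create it");
                }
            }
        }
    }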



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rhauch opened a new pull request #9115: Bump dev version to 2.6.1-SNAPSHOT

2020-08-03 Thread GitBox


rhauch opened a new pull request #9115:
URL: https://github.com/apache/kafka/pull/9115


   As part of the 2.6.0 release, we need to bump the dev version of the `2.6` 
branch to 2.6.1.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics

2020-08-03 Thread Ryanne Dolan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17170377#comment-17170377
 ] 

Ryanne Dolan commented on KAFKA-10339:
--

[~yangguo1220] this is awesome, thanks! My team has looked into this multiple 
times, and we came to the same conclusion around WorkerSourceTask requiring a 
lot of changes to support transactions. I believe it would be easier to start 
from scratch with a new SourceWorker than to adapt it to support transactions, 
and we'd probably need to deprecate a number of APIs in the process. I wouldn't 
rule it out, but it would be difficult.

Love how your KIP "kills two birds with one stone" -- we get a 
MirrorSinkConnector _and_ EOS.



> MirrorMaker2 Exactly-once Semantics
> ---
>
> Key: KAFKA-10339
> URL: https://issues.apache.org/jira/browse/KAFKA-10339
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Major
>  Labels: needs-kip
>
> MirrorMaker2 is currently implemented on Kafka Connect Framework, more 
> specifically the Source Connector / Task, which do not provide exactly-once 
> semantics (EOS) out-of-the-box, as discussed in 
> https://github.com/confluentinc/kafka-connect-jdbc/issues/461,  
> https://github.com/apache/kafka/pull/5553, 
> https://issues.apache.org/jira/browse/KAFKA-6080  and 
> https://issues.apache.org/jira/browse/KAFKA-3821. Therefore MirrorMaker2 
> currently does not provide EOS.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics

2020-08-03 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17170409#comment-17170409
 ] 

Ning Zhang commented on KAFKA-10339:


Thanks [~ryannedolan] for your quick feedback :) If a MirrorSinkConnector with a 
transactional producer in MirrorSinkTask sounds like the right path to you 
initially, I will spend more time this week refining the above KIP, 
meanwhile waiting for the initial feedback from [~mimaison]. 

Also the migration may be interesting if MirrorSinkConnector is the right path 
to go.
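
As a rough illustration only of the transactional-producer mechanism being discussed (not 
the actual MirrorSinkTask design, which the KIP still has to settle; note in particular that 
in a cross-cluster mirror the offsets committed this way land in the target cluster), a 
sink-style task could wrap each batch and its consumer offsets in a single producer 
transaction:

    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    public class TransactionalMirrorSketch {
        private final KafkaProducer<byte[], byte[]> producer;

        public TransactionalMirrorSketch(String bootstrap, String txnId) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, txnId); // must be stable per task
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
            producer = new KafkaProducer<>(props);
            producer.initTransactions();
        }

        /** Writes one polled batch and its source offsets atomically. */
        public void mirrorBatch(Iterable<ConsumerRecord<byte[], byte[]>> batch,
                                Map<TopicPartition, OffsetAndMetadata> offsets,
                                String consumerGroupId) {
            producer.beginTransaction();
            try {
                for (ConsumerRecord<byte[], byte[]> record : batch) {
                    producer.send(new ProducerRecord<>(record.topic(), record.key(), record.value()));
                }
                // Commit the consumer's progress inside the same transaction.
                producer.sendOffsetsToTransaction(offsets, consumerGroupId);
                producer.commitTransaction();
            } catch (Exception e) {
                producer.abortTransaction();
                throw new RuntimeException(e);
            }
        }
    }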

> MirrorMaker2 Exactly-once Semantics
> ---
>
> Key: KAFKA-10339
> URL: https://issues.apache.org/jira/browse/KAFKA-10339
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Major
>  Labels: needs-kip
>
> MirrorMaker2 is currently implemented on Kafka Connect Framework, more 
> specifically the Source Connector / Task, which do not provide exactly-once 
> semantics (EOS) out-of-the-box, as discussed in 
> https://github.com/confluentinc/kafka-connect-jdbc/issues/461,  
> https://github.com/apache/kafka/pull/5553, 
> https://issues.apache.org/jira/browse/KAFKA-6080  and 
> https://issues.apache.org/jira/browse/KAFKA-3821. Therefore MirrorMaker2 
> currently does not provide EOS.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10339) MirrorMaker2 Exactly-once Semantics

2020-08-03 Thread Ryanne Dolan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17170425#comment-17170425
 ] 

Ryanne Dolan commented on KAFKA-10339:
--

Yeah, I guess it would be possible to switch from MirrorSourceConnector to 
MirrorSinkConnector in the connect-mirror-maker "driver", but external tooling 
would notice (e.g. JMX metrics would change), so we'd need to put that behind a 
flag or something so users could opt in to the SinkConnector in order to get 
EOS. But even without changing the "driver", an EOS MirrorSinkConnector would 
be very useful to many organizations that run MM2 on existing Connect clusters.

> MirrorMaker2 Exactly-once Semantics
> ---
>
> Key: KAFKA-10339
> URL: https://issues.apache.org/jira/browse/KAFKA-10339
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Major
>  Labels: needs-kip
>
> MirrorMaker2 is currently implemented on Kafka Connect Framework, more 
> specifically the Source Connector / Task, which do not provide exactly-once 
> semantics (EOS) out-of-the-box, as discussed in 
> https://github.com/confluentinc/kafka-connect-jdbc/issues/461,  
> https://github.com/apache/kafka/pull/5553, 
> https://issues.apache.org/jira/browse/KAFKA-6080  and 
> https://issues.apache.org/jira/browse/KAFKA-3821. Therefore MirrorMaker2 
> currently does not provide EOS.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10316) Consider renaming getter method for Interactive Queries

2020-08-03 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17170429#comment-17170429
 ] 

Matthias J. Sax commented on KAFKA-10316:
-

Correct. For a KIP you need 3 binding +1 votes. I just voted myself and hope 
that [~guozhang] or [~bchen225242] could also vote to get up to 3.

> Consider renaming getter method for Interactive Queries
> ---
>
> Key: KAFKA-10316
> URL: https://issues.apache.org/jira/browse/KAFKA-10316
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: John Thomas
>Priority: Minor
>  Labels: beginner, need-kip, newbie
>
> In the 2.5 release, we introduce new classes for Interactive Queries via 
> KIP-535 (cf https://issues.apache.org/jira/browse/KAFKA-6144). The KIP did 
> not specify the names for getter methods of `KeyQueryMetadata` explicitly and 
> they were added in the PR as `getActiveHost()`, `getStandbyHosts()`, and 
> `getPartition()`.
> However, in Kafka it is custom to not use the `get` prefix for getters and 
> thus the methods should have been added as `activeHost()`, `standbyHosts()`, 
> and `partition()`, respectively.
> We should consider renaming the methods accordingly, by deprecating the 
> existing ones and adding the new ones in parallel.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10321) shouldDieOnInvalidOffsetExceptionDuringStartup would block forever on JDK11

2020-08-03 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10321:

Component/s: streams

> shouldDieOnInvalidOffsetExceptionDuringStartup would block forever on JDK11
> ---
>
> Key: KAFKA-10321
> URL: https://issues.apache.org/jira/browse/KAFKA-10321
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: Boyang Chen
>Priority: Major
>
> Have spotted two definite cases where the test 
> shouldDieOnInvalidOffsetExceptionDuringStartup
> fails to stop during the whole test suite:
>  [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/7604/consoleFull]
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/7602/console]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10321) shouldDieOnInvalidOffsetExceptionDuringStartup would block forever on JDK11

2020-08-03 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-10321.
-
Fix Version/s: 2.7.0
   Resolution: Fixed

> shouldDieOnInvalidOffsetExceptionDuringStartup would block forever on JDK11
> ---
>
> Key: KAFKA-10321
> URL: https://issues.apache.org/jira/browse/KAFKA-10321
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: Boyang Chen
>Priority: Major
> Fix For: 2.7.0
>
>
> Have spotted two definite cases where the test 
> shouldDieOnInvalidOffsetExceptionDuringStartup
> fails to stop during the whole test suite:
>  [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/7604/consoleFull]
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/7602/console]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10255) Fix flaky testOneWayReplicationWithAutorOffsetSync1 test

2020-08-03 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10255:

Fix Version/s: (was: 2.7)
   2.7.0

> Fix flaky testOneWayReplicationWithAutorOffsetSync1 test
> 
>
> Key: KAFKA-10255
> URL: https://issues.apache.org/jira/browse/KAFKA-10255
> Project: Kafka
>  Issue Type: Test
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Fix For: 2.7.0
>
>
> https://builds.apache.org/blue/rest/organizations/jenkins/pipelines/kafka-trunk-jdk14/runs/279/log/?start=0
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > 
> testOneWayReplicationWithAutorOffsetSync1 STARTED
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1
>  failed, log available in 
> /home/jenkins/jenkins-slave/workspace/kafka-trunk-jdk14/connect/mirror/build/reports/testOutput/org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1.test.stdout
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > 
> testOneWayReplicationWithAutorOffsetSync1 FAILED
>  java.lang.AssertionError: consumer record size is not zero expected:<0> but 
> was:<2>
>  at org.junit.Assert.fail(Assert.java:89)
>  at org.junit.Assert.failNotEquals(Assert.java:835)
>  at org.junit.Assert.assertEquals(Assert.java:647)
>  at 
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1(MirrorConnectorsIntegrationTest.java:349)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10120) DescribeLogDirsResult exposes internal classes

2020-08-03 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10120:

Fix Version/s: (was: 2.7)
   2.7.0

> DescribeLogDirsResult exposes internal classes
> --
>
> Key: KAFKA-10120
> URL: https://issues.apache.org/jira/browse/KAFKA-10120
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tom Bentley
>Assignee: Tom Bentley
>Priority: Minor
> Fix For: 2.7.0
>
>
> DescribeLogDirsResult (returned by AdminClient#describeLogDirs(Collection)) 
> exposes a number of internal types:
>  * {{DescribeLogDirsResponse.LogDirInfo}}
>  * {{DescribeLogDirsResponse.ReplicaInfo}}
>  * {{Errors}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on pull request #9094: KAFKA-10054: KIP-613, add TRACE-level e2e latency metrics

2020-08-03 Thread GitBox


ableegoldman commented on pull request #9094:
URL: https://github.com/apache/kafka/pull/9094#issuecomment-668252969


   cc @guozhangwang for 2nd review & merge



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #9024: [DO NOT MERGE (yet)] MINOR: bump Streams integration test log level to DEBUG

2020-08-03 Thread GitBox


mjsax commented on pull request #9024:
URL: https://github.com/apache/kafka/pull/9024#issuecomment-668255117


   Retest this please.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #9024: [DO NOT MERGE (yet)] MINOR: bump Streams integration test log level to DEBUG

2020-08-03 Thread GitBox


guozhangwang commented on pull request #9024:
URL: https://github.com/apache/kafka/pull/9024#issuecomment-668258427


   SG. +1



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from commiting

2020-08-03 Thread GitBox


guozhangwang commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r464683179



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##
@@ -461,6 +463,42 @@ public void flush() {
 }
 }
 
+public void flushCache() {
+RuntimeException firstException = null;
+// attempting to flush the stores
+if (!stores.isEmpty()) {
+log.debug("Flushing all store caches registered in the state 
manager: {}", stores);
+for (final StateStoreMetadata metadata : stores.values()) {
+final StateStore store = metadata.stateStore;
+
+try {
+// buffer should be flushed to send all records to 
changelog
+if (store instanceof TimeOrderedKeyValueBuffer) {
+store.flush();
+} else if (store instanceof CachedStateStore) {
+((CachedStateStore) store).flushCache();
+}

Review comment:
   For stores that are not time-ordered or cached, we should indeed not flush 
them. In fact, moving forward I think we would not flush cached stores anyway, 
since they will be removed; i.e. generally speaking we should not always `flush 
cache`. In that sense the log4j entry looks reasonable to me?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from commiting

2020-08-03 Thread GitBox


guozhangwang commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r464683987



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##
@@ -38,13 +41,39 @@
  */
 final class StateManagerUtil {
 static final String CHECKPOINT_FILE_NAME = ".checkpoint";
+static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;
 
 private StateManagerUtil() {}
 
 static RecordConverter converterForStore(final StateStore store) {
 return isTimestamped(store) ? rawValueToTimestampedValue() : 
identity();
 }
 
+static boolean checkpointNeeded(final boolean enforceCheckpoint,
+final Map 
oldOffsetSnapshot,
+final Map 
newOffsetSnapshot) {
+// we should always have the old snapshot post completing the register 
state stores;
+// if it is null it means the registration is not done and hence we 
should not overwrite the checkpoint
+if (oldOffsetSnapshot == null)
+return false;
+
+// if the previous snapshot is empty while the current snapshot is not 
then we should always checkpoint;
+// note if the task is stateless or stateful but no stores logged, the 
snapshot would also be empty
+// and hence it's okay to not checkpoint
+if (oldOffsetSnapshot.isEmpty() && !newOffsetSnapshot.isEmpty())
+return true;
+
+// we can checkpoint if the the difference between the current and the 
previous snapshot is large enough
+long totalOffsetDelta = 0L;
+for (final Map.Entry entry : 
newOffsetSnapshot.entrySet()) {
+totalOffsetDelta += 
Math.abs(oldOffsetSnapshot.getOrDefault(entry.getKey(), 0L) - entry.getValue());

Review comment:
   I was trying to make sure we do not get an NPE, but I think that's not 
necessary; will change.
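
As a side note on the getOrDefault point above, a tiny standalone Java illustration (using 
String keys instead of TopicPartition purely for brevity) of why the absent-key case cannot 
throw an NPE:

    import java.util.HashMap;
    import java.util.Map;

    public class OffsetDeltaExample {
        public static void main(String[] args) {
            Map<String, Long> oldSnapshot = new HashMap<>();
            oldSnapshot.put("topic-0", 100L);

            Map<String, Long> newSnapshot = new HashMap<>();
            newSnapshot.put("topic-0", 150L);
            newSnapshot.put("topic-1", 80L); // not present in the old snapshot

            long totalDelta = 0L;
            for (Map.Entry<String, Long> entry : newSnapshot.entrySet()) {
                // getOrDefault returns 0L for "topic-1", so unboxing never sees null
                totalDelta += Math.abs(oldSnapshot.getOrDefault(entry.getKey(), 0L)
                        - entry.getValue());
            }
            System.out.println(totalDelta); // |100-150| + |0-80| = 130
        }
    }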





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10341) Add version 2.6 to streams and systems tests

2020-08-03 Thread Randall Hauch (Jira)
Randall Hauch created KAFKA-10341:
-

 Summary: Add version 2.6 to streams and systems tests
 Key: KAFKA-10341
 URL: https://issues.apache.org/jira/browse/KAFKA-10341
 Project: Kafka
  Issue Type: Task
  Components: build, streams, system tests
Affects Versions: 2.7.0
Reporter: Randall Hauch
Assignee: Randall Hauch


Part of the [2.6.0 release 
process|https://cwiki.apache.org/confluence/display/KAFKA/Release+Process#ReleaseProcess-AnnouncetheRC].

See KAFKA-9779 for the changes made for the 2.5 release.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from commiting

2020-08-03 Thread GitBox


guozhangwang commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r464684660



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -267,80 +266,26 @@ public void handleAssignment(final Map> activeTasks,
 // check for tasks that were owned previously but have changed 
active/standby status
 tasksToRecycle.add(task);
 } else {
-tasksToClose.add(task);
-}
-}
-
-for (final Task task : tasksToClose) {
-try {
-if (task.isActive()) {
-// Active tasks are revoked and suspended/committed during 
#handleRevocation
-if (!task.state().equals(State.SUSPENDED)) {
-log.error("Active task {} should be suspended prior to 
attempting to close but was in {}",
-  task.id(), task.state());
-throw new IllegalStateException("Active task " + 
task.id() + " should have been suspended");
-}
-} else {
-task.suspend();
-task.prepareCommit();
-task.postCommit();
-}
-completeTaskCloseClean(task);
-cleanUpTaskProducer(task, taskCloseExceptions);
-tasks.remove(task.id());
-} catch (final RuntimeException e) {
-final String uncleanMessage = String.format(
-"Failed to close task %s cleanly. Attempting to close 
remaining tasks before re-throwing:",
-task.id());
-log.error(uncleanMessage, e);
-taskCloseExceptions.put(task.id(), e);
-// We've already recorded the exception (which is the point of 
clean).
-// Now, we should go ahead and complete the close because a 
half-closed task is no good to anyone.
-dirtyTasks.add(task);
+tasksToCloseClean.add(task);
 }
 }
 
-for (final Task oldTask : tasksToRecycle) {
-final Task newTask;
-try {
-if (oldTask.isActive()) {
-if (!oldTask.state().equals(State.SUSPENDED)) {
-// Active tasks are revoked and suspended/committed 
during #handleRevocation
-log.error("Active task {} should be suspended prior to 
attempting to close but was in {}",
-  oldTask.id(), oldTask.state());
-throw new IllegalStateException("Active task " + 
oldTask.id() + " should have been suspended");
-}
-final Set partitions = 
standbyTasksToCreate.remove(oldTask.id());
-newTask = 
standbyTaskCreator.createStandbyTaskFromActive((StreamTask) oldTask, 
partitions);
-cleanUpTaskProducer(oldTask, taskCloseExceptions);
-} else {
-oldTask.suspend();
-oldTask.prepareCommit();
-oldTask.postCommit();
-final Set partitions = 
activeTasksToCreate.remove(oldTask.id());
-newTask = 
activeTaskCreator.createActiveTaskFromStandby((StandbyTask) oldTask, 
partitions, mainConsumer);
-}
-tasks.remove(oldTask.id());
-addNewTask(newTask);
-} catch (final RuntimeException e) {
-final String uncleanMessage = String.format("Failed to recycle 
task %s cleanly. Attempting to close remaining tasks before re-throwing:", 
oldTask.id());
-log.error(uncleanMessage, e);
-taskCloseExceptions.put(oldTask.id(), e);
-dirtyTasks.add(oldTask);
-}
-}
+// close and recycle those tasks
+handleCloseAndRecycle(tasksToRecycle, tasksToCloseClean, 
tasksToCloseDirty, activeTasksToCreate, standbyTasksToCreate, 
taskCloseExceptions);
 
-for (final Task task : dirtyTasks) {
+// for tasks that cannot be cleanly closed or recycled, close them 
dirty
+for (final Task task : tasksToCloseDirty) {

Review comment:
   That's a good point, I will update.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch opened a new pull request #9116: KAFKA-10341: Add 2.6.0 to system tests and streams upgrade tests

2020-08-03 Thread GitBox


rhauch opened a new pull request #9116:
URL: https://github.com/apache/kafka/pull/9116


   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on pull request #9116: KAFKA-10341: Add 2.6.0 to system tests and streams upgrade tests

2020-08-03 Thread GitBox


rhauch commented on pull request #9116:
URL: https://github.com/apache/kafka/pull/9116#issuecomment-668281025


   Just kicked off a build of the Streams system tests (e.g., 
`TC_PATHS=tests/kafkatest/tests/streams/streams_upgrade_test.py`) using this 
PR: 
   https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4082/
   
   and a full system test run using this PR: 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4083/



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10341) Add version 2.6 to streams and systems tests

2020-08-03 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-10341:
--
Fix Version/s: 2.7.0

> Add version 2.6 to streams and systems tests
> 
>
> Key: KAFKA-10341
> URL: https://issues.apache.org/jira/browse/KAFKA-10341
> Project: Kafka
>  Issue Type: Task
>  Components: build, streams, system tests
>Affects Versions: 2.7.0
>Reporter: Randall Hauch
>Assignee: Randall Hauch
>Priority: Major
> Fix For: 2.7.0
>
>
> Part of the [2.6.0 release 
> process|https://cwiki.apache.org/confluence/display/KAFKA/Release+Process#ReleaseProcess-AnnouncetheRC].
> See KAFKA-9779 for the changes made for the 2.5 release.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10341) Add version 2.6 to streams and systems tests

2020-08-03 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-10341:
--
Description: 
Part of the [2.6.0 release 
process|https://cwiki.apache.org/confluence/display/KAFKA/Release+Process#ReleaseProcess-AnnouncetheRC].
 This will be merged only to `trunk` for inclusion in 2.7.0

See KAFKA-9779 for the changes made for the 2.5 release.

  was:
Part of the [2.6.0 release 
process|https://cwiki.apache.org/confluence/display/KAFKA/Release+Process#ReleaseProcess-AnnouncetheRC].

See KAFKA-9779 for the changes made for the 2.5 release.


> Add version 2.6 to streams and systems tests
> 
>
> Key: KAFKA-10341
> URL: https://issues.apache.org/jira/browse/KAFKA-10341
> Project: Kafka
>  Issue Type: Task
>  Components: build, streams, system tests
>Affects Versions: 2.7.0
>Reporter: Randall Hauch
>Assignee: Randall Hauch
>Priority: Major
> Fix For: 2.7.0
>
>
> Part of the [2.6.0 release 
> process|https://cwiki.apache.org/confluence/display/KAFKA/Release+Process#ReleaseProcess-AnnouncetheRC].
>  This will be merged only to `trunk` for inclusion in 2.7.0
> See KAFKA-9779 for the changes made for the 2.5 release.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-03 Thread GitBox


hachikuji commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r464705276



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -2956,6 +2956,22 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  def handleAlterIsrRequest(request: RequestChannel.Request): Unit = {
+val alterIsrRequest = request.body[AlterIsrRequest]
+
+authorizeClusterOperation(request, CLUSTER_ACTION);
+
+// TODO do we need throttling for this response?

Review comment:
   Probably reasonable to handle it the same way other inter-broker RPCs 
are handled.

##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1771,6 +1775,127 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  // TODO is it okay to pull message classes down into the controller?
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: 
AlterIsrResponseData => Unit): Unit = {
+//val brokerEpochOpt = 
controllerContext.liveBrokerIdAndEpochs.get(alterIsrRequest.brokerId())
+/*if (brokerEpochOpt.isEmpty) {
+  info(s"Ignoring AlterIsr due to unknown broker 
${alterIsrRequest.brokerId()}")
+  // TODO is INVALID_REQUEST a reasonable error here?

Review comment:
   Good question. It might be fair to assume the controller is correct and use 
STALE_BROKER_EPOCH. Once KIP-500 is all done, it would be totally fair since 
the controller will be guaranteed to have the latest state. The other question 
is what the broker should do if it sees STALE_BROKER_EPOCH...

##
File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala
##
@@ -0,0 +1,121 @@
+package kafka.server
+
+import java.util
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import 
org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, 
AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, 
AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the 
ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent 
from the controller
+ */
+trait AlterIsrChannelManager {
+  val IsrChangePropagationBlackOut = 5000L
+  val IsrChangePropagationInterval = 6L
+
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def startup(): Unit
+
+  def shutdown(): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: 
LeaderAndIsr, callback: Errors => Unit)
+
+class AlterIsrChannelManagerImpl(val controllerChannelManager: 
BrokerToControllerChannelManager,
+ val zkClient: KafkaZkClient,
+ val scheduler: Scheduler,
+ val brokerId: Int,
+ val brokerEpoch: Long) extends 
AlterIsrChannelManager with Logging with KafkaMetricsGroup {
+
+  private val pendingIsrUpdates: mutable.Queue[AlterIsrItem] = new 
mutable.Queue[AlterIsrItem]()
+  private val lastIsrChangeMs = new AtomicLong(0)
+  private val lastIsrPropagationMs = new AtomicLong(0)
+
+  override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = {
+pendingIsrUpdates synchronized {
+  pendingIsrUpdates += alterIsrItem
+  lastIsrChangeMs.set(System.currentTimeMillis())
+}
+  }
+
+  override def startup(): Unit = {
+scheduler.schedule("alter-isr-send", maybePropagateIsrChanges _, period = 
2500L, unit = TimeUnit.MILLISECONDS)

Review comment:
   Hmm.. This adds a delay of 2.5s to every ISR change, which is a bit 
annoying. I guess the point is to allow batching? I think a better approach 
might be to send requests immediately on arrival, but set a limit on the 
maximum number of in-flight requests (maybe just 1) and let the changes 
accumulate when there is a request in-flight. Then we can still get a big 
batching benefit when there are a large number of ISR changes that need to be 
sent in a hurry.
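
To make the shape of that alternative concrete, a minimal Java sketch (names are 
illustrative; the real code would live in the Scala channel manager): enqueue changes as 
they arrive, send immediately when nothing is in flight, and let items accumulate into a 
single batch while one request is outstanding.

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Queue;

    public class InFlightBatcher<T> {
        private final Queue<T> pending = new ArrayDeque<>();
        private boolean requestInFlight = false; // at most one outstanding request

        /** Called by partitions when their ISR changes. */
        public synchronized void enqueue(T item) {
            pending.add(item);
            maybeSend();
        }

        /** Called from the response handler once the in-flight request completes. */
        public synchronized void onResponse() {
            requestInFlight = false;
            maybeSend(); // drain whatever accumulated while we were waiting
        }

        private void maybeSend() {
            if (requestInFlight || pending.isEmpty()) {
                return;
            }
            List<T> batch = new ArrayList<>(pending);
            pending.clear();
            requestInFlight = true;
            send(batch); // hypothetical hook that builds and fires the batched request
        }

        protected void send(List<T> batch) {
            // In the broker this would build one AlterIsr request for the whole batch
            // and hand it to the broker-to-controller channel.
        }
    }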

##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1771,6 +1775,127 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  // TODO is it okay to pull message classes down into the controller?
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: 
AlterIsrResponseData => Unit): Unit = {
+//val brokerEpochOpt = 
controllerContext.liveBrokerIdAndEpochs.get(alterIsrRequest.brokerId())
+/

[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-03 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r464716040



##
File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala
##
@@ -0,0 +1,121 @@
+package kafka.server
+
+import java.util
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import 
org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, 
AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, 
AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the 
ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent 
from the controller
+ */
+trait AlterIsrChannelManager {
+  val IsrChangePropagationBlackOut = 5000L
+  val IsrChangePropagationInterval = 6L
+
+  def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit
+
+  def startup(): Unit
+
+  def shutdown(): Unit
+}
+
+case class AlterIsrItem(topicPartition: TopicPartition, leaderAndIsr: 
LeaderAndIsr, callback: Errors => Unit)
+
+class AlterIsrChannelManagerImpl(val controllerChannelManager: 
BrokerToControllerChannelManager,
+ val zkClient: KafkaZkClient,
+ val scheduler: Scheduler,
+ val brokerId: Int,
+ val brokerEpoch: Long) extends 
AlterIsrChannelManager with Logging with KafkaMetricsGroup {
+
+  private val pendingIsrUpdates: mutable.Queue[AlterIsrItem] = new 
mutable.Queue[AlterIsrItem]()
+  private val lastIsrChangeMs = new AtomicLong(0)
+  private val lastIsrPropagationMs = new AtomicLong(0)
+
+  override def enqueueIsrUpdate(alterIsrItem: AlterIsrItem): Unit = {
+pendingIsrUpdates synchronized {
+  pendingIsrUpdates += alterIsrItem
+  lastIsrChangeMs.set(System.currentTimeMillis())
+}
+  }
+
+  override def startup(): Unit = {
+scheduler.schedule("alter-isr-send", maybePropagateIsrChanges _, period = 
2500L, unit = TimeUnit.MILLISECONDS)

Review comment:
   That makes sense. I'll change that (this was pulled in from the previous 
ISR notification code in ReplicaManager)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-03 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r464717171



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1771,6 +1775,127 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  // TODO is it okay to pull message classes down into the controller?
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: 
AlterIsrResponseData => Unit): Unit = {
+//val brokerEpochOpt = 
controllerContext.liveBrokerIdAndEpochs.get(alterIsrRequest.brokerId())
+/*if (brokerEpochOpt.isEmpty) {
+  info(s"Ignoring AlterIsr due to unknown broker 
${alterIsrRequest.brokerId()}")
+  // TODO is INVALID_REQUEST a reasonable error here?
+  callback.apply(new 
AlterIsrResponseData().setErrorCode(Errors.INVALID_REQUEST.code))
+  return
+}
+
+if (!brokerEpochOpt.contains(alterIsrRequest.brokerEpoch())) {
+  info(s"Ignoring AlterIsr due to stale broker epoch 
${alterIsrRequest.brokerEpoch()} for broker ${alterIsrRequest.brokerId()}")
+  callback.apply(new 
AlterIsrResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code))
+  return
+}*/
+
+val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+val resp = new AlterIsrResponseData()
+resp.setTopics(new util.ArrayList())
+
+alterIsrRequest.topics().forEach(topicReq => {
+  val topicResp = new AlterIsrResponseTopics()
+.setName(topicReq.name())
+.setPartitions(new util.ArrayList())
+  resp.topics().add(topicResp)
+
+  topicReq.partitions().forEach(partitionReq => {
+val partitionResp = new AlterIsrResponsePartitions()
+  .setPartitionIndex(partitionReq.partitionIndex())
+topicResp.partitions().add(partitionResp)
+
+// For each partition whose ISR we are altering, let's do some upfront 
validation for the broker response

Review comment:
   The main rationale for validating in the request handler is so we can 
return meaningful partition-level errors to the broker (fenced leader, not 
leader or follower, etc). Although, I'm not sure the broker could do anything 
useful with these errors since it probably has stale metadata in these cases.
   
   The KIP calls out four partition-level errors. Do we actually need them?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-03 Thread GitBox


junrao commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r464697840



##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -1240,26 +1298,30 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   def onExpireHeartbeat(group: GroupMetadata, memberId: String, isPending: 
Boolean): Unit = {
+val groupsToComplete = scala.collection.mutable.Set[GroupKey]()
 group.inLock {
   if (group.is(Dead)) {
 info(s"Received notification of heartbeat expiration for member 
$memberId after group ${group.groupId} had already been unloaded or deleted.")
   } else if (isPending) {
 info(s"Pending member $memberId in group ${group.groupId} has been 
removed after session timeout expiration.")
-removePendingMemberAndUpdateGroup(group, memberId)
+groupsToComplete ++= removePendingMemberAndUpdateGroup(group, memberId)
   } else if (!group.has(memberId)) {
 debug(s"Member $memberId has already been removed from the group.")
   } else {
 val member = group.get(memberId)
 if (!member.hasSatisfiedHeartbeat) {
   info(s"Member ${member.memberId} in group ${group.groupId} has 
failed, removing it from the group")
-  removeMemberAndUpdateGroup(group, member, s"removing member 
${member.memberId} on heartbeat expiration")
+  groupsToComplete ++= removeMemberAndUpdateGroup(group, member, 
s"removing member ${member.memberId} on heartbeat expiration")
 }
   }
 }
+groupsToComplete.foreach(joinPurgatory.checkAndComplete)

Review comment:
   This could be `completeDelayedJoinRequests(groupsToComplete)` ?

##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -554,19 +596,27 @@ class ReplicaManager(val config: KafkaConfig,
* Append messages to leader replicas of the partition, and wait for them to 
be replicated to other replicas;
* the callback function will be triggered either when timeout or the 
required acks are satisfied;
* if the callback function itself is already synchronized on some object 
then pass this object to avoid deadlock.
+   *
+   * @param completeDelayedRequests true: the delayed requests may be 
completed inside the call with the expectation
+   *that no conflicting locks are held by the 
caller. Otherwise, the caller is expected
+   *to complete delayed requests for those 
returned partitions if completeDelayedRequests
+   *is false

Review comment:
   Could we change the explanation to sth like the following?
   
   This method may trigger the completeness check for delayed requests in a few 
purgatories. Occasionally, for serialization in the log, a caller may need to 
hold a lock while calling this method. To avoid deadlock, if the caller holds a 
conflicting lock while calling this method, the caller is expected to set 
completeDelayedRequests to false to avoid checking the delayed operations 
during this call. The caller will then explicitly complete those delayed 
operations based on the return value, without holding the conflicting lock. 
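
A small Java-flavored sketch of that calling pattern (the names are illustrative stand-ins, 
not the actual Scala ReplicaManager API): append while holding the conflicting lock with the 
purgatory check disabled, then complete the delayed operations for the returned partitions 
after the lock is released.

    import java.util.List;

    public class DelayedCompletionPattern {
        private final Object conflictingLock = new Object();

        // Hypothetical stand-ins for the log append and the purgatory
        interface Log { List<String> append(List<String> records); }
        interface Purgatory { void checkAndComplete(String partitionKey); }

        public void appendWhileHoldingLock(Log log, Purgatory purgatory, List<String> records) {
            List<String> affectedPartitions;
            synchronized (conflictingLock) {
                // Equivalent of completeDelayedRequests = false: do not touch the
                // purgatory here, because its completion callbacks may need this lock.
                affectedPartitions = log.append(records);
            }
            // Outside the lock, explicitly complete the delayed operations that the
            // append may have unblocked, based on the returned partitions.
            for (String partition : affectedPartitions) {
                purgatory.checkAndComplete(partition);
            }
        }
    }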

##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -334,6 +345,37 @@ class ReplicaManager(val config: KafkaConfig,
   brokerTopicStats.removeMetrics(topic)
   }
 
+  /**
+   * try to complete delayed requests in following purgatories.
+   * 1) delayedFetchPurgatory
+   * 2) delayedProducePurgatory
+   * 3) delayedDeleteRecordsPurgatory
+   *
+   * Noted that caller should NOT hold any group lock in order to avoid 
deadlock.
+   *
+   * this method is visible for testing.
+   */
+  def completeDelayedRequests(topicPartition: TopicPartition): Unit = {
+val topicPartitionOperationKey = TopicPartitionOperationKey(topicPartition)
+delayedFetchPurgatory.checkAndComplete(topicPartitionOperationKey)
+delayedProducePurgatory.checkAndComplete(topicPartitionOperationKey)
+delayedDeleteRecordsPurgatory.checkAndComplete(topicPartitionOperationKey)
+  }
+
+  /**
+   * try to complete delayed requests in delayedFetchPurgatory.
+   *
+   * Noted that caller should NOT hold any group lock in order to avoid 
deadlock.

Review comment:
   group lock => conflicting lock

##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -334,6 +345,37 @@ class ReplicaManager(val config: KafkaConfig,
   brokerTopicStats.removeMetrics(topic)
   }
 
+  /**
+   * try to complete delayed requests in following purgatories.
+   * 1) delayedFetchPurgatory
+   * 2) delayedProducePurgatory
+   * 3) delayedDeleteRecordsPurgatory
+   *
+   * Noted that caller should NOT hold any group lock in order to avoid 
deadlock.

Review comment:
   group lock => conflicting lock

##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoord

[GitHub] [kafka] hachikuji commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-03 Thread GitBox


hachikuji commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r464718819



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1771,6 +1775,127 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  // TODO is it okay to pull message classes down into the controller?
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: 
AlterIsrResponseData => Unit): Unit = {
+//val brokerEpochOpt = 
controllerContext.liveBrokerIdAndEpochs.get(alterIsrRequest.brokerId())
+/*if (brokerEpochOpt.isEmpty) {
+  info(s"Ignoring AlterIsr due to unknown broker 
${alterIsrRequest.brokerId()}")
+  // TODO is INVALID_REQUEST a reasonable error here?
+  callback.apply(new 
AlterIsrResponseData().setErrorCode(Errors.INVALID_REQUEST.code))
+  return
+}
+
+if (!brokerEpochOpt.contains(alterIsrRequest.brokerEpoch())) {
+  info(s"Ignoring AlterIsr due to stale broker epoch 
${alterIsrRequest.brokerEpoch()} for broker ${alterIsrRequest.brokerId()}")
+  callback.apply(new 
AlterIsrResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code))
+  return
+}*/
+
+val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+val resp = new AlterIsrResponseData()
+resp.setTopics(new util.ArrayList())
+
+alterIsrRequest.topics().forEach(topicReq => {
+  val topicResp = new AlterIsrResponseTopics()
+.setName(topicReq.name())
+.setPartitions(new util.ArrayList())
+  resp.topics().add(topicResp)
+
+  topicReq.partitions().forEach(partitionReq => {
+val partitionResp = new AlterIsrResponsePartitions()
+  .setPartitionIndex(partitionReq.partitionIndex())
+topicResp.partitions().add(partitionResp)
+
+// For each partition whose ISR we are altering, let's do some upfront 
validation for the broker response

Review comment:
   To be clear, I'm not questioning the need for the validation, just the 
fact that it is done before enqueueing the event instead of when the event is 
processed.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #5374: MINOR: update release.py

2020-08-03 Thread GitBox


mjsax commented on a change in pull request #5374:
URL: https://github.com/apache/kafka/pull/5374#discussion_r464727207



##
File path: release.py
##
@@ -479,16 +479,16 @@ def select_gpg_key():
 for root, dirs, files in os.walk(artifacts_dir):
 assert root.startswith(artifacts_dir)
 
-for file in files:
-local_path = os.path.join(root, file)
-remote_path = os.path.join("public_html", kafka_output_dir, 
root[len(artifacts_dir)+1:], file)
-sftp_cmds += "\nput %s %s" % (local_path, remote_path)
-
 for dir in dirs:
 sftp_mkdir(os.path.join("public_html", kafka_output_dir, 
root[len(artifacts_dir)+1:], dir))
 
-if sftp_cmds:
-cmd("Uploading artifacts in %s to your Apache home directory" % root, 
"sftp -b - %s...@home.apache.org" % apache_id, stdin=sftp_cmds)
+for file in files:
+local_path = os.path.join(root, file)
+remote_path = os.path.join("public_html", kafka_output_dir, 
root[len(artifacts_dir)+1:], file)
+sftp_cmds = """
+put %s %s
+""" % (local_path, remote_path)
+cmd("Uploading artifacts in %s to your Apache home directory" % root, 
"sftp -b - %s...@home.apache.org" % apache_id, stdin=sftp_cmds)

Review comment:
   It was always slow. This PR only made the script print stuff to stdout 
regularly--before this PR, the script was uploading silently for a long time 
and one could think it froze (even if it was just doing/uploading stuff). This 
PR was not to make the upload faster, just to make the script appear alive :)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #8964: KAFKA-9450: Decouple flushing state from commiting

2020-08-03 Thread GitBox


guozhangwang commented on pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#issuecomment-668301971


   cc @mjsax please take a final look.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #9107: KAFKA-5488: KIP-418 implementation

2020-08-03 Thread GitBox


mjsax commented on pull request #9107:
URL: https://github.com/apache/kafka/pull/9107#issuecomment-668302357


   I'll put it into my backlog. But I am the main reviewer for two other KIPs 
(216 and 466) that I should review first, as they got approved earlier and their 
PRs have been open longer already.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-03 Thread GitBox


mjsax commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r464728426



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
##
@@ -19,7 +19,16 @@
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.*;

Review comment:
   The build should actually fail on wildcard imports... Do we have some 
checkstyle gaps? @lct45 can you maybe look into that (if not, also ok).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-03 Thread GitBox


mjsax commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r464728901



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/CogroupedKStream.java
##
@@ -275,6 +275,15 @@
  */
  TimeWindowedCogroupedKStream windowedBy(final 
Windows windows);
 
+/**
+ * Create a new {@link TimeWindowedCogroupedKStream} instance that can be 
used to perform sliding
+ * windowed aggregations.
+ *
+ * @param windows the specification of the aggregation {@link 
SlidingWindows}
+ * @return an instance of {@link TimeWindowedCogroupedKStream}
+ */
+TimeWindowedCogroupedKStream windowedBy(final SlidingWindows 
windows);

Review comment:
   This PR is rather large. Would it maybe make sense to split it into 2 
and add co-group in its own PR?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] Johnny-Malizia commented on a change in pull request #8936: KAFKA-10207: Fixed padded timeindex causing premature data deletion

2020-08-03 Thread GitBox


Johnny-Malizia commented on a change in pull request #8936:
URL: https://github.com/apache/kafka/pull/8936#discussion_r464731245



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -652,6 +653,12 @@ class Log(@volatile private var _dir: File,
 bytesTruncated
   }
 
+  def segmentRecovery(): LogSegment => Int = {
+(segment: LogSegment) => {
+  recoverSegment(segment, None)

Review comment:
   This sounds reasonable to me, what do I need to do to ensure the 
recovery point is reset and the broker is failed?

##
File path: core/src/main/scala/kafka/log/LazyIndex.scala
##
@@ -52,6 +52,13 @@ class LazyIndex[T <: AbstractIndex] private (@volatile 
private var indexWrapper:
 
   def file: File = indexWrapper.file
 
+  def isLoaded: Boolean = {
+indexWrapper match {
+  case indexValue: IndexValue[T] => true

Review comment:
   :+1: 

##
File path: core/src/main/scala/kafka/log/LogSegment.scala
##
@@ -60,11 +60,34 @@ class LogSegment private[log] (val log: FileRecords,
val baseOffset: Long,
val indexIntervalBytes: Int,
val rollJitterMs: Long,
-   val time: Time) extends Logging {
+   val time: Time,
+   val segmentRecovery: LogSegment => Int) extends 
Logging {
 
-  def offsetIndex: OffsetIndex = lazyOffsetIndex.get
+  def loadIndexWithRecovery[T <: AbstractIndex](lazyIndex: LazyIndex[T]): T = {

Review comment:
   :+1: 
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #9114: KAFKA-10162; Use Token Bucket algorithm for controller mutation quota (KIP-599, Part III)

2020-08-03 Thread GitBox


junrao commented on a change in pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#discussion_r464724281



##
File path: 
clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MeasurableStat;
+import org.apache.kafka.common.metrics.MetricConfig;
+
+public class TokenBucket implements MeasurableStat {
+private final TimeUnit unit;
+private double credits;
+private long lastUpdateMs;
+
+public TokenBucket() {
+this(TimeUnit.SECONDS);
+}
+
+public TokenBucket(TimeUnit unit) {
+this.unit = unit;
+this.credits = 0;
+this.lastUpdateMs = 0;
+}
+
+@Override
+public double measure(final MetricConfig config, final long timeMs) {
+if (config.quota() == null)
+return Long.MAX_VALUE;
+final double quota = config.quota().bound();
+final double burst = (config.samples() - 1) * 
convert(config.timeWindowMs()) * quota;
+refill(quota, burst, timeMs);
+return this.credits;
+}
+
+@Override
+public void record(final MetricConfig config, final double value, final 
long timeMs) {
+if (config.quota() == null)
+return;
+final double quota = config.quota().bound();
+final double burst = (config.samples() - 1) * 
convert(config.timeWindowMs()) * quota;

Review comment:
   Should burst be computed from `#samples` or `#samples - 1` ?

##
File path: 
clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MeasurableStat;
+import org.apache.kafka.common.metrics.MetricConfig;
+
+public class TokenBucket implements MeasurableStat {
+private final TimeUnit unit;
+private double credits;
+private long lastUpdateMs;
+
+public TokenBucket() {
+this(TimeUnit.SECONDS);
+}
+
+public TokenBucket(TimeUnit unit) {
+this.unit = unit;
+this.credits = 0;

Review comment:
   Should we start with 0 credit or the full burst credits? The benefit of 
the latter is that during initialization, the requests won't be throttled as 
much due to a cold start.

##
File path: core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
##
@@ -131,6 +133,16 @@ class PermissiveControllerMutationQuota(private val time: 
Time,
 
 object ControllerMutationQuotaManager {
   val QuotaControllerMutationDefault = Int.MaxValue.toDouble
+
+  def throttleTime(e: QuotaViolationException, timeMs: Long): Long = {
+e.metric().measurable() match {
+  case _: TokenBucket => Math.round(-e.value() * e.bound())

Review comment:
   Hmm, assuming bound is the per sec rate, it seems that the 
throttleTimeMs should be `-e.value() / e.bound * 1000`?
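
As a worked example of that unit conversion (illustrative numbers only): a deficit of 5 
credits with a bound of 10 mutations per second should throttle for 5 / 10 = 0.5 s, i.e. 
the deficit divided by the per-second rate, times 1000 for milliseconds.

    public class ThrottleTimeExample {
        // Converts a (negative) credit balance and a per-second quota bound
        // into a throttle time in milliseconds, following the suggestion above.
        static long throttleTimeMs(double value, double boundPerSec) {
            return Math.round(-value / boundPerSec * 1000);
        }

        public static void main(String[] args) {
            System.out.println(throttleTimeMs(-5.0, 10.0)); // prints 500
        }
    }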





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@inf

[jira] [Created] (KAFKA-10346) Redirect CreatePartition to the controller

2020-08-03 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10346:
---

 Summary: Redirect CreatePartition to the controller
 Key: KAFKA-10346
 URL: https://issues.apache.org/jira/browse/KAFKA-10346
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen


In the bridge release broker, the CreatePartition should be redirected to the 
active controller instead of relying on admin client discovery.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-10036) Improve error message if user violates `Supplier` pattern

2020-08-03 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-10036:
---

Assignee: Igor Soarez  (was: Alex Sun)

> Improve error message if user violates `Supplier` pattern
> -
>
> Key: KAFKA-10036
> URL: https://issues.apache.org/jira/browse/KAFKA-10036
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Igor Soarez
>Priority: Minor
>  Labels: newbie
>
> Using the Processor API, users need to pass in a `ProcessorSupplier` that 
> needs to return new `Processor` instance each time `get()` is called.
> Users violate this rule on a regular basis and return the same instance on 
> `get()`. This mistake leads to a (not very informative) 
> `NullPointerException` during runtime. (Cf: 
> [https://stackoverflow.com/questions/61790984/kafka-stream-forward-method-throwing-nullpointerexception-because-processornode/61978396)]
> We could improve the error message by checking if `currentNode()` returns 
> `null` 
> ([https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L183)]
>  and throw an informative error message for this case.
> Furthermore, we could do a "sanity" check within `KafkaStreams` constructor 
> before we start the process threads: we get all `Suppliers` for the 
> `Topology` and call `get()` two times on each supplier to compare if the 
> returned object references are different – if they are the same, we throw an 
> informative error message.
> We should improve the JavaDocs, too: 
> [https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java#L34]
>  (also for `Transformer` et al. – it seems to be too subtle what "new" means.) 
> Similarly for 
> [https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/Topology.java]
>  and 
> [https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java]
>  (`process()`, `transform()` etc.)
> Furthermore, we should improve the docs: to explain the supplier pattern 
> explicitly: 
> [https://kafka.apache.org/25/documentation/streams/developer-guide/processor-api.html]
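
To make the supplier contract concrete, here is a small illustrative sketch using the `org.apache.kafka.streams.processor` API. The processor and supplier class names are made up for this example; only the contract itself (`get()` must return a fresh instance) comes from the ticket above.

```
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;

// Hypothetical processor used only for illustration.
class UppercaseProcessor extends AbstractProcessor<String, String> {
    @Override
    public void process(final String key, final String value) {
        context().forward(key, value == null ? null : value.toUpperCase());
    }
}

// Compliant: every call to get() creates a fresh Processor instance.
class CompliantSupplier implements ProcessorSupplier<String, String> {
    @Override
    public Processor<String, String> get() {
        return new UppercaseProcessor();
    }
}

// Violating: the same instance is handed out to every task, so the tasks share
// one ProcessorContext -- the source of the NullPointerException described above.
class ViolatingSupplier implements ProcessorSupplier<String, String> {
    private final Processor<String, String> singleton = new UppercaseProcessor();

    @Override
    public Processor<String, String> get() {
        return singleton; // same reference every time -- breaks the contract
    }
}
```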



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10345) Redirect AlterPartitionReassignment to the controller

2020-08-03 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10345:
---

 Summary: Redirect AlterPartitionReassignment to the controller 
 Key: KAFKA-10345
 URL: https://issues.apache.org/jira/browse/KAFKA-10345
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen


In the bridge release broker, the AlterPartitionReassignment should be 
redirected to the active controller instead of relying on admin client 
discovery.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10036) Improve error message if user violates `Supplier` pattern

2020-08-03 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17170493#comment-17170493
 ] 

Matthias J. Sax commented on KAFKA-10036:
-

[~soarez] – in general, before you take over an abandoned ticket, please make 
sure to reassign the ticket first (best to ask the current assignee if they are 
still working on it, and only re-assign if there is no response for some time).

> Improve error message if user violates `Supplier` pattern
> -
>
> Key: KAFKA-10036
> URL: https://issues.apache.org/jira/browse/KAFKA-10036
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Igor Soarez
>Priority: Minor
>  Labels: newbie
>
> Using the Processor API, users need to pass in a `ProcessorSupplier` that 
> needs to return new `Processor` instance each time `get()` is called.
> Users violate this rule on a regular basis and return the same instance on 
> `get()`. This mistake leads to a (not very informative) 
> `NullPointerException` during runtime. (Cf: 
> [https://stackoverflow.com/questions/61790984/kafka-stream-forward-method-throwing-nullpointerexception-because-processornode/61978396)]
> We could improve the error message by checking if `currentNode()` returns 
> `null` 
> ([https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L183)]
>  and throw an informative error message for this case.
> Furthermore, we could do a "sanity" check within `KafkaStreams` constructor 
> before we start the process threads: we get all `Suppliers` for the 
> `Topology` and call `get()` two times on each supplier to compare if the 
> returned object references are different – if they are the same, we throw an 
> informative error message.
> We should improve the JavaDocs, too: 
> [https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java#L34]
>  (also for `Transformer` et al. – it seems to be too subtle what "new" means.) 
> Similarly for 
> [https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/Topology.java]
>  and 
> [https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java]
>  (`process()`, `transform()` etc.)
> Furthermore, we should improve the docs: to explain the supplier pattern 
> explicitly: 
> [https://kafka.apache.org/25/documentation/streams/developer-guide/processor-api.html]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10347) Redirect Create/DeleteTopics to the controller

2020-08-03 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10347:
---

 Summary: Redirect Create/DeleteTopics to the controller
 Key: KAFKA-10347
 URL: https://issues.apache.org/jira/browse/KAFKA-10347
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen


In the bridge release broker, the Create/DeleteTopics should be redirected to 
the active controller instead of relying on admin client discovery.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10348) Redirect UpdateFeatures to the controller

2020-08-03 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10348:
---

 Summary: Redirect UpdateFeatures to the controller
 Key: KAFKA-10348
 URL: https://issues.apache.org/jira/browse/KAFKA-10348
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen


In the bridge release broker, the UpdateFeatures should be redirected to the 
active controller instead of relying on admin client discovery.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10350) Add redirect request monitoring metrics

2020-08-03 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10350:
---

 Summary: Add redirect request monitoring metrics
 Key: KAFKA-10350
 URL: https://issues.apache.org/jira/browse/KAFKA-10350
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen


We need to add the metric for monitoring redirection progress as stated in the 
KIP:

MBean:kafka.server:type=RequestMetrics,name=NumRequestsForwardingToControllerPerSec,clientId=([-.\w]+)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10349) Deprecate client side controller access

2020-08-03 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10349:
---

 Summary: Deprecate client side controller access
 Key: KAFKA-10349
 URL: https://issues.apache.org/jira/browse/KAFKA-10349
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen


As stated in KIP-590, we would disallow the new admin client from discovering 
the controller location, for encapsulation. For older broker communication, the 
metadata response will still contain the controller location.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on a change in pull request #9000: KAFKA-10036 Improve handling and documentation of Suppliers

2020-08-03 Thread GitBox


mjsax commented on a change in pull request #9000:
URL: https://github.com/apache/kafka/pull/9000#discussion_r464749727



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamUtil.java
##
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.TransformerSupplier;
+
+/**
+ * Shared functions to handle verifications of a valid {@link 
org.apache.kafka.streams.kstream.KStream}.
+ */
+final class KStreamUtil {
+
+private KStreamUtil() {}
+
+/**
+ * @throws IllegalArgumentException if the same transformer instance is 
obtained each time
+ */
+static void checkTransformerSupplier(final TransformerSupplier 
supplier) {
+if (supplier.get() == supplier.get()) {
+throw new IllegalArgumentException("TransformerSupplier generates 
single transformer reference. Supplier " +
+"pattern violated.");

Review comment:
   I think we can be more elaborate and add something like 
`TransformerSupplier#get() must return a new Transformer object each time it is 
called.`
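
A quick sketch of how the check above might read with that wording. This is illustration only; the wrapper class name is invented and the merged code may differ.

```
import org.apache.kafka.streams.kstream.TransformerSupplier;

final class SupplierCheckSketch {

    private SupplierCheckSketch() {}

    // Mirrors the checkTransformerSupplier(...) hunk above, with the error
    // message wording suggested in the review.
    static void checkTransformerSupplier(final TransformerSupplier<?, ?, ?> supplier) {
        if (supplier.get() == supplier.get()) {
            throw new IllegalArgumentException(
                "TransformerSupplier#get() must return a new Transformer object each time it is called.");
        }
    }
}
```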

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
##
@@ -175,6 +177,12 @@ public void testAddProcessorWithNullParents() {
 builder.addProcessor("processor", new MockProcessorSupplier<>(), 
(String) null);
 }
 
+@Test(expected = TopologyException.class)
+public void testAddProcessorWithBadSupplier() {

Review comment:
   We should also test `addGlobalStateStore()`

##
File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
##
@@ -521,6 +521,10 @@ public synchronized StreamsBuilder addStateStore(final 
StoreBuilder builder)
  * 
  * It is not required to connect a global store to {@link Processor 
Processors}, {@link Transformer Transformers},
  * or {@link ValueTransformer ValueTransformer}; those have read-only 
access to all global stores by default.
+ * 
+ * The supplier should always generate a new instance each time invoking 
{@link  ProcessorSupplier#get()}. Creating

Review comment:
   nit `each time {@link  ProcessorSupplier#get()} is called.` (similar 
elsewhere -- please fix throughout the whole PR)

##
File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
##
@@ -521,6 +521,10 @@ public synchronized StreamsBuilder addStateStore(final 
StoreBuilder builder)
  * 
  * It is not required to connect a global store to {@link Processor 
Processors}, {@link Transformer Transformers},
  * or {@link ValueTransformer ValueTransformer}; those have read-only 
access to all global stores by default.
+ * 
+ * The supplier should always generate a new instance each time invoking 
{@link  ProcessorSupplier#get()}. Creating
+ * a single Processor object and returning the same object reference in 
{@link ProcessorSupplier#get()} would be

Review comment:
   `{@link Processor}`(similar elsewhere -- please fix throughout the whole 
PR)

##
File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
##
@@ -2522,8 +2522,12 @@ void to(final TopicNameExtractor topicExtractor,
  * If in {@link Transformer#transform(Object, Object) 
Transformer#transform()} multiple records need to be emitted
  * for each input record, it is recommended to use {@link 
#flatTransform(TransformerSupplier, String...)
  * flatTransform()}.
+ * The supplier should always generate a new instance each time invoking 
{@link TransformerSupplier#get()}. Creating
+ * a single Transformer object and returning the same object reference in 
{@link TransformerSupplier#get()} would be

Review comment:
   `{@link Transformer}` (also elsewhere)

##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
##
@@ -1865,6 +1865,17 @@ public void 
shouldNotAllowNullNamedOnTransformWithStoreName() {
 assertThat(exception.getMessage(), equalTo("named can't be null"));
 }
 
+@Test
+publi

[GitHub] [kafka] abbccdda commented on pull request #9060: KAFKA-9274: Remove `retries` from InternalTopicManager

2020-08-03 Thread GitBox


abbccdda commented on pull request #9060:
URL: https://github.com/apache/kafka/pull/9060#issuecomment-668324185


   Also, do we have new unit test coverage for the changes?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-03 Thread GitBox


mjsax commented on pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#issuecomment-668325203


   Retest this please.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9064: KAFKA-10205: Documentation and handling of non deterministic Topologies

2020-08-03 Thread GitBox


mjsax commented on a change in pull request #9064:
URL: https://github.com/apache/kafka/pull/9064#discussion_r464752987



##
File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
##
@@ -50,6 +50,14 @@
 /**
  * {@code StreamsBuilder} provide the high-level Kafka Streams DSL to specify 
a Kafka Streams topology.
  *
+ * 
+ * It is a requirement that the processing logic ({@link Topology}) be defined 
in a deterministic way,
+ * as in, the order in which all operators are added must be predictable and 
the same across all application
+ * instances.
+ * Topologies are only identical if all operators are added in the same order.
+ * If different KafkaStream instances of the same application build different 
topologies the result may be

Review comment:
   {@link KafkaStreams}

##
File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
##
@@ -50,6 +50,14 @@
 /**
  * {@code StreamsBuilder} provide the high-level Kafka Streams DSL to specify 
a Kafka Streams topology.
  *
+ * 
+ * It is a requirement that the processing logic ({@link Topology}) be defined 
in a deterministic way,
+ * as in, the order in which all operators are added must be predictable and 
the same across all application
+ * instances.
+ * Topologies are only identical if all operators are added in the same order.
+ * If different KafkaStream instances of the same application build different 
topologies the result may be
+ * incompatible runtimes and unexpected results.

Review comment:
   `incompatible runtimes and unexpected results` -> `incompatible runtime 
code and unexpected results or errors.`
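
As a concrete illustration of the determinism requirement discussed in this hunk, here is a small sketch. The topic names and the builder usage are hypothetical and not taken from the PR; only the "same order on every instance" rule comes from the Javadoc above.

```
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

import org.apache.kafka.streams.StreamsBuilder;

public class TopologyOrderSketch {

    public static void main(final String[] args) {
        final Set<String> topics = new HashSet<>(Arrays.asList("orders", "payments", "shipments"));

        // Risky: iterating a HashSet relies on an unspecified iteration order.
        // If different application instances add the operators in a different
        // order, they build incompatible topologies (e.g. mismatched internal names).
        final StreamsBuilder risky = new StreamsBuilder();
        for (final String topic : topics) {
            risky.stream(topic).filter((key, value) -> value != null).to(topic + "-clean");
        }

        // Deterministic: iterate in a fixed (sorted) order so that every
        // instance adds the operators in exactly the same order.
        final StreamsBuilder deterministic = new StreamsBuilder();
        for (final String topic : new TreeSet<>(topics)) {
            deterministic.stream(topic).filter((key, value) -> value != null).to(topic + "-clean");
        }
    }
}
```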

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##
@@ -1916,6 +1917,40 @@ public void shouldAlwaysSuspendRunningTasks() {
 assertThat(task.state(), equalTo(SUSPENDED));
 }
 
+@Test
+public void szTest() {
+final InternalProcessorContext context = new ProcessorContextImpl(
+taskId,
+createConfig(false, "100"),
+stateManager,
+streamsMetrics,
+null
+);
+final StreamsMetricsImpl metrics = new 
StreamsMetricsImpl(this.metrics, "test", StreamsConfig.METRICS_LATEST);
+
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet());
+EasyMock.replay(stateManager);
+
+// The processor topology is missing the topics
+final ProcessorTopology topology = withSources(asList(), mkMap());
+
+assertThrows(TopologyException.class, () ->

Review comment:
   nit: formatting: (we should also get the exception and verify the error 
message)
   ```
   final TopologyException  exception = assertThrows(
   TopologyException.class,
   () -> new StreamTask(
   ...
   )
   );
   
   assertThat(exception.getMessage(), equalTo("..."));
   ```
   
   

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##
@@ -1916,6 +1917,40 @@ public void shouldAlwaysSuspendRunningTasks() {
 assertThat(task.state(), equalTo(SUSPENDED));
 }
 
+@Test
+public void szTest() {

Review comment:
   `szTest` is a terrible test name: 
`shouldThrowTopologyExceptionIfTaskCreatedForUnknownTopic`





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax closed pull request #8752: KAFKA-10036 early check singleton ProcessorSupplier and improve docs

2020-08-03 Thread GitBox


mjsax closed pull request #8752:
URL: https://github.com/apache/kafka/pull/8752


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #8752: KAFKA-10036 early check singleton ProcessorSupplier and improve docs

2020-08-03 Thread GitBox


mjsax commented on pull request #8752:
URL: https://github.com/apache/kafka/pull/8752#issuecomment-668326105


   Closing this PR in favor of #9000 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-9751) Internal topic creation should go to controller

2020-08-03 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-9751:
---
Summary: Internal topic creation should go to controller  (was: Admin 
`FindCoordinator` call should go to controller instead of ZK)

> Internal topic creation should go to controller
> ---
>
> Key: KAFKA-9751
> URL: https://issues.apache.org/jira/browse/KAFKA-9751
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Priority: Major
>
> In current trunk, we are still going to use ZK for topic creation in the 
> routing of FindCoordinatorRequest:
>  val (partition, topicMetadata) = 
> CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
>         case CoordinatorType.GROUP =>
>           val partition = 
> groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
>           val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, 
> request.context.listenerName)
>           (partition, metadata)
> Which should be migrated to controller handling



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9751) Internal topic creation should go to controller

2020-08-03 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-9751:
---
Description: For use cases that create internal topics through 
FindCoordinator or Metadata requests, the receiving broker should route the 
topic creation request to the controller instead of handling it by itself.  (was: In 
current trunk, we are still going to use ZK for topic creation in the routing 
of FindCoordinatorRequest:
 val (partition, topicMetadata) = 
CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
        case CoordinatorType.GROUP =>
          val partition = 
groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
          val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, 
request.context.listenerName)
          (partition, metadata)
Which should be migrated to controller handling)

> Internal topic creation should go to controller
> ---
>
> Key: KAFKA-9751
> URL: https://issues.apache.org/jira/browse/KAFKA-9751
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Priority: Major
>
> For use cases that create internal topics through FindCoordinator or Metadata 
> requests, the receiving broker should route the topic creation request to the 
> controller instead of handling it by itself.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-08-03 Thread GitBox


mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r464756646



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -274,30 +252,74 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
   final RecordConverter recordConverter) {
 for (final TopicPartition topicPartition : topicPartitions) {
 globalConsumer.assign(Collections.singletonList(topicPartition));
+long offset;
 final Long checkpoint = checkpointFileCache.get(topicPartition);
 if (checkpoint != null) {
 globalConsumer.seek(topicPartition, checkpoint);
+offset = checkpoint;
 } else {
 
globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+offset = retryUntilSuccessOrThrowOnTaskTimeout(
+() -> globalConsumer.position(topicPartition),
+String.format(
+"Failed to get position for partition %s. The broker 
may be transiently unavailable at the moment.",
+topicPartition
+)
+);
 }
 
-long offset = globalConsumer.position(topicPartition);
 final Long highWatermark = highWatermarks.get(topicPartition);
 final RecordBatchingStateRestoreCallback stateRestoreAdapter =
 StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
 
 stateRestoreListener.onRestoreStart(topicPartition, storeName, 
offset, highWatermark);
 long restoreCount = 0L;
 
-while (offset < highWatermark) {
-final ConsumerRecords records = 
globalConsumer.poll(pollTime);
+while (offset < highWatermark) { // when we "fix" this loop 
(KAFKA-7380 / KAFKA-10317)
+ // we should update the `poll()` 
timeout below
+
+// we ignore `poll.ms` config during bootstrapping phase and
+// apply `request.timeout.ms` plus `task.timeout.ms` instead
+//
+// the reason is, that `poll.ms` might be too short to give a 
fetch request a fair chance
+// to actually complete and we don't want to start 
`task.timeout.ms` too early
+//
+// we also pass `task.timeout.ms` into `poll()` directly right 
now as it simplifies our own code:
+// if we don't pass it in, we would just track the timeout 
ourselves and call `poll()` again
+// in our own retry loop; by passing the timeout we can reuse 
the consumer's internal retry loop instead
+//
+// note that using `request.timeout.ms` provides a 
conservative upper bound for the timeout;
+// this implies that we might start `task.timeout.ms` 
"delayed" -- however, starting the timeout
+// delayed is preferable (as it's more robust) than starting 
it too early
+//
+// TODO https://issues.apache.org/jira/browse/KAFKA-10315
+//   -> do a more precise timeout handling if `poll` would 
throw an exception if a fetch request fails
+//  (instead of letting the consumer retry fetch requests 
silently)
+//
+// TODO https://issues.apache.org/jira/browse/KAFKA-10317 and
+//  https://issues.apache.org/jira/browse/KAFKA-7380
+//  -> don't pass in `task.timeout.ms` to stay responsive if 
`KafkaStreams#close` gets called
+final ConsumerRecords records = 
globalConsumer.poll(requestTimeoutPlusTaskTimeout);
+if (records.isEmpty()) {
+// this will always throw
+maybeUpdateDeadlineOrThrow(time.milliseconds());

Review comment:
   We could, but this implies redundant code to "assemble" the error 
message, and I prefer to reuse the existing code for it.
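
For readers following the retry discussion in this hunk, here is a rough sketch of the retry-until-task-timeout pattern being referenced. The method name is borrowed from the diff, but the body is invented for illustration and is not the PR's implementation; the real code additionally shares the deadline handling and error-message assembly mentioned above.

```
import java.util.function.Supplier;

import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.StreamsException;

// Illustration only: retry a transient operation until it succeeds or until a
// task-timeout deadline passes, then rethrow as a fatal StreamsException.
final class RetryUntilTimeoutSketch {

    private final Time time;
    private final long taskTimeoutMs;

    RetryUntilTimeoutSketch(final Time time, final long taskTimeoutMs) {
        this.time = time;
        this.taskTimeoutMs = taskTimeoutMs;
    }

    <T> T retryUntilSuccessOrThrowOnTaskTimeout(final Supplier<T> operation, final String errorMessage) {
        final long deadlineMs = time.milliseconds() + taskTimeoutMs;
        while (true) {
            try {
                return operation.get();
            } catch (final TimeoutException transientError) {
                if (time.milliseconds() >= deadlineMs) {
                    throw new StreamsException(errorMessage, transientError);
                }
                time.sleep(100L); // brief backoff before the next attempt
            }
        }
    }
}
```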





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10351) Add missing tests for IOExceptions for GlobalStateManagerImpl

2020-08-03 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10351:

Issue Type: Test  (was: Improvement)

> Add missing tests for IOExceptions for GlobalStateManagerImpl
> -
>
> Key: KAFKA-10351
> URL: https://issues.apache.org/jira/browse/KAFKA-10351
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: beginner, newbie
>
> As mentioned in 
> [https://github.com/apache/kafka/pull/9047#pullrequestreview-456899096] we 
> should improve test coverage for IOExceptions, i.e., verify that we don't die 
> and that the WARN log is written (using `LogCaptureAppender`)
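
A rough outline of what such a test could look like. The test class, the triggering helper, and the expected message fragment are placeholders; only the idea of verifying the WARN log via the `LogCaptureAppender` test utility (assuming its createAndRegister/getMessages/unregister helpers) comes from the ticket.

```
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;

import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.junit.Test;

public class GlobalStateManagerIOExceptionSketchTest {

    @Test
    public void shouldLogWarningButNotDieOnIOException() {
        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
        try {
            // Placeholder: exercise the code path that swallows the IOException,
            // e.g. a checkpoint write against an unwritable state directory.
            // If the manager "dies", this call would throw instead.
            triggerCheckpointIOException();

            // The expected message fragment is a placeholder as well.
            assertThat(appender.getMessages(), hasItem(containsString("Failed to write offset checkpoint")));
        } finally {
            LogCaptureAppender.unregister(appender);
        }
    }

    private void triggerCheckpointIOException() {
        // Intentionally left unimplemented in this sketch; the real test would
        // drive GlobalStateManagerImpl into the IOException branch.
    }
}
```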



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10351) Add missing tests for IOExceptions for GlobalStateManagerImpl

2020-08-03 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-10351:
---

 Summary: Add missing tests for IOExceptions for 
GlobalStateManagerImpl
 Key: KAFKA-10351
 URL: https://issues.apache.org/jira/browse/KAFKA-10351
 Project: Kafka
  Issue Type: Improvement
  Components: streams, unit tests
Reporter: Matthias J. Sax


As mentioned in 
[https://github.com/apache/kafka/pull/9047#pullrequestreview-456899096] we 
should improve test coverage for IOExceptions, i.e., verify that we don't die 
and that the WARN log is written (using `LogCaptureAppender`)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10342) Redirect CreateAcls/DeleteAcls to the controller

2020-08-03 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10342:
---

 Summary: Redirect CreateAcls/DeleteAcls to the controller
 Key: KAFKA-10342
 URL: https://issues.apache.org/jira/browse/KAFKA-10342
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen


In the new Admin client, the CreateAcls/DeleteAcls request should be redirected 
to the active controller.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10343) Redirect AlterClientQuotas to the controller

2020-08-03 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10343:
---

 Summary: Redirect AlterClientQuotas to the controller
 Key: KAFKA-10343
 URL: https://issues.apache.org/jira/browse/KAFKA-10343
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen


In the new Admin client, the AlterClientQuotas request should be redirected to 
the active controller.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on a change in pull request #9060: KAFKA-9274: Remove `retries` from InternalTopicManager

2020-08-03 Thread GitBox


mjsax commented on a change in pull request #9060:
URL: https://github.com/apache/kafka/pull/9060#discussion_r464759130



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
##
@@ -71,21 +73,20 @@
 private final String topic2 = "test_topic_2";
 private final String topic3 = "test_topic_3";
 private final List singleReplica = 
Collections.singletonList(broker1);
-private final int numRetries = 1;
 
 private String threadName;
 
 private MockAdminClient mockAdminClient;
 private InternalTopicManager internalTopicManager;
 
-@SuppressWarnings("deprecation") // TODO revisit in follow up PR
 private final Map config = new HashMap() {
 {
 put(StreamsConfig.APPLICATION_ID_CONFIG, "app-id");
 put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, broker1.host() + ":" + 
broker1.port());
 put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
 
put(StreamsConfig.producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), 16384);
-put(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG), 
numRetries);
+
put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 
100);

Review comment:
   In this PR, we change the "deadline" for the group leader to create/verify 
all internal topics to `max.poll.interval.ms / 2`, and we reduce the 5-minute 
default here to speed up this test.
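
In other words, with the 100 ms override used in this test the leader's budget works out to roughly 50 ms. A trivial sketch of the arithmetic (names invented for illustration):

```
public class RebalanceDeadlineSketch {

    public static void main(final String[] args) {
        // The test overrides max.poll.interval.ms to 100 ms (default: 5 minutes),
        // so the group leader gets about half of that to create/verify all
        // internal topics before giving up.
        final int maxPollIntervalMs = 100;
        final long now = System.currentTimeMillis();
        final long topicCreationDeadlineMs = now + maxPollIntervalMs / 2;
        System.out.println("Internal topic creation must finish within "
            + (topicCreationDeadlineMs - now) + " ms");
    }
}
```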





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10344) Redirect Create/Renew/ExpireDelegationToken to the controller

2020-08-03 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10344:
---

 Summary: Redirect Create/Renew/ExpireDelegationToken to the 
controller
 Key: KAFKA-10344
 URL: https://issues.apache.org/jira/browse/KAFKA-10344
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen


In the bridge release broker, Create/Renew/ExpireDelegationToken should be 
redirected to the active controller.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10342) Redirect Create/DeleteAcls to the controller

2020-08-03 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-10342:

Summary: Redirect Create/DeleteAcls to the controller  (was: Redirect 
CreateAcls/DeleteAcls to the controller)

> Redirect Create/DeleteAcls to the controller
> 
>
> Key: KAFKA-10342
> URL: https://issues.apache.org/jira/browse/KAFKA-10342
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Priority: Major
>
> In the new Admin client, the CreateAcls/DeleteAcls request should be 
> redirected to the active controller.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10342) Redirect Create/DeleteAcls to the controller

2020-08-03 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-10342:

Description: 
In the new Admin client, the CreateAcls/DeleteAcls request should be redirected 
to the active controller.
In the bridge release broker, Create/Renew/ExpireDelegationToken should be 
redirected to the active controller.

  was:In the new Admin client, the CreateAcls/DeleteAcls request should be 
redirected to the active controller.


> Redirect Create/DeleteAcls to the controller
> 
>
> Key: KAFKA-10342
> URL: https://issues.apache.org/jira/browse/KAFKA-10342
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Priority: Major
>
> In the new Admin client, the CreateAcls/DeleteAcls request should be 
> redirected to the active controller.
> In the bridge release broker, Create/Renew/ExpireDelegationToken should be 
> redirected to the active controller.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on a change in pull request #9060: KAFKA-9274: Remove `retries` from InternalTopicManager

2020-08-03 Thread GitBox


mjsax commented on a change in pull request #9060:
URL: https://github.com/apache/kafka/pull/9060#discussion_r464759130



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
##
@@ -71,21 +73,20 @@
 private final String topic2 = "test_topic_2";
 private final String topic3 = "test_topic_3";
 private final List singleReplica = 
Collections.singletonList(broker1);
-private final int numRetries = 1;
 
 private String threadName;
 
 private MockAdminClient mockAdminClient;
 private InternalTopicManager internalTopicManager;
 
-@SuppressWarnings("deprecation") // TODO revisit in follow up PR
 private final Map config = new HashMap() {
 {
 put(StreamsConfig.APPLICATION_ID_CONFIG, "app-id");
 put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, broker1.host() + ":" + 
broker1.port());
 put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
 
put(StreamsConfig.producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), 16384);
-put(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG), 
numRetries);
+
put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 
100);

Review comment:
   In this PR, we change the "deadline" for the group leader to create/verify 
all internal topics to `max.poll.interval.ms / 2`, and we reduce the 5-minute 
default here to speed up this test.
   
   cf 
https://github.com/apache/kafka/pull/9060/files#diff-d3963e433c59b08688bb4481faa20e97R79





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10181) Redirect AlterConfig/IncrementalAlterConfig to the controller

2020-08-03 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-10181:

Description: In the bridge release broker, 
AlterConfig/IncrementalAlterConfig should be redirected to the active 
controller.  (was: In the new Admin client, the 
AlterConfig/IncrementalAlterConfig request should be redirected to the active 
controller.)

> Redirect AlterConfig/IncrementalAlterConfig to the controller
> -
>
> Key: KAFKA-10181
> URL: https://issues.apache.org/jira/browse/KAFKA-10181
> Project: Kafka
>  Issue Type: Sub-task
>  Components: admin
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
> Fix For: 2.7.0
>
>
> In the bridge release broker, AlterConfig/IncrementalAlterConfig should be 
> redirected to the active controller.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10343) Redirect AlterClientQuotas to the controller

2020-08-03 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-10343:

Description: In the bridge release broker, AlterClientQuotas should be 
redirected to the active controller.  (was: In the new Admin client, the 
AlterClientQuotas request should be redirected to the active controller.)

> Redirect AlterClientQuotas to the controller
> 
>
> Key: KAFKA-10343
> URL: https://issues.apache.org/jira/browse/KAFKA-10343
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Priority: Major
>
> In the bridge release broker, AlterClientQuotas should be redirected to the 
> active controller.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10342) Redirect Create/DeleteAcls to the controller

2020-08-03 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-10342:

Description: In the bridge release broker, Create/DeleteAcls should be 
redirected to the active controller.  (was: In the new Admin client, the 
CreateAcls/DeleteAcls request should be redirected to the active controller.
In the bridge release broker, Create/Renew/ExpireDelegationToken should be 
redirected to the active controller.)

> Redirect Create/DeleteAcls to the controller
> 
>
> Key: KAFKA-10342
> URL: https://issues.apache.org/jira/browse/KAFKA-10342
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Priority: Major
>
> In the bridge release broker, Create/DeleteAcls should be redirected to the 
> active controller.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9705) Zookeeper mutation protocols should be redirected to Controller only

2020-08-03 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-9705:
---
Fix Version/s: 2.7.0

> Zookeeper mutation protocols should be redirected to Controller only
> 
>
> Key: KAFKA-9705
> URL: https://issues.apache.org/jira/browse/KAFKA-9705
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
> Fix For: 2.7.0
>
>
> In the bridge release, we need to restrict the direct access of ZK to 
> controller only. This means the existing AlterConfig path should be migrated.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-08-03 Thread GitBox


mjsax commented on pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#issuecomment-668338125


   > btw, it would be good to add test for IOException cases, such as
   
   Seems out of scope for this PR. We should rather do this in a new PR. I created 
https://issues.apache.org/jira/browse/KAFKA-10351



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #8936: KAFKA-10207: Fixed padded timeindex causing premature data deletion

2020-08-03 Thread GitBox


junrao commented on a change in pull request #8936:
URL: https://github.com/apache/kafka/pull/8936#discussion_r464735038



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -652,6 +653,12 @@ class Log(@volatile private var _dir: File,
 bytesTruncated
   }
 
+  def segmentRecovery(): LogSegment => Int = {
+(segment: LogSegment) => {
+  recoverSegment(segment, None)

Review comment:
   You can see how the recovery point is set in Log.flush(). To fail a 
broker, we can probably do kafka.utils.Exit.exit(e.statusCode).
   
   It would be useful if you could preserve the commit history for easier 
incremental reviews.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



