[GitHub] [kafka] guozhangwang commented on pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason

2021-03-05 Thread GitBox


guozhangwang commented on pull request #10232:
URL: https://github.com/apache/kafka/pull/10232#issuecomment-791891198


   We may reset the generation and request a rejoin in two different places: 1) in 
the join/sync-group handler, and 2) in joinGroupIfNeeded, when the future is 
received. The principle is that these two should not overlap, and 2) is used as 
a fallback for those common errors from join/sync that we do not handle 
specifically.
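
To make the non-overlap principle concrete, here is a minimal sketch. It is a hypothetical model, not the actual `AbstractCoordinator` code: the class `RejoinSketch`, the `GroupError` enum, and the reason strings are all assumptions made for illustration. The handler requests a rejoin only for errors it knows specifically, and the fallback acts only when the future failed and no reason was recorded.

```java
import java.util.Optional;

// Hypothetical model for illustration; this is not Kafka's AbstractCoordinator.
final class RejoinSketch {
    enum GroupError { NONE, REBALANCE_IN_PROGRESS, UNKNOWN_MEMBER_ID, COORDINATOR_NOT_AVAILABLE }

    private String rejoinReason; // null until a rejoin has been requested

    // Place 1: the join/sync response handler covers the errors it handles specifically.
    void handleResponse(GroupError error) {
        switch (error) {
            case REBALANCE_IN_PROGRESS:
            case UNKNOWN_MEMBER_ID:
                requestRejoin("handler: " + error);
                break;
            default:
                break; // not handled specifically; left to the fallback
        }
    }

    // Place 2: once the future is received, act only as a fallback.
    void joinGroupIfNeeded(GroupError futureResult) {
        handleResponse(futureResult);
        if (futureResult != GroupError.NONE && rejoinReason == null) {
            requestRejoin("fallback: " + futureResult);
        }
    }

    // Throws if both places try to request a rejoin, which would be an overlap.
    private void requestRejoin(String reason) {
        if (rejoinReason != null) {
            throw new IllegalStateException("rejoin requested twice");
        }
        rejoinReason = reason;
    }

    Optional<String> reason() {
        return Optional.ofNullable(rejoinReason);
    }
}
```

In this model every rejoin carries exactly one reason, and the `IllegalStateException` would surface any case where the two places overlap.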



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason

2021-03-05 Thread GitBox


guozhangwang commented on a change in pull request #10232:
URL: https://github.com/apache/kafka/pull/10232#discussion_r588847540



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -802,8 +801,10 @@ public void handle(SyncGroupResponse syncResponse,
 log.info("SyncGroup failed: {} Marking coordinator unknown. Sent generation was {}",
  error.message(), sentGeneration);
 markCoordinatorUnknown(error);
+requestRejoinOnResponseError(ApiKeys.SYNC_GROUP, error);

Review comment:
   You're right, we do not, I've updated this section.









[jira] [Updated] (KAFKA-12381) Incompatible change in verifiable_producer.log in 2.8

2021-03-05 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-12381:

Fix Version/s: 2.8.0

> Incompatible change in verifiable_producer.log in 2.8
> -
>
> Key: KAFKA-12381
> URL: https://issues.apache.org/jira/browse/KAFKA-12381
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.8.0
>Reporter: Colin McCabe
>Assignee: Boyang Chen
>Priority: Blocker
>  Labels: kip-500
> Fix For: 2.8.0
>
>
> In test_verifiable_producer.py, we used to see this error message in 
> verifiable_producer.log when a topic couldn't be created:
> WARN [Producer clientId=producer-1] Error while fetching metadata with 
> correlation id 1 : {test_topic=LEADER_NOT_AVAILABLE} 
> (org.apache.kafka.clients.NetworkClient)
> The test does a grep LEADER_NOT_AVAILABLE on the log in this case, and it 
> used to pass.
> Now we are instead seeing this in the log file:
> WARN [Producer clientId=producer-1] Error while fetching metadata with 
> correlation id 1 : {test_topic=INVALID_REPLICATION_FACTOR} 
> (org.apache.kafka.clients.NetworkClient)
> And of course now the test fails.
> The INVALID_REPLICATION_FACTOR is coming from the new auto topic creation 
> manager.
> It is a simple matter to make the test pass -- I have confirmed that it 
> passes if we grep for INVALID_REPLICATION_FACTOR in the log file instead of 
> LEADER_NOT_AVAILABLE.
> I think we just need to decide if this change in behavior is acceptable or 
> not.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12381) Incompatible change in verifiable_producer.log in 2.8

2021-03-05 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-12381.
-
Resolution: Fixed

> Incompatible change in verifiable_producer.log in 2.8
> -
>
> Key: KAFKA-12381
> URL: https://issues.apache.org/jira/browse/KAFKA-12381
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.8.0
>Reporter: Colin McCabe
>Assignee: Boyang Chen
>Priority: Blocker
>  Labels: kip-500
>
> In test_verifiable_producer.py, we used to see this error message in 
> verifiable_producer.log when a topic couldn't be created:
> WARN [Producer clientId=producer-1] Error while fetching metadata with 
> correlation id 1 : {test_topic=LEADER_NOT_AVAILABLE} 
> (org.apache.kafka.clients.NetworkClient)
> The test does a grep LEADER_NOT_AVAILABLE on the log in this case, and it 
> used to pass.
> Now we are instead seeing this in the log file:
> WARN [Producer clientId=producer-1] Error while fetching metadata with 
> correlation id 1 : {test_topic=INVALID_REPLICATION_FACTOR} 
> (org.apache.kafka.clients.NetworkClient)
> And of course now the test fails.
> The INVALID_REPLICATION_FACTOR is coming from the new auto topic creation 
> manager.
> It is a simple matter to make the test pass -- I have confirmed that it 
> passes if we grep for INVALID_REPLICATION_FACTOR in the log file instead of 
> LEADER_NOT_AVAILABLE.
> I think we just need to decide if this change in behavior is acceptable or 
> not.





[GitHub] [kafka] guozhangwang commented on a change in pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason

2021-03-05 Thread GitBox


guozhangwang commented on a change in pull request #10232:
URL: https://github.com/apache/kafka/pull/10232#discussion_r588842618



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -775,8 +776,6 @@ public void handle(SyncGroupResponse syncResponse,
 }
 }
 } else {
-requestRejoin();

Review comment:
   I meant the latter: we call that inside the conditions already -- for 
those fatal errors, we do not need to call this anyways since the consumer will 
throw and crash.









[GitHub] [kafka] ijuma opened a new pull request #10273: MINOR: Include number of members in group coordinator messages

2021-03-05 Thread GitBox


ijuma opened a new pull request #10273:
URL: https://github.com/apache/kafka/pull/10273


   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] abbccdda commented on pull request #10240: KAFKA-12381: remove live broker checks for forwarding topic creation

2021-03-05 Thread GitBox


abbccdda commented on pull request #10240:
URL: https://github.com/apache/kafka/pull/10240#issuecomment-791799082


   Cherry-picked to 2.8







[GitHub] [kafka] abbccdda merged pull request #10240: KAFKA-12381: remove live broker checks for forwarding topic creation

2021-03-05 Thread GitBox


abbccdda merged pull request #10240:
URL: https://github.com/apache/kafka/pull/10240


   







[GitHub] [kafka] abbccdda commented on pull request #10240: KAFKA-12381: remove live broker checks for forwarding topic creation

2021-03-05 Thread GitBox


abbccdda commented on pull request #10240:
URL: https://github.com/apache/kafka/pull/10240#issuecomment-791798044


   System tests pass: 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4419/ 
Only flaky tests are failing, so merging.







[jira] [Closed] (KAFKA-12347) Improve Kafka Streams ability to track progress

2021-03-05 Thread Walker Carlson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walker Carlson closed KAFKA-12347.
--

> Improve Kafka Streams ability to track progress
> ---
>
> Key: KAFKA-12347
> URL: https://issues.apache.org/jira/browse/KAFKA-12347
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Walker Carlson
>Assignee: Walker Carlson
>Priority: Major
>  Labels: kip
> Fix For: 3.0.0
>
>
> Add methods to track records being consumed fully and to tell if tasks are 
> idling. This will allow users of streams to build uptime metrics around 
> streams with less difficulty.
> KIP-715: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-715%3A+Expose+Committed+offset+in+streams]





[jira] [Assigned] (KAFKA-9349) Investigate new member timeout for dynamic v4 JoinGroup

2021-03-05 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson reassigned KAFKA-9349:
--

Assignee: (was: Jason Gustafson)

> Investigate new member timeout for dynamic v4 JoinGroup
> ---
>
> Key: KAFKA-9349
> URL: https://issues.apache.org/jira/browse/KAFKA-9349
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>
> While working on KAFKA-9232 I found the timeout behavior to be a bit odd for 
> dynamic members following the v4+ JoinGroup protocol (i.e. requireMemberId = 
> true). I haven't confirmed that there is a bug, but we couldn't confirm that 
> there definitely isn't one, so I'm opening this ticket for further investigation:
> During a dynamic join group, the first Join (with {{UNKNOWN_MEMBER_ID}} ) 
> schedules a heartbeat based on session timeout. During the second Join, when 
> memberId is now known, we schedule the heartbeat based on the 
> {{NewMemberTimeout}} — but right above that is a comment that says
> The session timeout does not affect new members since they do not have their 
> memberId and cannot send heartbeats.
> which seems to imply we shouldn’t be scheduling this sessionTimeout-based 
> heartbeat during the first Join. 





[jira] [Commented] (KAFKA-12435) Several streams-test-utils classes missing from javadoc

2021-03-05 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17296367#comment-17296367
 ] 

Ismael Juma commented on KAFKA-12435:
-

cc [~mjsax] [~vvcephei]

> Several streams-test-utils classes missing from javadoc
> ---
>
> Key: KAFKA-12435
> URL: https://issues.apache.org/jira/browse/KAFKA-12435
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ismael Juma
>Priority: Major
> Attachments: image-2021-03-05-14-22-45-891.png
>
>
> !image-2021-03-05-14-22-45-891.png!
> Only 3 of them show up currently ^. Source: 
> https://kafka.apache.org/27/javadoc/index.html





[jira] [Commented] (KAFKA-12435) Several streams-test-utils classes missing from javadoc

2021-03-05 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17296366#comment-17296366
 ] 

Ismael Juma commented on KAFKA-12435:
-

If you run `aggregatedJavadoc` with Java 8, you will see some warnings that are 
likely related. With Java 11, the same warnings are transformed into errors:
{quote}/home/ijuma/src/kafka/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java:44:
 error: cannot find symbol
import org.apache.kafka.streams.internals.KeyValueStoreFacade;
 ^
 symbol: class KeyValueStoreFacade
 location: package org.apache.kafka.streams.internals
/home/ijuma/src/kafka/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java:45:
 error: cannot find symbol
import org.apache.kafka.streams.internals.WindowStoreFacade;
 ^
 symbol: class WindowStoreFacade
 location: package org.apache.kafka.streams.internals
/home/ijuma/src/kafka/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java:69:
 error: cannot find symbol
import org.apache.kafka.streams.processor.internals.TestDriverProducer;
 ^
 symbol: class TestDriverProducer
 location: package org.apache.kafka.streams.processor.internals
/home/ijuma/src/kafka/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java:226:
 error: cannot find symbol
 private final TestDriverProducer testDriverProducer;
 ^
 symbol: class TestDriverProducer
 location: class TopologyTestDriver
4 errors
{quote}

> Several streams-test-utils classes missing from javadoc
> ---
>
> Key: KAFKA-12435
> URL: https://issues.apache.org/jira/browse/KAFKA-12435
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ismael Juma
>Priority: Major
> Attachments: image-2021-03-05-14-22-45-891.png
>
>
> !image-2021-03-05-14-22-45-891.png!
> Only 3 of them show up currently ^. Source: 
> https://kafka.apache.org/27/javadoc/index.html





[jira] [Created] (KAFKA-12435) Several streams-test-utils classes missing from javadoc

2021-03-05 Thread Ismael Juma (Jira)
Ismael Juma created KAFKA-12435:
---

 Summary: Several streams-test-utils classes missing from javadoc
 Key: KAFKA-12435
 URL: https://issues.apache.org/jira/browse/KAFKA-12435
 Project: Kafka
  Issue Type: Bug
Reporter: Ismael Juma
 Attachments: image-2021-03-05-14-22-45-891.png

!image-2021-03-05-14-22-45-891.png!

Only 3 of them show up currently ^.





[jira] [Updated] (KAFKA-12435) Several streams-test-utils classes missing from javadoc

2021-03-05 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-12435:

Description: 
!image-2021-03-05-14-22-45-891.png!

Only 3 of them show up currently ^. Source: 
https://kafka.apache.org/27/javadoc/index.html

  was:
!image-2021-03-05-14-22-45-891.png!

Only 3 of them show up currently ^.


> Several streams-test-utils classes missing from javadoc
> ---
>
> Key: KAFKA-12435
> URL: https://issues.apache.org/jira/browse/KAFKA-12435
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ismael Juma
>Priority: Major
> Attachments: image-2021-03-05-14-22-45-891.png
>
>
> !image-2021-03-05-14-22-45-891.png!
> Only 3 of them show up currently ^. Source: 
> https://kafka.apache.org/27/javadoc/index.html





[GitHub] [kafka] ijuma opened a new pull request #10272: MINOR: Various javadoc fixes

2021-03-05 Thread GitBox


ijuma opened a new pull request #10272:
URL: https://github.com/apache/kafka/pull/10272


   - Use consistent options for `javadoc` and `aggregatedJavadoc`
   - `aggregatedJavadoc` depends on `compileJava`
   - `connect-api` inherits `options.links`
   - `streams` and `streams-test-utils` javadoc exclusions should be more
   specific to avoid unexpected behavior in `aggregatedJavadoc` where the
   javadoc for multiple modules is generated together
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[jira] [Created] (KAFKA-12434) Admin API for DescribeProducers

2021-03-05 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-12434:
---

 Summary: Admin API for DescribeProducers
 Key: KAFKA-12434
 URL: https://issues.apache.org/jira/browse/KAFKA-12434
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jason Gustafson
Assignee: Jason Gustafson


Implement the Admin `describeProducers` API defined by KIP-664. The server 
implementation was completed in KAFKA-12238.





[jira] [Updated] (KAFKA-12432) Fix AdminClient timeout handling in the presence of badly behaved brokers

2021-03-05 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-12432:
-
Summary: Fix AdminClient timeout handling in the presence of badly behaved 
brokers  (was: AdminClient will not honor connection setup or request timeouts 
in some cases)

> Fix AdminClient timeout handling in the presence of badly behaved brokers
> -
>
> Key: KAFKA-12432
> URL: https://issues.apache.org/jira/browse/KAFKA-12432
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
>
> If NetworkClient allows us to create a connection to a node, but we can't 
> send a single request, AdminClient will hang forever (or until the operation 
> times out, at least) rather than retrying a different node.





[GitHub] [kafka] abbccdda merged pull request #10211: KAFKA-12347: updating TaskMetadata

2021-03-05 Thread GitBox


abbccdda merged pull request #10211:
URL: https://github.com/apache/kafka/pull/10211


   







[GitHub] [kafka] hachikuji commented on a change in pull request #10220: KAFKA-12383: Get RaftClusterTest.java and other KIP-500 junit tests working

2021-03-05 Thread GitBox


hachikuji commented on a change in pull request #10220:
URL: https://github.com/apache/kafka/pull/10220#discussion_r588522951



##
File path: core/src/test/java/kafka/test/junit/ClusterTestExtensions.java
##
@@ -199,12 +209,25 @@ private void processClusterTest(ClusterTest annot, ClusterTestDefaults defaults,
 }
 
 switch (type) {
-case ZK:
-case BOTH:
+case ZK: {
 ClusterConfig config = builder.build();
 config.serverProperties().putAll(properties);
 testInvocations.accept(new ZkClusterInvocationContext(config));
 break;
+} case RAFT: {
+ClusterConfig config = builder.build();
+config.serverProperties().putAll(properties);
+testInvocations.accept(new RaftClusterInvocationContext(config));
+break;
+} case BOTH: {
+ClusterConfig zkConfig = builder.build();

Review comment:
   nit: not a big deal, but I suspect there's a way to remove some of the 
duplication here in this switch. For example, perhaps we could add a method to 
`Type` to build the invocation context:
   ```java
 List invocationContexts(ClusterConfig config);
   ```
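
   One way the suggestion above could look, as a rough self-contained sketch. The `Config`, `Context`, `ZkContext`, and `RaftContext` types are hypothetical stand-ins for `ClusterConfig` and the invocation context classes in the PR, and sharing one config instance between both contexts in `BOTH` is an assumption of the sketch:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins for illustration; not the classes from the PR.
final class TypeSketch {
    static final class Config {
        final String name;
        Config(String name) { this.name = name; }
    }

    interface Context {
        String describe();
    }

    static final class ZkContext implements Context {
        private final Config config;
        ZkContext(Config config) { this.config = config; }
        @Override public String describe() { return "zk:" + config.name; }
    }

    static final class RaftContext implements Context {
        private final Config config;
        RaftContext(Config config) { this.config = config; }
        @Override public String describe() { return "raft:" + config.name; }
    }

    // Each constant knows which invocation contexts it needs,
    // so the caller no longer has to switch on the type.
    enum Type {
        ZK {
            @Override List<Context> invocationContexts(Config config) {
                return Collections.singletonList(new ZkContext(config));
            }
        },
        RAFT {
            @Override List<Context> invocationContexts(Config config) {
                return Collections.singletonList(new RaftContext(config));
            }
        },
        BOTH {
            @Override List<Context> invocationContexts(Config config) {
                return Arrays.asList(new ZkContext(config), new RaftContext(config));
            }
        };

        abstract List<Context> invocationContexts(Config config);
    }
}
```

   With something like this, the switch could shrink to a loop over `type.invocationContexts(config)` that passes each context to `testInvocations`.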

##
File path: core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
##
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.junit;
+
+import kafka.network.SocketServer;
+import kafka.server.BrokerServer;
+import kafka.server.ControllerServer;
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterInstance;
+import kafka.testkit.KafkaClusterTestKit;
+import kafka.testkit.TestKitNodes;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.metadata.BrokerState;
+import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
+import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
+import org.junit.jupiter.api.extension.Extension;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+/**
+ * Wraps a {@link KafkaClusterTestKit} inside lifecycle methods for a test invocation. Each instance of this
+ * class is provided with a configuration for the cluster.
+ *
+ * This context also provides parameter resolvers for:
+ *
+ * 
+ * ClusterConfig (the same instance passed to the constructor)
+ * ClusterInstance (includes methods to expose underlying SocketServer-s)
+ * IntegrationTestHelper (helper methods)
+ * 
+ */
+public class RaftClusterInvocationContext implements TestTemplateInvocationContext {
+
+private final ClusterConfig clusterConfig;
+private final AtomicReference clusterReference;
+
+public RaftClusterInvocationContext(ClusterConfig clusterConfig) {
+this.clusterConfig = clusterConfig;
+this.clusterReference = new AtomicReference<>();
+}
+
+@Override
+public String getDisplayName(int invocationIndex) {
+String clusterDesc = clusterConfig.nameTags().entrySet().stream()
+.map(Object::toString)
+.collect(Collectors.joining(", "));
+return String.format("[Quorum %d] %s", invocationIndex, clusterDesc);
+}
+
+@Override
+public List getAdditionalExtensions() {
+return Arrays.asList(
+(BeforeTestExecutionCallback) context -> {
+KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(
+new TestKitNodes.Builder().
+setNumKip500BrokerNodes(clusterConfig.numBrokers()).
+setNumControllerNodes(clusterConfig.numControllers()).build());
+
+// Copy properties into the TestKit builder
+clusterConfig.serverProperties().forEach((key, value) -> 

[jira] [Created] (KAFKA-12433) Controller should create topic config records after topic record

2021-03-05 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-12433:
---

 Summary: Controller should create topic config records after topic 
record
 Key: KAFKA-12433
 URL: https://issues.apache.org/jira/browse/KAFKA-12433
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson
Assignee: Jason Gustafson


When creating a topic, the controller emits a topic record, config records, and 
partition records. These are all created in one atomic batch, so there is no 
concern about partial state. The current order that the records appear is:

1. ConfigRecord(s)
2. TopicRecord
3. PartitionRecord(s)

I think it would be more intuitive if the ConfigRecords are written after the 
TopicRecord. This makes validation simpler when reading the data since it is 
not necessary to read ahead to find the corresponding TopicRecord.
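
A small sketch of why the proposed order simplifies validation. It uses a hypothetical model: `MetaRecord`, `Kind`, and the idea that each record names its topic directly are assumptions for illustration, not Kafka's metadata record classes. A reader that tracks the topics seen so far can validate the batch in one pass only when the TopicRecord comes first.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model for illustration; not Kafka's metadata record classes.
final class RecordOrderSketch {
    enum Kind { TOPIC, CONFIG, PARTITION }

    static final class MetaRecord {
        final Kind kind;
        final String topic;
        MetaRecord(Kind kind, String topic) { this.kind = kind; this.topic = topic; }
    }

    // The order the ticket describes as current: configs, then topic, then partitions.
    static List<MetaRecord> currentOrder(String topic) {
        return Arrays.asList(
            new MetaRecord(Kind.CONFIG, topic),
            new MetaRecord(Kind.TOPIC, topic),
            new MetaRecord(Kind.PARTITION, topic));
    }

    // The order the ticket proposes: topic first, then configs, then partitions.
    static List<MetaRecord> proposedOrder(String topic) {
        return Arrays.asList(
            new MetaRecord(Kind.TOPIC, topic),
            new MetaRecord(Kind.CONFIG, topic),
            new MetaRecord(Kind.PARTITION, topic));
    }

    // Single pass: every non-topic record must refer to a topic already seen.
    // Returns false when the reader would have to look ahead for the TopicRecord.
    static boolean validatesWithoutReadAhead(List<MetaRecord> batch) {
        Set<String> seenTopics = new HashSet<>();
        for (MetaRecord r : batch) {
            if (r.kind == Kind.TOPIC) {
                seenTopics.add(r.topic);
            } else if (!seenTopics.contains(r.topic)) {
                return false;
            }
        }
        return true;
    }
}
```

Under these assumptions, `validatesWithoutReadAhead(proposedOrder("t"))` succeeds while `validatesWithoutReadAhead(currentOrder("t"))` does not, because the ConfigRecord arrives before its topic is known.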





[GitHub] [kafka] hachikuji commented on a change in pull request #10252: KAFKA-12403; Ensure local state deleted on `RemoveTopicRecord` received

2021-03-05 Thread GitBox


hachikuji commented on a change in pull request #10252:
URL: https://github.com/apache/kafka/pull/10252#discussion_r588524835



##
File path: 
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
##
@@ -142,28 +145,19 @@ class BrokerMetadataListener(brokerId: Int,
   case e: Exception => throw new RuntimeException("Unknown metadata record type " +
   s"${record.apiKey()} in batch ending at offset ${lastOffset}.")
 }
-recordType match {
-  case REGISTER_BROKER_RECORD => handleRegisterBrokerRecord(imageBuilder,
-record.asInstanceOf[RegisterBrokerRecord])
-  case UNREGISTER_BROKER_RECORD => handleUnregisterBrokerRecord(imageBuilder,
-record.asInstanceOf[UnregisterBrokerRecord])
-  case TOPIC_RECORD => handleTopicRecord(imageBuilder,
-record.asInstanceOf[TopicRecord])
-  case PARTITION_RECORD => handlePartitionRecord(imageBuilder,
-record.asInstanceOf[PartitionRecord])
-  case CONFIG_RECORD => handleConfigRecord(record.asInstanceOf[ConfigRecord])
-  case PARTITION_CHANGE_RECORD => handlePartitionChangeRecord(imageBuilder,
-record.asInstanceOf[PartitionChangeRecord])
-  case FENCE_BROKER_RECORD => handleFenceBrokerRecord(imageBuilder,
-record.asInstanceOf[FenceBrokerRecord])
-  case UNFENCE_BROKER_RECORD => handleUnfenceBrokerRecord(imageBuilder,
-record.asInstanceOf[UnfenceBrokerRecord])
-  case REMOVE_TOPIC_RECORD => handleRemoveTopicRecord(imageBuilder,
-record.asInstanceOf[RemoveTopicRecord])
-  case QUOTA_RECORD => handleQuotaRecord(imageBuilder,
-record.asInstanceOf[QuotaRecord])
-  // TODO: handle FEATURE_LEVEL_RECORD
-  case _ => throw new RuntimeException(s"Unsupported record type ${recordType}")
+
+record match {
+  case rec: RegisterBrokerRecord => handleRegisterBrokerRecord(imageBuilder, rec)
+  case rec: UnregisterBrokerRecord => handleUnregisterBrokerRecord(imageBuilder, rec)
+  case rec: FenceBrokerRecord => handleFenceBrokerRecord(imageBuilder, rec)
+  case rec: UnfenceBrokerRecord => handleUnfenceBrokerRecord(imageBuilder, rec)
+  case rec: TopicRecord => handleTopicRecord(imageBuilder, rec)
+  case rec: PartitionRecord => handlePartitionRecord(imageBuilder, rec)

Review comment:
   FYI: https://issues.apache.org/jira/browse/KAFKA-12433









[GitHub] [kafka] hachikuji commented on a change in pull request #10240: KAFKA-12381: remove live broker checks for forwarding topic creation

2021-03-05 Thread GitBox


hachikuji commented on a change in pull request #10240:
URL: https://github.com/apache/kafka/pull/10240#discussion_r588513624



##
File path: 
core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
##
@@ -146,29 +146,118 @@ class AutoTopicCreationManagerTest {
   }
 
   @Test
-  def testNotEnoughLiveBrokers(): Unit = {
-val props = TestUtils.createBrokerConfig(1, "localhost")
-props.setProperty(KafkaConfig.DefaultReplicationFactorProp, 3.toString)
-config = KafkaConfig.fromProps(props)
+  def testInvalidReplicationFactorForNonInternalTopics(): Unit = {
+testErrorWithCreationInZk(Errors.INVALID_REPLICATION_FACTOR, "topic", isInternal = false)
+  }
+
+  @Test
+  def testInvalidReplicationFactorForConsumerOffsetsTopic(): Unit = {
+Mockito.when(groupCoordinator.offsetsTopicConfigs).thenReturn(new Properties)
+testErrorWithCreationInZk(Errors.INVALID_REPLICATION_FACTOR, Topic.GROUP_METADATA_TOPIC_NAME, isInternal = true)
+  }
+
+  @Test
+  def testInvalidReplicationFactorForTxnOffsetTopic(): Unit = {
+Mockito.when(transactionCoordinator.transactionTopicConfigs).thenReturn(new Properties)
+testErrorWithCreationInZk(Errors.INVALID_REPLICATION_FACTOR, Topic.TRANSACTION_STATE_TOPIC_NAME, isInternal = true)
+  }
+
+  @Test
+  def testTopicExistsErrorSwapForNonInternalTopics(): Unit = {
+testErrorWithCreationInZk(Errors.TOPIC_ALREADY_EXISTS, "topic", isInternal = false,
+  expectedError = Errors.LEADER_NOT_AVAILABLE)
+  }
+
+  @Test
+  def testTopicExistsErrorSwapForConsumerOffsetsTopic(): Unit = {
+Mockito.when(groupCoordinator.offsetsTopicConfigs).thenReturn(new Properties)
+testErrorWithCreationInZk(Errors.TOPIC_ALREADY_EXISTS, Topic.GROUP_METADATA_TOPIC_NAME, isInternal = true,
+  expectedError = Errors.LEADER_NOT_AVAILABLE)
+  }
+
+  @Test
+  def testTopicExistsErrorSwapForTxnOffsetTopic(): Unit = {
+Mockito.when(transactionCoordinator.transactionTopicConfigs).thenReturn(new Properties)
+testErrorWithCreationInZk(Errors.TOPIC_ALREADY_EXISTS, Topic.TRANSACTION_STATE_TOPIC_NAME, isInternal = true,
+  expectedError = Errors.LEADER_NOT_AVAILABLE)
+  }
+
+  @Test
+  def testRequestTimeoutErrorSwapForNonInternalTopics(): Unit = {
+testErrorWithCreationInZk(Errors.REQUEST_TIMED_OUT, "topic", isInternal = false,
+  expectedError = Errors.LEADER_NOT_AVAILABLE)
+  }
+
+  @Test
+  def testRequestTimeoutErrorSwapForConsumerOffsetTopic(): Unit = {
+Mockito.when(groupCoordinator.offsetsTopicConfigs).thenReturn(new Properties)
+testErrorWithCreationInZk(Errors.REQUEST_TIMED_OUT, Topic.GROUP_METADATA_TOPIC_NAME, isInternal = true,
+  expectedError = Errors.LEADER_NOT_AVAILABLE)
+  }
+
+  @Test
+  def testRequestTimeoutErrorSwapForTxnOffsetTopic(): Unit = {
+Mockito.when(transactionCoordinator.transactionTopicConfigs).thenReturn(new Properties)
+testErrorWithCreationInZk(Errors.REQUEST_TIMED_OUT, Topic.TRANSACTION_STATE_TOPIC_NAME, isInternal = true,
+  expectedError = Errors.LEADER_NOT_AVAILABLE)
+  }
+
+  @Test
+  def testUnknownTopicPartitionForNonIntervalTopic(): Unit = {
+testErrorWithCreationInZk(Errors.UNKNOWN_TOPIC_OR_PARTITION, "topic", isInternal = false)
+  }
 
+  @Test
+  def testUnknownTopicPartitionForConsumerOffsetTopic(): Unit = {
+Mockito.when(groupCoordinator.offsetsTopicConfigs).thenReturn(new Properties)
+testErrorWithCreationInZk(Errors.UNKNOWN_TOPIC_OR_PARTITION, Topic.GROUP_METADATA_TOPIC_NAME, isInternal = true)
+  }
+
+  @Test
+  def testUnknownTopicPartitionForTxnOffsetTopic(): Unit = {
+Mockito.when(transactionCoordinator.transactionTopicConfigs).thenReturn(new Properties)
+testErrorWithCreationInZk(Errors.UNKNOWN_TOPIC_OR_PARTITION, Topic.TRANSACTION_STATE_TOPIC_NAME, isInternal = true)
+  }
+
+  private def testErrorWithCreationInZk(error: Errors,
+topicName: String,
+isInternal: Boolean,
+expectedError: Errors = null): Unit = {

Review comment:
   nit: a little more idiomatic to use `Option[Errors]` for a case like this









[jira] [Created] (KAFKA-12432) AdminClient will not honor connection setup or request timeouts in some cases

2021-03-05 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-12432:


 Summary: AdminClient will not honor connection setup or request 
timeouts in some cases
 Key: KAFKA-12432
 URL: https://issues.apache.org/jira/browse/KAFKA-12432
 Project: Kafka
  Issue Type: Bug
Reporter: Colin McCabe
Assignee: Colin McCabe


If NetworkClient allows us to create a connection to a node, but we can't send 
a single request, AdminClient will hang forever (or until the operation times 
out, at least) rather than retrying a different node.





[GitHub] [kafka] satishd edited a comment on pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.

2021-03-05 Thread GitBox


satishd edited a comment on pull request #10271:
URL: https://github.com/apache/kafka/pull/10271#issuecomment-791545987


   @junrao: This is built on top of https://github.com/apache/kafka/pull/10218. 







[GitHub] [kafka] satishd commented on pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.

2021-03-05 Thread GitBox


satishd commented on pull request #10271:
URL: https://github.com/apache/kafka/pull/10271#issuecomment-791545987


   @junrao 







[GitHub] [kafka] satishd opened a new pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.

2021-03-05 Thread GitBox


satishd opened a new pull request #10271:
URL: https://github.com/apache/kafka/pull/10271


   KAFKA-12429:  Added serdes for the default implementation of RLMM based on 
an internal topic as storage. This topic will receive events of 
RemoteLogSegmentMetadata, RemoteLogSegmentUpdate, and 
RemotePartitionDeleteMetadata. These events are serialized into Kafka protocol 
message format.
   Added tests for all the event types for that topic.
   
   This is part of the tiered storage implementation 
[KIP-405](https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-MessageFormat).
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] ijuma commented on pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…

2021-03-05 Thread GitBox


ijuma commented on pull request #10269:
URL: https://github.com/apache/kafka/pull/10269#issuecomment-791487213


   Thanks for the PR. Can you check the perf impact of these changes?







[jira] [Reopened] (KAFKA-12347) Improve Kafka Streams ability to track progress

2021-03-05 Thread Walker Carlson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walker Carlson reopened KAFKA-12347:


> Improve Kafka Streams ability to track progress
> ---
>
> Key: KAFKA-12347
> URL: https://issues.apache.org/jira/browse/KAFKA-12347
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Walker Carlson
>Assignee: Walker Carlson
>Priority: Major
>  Labels: kip
> Fix For: 3.0.0
>
>
> Add methods to track records being consumed fully and to tell if tasks are 
> idling. This will allow users of streams to build uptime metrics around 
> streams with less difficulty.
> KIP-715: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-715%3A+Expose+Committed+offset+in+streams]





[GitHub] [kafka] mimaison commented on a change in pull request #9549: KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader

2021-03-05 Thread GitBox


mimaison commented on a change in pull request #9549:
URL: https://github.com/apache/kafka/pull/9549#discussion_r588262888



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java
##
@@ -18,11 +18,15 @@
 
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.transforms.Cast;
+import org.apache.kafka.connect.transforms.DropHeaders;
 import org.apache.kafka.connect.transforms.ExtractField;
 import org.apache.kafka.connect.transforms.Filter;
 import org.apache.kafka.connect.transforms.Flatten;
+import org.apache.kafka.connect.transforms.HeaderFrom;
+import org.apache.kafka.connect.transforms.HeaderTo;

Review comment:
   We don't have this transformation!

##
File path: 
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java
##
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.List;
+import java.util.Map;
+
+public class DropHeaders<R extends ConnectRecord<R>> implements Transformation<R> {
+
+public static final String OVERVIEW_DOC =
+"Removes one or more headers from each record.";
+
+public static final String HEADERS_FIELD = "headers";
+
+public static final ConfigDef CONFIG_DEF = new ConfigDef()
+.define(HEADERS_FIELD, ConfigDef.Type.LIST, 
ConfigDef.Importance.HIGH,
+"The name of the headers to be removed.");
+
+private List<String> headers;
+
+@Override
+public R apply(R record) {
+Headers updatedHeaders = record.headers().duplicate();
+for (String name : headers) {
+updatedHeaders.remove(name);

Review comment:
   Because `Headers` is a `LinkedList`, `remove()` has to iterate the whole 
list each time. I wonder if we could instead start from an empty headers list 
and add the headers not being removed?
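
   A minimal sketch of the alternative raised in this comment, under stated 
assumptions: `HeaderSketch` and `DropHeadersSketch` are hypothetical stand-ins 
and are not the Connect `Headers` API. Instead of calling `remove()` once per 
configured name, it walks the original headers once and copies the ones to keep 
into a fresh list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a record header; not the Connect Header interface.
final class HeaderSketch {
    final String key;
    final String value;

    HeaderSketch(String key, String value) {
        this.key = key;
        this.value = value;
    }
}

final class DropHeadersSketch {
    // Single pass over the input: keep every header whose key is not in the drop set.
    static List<HeaderSketch> dropHeaders(List<HeaderSketch> original, Set<String> toDrop) {
        List<HeaderSketch> kept = new ArrayList<>();
        for (HeaderSketch header : original) {
            if (!toDrop.contains(header.key)) {
                kept.add(header);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<HeaderSketch> headers = Arrays.asList(
            new HeaderSketch("trace-id", "abc"),
            new HeaderSketch("internal", "x"),
            new HeaderSketch("internal", "y"));
        Set<String> toDrop = new HashSet<>(Arrays.asList("internal"));
        // Print the keys of the headers that survive the filter.
        for (HeaderSketch header : dropHeaders(headers, toDrop)) {
            System.out.println(header.key);
        }
    }
}
```

   Holding the names to drop in a `Set` keeps each membership check cheap, so 
the cost is one pass over the headers regardless of how many names are 
configured.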

##
File path: 
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertHeader.java
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;

Review comment:
   Unused import

##
File path: 
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java
##
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language 

[GitHub] [kafka] rajinisivaram commented on a change in pull request #10267: KAFKA-12427: Don't update connection idle time for muted connections

2021-03-05 Thread GitBox


rajinisivaram commented on a change in pull request #10267:
URL: https://github.com/apache/kafka/pull/10267#discussion_r588308473



##
File path: clients/src/main/java/org/apache/kafka/common/network/Selector.java
##
@@ -517,7 +517,7 @@ void pollSelectionKeys(Set<SelectionKey> selectionKeys,
 
 // register all per-connection metrics at once
 sensors.maybeRegisterConnectionMetrics(nodeId);
-if (idleExpiryManager != null)
+if (idleExpiryManager != null && 
!explicitlyMutedChannels.contains(channel))

Review comment:
   This limits when we perform the update. For example, when sending a 
response, the channel is active but explicitly muted. I don't think we want to 
skip the update in this case.









[GitHub] [kafka] rajinisivaram commented on pull request #10267: KAFKA-12427: Don't update connection idle time for muted connections

2021-03-05 Thread GitBox


rajinisivaram commented on pull request #10267:
URL: https://github.com/apache/kafka/pull/10267#issuecomment-791420001


   @splett2 Thanks for the PR. I think it would be better to store channels in 
`keysWithBufferedReads` only when the channel is not explicitly muted since we 
care about this state only when we are ready to read from the channel. So we 
could check if channel is explicitly muted when adding to 
`keysWithBufferedReads` and add the key to `keysWithBufferedReads` when 
unmuting. This would avoid processing the channel unnecessarily.
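
   A rough sketch of the bookkeeping described above, under stated assumptions: 
`BufferedReadTrackerSketch` is a hypothetical, heavily simplified stand-in (not 
the real `Selector`), and channels are represented by plain strings. Removing a 
key on mute is an extra assumption of this sketch and is not part of the 
suggestion itself.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the selector's state; not the real Selector class.
final class BufferedReadTrackerSketch {
    private final Set<String> explicitlyMuted = new HashSet<>();
    private final Set<String> hasBufferedData = new HashSet<>();
    // Invariant: contains only channels that have buffered data and are not muted.
    private final Set<String> keysWithBufferedReads = new HashSet<>();

    // Called when a channel is found to hold buffered, unread data.
    void onBufferedData(String channel) {
        hasBufferedData.add(channel);
        if (!explicitlyMuted.contains(channel)) {
            keysWithBufferedReads.add(channel);
        }
    }

    // Assumption of this sketch: muting also drops the key from the ready set.
    void mute(String channel) {
        explicitlyMuted.add(channel);
        keysWithBufferedReads.remove(channel);
    }

    // On unmute, re-add the key if the channel still has buffered data.
    void unmute(String channel) {
        explicitlyMuted.remove(channel);
        if (hasBufferedData.contains(channel)) {
            keysWithBufferedReads.add(channel);
        }
    }

    boolean readyToRead(String channel) {
        return keysWithBufferedReads.contains(channel);
    }
}
```

   With this shape, a poll loop that iterates only over `keysWithBufferedReads` 
never visits a muted channel, which is the unnecessary processing the comment 
aims to avoid.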







[jira] [Assigned] (KAFKA-12430) emit.heartbeats.enabled = false should disable heartbeats topic creation

2021-03-05 Thread Ivan Yurchenko (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Yurchenko reassigned KAFKA-12430:
--

Assignee: Ivan Yurchenko

> emit.heartbeats.enabled = false should disable heartbeats topic creation
> 
>
> Key: KAFKA-12430
> URL: https://issues.apache.org/jira/browse/KAFKA-12430
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Reporter: Ivan Yurchenko
>Assignee: Ivan Yurchenko
>Priority: Minor
>
> Currently, MirrorMaker 2's {{MirrorHeartbeatConnector}} emits heartbeats or 
> not based on the {{emit.heartbeats.enabled}} setting. However, the 
> {{heartbeats}} topic is created unconditionally. It seems that the same 
> setting should really disable the topic creation as well.





[GitHub] [kafka] omkreddy merged pull request #10270: MINOR: Update log level in SaslServerAuthenticator

2021-03-05 Thread GitBox


omkreddy merged pull request #10270:
URL: https://github.com/apache/kafka/pull/10270


   







[GitHub] [kafka] mimaison commented on a change in pull request #10192: MINOR: Add missing unit tests for Mirror Connect

2021-03-05 Thread GitBox


mimaison commented on a change in pull request #10192:
URL: https://github.com/apache/kafka/pull/10192#discussion_r588238922



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
##
@@ -86,4 +96,66 @@ public void testZeroOffsetSync() {
 assertTrue(partitionState.update(4, 3));
 assertTrue(partitionState.update(5, 4));
 }
+
+@Test
+public void testPoll() {
+// Create a consumer mock
+byte[] key1 = "abc".getBytes();
+byte[] value1 = "fgh".getBytes();
+byte[] key2 = "123".getBytes();
+byte[] value2 = "456".getBytes();
+List<ConsumerRecord<byte[], byte[]>> consumerRecordsList = new ArrayList<>();
+String topicName = "test";
+String headerKey = "key";
+RecordHeaders headers = new RecordHeaders(new Header[] {
+new RecordHeader(headerKey, "value".getBytes()),
+});
+consumerRecordsList.add(new ConsumerRecord<>(topicName, 0, 0, 
System.currentTimeMillis(),
+TimestampType.CREATE_TIME, 0L, key1.length, value1.length, 
key1, value1, headers));
+consumerRecordsList.add(new ConsumerRecord<>(topicName, 1, 1, 
System.currentTimeMillis(),
+TimestampType.CREATE_TIME, 0L, key2.length, value2.length, 
key2, value2, headers));
+ConsumerRecords<byte[], byte[]> consumerRecords =
+new ConsumerRecords<>(Collections.singletonMap(new 
TopicPartition(topicName, 0), consumerRecordsList));
+
+@SuppressWarnings("unchecked")
+KafkaConsumer<byte[], byte[]> consumer = mock(KafkaConsumer.class);
+when(consumer.poll(any())).thenReturn(consumerRecords);
+
+MirrorMetrics metrics = mock(MirrorMetrics.class);
+
+String sourceClusterName = "cluster1";
+ReplicationPolicy replicationPolicy = new DefaultReplicationPolicy();
+MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(consumer, 
metrics, sourceClusterName,
+replicationPolicy, 50);
+List<SourceRecord> sourceRecords = mirrorSourceTask.poll();
+
+assertEquals(2, sourceRecords.size());
+for (int i = 0; i < sourceRecords.size(); i++) {
+SourceRecord sourceRecord = sourceRecords.get(i);
+ConsumerRecord consumerRecord = consumerRecordsList.get(i);

Review comment:
   `ConsumerRecord<byte[], byte[]> consumerRecord`

##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
##
@@ -86,4 +96,66 @@ public void testZeroOffsetSync() {
 assertTrue(partitionState.update(4, 3));
 assertTrue(partitionState.update(5, 4));
 }
+
+@Test
+public void testPoll() {
+// Create a consumer mock
+byte[] key1 = "abc".getBytes();
+byte[] value1 = "fgh".getBytes();
+byte[] key2 = "123".getBytes();
+byte[] value2 = "456".getBytes();
+List<ConsumerRecord<byte[], byte[]>> consumerRecordsList = new ArrayList<>();
+String topicName = "test";
+String headerKey = "key";
+RecordHeaders headers = new RecordHeaders(new Header[] {
+new RecordHeader(headerKey, "value".getBytes()),
+});
+consumerRecordsList.add(new ConsumerRecord<>(topicName, 0, 0, 
System.currentTimeMillis(),
+TimestampType.CREATE_TIME, 0L, key1.length, value1.length, 
key1, value1, headers));
+consumerRecordsList.add(new ConsumerRecord<>(topicName, 1, 1, 
System.currentTimeMillis(),
+TimestampType.CREATE_TIME, 0L, key2.length, value2.length, 
key2, value2, headers));
+ConsumerRecords<byte[], byte[]> consumerRecords =
+new ConsumerRecords<>(Collections.singletonMap(new 
TopicPartition(topicName, 0), consumerRecordsList));
+
+@SuppressWarnings("unchecked")
+KafkaConsumer<byte[], byte[]> consumer = mock(KafkaConsumer.class);
+when(consumer.poll(any())).thenReturn(consumerRecords);
+
+MirrorMetrics metrics = mock(MirrorMetrics.class);
+
+String sourceClusterName = "cluster1";
+ReplicationPolicy replicationPolicy = new DefaultReplicationPolicy();
+MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(consumer, 
metrics, sourceClusterName,
+replicationPolicy, 50);
+List<SourceRecord> sourceRecords = mirrorSourceTask.poll();
+
+assertEquals(2, sourceRecords.size());
+for (int i = 0; i < sourceRecords.size(); i++) {
+SourceRecord sourceRecord = sourceRecords.get(i);
+ConsumerRecord consumerRecord = consumerRecordsList.get(i);
+assertEquals(consumerRecord.key(), sourceRecord.key());
+assertEquals(consumerRecord.value(), sourceRecord.value());
+// We expect that the topicname will be based on the replication 
policy currently used
+
assertEquals(replicationPolicy.formatRemoteTopic(sourceClusterName, topicName),
+sourceRecord.topic());
+// We expect that MirrorMaker will keep the same partition 
assignment
+

[GitHub] [kafka] pzygielo commented on pull request #8211: COMDEV-340 Fix project category

2021-03-05 Thread GitBox


pzygielo commented on pull request #8211:
URL: https://github.com/apache/kafka/pull/8211#issuecomment-791330792


   Now it has a [separate issue (KAFKA-10775)](https://issues.apache.org/jira/browse/KAFKA-10775). 
What process should be followed to have it corrected?







[jira] [Updated] (KAFKA-12431) Fetch Request/Response without Topic information

2021-03-05 Thread Peter Sinoros-Szabo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Peter Sinoros-Szabo updated KAFKA-12431:

Attachment: fetch-on-2.6.1.png
fetch-on-2.4.1.png

> Fetch Request/Response without Topic information
> 
>
> Key: KAFKA-12431
> URL: https://issues.apache.org/jira/browse/KAFKA-12431
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.6.1
>Reporter: Peter Sinoros-Szabo
>Priority: Major
> Attachments: fetch-on-2.4.1.png, fetch-on-2.6.1.png
>
>
> I was running a 6 node Kafka 2.4.1 cluster with protocol and message format 
> version set to 2.4. I wanted to upgrade the cluster to 2.6.1 and after I 
> upgraded the 1st broker to 2.6.1 without any configuration change, I noticed 
> much higher CPU usage on that broker (instead of 25% CPU usage it was  ~350%) 
> and about 3-4x higher network traffic. So I dumped the traffic between the 
> Kafka client and broker and compared it with the traffic of the same broker 
> downgraded to 2.4.1.
> It seems to me that after I upgraded to 2.6.1, the Fetch requests and 
> responses are not complete: the topics part of the Fetch Request is missing, 
> and I don't know why. I guess there should always be a topics part.
> I'll attach a screenshot of these messages.
>  





[jira] [Created] (KAFKA-12431) Fetch Request/Response without Topic information

2021-03-05 Thread Peter Sinoros-Szabo (Jira)
Peter Sinoros-Szabo created KAFKA-12431:
---

 Summary: Fetch Request/Response without Topic information
 Key: KAFKA-12431
 URL: https://issues.apache.org/jira/browse/KAFKA-12431
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.6.1
Reporter: Peter Sinoros-Szabo


I was running a 6 node Kafka 2.4.1 cluster with protocol and message format 
version set to 2.4. I wanted to upgrade the cluster to 2.6.1 and after I 
upgraded the 1st broker to 2.6.1 without any configuration change, I noticed 
much higher CPU usage on that broker (instead of 25% CPU usage it was  ~350%) 
and about 3-4x higher network traffic. So I dumped the traffic between the 
Kafka client and broker and compared it with the traffic of the same broker 
downgraded to 2.4.1.

It seems to me that after I upgraded to 2.6.1, the Fetch requests and responses 
are not complete: the topics part of the Fetch Request is missing, and I don't 
know why. I guess there should always be a topics part.

I'll attach a screenshot of these messages.

 





[GitHub] [kafka] dpoldrugo edited a comment on pull request #10059: KAFKA-8562: SaslChannelBuilder - avoid (reverse) DNS lookup while building underlying SslTransportLayer

2021-03-05 Thread GitBox


dpoldrugo edited a comment on pull request #10059:
URL: https://github.com/apache/kafka/pull/10059#issuecomment-791291261


   @omkreddy well, for us at Infobip it caused an incident which basically 
blocked all Kafka traffic, just because some clients didn't have PTR records 
and thus reverse DNS lookups blocked the networking thread pool of all Kafka 
brokers. We are currently running a custom-built Kafka 2.5.0.







[GitHub] [kafka] dpoldrugo commented on pull request #10059: KAFKA-8562: SaslChannelBuilder - avoid (reverse) DNS lookup while building underlying SslTransportLayer

2021-03-05 Thread GitBox


dpoldrugo commented on pull request #10059:
URL: https://github.com/apache/kafka/pull/10059#issuecomment-791291261


   @omkreddy well, for us at Infobip it caused an incident which basically 
blocked all Kafka traffic, just because some clients didn't have PTR records 
and thus reverse DNS lookups blocked the networking thread pool of all Kafka 
brokers.







[GitHub] [kafka] omkreddy opened a new pull request #10270: MINOR: Update log level in SaslServerAuthenticator

2021-03-05 Thread GitBox


omkreddy opened a new pull request #10270:
URL: https://github.com/apache/kafka/pull/10270


   We are logging CONNECTIONS_MAX_REAUTH_MS at debug level for each mechanism. 
This is spammy when we enable debug logs for SaslServerAuthenticator. Updating 
the log level to trace level.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[jira] [Created] (KAFKA-12430) emit.heartbeats.enabled = false should disable heartbeats topic creation

2021-03-05 Thread Ivan Yurchenko (Jira)
Ivan Yurchenko created KAFKA-12430:
--

 Summary: emit.heartbeats.enabled = false should disable heartbeats 
topic creation
 Key: KAFKA-12430
 URL: https://issues.apache.org/jira/browse/KAFKA-12430
 Project: Kafka
  Issue Type: Bug
  Components: mirrormaker
Reporter: Ivan Yurchenko


Currently, MirrorMaker 2's {{MirrorHeartbeatConnector}} emits heartbeats or not 
based on the {{emit.heartbeats.enabled}} setting. However, the {{heartbeats}} 
topic is created unconditionally. It seems that the same setting should really 
disable the topic creation as well.
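
A minimal sketch of the requested behaviour, under stated assumptions: 
`HeartbeatTopicSketch` and `topicsToCreate` are hypothetical names used only 
for illustration and are not MirrorMaker 2 code, and the default of `true` for 
the setting is assumed here. The idea is simply that the same flag that gates 
heartbeat emission also gates creation of the topic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not the MirrorHeartbeatConnector implementation.
final class HeartbeatTopicSketch {
    static final String EMIT_HEARTBEATS_ENABLED = "emit.heartbeats.enabled";
    static final String HEARTBEATS_TOPIC = "heartbeats";

    // Returns the topics the connector should create for the given configuration.
    static List<String> topicsToCreate(Map<String, String> config) {
        // Assumed default: heartbeats are enabled unless explicitly turned off.
        boolean enabled = Boolean.parseBoolean(
            config.getOrDefault(EMIT_HEARTBEATS_ENABLED, "true"));
        List<String> topics = new ArrayList<>();
        if (enabled) {
            topics.add(HEARTBEATS_TOPIC);
        }
        return topics;
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put(EMIT_HEARTBEATS_ENABLED, "false");
        // With heartbeats disabled, no topic is scheduled for creation.
        System.out.println(topicsToCreate(config));
    }
}
```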





[GitHub] [kafka] chia7712 commented on a change in pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…

2021-03-05 Thread GitBox


chia7712 commented on a change in pull request #10269:
URL: https://github.com/apache/kafka/pull/10269#discussion_r588138822



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -811,31 +821,40 @@ class KafkaApis(val requestChannel: RequestChannel,
   .setRecords(data.records)
   
.setPreferredReadReplica(data.preferredReadReplica.getOrElse(FetchResponse.INVALID_PREFERRED_REPLICA_ID))
 data.divergingEpoch.foreach(partitionData.setDivergingEpoch)
-partitions.put(tp, partitionData)
+addPartition(tp.topic, partitionData)
   }
-  erroneous.foreach { case (tp, data) => partitions.put(tp, data) }
-
-  var unconvertedFetchResponse: FetchResponse = null
+  erroneous.foreach { case (tp, data) => addPartition(tp.topic, data) }
 
-  def createResponse(throttleTimeMs: Int): FetchResponse = {
+  def createResponse(unconvertedFetchResponse: FetchResponse, 
throttleTimeMs: Int): FetchResponse = {
 // Down-convert messages for each partition if required
-val convertedData = new util.LinkedHashMap[TopicPartition, 
FetchResponseData.PartitionData]
-unconvertedFetchResponse.responseData.forEach { (tp, 
unconvertedPartitionData) =>
-  val error = Errors.forCode(unconvertedPartitionData.errorCode)
-  if (error != Errors.NONE)
-debug(s"Fetch request with correlation id 
${request.header.correlationId} from client $clientId " +
-  s"on partition $tp failed due to ${error.exceptionName}")
-  convertedData.put(tp, maybeConvertFetchedData(tp, 
unconvertedPartitionData))
+val convertedResponse = new FetchResponseData()

Review comment:
   done









[jira] [Commented] (KAFKA-10526) Explore performance impact of leader fsync deferral

2021-03-05 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17295880#comment-17295880
 ] 

Sagar Rao commented on KAFKA-10526:
---

Hey [~hachikuji], sorry for bugging you again on this, but could you please 
help me out with the queries above? 

> Explore performance impact of leader fsync deferral
> ---
>
> Key: KAFKA-10526
> URL: https://issues.apache.org/jira/browse/KAFKA-10526
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Assignee: Sagar Rao
>Priority: Major
>
> In order to commit a write, a majority of nodes must call fsync in order to 
> ensure the data has been written to disk. An interesting optimization option 
> to consider is letting the leader defer fsync until the high watermark is 
> ready to be advanced. This potentially allows us to reduce the number of 
> flushes on the leader.





[GitHub] [kafka] Iskuskov commented on pull request #9541: KAFKA-10675: Add schema name to ConnectSchema.validateValue() error message

2021-03-05 Thread GitBox


Iskuskov commented on pull request #9541:
URL: https://github.com/apache/kafka/pull/9541#issuecomment-791263135


   @kkonstantine, could you take a look at this please?


