[GitHub] [kafka] kkonstantine commented on a change in pull request #9780: KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics

2021-02-04 Thread GitBox


kkonstantine commented on a change in pull request #9780:
URL: https://github.com/apache/kafka/pull/9780#discussion_r570747258



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java
##
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.easymock.EasyMock;
+import org.easymock.Mock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(KafkaBasedLog.class)
+@PowerMockIgnore("javax.management.*")
+public class SharedTopicAdminTest {
+
+private static final Map CONFIG = Collections.emptyMap();
+
+@Mock private TopicAdmin mockTopicAdmin;
+private SharedTopicAdmin sharedAdmin;
+private int created = 0;

Review comment:
   I know. It's just that we already use a mocking framework and we could 
use something like: 
   
   
`EasyMock.expect(factory.apply(EasyMock.anyObject())).andReturn(mockTopicAdmin).anyTimes();`
   
   if we also defined `factory` to be a mock as well. That could allow us to 
evaluate expectations on the mock more accurately (e.g. with a capture if we 
had to). But sure, if we need something quick and easy we can go with that. 
It's just that I noticed a mixed use of mocks with this variable that simulates 
what the mocking framework offers already.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on a change in pull request #9780: KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics

2021-02-04 Thread GitBox


kkonstantine commented on a change in pull request #9780:
URL: https://github.com/apache/kafka/pull/9780#discussion_r570728906



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/SharedTopicAdmin.java
##
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
+
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.connect.errors.ConnectException;
+
+/**
+ * A holder of a {@link TopicAdmin} object that is lazily and atomically 
created when needed by multiple callers.
+ * As soon as one of the getters is called, all getters will return the same 
shared {@link TopicAdmin}
+ * instance until this SharedAdmin is closed via {@link #close()} or {@link 
#close(Duration)}.
+ *
+ * The owner of this object is responsible for ensuring that either {@link 
#close()} or {@link #close(Duration)}
+ * is called when the {@link TopicAdmin} instance is no longer needed. 
Consequently, once this
+ * {@link SharedTopicAdmin} instance has been closed, the {@link #get()} and 
{@link #topicAdmin()} methods,
+ * nor any previously returned {@link TopicAdmin} instances may be used.
+ *
+ * This class is thread-safe. It also appears as immutable to callers that 
obtain the {@link TopicAdmin} object,
+ * until this object is closed, at which point it cannot be used anymore
+ */
+public class SharedTopicAdmin implements AutoCloseable, Supplier {
+
+// Visible for testing
+static final Duration DEFAULT_CLOSE_DURATION = 
Duration.ofMillis(Long.MAX_VALUE);
+
+private final Map adminProps;
+private final AtomicReference admin = new AtomicReference<>();
+private final AtomicBoolean closed = new AtomicBoolean(false);
+private final Function, TopicAdmin> factory;
+
+public SharedTopicAdmin(Map adminProps) {
+this(adminProps, TopicAdmin::new);
+}
+
+// Visible for testing
+SharedTopicAdmin(Map adminProps, Function, TopicAdmin> factory) {
+this.adminProps = Objects.requireNonNull(adminProps);
+this.factory = Objects.requireNonNull(factory);
+}
+
+/**
+ * Get the shared {@link TopicAdmin} instance.
+ *
+ * @return the shared instance; never null
+ * @throws ConnectException if this object has already been closed
+ */
+@Override
+public TopicAdmin get() {
+return topicAdmin();
+}
+
+/**
+ * Get the shared {@link TopicAdmin} instance.
+ *
+ * @return the shared instance; never null
+ * @throws ConnectException if this object has already been closed
+ */
+public TopicAdmin topicAdmin() {
+return admin.updateAndGet(this::createAdmin);

Review comment:
   I'm happy to leave it as an example of the pattern that demonstrates how 
to apply `updateAndGet`. 
   
   I just didn't feel that the two or three levels of indirection were worth to 
write the singleton pattern differently.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on pull request #10060: KAFKA-10716: persist UUID in state directory for stable processId across restarts - 2.7

2021-02-04 Thread GitBox


wcarlson5 commented on pull request #10060:
URL: https://github.com/apache/kafka/pull/10060#issuecomment-773820050


   +1 from me. Thanks for the pr



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10045: MINOR: Allow KafkaApis to be configured for Raft controller quorums

2021-02-04 Thread GitBox


hachikuji commented on a change in pull request #10045:
URL: https://github.com/apache/kafka/pull/10045#discussion_r570730096



##
File path: core/src/main/scala/kafka/server/MetadataSupport.scala
##
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.controller.KafkaController
+import kafka.network.RequestChannel
+import kafka.zk.{AdminZkClient, KafkaZkClient}
+import org.apache.kafka.common.requests.AbstractResponse
+
+sealed trait MetadataSupport {
+  /**
+   * Provide a uniform way of getting to the ForwardingManager, which is a 
shared concept
+   * despite being optional when using ZooKeeper and required when using Raft
+   */
+  val forwardingManager: Option[ForwardingManager]
+
+  /**
+   * Return this instance downcast for use with ZooKeeper
+   *
+   * @param createException function to create an exception to throw
+   * @return this instance downcast for use with ZooKeeper
+   * @throws Exception if this instance is not for ZooKeeper
+   */
+  def requireZkOrThrow(createException: => Exception): ZkSupport

Review comment:
   It's a little odd to see this in the trait. Would it be reasonable to 
turn them into private defs in `KafkaApis`? I'm ok with it if you think it is 
better. We'll only ever have the two implementations.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on a change in pull request #9780: KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics

2021-02-04 Thread GitBox


kkonstantine commented on a change in pull request #9780:
URL: https://github.com/apache/kafka/pull/9780#discussion_r570732565



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##
@@ -185,16 +188,33 @@
 
 private final DistributedConfig config;
 
+/**
+ * Create a herder that will form a Connect cluster with other {@link 
DistributedHerder} instances (in this or other JVMs)
+ * that have the same group ID.
+ *
+ * @param config the configuration for the worker; may not be 
null
+ * @param time   the clock to use; may not be null
+ * @param worker the {@link Worker} instance to use; may not 
be null
+ * @param kafkaClusterId the identifier of the Kafka cluster to use 
for internal topics; may not be null
+ * @param statusBackingStore the backing store for statuses; may not be 
null
+ * @param configBackingStore the backing store for connector 
configurations; may not be null
+ * @param restUrlthe URL of this herder's REST API; may not be 
null
+ * @param connectorClientConfigOverridePolicy the policy specifying the 
client configuration properties that may be overridden
+ *in connector configurations; 
may not be null
+ * @param uponShutdown   any {@link AutoCloseable} objects that should 
be closed when this herder is {@link #stop() stopped},
+ *   after all services and resources owned by 
this herder are stopped
+ */
 public DistributedHerder(DistributedConfig config,
  Time time,
  Worker worker,
  String kafkaClusterId,
  StatusBackingStore statusBackingStore,
  ConfigBackingStore configBackingStore,
  String restUrl,
- ConnectorClientConfigOverridePolicy 
connectorClientConfigOverridePolicy) {
+ ConnectorClientConfigOverridePolicy 
connectorClientConfigOverridePolicy,
+ AutoCloseable... uponShutdown) {

Review comment:
   We can always keep a constructor with the old signature along with the 
new if we wanted not to break classes that use `DistributedHerder`. I'm fine 
with the change here as a short term workaround. I guess it saves us one 
constructor but we can use it only once.

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/SharedTopicAdmin.java
##
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
+
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.connect.errors.ConnectException;
+
+/**
+ * A holder of a {@link TopicAdmin} object that is lazily and atomically 
created when needed by multiple callers.
+ * As soon as one of the getters is called, all getters will return the same 
shared {@link TopicAdmin}
+ * instance until this SharedAdmin is closed via {@link #close()} or {@link 
#close(Duration)}.
+ *
+ * The owner of this object is responsible for ensuring that either {@link 
#close()} or {@link #close(Duration)}
+ * is called when the {@link TopicAdmin} instance is no longer needed. 
Consequently, once this
+ * {@link SharedTopicAdmin} instance has been closed, the {@link #get()} and 
{@link #topicAdmin()} methods,
+ * nor any previously returned {@link TopicAdmin} instances may be used.
+ *
+ * This class is thread-safe. It also appears as immutable to callers that 
obtain the {@link TopicAdmin} object,
+ * until this object is closed, at which point it cannot be used anymore
+ */
+public class SharedTopicAdmin implements AutoCloseable, Supplier {
+
+// Visible for testing
+static 

[GitHub] [kafka] hachikuji commented on a change in pull request #10045: MINOR: Allow KafkaApis to be configured for Raft controller quorums

2021-02-04 Thread GitBox


hachikuji commented on a change in pull request #10045:
URL: https://github.com/apache/kafka/pull/10045#discussion_r570727037



##
File path: core/src/main/scala/kafka/server/MetadataSupport.scala
##
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.controller.KafkaController
+import kafka.network.RequestChannel
+import kafka.zk.{AdminZkClient, KafkaZkClient}
+import org.apache.kafka.common.requests.AbstractResponse
+
+sealed trait MetadataSupport {
+  /**
+   * Provide a uniform way of getting to the ForwardingManager, which is a 
shared concept
+   * despite being optional when using ZooKeeper and required when using Raft
+   */
+  val forwardingManager: Option[ForwardingManager]
+
+  /**
+   * Return this instance downcast for use with ZooKeeper
+   *
+   * @param createException function to create an exception to throw
+   * @return this instance downcast for use with ZooKeeper
+   * @throws Exception if this instance is not for ZooKeeper
+   */
+  def requireZkOrThrow(createException: => Exception): ZkSupport
+
+  /**
+   * Return this instance downcast for use with Raft
+   *
+   * @param createException function to create an exception to throw
+   * @return this instance downcast for use with Raft
+   * @throws Exception if this instance is not for Raft
+   */
+  def requireRaftOrThrow(createException: => Exception): RaftSupport
+
+  /**
+   * Confirm that this instance is consistent with the given config
+   *
+   * @param config the config to check for consistency with this instance
+   * @throws IllegalStateException if there is an inconsistency (Raft for a 
ZooKeeper config or vice-versa)
+   */
+  def confirmConsistentWith(config: KafkaConfig): Unit
+
+  def maybeForward(request: RequestChannel.Request,
+   handler: RequestChannel.Request => Unit,
+   responseCallback: Option[AbstractResponse] => Unit): Unit
+}
+
+case class ZkSupport(adminManager: ZkAdminManager,
+ controller: KafkaController,
+ zkClient: KafkaZkClient,
+ forwardingManager: Option[ForwardingManager]) extends 
MetadataSupport {
+  val adminZkClient = new AdminZkClient(zkClient)
+
+  override def requireZkOrThrow(createException: => Exception): ZkSupport = 
this
+  override def requireRaftOrThrow(createException: => Exception): RaftSupport 
= throw createException
+
+  override def confirmConsistentWith(config: KafkaConfig): Unit = {
+if (!config.requiresZookeeper) {
+  throw new IllegalStateException("Config specifies Raft but metadata 
support instance is for ZooKeeper")
+}
+  }
+
+  override def maybeForward(request: RequestChannel.Request,
+handler: RequestChannel.Request => Unit,
+responseCallback: Option[AbstractResponse] => 
Unit): Unit = {
+if (forwardingManager.isDefined && !request.isForwarded && 
!controller.isActive) {

Review comment:
   nit: usually when you see `isDefined` followed by `get`, there is likely 
an opportunity for a `match` or `foreach`. 
   ```scala
   forwardingManager match {
 case Some(mgr) if !request.isForwarded && !controllers.isActive =>
   forwardingManager.get.forwardRequest(request, responseCallback)
 case _ => 
   handler(request)
   }
   ```

##
File path: core/src/main/scala/kafka/server/MetadataSupport.scala
##
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific 

[GitHub] [kafka] bob-barrett commented on pull request #10065: KAFKA-12193: Re-resolve IPs after a client disconnects

2021-02-04 Thread GitBox


bob-barrett commented on pull request #10065:
URL: https://github.com/apache/kafka/pull/10065#issuecomment-773800777


   This is a backport of 131d475 to 2.4. As with #10061, the conflicts were due 
to the missing connection timeout config, which was added in 2.7, and didn't 
meaningfully change the behavior.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bob-barrett commented on pull request #10067: KAFKA-12193: Re-resolve IPs after a client disconnects

2021-02-04 Thread GitBox


bob-barrett commented on pull request #10067:
URL: https://github.com/apache/kafka/pull/10067#issuecomment-773800834


   This is a backport of 131d475 to 2.3. As with #10061, the conflicts were due 
to the missing connection timeout config, which was added in 2.7, and didn't 
meaningfully change the behavior.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bob-barrett opened a new pull request #10067: KAFKA-12193: Re-resolve IPs after a client disconnects

2021-02-04 Thread GitBox


bob-barrett opened a new pull request #10067:
URL: https://github.com/apache/kafka/pull/10067


   This patch changes the NetworkClient behavior to resolve the target node's 
hostname after disconnecting from an established connection, rather than 
waiting until the previously-resolved addresses are exhausted. This is to 
handle the scenario when the node's IP addresses have changed during the 
lifetime of the connection, and means that the client does not have to try to 
connect to invalid IP addresses until it has tried each address.
   
   Reviewers: Mickael Maison , Satish Duggana 
, David Jacot 
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app

2021-02-04 Thread GitBox


mjsax commented on a change in pull request #9744:
URL: https://github.com/apache/kafka/pull/9744#discussion_r570716520



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
##
@@ -45,7 +45,6 @@
 private boolean initialized;
 protected ProcessorRecordContext recordContext;
 protected ProcessorNode currentNode;
-private long currentSystemTimeMs;

Review comment:
   The main motivation to add `ProcessorContext#currentSystemTime()` was to 
be able to return the mocked wall-clock time in `TopologyTestDriver`. Even if 
we return the cached time from `AbstractProcessorContext`, we will be able to 
return the mocked time, as we update the cached time based on the mocked time 
in `TopologyTestDriver`.
   
   - `InternalMockProcessorContext` is just for our own unit testing -- it's 
fine to add the new `Time` field, it's not a public facing change anyway
   - Originally we changed `ProcessorContextImpl` because we remove 
`AbstractProcessorContext#currentSystemTime()` -- as suggested by Guozhang, we 
should keep the cached time in `AbstractProcessorContext()`, and thus we don't 
need `ProcessorContextImpl#currentSystemTime()` any longer.
   - `GlobalProcessorContextImpl` is a different code path, and thus the 
changes of this PR are fine
   
   > If we add new cachedSystemTimeMs field in AbstractProcessorContext, when 
do you want to return this field?
   
   Yes, we want to return this field. (We get this behavior by adding back 
`AbstractProcessorContext#currentSystemTime()` (and the cached time in this 
class) and removing `ProcessorContextImpl#currentSystemTime()`.
   
   > Are earlier changes not valid to return time from StreamTask?
   
   Yes, I think we can revert all changes from `StreamTask`.
   
   Does this make sense?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app

2021-02-04 Thread GitBox


mjsax commented on a change in pull request #9744:
URL: https://github.com/apache/kafka/pull/9744#discussion_r570716520



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
##
@@ -45,7 +45,6 @@
 private boolean initialized;
 protected ProcessorRecordContext recordContext;
 protected ProcessorNode currentNode;
-private long currentSystemTimeMs;

Review comment:
   The main motivation to add `ProcessorContext#currentSystemTime()` was to 
be able to return the mocked wall-clock time in `TopologyTestDriver`. Even if 
we return the cached time from `AbstractProcessorContext`, we will be able to 
return the mocked time, as we update the cached time based on the mocked time 
in `TopologyTestDriver`.
   
   - `InternalMockProcessorContext` is just for our own unit testing -- it's 
fine to add the new `Time` field, it's not a public facing change anyway
   - Originally we changed `ProcessorContextImpl` because we remove 
`AbstractProcessorContext#currentSystemTime()` -- as suggested by Guozhang, we 
should keep the cached time in `AbstractProcessorContext()`, and thus we don't 
need `ProcessorContextImpl#currentSystemTime()` any longer.
   - `GlobalProcessorContextImpl` is a different code path, and thus the 
changes of this PR are fine
   
   > If we add new cachedSystemTimeMs field in AbstractProcessorContext, when 
do you want to return this field?
   
   Yes, we want to return this field. (We get this behaviro by adding back 
`AbstractProcessorContext#currentSystemTime()` (and the cached time in this 
class) and removing `ProcessorContextImpl#currentSystemTime()`.
   
   > Are earlier changes not valid to return time from StreamTask?
   
   Yes, I think we can revert all changes from `StreamTask`.
   
   Does this make sense?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji opened a new pull request #10066: KAFKA-12278; Ensure exposed api versions are consistent within listener scopes

2021-02-04 Thread GitBox


hachikuji opened a new pull request #10066:
URL: https://github.com/apache/kafka/pull/10066


   With KIP-500, we have more complex requirements on API accessibility. 
Previously all APIs were accessible on every listener exposed by the broker, 
but now that is no longer true. For example:
   
   - the controller exposes some APIs which are not accessible on the broker 
listener (e.g. quorum/registration/heartbeat APIs)
   - most of the client APIs are not exposed on the controller (e.g. consumer 
group apis)
   - there are some APIs which are not implemented by the KIP-500 broker (e.g. 
`LeaderAndIsr` and `UpdateMetadata`)
   - there are some APIs which are only implemented by the KIP-500 broker (e.g. 
`DecommissionBroker` and `DescribeQuorum`)
   
   All of this means that we need more sophistication in how we expose APIs and 
keep them consistent with the `ApiVersions` API. Up to now, we have been 
working around this using the `controllerOnly` flag inside `ApiKeys`, but this 
is not rich enough to support all of the cases listed above.
   
   In this patch, we address this by problem by introducing a new `scope` field 
to the request schema definitions. This field is an array of strings which 
indicate the scope in which the API should be exposed. We currently support the 
following scopes: 
   
   - `zkBroker`: old broker
   - `broker`: kip-500 broker
   - `controller`: kip-500 controller
   - `raft`: raft test server
   
   For example, the `DecommissionBroker` API has the following scope tag:
   ```json
 "scope": ["broker", "controller"]
   ```
   This indicates that the API is only on the KIP-500 broker and controller 
(both are needed because the request will be sent by clients and forwarded to 
the controller).
   
   The patch changes the generator so that the scope definitions are added to 
`ApiMessageType` and exposed through convenient helpers. At the same time, we 
have removed the `controllerOnly` flag from `ApiKeys` since now we can identify 
all controller APIs through the "controller" scope tag.
   
   The rest of the patch is dedicated to ensuring that the API scope is 
properly set. We have created a new `ApiVersionManager` which encapsulates the 
creation of the `ApiVersionsResponse` based on the scope. Additionally, 
`SocketServer` is modified to ensure the scope of received requests before 
forwarding them to the request handler.
   
   We have also fixed a bug in the handling of the `ApiVersionsResponse` prior 
to authentication. Previously a static response was sent, which means that 
changes to features would not get reflected. This also meant that the logic to 
ensure that only the intersection of version ranges supported by the controller 
would get exposed did not work. I think this is important because some clients 
rely on the initial pre-authenticated `ApiVersions` response rather than doing 
a second round after authentication as the Java client does.
   
   One final cleanup note: I have removed the expectation that envelope 
requests are only allowed on "privileged" listeners. This made sense initially 
because we expected to use forwarding before the KIP-500 controller was 
available. That is not the case anymore and we expect the `Envelope` API to 
only be exposed on the controller listener. I have nevertheless preserved the 
existing workarounds to allow this API to verify forwarding behavior in 
integration testing.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax merged pull request #10048: MINOR: add docs for KIP-680

2021-02-04 Thread GitBox


mjsax merged pull request #10048:
URL: https://github.com/apache/kafka/pull/10048


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #10000: KAFKA-9274: handle TimeoutException on task reset

2021-02-04 Thread GitBox


mjsax commented on pull request #1:
URL: https://github.com/apache/kafka/pull/1#issuecomment-773774753


   Updated this PR.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10048: MINOR: add docs for KIP-680

2021-02-04 Thread GitBox


mjsax commented on a change in pull request #10048:
URL: https://github.com/apache/kafka/pull/10048#discussion_r570698424



##
File path: docs/streams/upgrade-guide.html
##
@@ -121,6 +121,12 @@ Streams API
 the constructor, such as when using the console consumer. https://cwiki.apache.org/confluence/display/KAFKA/KIP-659%3A+Improve+TimeWindowedDeserializer+and+TimeWindowedSerde+to+handle+window+size;>KIP-659
 has more details.
 
+
+To simplify testing, two new constructors that don't require a 
Properties parameter have been added to the 
TopologyTestDriver class.
+required to pass in a Properties parameter. If 
Properties are passed

Review comment:
   ```suggestion
   to the TopologyTestDriver class. If 
Properties are passed
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10048: MINOR: add docs for KIP-680

2021-02-04 Thread GitBox


mjsax commented on a change in pull request #10048:
URL: https://github.com/apache/kafka/pull/10048#discussion_r570698397



##
File path: docs/streams/upgrade-guide.html
##
@@ -121,6 +121,12 @@ Streams API
 the constructor, such as when using the console consumer. https://cwiki.apache.org/confluence/display/KAFKA/KIP-659%3A+Improve+TimeWindowedDeserializer+and+TimeWindowedSerde+to+handle+window+size;>KIP-659
 has more details.
 
+
+To simplify testing, two new constructors that don't require a 
Properties parameter have been added to the 
TopologyTestDriver class.

Review comment:
   ```suggestion
   To simplify testing, two new constructors that don't require a 
Properties parameter have been added
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] JimGalasyn commented on a change in pull request #10048: MINOR: add docs for KIP-680

2021-02-04 Thread GitBox


JimGalasyn commented on a change in pull request #10048:
URL: https://github.com/apache/kafka/pull/10048#discussion_r570687920



##
File path: docs/streams/upgrade-guide.html
##
@@ -121,6 +121,12 @@ Streams API
 the constructor, such as when using the console consumer. https://cwiki.apache.org/confluence/display/KAFKA/KIP-659%3A+Improve+TimeWindowedDeserializer+and+TimeWindowedSerde+to+handle+window+size;>KIP-659
 has more details.
 
+
+To simplify testing, two new constructors are added to 
TopologyTestDriver class, that both don't
+required to pass in a Properties parameter. Furthermore, 
even if Properties are passed

Review comment:
   ```suggestion
   required to pass in a Properties parameter. If 
Properties are passed
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10000: KAFKA-9274: handle TimeoutException on task reset

2021-02-04 Thread GitBox


mjsax commented on a change in pull request #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r570695899



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -227,6 +230,27 @@ public void initializeIfNeeded() {
 }
 }
 
+private void initOffsetsIfNeeded(final 
java.util.function.Consumer> offsetResetter) {
+final Map committed = 
mainConsumer.committed(resetOffsetsForPartitions);
+for (final Map.Entry committedEntry 
: committed.entrySet()) {
+final OffsetAndMetadata offsetAndMetadata = 
committedEntry.getValue();
+if (offsetAndMetadata != null) {
+mainConsumer.seek(committedEntry.getKey(), offsetAndMetadata);
+resetOffsetsForPartitions.remove(committedEntry.getKey());
+}
+}
+
+if (!resetOffsetsForPartitions.isEmpty()) {

Review comment:
   Fair enough.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10000: KAFKA-9274: handle TimeoutException on task reset

2021-02-04 Thread GitBox


mjsax commented on a change in pull request #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r570691793



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -227,6 +230,27 @@ public void initializeIfNeeded() {
 }
 }
 
+private void initOffsetsIfNeeded(final 
java.util.function.Consumer> offsetResetter) {

Review comment:
   Good point!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax merged pull request #9997: KAFKA-9274: Add timeout handling for `StreamPartitioner`

2021-02-04 Thread GitBox


mjsax merged pull request #9997:
URL: https://github.com/apache/kafka/pull/9997


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-7540) Flaky Test ConsumerBounceTest#testClose

2021-02-04 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17279291#comment-17279291
 ] 

Matthias J. Sax commented on KAFKA-7540:


[https://github.com/apache/kafka/pull/9997/checks?check_run_id=1827542034]
{code:java}
org.opentest4j.AssertionFailedError: Assignment did not complete on time ==> 
expected:  but was: 
  at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
  at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40)
  at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:193)
  at kafka.api.ConsumerBounceTest.checkClosedState(ConsumerBounceTest.scala:486)
  at 
kafka.api.ConsumerBounceTest.checkCloseWithCoordinatorFailure(ConsumerBounceTest.scala:257)
  at kafka.api.ConsumerBounceTest.testClose(ConsumerBounceTest.scala:220)
{code}
STDOUT
{code:java}
[2021-02-04 01ː55ː43,911] ERROR [Consumer clientId=ConsumerTestConsumer, 
groupId=fatal-exception-test] JoinGroup failed due to fatal error: The consumer 
group has reached its max size. 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:636)
[2021-02-04 01ː55ː43,913] ERROR [daemon-consumer-assignment]: Error due to 
(kafka.api.AbstractConsumerTest$ConsumerAssignmentPoller:76) 
org.apache.kafka.common.errors.GroupMaxSizeReachedException: Consumer group 
fatal-exception-test already has the configured maximum number of members.
[2021-02-04 01ː56ː22,198] ERROR [Consumer clientId=ConsumerTestConsumer, 
groupId=group-max-size-test] JoinGroup failed due to fatal error: The consumer 
group has reached its max size. 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:636)
[2021-02-04 01ː56ː22,199] ERROR [daemon-consumer-assignment]: Error due to 
(kafka.api.AbstractConsumerTest$ConsumerAssignmentPoller:76) 
org.apache.kafka.common.errors.GroupMaxSizeReachedException: Consumer group 
group-max-size-test already has the configured maximum number of members.
{code}

> Flaky Test ConsumerBounceTest#testClose
> ---
>
> Key: KAFKA-7540
> URL: https://issues.apache.org/jira/browse/KAFKA-7540
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 2.2.0
>Reporter: John Roesler
>Assignee: Jason Gustafson
>Priority: Critical
>  Labels: flaky-test
>
> Observed on Java 8: 
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/17314/testReport/junit/kafka.api/ConsumerBounceTest/testClose/]
>  
> Stacktrace:
> {noformat}
> java.lang.ArrayIndexOutOfBoundsException: -1
>   at 
> kafka.integration.KafkaServerTestHarness.killBroker(KafkaServerTestHarness.scala:146)
>   at 
> kafka.api.ConsumerBounceTest.checkCloseWithCoordinatorFailure(ConsumerBounceTest.scala:238)
>   at kafka.api.ConsumerBounceTest.testClose(ConsumerBounceTest.scala:211)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106)
>   at 
> 

[GitHub] [kafka] JimGalasyn commented on a change in pull request #10048: MINOR: add docs for KIP-680

2021-02-04 Thread GitBox


JimGalasyn commented on a change in pull request #10048:
URL: https://github.com/apache/kafka/pull/10048#discussion_r570687920



##
File path: docs/streams/upgrade-guide.html
##
@@ -121,6 +121,12 @@ Streams API
 the constructor, such as when using the console consumer. https://cwiki.apache.org/confluence/display/KAFKA/KIP-659%3A+Improve+TimeWindowedDeserializer+and+TimeWindowedSerde+to+handle+window+size;>KIP-659
 has more details.
 
+
+To simplify testing, two new constructors are added to 
TopologyTestDriver class, that both don't
+required to pass in a Properties parameter. Furthermore, 
even if Properties are passed

Review comment:
   ```suggestion
   If Properties are passed
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] JimGalasyn commented on a change in pull request #10048: MINOR: add docs for KIP-680

2021-02-04 Thread GitBox


JimGalasyn commented on a change in pull request #10048:
URL: https://github.com/apache/kafka/pull/10048#discussion_r570687710



##
File path: docs/streams/upgrade-guide.html
##
@@ -121,6 +121,12 @@ Streams API
 the constructor, such as when using the console consumer. https://cwiki.apache.org/confluence/display/KAFKA/KIP-659%3A+Improve+TimeWindowedDeserializer+and+TimeWindowedSerde+to+handle+window+size;>KIP-659
 has more details.
 
+
+To simplify testing, two new constructors are added to 
TopologyTestDriver class, that both don't

Review comment:
   ```suggestion
   To simplify testing, two new constructors that don't require a 
Properties parameter have been added to the 
TopologyTestDriver class.
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax merged pull request #10044: MINOR: Word count should account for extra whitespaces between words

2021-02-04 Thread GitBox


mjsax merged pull request #10044:
URL: https://github.com/apache/kafka/pull/10044


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12283) Flaky Test RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining

2021-02-04 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17279288#comment-17279288
 ] 

Matthias J. Sax commented on KAFKA-12283:
-

Failed again: 
https://github.com/apache/kafka/pull/10044/checks?check_run_id=1827160439

> Flaky Test 
> RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining
> 
>
> Key: KAFKA-12283
> URL: https://issues.apache.org/jira/browse/KAFKA-12283
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect, unit tests
>Reporter: Matthias J. Sax
>Assignee: Chia-Ping Tsai
>Priority: Critical
>  Labels: flaky-test
>
> https://github.com/apache/kafka/pull/1/checks?check_run_id=1820092809
> {quote} {{java.lang.AssertionError: Tasks are imbalanced: 
> localhost:36037=[seq-source13-0, seq-source13-1, seq-source13-2, 
> seq-source13-3, seq-source12-0, seq-source12-1, seq-source12-2, 
> seq-source12-3]
> localhost:43563=[seq-source11-0, seq-source11-2, seq-source10-0, 
> seq-source10-2]
> localhost:46539=[seq-source11-1, seq-source11-3, seq-source10-1, 
> seq-source10-3]
>   at org.junit.Assert.fail(Assert.java:89)
>   at org.junit.Assert.assertTrue(Assert.java:42)
>   at 
> org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.assertConnectorAndTasksAreUniqueAndBalanced(RebalanceSourceConnectorsIntegrationTest.java:362)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290)
>   at 
> org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testMultipleWorkersRejoining(RebalanceSourceConnectorsIntegrationTest.java:313)}}
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax merged pull request #10046: MINOR: Extends RocksDB docs

2021-02-04 Thread GitBox


mjsax merged pull request #10046:
URL: https://github.com/apache/kafka/pull/10046


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10046: MINOR: Extends RocksDB docs

2021-02-04 Thread GitBox


mjsax commented on a change in pull request #10046:
URL: https://github.com/apache/kafka/pull/10046#discussion_r570683354



##
File path: docs/streams/developer-guide/memory-mgmt.html
##
@@ -168,7 +168,15 @@
   RocksDB
Each instance of RocksDB allocates off-heap memory for a block 
cache, index and filter blocks, and memtable (write buffer). Critical configs 
(for RocksDB version 4.1.0) include
 block_cache_size, write_buffer_size and max_write_buffer_number.  These can be specified 
through the
-rocksdb.config.setter configuration.
+rocksdb.config.setter configuration.
+  Also, we recommend changing RocksDB's default memory allocator, 
because the default allocator may lead to increased memory consumption.
+To change the memory allocator to jemalloc, you need to 
set the an environment variable before you start your Kafka Streams 
application:
+  
+# example: install jemalloc (on Debian)
+$ apt install -y libjemalloc-dev
+# set LD_PRELOAD before you start your Kafka Streams application
+$ LD_PRELOAD="/usr/lib/x86_64-linux-gnu/libjemalloc.so”

Review comment:
   Good catch!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bob-barrett opened a new pull request #10065: KAFKA-12193: Re-resolve IPs after a client disconnects

2021-02-04 Thread GitBox


bob-barrett opened a new pull request #10065:
URL: https://github.com/apache/kafka/pull/10065


   This patch changes the NetworkClient behavior to resolve the target node's 
hostname after disconnecting from an established connection, rather than 
waiting until the previously-resolved addresses are exhausted. This is to 
handle the scenario when the node's IP addresses have changed during the 
lifetime of the connection, and means that the client does not have to try to 
connect to invalid IP addresses until it has tried each address.
   
   Reviewers: Mickael Maison , Satish Duggana 
, David Jacot 
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bob-barrett commented on pull request #10064: KAFKA-12193: Re-resolve IPs after a client disconnects

2021-02-04 Thread GitBox


bob-barrett commented on pull request #10064:
URL: https://github.com/apache/kafka/pull/10064#issuecomment-773736235


   This is a backport of 131d4753cfed65ed6dee0a8c754765c97c3d513f to 2.5. As 
with https://github.com/apache/kafka/pull/10061, the conflicts were due to the 
missing connection timeout config, which was added in 2.7, and didn't 
meaningfully change the behavior.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bob-barrett opened a new pull request #10064: KAFKA-12193: Re-resolve IPs after a client disconnects

2021-02-04 Thread GitBox


bob-barrett opened a new pull request #10064:
URL: https://github.com/apache/kafka/pull/10064


   This patch changes the NetworkClient behavior to resolve the target node's 
hostname after disconnecting from an established connection, rather than 
waiting until the previously-resolved addresses are exhausted. This is to 
handle the scenario when the node's IP addresses have changed during the 
lifetime of the connection, and means that the client does not have to try to 
connect to invalid IP addresses until it has tried each address.
   
   Reviewers: Mickael Maison , Satish Duggana 
, David Jacot 
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio opened a new pull request #10063: KAFKA-12258: Add support for splitting appending records

2021-02-04 Thread GitBox


jsancio opened a new pull request #10063:
URL: https://github.com/apache/kafka/pull/10063


   1. Type `BatchAccumulator`. Add support for appending records into one or 
more batches.
   2. Type `RaftClient`. Rename `scheduleAppend` to `scheduleAtomicAppend`.
   3. Type `RaftClient`. Add a new method `scheduleAppend` which appends 
records to the log using as many batches as necessary.
   4. Increase the batch size from 1MB to 8MB.
   
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on a change in pull request #9780: KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics

2021-02-04 Thread GitBox


rhauch commented on a change in pull request #9780:
URL: https://github.com/apache/kafka/pull/9780#discussion_r570657028



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/SharedTopicAdmin.java
##
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
+
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.connect.errors.ConnectException;
+
+/**
+ * A holder of a {@link TopicAdmin} object that is lazily and atomically 
created when needed by multiple callers.
+ * As soon as one of the getters is called, all getters will return the same 
shared {@link TopicAdmin}
+ * instance until this SharedAdmin is closed via {@link #close()} or {@link 
#close(Duration)}.
+ *
+ * The owner of this object is responsible for ensuring that either {@link 
#close()} or {@link #close(Duration)}
+ * is called when the {@link TopicAdmin} instance is no longer needed. 
Consequently, once this
+ * {@link SharedTopicAdmin} instance has been closed, the {@link #get()} and 
{@link #topicAdmin()} methods,
+ * nor any previously returned {@link TopicAdmin} instances may be used.
+ *
+ * This class is thread-safe. It also appears as immutable to callers that 
obtain the {@link TopicAdmin} object,
+ * until this object is closed, at which point it cannot be used anymore
+ */
+public class SharedTopicAdmin implements AutoCloseable, Supplier {
+
+// Visible for testing
+static final Duration DEFAULT_CLOSE_DURATION = 
Duration.ofMillis(Long.MAX_VALUE);
+
+private final Map adminProps;
+private final AtomicReference admin = new AtomicReference<>();
+private final AtomicBoolean closed = new AtomicBoolean(false);
+private final Function, TopicAdmin> factory;
+
+public SharedTopicAdmin(Map adminProps) {
+this(adminProps, TopicAdmin::new);
+}
+
+// Visible for testing
+SharedTopicAdmin(Map adminProps, Function, TopicAdmin> factory) {
+this.adminProps = Objects.requireNonNull(adminProps);
+this.factory = Objects.requireNonNull(factory);
+}
+
+/**
+ * Get the shared {@link TopicAdmin} instance.
+ *
+ * @return the shared instance; never null
+ * @throws ConnectException if this object has already been closed
+ */
+@Override
+public TopicAdmin get() {
+return topicAdmin();
+}
+
+/**
+ * Get the shared {@link TopicAdmin} instance.
+ *
+ * @return the shared instance; never null
+ * @throws ConnectException if this object has already been closed
+ */
+public TopicAdmin topicAdmin() {
+return admin.updateAndGet(this::createAdmin);

Review comment:
   I'm not sure there is much advantage either way, considering these 
methods are not called frequently and `synchronized` would indeed work. I 
personally like the simplicity of using `AtomicReference`, which to me seemed 
natural and straightforward, avoided having to synchronize the entire methods, 
and needed no if-checks in this method.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on a change in pull request #9780: KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics

2021-02-04 Thread GitBox


rhauch commented on a change in pull request #9780:
URL: https://github.com/apache/kafka/pull/9780#discussion_r570655008



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java
##
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.easymock.EasyMock;
+import org.easymock.Mock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(KafkaBasedLog.class)
+@PowerMockIgnore("javax.management.*")
+public class SharedTopicAdminTest {
+
+private static final Map CONFIG = Collections.emptyMap();
+
+@Mock private TopicAdmin mockTopicAdmin;
+private SharedTopicAdmin sharedAdmin;
+private int created = 0;

Review comment:
   Really I'm just using that to be able to test that the new 
`topicAdmin()` method is returning the correct instance, even after repeated 
calls. It was an easy way to verify that the `TopicAdmin` matches what the 
factory function returned.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

2021-02-04 Thread GitBox


abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r570654953



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1370,55 +1345,164 @@ class KafkaApis(val requestChannel: RequestChannel,
 !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, 
findCoordinatorRequest.data.key))
   requestHelper.sendErrorResponseMaybeThrottle(request, 
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
 else {
-  // get metadata (and create the topic if necessary)
-  val (partition, topicMetadata) = 
CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+  val (partition, internalTopicName) = 
CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
 case CoordinatorType.GROUP =>
-  val partition = 
groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
-  val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, 
request.context.listenerName)
-  (partition, metadata)
+  (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), 
GROUP_METADATA_TOPIC_NAME)
 
 case CoordinatorType.TRANSACTION =>
-  val partition = 
txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
-  val metadata = 
getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, 
request.context.listenerName)
-  (partition, metadata)
+  (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), 
TRANSACTION_STATE_TOPIC_NAME)
+  }
 
-case _ =>
-  throw new InvalidRequestException("Unknown coordinator type in 
FindCoordinator request")
+  val topicMetadata = 
metadataCache.getTopicMetadata(Set(internalTopicName), 
request.context.listenerName)
+  def createFindCoordinatorResponse(error: Errors,
+node: Node,
+requestThrottleMs: Int,
+errorMessage: Option[String] = None): 
FindCoordinatorResponse = {
+new FindCoordinatorResponse(
+  new FindCoordinatorResponseData()
+.setErrorCode(error.code)
+.setErrorMessage(errorMessage.getOrElse(error.message))
+.setNodeId(node.id)
+.setHost(node.host)
+.setPort(node.port)
+.setThrottleTimeMs(requestThrottleMs))
   }
 
-  def createResponse(requestThrottleMs: Int): AbstractResponse = {
-def createFindCoordinatorResponse(error: Errors, node: Node): 
FindCoordinatorResponse = {
-  new FindCoordinatorResponse(
-  new FindCoordinatorResponseData()
-.setErrorCode(error.code)
-.setErrorMessage(error.message)
-.setNodeId(node.id)
-.setHost(node.host)
-.setPort(node.port)
-.setThrottleTimeMs(requestThrottleMs))
+  val topicCreationNeeded = topicMetadata.headOption.isEmpty
+  if (topicCreationNeeded) {
+if (hasEnoughAliveBrokers(internalTopicName)) {

Review comment:
   Sounds good





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12295) Shallow Mirroring

2021-02-04 Thread Henry Cai (Jira)
Henry Cai created KAFKA-12295:
-

 Summary: Shallow Mirroring
 Key: KAFKA-12295
 URL: https://issues.apache.org/jira/browse/KAFKA-12295
 Project: Kafka
  Issue Type: Improvement
  Components: consumer, core, mirrormaker, producer 
Reporter: Henry Cai
Assignee: Henry Cai
 Fix For: 2.8.0


KIP-712: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-712%3A+Shallow+Mirroring



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rhauch commented on a change in pull request #9780: KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics

2021-02-04 Thread GitBox


rhauch commented on a change in pull request #9780:
URL: https://github.com/apache/kafka/pull/9780#discussion_r570652791



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
##
@@ -457,17 +465,273 @@ public void verifyingGettingTopicCleanupPolicies() {
 }
 }
 
+@Test
+public void 
endOffsetsShouldFailWithNonRetriableWhenAuthorizationFailureOccurs() {
+String topicName = "myTopic";
+TopicPartition tp1 = new TopicPartition(topicName, 0);
+Set tps = Collections.singleton(tp1);
+Long offset = null; // response should use error
+Cluster cluster = createCluster(1, topicName, 1);
+try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new 
MockTime(), cluster)) {
+env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.NONE));
+
env.kafkaClient().prepareResponse(listOffsetsResultWithClusterAuthorizationException(tp1,
 offset));
+TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+ConnectException e = assertThrows(ConnectException.class, () -> {
+admin.endOffsets(tps);
+});
+assertTrue(e.getMessage().contains("Not authorized to get the end 
offsets"));
+}
+}
+
+@Test
+public void 
endOffsetsShouldFailWithNonRetriableWhenVersionUnsupportedErrorOccurs() {
+String topicName = "myTopic";
+TopicPartition tp1 = new TopicPartition(topicName, 0);
+Set tps = Collections.singleton(tp1);
+Long offset = null; // response should use error
+Cluster cluster = createCluster(1, topicName, 1);
+try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new 
MockTime(), cluster)) {
+env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.NONE));
+
env.kafkaClient().prepareResponse(listOffsetsResultWithUnsupportedVersion(tp1, 
offset));
+TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+ConnectException e = assertThrows(ConnectException.class, () -> {
+admin.endOffsets(tps);
+});
+assertTrue(e.getMessage().contains("is unsupported on brokers"));
+}
+}
+
+@Test
+public void endOffsetsShouldFailWithRetriableWhenTimeoutErrorOccurs() {
+String topicName = "myTopic";
+TopicPartition tp1 = new TopicPartition(topicName, 0);
+Set tps = Collections.singleton(tp1);
+Long offset = null; // response should use error
+Cluster cluster = createCluster(1, topicName, 1);
+try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new 
MockTime(), cluster)) {
+env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.NONE));
+
env.kafkaClient().prepareResponse(listOffsetsResultWithTimeout(tp1, offset));
+TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+RetriableException e = assertThrows(RetriableException.class, () 
-> {
+admin.endOffsets(tps);
+});
+assertTrue(e.getMessage().contains("Timed out while waiting"));
+}
+}
+
+@Test
+public void endOffsetsShouldFailWithNonRetriableWhenUnknownErrorOccurs() {
+String topicName = "myTopic";
+TopicPartition tp1 = new TopicPartition(topicName, 0);
+Set tps = Collections.singleton(tp1);
+Long offset = null; // response should use error
+Cluster cluster = createCluster(1, topicName, 1);
+try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new 
MockTime(), cluster)) {
+env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.NONE));
+
env.kafkaClient().prepareResponse(listOffsetsResultWithUnknownError(tp1, 
offset));
+TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+ConnectException e = assertThrows(ConnectException.class, () -> {
+admin.endOffsets(tps);
+});
+assertTrue(e.getMessage().contains("Error while getting end 
offsets for topic"));
+}
+}
+
+@Test
+public void endOffsetsShouldReturnEmptyMapWhenPartitionsSetIsNull() {
+String topicName = "myTopic";
+Cluster cluster = createCluster(1, topicName, 1);
+try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new 
MockTime(), cluster)) {
+TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+Map offsets = 
admin.endOffsets(Collections.emptySet());
+

[GitHub] [kafka] kkonstantine merged pull request #10053: KAFKA-10834: Remove redundant type casts in Connect

2021-02-04 Thread GitBox


kkonstantine merged pull request #10053:
URL: https://github.com/apache/kafka/pull/10053


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine merged pull request #9726: KAFKA-10833: Expose task configurations in Connect REST API (KIP-661)

2021-02-04 Thread GitBox


kkonstantine merged pull request #9726:
URL: https://github.com/apache/kafka/pull/9726


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on pull request #9726: KAFKA-10833: Expose task configurations in Connect REST API

2021-02-04 Thread GitBox


kkonstantine commented on pull request #9726:
URL: https://github.com/apache/kafka/pull/9726#issuecomment-773706451


   One failure on an unrelated flaky test
   Merging to coordinate merge with another PR. Thanks @mimaison !



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on a change in pull request #9780: KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics

2021-02-04 Thread GitBox


rhauch commented on a change in pull request #9780:
URL: https://github.com/apache/kafka/pull/9780#discussion_r570648556



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##
@@ -185,16 +188,33 @@
 
 private final DistributedConfig config;
 
+/**
+ * Create a herder that will form a Connect cluster with other {@link 
DistributedHerder} instances (in this or other JVMs)
+ * that have the same group ID.
+ *
+ * @param config the configuration for the worker; may not be 
null
+ * @param time   the clock to use; may not be null
+ * @param worker the {@link Worker} instance to use; may not 
be null
+ * @param kafkaClusterId the identifier of the Kafka cluster to use 
for internal topics; may not be null
+ * @param statusBackingStore the backing store for statuses; may not be 
null
+ * @param configBackingStore the backing store for connector 
configurations; may not be null
+ * @param restUrlthe URL of this herder's REST API; may not be 
null
+ * @param connectorClientConfigOverridePolicy the policy specifying the 
client configuration properties that may be overridden
+ *in connector configurations; 
may not be null
+ * @param uponShutdown   any {@link AutoCloseable} objects that should 
be closed when this herder is {@link #stop() stopped},
+ *   after all services and resources owned by 
this herder are stopped
+ */
 public DistributedHerder(DistributedConfig config,
  Time time,
  Worker worker,
  String kafkaClusterId,
  StatusBackingStore statusBackingStore,
  ConfigBackingStore configBackingStore,
  String restUrl,
- ConnectorClientConfigOverridePolicy 
connectorClientConfigOverridePolicy) {
+ ConnectorClientConfigOverridePolicy 
connectorClientConfigOverridePolicy,
+ AutoCloseable... uponShutdown) {

Review comment:
   The reason I used a variadic array here was to avoid having to create a 
new connector when no `AutoCloseable` instances are supplied. If we use a List, 
then we can change the usage in Connect runtime and in MirrorMaker 2, but 
anywhere else will break without keeping the old signature. WDYT?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #9107: KAFKA-5488: Add type-safe split() operator

2021-02-04 Thread GitBox


mjsax commented on pull request #9107:
URL: https://github.com/apache/kafka/pull/9107#issuecomment-773692994


   Merged to `trunk`.
   
   Congrats for getting this into the 2.8.0 release @inponomarev -- great work!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax merged pull request #9107: KAFKA-5488: Add type-safe split() operator

2021-02-04 Thread GitBox


mjsax merged pull request #9107:
URL: https://github.com/apache/kafka/pull/9107


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rodesai commented on a change in pull request #10046: MINOR: Extends RocksDB docs

2021-02-04 Thread GitBox


rodesai commented on a change in pull request #10046:
URL: https://github.com/apache/kafka/pull/10046#discussion_r570627385



##
File path: docs/streams/developer-guide/memory-mgmt.html
##
@@ -168,7 +168,15 @@
   RocksDB
Each instance of RocksDB allocates off-heap memory for a block 
cache, index and filter blocks, and memtable (write buffer). Critical configs 
(for RocksDB version 4.1.0) include
 block_cache_size, write_buffer_size and max_write_buffer_number.  These can be specified 
through the
-rocksdb.config.setter configuration.
+rocksdb.config.setter configuration.
+  Also, we recommend changing RocksDB's default memory allocator, 
because the default allocator may lead to increased memory consumption.
+To change the memory allocator to jemalloc, you need to 
set the an environment variable before you start your Kafka Streams 
application:
+  
+# example: install jemalloc (on Debian)
+$ apt install -y libjemalloc-dev
+# set LD_PRELOAD before you start your Kafka Streams application
+$ LD_PRELOAD="/usr/lib/x86_64-linux-gnu/libjemalloc.so”

Review comment:
   just doing `LD_PRELOAD=` will only set the variable for that one 
command, which isn't doing anything. This should either be:
   ```
   $ export LD_PRELOAD="/usr/lib/x86_64-linux-gnu/libjemalloc.so"
   $ 
   ```
   OR
   ```
   $ LD_PRELOAD="/usr/lib/x86_64-linux-gnu/libjemalloc.so" 
   ```
   
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] gardnervickers closed pull request #10058: [2.5] Backport mocked HostResolver from KAFKA-12193 to avoid relying on kafka.apache.org for specific DNS behavior

2021-02-04 Thread GitBox


gardnervickers closed pull request #10058:
URL: https://github.com/apache/kafka/pull/10058


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] gardnervickers closed pull request #10057: [2.4] Backport mocked HostResolver from KAFKA-12193 to avoid relying on kafka.apache.org for specific DNS behavior

2021-02-04 Thread GitBox


gardnervickers closed pull request #10057:
URL: https://github.com/apache/kafka/pull/10057


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] gardnervickers closed pull request #10055: [2.3] Backport mocked HostResolver from KAFKA-12193 to avoid relying on kafka.apache.org for specific DNS behavior

2021-02-04 Thread GitBox


gardnervickers closed pull request #10055:
URL: https://github.com/apache/kafka/pull/10055


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bob-barrett commented on pull request #10055: [2.3] Backport mocked HostResolver from KAFKA-12193 to avoid relying on kafka.apache.org for specific DNS behavior

2021-02-04 Thread GitBox


bob-barrett commented on pull request #10055:
URL: https://github.com/apache/kafka/pull/10055#issuecomment-773678966


   @ableegoldman the 2.6 backport is 
https://github.com/apache/kafka/pull/10061. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] szpak opened a new pull request #10062: MINOR: Add performAndClose default method in KeyValueIterator

2021-02-04 Thread GitBox


szpak opened a new pull request #10062:
URL: https://github.com/apache/kafka/pull/10062


   That method intends to increase a chance to have KeyValueIterator closed
   after usage, by providing a convenient performAndClose() default method which
   executes a given operation and guarantee to automatically close the iterator
   right after.
   
   ### Rationality
   
   I decided to create that PR observing in different projects how often an 
iterator is left open after performing operations such as 
   `.forEachRemaining(...)` (who reads JavaDoc after all? ;-) ). For people 
aware of the problem, instead of verbose try-with-resources constructions 
repeated in every place in code:
   
   ```
   try (KeyValueIterator ordersKeyValueIterator =
streamsBuilderFactoryBean
.getKafkaStreams()
.store(StoreQueryParameters.fromNameAndType(ORDERS_STORE,
QueryableStoreTypes.keyValueStore()))
.all()) {
   ordersKeyValueIterator
   .forEachRemaining(kv -> {
   ...
   });
   }
   ```
   (or hidden in in-house built utility classes)
   
   a developer using Kafka Streams has a built-in clear way to perform 
operations on elements contained in an iterator (and have that stream closed 
after that automatically):
   ```
   streamsBuilderFactoryBean
   .getKafkaStreams()
   .store(StoreQueryParameters.fromNameAndType(ORDERS_STORE,
   QueryableStoreTypes.keyValueStore()))
   .all()
   .performAndClose(iterator -> iterator
   .forEachRemaining(kv -> {
   ...
   })
   );
   ```
   
   I believe it can increase a rate of closed iterator in user projects.
   
   ### Testing approach
   
   Being a default method in KeyValueIterator, I decided to test it using 
KeyValueIteratorFacade which already provides a nice unit testing 
infrastructure with mocked iterator. I don't know the implementation details, 
but I expect to have it behaved similarly in other implementations.
   I might miss some extra cases, so feel free to point them and I will happily 
cover them with tests.
   
   ### Rejected extensions
   
   For some time past, I was thinking also about covering operations that 
returns some value. However, at least in projects I've been observing the usage 
of Kafka Streams, performing side effects is more popular and implementation 
for that use cases might be more tricky and I decided to start with pure 
Consumer.
   
   ### Other information
   
   Naming is not my area of expertise. Feel free to propose any better name or 
any other possible enhancements to my MR :-).
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bob-barrett commented on pull request #10061: KAFKA-12193: Re-resolve IPs after a client disconnects

2021-02-04 Thread GitBox


bob-barrett commented on pull request #10061:
URL: https://github.com/apache/kafka/pull/10061#issuecomment-773678032


   @dajac This is the backport of 
https://github.com/apache/kafka/commit/131d4753cfed65ed6dee0a8c754765c97c3d513f.
 The conflicts were because the connection timeout settings added by 
[KIP-601](https://cwiki.apache.org/confluence/display/KAFKA/KIP-601%3A+Configurable+socket+connection+timeout+in+NetworkClient)
 aren't present in 2.6. This didn't change the behavior of this patch, but it 
did require a slight change in 
`NetworkClientTest#testFailedConnectionToFirstAddress` to simulate a failed 
connection, since sleeping for the connection timeout wasn't an option.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bob-barrett opened a new pull request #10061: KAFKA-12193: Re-resolve IPs after a client disconnects (#9902)

2021-02-04 Thread GitBox


bob-barrett opened a new pull request #10061:
URL: https://github.com/apache/kafka/pull/10061


   This patch changes the NetworkClient behavior to resolve the target node's 
hostname after disconnecting from an established connection, rather than 
waiting until the previously-resolved addresses are exhausted. This is to 
handle the scenario when the node's IP addresses have changed during the 
lifetime of the connection, and means that the client does not have to try to 
connect to invalid IP addresses until it has tried each address.
   
   Reviewers: Mickael Maison , Satish Duggana 
, David Jacot 
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] gardnervickers commented on pull request #10055: [2.3] Backport mocked HostResolver from KAFKA-12193 to avoid relying on kafka.apache.org for specific DNS behavior

2021-02-04 Thread GitBox


gardnervickers commented on pull request #10055:
URL: https://github.com/apache/kafka/pull/10055#issuecomment-773675865


   @ableegoldman I believe @bob-barrett was working on a PR against 2.6. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10055: [2.3] Backport mocked HostResolver from KAFKA-12193 to avoid relying on kafka.apache.org for specific DNS behavior

2021-02-04 Thread GitBox


ableegoldman commented on pull request #10055:
URL: https://github.com/apache/kafka/pull/10055#issuecomment-773675180


   Hey @gardnervickers , do you have plans to open a PR against 2.6 as well? 
Just checking in since it seems this fix has been ported to everything except 
2.6 so far and I want to keep track for the 2.6.2 release



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman opened a new pull request #10060: KAFKA-10716: persist UUID in state directory for stable processId across restarts - 2.7

2021-02-04 Thread GitBox


ableegoldman opened a new pull request #10060:
URL: https://github.com/apache/kafka/pull/10060


   Port of https://github.com/apache/kafka/pull/9978 to the 2.7 branch



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dpoldrugo commented on pull request #10059: KAFKA-8562: SaslChannelBuilder - avoid (reverse) DNS lookup while building underlying SslTransportLayer

2021-02-04 Thread GitBox


dpoldrugo commented on pull request #10059:
URL: https://github.com/apache/kafka/pull/10059#issuecomment-773663807


   @ijuma, @omkreddy, @rajinisivaram could you take a look?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rohitrmd commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app

2021-02-04 Thread GitBox


rohitrmd commented on a change in pull request #9744:
URL: https://github.com/apache/kafka/pull/9744#discussion_r570605728



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
##
@@ -45,7 +45,6 @@
 private boolean initialized;
 protected ProcessorRecordContext recordContext;
 protected ProcessorNode currentNode;
-private long currentSystemTimeMs;

Review comment:
   @mjsax can you please explain again what is expected now as for me this 
is contrary to initial KIP. What I have understood from KIP, we wanted to 
return system time from Stream Task. Considering changes in this pr, 
   
   1. We added one time field in InternalMockProcessorContext which we return 
when currentSystemTime() is called. 
   2. When ProcessorContextImpl's currentSystemTime() method is called, we 
return time from streamTask's time field. 
   3. We also added time field in GlobalProcessorContextImpl which we return 
from currentSystemTime().
   4. If we add new cachedSystemTimeMs field in AbstractProcessorContext, when 
do you want to return this field? Are earlier changes not valid to return time 
from StreamTask?






This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on a change in pull request #9780: KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics

2021-02-04 Thread GitBox


kkonstantine commented on a change in pull request #9780:
URL: https://github.com/apache/kafka/pull/9780#discussion_r570584036



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##
@@ -209,7 +229,8 @@ public DistributedHerder(DistributedConfig config,
   String restUrl,
   ConnectMetrics metrics,
   Time time,
-  ConnectorClientConfigOverridePolicy 
connectorClientConfigOverridePolicy) {
+  ConnectorClientConfigOverridePolicy 
connectorClientConfigOverridePolicy,
+  AutoCloseable... uponShutdown) {

Review comment:
   see comment above

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##
@@ -185,16 +188,33 @@
 
 private final DistributedConfig config;
 
+/**
+ * Create a herder that will form a Connect cluster with other {@link 
DistributedHerder} instances (in this or other JVMs)
+ * that have the same group ID.
+ *
+ * @param config the configuration for the worker; may not be 
null
+ * @param time   the clock to use; may not be null
+ * @param worker the {@link Worker} instance to use; may not 
be null
+ * @param kafkaClusterId the identifier of the Kafka cluster to use 
for internal topics; may not be null
+ * @param statusBackingStore the backing store for statuses; may not be 
null
+ * @param configBackingStore the backing store for connector 
configurations; may not be null
+ * @param restUrlthe URL of this herder's REST API; may not be 
null
+ * @param connectorClientConfigOverridePolicy the policy specifying the 
client configuration properties that may be overridden
+ *in connector configurations; 
may not be null
+ * @param uponShutdown   any {@link AutoCloseable} objects that should 
be closed when this herder is {@link #stop() stopped},
+ *   after all services and resources owned by 
this herder are stopped
+ */
 public DistributedHerder(DistributedConfig config,
  Time time,
  Worker worker,
  String kafkaClusterId,
  StatusBackingStore statusBackingStore,
  ConfigBackingStore configBackingStore,
  String restUrl,
- ConnectorClientConfigOverridePolicy 
connectorClientConfigOverridePolicy) {
+ ConnectorClientConfigOverridePolicy 
connectorClientConfigOverridePolicy,
+ AutoCloseable... uponShutdown) {

Review comment:
   I think it's better to avoid a variadic argument here. 
   Parameters tend to get added with new features in such constructors. And if 
a new parameter is required that is also a list, then we'll have a mix of list 
args with a variadic in the end. 
   
   Since we transform to list I'd suggest using this type here and pass the 
single argument with `Collections.singletonList` in the caller. 

##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java
##
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.easymock.EasyMock;
+import org.easymock.Mock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
+
+@RunWith(PowerMockRunner.class)

[GitHub] [kafka] cmccabe merged pull request #10030: MINOR: Add KafkaEventQueue

2021-02-04 Thread GitBox


cmccabe merged pull request #10030:
URL: https://github.com/apache/kafka/pull/10030


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #10030: MINOR: Add KafkaEventQueue

2021-02-04 Thread GitBox


cmccabe commented on pull request #10030:
URL: https://github.com/apache/kafka/pull/10030#issuecomment-773648975


   Test failure is 
`org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest`
 which is not related.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on pull request #9726: KAFKA-10833: Expose task configurations in Connect REST API

2021-02-04 Thread GitBox


C0urante commented on pull request #9726:
URL: https://github.com/apache/kafka/pull/9726#issuecomment-773648389


   Just ran into a situation last night where this would have been super 
helpful. Thanks for adding this @mimaison!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on a change in pull request #9726: KAFKA-10833: Expose task configurations in Connect REST API

2021-02-04 Thread GitBox


kkonstantine commented on a change in pull request #9726:
URL: https://github.com/apache/kafka/pull/9726#discussion_r570577845



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
##
@@ -188,6 +188,16 @@ public ConnectorInfo getConnector(final 
@PathParam("connector") String connector
 return completeOrForwardRequest(cb, "/connectors/" + connector + 
"/config", "GET", headers, null, forward);
 }
 
+@GET
+@Path("/{connector}/tasks-config")
+public Map> getTasksConfig(final 
@PathParam("connector") String connector,
+  final @Context HttpHeaders 
headers,

Review comment:
   That's right  





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dpoldrugo opened a new pull request #10059: KAFKA-8562: SaslChannelBuilder - avoid (reverse) DNS lookup while building underlying SslTransportLayer

2021-02-04 Thread GitBox


dpoldrugo opened a new pull request #10059:
URL: https://github.com/apache/kafka/pull/10059


   Description:
   As suggested by @omkreddy in this 
[comment](https://issues.apache.org/jira/browse/KAFKA-8562?focusedCommentId=16912437=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16912437),
 implemented avoiding (reverse) DNS lookup while building underlying 
SslTransportLayer.
   
   How the problem manifested:
   When clients or other brokers are connecting to a broker using SASL_SSL, a 
broker was doing (reverse) DNS lookup and if there is no PTR Record, the lookup 
could last several seconds, which in the end caused big latencies on several 
parts of the system... replication, consume requests and produce requests.
   Here you can see a recorded sample: 
   https://user-images.githubusercontent.com/1514332/106959147-9033a580-673a-11eb-9575-4b9fe986cb30.png;>
   Also, here is a Wireshark packet capture for DNS requests, and in this case 
you can see that it lasted more then 11 seconds:
   ![KAFKA-8562 wireshark dns packet 
capture](https://user-images.githubusercontent.com/1514332/106960332-37650c80-673c-11eb-91ab-9cab8dd4873d.png)
   When using PLAINTEXT or SSL, this problem doesn't manifest.
   
   Solution:
   In #2835 , @rajinisivaram already added a helper method 
`SslChannelBuilder.peerHost`, so I just moved it to a new class called 
`ChannelBuilderUtils` and used it in `SaslChannelBuilder.buildTransportLayer` 
method.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

2021-02-04 Thread GitBox


abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r570568151



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1084,24 +1087,9 @@ class KafkaApis(val requestChannel: RequestChannel,
 (responseTopics ++ unauthorizedResponseStatus).toList
   }
 
-  private def createTopic(topic: String,
-  numPartitions: Int,
-  replicationFactor: Int,
-  properties: util.Properties = new 
util.Properties()): MetadataResponseTopic = {
-try {
-  adminZkClient.createTopic(topic, numPartitions, replicationFactor, 
properties, RackAwareMode.Safe)
-  info("Auto creation of topic %s with %d partitions and replication 
factor %d is successful"
-.format(topic, numPartitions, replicationFactor))
-  metadataResponseTopic(Errors.LEADER_NOT_AVAILABLE, topic, 
isInternal(topic), util.Collections.emptyList())
-} catch {
-  case _: TopicExistsException => // let it go, possibly another broker 
created this topic

Review comment:
   The problem we have is that `ZkAdminManager.createTopics` only takes a 
callback instead of responding to you in realtime whether we hit TopicExists. 
Right now we are doing the topic creation async, so unless this is necessary to 
be fixed (which today we would just return UNKNOWN_PARTITION which seems to be 
semantically similar to LEADER_NOT_AVAILABLE), I think we could just returning 
unknown partition immediately without waiting for the async creation?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12294) Consider using the forwarding mechanism for metadata auto topic creation

2021-02-04 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-12294:

Component/s: core

> Consider using the forwarding mechanism for metadata auto topic creation
> 
>
> Key: KAFKA-12294
> URL: https://issues.apache.org/jira/browse/KAFKA-12294
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Boyang Chen
>Priority: Major
>
> Once [https://github.com/apache/kafka/pull/9579] is merged, there is a way to 
> improve the topic creation auditing by forwarding the CreateTopicsRequest 
> inside Envelope for the given client. Details in 
> [here|https://github.com/apache/kafka/pull/9579#issuecomment-772283780]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12294) Consider using the forwarding mechanism for metadata auto topic creation

2021-02-04 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-12294:

Parent: (was: KAFKA-9705)
Issue Type: Improvement  (was: Sub-task)

> Consider using the forwarding mechanism for metadata auto topic creation
> 
>
> Key: KAFKA-12294
> URL: https://issues.apache.org/jira/browse/KAFKA-12294
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Boyang Chen
>Priority: Major
>
> Once [https://github.com/apache/kafka/pull/9579] is merged, there is a way to 
> improve the topic creation auditing by forwarding the CreateTopicsRequest 
> inside Envelope for the given client. Details in 
> [here|https://github.com/apache/kafka/pull/9579#issuecomment-772283780]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12294) Consider using the forwarding mechanism for metadata auto topic creation

2021-02-04 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-12294:
---

 Summary: Consider using the forwarding mechanism for metadata auto 
topic creation
 Key: KAFKA-12294
 URL: https://issues.apache.org/jira/browse/KAFKA-12294
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen


Once [https://github.com/apache/kafka/pull/9579] is merged, there is a way to 
improve the topic creation auditing by forwarding the CreateTopicsRequest 
inside Envelope for the given client. Details in 
[here|https://github.com/apache/kafka/pull/9579#issuecomment-772283780]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] gardnervickers opened a new pull request #10058: [2.5] Backport mocked HostResolver from KAFKA-12193 to avoid relying on kafka.apache.org for specific DNS behavior

2021-02-04 Thread GitBox


gardnervickers opened a new pull request #10058:
URL: https://github.com/apache/kafka/pull/10058


   The original PR was #9902
   
   The reason for back-porting this improved test infrastructure is to address 
some flakes which we have seen when kafka.apache.org DNS changes, specifically 
in the ClusterConnectionStatesTest. This change uses a mocked resolver for the 
multi-ip tests.
   
   Related PR: #10055 #10057



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on a change in pull request #9726: KAFKA-10833: Expose task configurations in Connect REST API

2021-02-04 Thread GitBox


mimaison commented on a change in pull request #9726:
URL: https://github.com/apache/kafka/pull/9726#discussion_r570545828



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
##
@@ -188,6 +188,16 @@ public ConnectorInfo getConnector(final 
@PathParam("connector") String connector
 return completeOrForwardRequest(cb, "/connectors/" + connector + 
"/config", "GET", headers, null, forward);
 }
 
+@GET
+@Path("/{connector}/tasks-config")
+public Map> getTasksConfig(final 
@PathParam("connector") String connector,
+  final @Context HttpHeaders 
headers,

Review comment:
   You mean something like this?
   ```java
@GET
   @Path("/{connector}/tasks-config")
   public Map> getTasksConfig(
   final @PathParam("connector") String connector,
   final @Context HttpHeaders headers,
   final @QueryParam("forward") Boolean forward) throws Throwable {
   FutureCallback>> cb = new 
FutureCallback<>();
   ...
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] gardnervickers opened a new pull request #10057: Backport mocked HostResolver from KAFKA-12193 to avoid relying on kafka.apache.org for specific DNS behavior

2021-02-04 Thread GitBox


gardnervickers opened a new pull request #10057:
URL: https://github.com/apache/kafka/pull/10057


   The original PR was #9902
   
   The reason for back-porting this improved test infrastructure is to address 
some flakes which we have seen when kafka.apache.org DNS changes, specifically 
in the ClusterConnectionStatesTest. This change uses a mocked resolver for the 
multi-ip tests.
   
   Related PR: #10055 
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12293) Remove JCenter and Bintray repositories mentions out of Gradle build (sunset is announced for those repositories)

2021-02-04 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-12293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17279162#comment-17279162
 ] 

Dejan Stojadinović commented on KAFKA-12293:


Note: at the moment I am not sure about Kafka releasing process (i.e. 
publishing to a global public repositories). 
If Kafka exclusively publishes to JCenter then this solution should be expanded.

> Remove JCenter and Bintray repositories mentions out of Gradle build (sunset 
> is announced for those repositories)
> -
>
> Key: KAFKA-12293
> URL: https://issues.apache.org/jira/browse/KAFKA-12293
> Project: Kafka
>  Issue Type: Task
>  Components: build
>Reporter: Dejan Stojadinović
>Assignee: Dejan Stojadinović
>Priority: Major
>
> *Intro:* 
> [https://jfrog.com/blog/into-the-sunset-bintray-jcenter-gocenter-and-chartcenter]
> *Quote (from a link above):*
> {quote}
> May 1st: Bintray, JCenter, GoCenter, and ChartCenter services will no longer 
> be available
> {quote}
> *Note:* it seems that Gradle will make some changes in order to resolve 
> _*jcenter()*_ to _*mavenCentral()*_ by a default: 
> [https://github.com/gradle/gradle/issues/16018] 
> I took the liberty to assign this to myself (I already have a few gradle 
> related commits in Kafka repository; also I have some work-in-progress PR for 
> this issue that will be pushed asap).
> FYI [~ijuma] 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji merged pull request #10005: MINOR: Add ConfigRepository, use in Partition and KafkaApis

2021-02-04 Thread GitBox


hachikuji merged pull request #10005:
URL: https://github.com/apache/kafka/pull/10005


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dejan2609 opened a new pull request #10056: KAFKA-12293: remove JCenter and Bintray repositories mentions out of build gradle scripts (sunset is announced for those repositories)

2021-02-04 Thread GitBox


dejan2609 opened a new pull request #10056:
URL: https://github.com/apache/kafka/pull/10056


   Related jira ticket: 
   https://issues.apache.org/jira/browse/KAFKA-12293 **_Remove JCenter and 
Bintray repositories mentions out of Gradle build (sunset is announced for 
those repositories)_**
   
   Note: at the moment I am not sure about Kafka releasing process (i.e. 
publishing to global public repositories). If Kafka exclusively  publishes to 
JCenter then this solution should be expanded.
   
   FYI @ijuma 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] gardnervickers opened a new pull request #10055: Backport mocked HostResolver from KAFKA-12193 to avoid relying on kafka.apache.org for specific DNS behavior

2021-02-04 Thread GitBox


gardnervickers opened a new pull request #10055:
URL: https://github.com/apache/kafka/pull/10055


   The original PR was #9902 
   
   The reason for back-porting this improved test infrastructure is to address 
some flakes which we have seen when kafka.apache.org DNS changes, specifically 
in the `ClusterConnectionStatesTest`. This change uses a mocked resolver for 
the multi-ip tests.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10000: KAFKA-9274: handle TimeoutException on task reset

2021-02-04 Thread GitBox


ableegoldman commented on a change in pull request #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r570535029



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -227,6 +230,27 @@ public void initializeIfNeeded() {
 }
 }
 
+private void initOffsetsIfNeeded(final 
java.util.function.Consumer> offsetResetter) {
+final Map committed = 
mainConsumer.committed(resetOffsetsForPartitions);
+for (final Map.Entry committedEntry 
: committed.entrySet()) {
+final OffsetAndMetadata offsetAndMetadata = 
committedEntry.getValue();
+if (offsetAndMetadata != null) {
+mainConsumer.seek(committedEntry.getKey(), offsetAndMetadata);
+resetOffsetsForPartitions.remove(committedEntry.getKey());
+}
+}
+
+if (!resetOffsetsForPartitions.isEmpty()) {

Review comment:
   Can we just pass in a no-op lambda instead? I'd rather avoid special 
handling for null input that isn't supposed to be null, just so we can use null 
in the tests (which are therefore not realistic tests since it should never be 
null, no?)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12293) Remove JCenter and Bintray repositories mentions out of Gradle build (sunset is announced for those repositories)

2021-02-04 Thread Jira
Dejan Stojadinović created KAFKA-12293:
--

 Summary: Remove JCenter and Bintray repositories mentions out of 
Gradle build (sunset is announced for those repositories)
 Key: KAFKA-12293
 URL: https://issues.apache.org/jira/browse/KAFKA-12293
 Project: Kafka
  Issue Type: Task
  Components: build
Reporter: Dejan Stojadinović
Assignee: Dejan Stojadinović


*Intro:* 
[https://jfrog.com/blog/into-the-sunset-bintray-jcenter-gocenter-and-chartcenter]

*Quote (from a link above):*
{quote}
May 1st: Bintray, JCenter, GoCenter, and ChartCenter services will no longer be 
available
{quote}

*Note:* it seems that Gradle will make some changes in order to resolve 
_*jcenter()*_ to _*mavenCentral()*_ by a default: 
[https://github.com/gradle/gradle/issues/16018] 

I took the liberty to assign this to myself (I already have a few gradle 
related commits in Kafka repository; also I have some work-in-progress PR for 
this issue that will be pushed asap).

FYI [~ijuma] 





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on pull request #10054: KAFKA-12283 Flaky Test RebalanceSourceConnectorsIntegrationTest#testM…

2021-02-04 Thread GitBox


chia7712 commented on pull request #10054:
URL: https://github.com/apache/kafka/pull/10054#issuecomment-773582456


   @ramesh-muthusamy Could you take a look? thanks!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 opened a new pull request #10054: KAFKA-12283 Flaky Test RebalanceSourceConnectorsIntegrationTest#testM…

2021-02-04 Thread GitBox


chia7712 opened a new pull request #10054:
URL: https://github.com/apache/kafka/pull/10054


   issue: https://issues.apache.org/jira/browse/KAFKA-12283
   
   It seems to me the new tasks distribution (8, 4, 4) is valid if following 
conditions are true.
   1. all tasks are assigned to (single) alive worker before we re-add workers
   2. rebalance is triggered by first re-added worker only (rebalance revokes 8 
tasks from alive worker).
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-12284) Flaky Test MirrorConnectorsIntegrationSSLTest#testOneWayReplicationWithAutoOffsetSync

2021-02-04 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-12284:
--

Assignee: (was: Chia-Ping Tsai)

> Flaky Test 
> MirrorConnectorsIntegrationSSLTest#testOneWayReplicationWithAutoOffsetSync
> -
>
> Key: KAFKA-12284
> URL: https://issues.apache.org/jira/browse/KAFKA-12284
> Project: Kafka
>  Issue Type: Test
>  Components: mirrormaker, unit tests
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
>
> [https://github.com/apache/kafka/pull/9997/checks?check_run_id=1820178470]
> {quote} {{java.lang.RuntimeException: 
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TopicExistsException: Topic 
> 'primary.test-topic-2' already exists.
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:366)
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:341)
>   at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testOneWayReplicationWithAutoOffsetSync(MirrorConnectorsIntegrationBaseTest.java:419)}}
> [...]
>  
> {{Caused by: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TopicExistsException: Topic 
> 'primary.test-topic-2' already exists.
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:364)
>   ... 92 more
> Caused by: org.apache.kafka.common.errors.TopicExistsException: Topic 
> 'primary.test-topic-2' already exists.}}
> {quote}
> STDOUT
> {quote} {{2021-02-03 04ː19ː15,975] ERROR [MirrorHeartbeatConnector|task-0] 
> WorkerSourceTask\{id=MirrorHeartbeatConnector-0} failed to send record to 
> heartbeats:  (org.apache.kafka.connect.runtime.WorkerSourceTask:354)
> org.apache.kafka.common.KafkaException: Producer is closed forcefully.
>   at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:750)
>   at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:737)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:282)
>   at java.lang.Thread.run(Thread.java:748)}}{quote}
> {quote} {{[2021-02-03 04ː19ː36,767] ERROR Could not check connector state 
> info. 
> (org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions:420)
> org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not 
> read connector state. Error response: \{"error_code":404,"message":"No status 
> found for connector MirrorSourceConnector"}
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.connectorStatus(EmbeddedConnectCluster.java:466)
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.checkConnectorState(EmbeddedConnectClusterAssertions.java:413)
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.lambda$assertConnectorAndAtLeastNumTasksAreRunning$16(EmbeddedConnectClusterAssertions.java:286)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290)
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.assertConnectorAndAtLeastNumTasksAreRunning(EmbeddedConnectClusterAssertions.java:285)
>   at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning(MirrorConnectorsIntegrationBaseTest.java:458)
>   at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplication(MirrorConnectorsIntegrationBaseTest.java:225)}}
> {{}}
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-12283) Flaky Test RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining

2021-02-04 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-12283:
--

Assignee: Chia-Ping Tsai

> Flaky Test 
> RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining
> 
>
> Key: KAFKA-12283
> URL: https://issues.apache.org/jira/browse/KAFKA-12283
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect, unit tests
>Reporter: Matthias J. Sax
>Assignee: Chia-Ping Tsai
>Priority: Critical
>  Labels: flaky-test
>
> https://github.com/apache/kafka/pull/1/checks?check_run_id=1820092809
> {quote} {{java.lang.AssertionError: Tasks are imbalanced: 
> localhost:36037=[seq-source13-0, seq-source13-1, seq-source13-2, 
> seq-source13-3, seq-source12-0, seq-source12-1, seq-source12-2, 
> seq-source12-3]
> localhost:43563=[seq-source11-0, seq-source11-2, seq-source10-0, 
> seq-source10-2]
> localhost:46539=[seq-source11-1, seq-source11-3, seq-source10-1, 
> seq-source10-3]
>   at org.junit.Assert.fail(Assert.java:89)
>   at org.junit.Assert.assertTrue(Assert.java:42)
>   at 
> org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.assertConnectorAndTasksAreUniqueAndBalanced(RebalanceSourceConnectorsIntegrationTest.java:362)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290)
>   at 
> org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testMultipleWorkersRejoining(RebalanceSourceConnectorsIntegrationTest.java:313)}}
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-12284) Flaky Test MirrorConnectorsIntegrationSSLTest#testOneWayReplicationWithAutoOffsetSync

2021-02-04 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-12284:
--

Assignee: Chia-Ping Tsai

> Flaky Test 
> MirrorConnectorsIntegrationSSLTest#testOneWayReplicationWithAutoOffsetSync
> -
>
> Key: KAFKA-12284
> URL: https://issues.apache.org/jira/browse/KAFKA-12284
> Project: Kafka
>  Issue Type: Test
>  Components: mirrormaker, unit tests
>Reporter: Matthias J. Sax
>Assignee: Chia-Ping Tsai
>Priority: Critical
>  Labels: flaky-test
>
> [https://github.com/apache/kafka/pull/9997/checks?check_run_id=1820178470]
> {quote} {{java.lang.RuntimeException: 
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TopicExistsException: Topic 
> 'primary.test-topic-2' already exists.
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:366)
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:341)
>   at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testOneWayReplicationWithAutoOffsetSync(MirrorConnectorsIntegrationBaseTest.java:419)}}
> [...]
>  
> {{Caused by: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TopicExistsException: Topic 
> 'primary.test-topic-2' already exists.
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:364)
>   ... 92 more
> Caused by: org.apache.kafka.common.errors.TopicExistsException: Topic 
> 'primary.test-topic-2' already exists.}}
> {quote}
> STDOUT
> {quote} {{2021-02-03 04ː19ː15,975] ERROR [MirrorHeartbeatConnector|task-0] 
> WorkerSourceTask\{id=MirrorHeartbeatConnector-0} failed to send record to 
> heartbeats:  (org.apache.kafka.connect.runtime.WorkerSourceTask:354)
> org.apache.kafka.common.KafkaException: Producer is closed forcefully.
>   at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:750)
>   at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:737)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:282)
>   at java.lang.Thread.run(Thread.java:748)}}{quote}
> {quote} {{[2021-02-03 04ː19ː36,767] ERROR Could not check connector state 
> info. 
> (org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions:420)
> org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not 
> read connector state. Error response: \{"error_code":404,"message":"No status 
> found for connector MirrorSourceConnector"}
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.connectorStatus(EmbeddedConnectCluster.java:466)
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.checkConnectorState(EmbeddedConnectClusterAssertions.java:413)
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.lambda$assertConnectorAndAtLeastNumTasksAreRunning$16(EmbeddedConnectClusterAssertions.java:286)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290)
>   at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.assertConnectorAndAtLeastNumTasksAreRunning(EmbeddedConnectClusterAssertions.java:285)
>   at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning(MirrorConnectorsIntegrationBaseTest.java:458)
>   at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplication(MirrorConnectorsIntegrationBaseTest.java:225)}}
> {{}}
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kkonstantine commented on a change in pull request #9726: KAFKA-10833: Expose task configurations in Connect REST API

2021-02-04 Thread GitBox


kkonstantine commented on a change in pull request #9726:
URL: https://github.com/apache/kafka/pull/9726#discussion_r570505650



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
##
@@ -188,6 +188,16 @@ public ConnectorInfo getConnector(final 
@PathParam("connector") String connector
 return completeOrForwardRequest(cb, "/connectors/" + connector + 
"/config", "GET", headers, null, forward);
 }
 
+@GET
+@Path("/{connector}/tasks-config")
+public Map> getTasksConfig(final 
@PathParam("connector") String connector,
+  final @Context HttpHeaders 
headers,

Review comment:
   nit: this alignment might be a bit off. We can follow the pattern of 2 
tabs with first and last line on their own if that looks better I guess. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10045: MINOR: Allow KafkaApis to be configured for Raft controller quorums

2021-02-04 Thread GitBox


jsancio commented on a change in pull request #10045:
URL: https://github.com/apache/kafka/pull/10045#discussion_r570507065



##
File path: core/src/main/scala/kafka/server/MetadataSupport.scala
##
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.controller.KafkaController
+import kafka.network.RequestChannel
+import kafka.zk.{AdminZkClient, KafkaZkClient}
+import org.apache.kafka.common.requests.AbstractResponse
+
+sealed trait MetadataSupport {
+  /**
+   * Provide a uniform way of getting to the ForwardingManager, which is a 
shared concept
+   * despite being optional when using ZooKeeper and required when using Raft
+   */
+  val forwardingManager: Option[ForwardingManager]
+
+  /**
+   * Return this instance downcast for use with ZooKeeper
+   *
+   * @param createException function to create an exception to throw
+   * @return this instance downcast for use with ZooKeeper
+   * @throws Exception if this instance is not for ZooKeeper
+   */
+  def requireZk(createException: => Exception): ZkSupport

Review comment:
   This comment also applies to `requireRaft`.
   
   How about `requireZkOrThrow`? There is already precedence of this naming 
convention in Kafka and Java. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on pull request #9726: KAFKA-10833: Expose task configurations in Connect REST API

2021-02-04 Thread GitBox


mimaison commented on pull request #9726:
URL: https://github.com/apache/kafka/pull/9726#issuecomment-773567218


   Thanks @kkonstantine for the feedback. I've pushed an update



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10000: KAFKA-9274: handle TimeoutException on task reset

2021-02-04 Thread GitBox


ableegoldman commented on a change in pull request #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r570495244



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -227,6 +230,27 @@ public void initializeIfNeeded() {
 }
 }
 
+private void initOffsetsIfNeeded(final 
java.util.function.Consumer> offsetResetter) {

Review comment:
   Hm...I'm not necessarily that concerned about calling 
`mainConsumer.committed` twice in rare cases (although maybe that would not be 
so good, since those rare cases happen to be those in which this is probably 
more likely to time out, right?)
   But personally, just coming into this code from the outside, it's super 
confusing to have two different methods for initializing the offsets. It seems 
more convoluted that way, to me. Also maybe I am missing some context here but 
why do we call `initOffsetsIfNeeded`  from `initializeIfNeeded` rather than 
from `completeRestoration` in the first place? We don't need to initialize main 
consumer offsets until it transitions to running





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

2021-02-04 Thread GitBox


abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r570492431



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1234,19 +1171,28 @@ class KafkaApis(val requestChannel: RequestChannel,
 // In versions 5 and below, we returned LEADER_NOT_AVAILABLE if a matching 
listener was not found on the leader.
 // From version 6 onwards, we return LISTENER_NOT_FOUND to enable 
diagnosis of configuration errors.
 val errorUnavailableListeners = requestVersion >= 6
-val topicMetadata =
+val (topicMetadata, nonExistTopicMetadata) =
   if (authorizedTopics.isEmpty)
-Seq.empty[MetadataResponseTopic]
-  else {
-getTopicMetadata(
-  metadataRequest.allowAutoTopicCreation,
-  metadataRequest.isAllTopics,
-  authorizedTopics,
-  request.context.listenerName,
-  errorUnavailableEndpoints,
-  errorUnavailableListeners
-)
+(Seq.empty[MetadataResponseTopic], Seq.empty[MetadataResponseTopic])
+  else
+getTopicMetadata(metadataRequest.isAllTopics, authorizedTopics,
+  request.context.listenerName, errorUnavailableEndpoints, 
errorUnavailableListeners)
+
+nonExistTopicMetadata.foreach(metadata =>
+  try {
+// Validate topic name and propagate error if failed
+Topic.validate(metadata.name())
+  } catch {
+case e: Exception =>
+  metadata.setErrorCode(Errors.forException(e).code)
   }
+)
+
+if (nonExistTopicMetadata.nonEmpty && 
metadataRequest.allowAutoTopicCreation && config.autoCreateTopicsEnable) {
+  val controllerMutationQuota = 
quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 6)
+  autoTopicCreationManager.createTopics(
+nonExistTopicMetadata.map(metadata => 
getTopicConfigs(metadata.name())).toSet, controllerMutationQuota)

Review comment:
   I guess we could rely on admin manager to do the validation for us.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

2021-02-04 Thread GitBox


abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r570492221



##
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.ConcurrentHashMap
+
+import kafka.controller.KafkaController
+import kafka.utils.Logging
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.message.CreateTopicsRequestData
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.requests.CreateTopicsRequest
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.Map
+
+trait AutoTopicCreationManager {
+
+  def createTopics(
+topicNames: Set[CreatableTopic],
+controllerMutationQuota: ControllerMutationQuota
+  ): Unit
+
+  def start(): Unit = {}
+
+  def shutdown(): Unit = {}
+}
+
+object AutoTopicCreationManager {
+
+  def apply(
+config: KafkaConfig,
+metadataCache: MetadataCache,
+time: Time,
+metrics: Metrics,
+threadNamePrefix: Option[String],
+adminManager: ZkAdminManager,
+controller: KafkaController,
+enableForwarding: Boolean
+  ): AutoTopicCreationManager = {
+
+val channelManager =
+  if (enableForwarding)
+Some(new BrokerToControllerChannelManager(
+  controllerNodeProvider = MetadataCacheControllerNodeProvider(
+config, metadataCache),
+  time = time,
+  metrics = metrics,
+  config = config,
+  channelName = "autoTopicCreationChannel",
+  threadNamePrefix = threadNamePrefix,
+  retryTimeoutMs = config.requestTimeoutMs.longValue
+))
+  else
+None
+new AutoTopicCreationManagerImpl(channelManager, adminManager, controller, 
config.requestTimeoutMs)
+  }
+}
+
+class AutoTopicCreationManagerImpl(
+  channelManager: Option[BrokerToControllerChannelManager],
+  adminManager: ZkAdminManager,
+  controller: KafkaController,
+  requestTimeout: Int
+) extends AutoTopicCreationManager with Logging {
+
+  private val inflightTopics = new ConcurrentHashMap[String, CreatableTopic]
+
+  override def start(): Unit = {
+channelManager.foreach(_.start())
+  }
+
+  override def shutdown(): Unit = {
+channelManager.foreach(_.shutdown())
+  }
+
+  override def createTopics(topics: Set[CreatableTopic],
+controllerMutationQuota: ControllerMutationQuota): 
Unit = {
+val topicConfigs = topics
+  .filter(topic => !inflightTopics.contains(topic.name()))

Review comment:
   You mean omit () for `topic.name()`?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12279) Kafka 2.7 stream app issue

2021-02-04 Thread prabhu biradar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17279097#comment-17279097
 ] 

prabhu biradar commented on KAFKA-12279:


Hi Sophie Blee-Goldman, 

1) This error occurs on every restart post 2.7 Kafka upgrade.

2) Yes we are running 12 instances (6 ECS TASK  and each TASKk runs 2 threads)

3) Below is the detailed logs starting from application restart.

 

Completed 23.2 KiB/23.2 KiB (228.6 KiB/s) with 1 file(s) remainingCompleted 
23.2 KiB/23.2 KiB (228.6 KiB/s) with 1 file(s) remainingdownload: 
s3://x/perf/kafka-client-truststore.jks to ./client.truststore.jksCompleted 
256.0 KiB/20.9 MiB (1.9 MiB/s) with 1 file(s) remainingCompleted 512.0 KiB/20.9 
MiB (3.7 MiB/s) with 1 file(s) remainingCompleted 768.0 KiB/20.9 MiB (5.4 
MiB/s) with 1 file(s) remainingCompleted 1.0 MiB/20.9 MiB (7.2 MiB/s) with 1 
file(s) remaining  Completed 1.2 MiB/20.9 MiB (8.9 MiB/s) with 1 file(s) 
remaining  Completed 1.5 MiB/20.9 MiB (10.5 MiB/s) with 1 file(s) remaining 
Completed 1.8 MiB/20.9 MiB (12.1 MiB/s) with 1 file(s) remaining Completed 2.0 
MiB/20.9 MiB (13.8 MiB/s) with 1 file(s) remaining Completed 2.2 MiB/20.9 MiB 
(15.3 MiB/s) with 1 file(s) remaining Completed 2.5 MiB/20.9 MiB (16.9 MiB/s) 
with 1 file(s) remaining Completed 2.8 MiB/20.9 MiB (18.3 MiB/s) with 1 file(s) 
remaining Completed 3.0 MiB/20.9 MiB (19.9 MiB/s) with 1 file(s) remaining 
Completed 3.2 MiB/20.9 MiB (21.4 MiB/s) with 1 file(s) remaining Completed 3.5 
MiB/20.9 MiB (22.8 MiB/s) with 1 file(s) remaining Completed 3.8 MiB/20.9 MiB 
(24.3 MiB/s) with 1 file(s) remaining Completed 4.0 MiB/20.9 MiB (25.8 MiB/s) 
with 1 file(s) remaining Completed 4.2 MiB/20.9 MiB (27.0 MiB/s) with 1 file(s) 
remaining Completed 4.5 MiB/20.9 MiB (28.4 MiB/s) with 1 file(s) remaining 
Completed 4.8 MiB/20.9 MiB (29.9 MiB/s) with 1 file(s) remaining Completed 5.0 
MiB/20.9 MiB (31.1 MiB/s) with 1 file(s) remaining Completed 5.2 MiB/20.9 MiB 
(32.4 MiB/s) with 1 file(s) remaining Completed 5.5 MiB/20.9 MiB (33.7 MiB/s) 
with 1 file(s) remaining Completed 5.8 MiB/20.9 MiB (34.9 MiB/s) with 1 file(s) 
remaining Completed 6.0 MiB/20.9 MiB (36.1 MiB/s) with 1 file(s) remaining 
Completed 6.2 MiB/20.9 MiB (37.4 MiB/s) with 1 file(s) remaining Completed 6.5 
MiB/20.9 MiB (38.7 MiB/s) with 1 file(s) remaining Completed 6.8 MiB/20.9 MiB 
(39.9 MiB/s) with 1 file(s) remaining Completed 7.0 MiB/20.9 MiB (41.1 MiB/s) 
with 1 file(s) remaining Completed 7.2 MiB/20.9 MiB (42.3 MiB/s) with 1 file(s) 
remaining Completed 7.5 MiB/20.9 MiB (43.5 MiB/s) with 1 file(s) remaining 
Completed 7.8 MiB/20.9 MiB (44.6 MiB/s) with 1 file(s) remaining Completed 8.0 
MiB/20.9 MiB (45.7 MiB/s) with 1 file(s) remaining Completed 8.2 MiB/20.9 MiB 
(47.1 MiB/s) with 1 file(s) remaining Completed 8.5 MiB/20.9 MiB (48.1 MiB/s) 
with 1 file(s) remaining Completed 8.8 MiB/20.9 MiB (49.1 MiB/s) with 1 file(s) 
remaining Completed 9.0 MiB/20.9 MiB (50.2 MiB/s) with 1 file(s) remaining 
Completed 9.2 MiB/20.9 MiB (51.5 MiB/s) with 1 file(s) remaining Completed 9.5 
MiB/20.9 MiB (52.3 MiB/s) with 1 file(s) remaining Completed 9.8 MiB/20.9 MiB 
(53.4 MiB/s) with 1 file(s) remaining Completed 10.0 MiB/20.9 MiB (54.5 MiB/s) 
with 1 file(s) remainingCompleted 10.2 MiB/20.9 MiB (55.3 MiB/s) with 1 file(s) 
remainingCompleted 10.5 MiB/20.9 MiB (56.3 MiB/s) with 1 file(s) 
remainingCompleted 10.8 MiB/20.9 MiB (57.4 MiB/s) with 1 file(s) 
remainingCompleted 11.0 MiB/20.9 MiB (58.3 MiB/s) with 1 file(s) 
remainingCompleted 11.2 MiB/20.9 MiB (59.3 MiB/s) with 1 file(s) 
remainingCompleted 11.5 MiB/20.9 MiB (60.2 MiB/s) with 1 file(s) 
remainingCompleted 11.8 MiB/20.9 MiB (61.1 MiB/s) with 1 file(s) 
remainingCompleted 12.0 MiB/20.9 MiB (62.0 MiB/s) with 1 file(s) 
remainingCompleted 12.2 MiB/20.9 MiB (63.0 MiB/s) with 1 file(s) 
remainingCompleted 12.5 MiB/20.9 MiB (63.9 MiB/s) with 1 file(s) 
remainingCompleted 12.8 MiB/20.9 MiB (64.7 MiB/s) with 1 file(s) 
remainingCompleted 13.0 MiB/20.9 MiB (65.7 MiB/s) with 1 file(s) 
remainingCompleted 13.2 MiB/20.9 MiB (66.5 MiB/s) with 1 file(s) 
remainingCompleted 13.5 MiB/20.9 MiB (67.4 MiB/s) with 1 file(s) 
remainingCompleted 13.8 MiB/20.9 MiB (68.3 MiB/s) with 1 file(s) 
remainingCompleted 14.0 MiB/20.9 MiB (69.2 MiB/s) with 1 file(s) 
remainingCompleted 14.2 MiB/20.9 MiB (70.0 MiB/s) with 1 file(s) 
remainingCompleted 14.5 MiB/20.9 MiB (71.0 MiB/s) with 1 file(s) 
remainingCompleted 14.8 MiB/20.9 MiB (71.7 MiB/s) with 1 file(s) 
remainingCompleted 15.0 MiB/20.9 MiB (72.5 MiB/s) with 1 file(s) 
remainingCompleted 15.2 MiB/20.9 MiB (73.5 MiB/s) with 1 file(s) 
remainingCompleted 15.5 MiB/20.9 MiB (74.2 MiB/s) with 1 file(s) 
remainingCompleted 15.8 MiB/20.9 MiB (75.1 MiB/s) with 1 file(s) 
remainingCompleted 16.0 MiB/20.9 MiB (76.1 MiB/s) with 1 file(s) 
remainingCompleted 16.1 MiB/20.9 MiB (76.0 MiB/s) with 1 file(s) 
remainingCompleted 16.4 MiB/20.9 

[GitHub] [kafka] vvcephei commented on pull request #9107: KAFKA-5488: Add type-safe split() operator

2021-02-04 Thread GitBox


vvcephei commented on pull request #9107:
URL: https://github.com/apache/kafka/pull/9107#issuecomment-773549603


   Thanks, all, that Scala fix looks perfect to me.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

2021-02-04 Thread GitBox


abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r570480399



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1234,19 +1171,28 @@ class KafkaApis(val requestChannel: RequestChannel,
 // In versions 5 and below, we returned LEADER_NOT_AVAILABLE if a matching 
listener was not found on the leader.
 // From version 6 onwards, we return LISTENER_NOT_FOUND to enable 
diagnosis of configuration errors.
 val errorUnavailableListeners = requestVersion >= 6
-val topicMetadata =
+val (topicMetadata, nonExistTopicMetadata) =
   if (authorizedTopics.isEmpty)
-Seq.empty[MetadataResponseTopic]
-  else {
-getTopicMetadata(
-  metadataRequest.allowAutoTopicCreation,
-  metadataRequest.isAllTopics,
-  authorizedTopics,
-  request.context.listenerName,
-  errorUnavailableEndpoints,
-  errorUnavailableListeners
-)
+(Seq.empty[MetadataResponseTopic], Seq.empty[MetadataResponseTopic])
+  else
+getTopicMetadata(metadataRequest.isAllTopics, authorizedTopics,
+  request.context.listenerName, errorUnavailableEndpoints, 
errorUnavailableListeners)
+
+nonExistTopicMetadata.foreach(metadata =>
+  try {
+// Validate topic name and propagate error if failed
+Topic.validate(metadata.name())

Review comment:
   Actually after looking into the zk admin manager logic, I don't think 
it's necessary to do the topic validation here.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

2021-02-04 Thread GitBox


mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r570474971



##
File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
##
@@ -483,61 +475,4 @@ class MetadataCacheTest {
 assertEquals(initialBrokerIds.toSet, aliveBrokersFromCache.map(_.id).toSet)
   }
 
-  @Test
-  def testGetClusterMetadataWithOfflineReplicas(): Unit = {
-val cache = new MetadataCache(1)
-val topic = "topic"
-val topicPartition = new TopicPartition(topic, 0)
-val securityProtocol = SecurityProtocol.PLAINTEXT
-val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
-
-val brokers = Seq(
-  new UpdateMetadataBroker()
-.setId(0)
-.setRack("")
-.setEndpoints(Seq(new UpdateMetadataEndpoint()
-  .setHost("foo")
-  .setPort(9092)
-  .setSecurityProtocol(securityProtocol.id)
-  .setListener(listenerName.value)).asJava),
-  new UpdateMetadataBroker()
-.setId(1)
-.setEndpoints(Seq.empty.asJava)
-)
-val controllerEpoch = 1
-val leader = 1
-val leaderEpoch = 0
-val replicas = asList[Integer](0, 1)
-val isr = asList[Integer](0, 1)
-val offline = asList[Integer](1)
-val partitionStates = Seq(new UpdateMetadataPartitionState()
-  .setTopicName(topic)
-  .setPartitionIndex(topicPartition.partition)
-  .setControllerEpoch(controllerEpoch)
-  .setLeader(leader)
-  .setLeaderEpoch(leaderEpoch)
-  .setIsr(isr)
-  .setZkVersion(3)
-  .setReplicas(replicas)
-  .setOfflineReplicas(offline))
-val version = ApiKeys.UPDATE_METADATA.latestVersion
-val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 2, 
controllerEpoch, brokerEpoch, partitionStates.asJava,
-  brokers.asJava, Collections.emptyMap()).build()
-cache.updateMetadata(15, updateMetadataRequest)
-
-val expectedNode0 = new Node(0, "foo", 9092)
-val expectedNode1 = new Node(1, "", -1)
-
-val cluster = cache.getClusterMetadata("clusterId", listenerName)

Review comment:
   Since we're looking up the cluster by listener name here, we don't see 
the offline broker in the MetadataImage because it's endpoints map is empty. 
This leads to `cluster.leaderFor` on L534 returning null
   
   @hachikuji @cmccabe is a change in metadata behavior, or does this test have 
bad assumptions





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

2021-02-04 Thread GitBox


mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r570474971



##
File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
##
@@ -483,61 +475,4 @@ class MetadataCacheTest {
 assertEquals(initialBrokerIds.toSet, aliveBrokersFromCache.map(_.id).toSet)
   }
 
-  @Test
-  def testGetClusterMetadataWithOfflineReplicas(): Unit = {
-val cache = new MetadataCache(1)
-val topic = "topic"
-val topicPartition = new TopicPartition(topic, 0)
-val securityProtocol = SecurityProtocol.PLAINTEXT
-val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
-
-val brokers = Seq(
-  new UpdateMetadataBroker()
-.setId(0)
-.setRack("")
-.setEndpoints(Seq(new UpdateMetadataEndpoint()
-  .setHost("foo")
-  .setPort(9092)
-  .setSecurityProtocol(securityProtocol.id)
-  .setListener(listenerName.value)).asJava),
-  new UpdateMetadataBroker()
-.setId(1)
-.setEndpoints(Seq.empty.asJava)
-)
-val controllerEpoch = 1
-val leader = 1
-val leaderEpoch = 0
-val replicas = asList[Integer](0, 1)
-val isr = asList[Integer](0, 1)
-val offline = asList[Integer](1)
-val partitionStates = Seq(new UpdateMetadataPartitionState()
-  .setTopicName(topic)
-  .setPartitionIndex(topicPartition.partition)
-  .setControllerEpoch(controllerEpoch)
-  .setLeader(leader)
-  .setLeaderEpoch(leaderEpoch)
-  .setIsr(isr)
-  .setZkVersion(3)
-  .setReplicas(replicas)
-  .setOfflineReplicas(offline))
-val version = ApiKeys.UPDATE_METADATA.latestVersion
-val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 2, 
controllerEpoch, brokerEpoch, partitionStates.asJava,
-  brokers.asJava, Collections.emptyMap()).build()
-cache.updateMetadata(15, updateMetadataRequest)
-
-val expectedNode0 = new Node(0, "foo", 9092)
-val expectedNode1 = new Node(1, "", -1)
-
-val cluster = cache.getClusterMetadata("clusterId", listenerName)

Review comment:
   Since we're looking up the cluster by listener name here, we don't see 
the offline broker in the MetadataImage because it's endpoints map is empty. 
   
   @hachikuji @cmccabe is a change in metadata behavior, or does this test have 
bad assumptions





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on a change in pull request #10053: KAFKA-10834: Remove redundant type casts in Connect

2021-02-04 Thread GitBox


kkonstantine commented on a change in pull request #10053:
URL: https://github.com/apache/kafka/pull/10053#discussion_r570464187



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##
@@ -236,7 +236,7 @@ public DistributedHerder(DistributedConfig config,
 
 this.herderExecutor = new ThreadPoolExecutor(1, 1, 0L,
 TimeUnit.MILLISECONDS,
-new LinkedBlockingDeque(1),
+new LinkedBlockingDeque<>(1),

Review comment:
   same nit as above

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
##
@@ -63,7 +63,7 @@
 public SourceTaskOffsetCommitter(WorkerConfig config) {
 this(config, 
Executors.newSingleThreadScheduledExecutor(ThreadUtils.createThreadFactory(
 SourceTaskOffsetCommitter.class.getSimpleName() + "-%d", 
false)),
-new ConcurrentHashMap>());
+new ConcurrentHashMap<>());

Review comment:
   nit: format got unaligned. Please check the suggestion fixes it
   ```suggestion
   new ConcurrentHashMap<>());
   ```

##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
##
@@ -269,8 +269,10 @@ public void testGetSetNull() throws Exception {
 final Capture> secondGetReadToEndCallback = 
EasyMock.newCapture();
 storeLog.readToEnd(EasyMock.capture(secondGetReadToEndCallback));
 PowerMock.expectLastCall().andAnswer(() -> {
-capturedConsumedCallback.getValue().onCompletion(null, new 
ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (byte[]) 
null, TP0_VALUE.array()));
-capturedConsumedCallback.getValue().onCompletion(null, new 
ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 
TP1_KEY.array(), (byte[]) null));
+capturedConsumedCallback.getValue().onCompletion(null, new 
ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0,
+null, TP0_VALUE.array()));
+capturedConsumedCallback.getValue().onCompletion(null, new 
ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 
TP1_KEY.array(),
+null));

Review comment:
   ```suggestion
   capturedConsumedCallback.getValue().onCompletion(null, new 
ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 
TP1_KEY.array(), null));
   ```

##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
##
@@ -269,8 +269,10 @@ public void testGetSetNull() throws Exception {
 final Capture> secondGetReadToEndCallback = 
EasyMock.newCapture();
 storeLog.readToEnd(EasyMock.capture(secondGetReadToEndCallback));
 PowerMock.expectLastCall().andAnswer(() -> {
-capturedConsumedCallback.getValue().onCompletion(null, new 
ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (byte[]) 
null, TP0_VALUE.array()));
-capturedConsumedCallback.getValue().onCompletion(null, new 
ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 
TP1_KEY.array(), (byte[]) null));
+capturedConsumedCallback.getValue().onCompletion(null, new 
ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0,
+null, TP0_VALUE.array()));

Review comment:
   ```suggestion
   capturedConsumedCallback.getValue().onCompletion(null, new 
ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, null, 
TP0_VALUE.array()));
   ```

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/connector/policy/AbstractConnectorClientConfigOverridePolicy.java
##
@@ -39,7 +39,8 @@ public void close() throws Exception {
 
 protected ConfigValue configValue(Map.Entry configEntry) {
 ConfigValue configValue =
-new ConfigValue(configEntry.getKey(), configEntry.getValue(), new 
ArrayList<>(), new ArrayList());
+new ConfigValue(configEntry.getKey(), configEntry.getValue(), new 
ArrayList<>(),
+new ArrayList<>());

Review comment:
   In this project width can be longer. For old lines let's keep it like 
that. New is fine to format based on a 100 char width (I believe) but again not 
required currently. 
   ```suggestion
   new ConfigValue(configEntry.getKey(), configEntry.getValue(), 
new ArrayList<>(), new ArrayList<>());
   ```

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
##
@@ -390,12 +390,14 @@ private void checkAndPutConnectorConfigName(String 
connectorName, Map T completeOrForwardRequest(FutureCallback cb, String path, 
String method, HttpHeaders headers, Object body,

[GitHub] [kafka] cmccabe merged pull request #10019: MINOR: Introduce KafkaBroker trait for use in dynamic reconfiguration

2021-02-04 Thread GitBox


cmccabe merged pull request #10019:
URL: https://github.com/apache/kafka/pull/10019


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10030: MINOR: Add KafkaEventQueue

2021-02-04 Thread GitBox


jsancio commented on a change in pull request #10030:
URL: https://github.com/apache/kafka/pull/10030#discussion_r570367570



##
File path: metadata/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
##
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.queue;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+
+public final class KafkaEventQueue implements EventQueue {

Review comment:
   @cmccabe What do you think about splitting this functionality into two 
types? For example:
   
   1. `EventQueue` is a type which is responsible for ordering events given the 
the insertion type and deadline. This type is thread-safe but doesn't 
instantiate thread(s). This type exposes methods for enqueuing and dequeuing 
events. The dequeuing method(s) can take in a "time" parameter and polls to see 
if there is an event ready. The dequeue method(s) would need to return the 
difference between "time" and the next closest event in the queue.
   2. `SingleThreadEventExecutor` is a type which spawns a thread to dequeue 
events from the `EventQueue`, executes the `run` or `handleException` methods 
of the event and it is `AutoCloseable`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10030: MINOR: Add KafkaEventQueue

2021-02-04 Thread GitBox


jsancio commented on a change in pull request #10030:
URL: https://github.com/apache/kafka/pull/10030#discussion_r570462338



##
File path: metadata/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
##
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.queue;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+
+public final class KafkaEventQueue implements EventQueue {

Review comment:
   Okay. I suggested it because maybe unittests would be easier to write 
since the tests would have to deal with concurrency.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #10030: MINOR: Add KafkaEventQueue

2021-02-04 Thread GitBox


junrao commented on a change in pull request #10030:
URL: https://github.com/apache/kafka/pull/10030#discussion_r570453181



##
File path: metadata/src/main/java/org/apache/kafka/queue/EventQueue.java
##
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.queue;
+
+import org.slf4j.Logger;
+
+import java.util.OptionalLong;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+
+public interface EventQueue extends AutoCloseable {
+interface Event {
+/**
+ * Run the event.
+ */
+void run() throws Exception;
+
+/**
+ * Handle an exception that was either generated by running the event, 
or by the
+ * event queue's inability to run the event.
+ *
+ * @param e The exception.  This will be a TimeoutException if the 
event hit
+ *  its deadline before it could be scheduled.
+ *  It will be a RejectedExecutionException if the event 
could not be
+ *  scheduled because the event queue has already been 
closed.
+ *  Otherweise, it will be whatever exception was thrown 
by run().
+ */
+default void handleException(Throwable e) {}
+}
+
+abstract class FailureLoggingEvent implements Event {
+private final Logger log;
+
+public FailureLoggingEvent(Logger log) {
+this.log = log;
+}
+
+@Override
+public void handleException(Throwable e) {
+if (e instanceof RejectedExecutionException) {
+log.info("Not processing {} because the event queue is 
closed.",
+this.toString());
+} else {
+log.error("Unexpected error handling {}", this.toString(), e);
+}
+}
+
+@Override
+public String toString() {
+return this.getClass().getSimpleName();
+}
+}
+
+class NoDeadlineFunction implements Function {
+public static final NoDeadlineFunction INSTANCE = new 
NoDeadlineFunction();
+
+@Override
+public OptionalLong apply(OptionalLong ignored) {
+return OptionalLong.empty();
+}
+}
+
+class DeadlineFunction implements Function {
+private final long deadlineNs;
+
+public DeadlineFunction(long deadlineNs) {
+this.deadlineNs = deadlineNs;
+}
+
+@Override
+public OptionalLong apply(OptionalLong ignored) {
+return OptionalLong.of(deadlineNs);
+}
+}
+
+class EarliestDeadlineFunction implements Function {
+private final long newDeadlineNs;
+
+public EarliestDeadlineFunction(long newDeadlineNs) {
+this.newDeadlineNs = newDeadlineNs;
+}
+
+@Override
+public OptionalLong apply(OptionalLong prevDeadlineNs) {
+if (!prevDeadlineNs.isPresent()) {
+return OptionalLong.of(newDeadlineNs);
+} else if (prevDeadlineNs.getAsLong() < newDeadlineNs) {
+return prevDeadlineNs;
+} else {
+return OptionalLong.of(newDeadlineNs);
+}
+}
+}
+
+class VoidEvent implements Event {
+public final static VoidEvent INSTANCE = new VoidEvent();
+
+@Override
+public void run() throws Exception {
+}
+}
+
+/**
+ * Add an element to the front of the queue.
+ *
+ * @param event The mandatory event to prepend.
+ */
+default void prepend(Event event) {
+enqueue(EventInsertionType.PREPEND, null, NoDeadlineFunction.INSTANCE, 
event);
+}
+
+/**
+ * Add an element to the end of the queue.
+ *
+ * @param event The event to append.
+ */
+default void append(Event event) {
+enqueue(EventInsertionType.APPEND, null, NoDeadlineFunction.INSTANCE, 
event);
+}
+
+/**
+ * Add an event to the end of the queue.
+ *
+ * @param deadlineNsThe deadline for starting the event, in 
monotonic
+ *  

[GitHub] [kafka] cmccabe commented on pull request #10047: MINOR: Add ClusterTool as specified in KIP-631

2021-02-04 Thread GitBox


cmccabe commented on pull request #10047:
URL: https://github.com/apache/kafka/pull/10047#issuecomment-773516371


   Test failures are not related to the PR



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

2021-02-04 Thread GitBox


mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r570451366



##
File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
##
@@ -483,61 +475,4 @@ class MetadataCacheTest {
 assertEquals(initialBrokerIds.toSet, aliveBrokersFromCache.map(_.id).toSet)
   }
 
-  @Test
-  def testGetClusterMetadataWithOfflineReplicas(): Unit = {

Review comment:
   Hmm, I think this was an artifact of the merge, I'll restore this test





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10030: MINOR: Add KafkaEventQueue

2021-02-04 Thread GitBox


cmccabe commented on a change in pull request #10030:
URL: https://github.com/apache/kafka/pull/10030#discussion_r570450458



##
File path: metadata/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
##
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.queue;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+
+public final class KafkaEventQueue implements EventQueue {
+/**
+ * A context object that wraps events.
+ */
+private static class EventContext {
+/**
+ * The caller-supplied event.
+ */
+private final Event event;
+
+/**
+ * How this event was inserted.
+ */
+private final EventInsertionType insertionType;
+
+/**
+ * The previous pointer of our circular doubly-linked list.
+ */
+private EventContext prev = this;
+
+/**
+ * The next pointer in our circular doubly-linked list.
+ */
+private EventContext next = this;
+
+/**
+ * If this event is in the delay map, this is the key it is there 
under.
+ * If it is not in the map, this is null.
+ */
+private Long deadlineNs = null;
+
+/**
+ * The tag associated with this event.
+ */
+private String tag;
+
+EventContext(Event event, EventInsertionType insertionType, String 
tag) {
+this.event = event;
+this.insertionType = insertionType;
+this.tag = tag;
+}
+
+/**
+ * Insert a new node in the circularly linked list after this node.
+ */
+void insertAfter(EventContext other) {
+this.next.prev = other;
+other.next = this.next;
+other.prev = this;
+this.next = other;
+}
+
+/**
+ * Insert a new node in the circularly linked list before this node.
+ */
+void insertBefore(EventContext other) {
+this.prev.next = other;
+other.prev = this.prev;
+other.next = this;
+this.prev = other;
+}
+
+/**
+ * Remove this node from the circularly linked list.
+ */
+void remove() {
+this.prev.next = this.next;
+this.next.prev = this.prev;
+this.prev = this;
+this.next = this;
+}
+
+/**
+ * Returns true if this node is the only element in its list.
+ */
+boolean isSingleton() {
+return prev == this && next == this;
+}
+
+/**
+ * Run the event associated with this EventContext.
+ */
+void run() throws InterruptedException {
+try {
+event.run();
+} catch (InterruptedException e) {
+throw e;
+} catch (Exception e) {
+event.handleException(e);
+}
+}
+
+/**
+ * Complete the event associated with this EventContext with a timeout 
exception.
+ */
+void completeWithTimeout() {
+completeWithException(new TimeoutException());
+}
+
+/**
+ * Complete the event associated with this EventContext with the 
specified
+ * exception.
+ */
+void completeWithException(Throwable t) {
+event.handleException(t);
+}
+}
+
+private class EventHandler implements Runnable {
+/**
+ * Event contexts indexed by tag.  Events without a tag are not 
included here.
+ */
+private final Map tagToEventContext = new 
HashMap<>();
+
+/**
+ * The head of the event queue.
+   

[GitHub] [kafka] cmccabe commented on a change in pull request #10030: MINOR: Add KafkaEventQueue

2021-02-04 Thread GitBox


cmccabe commented on a change in pull request #10030:
URL: https://github.com/apache/kafka/pull/10030#discussion_r570442555



##
File path: metadata/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
##
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.queue;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+
+public final class KafkaEventQueue implements EventQueue {

Review comment:
   That's an interesting idea, but I'm not sure I see an advantage for this 
use-case.  We only want a single thread here-- otherwise we would have to have 
locking in the controller and in the parts of the broker which use this queue.  
So the potential benefit that I can see from your proposal (allowing multiple 
executors) doesn't really apply here.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-7271) Fix ignored test in streams_upgrade_test.py: test_upgrade_downgrade_brokers

2021-02-04 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-7271.
--
Resolution: Duplicate

> Fix ignored test in streams_upgrade_test.py: test_upgrade_downgrade_brokers
> ---
>
> Key: KAFKA-7271
> URL: https://issues.apache.org/jira/browse/KAFKA-7271
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, system tests
>Reporter: John Roesler
>Priority: Blocker
> Fix For: 3.0.0
>
>
> Fix in the oldest branch that ignores the test and cherry-pick forward.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


  1   2   >