[GitHub] [kafka] chia7712 commented on a change in pull request #9663: MINOR: Small cleanups in `AlterIsr` handling logic

2020-11-30 Thread GitBox


chia7712 commented on a change in pull request #9663:
URL: https://github.com/apache/kafka/pull/9663#discussion_r533126745



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -1372,23 +1390,27 @@ class Partition(val topicPartition: TopicPartition,
 * Since our error was non-retryable we are okay staying in this state until we see new metadata from UpdateMetadata
 * or LeaderAndIsr
 */
-  private def handleAlterIsrResponse(proposedIsr: Set[Int], result: Either[Errors, LeaderAndIsr]): Unit = {
+  private def handleAlterIsrResponse(proposedIsrState: IsrState, result: Either[Errors, LeaderAndIsr]): Unit = {

Review comment:
   Could we rewrite this using currying?
   
   *Before*
   ```scala
   val callbackPartial = handleAlterIsrResponse(proposedIsrState, _: Either[Errors, LeaderAndIsr])

   if (!alterIsrManager.enqueue(AlterIsrItem(topicPartition, newLeaderAndIsr, callbackPartial))) {
     throw new IllegalStateException(s"Failed to enqueue `AlterIsr` request with state " +
       s"$newLeaderAndIsr for partition $topicPartition")
   }
   ```
   
   *After*
   ```scala
   // assuming the handler is declared curried:
   //   private def handleAlterIsrResponse(proposedIsrState: IsrState)(result: Either[Errors, LeaderAndIsr]): Unit
   if (!alterIsrManager.enqueue(AlterIsrItem(topicPartition, newLeaderAndIsr, handleAlterIsrResponse(proposedIsrState)))) {
     throw new IllegalStateException(s"Failed to enqueue `AlterIsr` request with state " +
       s"$newLeaderAndIsr for partition $topicPartition")
   }
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dongjinleekr commented on pull request #8404: KAFKA-10787: Introduce an import order in Java sources

2020-11-30 Thread GitBox


dongjinleekr commented on pull request #8404:
URL: https://github.com/apache/kafka/pull/8404#issuecomment-736272990


   Here is the update:
   
   - Adopted the three-group import ordering as discussed in [the mailing 
thread](https://lists.apache.org/thread.html/rf6f49c845a3d48efe8a91916c8fbaddb76da17742eef06798fc5b24d%40%3Cdev.kafka.apache.org%3E).
   - Added documentation for the IDE plugins.
   - Reordered the imports in the existing files and rebased onto the latest trunk.
   
   cc/ @mjsax @cadonna @ableegoldman @vvcephei



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] viktorsomogyi edited a comment on pull request #9519: KAFKA-10650: Use Murmur3 instead of MD5 in SkimpyOffsetMap

2020-11-30 Thread GitBox


viktorsomogyi edited a comment on pull request #9519:
URL: https://github.com/apache/kafka/pull/9519#issuecomment-736027069


   @lbradstreet it is really hard to give an exact answer to this, as the 
collision rate is hard to calculate mathematically: it depends heavily on the 
size and the values of the test set. For non-cryptographic hashes it is 
possible to craft denial-of-service attacks where everything gets placed into 
the same bucket, which slows down lookups. On the theoretical side, though, 
Murmur3 passes the most often cited chi-squared test and has a very good 
avalanche effect, so it generates hashes that are very close to uniformly 
distributed.
   Because of the lack of available mathematical articles on this topic 
(Murmur vs. MD5), I started brute-force tests where I generated a few billion 
unique keys and inserted them into a Bloom filter (which had a 1% 
false-positive probability). That showed that Murmur3 is actually on the same 
level as MD5: it generates roughly the same number of collisions. I have two 
types of datasets: the first can use any UTF-8 characters, and the second uses 
only printable ASCII characters. In fact, MD5, Murmur3 128-bit, Murmur3 
64-bit, and xxHash 64-bit all generated around the same number of collisions, 
which was 0.016% out of 200 million unique keys. I added Murmur3 32-bit as a 
baseline, but it was significantly worse, around 2% collisions. Maybe to show 
the difference we need a much larger keyset; I'll try to do what I can.
   I'll publish my code in the following days; I just have to work on 
something else too, so it's a bit slow, sorry :).
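   In the meantime, here is a minimal Java sketch of this kind of brute-force 
probe, assuming Guava's `BloomFilter` and `Hashing.murmur3_128`; the key 
generator and the key count below are placeholders, not the actual test code:
   ```java
   import com.google.common.hash.BloomFilter;
   import com.google.common.hash.Funnels;
   import com.google.common.hash.Hashing;

   import java.nio.charset.StandardCharsets;

   public class CollisionProbe {
       public static void main(String[] args) {
           long keyCount = 200_000_000L; // placeholder keyset size
           // 1% false-positive probability, matching the test described above
           BloomFilter<Long> seenHashes =
               BloomFilter.create(Funnels.longFunnel(), keyCount, 0.01);
           long suspected = 0;
           for (long i = 0; i < keyCount; i++) {
               String key = "key-" + i; // placeholder unique-key generator
               long hash = Hashing.murmur3_128()
                   .hashString(key, StandardCharsets.UTF_8)
                   .asLong();
               // put() returns false if all bits were already set: either a real
               // hash collision or one of the Bloom filter's ~1% false positives
               if (!seenHashes.put(hash))
                   suspected++;
           }
           System.out.printf("suspected collisions: %d of %d keys%n", suspected, keyCount);
       }
   }
   ```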
   
   On the other hand, if we want to make sure that there will be no 
collisions, I don't think that is possible with any of these solutions; there 
is always a chance. To rule collisions out completely, we would either have to 
store the user key-to-hash mappings (similarly to the offset indexes) and 
reject new, colliding keys, or use perfect hashing (which wouldn't work well 
here, as it requires knowledge of the full keyset, or rebuilding the cache on 
each insert, or at least often).



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10629) TopologyTestDriver should not require a Properties arg

2020-11-30 Thread Rohit Deshpande (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241301#comment-17241301
 ] 

Rohit Deshpande commented on KAFKA-10629:
-

thanks [~mjsax] 

> TopologyTestDriver should not require a Properties arg
> --
>
> Key: KAFKA-10629
> URL: https://issues.apache.org/jira/browse/KAFKA-10629
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, streams-test-utils
>Reporter: John Roesler
>Assignee: Rohit Deshpande
>Priority: Minor
>  Labels: kip, newbie
>
> As of [https://github.com/apache/kafka/pull/9477,] many TopologyTestDriver 
> usages will have no configurations at all to specify, so we should provide a 
> constructor that doesn't take a Properties argument. Right now, such 
> configuration-free usages have to provide an empty Properties object.
> KIP-680: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-680%3A+TopologyTestDriver+should+not+require+a+Properties+argument]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dengziming commented on pull request #9664: KAFKA-10780; Rewrite ControllerZNode struct with auto-generated protocol

2020-11-30 Thread GitBox


dengziming commented on pull request #9664:
URL: https://github.com/apache/kafka/pull/9664#issuecomment-736255004


   @ijuma Thank you, I had overlooked this fact. 
   Some of the ZNodes will be removed because we no longer need them after 
KIP-500, for example `IsrChangeNotificationZNode` and `ControllerZNode`. 
However, some ZNodes will just be moved from ZooKeeper to the 
RaftKafkaMetadataLog, for example `TopicZNode` and `FeatureZNode`.
   So can we still rewrite their structs using the auto-generated protocol, 
which can also be used after KIP-500? Also pinging @abbccdda, as this is a 
discussion about a major direction.
   I will close this PR and create another for `TopicZNode` if you think it's 
reasonable. Thank you.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rohitrmd commented on a change in pull request #9660: Kafka 10629 - TopologyTestDriver should not require a Properties argument

2020-11-30 Thread GitBox


rohitrmd commented on a change in pull request #9660:
URL: https://github.com/apache/kafka/pull/9660#discussion_r533099003



##
File path: 
streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
##
@@ -254,6 +265,19 @@ public TopologyTestDriver(final Topology topology,
 this(topology, config, null);
 }
 
+/**
+ * Create a new test diver instance.
+ * Initialized the internally mocked wall-clock time with {@link 
System#currentTimeMillis() current system time}.

Review comment:
   fixed the comment.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10787) Introduce an import order in Java sources

2020-11-30 Thread Dongjin Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10787?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjin Lee updated KAFKA-10787:

Affects Version/s: 2.8.0

> Introduce an import order in Java sources
> -
>
> Key: KAFKA-10787
> URL: https://issues.apache.org/jira/browse/KAFKA-10787
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.8.0
>Reporter: Dongjin Lee
>Assignee: Dongjin Lee
>Priority: Major
>
> At present, Kafka enforces a relatively strict code style for Java code, 
> except for import order. For this reason, the code formatting settings of 
> every local dev environment differ from person to person, resulting in 
> countless meaningless import order changes in PRs.
> This issue aims to define and apply a 3-group import order, like the 
> following:
> 1. Project packages: {{kafka.*}}, {{org.apache.kafka.*}} 
> 2. Third Party packages: {{com.*}}, {{net.*}}, {{org.*}}
> 3. Java packages: {{java.*}}, {{javax.*}}
> Discussion Thread: 
> https://lists.apache.org/thread.html/rf6f49c845a3d48efe8a91916c8fbaddb76da17742eef06798fc5b24d%40%3Cdev.kafka.apache.org%3E



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10787) Introduce an import order in Java sources

2020-11-30 Thread Dongjin Lee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10787?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241272#comment-17241272
 ] 

Dongjin Lee commented on KAFKA-10787:
-

PR: https://github.com/apache/kafka/pull/8404

> Introduce an import order in Java sources
> -
>
> Key: KAFKA-10787
> URL: https://issues.apache.org/jira/browse/KAFKA-10787
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dongjin Lee
>Assignee: Dongjin Lee
>Priority: Major
>
> At present, Kafka enforces a relatively strict code style for Java code, 
> except for import order. For this reason, the code formatting settings of 
> every local dev environment differ from person to person, resulting in 
> countless meaningless import order changes in PRs.
> This issue aims to define and apply a 3-group import order, like the 
> following:
> 1. Project packages: {{kafka.*}}, {{org.apache.kafka.*}} 
> 2. Third Party packages: {{com.*}}, {{net.*}}, {{org.*}}
> 3. Java packages: {{java.*}}, {{javax.*}}
> Discussion Thread: 
> https://lists.apache.org/thread.html/rf6f49c845a3d48efe8a91916c8fbaddb76da17742eef06798fc5b24d%40%3Cdev.kafka.apache.org%3E



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on a change in pull request #8826: KAFKA-10090 Misleading warnings: The configuration was supplied but i…

2020-11-30 Thread GitBox


chia7712 commented on a change in pull request #8826:
URL: https://github.com/apache/kafka/pull/8826#discussion_r533093802



##
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##
@@ -1271,27 +1270,48 @@ public void testProducerJmxPrefix() throws  Exception {
 producer.close();
 }
 
-private ProducerMetadata newMetadata(long refreshBackoffMs, long 
expirationMs) {
-return new ProducerMetadata(refreshBackoffMs, expirationMs, 
defaultMetadataIdleMs,
+private static ProducerMetadata newMetadata(long refreshBackoffMs, long 
expirationMs) {
+return new ProducerMetadata(refreshBackoffMs, expirationMs, 
DEFAULT_METADATA_IDLE_MS,
 new LogContext(), new ClusterResourceListeners(), Time.SYSTEM);
 }
 
 @Test
-public void serializerShouldSeeGeneratedClientId() {
+public void configurableObjectsShouldSeeGeneratedClientId() {
 Properties props = new Properties();
 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:");
 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
SerializerForClientId.class.getName());
 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
SerializerForClientId.class.getName());
+props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, 
PartitionerForClientId.class.getName());
+props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, 
ProducerInterceptorForClientId.class.getName());
 
 KafkaProducer producer = new KafkaProducer<>(props);
-assertEquals(2, SerializerForClientId.CLIENT_IDS.size());
-assertEquals(SerializerForClientId.CLIENT_IDS.get(0), 
producer.getClientId());
-assertEquals(SerializerForClientId.CLIENT_IDS.get(1), 
producer.getClientId());
+assertNotNull(producer.getClientId());
+assertNotEquals(0, producer.getClientId().length());
+assertEquals(4, CLIENT_IDS.size());
+CLIENT_IDS.forEach(id -> assertEquals(id, producer.getClientId()));
 producer.close();
 }
 
+@Test
+public void testUnusedConfigs() {
+Map props = new HashMap<>();
+props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:");
+props.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLS");
+ProducerConfig config = new 
ProducerConfig(ProducerConfig.appendSerializerToConfig(props,
+new StringSerializer(), new StringSerializer()));
+
+assertTrue(new ProducerConfig(config.originals(), 
false).unused().contains(SslConfigs.SSL_PROTOCOL_CONFIG));

Review comment:
   I'm going to remove this test ```assertTrue(new 
ProducerConfig(config.originals(), false).unused().co...```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #8826: KAFKA-10090 Misleading warnings: The configuration was supplied but i…

2020-11-30 Thread GitBox


chia7712 commented on a change in pull request #8826:
URL: https://github.com/apache/kafka/pull/8826#discussion_r533093616



##
File path: 
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
##
@@ -105,7 +105,9 @@ public AbstractConfig(ConfigDef definition, Map 
originals,  Map
 throw new ConfigException(entry.getKey().toString(), 
entry.getValue(), "Key must be a string.");
 
 this.originals = resolveConfigVariables(configProviderProps, 
(Map) originals);
-this.values = definition.parse(this.originals);
+// pass a copy to definition.parse. Otherwise, the definition.parse 
adds all keys of definitions to "used" group
+// since definition.parse needs to call "RecordingMap#get" when 
checking all definitions.
+this.values = definition.parse(new HashMap<>(this.originals));

Review comment:
   > But it seems this is still needed? 
   
   It is not necessary with the latest change. I kept it as a complete 
solution (in case someone passes a ```RecordingMap``` in the future). However, 
I'm going to remove it to make this PR simpler.
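   For context, a minimal sketch of the mechanism in question (a simplified 
stand-in, not the exact `RecordingMap` inner class of `AbstractConfig`): every 
`get` records the key as used, so handing the map itself to `definition.parse` 
would mark every defined key.
   ```java
   import java.util.HashMap;
   import java.util.Set;
   import java.util.concurrent.ConcurrentSkipListSet;

   // Simplified stand-in for the RecordingMap nested in AbstractConfig.
   class RecordingMap<V> extends HashMap<String, V> {
       private final Set<String> used = new ConcurrentSkipListSet<>();

       @Override
       public V get(Object key) {
           if (key instanceof String)
               used.add((String) key); // every lookup flags the key as "used"
           return super.get(key);
       }

       Set<String> used() {
           return used;
       }
   }

   // definition.parse(this.originals) probes every defined key via get(),
   // flagging them all; definition.parse(new HashMap<>(this.originals))
   // leaves the recording untouched.
   ```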





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10787) Introduce an import order in Java sources

2020-11-30 Thread Dongjin Lee (Jira)
Dongjin Lee created KAFKA-10787:
---

 Summary: Introduce an import order in Java sources
 Key: KAFKA-10787
 URL: https://issues.apache.org/jira/browse/KAFKA-10787
 Project: Kafka
  Issue Type: Improvement
Reporter: Dongjin Lee
Assignee: Dongjin Lee


At present, Kafka enforces a relatively strict code style for Java code, except 
for import order. For this reason, the code formatting settings of every local 
dev environment differ from person to person, resulting in countless 
meaningless import order changes in PRs.

This issue aims to define and apply a 3-group import order, like the following:

1. Project packages: {{kafka.*}}, {{org.apache.kafka.*}} 
2. Third Party packages: {{com.*}}, {{net.*}}, {{org.*}}
3. Java packages: {{java.*}}, {{javax.*}}

Discussion Thread: 
https://lists.apache.org/thread.html/rf6f49c845a3d48efe8a91916c8fbaddb76da17742eef06798fc5b24d%40%3Cdev.kafka.apache.org%3E
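
To illustrate, a sketch of how a conforming Java file's imports would be 
grouped under this proposal (the imported classes are arbitrary examples, not 
taken from any particular file):

```java
// group 1: project packages
import kafka.server.KafkaConfig;
import org.apache.kafka.common.config.AbstractConfig;

// group 2: third-party packages
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;

// group 3: java packages
import java.util.Map;
import javax.security.auth.login.LoginException;

public final class ImportOrderExample { }
```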



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on a change in pull request #8826: KAFKA-10090 Misleading warnings: The configuration was supplied but i…

2020-11-30 Thread GitBox


chia7712 commented on a change in pull request #8826:
URL: https://github.com/apache/kafka/pull/8826#discussion_r533092986



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##
@@ -314,27 +315,23 @@ public KafkaProducer(Properties properties) {
  * be called in the producer when the serializer 
is passed in directly.
  */
 public KafkaProducer(Properties properties, Serializer keySerializer, 
Serializer valueSerializer) {
-this(Utils.propsToMap(properties), keySerializer, valueSerializer, 
null, null, null,
-Time.SYSTEM);
+this(Utils.propsToMap(properties), keySerializer, valueSerializer);
 }
 
 // visible for testing
 @SuppressWarnings("unchecked")
-KafkaProducer(Map configs,
+KafkaProducer(ProducerConfig config,
   Serializer keySerializer,
   Serializer valueSerializer,
   ProducerMetadata metadata,
   KafkaClient kafkaClient,
   ProducerInterceptors interceptors,
   Time time) {
-ProducerConfig config = new 
ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, keySerializer,
-valueSerializer));
 try {
-Map userProvidedConfigs = config.originals();
 this.producerConfig = config;
 this.time = time;
 
-String transactionalId = (String) 
userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
+String transactionalId = (String) 
config.originals().get(ProducerConfig.TRANSACTIONAL_ID_CONFIG);

Review comment:
   good point. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #8826: KAFKA-10090 Misleading warnings: The configuration was supplied but i…

2020-11-30 Thread GitBox


chia7712 commented on a change in pull request #8826:
URL: https://github.com/apache/kafka/pull/8826#discussion_r533092957



##
File path: 
clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java
##
@@ -79,25 +80,52 @@ public void testChannelBuilderConfigs() {
 
 // test configs with listener prefix
 Map configs = 
ChannelBuilders.channelBuilderConfigs(securityConfig, new 
ListenerName("listener1"));
+
 
assertNull(configs.get("listener.name.listener1.gssapi.sasl.kerberos.service.name"));
+
assertFalse(securityConfig.unused().contains("listener.name.listener1.gssapi.sasl.kerberos.service.name"));
+
 assertEquals(configs.get("gssapi.sasl.kerberos.service.name"), 
"testkafka");
+
assertFalse(securityConfig.unused().contains("gssapi.sasl.kerberos.service.name"));
+
 assertEquals(configs.get("sasl.kerberos.service.name"), 
"testkafkaglobal");
+
assertFalse(securityConfig.unused().contains("gssapi.sasl.kerberos.service.name"));

Review comment:
   you are right.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests

2020-11-30 Thread GitBox


abbccdda commented on a change in pull request #9564:
URL: https://github.com/apache/kafka/pull/9564#discussion_r533091766



##
File path: 
core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala
##
@@ -125,15 +126,25 @@ class BrokerToControllerChannelManagerImpl(metadataCache: 
kafka.server.MetadataC
   }
 
   override def sendRequest(request: AbstractRequest.Builder[_ <: 
AbstractRequest],
-   callback: RequestCompletionHandler): Unit = {
-requestQueue.put(BrokerToControllerQueueItem(request, callback))
+   callback: 
BrokerToControllerRequestCompletionHandler,
+   requestTimeout: Long): Unit = {
+requestQueue.put(BrokerToControllerQueueItem(request, callback, 
time.milliseconds() + requestTimeout))

Review comment:
   Good catch!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests

2020-11-30 Thread GitBox


abbccdda commented on a change in pull request #9564:
URL: https://github.com/apache/kafka/pull/9564#discussion_r533091229



##
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##
@@ -88,20 +88,33 @@ class AlterIsrManagerImpl(val controllerChannelManager: 
BrokerToControllerChanne
 
   private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
 val message = buildRequest(inflightAlterIsrItems)
-def responseHandler(response: ClientResponse): Unit = {
-  try {
-val body = response.responseBody().asInstanceOf[AlterIsrResponse]
-handleAlterIsrResponse(body, message.brokerEpoch(), 
inflightAlterIsrItems)
-  } finally {
-// Be sure to clear the in-flight flag to allow future AlterIsr 
requests
-if (!inflightRequest.compareAndSet(true, false)) {
-  throw new IllegalStateException("AlterIsr response callback called 
when no requests were in flight")
+
+def clearInflightRequests(): Unit = {
+  // Be sure to clear the in-flight flag to allow future AlterIsr requests
+  if (!inflightRequest.compareAndSet(true, false)) {
+throw new IllegalStateException("AlterIsr response callback called 
when no requests were in flight")
+  }
+}
+
+class AlterIsrResponseHandler extends 
BrokerToControllerRequestCompletionHandler {
+  override def onComplete(response: ClientResponse): Unit = {
+try {
+  val body = response.responseBody().asInstanceOf[AlterIsrResponse]
+  handleAlterIsrResponse(body, message.brokerEpoch(), 
inflightAlterIsrItems)
+} finally {
+  clearInflightRequests()
 }
   }
+
+  override def onTimeout(): Unit = {
+warn(s"Encountered request when sending AlterIsr to the controller")

Review comment:
   Not sure we could do the callback here, since the request failed 
already. Maybe just do nothing here?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #9664: KAFKA-10780; Rewrite ControllerZNode struct with auto-generated protocol

2020-11-30 Thread GitBox


ijuma commented on pull request #9664:
URL: https://github.com/apache/kafka/pull/9664#issuecomment-736240488


   Thanks for the PR. I don't think we should do this since KIP-500 will 
replace all this code.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #8826: KAFKA-10090 Misleading warnings: The configuration was supplied but i…

2020-11-30 Thread GitBox


chia7712 commented on a change in pull request #8826:
URL: https://github.com/apache/kafka/pull/8826#discussion_r533079695



##
File path: 
clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java
##
@@ -79,25 +80,52 @@ public void testChannelBuilderConfigs() {
 
 // test configs with listener prefix
 Map configs = 
ChannelBuilders.channelBuilderConfigs(securityConfig, new 
ListenerName("listener1"));
+
 
assertNull(configs.get("listener.name.listener1.gssapi.sasl.kerberos.service.name"));
+
assertFalse(securityConfig.unused().contains("listener.name.listener1.gssapi.sasl.kerberos.service.name"));
+
 assertEquals(configs.get("gssapi.sasl.kerberos.service.name"), 
"testkafka");
+
assertFalse(securityConfig.unused().contains("gssapi.sasl.kerberos.service.name"));
+
 assertEquals(configs.get("sasl.kerberos.service.name"), 
"testkafkaglobal");
+
assertFalse(securityConfig.unused().contains("gssapi.sasl.kerberos.service.name"));
+
 
assertNull(configs.get("listener.name.listener1.sasl.kerberos.service.name"));
+
assertFalse(securityConfig.unused().contains("listener.name.listener1.sasl.kerberos.service.name"));
 
 assertNull(configs.get("plain.sasl.server.callback.handler.class"));
+
assertFalse(securityConfig.unused().contains("plain.sasl.server.callback.handler.class"));
+
 
assertEquals(configs.get("listener.name.listener1.gssapi.config1.key"), 
"custom.config1");
+
assertFalse(securityConfig.unused().contains("listener.name.listener1.gssapi.config1.key"));
+
 assertEquals(configs.get("custom.config2.key"), "custom.config2");
+assertFalse(securityConfig.unused().contains("custom.config2.key"));
 
 // test configs without listener prefix
+securityConfig = new TestSecurityConfig(props);

Review comment:
   We need a new ```RecordingMap``` to test a different key without the 
listener prefix. Otherwise, the key may already have been marked as used by 
the previous assertions.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9423: KAFKA-9263 The new hw is added to incorrect log when ReplicaAlterLogD…

2020-11-30 Thread GitBox


chia7712 commented on pull request #9423:
URL: https://github.com/apache/kafka/pull/9423#issuecomment-736222765


   > Are test failures related to this PR?
   
   They are unrelated errors. I will rebase the PR to trigger QA again.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 merged pull request #9659: KAFKA-10770: Remove duplicate definition of Metrics#getTags

2020-11-30 Thread GitBox


chia7712 merged pull request #9659:
URL: https://github.com/apache/kafka/pull/9659


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on pull request #9664: KAFKA-10780; Rewrite ControllerZNode struct with auto-generated protocol

2020-11-30 Thread GitBox


dengziming commented on pull request #9664:
URL: https://github.com/apache/kafka/pull/9664#issuecomment-736217128


   Hi, @abbccdda PTAL.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10786) ReplicaAlterLogDirsThread gets stuck during the reassignment of Kafka partition

2020-11-30 Thread nick song (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

nick song updated KAFKA-10786:
--
Priority: Blocker  (was: Major)

>  ReplicaAlterLogDirsThread gets stuck during the reassignment of Kafka 
> partition
> 
>
> Key: KAFKA-10786
> URL: https://issues.apache.org/jira/browse/KAFKA-10786
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 2.0.0
>Reporter: nick song
>Priority: Blocker
> Attachments: attachment 1.png, attachment 2.png, attachment 3.png
>
>
> Topic config: Configs for topic 'athena_8603' are 
> leader.replication.throttled.replicas=9:7,9:6,10:8,10:7,8:6,8:5,11:9,11:8,follower.replication.throttled.replicas=9:13,10:0,8:15,11:14,retention.ms=8640,delete.retention.ms=6
>  
> Reassignment of replica athena_8603-1-15 is still in progress
>  
> When I was reassigning the topic partition, I found that some tasks had been 
> in progress for more than ten hours. After investigation, it was found that 
> ReplicaAlterLogDirsThread was running all the time and occupying a high CPU 
> usage rate (Attachment 1).
> Checking the thread information (Attachment 2) shows that the log data is 
> being copied. Checking the log directory (Attachment 3) shows that the index 
> of the future directory is older than the original log. Is it because the 
> configuration delete.retention.ms=6 caused the data to be deleted while it 
> was being copied? This causes the replication thread to get stuck. Is there 
> any solution?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10786) ReplicaAlterLogDirsThread gets stuck during the reassignment of Kafka partition

2020-11-30 Thread nick song (Jira)
nick song created KAFKA-10786:
-

 Summary:  ReplicaAlterLogDirsThread gets stuck during the 
reassignment of Kafka partition
 Key: KAFKA-10786
 URL: https://issues.apache.org/jira/browse/KAFKA-10786
 Project: Kafka
  Issue Type: Bug
  Components: log
Affects Versions: 2.0.0
Reporter: nick song
 Attachments: attachment 1.png, attachment 2.png, attachment 3.png

Topic config: Configs for topic 'athena_8603' are 
leader.replication.throttled.replicas=9:7,9:6,10:8,10:7,8:6,8:5,11:9,11:8,follower.replication.throttled.replicas=9:13,10:0,8:15,11:14,retention.ms=8640,delete.retention.ms=6

 

Reassignment of replica athena_8603-1-15 is still in progress

 

When I was reassigning the topic partition, I found that some tasks had been in 
progress for more than ten hours. After investigation, it was found that 
ReplicaAlterLogDirsThread was running all the time and occupying a high CPU 
usage rate (Attachment 1).
Checking the thread information (Attachment 2) shows that the log data is being 
copied. Checking the log directory (Attachment 3) shows that the index of the 
future directory is older than the original log. Is it because the 
configuration delete.retention.ms=6 caused the data to be deleted while it was 
being copied? This causes the replication thread to get stuck. Is there any 
solution?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #9601: KAFKA-10729: Bump remaining RPC's to use tagged fields.

2020-11-30 Thread GitBox


hachikuji commented on a change in pull request #9601:
URL: https://github.com/apache/kafka/pull/9601#discussion_r533034552



##
File path: core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
##
@@ -84,19 +85,24 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness {
   }
 
   // Custom header serialization so that protocol assumptions are not forced
-  private def requestHeaderBytes(apiKey: Short, apiVersion: Short, clientId: 
String = "", correlationId: Int = -1): Array[Byte] = {
+  def requestHeaderBytes(apiKey: Short, apiVersion: Short, clientId: String = 
"", correlationId: Int = -1): Array[Byte] = {
+// Check for flex versions, some tests here verify that an invalid apiKey 
is detected properly, so if -1 is used,
+// assume the request is not using flex versions.
+val flexVersion = if (apiKey >= 0) 
ApiKeys.forId(apiKey).requestHeaderVersion(apiVersion) >= 2 else false
 val size = {
   2 /* apiKey */ +
 2 /* version id */ +
 4 /* correlation id */ +
-Type.NULLABLE_STRING.sizeOf(clientId) /* client id */
+Type.NULLABLE_STRING.sizeOf(clientId)  /* client id */ +
+(if (flexVersion) ByteUtils.sizeOfUnsignedVarint(0) else 0)

Review comment:
   nit: maybe add a comment that this field is for the number of tagged 
fields?
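   
   For reference, a small sketch of what that single byte encodes, assuming 
the standard unsigned-varint scheme (7 data bits per byte, high bit as 
continuation) used for the tagged-field count in flexible-version headers:
   ```java
   import java.nio.ByteBuffer;

   final class TaggedFieldCountSketch {
       // Standard unsigned-varint encoding: 7 data bits per byte, high bit = continuation.
       static void writeUnsignedVarint(int value, ByteBuffer buf) {
           while ((value & 0xffffff80) != 0) {
               buf.put((byte) ((value & 0x7f) | 0x80));
               value >>>= 7;
           }
           buf.put((byte) value);
       }

       public static void main(String[] args) {
           ByteBuffer buf = ByteBuffer.allocate(5);
           writeUnsignedVarint(0, buf); // an empty tagged-field section: count = 0
           System.out.println("bytes used: " + buf.position()); // prints 1
       }
   }
   ```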





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming opened a new pull request #9664: KAFKA-10780; Rewrite ControllerZNode struct with auto-generated protocol

2020-11-30 Thread GitBox


dengziming opened a new pull request #9664:
URL: https://github.com/apache/kafka/pull/9664


   *More detailed description of your change*
   #9662 rewrites the FeatureZNode struct with the auto-generated protocol, 
but that is a non-trivial change, so we can review this simpler PR first.
   
   *Summary of testing strategy (including rationale)*
   unit test
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9601: KAFKA-10729: Bump remaining RPC's to use tagged fields.

2020-11-30 Thread GitBox


hachikuji commented on a change in pull request #9601:
URL: https://github.com/apache/kafka/pull/9601#discussion_r533033927



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
##
@@ -105,8 +105,8 @@ public int hashCode() {
 
 public final WriteTxnMarkersRequestData data;
 
-public Builder(final List markers) {
-super(ApiKeys.WRITE_TXN_MARKERS);
+public Builder(final List markers, short 
latestAllowedVersion) {
+super(ApiKeys.WRITE_TXN_MARKERS, 
ApiKeys.WRITE_TXN_MARKERS.oldestVersion(), latestAllowedVersion);

Review comment:
   I think this is probably ok, but it is a little inconsistent with how we 
handle the versions for other inter-broker RPCs. Since we rely on the IBP, we 
always set the version explicitly in the caller, which means there is exactly 
one allowable version for the builder to use. See for example 
`LeaderAndIsrRequest.Builder`.

##
File path: core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
##
@@ -84,19 +85,24 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness {
   }
 
   // Custom header serialization so that protocol assumptions are not forced
-  private def requestHeaderBytes(apiKey: Short, apiVersion: Short, clientId: 
String = "", correlationId: Int = -1): Array[Byte] = {
+  def requestHeaderBytes(apiKey: Short, apiVersion: Short, clientId: String = 
"", correlationId: Int = -1): Array[Byte] = {
+// Check for flex versions, some tests here verify that an invalid apiKey 
is detected properly, so if -1 is used,
+// assume the request is not using flex versions.
+val flexVersion = if (apiKey >= 0) 
ApiKeys.forId(apiKey).requestHeaderVersion(apiVersion) >= 2 else false
 val size = {
   2 /* apiKey */ +
 2 /* version id */ +
 4 /* correlation id */ +
-Type.NULLABLE_STRING.sizeOf(clientId) /* client id */
+Type.NULLABLE_STRING.sizeOf(clientId)  /* client id */ +
+(if (flexVersion) ByteUtils.sizeOfUnsignedVarint(0) else 0)

Review comment:
   nit: maybe add a comment that this field is for the number of tagged 
fields?

##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
##
@@ -286,6 +286,7 @@ public static byte requiredMagicForVersion(short 
produceRequestVersion) {
 case 6:
 case 7:
 case 8:
+case 9:

Review comment:
   I wonder if we may as well make this the default case. Not sure we're 
getting much by forcing ourselves to update this logic after each bump. Maybe 
the range validation is still useful, but that could be done by using 
`oldestVersion` and `latestVersion`.
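   
   For illustration, a hedged sketch of that shape (the version-to-magic 
mapping and range check below are illustrative, not the exact production 
code):
   ```java
   import org.apache.kafka.common.protocol.ApiKeys;
   import org.apache.kafka.common.record.RecordBatch;

   final class RequiredMagicSketch {
       // Validate the range once, then let every newer version fall through to
       // the latest magic value so the switch needn't grow on each version bump.
       static byte requiredMagicForVersion(short version) {
           if (version < ApiKeys.PRODUCE.oldestVersion() || version > ApiKeys.PRODUCE.latestVersion())
               throw new IllegalArgumentException("Unsupported produce request version " + version);
           switch (version) {
               case 0:
               case 1:
                   return RecordBatch.MAGIC_VALUE_V0;
               case 2:
                   return RecordBatch.MAGIC_VALUE_V1;
               default: // version >= 3
                   return RecordBatch.MAGIC_VALUE_V2;
           }
       }
   }
   ```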





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9615: KAFKA-10500: Add thread option

2020-11-30 Thread GitBox


mjsax commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r533026232



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -398,6 +407,7 @@ public void setUncaughtExceptionHandler(final 
StreamsUncaughtExceptionHandler st
 final Consumer handler = exception -> 
handleStreamsUncaughtException(exception, streamsUncaughtExceptionHandler);
 synchronized (stateLock) {
 if (state == State.CREATED) {
+this.streamsUncaughtExceptionHandler = handler;

Review comment:
   nit. remove unnecessary `this.`

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -846,43 +856,24 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 time,
 globalThreadId,
 delegatingStateRestoreListener,
-this::defaultStreamsUncaughtExceptionHandler
+streamsUncaughtExceptionHandler
 );
 globalThreadState = globalStreamThread.state();
 }
 
 // use client id instead of thread client id since this admin client 
may be shared among threads
 adminClient = 
clientSupplier.getAdmin(config.getAdminConfigs(ClientUtils.getSharedAdminClientId(clientId)));
 
-final Map threadState = new 
HashMap<>(numStreamThreads);
-final ArrayList storeProviders = new 
ArrayList<>();
+threadState = new HashMap<>(numStreamThreads);
+storeProviders = new ArrayList<>();
 for (int i = 0; i < numStreamThreads; i++) {
-final StreamThread streamThread = StreamThread.create(
-internalTopologyBuilder,
-config,
-clientSupplier,
-adminClient,
-processId,
-clientId,
-streamsMetrics,
-time,
-streamsMetadataState,
-cacheSizePerThread,
-stateDirectory,
-delegatingStateRestoreListener,
-i + 1,
-KafkaStreams.this::closeToError,
-this::defaultStreamsUncaughtExceptionHandler
-);
-threads.add(streamThread);
-threadState.put(streamThread.getId(), streamThread.state());
-storeProviders.add(new 
StreamThreadStateStoreProvider(streamThread));
+createStreamThread(cacheSizePerThread, i + 1);

Review comment:
   Nit: can we change the loop to `int i = 1; i <= numStreamThreads` and just 
pass in `i` here?

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -846,43 +856,24 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 time,
 globalThreadId,
 delegatingStateRestoreListener,
-this::defaultStreamsUncaughtExceptionHandler
+streamsUncaughtExceptionHandler
 );
 globalThreadState = globalStreamThread.state();
 }
 
 // use client id instead of thread client id since this admin client 
may be shared among threads
 adminClient = 
clientSupplier.getAdmin(config.getAdminConfigs(ClientUtils.getSharedAdminClientId(clientId)));
 
-final Map threadState = new 
HashMap<>(numStreamThreads);
-final ArrayList storeProviders = new 
ArrayList<>();
+threadState = new HashMap<>(numStreamThreads);
+storeProviders = new ArrayList<>();
 for (int i = 0; i < numStreamThreads; i++) {
-final StreamThread streamThread = StreamThread.create(
-internalTopologyBuilder,
-config,
-clientSupplier,
-adminClient,
-processId,
-clientId,
-streamsMetrics,
-time,
-streamsMetadataState,
-cacheSizePerThread,
-stateDirectory,
-delegatingStateRestoreListener,
-i + 1,
-KafkaStreams.this::closeToError,
-this::defaultStreamsUncaughtExceptionHandler
-);
-threads.add(streamThread);
-threadState.put(streamThread.getId(), streamThread.state());
-storeProviders.add(new 
StreamThreadStateStoreProvider(streamThread));
+createStreamThread(cacheSizePerThread, i + 1);
 }
 
 ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, 
(metricsConfig, now) ->
 Math.toIntExact(threads.stream().filter(thread -> 
thread.state().isAlive()).count()));
 
-final StreamStateListener streamStateListener = new 
StreamStateListener(threadState, globalThreadState);
+streamStateListener = new StreamStateListener(threadState, 
globalThreadState);

Review comment:
   Can we 

[GitHub] [kafka] hachikuji commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests

2020-11-30 Thread GitBox


hachikuji commented on a change in pull request #9564:
URL: https://github.com/apache/kafka/pull/9564#discussion_r533024276



##
File path: 
core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala
##
@@ -35,7 +35,8 @@ import scala.jdk.CollectionConverters._
 
 trait BrokerToControllerChannelManager {
   def sendRequest(request: AbstractRequest.Builder[_ <: AbstractRequest],
-  callback: RequestCompletionHandler): Unit
+  callback: BrokerToControllerRequestCompletionHandler,
+  requestTimeout: Long): Unit

Review comment:
   Perhaps we could use a name like `retryTimeout` to distinguish this from 
the request timeout which only applies to individual requests. Alternatively we 
could let the caller provide the retry deadline explicitly. This would save the 
need for the extra `time.milliseconds` call.

##
File path: 
core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala
##
@@ -178,6 +191,10 @@ class BrokerToControllerRequestThread(networkClient: 
KafkaClient,
 }
   }
 
+  private def isTimedOut(request: BrokerToControllerQueueItem): Boolean = {

Review comment:
   nit: `hasTimedOut`?

##
File path: 
core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala
##
@@ -125,15 +126,25 @@ class BrokerToControllerChannelManagerImpl(metadataCache: 
kafka.server.MetadataC
   }
 
   override def sendRequest(request: AbstractRequest.Builder[_ <: 
AbstractRequest],
-   callback: RequestCompletionHandler): Unit = {
-requestQueue.put(BrokerToControllerQueueItem(request, callback))
+   callback: 
BrokerToControllerRequestCompletionHandler,
+   requestTimeout: Long): Unit = {
+requestQueue.put(BrokerToControllerQueueItem(request, callback, 
time.milliseconds() + requestTimeout))

Review comment:
   Won't this overflow with `requestTimeout` set to `Long.MaxValue`? Do we 
have any test cases?
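   
   For illustration, a saturating-addition sketch that would avoid the 
wrap-around (the names are assumed, not the PR's code):
   ```java
   final class DeadlineSketch {
       // With nowMs >= 0 and timeoutMs >= 0, an overflowed sum wraps negative,
       // so clamp it to "effectively never times out".
       static long deadlineMs(long nowMs, long timeoutMs) {
           long deadline = nowMs + timeoutMs;
           return deadline < nowMs ? Long.MAX_VALUE : deadline;
       }

       public static void main(String[] args) {
           System.out.println(deadlineMs(System.currentTimeMillis(), Long.MAX_VALUE)); // clamped
       }
   }
   ```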

##
File path: 
core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala
##
@@ -178,6 +191,10 @@ class BrokerToControllerRequestThread(networkClient: 
KafkaClient,
 }
   }
 
+  private def isTimedOut(request: BrokerToControllerQueueItem): Boolean = {
+time.milliseconds() > request.deadlineMs

Review comment:
   Maybe we can avoid this call to `time.milliseconds` and use 
`ClientResponse.receivedTimeMs`?

##
File path: 
core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala
##
@@ -165,7 +176,9 @@ class BrokerToControllerRequestThread(networkClient: 
KafkaClient,
   }
 
   private[server] def handleResponse(request: 
BrokerToControllerQueueItem)(response: ClientResponse): Unit = {
-if (response.wasDisconnected()) {
+if (isTimedOut(request)) {

Review comment:
   We check for timeouts only after receiving a response. I guess this 
means that in the worst case, the total timeout would be request.timeout*2. 
This is probably not a big deal, but maybe worth documenting in a comment 
somewhere. 

##
File path: core/src/main/scala/kafka/server/ForwardingManager.scala
##
@@ -44,26 +46,34 @@ class ForwardingManager(channelManager: 
BrokerToControllerChannelManager) extend
   request.context.clientAddress.getAddress
 )
 
-def onClientResponse(clientResponse: ClientResponse): Unit = {
-  val envelopeResponse = 
clientResponse.responseBody.asInstanceOf[EnvelopeResponse]
-  val envelopeError = envelopeResponse.error()
-  val requestBody = request.body[AbstractRequest]
+class ForwardingResponseHandler extends 
BrokerToControllerRequestCompletionHandler {
+  override def onComplete(clientResponse: ClientResponse): Unit = {
+val envelopeResponse = 
clientResponse.responseBody.asInstanceOf[EnvelopeResponse]
+val envelopeError = envelopeResponse.error()
+val requestBody = request.body[AbstractRequest]
 
-  val response = if (envelopeError != Errors.NONE) {
-// An envelope error indicates broker misconfiguration (e.g. the 
principal serde
-// might not be defined on the receiving broker). In this case, we do 
not return
-// the error directly to the client since it would not be expected. 
Instead we
-// return `UNKNOWN_SERVER_ERROR` so that the user knows that there is 
a problem
-// on the broker.
-debug(s"Forwarded request $request failed with an error in the 
envelope response $envelopeError")
-requestBody.getErrorResponse(Errors.UNKNOWN_SERVER_ERROR.exception)
-  } else {
-parseResponse(envelopeResponse.responseData, requestBody, 
request.header)
+val response = if (envelopeError != Errors.NONE) {
+  // An envelope error indicates broker misconfiguration (e.g. the 
principal serde
+  // might not be defined on the receiving broker). In this 

[GitHub] [kafka] mjsax commented on a change in pull request #9614: KAFKA-10500: Add failed-stream-threads metric for adding + removing stream threads

2020-11-30 Thread GitBox


mjsax commented on a change in pull request #9614:
URL: https://github.com/apache/kafka/pull/9614#discussion_r533020449



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##
@@ -214,6 +215,20 @@ public RocksDBMetricsRecordingTrigger 
rocksDBMetricsRecordingTrigger() {
 }
 }
 
+public final Sensor clientLevelSensor(final String sensorName,
+  final RecordingLevel recordingLevel,
+  final Sensor... parents) {
+synchronized (clientLevelSensors) {
+final String fullSensorName = CLIENT_LEVEL_GROUP + 
SENSOR_NAME_DELIMITER + sensorName;
+final Sensor sensor = metrics.getSensor(fullSensorName);

Review comment:
   Should we rewrite this the same way `threadLevelSensor` is written (ie, 
using `orElseGet`) for consistency?
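   
   For reference, a sketch of that shape, assuming `Metrics.getSensor` returns 
`null` when the sensor does not exist yet:
   ```java
   import java.util.Optional;

   import org.apache.kafka.common.metrics.Metrics;
   import org.apache.kafka.common.metrics.Sensor;

   final class SensorLookupSketch {
       // Mirrors the threadLevelSensor style: reuse an existing sensor or create it lazily.
       static Sensor clientLevelSensor(Metrics metrics, String fullSensorName,
                                       Sensor.RecordingLevel recordingLevel, Sensor... parents) {
           return Optional.ofNullable(metrics.getSensor(fullSensorName))
               .orElseGet(() -> metrics.sensor(fullSensorName, recordingLevel, parents));
       }
   }
   ```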

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
##
@@ -629,16 +657,18 @@ private void setupRemoveSensorsTest(final Metrics metrics,
 }
 
 @Test
-public void shouldRemoveClientLevelMetrics() {
+public void shouldRemoveClientLevelMetricsAndSensors() {
 final Metrics metrics = niceMock(Metrics.class);
 final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
-addSensorsOnAllLevels(metrics, streamsMetrics);
+final Capture sensorKeys = addSensorsOnAllLevels(metrics, 
streamsMetrics);
 resetToDefault(metrics);
-expect(metrics.removeMetric(metricName1)).andStubReturn(null);
-expect(metrics.removeMetric(metricName2)).andStubReturn(null);
-replay(metrics);
 
-streamsMetrics.removeAllClientLevelMetrics();
+metrics.removeSensor(sensorKeys.getValues().get(0));
+metrics.removeSensor(sensorKeys.getValues().get(1));
+
expect(metrics.removeMetric(metricName1)).andReturn(mock(KafkaMetric.class));
+
expect(metrics.removeMetric(metricName2)).andReturn(mock(KafkaMetric.class));

Review comment:
   Why did we change this from `andStubReturn(null)` to 
`andReturn(mock(KafkaMetric.class))`?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java
##
@@ -125,4 +131,16 @@ public static void addNumAliveStreamThreadMetric(final 
StreamsMetricsImpl stream
 stateProvider
 );
 }
+public static Sensor failedStreamThreadSensor(final StreamsMetricsImpl 
streamsMetrics) {

Review comment:
   nit: missing empty line





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #8826: KAFKA-10090 Misleading warnings: The configuration was supplied but i…

2020-11-30 Thread GitBox


junrao commented on a change in pull request #8826:
URL: https://github.com/apache/kafka/pull/8826#discussion_r533002272



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##
@@ -314,27 +315,23 @@ public KafkaProducer(Properties properties) {
  * be called in the producer when the serializer 
is passed in directly.
  */
 public KafkaProducer(Properties properties, Serializer keySerializer, 
Serializer valueSerializer) {
-this(Utils.propsToMap(properties), keySerializer, valueSerializer, 
null, null, null,
-Time.SYSTEM);
+this(Utils.propsToMap(properties), keySerializer, valueSerializer);
 }
 
 // visible for testing
 @SuppressWarnings("unchecked")
-KafkaProducer(Map configs,
+KafkaProducer(ProducerConfig config,
   Serializer keySerializer,
   Serializer valueSerializer,
   ProducerMetadata metadata,
   KafkaClient kafkaClient,
   ProducerInterceptors interceptors,
   Time time) {
-ProducerConfig config = new 
ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, keySerializer,
-valueSerializer));
 try {
-Map userProvidedConfigs = config.originals();
 this.producerConfig = config;
 this.time = time;
 
-String transactionalId = (String) 
userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
+String transactionalId = (String) 
config.originals().get(ProducerConfig.TRANSACTIONAL_ID_CONFIG);

Review comment:
   Could we just do `config.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG)` 
here?

##
File path: 
clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java
##
@@ -79,25 +80,52 @@ public void testChannelBuilderConfigs() {
 
 // test configs with listener prefix
 Map configs = 
ChannelBuilders.channelBuilderConfigs(securityConfig, new 
ListenerName("listener1"));
+
 
assertNull(configs.get("listener.name.listener1.gssapi.sasl.kerberos.service.name"));
+
assertFalse(securityConfig.unused().contains("listener.name.listener1.gssapi.sasl.kerberos.service.name"));
+
 assertEquals(configs.get("gssapi.sasl.kerberos.service.name"), 
"testkafka");
+
assertFalse(securityConfig.unused().contains("gssapi.sasl.kerberos.service.name"));
+
 assertEquals(configs.get("sasl.kerberos.service.name"), 
"testkafkaglobal");
+
assertFalse(securityConfig.unused().contains("gssapi.sasl.kerberos.service.name"));

Review comment:
   Should gssapi.sasl.kerberos.service.name be sasl.kerberos.service.name?

##
File path: 
clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java
##
@@ -79,25 +80,52 @@ public void testChannelBuilderConfigs() {
 
 // test configs with listener prefix
 Map configs = 
ChannelBuilders.channelBuilderConfigs(securityConfig, new 
ListenerName("listener1"));
+
 
assertNull(configs.get("listener.name.listener1.gssapi.sasl.kerberos.service.name"));
+
assertFalse(securityConfig.unused().contains("listener.name.listener1.gssapi.sasl.kerberos.service.name"));
+
 assertEquals(configs.get("gssapi.sasl.kerberos.service.name"), 
"testkafka");
+
assertFalse(securityConfig.unused().contains("gssapi.sasl.kerberos.service.name"));
+
 assertEquals(configs.get("sasl.kerberos.service.name"), 
"testkafkaglobal");
+
assertFalse(securityConfig.unused().contains("gssapi.sasl.kerberos.service.name"));
+
 
assertNull(configs.get("listener.name.listener1.sasl.kerberos.service.name"));
+
assertFalse(securityConfig.unused().contains("listener.name.listener1.sasl.kerberos.service.name"));
 
 assertNull(configs.get("plain.sasl.server.callback.handler.class"));
+
assertFalse(securityConfig.unused().contains("plain.sasl.server.callback.handler.class"));
+
 
assertEquals(configs.get("listener.name.listener1.gssapi.config1.key"), 
"custom.config1");
+
assertFalse(securityConfig.unused().contains("listener.name.listener1.gssapi.config1.key"));
+
 assertEquals(configs.get("custom.config2.key"), "custom.config2");
+assertFalse(securityConfig.unused().contains("custom.config2.key"));
 
 // test configs without listener prefix
+securityConfig = new TestSecurityConfig(props);

Review comment:
   Do we need to instantiate again?

##
File path: 
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
##
@@ -105,7 +105,9 @@ public AbstractConfig(ConfigDef definition, Map 
originals,  Map
 throw new ConfigException(entry.getKey().toString(), 
entry.getValue(), "Key must be a string.");
 
 this.originals = resolveConfigVariables(configProviderProps, 
(Map) 

[jira] [Created] (KAFKA-10785) Rewrite ConfigEntityChangeNotificationSequenceZNode struct with auto-generated protocol

2020-11-30 Thread dengziming (Jira)
dengziming created KAFKA-10785:
--

 Summary: Rewrite ConfigEntityChangeNotificationSequenceZNode 
struct with auto-generated protocol
 Key: KAFKA-10785
 URL: https://issues.apache.org/jira/browse/KAFKA-10785
 Project: Kafka
  Issue Type: Sub-task
  Components: protocol
Reporter: dengziming
Assignee: dengziming






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10784) Rewrite ConfigEntityZNode struct with auto-generated protocol

2020-11-30 Thread dengziming (Jira)
dengziming created KAFKA-10784:
--

 Summary: Rewrite ConfigEntityZNode struct with auto-generated 
protocol
 Key: KAFKA-10784
 URL: https://issues.apache.org/jira/browse/KAFKA-10784
 Project: Kafka
  Issue Type: Sub-task
  Components: protocol
Reporter: dengziming
Assignee: dengziming






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10783) Rewrite TopicPartitionStateZNode struct with auto-generated protocol

2020-11-30 Thread dengziming (Jira)
dengziming created KAFKA-10783:
--

 Summary: Rewrite TopicPartitionStateZNode struct with 
auto-generated protocol
 Key: KAFKA-10783
 URL: https://issues.apache.org/jira/browse/KAFKA-10783
 Project: Kafka
  Issue Type: Sub-task
Reporter: dengziming
Assignee: dengziming






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10782) Rewrite TopicZNode struct with auto-generated protocol

2020-11-30 Thread dengziming (Jira)
dengziming created KAFKA-10782:
--

 Summary: Rewrite TopicZNode struct with auto-generated protocol
 Key: KAFKA-10782
 URL: https://issues.apache.org/jira/browse/KAFKA-10782
 Project: Kafka
  Issue Type: Sub-task
  Components: protocol
Reporter: dengziming
Assignee: dengziming






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10781) Rewrite BrokerIdZNode struct with auto-generated protocol

2020-11-30 Thread dengziming (Jira)
dengziming created KAFKA-10781:
--

 Summary:  Rewrite BrokerIdZNode struct with auto-generated protocol
 Key: KAFKA-10781
 URL: https://issues.apache.org/jira/browse/KAFKA-10781
 Project: Kafka
  Issue Type: Sub-task
  Components: protocol
Reporter: dengziming
Assignee: dengziming






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10780) Rewrite ControllerZNode struct with auto-generated protocol

2020-11-30 Thread dengziming (Jira)
dengziming created KAFKA-10780:
--

 Summary:  Rewrite ControllerZNode struct with auto-generated 
protocol
 Key: KAFKA-10780
 URL: https://issues.apache.org/jira/browse/KAFKA-10780
 Project: Kafka
  Issue Type: Sub-task
  Components: protocol
Reporter: dengziming
Assignee: dengziming


Use the auto-generated protocol to rewrite the ZK controller node.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10779) Reassignment tool sets throttles incorrectly when overriding a reassignment

2020-11-30 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-10779:
---

 Summary: Reassignment tool sets throttles incorrectly when 
overriding a reassignment
 Key: KAFKA-10779
 URL: https://issues.apache.org/jira/browse/KAFKA-10779
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson


The logic in `ReassignPartitionsCommand.calculateProposedMoveMap` assumes that 
adding replicas are not included in the replica set returned from `Metadata` or 
`ListPartitionReassignments`.  This is evident in the test case 
`ReassignPartitionsUnitTest.testMoveMap`. Because of this incorrect assumption, 
the move map is computed incorrectly which can result in the wrong throttles 
being applied. As far as I can tell, this is only an issue when overriding an 
existing reassignment. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on a change in pull request #9606: [KAFKA-10722] doc: Improve JavaDoc for KGroupedStream.aggregate

2020-11-30 Thread GitBox


mjsax commented on a change in pull request #9606:
URL: https://github.com/apache/kafka/pull/9606#discussion_r532996901



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
##
@@ -438,7 +439,8 @@
  * query the value of the key on a parallel running instance of your Kafka 
Streams application.
  *
  * 
- * For failure and recovery the store will be backed by an internal 
changelog topic that will be created in Kafka.
+ * For failure and recovery the store (which always will be of type {@link 
TimestampedKeyValueStore} -- regardless of what
+ * is specified in the parameter {@materialized}) will be backed by an 
internal changelog topic that will be created in Kafka.

Review comment:
   No need to squash commits. We squash them automatically when merging.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9606: [KAFKA-10722] doc: Improve JavaDoc for KGroupedStream.aggregate

2020-11-30 Thread GitBox


mjsax commented on a change in pull request #9606:
URL: https://github.com/apache/kafka/pull/9606#discussion_r532996684



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
##
@@ -381,7 +381,8 @@
  * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
  *
  * 
- * For failure and recovery the store will be backed by an internal 
changelog topic that will be created in Kafka.
+ * For failure and recovery the store (which always will be of type {@link 
TimestampedKeyValueStore}) will be backed by
+ * an internal changelog topic that will be created in Kafka.

Review comment:
   `reduce()` and `count()` are similar to `aggregate()`: `reduce()` just does 
not allow you to change the value type (i.e., output type == input type), and 
`count()`, well, implements an `aggregate()` that counts :)
   
   And they all work the same for `KGroupedStream`, `CogroupedKStream`, 
`TimeWindowedKStream` and `TimeWindowedCoGroupedKStream`.
   
   `table()` and `globalTable()` just read a topic and upsert the data into a 
state store / table.
   
   So it would be great to do them all in one PR?
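   
   For reference, a quick sketch of that relationship using the Scala DSL (a 
minimal sketch, assuming the standard kafka-streams-scala module with serdes 
resolved via the usual implicits):
   
   ```scala
   import org.apache.kafka.streams.scala.StreamsBuilder
   import org.apache.kafka.streams.scala.ImplicitConversions._
   import org.apache.kafka.streams.scala.Serdes._
   
   val builder = new StreamsBuilder()
   val grouped = builder.stream[String, Long]("events").groupByKey
   
   // reduce(): the output type must equal the input type (Long -> Long)
   val sums = grouped.reduce(_ + _)
   
   // aggregate(): the output type may differ (Long -> String here)
   val asText = grouped.aggregate("")((_, v, agg) => agg + "," + v)
   
   // count(): effectively an aggregate() that counts
   val counts = grouped.count()
   ```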





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #9607: [KAFKA-10722] doc: Described the types of the stores used

2020-11-30 Thread GitBox


mjsax commented on pull request #9607:
URL: https://github.com/apache/kafka/pull/9607#issuecomment-736136722


   Thanks for the PR @fml2! Sorry for the delay in reviewing. Sometimes it 
takes some time (good that you kept nagging :) )



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax merged pull request #9607: [KAFKA-10722] doc: Described the types of the stores used

2020-11-30 Thread GitBox


mjsax merged pull request #9607:
URL: https://github.com/apache/kafka/pull/9607


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9660: Kafka 10629 - TopologyTestDriver should not require a Properties argument

2020-11-30 Thread GitBox


mjsax commented on a change in pull request #9660:
URL: https://github.com/apache/kafka/pull/9660#discussion_r532983342



##
File path: 
streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
##
@@ -254,6 +265,19 @@ public TopologyTestDriver(final Topology topology,
 this(topology, config, null);
 }
 
+/**
+ * Create a new test diver instance.
+ * Initialized the internally mocked wall-clock time with {@link 
System#currentTimeMillis() current system time}.

Review comment:
   This overload takes `initialWallClockTimeMs`, thus this sentence seems 
to be incorrect? Should go to `TopologyTestDriver(final Topology topology)` 
instead?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10629) TopologyTestDriver should not require a Properties arg

2020-11-30 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10629:

Labels: kip newbie  (was: needs-kip newbie)

> TopologyTestDriver should not require a Properties arg
> --
>
> Key: KAFKA-10629
> URL: https://issues.apache.org/jira/browse/KAFKA-10629
> Project: Kafka
>  Issue Type: Task
>  Components: streams, streams-test-utils
>Reporter: John Roesler
>Assignee: Rohit Deshpande
>Priority: Minor
>  Labels: kip, newbie
>
> As of [https://github.com/apache/kafka/pull/9477,] many TopologyTestDriver 
> usages will have no configurations at all to specify, so we should provide a 
> constructor that doesn't take a Properties argument. Right now, such 
> configuration-free usages have to provide an empty Properties object.
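
A minimal before/after sketch (assuming the constructor overload proposed in 
KIP-680; `buildTestTopology()` is a hypothetical helper):

```scala
import java.util.Properties
import org.apache.kafka.streams.{Topology, TopologyTestDriver}

val topology: Topology = buildTestTopology() // hypothetical helper

// Today: an empty Properties object is mandatory even with nothing to configure.
val driver = new TopologyTestDriver(topology, new Properties())

// Proposed (sketch): no Properties argument at all.
val driverNoProps = new TopologyTestDriver(topology)
```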



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10629) TopologyTestDriver should not require a Properties arg

2020-11-30 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10629:

Issue Type: Improvement  (was: Task)

> TopologyTestDriver should not require a Properties arg
> --
>
> Key: KAFKA-10629
> URL: https://issues.apache.org/jira/browse/KAFKA-10629
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, streams-test-utils
>Reporter: John Roesler
>Assignee: Rohit Deshpande
>Priority: Minor
>  Labels: kip, newbie
>
> As of [https://github.com/apache/kafka/pull/9477,] many TopologyTestDriver 
> usages will have no configurations at all to specify, so we should provide a 
> constructor that doesn't take a Properties argument. Right now, such 
> configuration-free usages have to provide an empty Properties object.
> KIP-680: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-680%3A+TopologyTestDriver+should+not+require+a+Properties+argument]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10629) TopologyTestDriver should not require a Properties arg

2020-11-30 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10629:

Description: 
As of [https://github.com/apache/kafka/pull/9477,] many TopologyTestDriver 
usages will have no configurations at all to specify, so we should provide a 
constructor that doesn't take a Properties argument. Right now, such 
configuration-free usages have to provide an empty Properties object.

KIP-680: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-680%3A+TopologyTestDriver+should+not+require+a+Properties+argument]

  was:As of [https://github.com/apache/kafka/pull/9477,] many 
TopologyTestDriver usages will have no configurations at all to specify, so we 
should provide a constructor that doesn't take a Properties argument. Right 
now, such configuration-free usages have to provide an empty Properties object.


> TopologyTestDriver should not require a Properties arg
> --
>
> Key: KAFKA-10629
> URL: https://issues.apache.org/jira/browse/KAFKA-10629
> Project: Kafka
>  Issue Type: Task
>  Components: streams, streams-test-utils
>Reporter: John Roesler
>Assignee: Rohit Deshpande
>Priority: Minor
>  Labels: kip, newbie
>
> As of [https://github.com/apache/kafka/pull/9477,] many TopologyTestDriver 
> usages will have no configurations at all to specify, so we should provide a 
> constructor that doesn't take a Properties argument. Right now, such 
> configuration-free usages have to provide an empty Properties object.
> KIP-680: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-680%3A+TopologyTestDriver+should+not+require+a+Properties+argument]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10629) TopologyTestDriver should not require a Properties arg

2020-11-30 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241146#comment-17241146
 ] 

Matthias J. Sax commented on KAFKA-10629:
-

[~rohitdeshaws] – you got 3 binding votes already, from John, Guozhang, and 
myself.

Note that all committer votes are binding (even if not stated explicitly). You 
can find a list of all committers on the web page: 
[https://kafka.apache.org/committers] 

> TopologyTestDriver should not require a Properties arg
> --
>
> Key: KAFKA-10629
> URL: https://issues.apache.org/jira/browse/KAFKA-10629
> Project: Kafka
>  Issue Type: Task
>  Components: streams, streams-test-utils
>Reporter: John Roesler
>Assignee: Rohit Deshpande
>Priority: Minor
>  Labels: needs-kip, newbie
>
> As of [https://github.com/apache/kafka/pull/9477,] many TopologyTestDriver 
> usages will have no configurations at all to specify, so we should provide a 
> constructor that doesn't take a Properties argument. Right now, such 
> configuration-free usages have to provide an empty Properties object.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters

2020-11-30 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241139#comment-17241139
 ] 

Matthias J. Sax edited comment on KAFKA-7918 at 11/30/20, 11:51 PM:


The existing caching layer also collapses writes into the changelog for the 
same key – but if you don't do the serialization, it's very hard to budget the 
used memory. It could make the system unstable and your JVM might crash with 
out-of-memory errors.

We want to improve the system obviously, and if you have a good solution, feel 
free to propose a KIP 
([https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals]).
 Getting rid of the expensive (de)serialization cost could be a huge win. My gut 
feeling, though, is that without good memory management it would be hard to 
convince people; stability might be more important than performance.


was (Author: mjsax):
The existing caching layer also collapses writes into the changelog for the 
same key – but if you don't do the serialization, it's very hard to budget the 
used memory. It could make the system unstable and your JVM might crash with 
ouf-of-memory errors.

> Streams store cleanup: inline byte-store generic parameters
> ---
>
> Key: KAFKA-7918
> URL: https://issues.apache.org/jira/browse/KAFKA-7918
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: A. Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.3.0
>
>
> Currently, the fundamental layer of stores in Streams is the "bytes store".
> The easiest way to identify this is in 
> `org.apache.kafka.streams.state.Stores`, all the `StoreBuilder`s require a 
> `XXBytesStoreSupplier`. 
> We provide several implementations of these bytes stores, typically an 
> in-memory one and a persistent one (aka RocksDB).
> Inside these bytes stores, the key is always `Bytes` and the value is always 
> `byte[]` (serialization happens at a higher level). However, the store 
> implementations are generically typed, just `K` and `V`.
> This is good for flexibility, but it makes the code a little harder to 
> understand. I think that we used to do serialization at a lower level, so the 
> generics are a hold-over from that.
> It would simplify the code if we just inlined the actual k/v types and maybe 
> even renamed the classes from (e.g.) `InMemoryKeyValueStore` to 
> `InMemoryKeyValueBytesStore`, and so forth.
>  
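
A sketch of what the proposed inlining amounts to (illustrative only, not the 
actual class hierarchy):

```scala
import org.apache.kafka.common.utils.Bytes

// Before (sketch): generically typed, although K/V are always Bytes/Array[Byte].
trait GenericKeyValueStore[K, V] {
  def put(key: K, value: V): Unit
}

// After (sketch): inline the actual types and rename to make the contract obvious.
trait InMemoryKeyValueBytesStore {
  def put(key: Bytes, value: Array[Byte]): Unit
}
```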



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters

2020-11-30 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241139#comment-17241139
 ] 

Matthias J. Sax commented on KAFKA-7918:


The existing caching layer also collapses writes into the changelog for the 
same key – but if you don't do the serialization, it's very hard to budget the 
used memory. It could make the system unstable and your JVM might crash with 
out-of-memory errors.

> Streams store cleanup: inline byte-store generic parameters
> ---
>
> Key: KAFKA-7918
> URL: https://issues.apache.org/jira/browse/KAFKA-7918
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: A. Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.3.0
>
>
> Currently, the fundamental layer of stores in Streams is the "bytes store".
> The easiest way to identify this is in 
> `org.apache.kafka.streams.state.Stores`, all the `StoreBuilder`s require a 
> `XXBytesStoreSupplier`. 
> We provide several implementations of these bytes stores, typically an 
> in-memory one and a persistent one (aka RocksDB).
> Inside these bytes stores, the key is always `Bytes` and the value is always 
> `byte[]` (serialization happens at a higher level). However, the store 
> implementations are generically typed, just `K` and `V`.
> This is good for flexibility, but it makes the code a little harder to 
> understand. I think that we used to do serialization at a lower level, so the 
> generics are a hold-over from that.
> It would simplify the code if we just inlined the actual k/v types and maybe 
> even renamed the classes from (e.g.) `InMemoryKeyValueStore` to 
> `InMemoryKeyValueBytesStore`, and so forth.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] junrao commented on a change in pull request #9631: KAFKA-9672: Leader with ISR as a superset of replicas

2020-11-30 Thread GitBox


junrao commented on a change in pull request #9631:
URL: https://github.com/apache/kafka/pull/9631#discussion_r532963241



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -947,9 +947,10 @@ class Partition(val topicPartition: TopicPartition,
   leaderEndOffset: Long,
   currentTimeMs: Long,
   maxLagMs: Long): Boolean = {
-val followerReplica = getReplicaOrException(replicaId)
-followerReplica.logEndOffset != leaderEndOffset &&
-  (currentTimeMs - followerReplica.lastCaughtUpTimeMs) > maxLagMs
+getReplica(replicaId).fold(true) { followerReplica =>

Review comment:
   @jsancio : Yes, we can keep the logic in the PR. On the leader, the 
logic for shrinking ISR is checked every 10 secs by default. So, in the common 
case when completing a reassignment, the reduced ISR by the controller will be 
propagated to the leader before the leader's ISR shrinking logic kicks in.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji opened a new pull request #9663: MINOR: Small cleanups in `AlterIsr` handling logic

2020-11-30 Thread GitBox


hachikuji opened a new pull request #9663:
URL: https://github.com/apache/kafka/pull/9663


   A few small cleanups in `Partition` handling of `AlterIsr`:
   
   - Factor state update and log message into `sendAlterIsrRequest`
   - Ensure illegal state error gets raised if a retry fails to be enqueued
   - Always check the proposed state against the current state in 
`handleAlterIsrResponse`
   - Add `toString` implementations to `IsrState` case classes
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-2967) Move Kafka documentation to ReStructuredText

2020-11-30 Thread Joel Hamill (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Hamill updated KAFKA-2967:
---
Labels: documentation  (was: )

> Move Kafka documentation to ReStructuredText
> 
>
> Key: KAFKA-2967
> URL: https://issues.apache.org/jira/browse/KAFKA-2967
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
>Priority: Major
>  Labels: documentation
>
> Storing documentation as HTML is kind of BS :)
> * Formatting is a pain, and making it look good is even worse
> * It's just HTML, can't generate PDFs
> * Reading and editing is painful
> * Validating changes is hard because our formatting relies on all kinds of 
> Apache Server features.
> I suggest:
> * Move to RST
> * Generate HTML and PDF during build using Sphinx plugin for Gradle.
> Lots of Apache projects are doing this.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-2967) Move Kafka documentation to ReStructuredText

2020-11-30 Thread James Galasyn (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241113#comment-17241113
 ] 

James Galasyn edited comment on KAFKA-2967 at 11/30/20, 10:49 PM:
--

Since I inherited the Streams docs, I've come to appreciate the points made by 
[~gwenshap] and [~ewencp]. I'm proposing a somewhat different solution: migrate 
the HTML-based docs to markdown and host them on ReadTheDocs. We used this 
approach successfully for the [ksqlDB docs|https://docs.ksqldb.io/en/latest/], 
and there's no reason it won't work for the AK docs.

Last week, I took a couple of hours and manually converted the entire AK doc 
set to markdown by using Pandoc: [ak-docs-proto 
repo|https://github.com/confluentinc/ak-docs-proto]. There's some more work to 
do, like fixing headings and updating links, but as a proof-of-concept, it 
works okay.

Once the markdown is cleaned up, we would move it to the docs directory in the 
public AK GitHub repo and set up a ReadTheDocs project with a Basic account 
($50/month). RTD will build from our GH branches and host the site for us.

The ksqlDB docs have used this model successfully since May 2020, so the 
execution risk is low. [~mjsax], [~vvcephei], and [~mdrogalis], what do you 
think?

 


was (Author: jimgalasyn):
Since I inherited the Streams docs, I've come to appreciate the points made by 
[~gwenshap] and [~ewencp]. I'm proposing a somewhat different solution: migrate 
the HTML-based docs to markdown and host them on ReadTheDocs. We used this 
approach successfully for the [ksqlDB docs|https://docs.ksqldb.io/en/latest/], 
and there's no reason it won't work for the AK docs.

Last week, I took a couple of hours and manually converted the entire AK doc 
set to markdown by using Pandoc: [ak-docs-proto 
repo|https://github.com/confluentinc/ak-docs-proto]. There's some more work to 
do, like fixing headings and updating links, but as a proof-of-concept, it 
works okay.

Once the markdown is cleaned up, we would move it to the docs directory in the 
public AK GitHub repo and set up a ReadTheDocs project with a Basic account 
($50/month). RTD will build from our GH branches and host the site for us.

Because it's time-consuming and error-prone to maintain the Kafka content both 
in the AK site and in the CP docs site, we would remove the duplicative content 
from the CP docs site, leaving only the CP-related content there, with links 
(and redirects) back to the AK docs.

The ksqlDB docs have used this model successfully since May 2020, so the 
execution risk is low. [~mjsax], [~vvcephei], and [~mdrogalis], what do you 
think?

 

> Move Kafka documentation to ReStructuredText
> 
>
> Key: KAFKA-2967
> URL: https://issues.apache.org/jira/browse/KAFKA-2967
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
>Priority: Major
>
> Storing documentation as HTML is kind of BS :)
> * Formatting is a pain, and making it look good is even worse
> * It's just HTML, can't generate PDFs
> * Reading and editing is painful
> * Validating changes is hard because our formatting relies on all kinds of 
> Apache Server features.
> I suggest:
> * Move to RST
> * Generate HTML and PDF during build using Sphinx plugin for Gradle.
> Lots of Apache projects are doing this.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10702) Slow replication of empty transactions

2020-11-30 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-10702.
-
Fix Version/s: 2.8.0
   Resolution: Fixed

> Slow replication of empty transactions
> --
>
> Key: KAFKA-10702
> URL: https://issues.apache.org/jira/browse/KAFKA-10702
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.8.0
>
>
> We hit a case in which we had to re-replicate a compacted topic from the 
> beginning of the log. Some portions of the log consisted mostly of 
> transaction markers, which were extremely slow to replicate. The problem is 
> that `ProducerStateManager` adds all of these empty transactions to its 
> internal collection of `ongoingTxns` before immediately removing them. There 
> could be tens of thousands of empty transactions in the worst case from a 
> single `Fetch` response, so this can create a huge amount of pressure on the 
> broker. 
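
The gist of the fix, as a rough sketch (hypothetical names, not the actual patch):

```scala
import scala.collection.mutable

case class TxnMetadata(producerId: Long, firstOffset: Long)

class ProducerStateSketch {
  private val ongoingTxns = mutable.TreeMap.empty[Long, TxnMetadata]

  // Only register a transaction as ongoing when the append actually carries
  // data. A marker-only ("empty") transaction completes immediately, so it
  // never needs to enter, and then be removed from, the ongoing map, which
  // is what created the pressure described above.
  def onTransactionalAppend(txn: TxnMetadata, isEmpty: Boolean): Unit =
    if (!isEmpty) ongoingTxns.put(txn.firstOffset, txn)
}
```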



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10702) Slow replication of empty transactions

2020-11-30 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-10702:

Issue Type: Improvement  (was: Bug)

> Slow replication of empty transactions
> --
>
> Key: KAFKA-10702
> URL: https://issues.apache.org/jira/browse/KAFKA-10702
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> We hit a case in which we had to re-replicate a compacted topic from the 
> beginning of the log. Some portions of the log consisted mostly of 
> transaction markers, which were extremely slow to replicate. The problem is 
> that `ProducerStateManager` adds all of these empty transactions to its 
> internal collection of `ongoingTxns` before immediately removing them. There 
> could be tens of thousands of empty transactions in the worst case from a 
> single `Fetch` response, so this can create a huge amount of pressure on the 
> broker. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji merged pull request #9632: KAFKA-10702; Skip bookkeeping of empty transactions

2020-11-30 Thread GitBox


hachikuji merged pull request #9632:
URL: https://github.com/apache/kafka/pull/9632


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-2967) Move Kafka documentation to ReStructuredText

2020-11-30 Thread James Galasyn (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241113#comment-17241113
 ] 

James Galasyn edited comment on KAFKA-2967 at 11/30/20, 10:31 PM:
--

Since I inherited the Streams docs, I've come to appreciate the points made by 
[~gwenshap] and [~ewencp]. I'm proposing a somewhat different solution: migrate 
the HTML-based docs to markdown and host them on ReadTheDocs. We used this 
approach successfully for the [ksqlDB docs|https://docs.ksqldb.io/en/latest/], 
and there's no reason it won't work for the AK docs.

Last week, I took a couple of hours and manually converted the entire AK doc 
set to markdown by using Pandoc: [ak-docs-proto 
repo|[https://github.com/confluentinc/ak-docs-proto].] There's some more work 
to do, like fixing headings and updating links, but as a proof-of-concept, it 
works okay.

Once the markdown is cleaned up, we would move it to the docs directory in the 
public AK GitHub repo and set up a ReadTheDocs project with a Basic account 
($50/month). RTD will build from our GH branches and host the site for us.

Because it's time-consuming and error-prone to maintain the Kafka content both 
in the AK site and in the CP docs site, we would remove the duplicative content 
from the CP docs site, leaving only the CP-related content there, with links 
(and redirects) back to the AK docs.

The ksqlDB docs have used this model successfully since May 2020, so the 
execution risk is low. [~mjsax], [~vvcephei], and [~mdrogalis], what do you 
think?

 


was (Author: jimgalasyn):
Since I inherited the Streams docs, I've come to appreciate the points made by 
[~gwenshap] and [~ewencp]. I'm proposing a somewhat different solution: migrate 
the HTML-based docs to markdown and host them on ReadTheDocs. We used this 
approach successfully for the [ksqlDB docs|https://docs.ksqldb.io/en/latest/], 
and there's no reason it won't work for the AK docs.

Last week, I took a couple of hours and manually converted the entire AK doc 
set to markdown by using Pandoc: [ak-docs-proto 
repo|[https://github.com/confluentinc/ak-docs-proto|https://github.com/confluentinc/ak-docs-proto].]].
 There's some more work to do, like fixing headings and updating links, but as 
a proof-of-concept, it works okay.

Once the markdown is cleaned up, we would move it to the docs directory in the 
public AK GitHub repo and set up a ReadTheDocs project with a Basic account 
($50/month). RTD will build from our GH branches and host the site for us.

Because it's time-consuming and error-prone to maintain the Kafka content both 
in the AK site and in the CP docs site, we would remove the duplicative content 
from the CP docs site, leaving only the CP-related content there, with links 
(and redirects) back to the AK docs.

The ksqlDB docs have used this model successfully since May 2020, so the 
execution risk is low. [~mjsax], [~vvcephei], and [~mdrogalis], what do you 
think?

 

> Move Kafka documentation to ReStructuredText
> 
>
> Key: KAFKA-2967
> URL: https://issues.apache.org/jira/browse/KAFKA-2967
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
>Priority: Major
>
> Storing documentation as HTML is kind of BS :)
> * Formatting is a pain, and making it look good is even worse
> * It's just HTML, can't generate PDFs
> * Reading and editing is painful
> * Validating changes is hard because our formatting relies on all kinds of 
> Apache Server features.
> I suggest:
> * Move to RST
> * Generate HTML and PDF during build using Sphinx plugin for Gradle.
> Lots of Apache projects are doing this.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-2967) Move Kafka documentation to ReStructuredText

2020-11-30 Thread James Galasyn (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241113#comment-17241113
 ] 

James Galasyn edited comment on KAFKA-2967 at 11/30/20, 10:31 PM:
--

Since I inherited the Streams docs, I've come to appreciate the points made by 
[~gwenshap] and [~ewencp]. I'm proposing a somewhat different solution: migrate 
the HTML-based docs to markdown and host them on ReadTheDocs. We used this 
approach successfully for the [ksqlDB docs|https://docs.ksqldb.io/en/latest/], 
and there's no reason it won't work for the AK docs.

Last week, I took a couple of hours and manually converted the entire AK doc 
set to markdown by using Pandoc: [ak-docs-proto 
repo|https://github.com/confluentinc/ak-docs-proto]. There's some more work to 
do, like fixing headings and updating links, but as a proof-of-concept, it 
works okay.

Once the markdown is cleaned up, we would move it to the docs directory in the 
public AK GitHub repo and set up a ReadTheDocs project with a Basic account 
($50/month). RTD will build from our GH branches and host the site for us.

Because it's time-consuming and error-prone to maintain the Kafka content both 
in the AK site and in the CP docs site, we would remove the duplicative content 
from the CP docs site, leaving only the CP-related content there, with links 
(and redirects) back to the AK docs.

The ksqlDB docs have used this model successfully since May 2020, so the 
execution risk is low. [~mjsax], [~vvcephei], and [~mdrogalis], what do you 
think?

 


was (Author: jimgalasyn):
Since I inherited the Streams docs, I've come to appreciate the points made by 
[~gwenshap] and [~ewencp]. I'm proposing a somewhat different solution: migrate 
the HTML-based docs to markdown and host them on ReadTheDocs. We used this 
approach successfully for the [ksqlDB docs|https://docs.ksqldb.io/en/latest/], 
and there's no reason it won't work for the AK docs.

Last week, I took a couple of hours and manually converted the entire AK doc 
set to markdown by using Pandoc: [ak-docs-proto 
repo|[https://github.com/confluentinc/ak-docs-proto].] There's some more work 
to do, like fixing headings and updating links, but as a proof-of-concept, it 
works okay.

Once the markdown is cleaned up, we would move it to the docs directory in the 
public AK GitHub repo and set up a ReadTheDocs project with a Basic account 
($50/month). RTD will build from our GH branches and host the site for us.

Because it's time-consuming and error-prone to maintain the Kafka content both 
in the AK site and in the CP docs site, we would remove the duplicative content 
from the CP docs site, leaving only the CP-related content there, with links 
(and redirects) back to the AK docs.

The ksqlDB docs have used this model successfully since May 2020, so the 
execution risk is low. [~mjsax], [~vvcephei], and [~mdrogalis], what do you 
think?

 

> Move Kafka documentation to ReStructuredText
> 
>
> Key: KAFKA-2967
> URL: https://issues.apache.org/jira/browse/KAFKA-2967
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
>Priority: Major
>
> Storing documentation as HTML is kind of BS :)
> * Formatting is a pain, and making it look good is even worse
> * It's just HTML, can't generate PDFs
> * Reading and editing is painful
> * Validating changes is hard because our formatting relies on all kinds of 
> Apache Server features.
> I suggest:
> * Move to RST
> * Generate HTML and PDF during build using Sphinx plugin for Gradle.
> Lots of Apache projects are doing this.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-2967) Move Kafka documentation to ReStructuredText

2020-11-30 Thread James Galasyn (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241113#comment-17241113
 ] 

James Galasyn commented on KAFKA-2967:
--

Since I inherited the Streams docs, I've come to appreciate the points made by 
[~gwenshap] and [~ewencp]. I'm proposing a somewhat different solution: migrate 
the HTML-based docs to markdown and host them on ReadTheDocs. We used this 
approach successfully for the [ksqlDB docs|https://docs.ksqldb.io/en/latest/], 
and there's no reason it won't work for the AK docs.

Last week, I took a couple of hours and manually converted the entire AK doc 
set to markdown by using Pandoc: [ak-docs-proto 
repo|[https://github.com/confluentinc/ak-docs-proto|https://github.com/confluentinc/ak-docs-proto].]].
 There's some more work to do, like fixing headings and updating links, but as 
a proof-of-concept, it works okay.

Once the markdown is cleaned up, we would move it to the docs directory in the 
public AK GitHub repo and set up a ReadTheDocs project with a Basic account 
($50/month). RTD will build from our GH branches and host the site for us.

Because it's time-consuming and error-prone to maintain the Kafka content both 
in the AK site and in the CP docs site, we would remove the duplicative content 
from the CP docs site, leaving only the CP-related content there, with links 
(and redirects) back to the AK docs.

The ksqlDB docs have used this model successfully since May 2020, so the 
execution risk is low. [~mjsax], [~vvcephei], and [~mdrogalis], what do you 
think?

 

> Move Kafka documentation to ReStructuredText
> 
>
> Key: KAFKA-2967
> URL: https://issues.apache.org/jira/browse/KAFKA-2967
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
>Priority: Major
>
> Storing documentation as HTML is kind of BS :)
> * Formatting is a pain, and making it look good is even worse
> * It's just HTML, can't generate PDFs
> * Reading and editing is painful
> * Validating changes is hard because our formatting relies on all kinds of 
> Apache Server features.
> I suggest:
> * Move to RST
> * Generate HTML and PDF during build using Sphinx plugin for Gradle.
> Lots of Apache projects are doing this.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] twobeeb commented on pull request #9589: KAFKA-10710 - Mirror Maker 2 - Create herders only if source->target.enabled=true

2020-11-30 Thread GitBox


twobeeb commented on pull request #9589:
URL: https://github.com/apache/kafka/pull/9589#issuecomment-736079387


   @ryannedolan 
   I have a preference for your first suggestion (the one for which there is 
currently a code proposal) because it doesn't alter the behavior of MM2 in 
any way for existing users.  
   
   > IMO, we'd ideally skip creating the A->B herder whenever 
A->B.emit.heartbeats.enabled=false (defaults to true) and A->B.enabled=false 
(defaults to false). A top-level emit.heartbeats.enabled=false would then 
disable heartbeats altogether, which would trivially eliminate the extra 
herders. N.B. this would just be an optimization and wouldn't required a KIP, 
IMO.
   
   That being said, I'm also fine with your latest proposition. It fits my 
personal use case perfectly, because topics will be replicated local-to-central 
as well as central-to-local in the near future; all instantiated herders will 
then transmit data (and not just heartbeats).
   
   Could you confirm and/or elaborate on the behavior you want to see when 
playing with ``emit.heartbeats.enabled``?
   
   I guess these would be good test cases: 
   
   **Simple A to B**
   
   A->B.enabled=true
   
   Expected : 2 herders
 
   **A to B and A to C**
    
   clusters=A, B, C
   A->B.enabled=true
   A->C.enabled=true
   
Expected : 4 herders (AB BA AC CA)
   
   **Disabled heartbeats**
    
   clusters=A, B, C
   A->B.enabled=true
   A->B.emit.heartbeats.enabled=false
   B->A.emit.heartbeats.enabled=false
   A->C.enabled=true
   A->C.emit.heartbeats.enabled=false
   C->A.emit.heartbeats.enabled=false
   
   Expected : 4 herders (2 of which are unused, but still much better than 
N*N-1)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bdbyrne commented on a change in pull request #9628: KAFKA-10747: Implement APIs for altering and describing IP connection rate quotas

2020-11-30 Thread GitBox


bdbyrne commented on a change in pull request #9628:
URL: https://github.com/apache/kafka/pull/9628#discussion_r532909153



##
File path: 
clients/src/main/java/org/apache/kafka/common/quota/ClientQuotaEntity.java
##
@@ -32,6 +32,7 @@
  */
 public static final String USER = "user";
 public static final String CLIENT_ID = "client-id";
+public static final String IP = "IP";

Review comment:
   Consider making lower-case for consistency, but otherwise not a big 
issue, as I see that's how it was proposed in the KIP.

##
File path: core/src/main/scala/kafka/server/AdminManager.scala
##
@@ -807,23 +807,25 @@ class AdminManager(val config: KafkaConfig,
   case name => Sanitizer.desanitize(name)
 }
 
-  private def entityToSanitizedUserClientId(entity: ClientQuotaEntity): 
(Option[String], Option[String]) = {
+  private def entityToSanitizedUserClientId(entity: ClientQuotaEntity): 
(Option[String], Option[String], Option[String]) = {

Review comment:
   May want to rename (generalize) since this now includes IP

##
File path: core/src/main/scala/kafka/server/AdminManager.scala
##
@@ -833,45 +835,76 @@ class AdminManager(val config: KafkaConfig,
   def describeClientQuotas(filter: ClientQuotaFilter): Map[ClientQuotaEntity, 
Map[String, Double]] = {
 var userComponent: Option[ClientQuotaFilterComponent] = None
 var clientIdComponent: Option[ClientQuotaFilterComponent] = None
+var ipComponent: Option[ClientQuotaFilterComponent] = None
 filter.components.forEach { component =>
   component.entityType match {
 case ClientQuotaEntity.USER =>
   if (userComponent.isDefined)
-throw new InvalidRequestException(s"Duplicate user filter 
component entity type");
+throw new InvalidRequestException(s"Duplicate user filter 
component entity type")
   userComponent = Some(component)
 case ClientQuotaEntity.CLIENT_ID =>
   if (clientIdComponent.isDefined)
-throw new InvalidRequestException(s"Duplicate client filter 
component entity type");
+throw new InvalidRequestException(s"Duplicate client filter 
component entity type")
   clientIdComponent = Some(component)
+case ClientQuotaEntity.IP =>
+  if (ipComponent.isDefined)
+throw new InvalidRequestException(s"Duplicate ip filter component 
entity type")
+  ipComponent = Some(component)
 case "" =>
   throw new InvalidRequestException(s"Unexpected empty filter 
component entity type")
 case et =>
   // Supplying other entity types is not yet supported.
   throw new UnsupportedVersionException(s"Custom entity type '${et}' 
not supported")
   }
 }
-handleDescribeClientQuotas(userComponent, clientIdComponent, filter.strict)
+if ((userComponent.isDefined || clientIdComponent.isDefined) && 
ipComponent.isDefined)
+  throw new InvalidRequestException(s"Invalid entity filter component 
combination")

Review comment:
   May want to provide a little more detail in the response

##
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##
@@ -1529,6 +1530,16 @@ object TestUtils extends Logging {
 adminClient.incrementalAlterConfigs(configs)
   }
 
+  def alterClientQuotas(adminClient: Admin, request: Map[ClientQuotaEntity, 
Map[String, Option[Double]]]): AlterClientQuotasResult = {
+val entries = request.map { case (entity, alter) =>
+  val ops = alter.map { case (key, value) =>
+new ClientQuotaAlteration.Op(key, 
value.map(Double.box).getOrElse(null))
+  }.asJavaCollection
+new ClientQuotaAlteration(entity, ops)

Review comment:
   nit: indenting





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] viktorsomogyi edited a comment on pull request #9519: KAFKA-10650: Use Murmur3 instead of MD5 in SkimpyOffsetMap

2020-11-30 Thread GitBox


viktorsomogyi edited a comment on pull request #9519:
URL: https://github.com/apache/kafka/pull/9519#issuecomment-736027069


   @lbradstreet it is really hard to give an exact answer to this, as the 
collision rate is hard to calculate mathematically and is very dependent on the 
size and values of the test set. For non-cryptographic hashes it is possible to 
generate DDoS attacks where everything gets placed into the same bucket and 
thus slows down lookups. On the theoretical side, though, Murmur3 passes the 
most often cited Chi-square test and has a very good avalanche effect, and thus 
generates hashes that are very close to the uniform distribution.
   Because of the lack of available mathematical articles on this topic (Murmur 
vs. MD5) I started brute-force tests where I generated a few billion unique keys 
and inserted them into a Bloom filter (which had a 1% false-positive 
probability). That showed that Murmur3 is actually on the same level as MD5; it 
generates roughly the same number of collisions. I have two types of datasets: 
the first can use any UTF-8 characters and the second works only with the 
printable ASCII characters. In fact MD5, Murmur3 128-bit, Murmur3 64-bit and 
xxHash 64-bit all generated around the same number of collisions, which was 
0.016% out of 200 million unique keys. I added Murmur3 32-bit as a baseline but 
it was significantly worse, around 2% collisions. Maybe to show the difference 
we need a much larger keyset; I'll try to do what I can.
   I'll publish my code in the following days; I just have to work on something 
else too, so it's a bit slow, sorry :).
   
   On the other hand, if we want to make sure that there will be no collisions, 
I don't think it's possible with either of these solutions; there is always a 
chance. To completely rule this out we would either have to store the user 
key-hash maps similarly to the offset indexes and reject new, colliding keys, or 
use perfect hashes (but that couldn't work well, as it requires knowledge of the 
full keyset, or we would have to rebuild the cache on each insert, or at least 
often).
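   
   For reference, the brute-force test looks roughly like this (a sketch under 
assumptions: Guava's `BloomFilter` and Scala's built-in `MurmurHash3` stand in 
for the actual harness and hash implementations):
   
   ```scala
   import com.google.common.hash.{BloomFilter, Funnels}
   import scala.util.hashing.MurmurHash3
   
   val n   = 200000000L // unique keys
   val fpp = 0.01       // Bloom filter false-positive probability
   
   val seen: BloomFilter[java.lang.Long] =
     BloomFilter.create(Funnels.longFunnel(), n, fpp)
   
   var possibleCollisions = 0L
   var i = 0L
   while (i < n) {
     val hash = MurmurHash3.stringHash(s"key-$i").toLong // 32-bit variant for brevity
     // put() returns false when the element might already be present,
     // i.e. a real hash collision or a Bloom-filter false positive.
     if (!seen.put(hash)) possibleCollisions += 1
     i += 1
   }
   println(s"possible collisions (includes ~1% Bloom false positives): $possibleCollisions")
   ```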



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on pull request #9657: MINOR: Remove erroneous extra in design doc

2020-11-30 Thread GitBox


bbejeck commented on pull request #9657:
URL: https://github.com/apache/kafka/pull/9657#issuecomment-736032726


   cherry-picked to 2.7



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d1egoaz closed pull request #9088: sync docs for list of emitted metrics by MirrorMetrics

2020-11-30 Thread GitBox


d1egoaz closed pull request #9088:
URL: https://github.com/apache/kafka/pull/9088


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on pull request #9655: MINOR: fix listeners doc to close properly

2020-11-30 Thread GitBox


bbejeck commented on pull request #9655:
URL: https://github.com/apache/kafka/pull/9655#issuecomment-736030783


   cherry-picked to 2.7



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck merged pull request #9657: MINOR: Remove erroneous extra in design doc

2020-11-30 Thread GitBox


bbejeck merged pull request #9657:
URL: https://github.com/apache/kafka/pull/9657


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] viktorsomogyi edited a comment on pull request #9519: KAFKA-10650: Use Murmur3 instead of MD5 in SkimpyOffsetMap

2020-11-30 Thread GitBox


viktorsomogyi edited a comment on pull request #9519:
URL: https://github.com/apache/kafka/pull/9519#issuecomment-736027069


   @lbradstreet it is really hard to give an exact answer to this, as the 
collision rate is hard to calculate mathematically and is very dependent on the 
size and values of the test set. For non-cryptographic hashes it is possible to 
generate DDoS attacks where everything gets placed into the same bucket and 
thus slows down lookups. On the theoretical side, though, Murmur3 passes the 
most often cited Chi-square test and has a very good avalanche effect, and thus 
generates hashes that are very close to the uniform distribution.
   Because of the lack of available mathematical articles on this topic (Murmur 
vs. MD5) I started brute-force tests where I generated a few billion unique keys 
and inserted them into a Bloom filter (which had a 1% false-positive 
probability). That showed that Murmur3 is actually on the same level as MD5; it 
generates roughly the same number of collisions. I have two types of datasets: 
the first can use any UTF-8 characters and the second works only with the 
printable ASCII characters. In fact MD5, Murmur3 128-bit, Murmur3 64-bit and 
xxHash 64-bit all generated around the same number of collisions, which was 
0.016% out of 200 million unique keys. I added Murmur3 32-bit as a baseline but 
it was significantly worse, around 2% collisions. Maybe to show the difference 
we need a much larger keyset; I'll try to do what I can.
   I'll publish my code in the following days; I just have to work on something 
else too, so it's a bit slow, sorry :).
   
   On the other hand, if we want to make sure that there will be no collisions, 
I don't think it's possible with either of these solutions; there is always a 
chance. To completely rule this out we would either have to store the user 
key-hash maps similarly to the offset indexes and reject new, colliding keys, or 
resolve the collision, or use perfect hashes (but that couldn't work well, as it 
requires knowledge of the full keyset, or we would have to rebuild the cache on 
each insert, or at least often).



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] viktorsomogyi commented on pull request #9519: KAFKA-10650: Use Murmur3 instead of MD5 in SkimpyOffsetMap

2020-11-30 Thread GitBox


viktorsomogyi commented on pull request #9519:
URL: https://github.com/apache/kafka/pull/9519#issuecomment-736027069


   @lbradstreet it is really hard to give an exact answer to this, as the 
collision rate is hard to calculate mathematically and is very dependent on the 
size and values of the test set. For non-cryptographic hashes it is possible to 
generate DDoS attacks where everything gets placed into the same bucket and 
thus slows down lookups. On the theoretical side, though, Murmur3 passes the 
most often cited Chi-square test and has a very good avalanche effect, and thus 
generates hashes that are very close to the uniform distribution.
   Because of the lack of available mathematical articles on this topic (Murmur 
vs. MD5) I started brute-force tests where I generated a few billion unique keys 
and inserted them into a Bloom filter (which had a 1% false-positive 
probability). That showed that Murmur3 is actually on the same level as MD5; it 
generates roughly the same number of collisions. I have two types of datasets: 
the first can use any UTF-8 characters and the second works only with the 
printable ASCII characters. In fact MD5, Murmur3 128-bit, Murmur3 64-bit and 
xxHash 64-bit all generated around the same number of collisions, which was 
0.016% out of 200 million unique keys. I added Murmur3 32-bit as a baseline but 
it was significantly worse, around 2% collisions. Maybe to show the difference 
we need a much larger keyset; I'll try to do what I can.
   I'll publish my code in the following days; I just have to work on something 
else too, so it's a bit slow, sorry :).
   
   On the other hand, if we want to make sure that there will be no collisions, 
I don't think it's possible with either of these solutions; there is always a 
chance. To completely rule this out we would either have to store the user 
key-hash maps similarly to the offset indexes, or use perfect hashes (but that 
couldn't work well, as it requires knowledge of the full keyset, or we would 
have to rebuild the cache on each insert, or at least often).



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10778) Stronger log fencing after write failure

2020-11-30 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-10778:

Description: 
If a log append operation fails with an IO error, the broker attempts to fail 
the log dir that it resides in. Currently this is done asynchronously, which 
means there is no guarantee that additional appends won't be attempted before 
the log is fenced. This can be a problem for EOS because of the need to 
maintain consistent producer state.

1. Iterate through batches to build producer state and collect completed 
transactions
2. Append the batches to the log 
3. Update the offset/timestamp indexes
4. Update log end offset
5. Apply individual producer state to `ProducerStateManager`
6. Update the transaction index
7. Update completed transactions and advance LSO

One example of how this process can go wrong is if the index updates in step 3 
fail. In this case, the log will contain updated producer state which has not 
been reflected in `ProducerStateManager`. If the append is retried before the 
log is fenced, then we can have duplicates. There are probably other potential 
failures that are possible as well.

I'm sure we can come up with some way to fix this specific case, but the 
general fencing approach is slippery enough that we'll have a hard time 
convincing ourselves that it handles all potential cases. It would be simpler 
to add synchronous fencing logic for the case when an append fails due to an IO 
error. For example, we can mark a flag to indicate that the log is closed for 
additional read/write operations.

  was:
If a log operation fails with an IO error, the broker attempts to fail the log 
dir that it resides in. Currently this is done asynchronously, which means 
there is no guarantee that additional appends won't be attempted before the log 
is fenced. This can be a problem for EOS because of the need to maintain 
consistent producer state.

1. Iterate through batches to build producer state and collect completed 
transactions
2. Append the batches to the log 
3. Update the offset/timestamp indexes
4. Update log end offset
5. Apply individual producer state to `ProducerStateManager`
6. Update the transaction index
7. Update completed transactions and advance LSO

One example of how this process can go wrong is if the index updates in step 3 
fail. In this case, the log will contain updated producer state which has not 
been reflected in `ProducerStateManager`. If the append is retried before the 
log is fenced, then we can have duplicates. There are probably other potential 
failures that are possible as well.

I'm sure we can come up with some way to fix this specific case, but the 
general fencing approach is slippery enough that we'll have a hard time 
convincing ourselves that it handles all potential cases. It would be simpler 
to add synchronous fencing logic for the case when an append fails due to an IO 
error. For example, we can mark a flag to indicate that the log is closed for 
additional read/write operations.


> Stronger log fencing after write failure
> 
>
> Key: KAFKA-10778
> URL: https://issues.apache.org/jira/browse/KAFKA-10778
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Major
>
> If a log append operation fails with an IO error, the broker attempts to fail 
> the log dir that it resides in. Currently this is done asynchronously, which 
> means there is no guarantee that additional appends won't be attempted before 
> the log is fenced. This can be a problem for EOS because of the need to 
> maintain consistent producer state.
> 1. Iterate through batches to build producer state and collect completed 
> transactions
> 2. Append the batches to the log 
> 3. Update the offset/timestamp indexes
> 4. Update log end offset
> 5. Apply individual producer state to `ProducerStateManager`
> 6. Update the transaction index
> 7. Update completed transactions and advance LSO
> One example of how this process can go wrong is if the index updates in step 
> 3 fail. In this case, the log will contain updated producer state which has 
> not been reflected in `ProducerStateManager`. If the append is retried before 
> the log is fenced, then we can have duplicates. There are probably other 
> potential failures that are possible as well.
> I'm sure we can come up with some way to fix this specific case, but the 
> general fencing approach is slippery enough that we'll have a hard time 
> convincing ourselves that it handles all potential cases. It would be simpler 
> to add synchronous fencing logic for the case when an append fails due to an 
> IO error. For example, we can mark a flag to indicate that the log is closed 
> for additional read/write operations.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

[GitHub] [kafka] hachikuji commented on pull request #9632: KAFKA-10702; Skip bookkeeping of empty transactions

2020-11-30 Thread GitBox


hachikuji commented on pull request #9632:
URL: https://github.com/apache/kafka/pull/9632#issuecomment-736004238


   Note I filed https://issues.apache.org/jira/browse/KAFKA-10778 to introduce 
synchronous log fencing after IO errors.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10778) Stronger log fencing after write failure

2020-11-30 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-10778:
---

 Summary: Stronger log fencing after write failure
 Key: KAFKA-10778
 URL: https://issues.apache.org/jira/browse/KAFKA-10778
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson


If a log operation fails with an IO error, the broker attempts to fail the log 
dir that it resides in. Currently this is done asynchronously, which means 
there is no guarantee that additional appends won't be attempted before the log 
is fenced. This can be a problem for EOS because of the need to maintain 
consistent producer state.

1. Iterate through batches to build producer state and collect completed 
transactions
2. Append the batches to the log 
3. Update the offset/timestamp indexes
4. Update log end offset
5. Apply individual producer state to `ProducerStateManager`
6. Update the transaction index
7. Update completed transactions and advance LSO

One example of how this process can go wrong is if the index updates in step 3 
fail. In this case, the log will contain updated producer state which has not 
been reflected in `ProducerStateManager`. If the append is retried before the 
log is fenced, then we can have duplicates. There are probably other potential 
failures that are possible as well.

I'm sure we can come up with some way to fix this specific case, but the 
general fencing approach is slippery enough that we'll have a hard time 
convincing ourselves that it handles all potential cases. It would be simpler 
to add synchronous fencing logic for the case when an append fails due to an IO 
error. For example, we can mark a flag to indicate that the log is closed for 
additional read/write operations.
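
A minimal sketch of the flag-based fence described above. All names here are illustrative, not the actual {{Log}} implementation; the real change would have to hold the existing log lock:

{code:java}
// Illustrative only: a closed flag checked and set under the same lock
// that guards the append path, so no retried append can slip in after a failure.
class FencedLog {
    private final Object lock = new Object();
    private boolean fenced = false;

    void append(byte[] batch) throws java.io.IOException {
        synchronized (lock) {
            if (fenced) {
                throw new IllegalStateException("Log is fenced after a previous IO error");
            }
            try {
                doAppend(batch); // stand-in for steps 1-7 above
            } catch (java.io.IOException e) {
                // Fence synchronously, while still holding the lock, before the
                // error is propagated asynchronously to fail the log dir.
                fenced = true;
                throw e;
            }
        }
    }

    private void doAppend(byte[] batch) throws java.io.IOException {
        // stand-in for the real append path
    }
}
{code}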



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on pull request #9632: KAFKA-10702; Skip bookkeeping of empty transactions

2020-11-30 Thread GitBox


hachikuji commented on pull request #9632:
URL: https://github.com/apache/kafka/pull/9632#issuecomment-735990319


   For a little more background about the 
`LogTest.testAppendToTransactionIndexFailure` failure, it is due to an 
inconsistency in how we update state in `ProducerStateManager`. The current 
append flow is the following:
   
   1. Build producer state in `ProducerAppendInfo` instances and collect 
completed transactions
   2. Append the entry to the log
   3. Update log end offset
   4. Apply individual producer state to `ProducerStateManager`
   5. Update the transaction index
   6. Update completed transactions and advance LSO
   
   The idea is that the LSO is stuck if an append to the transaction index 
fails. However, because we have already updated producer state before the index 
write, we are left with an inconsistency. The LSO will reflect an ongoing 
transaction which is not reflected in any of the producer states. 
   
   The test case that is failing is validating the behavior when the index 
write fails. It works like this:
   
   1. First append some transactional data to the log
   2. Append an ABORT marker, but let the write to the transaction index fail
   3. Retry the append of the ABORT and verify that append still fails and the 
LSO is stuck
   
   The test fails because the second append no longer attempts to write to the 
transaction index. I can change the test of course, but I was disturbed by 
the underlying assumption that the write of the transaction marker can be 
retried on the `Log` after a failure. In fact, the path to fencing the `Log` 
after a write failure is asynchronous today. We use `LogDirFailureChannel` to 
propagate log failures to a separate thread which is responsible for marking 
the log dir offline or shutting down the broker. So there is indeed a (small) 
window during which a `WriteTxnMarkers` request could be retried. My feeling is 
that EOS demands a stronger guarantee and we need to fence off the `Log` 
instance synchronously while still holding the lock.
   
   So I think we need a separate jira to fix this issue. The question then is 
whether it should block this patch or not. I am thinking not at the moment. The 
test fails because there is no second append to the transaction index, but this 
is not required for correctness, and the LSO remains stuck as expected in any 
case. Basically I'd say we're no worse than before. I will add a commit which 
alters the test case so that it can pass and we can discuss tightening up the 
failure logic in a separate jira.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Closed] (KAFKA-10758) Kafka Streams consuming from a pattern goes to PENDING_SHUTDOWN when adding a new topic

2020-11-30 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler closed KAFKA-10758.


> Kafka Streams consuming from a pattern goes to PENDING_SHUTDOWN when adding a 
> new topic
> ---
>
> Key: KAFKA-10758
> URL: https://issues.apache.org/jira/browse/KAFKA-10758
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Davide Icardi
>Assignee: A. Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 2.7.0, 2.6.1
>
>
> I have a simple Kafka Streams app that consumes from multiple input topics 
> using the _stream_ function that accepts a Pattern 
> ([link|https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/StreamsBuilder.html#stream-java.util.regex.Pattern-]).
>  
> Whenever I add a new topic that matches the pattern, the Kafka Streams state 
> goes to REBALANCING -> ERROR -> PENDING_SHUTDOWN.
> If I restart the app, it correctly starts reading again without problems.
> Is this by design? Should I handle this and simply restart the app?
>  
> Kafka Streams version is 2.6.0.
> The error is the following:
> {code:java}
> ERROR o.a.k.s.p.i.ProcessorTopology - Set of source nodes do not match:
> sourceNodesByName = [KSTREAM-SOURCE-0000000003, KSTREAM-SOURCE-0000000002]
> sourceTopicsByName = [KSTREAM-SOURCE-0000000000, KSTREAM-SOURCE-0000000014, 
> KSTREAM-SOURCE-0000000003, KSTREAM-SOURCE-0000000002]
> org.apache.kafka.common.KafkaException: User rebalance callback throws an 
> error
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:436)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:766)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:624)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
>  Caused by: java.lang.IllegalStateException: Tried to update source topics 
> but source nodes did not match
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorTopology.updateSourceTopics(ProcessorTopology.java:151)
>   at 
> org.apache.kafka.streams.processor.internals.AbstractTask.update(AbstractTask.java:109)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.update(StreamTask.java:514)
>   at 
> org.apache.kafka.streams.processor.internals.TaskManager.updateInputPartitionsAndResume(TaskManager.java:397)
>   at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:261)
>   at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1428)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:279)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:421)
>   ... 10 common frames omitted
>  KafkaStream state is ERROR
>  17:28:53.200 [datalake-StreamThread-1] ERROR 
> o.apache.kafka.streams.KafkaStreams - stream-client [datalake] All stream 
> threads have died. The instance will be in error state and should be closed.
>  > User rebalance callback throws an error
>  KafkaStream state is PENDING_SHUTDOWN
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (KAFKA-10754) Fix flaky shouldShutdownSingleThreadApplication test

2020-11-30 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler closed KAFKA-10754.


> Fix flaky shouldShutdownSingleThreadApplication test
> 
>
> Key: KAFKA-10754
> URL: https://issues.apache.org/jira/browse/KAFKA-10754
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Fix For: 2.8.0
>
>
> org.apache.kafka.streams.integration.StreamsUncaughtExceptionHandlerIntegrationTest.shouldShutdownSingleThreadApplication
>  failed, log available in 
> /home/jenkins/jenkins-agent/workspace/Kafka/kafka-trunk-jdk11/streams/build/reports/testOutput/org.apache.kafka.streams.integration.StreamsUncaughtExceptionHandlerIntegrationTest.shouldShutdownSingleThreadApplication.test.stdout
> org.apache.kafka.streams.integration.StreamsUncaughtExceptionHandlerIntegrationTest
>  > shouldShutdownSingleThreadApplication FAILED
>  java.lang.AssertionError: Expected all streams instances in 
> [org.apache.kafka.streams.KafkaStreams@36c1250, 
> org.apache.kafka.streams.KafkaStreams@124268b5] to be ERROR within 30000 ms, 
> but the following were not: 
> \{org.apache.kafka.streams.KafkaStreams@124268b5=RUNNING, 
> org.apache.kafka.streams.KafkaStreams@36c1250=RUNNING}
>  at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitForApplicationState$12(IntegrationTestUtils.java:933)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:450)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:418)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState(IntegrationTestUtils.java:916)
>  at 
> org.apache.kafka.streams.integration.StreamsUncaughtExceptionHandlerIntegrationTest.shouldShutdownSingleThreadApplication(StreamsUncaughtExceptionHandlerIntegrationTest.java:186)
>  
>  
> [https://ci-builds.apache.org/blue/rest/organizations/jenkins/pipelines/Kafka/pipelines/kafka-trunk-jdk15/runs/267/log/?start=0]
> [https://ci-builds.apache.org/blue/rest/organizations/jenkins/pipelines/Kafka/pipelines/kafka-trunk-jdk11/runs/241/log/?start=0]
> [https://ci-builds.apache.org/blue/rest/organizations/jenkins/pipelines/Kafka/pipelines/kafka-trunk-jdk15/runs/270/log/?start=0]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10292) fix flaky streams/streams_broker_bounce_test.py

2020-11-30 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10292:
-
Priority: Blocker  (was: Major)

> fix flaky streams/streams_broker_bounce_test.py
> ---
>
> Key: KAFKA-10292
> URL: https://issues.apache.org/jira/browse/KAFKA-10292
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, system tests
>Reporter: Chia-Ping Tsai
>Assignee: Bruno Cadonna
>Priority: Blocker
> Fix For: 2.8.0
>
>
> {quote}
> Module: kafkatest.tests.streams.streams_broker_bounce_test
> Class:  StreamsBrokerBounceTest
> Method: test_broker_type_bounce
> Arguments:
> {
>   "broker_type": "leader",
>   "failure_mode": "clean_bounce",
>   "num_threads": 1,
>   "sleep_time_secs": 120
> }
> {quote}
> {quote}
> Module: kafkatest.tests.streams.streams_broker_bounce_test
> Class:  StreamsBrokerBounceTest
> Method: test_broker_type_bounce
> Arguments:
> {
>   "broker_type": "controller",
>   "failure_mode": "hard_shutdown",
>   "num_threads": 3,
>   "sleep_time_secs": 120
> }
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10292) fix flaky streams/streams_broker_bounce_test.py

2020-11-30 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10292:
-
Fix Version/s: 2.8.0

> fix flaky streams/streams_broker_bounce_test.py
> ---
>
> Key: KAFKA-10292
> URL: https://issues.apache.org/jira/browse/KAFKA-10292
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, system tests
>Reporter: Chia-Ping Tsai
>Assignee: Bruno Cadonna
>Priority: Major
> Fix For: 2.8.0
>
>
> {quote}
> Module: kafkatest.tests.streams.streams_broker_bounce_test
> Class:  StreamsBrokerBounceTest
> Method: test_broker_type_bounce
> Arguments:
> {
>   "broker_type": "leader",
>   "failure_mode": "clean_bounce",
>   "num_threads": 1,
>   "sleep_time_secs": 120
> }
> {quote}
> {quote}
> Module: kafkatest.tests.streams.streams_broker_bounce_test
> Class:  StreamsBrokerBounceTest
> Method: test_broker_type_bounce
> Arguments:
> {
>   "broker_type": "controller",
>   "failure_mode": "hard_shutdown",
>   "num_threads": 3,
>   "sleep_time_secs": 120
> }
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10017) Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta

2020-11-30 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10017:
-
Fix Version/s: (was: 2.6.0)
   2.8.0

> Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta
> ---
>
> Key: KAFKA-10017
> URL: https://issues.apache.org/jira/browse/KAFKA-10017
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Blocker
>  Labels: flaky-test, unit-test
> Fix For: 2.8.0
>
>
> Creating a new ticket for this since the root cause is different than 
> https://issues.apache.org/jira/browse/KAFKA-9966
> With injectError = true:
> h3. Stacktrace
> java.lang.AssertionError: Did not receive all 20 records from topic 
> multiPartitionOutputTopic within 60000 ms Expected: is a value equal to or 
> greater than <20> but: <15> was less than <20> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10777) Add additional configuration to control MM2 internal topics naming convention

2020-11-30 Thread Omnia Ibrahim (Jira)
Omnia Ibrahim created KAFKA-10777:
-

 Summary: Add additional configuration to control MM2 internal 
topics naming convention
 Key: KAFKA-10777
 URL: https://issues.apache.org/jira/browse/KAFKA-10777
 Project: Kafka
  Issue Type: Improvement
  Components: mirrormaker
Affects Versions: 2.6.0
Reporter: Omnia Ibrahim


MM2's internal topic names (heartbeats, checkpoints and offset-syncs) are 
hardcoded in the source code, which makes it hard to run MM2 against any Kafka 
cluster that enforces rules around topic naming and doesn't allow topic 
auto-creation.

In this case developers need to create these internal topics up-front manually 
and make sure they follow the cluster's rules and guidance for topic creation, 
so MM2 should be flexible enough to let you override the names of its internal 
topics to match the cluster's naming convention.
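
Until then, the up-front creation can be scripted with the Admin client. A 
sketch under stated assumptions: the topic names follow MM2's current hardcoded 
conventions for a "primary" source and "backup" target alias, and the 
partition/replication settings are placeholders for whatever the cluster rules 
require:

{code:java}
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Arrays;
import java.util.Properties;

public class CreateMm2InternalTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            admin.createTopics(Arrays.asList(
                // heartbeats lives on each cluster; the other two are
                // per-flow internal topics under MM2's current naming.
                new NewTopic("heartbeats", 1, (short) 3),
                new NewTopic("mm2-offset-syncs.backup.internal", 1, (short) 3),
                new NewTopic("primary.checkpoints.internal", 1, (short) 3)
            )).all().get();
        }
    }
}
{code}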



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters

2020-11-30 Thread Ming Liu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17240908#comment-17240908
 ] 

Ming Liu commented on KAFKA-7918:
-

We also have code to optimize the changelog to support this. We only push to 
the changelog at a certain interval (and collapse records with the same key to 
save the serialization cost).
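
For context on the quoted description below: the bytes-store layering is 
visible in the public {{Stores}} API, where the supplier works on 
{{Bytes}}/{{byte[]}} and the builder layers the typed serdes on top. A minimal 
sketch:

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class BytesStoreLayering {
    public static void main(String[] args) {
        // The supplier produces a store keyed by Bytes with byte[] values...
        final KeyValueBytesStoreSupplier supplier = Stores.persistentKeyValueStore("counts");
        // ...and serialization to/from String and Long happens at the higher, typed layer.
        final StoreBuilder<KeyValueStore<String, Long>> builder =
            Stores.keyValueStoreBuilder(supplier, Serdes.String(), Serdes.Long());
        System.out.println(builder.name());
    }
}
{code}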

> Streams store cleanup: inline byte-store generic parameters
> ---
>
> Key: KAFKA-7918
> URL: https://issues.apache.org/jira/browse/KAFKA-7918
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: A. Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.3.0
>
>
> Currently, the fundamental layer of stores in Streams is the "bytes store".
> The easiest way to identify this is in 
> `org.apache.kafka.streams.state.Stores`: all the `StoreBuilder`s require an 
> `XXBytesStoreSupplier`. 
> We provide several implementations of these bytes stores, typically an 
> in-memory one and a persistent one (aka RocksDB).
> Inside these bytes stores, the key is always `Bytes` and the value is always 
> `byte[]` (serialization happens at a higher level). However, the store 
> implementations are generically typed, just `K` and `V`.
> This is good for flexibility, but it makes the code a little harder to 
> understand. I think that we used to do serialization at a lower level, so the 
> generics are a hold-over from that.
> It would simplify the code if we just inlined the actual k/v types and maybe 
> even renamed the classes from (e.g.) `InMemoryKeyValueStore` to 
> `InMemoryKeyValueBytesStore`, and so forth.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-8266) Improve `testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup`

2020-11-30 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-8266.

Resolution: Fixed

> Improve 
> `testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup`
> 
>
> Key: KAFKA-8266
> URL: https://issues.apache.org/jira/browse/KAFKA-8266
> Project: Kafka
>  Issue Type: Test
>Reporter: Jason Gustafson
>Assignee: David Jacot
>Priority: Major
>
> Some additional validation could be done after the member gets kicked out. 
> The main thing is showing that the group can continue to consume data and 
> commit offsets.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8266) Improve `testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup`

2020-11-30 Thread David Jacot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17240842#comment-17240842
 ] 

David Jacot commented on KAFKA-8266:


It seems that we haven't seen this one for a while now. I will close it. 
Please re-open if necessary.

> Improve 
> `testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup`
> 
>
> Key: KAFKA-8266
> URL: https://issues.apache.org/jira/browse/KAFKA-8266
> Project: Kafka
>  Issue Type: Test
>Reporter: Jason Gustafson
>Assignee: David Jacot
>Priority: Major
>
> Some additional validation could be done after the member gets kicked out. 
> The main thing is showing that the group can continue to consume data and 
> commit offsets.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dengziming commented on pull request #9662: KAFKA-10130; Rewrite FeatureZNode struct with auto-generated protocol

2020-11-30 Thread GitBox


dengziming commented on pull request #9662:
URL: https://github.com/apache/kafka/pull/9662#issuecomment-735765304


   @abbccdda @kowshik hi, PTAL.
   also ping @cmccabe @hachikuji to have a look.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ankit-kumar-25 commented on pull request #9326: KAFKA-10460: ReplicaListValidator format checking is incomplete

2020-11-30 Thread GitBox


ankit-kumar-25 commented on pull request #9326:
URL: https://github.com/apache/kafka/pull/9326#issuecomment-735753885


   Hey @mimaison, thank you for your input. I have added a couple of test cases 
to validate this condition.
   
   For some reason the build failed. Can you please review the change and 
re-run the test/build if feasible?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9376) Plugin class loader not found using MM2

2020-11-30 Thread shezm (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17240706#comment-17240706
 ] 

shezm commented on KAFKA-9376:
--

Hi, I had the same problem too (release 2.4.0 with Scala 2.12).

I think the problem is here:

 
{code:java}
public class DelegatingClassLoader extends URLClassLoader {
    // ... other code ...

    /**
     * Retrieve the PluginClassLoader associated with a plugin class
     * @param name The fully qualified class name of the plugin
     * @return the PluginClassLoader that should be used to load this, or null if the plugin is not isolated.
     */
    public PluginClassLoader pluginClassLoader(String name) {
        if (!PluginUtils.shouldLoadInIsolation(name)) {
            return null;
        }
        SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(name);
        if (inner == null) {
            return null;
        }
        // --- is here ---
        // I found `pluginLoader` type was `AppClassLoader`
        ClassLoader pluginLoader = inner.get(inner.lastKey());
        return pluginLoader instanceof PluginClassLoader
               ? (PluginClassLoader) pluginLoader
               : null;
    }

    public ClassLoader connectorLoader(Connector connector) {
        return connectorLoader(connector.getClass().getName());
    }

    public ClassLoader connectorLoader(String connectorClassOrAlias) {
        log.debug("Getting plugin class loader for connector: '{}'", connectorClassOrAlias);
        String fullName = aliases.containsKey(connectorClassOrAlias)
                          ? aliases.get(connectorClassOrAlias)
                          : connectorClassOrAlias;
        PluginClassLoader classLoader = pluginClassLoader(fullName);
        if (classLoader == null) {
            log.error(
                "Plugin class loader for connector: '{}' was not found. Returning: {}",
                connectorClassOrAlias,
                this
            );
            return this;
        }
        return classLoader;
    }
}
{code}
Looking at the code, I found that in `DelegatingClassLoader#pluginClassLoader()` 
the variable `pluginLoader` is an `AppClassLoader`, so the method returns null 
and `connectorLoader` emits the log.error(...), but no exception is thrown.


It doesn't seem to make MM2 unavailable, though, so I think it is just a 
misleading log message.

 

> Plugin class loader not found using MM2
> ---
>
> Key: KAFKA-9376
> URL: https://issues.apache.org/jira/browse/KAFKA-9376
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.4.0
>Reporter: Sinóros-Szabó Péter
>Priority: Minor
>
> I am using MM2 (release 2.4.0 with scala 2.12) I geta bunch of classloader 
> errors. MM2 seems to be working, but I do not know if all of it components 
> are working as expected as this is the first time I use MM2.
> I run MM2 with the following command:
> {code:java}
> ./bin/connect-mirror-maker.sh config/connect-mirror-maker.properties
> {code}
> Errors are:
> {code:java}
> [2020-01-07 15:06:17,892] ERROR Plugin class loader for connector: 
> 'org.apache.kafka.connect.mirror.MirrorHeartbeatConnector' was not found. 
> Returning: 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader@6ebf0f36 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:165)
> [2020-01-07 15:06:17,889] ERROR Plugin class loader for connector: 
> 'org.apache.kafka.connect.mirror.MirrorHeartbeatConnector' was not found. 
> Returning: 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader@6ebf0f36 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:165)
> [2020-01-07 15:06:17,904] INFO ConnectorConfig values:
>  config.action.reload = restart
>  connector.class = org.apache.kafka.connect.mirror.MirrorHeartbeatConnector
>  errors.log.enable = false
>  errors.log.include.messages = false
>  errors.retry.delay.max.ms = 60000
>  errors.retry.timeout = 0
>  errors.tolerance = none
>  header.converter = null
>  key.converter = null
>  name = MirrorHeartbeatConnector
>  tasks.max = 1
>  transforms = []
>  value.converter = null
>  (org.apache.kafka.connect.runtime.ConnectorConfig:347)
> [2020-01-07 15:06:17,904] INFO EnrichedConnectorConfig values:
>  config.action.reload = restart
>  connector.class = org.apache.kafka.connect.mirror.MirrorHeartbeatConnector
>  errors.log.enable = false
>  errors.log.include.messages = false
>  errors.retry.delay.max.ms = 60000
>  errors.retry.timeout = 0
>  errors.tolerance = none
>  header.converter = null
>  key.converter = null
>  name = MirrorHeartbeatConnector
>  tasks.max = 1
>  transforms = []
>  value.converter = null
>  
> (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:347)
> [2020-01-07 15:06:17,905] INFO TaskConfig values:
>  task.class = class org.apache.kafka.connect.mirror.MirrorHeartbeatTask
>  (org.apache.kafka.connect.runtime.TaskConfig:347)
> 

[GitHub] [kafka] dengziming opened a new pull request #9662: Kafka-10130; Rewrite FeatureZNode struct with auto-generated protocol

2020-11-30 Thread GitBox


dengziming opened a new pull request #9662:
URL: https://github.com/apache/kafka/pull/9662


   *More detailed description of your change*
   
   1. Remove the hand-written FeatureZNode struct and replace it with an auto-generated FeatureZNode.json schema
   2. Change the code where FeatureZNode is used
   3. Copy some code from `org.apache.kafka.raft.FileBasedStateStore` to convert JSON data to and from FeatureZNodeData (see the sketch below)
   
   
   *Summary of testing strategy (including rationale)*
   Replace FeatureZNode in the unit and integration tests
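   
   A rough sketch of the JSON round trip this enables. Assumption: the message 
generator emits `FeatureZNodeData` and `FeatureZNodeDataJsonConverter` from 
`FeatureZNode.json` at build time; the names follow the generated-protocol 
pattern and are not taken from the diff:
   
   ```java
   import com.fasterxml.jackson.databind.JsonNode;
   import com.fasterxml.jackson.databind.ObjectMapper;
   
   // Generated classes come from the build; Jackson does the byte-level IO.
   static byte[] encode(final FeatureZNodeData data, final short version) throws Exception {
       final JsonNode node = FeatureZNodeDataJsonConverter.write(data, version);
       return new ObjectMapper().writeValueAsBytes(node);
   }
   
   static FeatureZNodeData decode(final byte[] bytes, final short version) throws Exception {
       return FeatureZNodeDataJsonConverter.read(new ObjectMapper().readTree(bytes), version);
   }
   ```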
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-10736) Convert transaction coordinator metadata schemas to use generated protocol

2020-11-30 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-10736.

Fix Version/s: 2.8.0
   Resolution: Fixed

> Convert transaction coordinator metadata schemas to use generated protocol
> --
>
> Key: KAFKA-10736
> URL: https://issues.apache.org/jira/browse/KAFKA-10736
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
> Fix For: 2.8.0
>
>
> We need to convert the internal schemas used for representing transaction 
> metadata to the generated protocol. This opens the door for flexible version 
> support on the next bump. 
> similar to https://issues.apache.org/jira/browse/KAFKA-10497



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 merged pull request #9611: KAFKA-10736 Convert transaction coordinator metadata schemas to use g…

2020-11-30 Thread GitBox


chia7712 merged pull request #9611:
URL: https://github.com/apache/kafka/pull/9611


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #9508: KAFKA-10648: Add Prefix Scan support to State Stores

2020-11-30 Thread GitBox


cadonna commented on pull request #9508:
URL: https://github.com/apache/kafka/pull/9508#issuecomment-735665881


   @vamossagar12 I replied to your comments. Let me know when the PR is ready 
for review.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10767) Add Unit Test cases for missing methods in ThreadCacheTest

2020-11-30 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-10767:
--
Labels: newbie  (was: )

> Add Unit Test cases for missing methods in ThreadCacheTest
> --
>
> Key: KAFKA-10767
> URL: https://issues.apache.org/jira/browse/KAFKA-10767
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>  Labels: newbie
>
> During the code review for KIP-614, it was noticed that some methods in 
> ThreadCache don't have unit tests. Need to identify them and add unit test 
> cases for them.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10767) Add Unit Test cases for missing methods in ThreadCacheTest

2020-11-30 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-10767:
--
Component/s: unit tests
 streams

> Add Unit Test cases for missing methods in ThreadCacheTest
> --
>
> Key: KAFKA-10767
> URL: https://issues.apache.org/jira/browse/KAFKA-10767
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>
> During the code review for KIP-614, it was noticed that some methods in 
> ThreadCache don't have unit tests. Need to identify them and add unit test 
> cases for them.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cadonna commented on a change in pull request #9508: KAFKA-10648: Add Prefix Scan support to State Stores

2020-11-30 Thread GitBox


cadonna commented on a change in pull request #9508:
URL: https://github.com/apache/kafka/pull/9508#discussion_r532451217



##
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBPrefixIterator.java
##
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksIterator;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+class RocksDBPrefixIterator extends RocksDbIterator {
+    private final byte[] rawPrefix;
+
+    RocksDBPrefixIterator(final String name,
+                          final RocksIterator newIterator,
+                          final Set<KeyValueIterator<Bytes, byte[]>> openIterators,
+                          final Bytes prefix) {
+        super(name, newIterator, openIterators, true);
+        this.rawPrefix = prefix.get();
+        newIterator.seek(rawPrefix);
+    }
+
+    private boolean prefixEquals(final byte[] prefix1, final byte[] prefix2) {
+        final int min = Math.min(prefix1.length, prefix2.length);
+        final ByteBuffer prefix1Slice = ByteBuffer.wrap(prefix1, 0, min);
+        final ByteBuffer prefix2Slice = ByteBuffer.wrap(prefix2, 0, min);
+        return prefix1Slice.equals(prefix2Slice);

Review comment:
   I see what you mean. But why do you bother then to find the minimum 
length of the two prefixes? You could just use the length of the first prefix.
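   
   A sketch of that suggestion, assuming `rawPrefix` is the field set in the 
constructor above (the helper name here is hypothetical):
   
   ```java
   private boolean hasPrefix(final byte[] key) {
       // Keys shorter than the prefix cannot match; otherwise compare
       // exactly rawPrefix.length leading bytes.
       if (key.length < rawPrefix.length) {
           return false;
       }
       return ByteBuffer.wrap(key, 0, rawPrefix.length)
           .equals(ByteBuffer.wrap(rawPrefix, 0, rawPrefix.length));
   }
   ```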





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #9630: KAFKA-10739; Replace EpochEndOffset with automated protocol

2020-11-30 Thread GitBox


dajac commented on a change in pull request #9630:
URL: https://github.com/apache/kafka/pull/9630#discussion_r53221



##
File path: core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
##
@@ -78,7 +80,19 @@ class ReplicaFetcherMockBlockingSend(offsets: java.util.Map[TopicPartition, Epoc
     callback.foreach(_.apply())
     epochFetchCount += 1
     lastUsedOffsetForLeaderEpochVersion = requestBuilder.latestAllowedVersion()
-    new OffsetsForLeaderEpochResponse(currentOffsets)
+
+    val data = new OffsetForLeaderEpochResponseData()
+    currentOffsets.forEach((tp, offsetForLeaderPartition) => {
+      var topic = data.topics.find(tp.topic)
+      if (topic == null) {
+        topic = new OffsetForLeaderTopicResult()
+          .setTopic(tp.topic)
+        data.topics.add(topic)
+      }
+      topic.partitions.add(offsetForLeaderPartition.setPartition(tp.partition))

Review comment:
   Good point. Actually, `setPartition(tp.partition)` is not needed. Let me 
remove it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10766) Add Unit Test cases for RocksDbRangeIterator

2020-11-30 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-10766:
--
Component/s: unit tests
 streams

> Add Unit Test cases for RocksDbRangeIterator
> 
>
> Key: KAFKA-10766
> URL: https://issues.apache.org/jira/browse/KAFKA-10766
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>
> During the code review for KIP-614, it was noticed that RocksDbRangeIterator 
> does not have any unit test cases. Here is the github comment for referrence:
> [https://github.com/apache/kafka/pull/9508#discussion_r527612942]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10766) Add Unit Test cases for RocksDbRangeIterator

2020-11-30 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-10766:
--
Labels: newbie  (was: )

> Add Unit Test cases for RocksDbRangeIterator
> 
>
> Key: KAFKA-10766
> URL: https://issues.apache.org/jira/browse/KAFKA-10766
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>  Labels: newbie
>
> During the code review for KIP-614, it was noticed that RocksDbRangeIterator 
> does not have any unit test cases. Here is the github comment for referrence:
> [https://github.com/apache/kafka/pull/9508#discussion_r527612942]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cadonna commented on a change in pull request #9508: KAFKA-10648: Add Prefix Scan support to State Stores

2020-11-30 Thread GitBox


cadonna commented on a change in pull request #9508:
URL: https://github.com/apache/kafka/pull/9508#discussion_r532440942



##
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
##
@@ -359,6 +361,31 @@ public void shouldReverseIterateOverRange() {
         ), results);
     }
 
+    @Test
+    public void shouldGetRecordsWithPrefixKey() {
+        final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
+        entries.add(new KeyValue<>(bytesKey("k1"), bytesValue("1")));
+        entries.add(new KeyValue<>(bytesKey("k2"), bytesValue("2")));
+        entries.add(new KeyValue<>(bytesKey("p2"), bytesValue("2")));
+        entries.add(new KeyValue<>(bytesKey("p1"), bytesValue("2")));
+        entries.add(new KeyValue<>(bytesKey("p0"), bytesValue("2")));
+        store.putAll(entries);
+        final KeyValueIterator<Bytes, byte[]> keysWithPrefix = store.prefixScan("p", new StringSerializer());
+        final List<String> keys = new ArrayList<>();
+        final List<String> values = new ArrayList<>();
+        int numberOfKeysReturned = 0;
+
+        while (keysWithPrefix.hasNext()) {
+            final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
+            keys.add(next.key.toString());
+            values.add(new String(next.value));
+            numberOfKeysReturned++;
+        }
+        assertThat(numberOfKeysReturned, is(3));
+        assertThat(keys, is(Arrays.asList("p0", "p1", "p2")));
+        assertThat(values, is(Arrays.asList("2", "2", "2")));
+    }

Review comment:
   Creating a ticket and taking care that the ticket gets resolved sounds 
good to me. As I wrote, my request is not a requirement for approval.  





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #9508: KAFKA-10648: Add Prefix Scan support to State Stores

2020-11-30 Thread GitBox


cadonna commented on a change in pull request #9508:
URL: https://github.com/apache/kafka/pull/9508#discussion_r532438482



##
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -229,6 +230,15 @@ public V delete(final K key) {
         }
     }
 
+    @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(final P prefix, final PS prefixKeySerializer) {
+
+        return new MeteredKeyValueIterator(
+            wrapped().prefixScan(prefix, prefixKeySerializer),
+            rangeSensor

Review comment:
   I think, to be consistent, we should add a new sensor for the prefix 
scan. We have a sensor for each operation on a key-value store. The only 
exception is `reverseRange()`, which is a variant of range and does not 
necessarily qualify for its own sensor.
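   
   A hypothetical sketch of what that could look like: a `prefixScanSensor` 
created in `init()` alongside the existing sensors. The 
`StateStoreMetrics.prefixScanSensor` factory implied here does not exist and is 
named only by analogy with the current per-operation factories:
   
   ```java
   // All names below are assumptions mirroring the existing per-operation sensors.
   private Sensor prefixScanSensor; // created in init(), like rangeSensor
   
   @Override
   public <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(final P prefix,
                                                                          final PS prefixKeySerializer) {
       return new MeteredKeyValueIterator(
           wrapped().prefixScan(prefix, prefixKeySerializer),
           prefixScanSensor
       );
   }
   ```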





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org