[GitHub] [kafka] vgvineet4 commented on pull request #9254: KAFKA-10462: Added support to pass headers in producerPerformance script

2020-09-17 Thread GitBox


vgvineet4 commented on pull request #9254:
URL: https://github.com/apache/kafka/pull/9254#issuecomment-69400


   @wcarlson5 please have a look 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] omkreddy commented on pull request #8878: MINOR: Generator config-specific HTML ids

2020-09-17 Thread GitBox


omkreddy commented on pull request #8878:
URL: https://github.com/apache/kafka/pull/8878#issuecomment-694129604


   retest this please







[GitHub] [kafka] dajac commented on pull request #9294: MINOR: Fix now that kafka.apache.org resolves to 3 IP addresses

2020-09-17 Thread GitBox


dajac commented on pull request #9294:
URL: https://github.com/apache/kafka/pull/9294#issuecomment-694131412


   Netty uses an in-memory DNS server for such tests: 
https://github.com/netty/netty/blob/master/resolver-dns/src/test/java/io/netty/resolver/dns/TestDnsServer.java.
 I suppose that we could do something similar in AK as well.
   
   What about merging this one and filing a JIRA to improve this in the future? 
These flaky tests are quite annoying, so we could fix them now. What do you 
think?
   
   It would be great if we could backport it to previous versions as well.
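   One lighter-weight alternative to running an in-memory DNS server (a different technique from Netty's `TestDnsServer`) is to route lookups through an injectable resolver, so a test can return a fixed set of addresses for a single host name. The sketch below is only an illustration: `HostResolver`, `StaticHostResolver`, and the host `kafka.test` are hypothetical names, not existing Kafka classes, and the 192.0.2.0/24 addresses come from the documentation range, not from kafka.apache.org's real records.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;

public class StaticResolverSketch {

    // Hypothetical seam: production code delegates to real DNS,
    // while tests inject fixed answers.
    interface HostResolver {
        InetAddress[] resolve(String host) throws UnknownHostException;
    }

    static final class DefaultHostResolver implements HostResolver {
        @Override
        public InetAddress[] resolve(final String host) throws UnknownHostException {
            return InetAddress.getAllByName(host);
        }
    }

    // Test double: returns pre-registered addresses and never performs a lookup.
    static final class StaticHostResolver implements HostResolver {
        private final Map<String, InetAddress[]> entries = new HashMap<>();

        StaticHostResolver add(final String host, final String... ipv4) throws UnknownHostException {
            final InetAddress[] addrs = new InetAddress[ipv4.length];
            for (int i = 0; i < ipv4.length; i++) {
                // getByAddress builds the InetAddress from raw bytes; no DNS query is made.
                addrs[i] = InetAddress.getByAddress(host, toBytes(ipv4[i]));
            }
            entries.put(host, addrs);
            return this;
        }

        @Override
        public InetAddress[] resolve(final String host) throws UnknownHostException {
            final InetAddress[] addrs = entries.get(host);
            if (addrs == null) {
                throw new UnknownHostException(host);
            }
            return addrs.clone(); // callers cannot mutate the registered answer
        }

        // Parses a dotted-quad IPv4 literal into its four bytes.
        private static byte[] toBytes(final String ipv4) {
            final String[] parts = ipv4.split("\\.");
            final byte[] out = new byte[4];
            for (int i = 0; i < 4; i++) {
                out[i] = (byte) Integer.parseInt(parts[i]);
            }
            return out;
        }
    }

    public static void main(final String[] args) throws UnknownHostException {
        // One host name that deterministically resolves to three addresses.
        final HostResolver resolver = new StaticHostResolver()
            .add("kafka.test", "192.0.2.1", "192.0.2.2", "192.0.2.3");
        for (final InetAddress addr : resolver.resolve("kafka.test")) {
            System.out.println(addr);
        }
    }
}
```

   The design choice here is to make the multi-address case deterministic without depending on what a public name resolves to on a given day; the trade-off is that the client code must accept a resolver instead of calling `InetAddress` directly.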







[GitHub] [kafka] cadonna commented on a change in pull request #8892: KAFKA-10068: verify assignment performance with large cluster

2020-09-17 Thread GitBox


cadonna commented on a change in pull request #8892:
URL: https://github.com/apache/kafka/pull/8892#discussion_r490101377



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsAssignmentScaleTest.java
##
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_TASKS;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.createMockAdminClientForAssignor;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getInfo;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.uuidForInt;
+import static org.easymock.EasyMock.expect;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
+import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockClientSupplier;
+import org.apache.kafka.test.MockInternalTopicManager;
+import org.apache.kafka.test.MockKeyValueStoreBuilder;
+import org.easymock.EasyMock;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({IntegrationTest.class})
+public class StreamsAssignmentScaleTest {
+final static long MAX_ASSIGNMENT_DURATION = 60 * 1000L; //each individual assignment should complete within 60s
+final static String APPLICATION_ID = "streams-assignment-scale-test";
+
+private final Logger log = LoggerFactory.getLogger(StreamsAssignmentScaleTest.class);
+
+/* HighAvailabilityTaskAssignor tests */
+
+@Test(timeout = 120 * 1000)
+public void testHighAvailabilityTaskAssignorLargePartitionCount() {
+completeLargeAssignment(6_000, 1, 1, 1, HighAvailabilityTaskAssignor.class);
+}
+
+@Test(timeout = 120 * 1000)
+public void testHighAvailabilityTaskAssignorLargeNumConsumers() {
+completeLargeAssignment(1_000, 1_000, 1, 1, HighAvailabilityTaskAssignor.class);
+}
+
+@Test(timeout = 120 * 1000)
+public void testHighAvailabilityTaskAssignorManyStandbys() {
+completeLargeAssignment(1_000, 100, 1, 50, HighAvailabilityTaskAssignor.class);
+}
+
+@Test(timeout = 120 * 1000)
+public void testHighAvailabilityTaskAssignorManyThreadsPerClient() {
+completeLargeAssignment(1_000, 10, 1000, 1, HighAvailabilityTaskAssignor.class);
+}
+
+/* StickyTaskAssignor tests */
+
+@Test(timeout = 120 * 1000)
+public voi

[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

2020-09-17 Thread GitBox


big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r490147517



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##
@@ -47,8 +47,10 @@
 }
 
 @Override
-public void enableSendingOldValues() {
+public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
   OK, will flip.









[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

2020-09-17 Thread GitBox


big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r490152263



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##
@@ -182,6 +182,8 @@ public String queryableStoreName() {
 final KTableProcessorSupplier processorSupplier =
 new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+processorSupplier.enableSendingOldValues(true);

Review comment:
   As discussed offline, changing the behaviour of `enableSendingOldValues` 
so that a node that is already materialized does not ask upstream nodes to 
also send old values is deemed out of scope for this PR.
   
   @mjsax has requested a ticket to track this work.









[jira] [Created] (KAFKA-10494) Streams: enableSendingOldValues should not call parent if node is itself materialized

2020-09-17 Thread Andy Coates (Jira)
Andy Coates created KAFKA-10494:
---

 Summary: Streams: enableSendingOldValues should not call parent if 
node is itself materialized
 Key: KAFKA-10494
 URL: https://issues.apache.org/jira/browse/KAFKA-10494
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Andy Coates


Calling `enableSendingOldValues` on many nodes, e.g. `KTableFilter`, is 
unnecessarily calling `enableSendingOldValues` on the parent, even when the 
processor itself is materialized. This can force the parent table to be 
materialized unnecessarily.

 

For example:

```

StreamsBuilder builder = new StreamsBuilder();

builder
   .table("t1", Consumed.of(...))
   .filter(predicate, Materialized.as("t2"))
   .

```

If `downStreamOps` result in a call to `enableSendingOldValues` on the table 
returned by the `filter` call, i.e. `t2`, then it will result in `t1` being 
materialized unnecessarily.


This ticket was raised off the back of [comments in a 
PR](https://github.com/apache/kafka/pull/9156#discussion_r490152263) while 
working on https://issues.apache.org/jira/browse/KAFKA-10077.
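To make the problem concrete, here is a minimal, hypothetical Java model. It is not Kafka's real `KTableImpl` or processor-supplier code: each `TableNode` only knows its parent and whether it owns a store. `enableSendingOldValuesCurrent` follows the behaviour described above, while `enableSendingOldValuesProposed` is one possible reading of the requested change, in which a materialized node stops instead of recursing. Both method names are illustrative only.

```java
public class SendOldValuesSketch {

    // Hypothetical stand-in for a KTable node; NOT Kafka's real classes.
    static final class TableNode {
        final String name;
        final TableNode parent;   // null for a source table such as t1
        boolean materialized;     // true if this node owns a state store
        boolean sendOldValues;    // true once old values will be forwarded

        TableNode(final String name, final TableNode parent, final boolean materialized) {
            this.name = name;
            this.parent = parent;
            this.materialized = materialized;
        }

        // Behaviour described in the ticket: always recurse to the parent, so an
        // unmaterialized source ends up materialized just to supply old values.
        void enableSendingOldValuesCurrent() {
            if (parent == null) {
                materialized = true;
            } else {
                parent.enableSendingOldValuesCurrent();
            }
            sendOldValues = true;
        }

        // Proposed behaviour: a materialized node can read the previous value
        // from its own store, so it only involves its parent when it has no store.
        void enableSendingOldValuesProposed() {
            if (parent == null) {
                materialized = true;
            } else if (!materialized) {
                parent.enableSendingOldValuesProposed();
            }
            sendOldValues = true;
        }
    }

    public static void main(final String[] args) {
        // Models builder.table("t1", ...).filter(predicate, Materialized.as("t2"))
        final TableNode t1 = new TableNode("t1", null, false);
        final TableNode t2 = new TableNode("t2", t1, true);
        t2.enableSendingOldValuesCurrent();
        System.out.println("current:  t1 materialized = " + t1.materialized);  // true

        final TableNode u1 = new TableNode("t1", null, false);
        final TableNode u2 = new TableNode("t2", u1, true);
        u2.enableSendingOldValuesProposed();
        System.out.println("proposed: t1 materialized = " + u1.materialized);  // false
    }
}
```

Only the proposed variant leaves `t1` unmaterialized; an unmaterialized `t2` would still recurse and materialize `t1` in both variants.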



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10494) Streams: enableSendingOldValues should not call parent if node is itself materialized

2020-09-17 Thread Andy Coates (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Coates updated KAFKA-10494:

Description: 
Calling `enableSendingOldValues` on many nodes, e.g. `KTableFilter`, is 
unnecessarily calling `enableSendingOldValues` on the parent, even when the 
processor itself is materialized. This can force the parent table to be 
materialized unnecessarily.

 

For example:

```

StreamsBuilder builder = new StreamsBuilder();

builder
    .table("t1", Consumed.of(...))
    .filter(predicate, Materialized.as("t2"))
    .

```

If `downStreamOps` result in a call to `enableSendingOldValues` on the table 
returned by the `filter` call, i.e. `t2`, then it will result in `t1` being 
materialized unnecessarily.

This ticket was raised off the back of [comments in a 
PR|https://github.com/apache/kafka/pull/9156#discussion_r490152263]
 while working on https://issues.apache.org/jira/browse/KAFKA-10077.

> Streams: enableSendingOldValues should not call parent if node is itself 
> materialized
> -
>
> Key: KAFKA-10494
> URL: https://issues.apache.org/jira/browse/KAFKA-10494
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Andy Coates
>Priority: Major
>
> Calling `enableSendingOldValues` on many nodes, e.g. `KTableFilter`, is 
> unnecessarily calling `enableSendingOldValues` on the parent, even when the 
> processor itself is materialized. This can force the parent table to be 
> materialized unnecessarily.
>  
> For example:
> ```
> StreamsBuilder builder = new StreamsBuilder();
> builder
>     .table("t1", Consumed.of(...))
>     .filter(predicate, Materialized.as("t2"))
>     .
> ```
> If `downStreamOps` result in a call to `enableSendingOldValues` on the table 
> returned by the `filter` call, i.e. `t2`, then it will result in `t1` being 
> materialized unnecessarily.
> This ticket was raised off the back of [comments in a 
> PR|https://github.com/apache/kafka/pull/9156#discussion_r490152263]
>  while working on https://issues.apache.org/jira/browse/KAFKA-10077.





[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

2020-09-17 Thread GitBox


big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r490152263



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##
@@ -182,6 +182,8 @@ public String queryableStoreName() {
 final KTableProcessorSupplier processorSupplier =
 new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+processorSupplier.enableSendingOldValues(true);

Review comment:
   As discussed offline, changing the behaviour of `enableSendingOldValues` 
so that a node that is already materialized does not ask upstream nodes to 
also send old values is deemed out of scope for this PR.
   
   @mjsax has requested a ticket to track this work: 
https://issues.apache.org/jira/browse/KAFKA-10494









[jira] [Updated] (KAFKA-10494) Streams: enableSendingOldValues should not call parent if node is itself materialized

2020-09-17 Thread Andy Coates (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Coates updated KAFKA-10494:

Description: 
Calling `enableSendingOldValues` on many nodes, e.g. `KTableFilter`, is 
unnecessarily calling `enableSendingOldValues` on the parent, even when the 
processor itself is materialized. This can force the parent table to be 
materialized unnecessarily.

 

For example:

```

StreamsBuilder builder = new StreamsBuilder();

builder
    .table("t1", Consumed.of(...))
    .filter(predicate, Materialized.as("t2"))
    .

```

If `downStreamOps` result in a call to `enableSendingOldValues` on the table 
returned by the `filter` call, i.e. `t2`, then it will result in `t1` being 
materialized unnecessarily.

This ticket was raised off the back of [comments in a 
PR|https://github.com/apache/kafka/pull/9156#discussion_r490152263] while working on 
[KAFKA-10077|https://issues.apache.org/jira/browse/KAFKA-10077].



[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

2020-09-17 Thread GitBox


big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487060524



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
##
@@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() {
 doTestNotSendingOldValue(builder, table1, table2, topic1);
 }
 
+@Test
+public void 
shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() {
+final StreamsBuilder builder = new StreamsBuilder();
+final String topic1 = "topic1";
+
+final KTableImpl table1 =
+(KTableImpl) builder.table(topic1, 
consumed);
+final KTableImpl table2 = 
(KTableImpl) table1.filter(predicate);
+
+table2.enableSendingOldValues(true);

Review comment:
   See [my comment 
above](https://github.com/apache/kafka/pull/9156#discussion_r487057805)









[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

2020-09-17 Thread GitBox


big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r490154341



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##
@@ -39,9 +39,15 @@
 }
 
 @Override
-public final void enableSendingOldValues() {
-table1.enableSendingOldValues();
-table2.enableSendingOldValues();
+public final boolean enableSendingOldValues(final boolean 
onlyIfMaterialized) {
+if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
   Anyway, done...









[jira] [Commented] (KAFKA-10488) Adding cluster with ssl to kafka-manager

2020-09-17 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17197622#comment-17197622
 ] 

Mickael Maison commented on KAFKA-10488:


Hi [~vodevops],
This looks like a question about CMAK (previously kafka-manager). This JIRA is 
for Apache Kafka. I recommend asking your question in the CMAK project at 
https://github.com/yahoo/CMAK

> Adding cluster with ssl to kafka-manager
> 
>
> Key: KAFKA-10488
> URL: https://issues.apache.org/jira/browse/KAFKA-10488
> Project: Kafka
>  Issue Type: Bug
>Reporter: vodevops
>Priority: Major
>  Labels: build, kafka-manager
>
> I'm trying to add an SSL-enabled cluster (port 9093) to kafka-manager.
>  Below is the issue I'm facing:
> ```
>  yikes! ask timed out on 
> [actorselection[anchor(akka://kafka-manager-system/), 
> path(/user/kafka-manager/q2/kafka-state)]] after [2000 ms]. message of type 
> [kafka.manager.model.actormodel$ksgetbrokers$] was sent by 
> [actor[akka://kafka-manager-system/user/kafka-manager/test#-20444736]].
>  a typical reason for `asktimeoutexception` is that the recipient actor 
> didn't send a reply.
> ```
>  I tried by creating as zookeeper host and also 
> 
>  Both give the same error as above.
>  How do I add an SSL-enabled server in kafka-manager?





[jira] [Updated] (KAFKA-10494) Streams: enableSendingOldValues should not call parent if node is itself materialized

2020-09-17 Thread Andy Coates (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Coates updated KAFKA-10494:

Description: 
Calling `enableSendingOldValues` on many nodes, e.g. `KTableFilter`, is 
unnecessarily calling `enableSendingOldValues` on the parent, even when the 
processor itself is materialized. This can force the parent table to be 
materialized unnecessarily.

 

For example:

```

StreamsBuilder builder = new StreamsBuilder();

builder
    .table("t1", Consumed.of(...))
    .filter(predicate, Materialized.as("t2"))
    .

```

If `downStreamOps` result in a call to `enableSendingOldValues` on the table 
returned by the `filter` call, i.e. `t2`, then it will result in `t1` being 
materialized unnecessarily.

This ticket was raised off the back of [comments in a 
PR|https://github.com/apache/kafka/pull/9156#discussion_r490152263] while working on KAFKA-10077.

 

A good test to highlight this would be to add the following to `KTableFilterTest`:

```

@Test
public void shouldEnableSendOldValuesIfSourceTableNotMaterializedButFinalTableIsEvenIfNotForcedToMaterialize() {
  final StreamsBuilder builder = new StreamsBuilder();
  final String topic1 = "topic1";

  final KTableImpl table1 =
     (KTableImpl) builder.table(topic1, consumed);
  final KTableImpl table2 =
     (KTableImpl) table1.filter(predicate, Materialized.as("store2"));

  table2.enableSendingOldValues(false);

  doTestSendingOldValue(builder, table1, table2, topic1);
}
```



[jira] [Updated] (KAFKA-10494) Streams: enableSendingOldValues should not call parent if node is itself materialized

2020-09-17 Thread Andy Coates (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Coates updated KAFKA-10494:

Description: 
Calling `enableSendingOldValues` on many nodes, e.g. `KTableFilter`, is 
unnecessarily calling `enableSendingOldValues` on the parent, even when the 
processor itself is materialized. This can force the parent table to be 
materialized unnecessarily.

 

For example:

{{StreamsBuilder builder = new StreamsBuilder();}}
{{builder}}
{{    .table("t1", Consumed.of(...))}}
{{    .filter(predicate, Materialized.as("t2"))}}
{{    .}}

If `downStreamOps` result in a call to `enableSendingOldValues` on the table 
returned by the `filter` call, i.e. `t2`, then it will result in `t1` being 
materialized unnecessarily.

This ticket was raised off the back of [comments in a 
PR|#discussion_r490152263]] while working on KAFKA-10077.

A good test to highlight this would be to add the following to `KTableFilterTest`:

{{@Test}}
{{public void shouldEnableSendOldValuesIfSourceTableNotMaterializedButFinalTableIsEvenIfNotForcedToMaterialize() {}}
{{  final StreamsBuilder builder = new StreamsBuilder();}}
{{  final String topic1 = "topic1";}}

{{  final KTableImpl table1 =}}
{{     (KTableImpl) builder.table(topic1, consumed);}}
{{  final KTableImpl table2 =}}
{{     (KTableImpl) table1.filter(predicate, Materialized.as("store2"));}}

{{  table2.enableSendingOldValues(false);}}

{{  doTestSendingOldValue(builder, table1, table2, topic1);}}
{{}}}

This problem is not restricted to the filter call, though: other processor 
suppliers suffer from the same issue.

 


[jira] [Created] (KAFKA-10495) Fix spelling mistake

2020-09-17 Thread Jira
欧阳能达 created KAFKA-10495:


 Summary: Fix spelling mistake
 Key: KAFKA-10495
 URL: https://issues.apache.org/jira/browse/KAFKA-10495
 Project: Kafka
  Issue Type: Improvement
  Components: clients, consumer
Reporter: 欧阳能达


In the trunk branch:

org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:465):
 @return true *iff* the operation succeeded

The word *iff* is a misspelling.





[GitHub] [kafka] big-andy-coates commented on pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

2020-09-17 Thread GitBox


big-andy-coates commented on pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#issuecomment-694203001


   @mjsax - ready for another review.
   
   - boolean `onlyIfMaterialized` is now flipped to `forceMaterialization`.
   - The `KTableFilter` constructor now initializes its internal `sendOldValues` flag based on the parent table's state (not perfect, but it works).
   - Removed a test that was testing bad behaviour.
   
   The only outstanding piece, as I see it, is the bad behaviour if you call `enableSendingOldValues` without forcing materialization on a table that is itself materialized, but whose upstream is not. In such a situation, the table will _not_ enable sending old values.
   
   A similar bug existed before this PR: if you called `enableSendingOldValues` on a table that is itself materialized, but whose upstream is not, it would force materialization on its upstream table, which is unnecessary.
   
   With the new code, both bad behaviours are present: the change introduces the first one, and the second one still happens.
   
   I've raised https://issues.apache.org/jira/browse/KAFKA-10494 to track this.
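The rule both behaviours point towards can be sketched with a toy model, assuming that a node owning a state store can serve old values itself and therefore never needs to call its parent. `TableNode`, its fields and the boolean return value are hypothetical and are not the Kafka Streams `KTableImpl` or processor-supplier API; the `forceMaterialization` parameter only mirrors the flag name used in this PR.

```java
import java.util.Objects;

// Toy model of a chain of KTable nodes. This is NOT the real Kafka Streams
// KTableImpl / KTableProcessorSupplier API; names and fields are hypothetical.
final class TableNode {
    final String name;
    final TableNode parent;   // null for a source table
    boolean materialized;     // true if this node has its own state store
    boolean sendOldValues;

    TableNode(final String name, final TableNode parent, final boolean materialized) {
        this.name = Objects.requireNonNull(name);
        this.parent = parent;
        this.materialized = materialized;
    }

    // Returns true iff this node will forward old values after the call.
    boolean enableSendingOldValues(final boolean forceMaterialization) {
        if (materialized) {
            // Old values can be read from this node's own store, so the
            // parent is neither called nor materialized.
            sendOldValues = true;
            return true;
        }
        if (parent == null) {
            // An unmaterialized source only gets a store if forced to.
            if (!forceMaterialization) {
                return false;
            }
            materialized = true;
            sendOldValues = true;
            return true;
        }
        // Unmaterialized intermediate node: old values must come from upstream.
        sendOldValues = parent.enableSendingOldValues(forceMaterialization);
        return sendOldValues;
    }
}
```

In this model, with `t1` an unmaterialized source and `t2` a materialized filter of it, `t2.enableSendingOldValues(false)` returns `true` and leaves `t1` unmaterialized.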
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10494) Streams: enableSendingOldValues should not call parent if node is itself materialized

2020-09-17 Thread Andy Coates (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Coates updated KAFKA-10494:

Description: 
Calling `enableSendingOldValues` on many nodes, e.g. `KTableFilter`, is 
unnecessarily calling `enableSendingOldValues` on the parent, even when the 
processor itself is materialized. This can force the parent table to be 
materialized unnecessarily.

 

For example:

{{StreamsBuilder builder = new StreamsBuilder();}}{{builder}}
 \{{    .table("t1", Consumed.of(...))}}
 \{{    .filter(predicate, Materialized.as("t2"))}}
 \{{    .}}

If `downStreamOps` result in a call to `enableSendingOldValues` on the table 
returned by the `filter` call, i.e. `t2`, then it will result in `t1` being 
materialized unnecessarily.

This ticket was raised off the back of [comments in a PR|#discussion_r490152263] while working on KAFKA-10077.

A good test that highlights this would be to add this to `KTableFilterTest`:

{{@Test}}
{{public void shouldEnableSendOldValuesIfSourceTableNotMaterializedButFinalTableIsEvenIfNotForcedToMaterialize() {}}
{{  final StreamsBuilder builder = new StreamsBuilder();}}
{{  final String topic1 = "topic1";}}

{{  final KTableImpl table1 =}}
{{     (KTableImpl) builder.table(topic1, consumed);}}
{{  final KTableImpl table2 =}}
{{     (KTableImpl) table1.filter(predicate, Materialized.as("store2"));}}

{{  table2.enableSendingOldValues(false);}}

{{  doTestSendingOldValue(builder, table1, table2, topic1);}}
{{}}}

However, this problem is not restricted to the filter call: other processor suppliers suffer from the same issue.

In addition, once [https://github.com/apache/kafka/pull/9156] is merged, calling {{enableSendingOldValues}} without forcing materialization on a table that is itself materialized, but whose upstream is not, will _not_ enable sending old values on that table, even though it should.

 

  was:
Calling `enableSendingOldValues` on many nodes, e.g. `KTableFilter`, is 
unnecessarily calling `enableSendingOldValues` on the parent, even when the 
processor itself is materialized. This can force the parent table to be 
materialized unnecessarily.

 

For example:

{{StreamsBuilder builder = new StreamsBuilder();}}{{builder}}
{{    .table("t1", Consumed.of(...))}}
{{    .filter(predicate, Materialized.as("t2"))}}
{{    .}}

If `downStreamOps` result in a call to `enableSendingOldValues` on the table 
returned by the `filter` call, i.e. `t2`, then it will result in `t1` being 
materialized unnecessarily.

This ticket was raised off the back of [comments in a PR|#discussion_r490152263] while working on KAFKA-10077.

A good test that highlights this would be to add this to `KTableFilterTest`:

{{@Test}}
{{public void shouldEnableSendOldValuesIfSourceTableNotMaterializedButFinalTableIsEvenIfNotForcedToMaterialize() {}}
{{  final StreamsBuilder builder = new StreamsBuilder();}}
{{  final String topic1 = "topic1";}}

{{  final KTableImpl table1 =}}
{{     (KTableImpl) builder.table(topic1, consumed);}}
{{  final KTableImpl table2 =}}
{{     (KTableImpl) table1.filter(predicate, Materialized.as("store2"));}}

{{  table2.enableSendingOldValues(false);}}

{{  doTestSendingOldValue(builder, table1, table2, topic1);}}
{{}}}

However, this problem is not restricted to the filter call: other processor suppliers suffer from the same issue.

 


> Streams: enableSendingOldValues should not call parent if node is itself 
> materialized
> -
>
> Key: KAFKA-10494
> URL: https://issues.apache.org/jira/browse/KAFKA-10494
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Andy Coates
>Priority: Major
>
> Calling `enableSendingOldValues` on many nodes, e.g. `KTableFilter`, is 
> unnecessarily calling `enableSendingOldValues` on the parent, even when the 
> processor itself is materialized. This can force the parent table to be 
> materialized unnecessarily.
>  
> For example:
> {{StreamsBuilder builder = new StreamsBuilder();}}{{builder}}
>  \{{    .table("t1", Consumed.of(...))}}
>  \{{    .filter(predicate, Materialized.as("t2"))}}
>  \{{    .}}
> If `downStreamOps` result in a call to `enableSendingOldValues` on the table 
> returned by the `filter` call, i.e. `t2`, then it will result in `t1` being 
> materialized unnecessarily.
> This ticket was raised off the back of [comments in a PR|#discussion_r490152263] while working on KAFKA-10077.
> A good test that highlights this would be to add this to `KTableFilterTest`:
> {{@Test}}
> {{public void shouldEnableSendOldValuesIfSourceTableNotMaterializedButFinalTableIsEvenIfNotForcedToMaterialize() {}}
>  {{  final StreamsBuilder builder = new StreamsBuilder();}}
>  {{  final String topic1 = "topic1";}}
> {{  final KTa

[jira] [Updated] (KAFKA-10494) Streams: enableSendingOldValues should not call parent if node is itself materialized

2020-09-17 Thread Andy Coates (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Coates updated KAFKA-10494:

Description: 
Calling `enableSendingOldValues` on many nodes, e.g. `KTableFilter`, is 
unnecessarily calling `enableSendingOldValues` on the parent, even when the 
processor itself is materialized. This can force the parent table to be 
materialized unnecessarily.

 

For example:

{{StreamsBuilder builder = new StreamsBuilder();}}{{builder}}
    .table("t1", Consumed.of(...))
    .filter(predicate, Materialized.as("t2"))
    .

If `downStreamOps` result in a call to `enableSendingOldValues` on the table 
returned by the `filter` call, i.e. `t2`, then it will result in `t1` being 
materialized unnecessarily.

This ticket was raised off the back of [comments in a PR|#discussion_r490152263] while working on KAFKA-10077.

A good test that highlights this would be to add this to `KTableFilterTest`:

{{@Test}}
{{public void shouldEnableSendOldValuesIfSourceTableNotMaterializedButFinalTableIsEvenIfNotForcedToMaterialize() {}}
{{  final StreamsBuilder builder = new StreamsBuilder();}}
{{  final String topic1 = "topic1";}}

{{  final KTableImpl table1 =}}
{{     (KTableImpl) builder.table(topic1, consumed);}}
{{  final KTableImpl table2 =}}
{{     (KTableImpl) table1.filter(predicate, Materialized.as("store2"));}}

{{  table2.enableSendingOldValues(false);}}

{{  doTestSendingOldValue(builder, table1, table2, topic1);}}
{{}}}

However, this problem is not restricted to the filter call: other processor suppliers suffer from the same issue.

In addition, once [https://github.com/apache/kafka/pull/9156] is merged, calling {{enableSendingOldValues}} without forcing materialization on a table that is itself materialized, but whose upstream is not, will _not_ enable sending old values on that table, even though it should.

 

  was:
Calling `enableSendingOldValues` on many nodes, e.g. `KTableFilter`, is 
unnecessarily calling `enableSendingOldValues` on the parent, even when the 
processor itself is materialized. This can force the parent table to be 
materialized unnecessarily.

 

For example:

{{StreamsBuilder builder = new StreamsBuilder();}}{{builder}}
 \{{    .table("t1", Consumed.of(...))}}
 \{{    .filter(predicate, Materialized.as("t2"))}}
 \{{    .}}

If `downStreamOps` result in a call to `enableSendingOldValues` on the table 
returned by the `filter` call, i.e. `t2`, then it will result in `t1` being 
materialized unnecessarily.

This ticket was raised off the back of [comments in a PR|#discussion_r490152263] while working on KAFKA-10077.

A good test that highlights this would be to add this to `KTableFilterTest`:

{{@Test}}
{{public void shouldEnableSendOldValuesIfSourceTableNotMaterializedButFinalTableIsEvenIfNotForcedToMaterialize() {}}
{{  final StreamsBuilder builder = new StreamsBuilder();}}
{{  final String topic1 = "topic1";}}

{{  final KTableImpl table1 =}}
{{     (KTableImpl) builder.table(topic1, consumed);}}
{{  final KTableImpl table2 =}}
{{     (KTableImpl) table1.filter(predicate, Materialized.as("store2"));}}

{{  table2.enableSendingOldValues(false);}}

{{  doTestSendingOldValue(builder, table1, table2, topic1);}}
{{}}}

However, this problem is not restricted to the filter call: other processor suppliers suffer from the same issue.

In addition, once [https://github.com/apache/kafka/pull/9156] is merged, calling {{enableSendingOldValues}} without forcing materialization on a table that is itself materialized, but whose upstream is not, will _not_ enable sending old values on that table, even though it should.

 



[GitHub] [kafka] mumrah commented on pull request #9294: MINOR: Fix now that kafka.apache.org resolves to 3 IP addresses

2020-09-17 Thread GitBox


mumrah commented on pull request #9294:
URL: https://github.com/apache/kafka/pull/9294#issuecomment-694216516


   Agreed with @dajac's suggestion. Maybe for this change we could make the assertion `> 1` rather than expecting a specific value.
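A minimal sketch of the looser assertion, assuming the test has the resolved addresses as an `InetAddress[]`. The actual test code from PR #9294 is not shown in this thread, so `MultiAddressCheck` and its helpers are hypothetical. `InetAddress.getByAddress` creates addresses without a DNS lookup, which keeps the example offline.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

final class MultiAddressCheck {
    // Looser check: require "more than one address" instead of an exact
    // count, so a change in the DNS record set does not break the test.
    static void assertResolvesToMultipleAddresses(final InetAddress[] addresses) {
        if (addresses.length <= 1) {
            throw new AssertionError("Expected more than one address but got " + addresses.length);
        }
    }

    // Builds literal addresses from 192.0.2.0/24 (reserved for documentation,
    // RFC 5737) without any DNS lookup.
    static InetAddress[] fakeAddresses(final int count) {
        final InetAddress[] result = new InetAddress[count];
        for (int i = 0; i < count; i++) {
            try {
                result[i] = InetAddress.getByAddress(new byte[] {(byte) 192, 0, 2, (byte) (i + 1)});
            } catch (final UnknownHostException e) {
                // Only thrown for an address array of illegal length.
                throw new IllegalStateException(e);
            }
        }
        return result;
    }
}
```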







[jira] [Updated] (KAFKA-10494) Streams: enableSendingOldValues should not call parent if node is itself materialized

2020-09-17 Thread Andy Coates (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Coates updated KAFKA-10494:

Description: 
Calling `enableSendingOldValues` on many nodes, e.g. `KTableFilter`, is 
unnecessarily calling `enableSendingOldValues` on the parent, even when the 
processor itself is materialized. This can force the parent table to be 
materialized unnecessarily.

For example:
{code:java}
StreamsBuilder builder = new StreamsBuilder();
builder
     .table("t1", Consumed.of(...))
     .filter(predicate, Materialized.as("t2"))
     .
{code}
If `downStreamOps` result in a call to `enableSendingOldValues` on the table 
returned by the `filter` call, i.e. `t2`, then it will result in `t1` being 
materialized unnecessarily.

This ticket was raised off the back of [comments in a PR|#discussion_r490152263] while working on KAFKA-10077.

A good test that highlights this would be to add this to `KTableFilterTest`:
{code:java}
@Test
public void shouldEnableSendOldValuesIfSourceTableNotMaterializedButFinalTableIsEvenIfNotForcedToMaterialize() {
    final StreamsBuilder builder = new StreamsBuilder();
    final String topic1 = "topic1";
    final KTableImpl table1 =
        (KTableImpl) builder.table(topic1, consumed);
    final KTableImpl table2 =
        (KTableImpl) table1.filter(predicate, Materialized.as("store2"));
    table2.enableSendingOldValues(false);
    doTestSendingOldValue(builder, table1, table2, topic1);
}
{code}
However, this problem is not restricted to the filter call: other processor suppliers suffer from the same issue.

In addition, once [https://github.com/apache/kafka/pull/9156] is merged, calling {{enableSendingOldValues}} without forcing materialization on a table that is itself materialized, but whose upstream is not, will _not_ enable sending old values on that table, even though it should.

 

  was:
Calling `enableSendingOldValues` on many nodes, e.g. `KTableFilter`, is 
unnecessarily calling `enableSendingOldValues` on the parent, even when the 
processor itself is materialized. This can force the parent table to be 
materialized unnecessarily.

 

For example:

{{StreamsBuilder builder = new StreamsBuilder();}}{{builder}}
    .table("t1", Consumed.of(...))
    .filter(predicate, Materialized.as("t2"))
    .

If `downStreamOps` result in a call to `enableSendingOldValues` on the table 
returned by the `filter` call, i.e. `t2`, then it will result in `t1` being 
materialized unnecessarily.

This ticket was raised off the back of [comments in a PR|#discussion_r490152263] while working on KAFKA-10077.

A good test that highlights this would be to add this to `KTableFilterTest`:

{{@Test}}
{{public void shouldEnableSendOldValuesIfSourceTableNotMaterializedButFinalTableIsEvenIfNotForcedToMaterialize() {}}
{{  final StreamsBuilder builder = new StreamsBuilder();}}
{{  final String topic1 = "topic1";}}

{{  final KTableImpl table1 =}}
{{     (KTableImpl) builder.table(topic1, consumed);}}
{{  final KTableImpl table2 =}}
{{     (KTableImpl) table1.filter(predicate, Materialized.as("store2"));}}

{{  table2.enableSendingOldValues(false);}}

{{  doTestSendingOldValue(builder, table1, table2, topic1);}}
{{}}}

However, this problem is not restricted to the filter call: other processor suppliers suffer from the same issue.

In addition, once [https://github.com/apache/kafka/pull/9156] is merged, calling {{enableSendingOldValues}} without forcing materialization on a table that is itself materialized, but whose upstream is not, will _not_ enable sending old values on that table, even though it should.

 



[GitHub] [kafka] ouyangnengda opened a new pull request #9297: KAFKA-10495:Fix spelling mistake

2020-09-17 Thread GitBox


ouyangnengda opened a new pull request #9297:
URL: https://github.com/apache/kafka/pull/9297


   The word "iff" is wrong. I changed it to "if".
   







[GitHub] [kafka] ijuma opened a new pull request #9298: MINOR: Replace Java 14 with Java 15 in the README

2020-09-17 Thread GitBox


ijuma opened a new pull request #9298:
URL: https://github.com/apache/kafka/pull/9298


   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] ijuma commented on a change in pull request #9223: KAFKA-10438 Lazy initialization of record header to reduce memory usa…

2020-09-17 Thread GitBox


ijuma commented on a change in pull request #9223:
URL: https://github.com/apache/kafka/pull/9223#discussion_r490246634



##
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchBenchmarkSuite.java
##
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.record;
+
+import kafka.server.BrokerTopicStats;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.record.AbstractRecords;
+import org.apache.kafka.common.record.BufferSupplier;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.TimestampType;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.common.record.RecordBatch.CURRENT_MAGIC_VALUE;
+
+@State(Scope.Benchmark)
+public abstract class RecordBatchBenchmarkSuite {

Review comment:
   Maybe this should be `BaseRecordBatchBenchmark`?









[GitHub] [kafka] ijuma commented on a change in pull request #9266: KAFKA-10469: Resolve logger levels hierarchically

2020-09-17 Thread GitBox


ijuma commented on a change in pull request #9266:
URL: https://github.com/apache/kafka/pull/9266#discussion_r490256227



##
File path: core/src/test/scala/kafka/utils/LoggingTest.scala
##
@@ -58,4 +60,20 @@ class LoggingTest extends Logging {
 
     assertEquals(logging.getClass.getName, logging.log.underlying.getName)
   }
+
+  @Test
+  def testLoggerLevelIsResolved(): Unit = {
+    val controller = new Log4jController()
+    val previousLevel = controller.getLogLevel("kafka")
+    try {
+      controller.setLogLevel("kafka", "TRACE")
+      Logger(LoggerFactory.getLogger("kafka.utils.Log4jControllerTest")).trace("test")

Review comment:
   Can we add a comment explaining why we are invoking the logger here?









[GitHub] [kafka] dongjinleekr commented on pull request #9297: KAFKA-10495:Fix spelling mistake

2020-09-17 Thread GitBox


dongjinleekr commented on pull request #9297:
URL: https://github.com/apache/kafka/pull/9297#issuecomment-694245811


   Hi @ouyangnengda,
   
   I don't think this is a problem: 'iff' means 'if and only if'. Please see [here](https://www.oxfordlearnersdictionaries.com/definition/english/iff).







[jira] [Commented] (KAFKA-10495) Fix spelling mistake

2020-09-17 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17197699#comment-17197699
 ] 

Luke Chen commented on KAFKA-10495:
---

FYI: *iff* is short for *if and only if*.

[https://en.wikipedia.org/wiki/If_and_only_if]
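The difference between *if* and *iff* can be written as two boolean connectives. For a Javadoc line like `@return true iff the operation succeeded`, plain *if* only promises `succeeded -> returned`, whereas *iff* also promises that the method returns `false` whenever the operation did not succeed. The `Logic` class is purely illustrative.

```java
// Illustrative only: the two connectives discussed in KAFKA-10495.
final class Logic {
    // Material implication a -> b, i.e. "b if a".
    static boolean implies(final boolean a, final boolean b) {
        return !a || b;
    }

    // Biconditional "a iff b": true exactly when a and b agree.
    static boolean iff(final boolean a, final boolean b) {
        return a == b;
    }
}
```

The only case where the two readings disagree is `succeeded == false` with `returned == true`: permitted by *if*, ruled out by *iff*.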

 

> Fix spelling mistake
> 
>
> Key: KAFKA-10495
> URL: https://issues.apache.org/jira/browse/KAFKA-10495
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: 欧阳能达
>Priority: Trivial
>  Labels: newbie
>
> In the trunk branch:
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll (ConsumerCoordinator.java, line 465): @return true *iff* the operation succeeded
> The word *iff* is a mistake.





[GitHub] [kafka] ijuma commented on a change in pull request #9266: KAFKA-10469: Resolve logger levels hierarchically

2020-09-17 Thread GitBox


ijuma commented on a change in pull request #9266:
URL: https://github.com/apache/kafka/pull/9266#discussion_r490259458



##
File path: core/src/main/scala/kafka/utils/Log4jController.scala
##
@@ -29,6 +29,24 @@ import scala.jdk.CollectionConverters._
 object Log4jController {
   val ROOT_LOGGER = "root"
 
+  def resolveLevel(logger: Logger): String = {

Review comment:
   Should this be private?
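Resolving levels hierarchically, as the PR title puts it, means that a logger without its own configured level takes the level of its nearest configured ancestor in the dotted name hierarchy, falling back to the root logger; Log4j 1.x exposes the same idea as `Category.getEffectiveLevel()`. The sketch below is a stand-alone re-implementation over a plain `Map`, not the body of `Log4jController.resolveLevel`, which this diff does not show.

```java
import java.util.Map;

final class LevelResolver {
    static final String ROOT_LOGGER = "root";

    // Walks from the logger name towards the root, e.g.
    // "kafka.utils.Foo" -> "kafka.utils" -> "kafka", and returns the first
    // explicitly configured level; otherwise the root logger's level
    // (null if the map has no entry for the root).
    static String resolveLevel(final String loggerName, final Map<String, String> configured) {
        String name = loggerName;
        while (true) {
            final String level = configured.get(name);
            if (level != null) {
                return level;
            }
            final int lastDot = name.lastIndexOf('.');
            if (lastDot < 0) {
                return configured.get(ROOT_LOGGER);
            }
            name = name.substring(0, lastDot);
        }
    }
}
```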









[GitHub] [kafka] ijuma commented on a change in pull request #9266: KAFKA-10469: Resolve logger levels hierarchically

2020-09-17 Thread GitBox


ijuma commented on a change in pull request #9266:
URL: https://github.com/apache/kafka/pull/9266#discussion_r490260948



##
File path: core/src/test/scala/kafka/utils/LoggingTest.scala
##
@@ -58,4 +60,22 @@ class LoggingTest extends Logging {
 
     assertEquals(logging.getClass.getName, logging.log.underlying.getName)
   }
+
+  @Test
+  def testLoggerLevelIsResolved(): Unit = {
+    val controller = new Log4jController()
+    val previousLevel = controller.getLogLevel("kafka")
+    try {
+      controller.setLogLevel("kafka", "TRACE")
+      // Do some logging so that the Logger is created within the hierarchy
+      // (until loggers are used only loggers in the config file exist)
+      Logger(LoggerFactory.getLogger("kafka.utils.Log4jControllerTest")).trace("test")
+      Assert.assertEquals("TRACE", controller.getLogLevel("kafka"))
+      Assert.assertEquals("TRACE", controller.getLogLevel("kafka.utils.Log4jControllerTest"))
+      Assert.assertTrue(controller.getLoggers.contains("kafka=TRACE"))
+      Assert.assertTrue(controller.getLoggers.contains("kafka.utils.Log4jControllerTest=TRACE"))

Review comment:
   Nit: we don't usually include the `Assert.` prefix in test assertions.









[GitHub] [kafka] ijuma commented on a change in pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

2020-09-17 Thread GitBox


ijuma commented on a change in pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#discussion_r490262817



##
File path: 
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##
@@ -1139,6 +1139,7 @@ object GroupMetadataManager {
 
   private val CURRENT_OFFSET_KEY_SCHEMA = schemaForKey(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
   private val CURRENT_GROUP_KEY_SCHEMA = schemaForKey(CURRENT_GROUP_KEY_SCHEMA_VERSION)
+  private val CURRENT_GROUP_METADATA_VALUE_SCHEMA_VERSION = GROUP_VALUE_SCHEMAS.size - 1

Review comment:
   This would be more robust if we did something like:
   
   ```scala
   GROUP_VALUE_SCHEMAS.keySet.max
   ```
   
   Or something along those lines.

##
File path: 
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
##
@@ -931,6 +932,44 @@ class GroupMetadataManagerTest {
     assertTrue(group.has(memberId))
   }
 
+  @Test
+  def testShouldThrowExceptionForUnsupportedGroupMetadataVersion(): Unit = {
+    val generation = 1
+    val protocol = "range"
+    val memberId = "memberId"
+    val unsupportedVersion = Short.MinValue
+
+    // put the unsupported version as the version value
+    val groupMetadataRecordValue = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId)
+      .value().putShort(unsupportedVersion)
+    // reset the position to the starting position 0 so that it can read the data in correct order
+    groupMetadataRecordValue.position(0)
+
+    val e = assertThrows(classOf[KafkaException],
+      () => GroupMetadataManager.readGroupMessageValue(groupId, groupMetadataRecordValue, time))
+    assertEquals(s"Unknown group metadata version ${unsupportedVersion}", e.getMessage)
+  }
+
+  @Test
+  def testCurrentStateTimestampForAllGroupMetadataVersions(): Unit = {
+    val generation = 1
+    val protocol = "range"
+    val memberId = "memberId"
+
+    for (apiVersion <- ApiVersion.allVersions) {
+      val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, apiVersion = apiVersion)
+
+      val deserializedGroupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, groupMetadataRecord.value(), time)
+      // GROUP_METADATA_VALUE_SCHEMA_V2 or higher should correctly set the currentStateTimestamp
+      if (apiVersion >= KAFKA_2_1_IV0)
+        assertEquals(s"the apiVersion $apiVersion doesn't set the currentStateTimestamp correctly.",
+          time.milliseconds(), deserializedGroupMetadata.currentStateTimestamp.getOrElse(-1))

Review comment:
   I think it would be better to have the expected side be `Some(time.milliseconds())`; then you don't need the `getOrElse` on the actual side.
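In Java terms, the suggestion is to compare the optional values themselves instead of unwrapping the actual one with a sentinel. This sketch uses `java.util.Optional` as a stand-in for Scala's `Option`; `OptionalAssertions` is a hypothetical helper, not part of JUnit.

```java
import java.util.Objects;
import java.util.Optional;

final class OptionalAssertions {
    // Suggested style: wrap the expected value and compare the Optionals
    // themselves, so a missing timestamp is reported as Optional.empty.
    static void assertOptionalEquals(final Optional<Long> expected, final Optional<Long> actual) {
        if (!Objects.equals(expected, actual)) {
            throw new AssertionError("expected " + expected + " but was " + actual);
        }
    }

    // Sentinel style: an absent value is silently turned into -1, which hides
    // the difference between "missing" and "present but equal to -1".
    static boolean matchesWithSentinel(final long expected, final Optional<Long> actual) {
        return expected == actual.orElse(-1L);
    }
}
```

The sentinel variant reports a missing timestamp as `-1` and would even pass if the expected value happened to be `-1`, whereas comparing `Optional`s reports the absence directly.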









[GitHub] [kafka] tombentley commented on pull request #9266: KAFKA-10469: Resolve logger levels hierarchically

2020-09-17 Thread GitBox


tombentley commented on pull request #9266:
URL: https://github.com/apache/kafka/pull/9266#issuecomment-694255418


   Many thanks @ijuma, all fixed.







[GitHub] [kafka] cadonna commented on a change in pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually

2020-09-17 Thread GitBox


cadonna commented on a change in pull request #9262:
URL: https://github.com/apache/kafka/pull/9262#discussion_r490189904



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -306,40 +306,55 @@ public synchronized void clean() {
      */
     public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
         try {
-            cleanRemovedTasks(cleanupDelayMs, false);
+            cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs);
         } catch (final Exception cannotHappen) {
             throw new IllegalStateException("Should have swallowed exception.", cannotHappen);
         }
     }
 
-    private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
-                                                final boolean manualUserCall) throws Exception {
-        final File[] taskDirs = listAllTaskDirectories();
-        if (taskDirs == null || taskDirs.length == 0) {
-            return; // nothing to do
-        }
-
-        for (final File taskDir : taskDirs) {
+    private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
+        for (final File taskDir : listAllTaskDirectories()) {
             final String dirName = taskDir.getName();
             final TaskId id = TaskId.parse(dirName);
             if (!locks.containsKey(id)) {
-                Exception exception = null;
                 try {
                     if (lock(id)) {
                         final long now = time.milliseconds();
                         final long lastModifiedMs = taskDir.lastModified();
                         if (now > lastModifiedMs + cleanupDelayMs) {
                             log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).",
                                 logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
-
-                            Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
-                        } else if (manualUserCall) {
-                            log.info("{} Deleting state directory {} for task {} as user calling cleanup.",
-                                logPrefix(), dirName, id);
-
                             Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
                         }
                     }
+                } catch (final OverlappingFileLockException | IOException e) {
+                    log.warn("{} Swallowed the following exception during deletion of obsolete state directory {} for task {}: {}",
+                        logPrefix(), dirName, id, e);
+                } finally {
+                    try {
+                        unlock(id);
+                    } catch (final IOException e) {
+                        log.warn("{} Swallowed the following exception during unlocking after " +
+                                "deletion of obsolete state directory for task {}: {}",
+                            logPrefix(), dirName, e);
+                    }
+                }
+            }
+        }
+    }
+
+    private void cleanRemovedTasksCalledByUser() throws Exception {
+        for (final File taskDir : listAllTaskDirectories()) {
+            final String dirName = taskDir.getName();
+            final TaskId id = TaskId.parse(dirName);
+            if (!locks.containsKey(id)) {
+                Exception exception = null;
+                try {
+                    if (lock(id)) {
+                        log.info("{} Deleting state directory {} for task {} as user calling cleanup.",
+                            logPrefix(), dirName, id);
+                        Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
+                    }
                 } catch (final OverlappingFileLockException | IOException e) {

Review comment:
   Good observation! I applied some of your suggestions but left the 
`try-catch` statements in place, since they allow logging more specific 
information about the encountered error.
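The cleaner-thread branch above deletes a task directory only once `now > lastModifiedMs + cleanupDelayMs`, and it always spares the lock file. The following standalone sketch reproduces just that logic. The class name `ObsoleteDirCleaner` and the `.lock` file name are assumptions. The sketch also deliberately leaves out the `lock(id)`/`unlock(id)` handling and the exception swallowing that the diff wraps around the deletion.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ObsoleteDirCleaner {
    // Assumed lock-file name; stands in for LOCK_FILE_NAME in the diff.
    static final String LOCK_FILE = ".lock";

    // Same test the cleaner thread applies: the directory must be strictly
    // older than the configured delay before it counts as obsolete.
    static boolean isObsolete(long nowMs, long lastModifiedMs, long cleanupDelayMs) {
        return nowMs > lastModifiedMs + cleanupDelayMs;
    }

    // Deletes everything under taskDir except the top-level lock file.
    // taskDir itself stays, because the lock file still lives in it.
    static void deleteExceptLock(Path taskDir) throws IOException {
        Path lockFile = taskDir.resolve(LOCK_FILE);
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(taskDir)) {
            // Reverse lexicographic order visits children before their parents.
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            if (!p.equals(taskDir) && !p.equals(lockFile)) {
                Files.delete(p);
            }
        }
    }
}
```

The user-triggered path in the diff corresponds to calling `deleteExceptLock` without first checking `isObsolete`.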

##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -280,7 +280,7 @@ synchronized void unlock(final TaskId taskId) throws IOException {
 public synchronized void clean() {
 // remove task dirs
 try {
-cleanRemovedTasks(0, true);
+cleanRemovedTasksCalledByUser();
 } catch (final Exception e) {
 // this is already logged within cleanRemovedTasks

Review comment:
   I am afraid I do not understand this comment.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

[GitHub] [kafka] showuon commented on a change in pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

2020-09-17 Thread GitBox


showuon commented on a change in pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#discussion_r490283148



##
File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##
@@ -1139,6 +1139,7 @@ object GroupMetadataManager {
 
   private val CURRENT_OFFSET_KEY_SCHEMA = schemaForKey(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
   private val CURRENT_GROUP_KEY_SCHEMA = schemaForKey(CURRENT_GROUP_KEY_SCHEMA_VERSION)
+  private val CURRENT_GROUP_METADATA_VALUE_SCHEMA_VERSION = GROUP_VALUE_SCHEMAS.size - 1

Review comment:
   good suggestion! Thanks.

##
File path: core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
##
@@ -931,6 +932,44 @@ class GroupMetadataManagerTest {
 assertTrue(group.has(memberId))
   }
 
+  @Test
+  def testShouldThrowExceptionForUnsupportedGroupMetadataVersion(): Unit = {
+val generation = 1
+val protocol = "range"
+val memberId = "memberId"
+val unsupportedVersion = Short.MinValue
+
+// put the unsupported version as the version value
+val groupMetadataRecordValue = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId)
+  .value().putShort(unsupportedVersion)
+// reset the position to the starting position 0 so that it can read the data in correct order
+groupMetadataRecordValue.position(0)
+
+val e = assertThrows(classOf[KafkaException],
+  () => GroupMetadataManager.readGroupMessageValue(groupId, groupMetadataRecordValue, time))
+assertEquals(s"Unknown group metadata version ${unsupportedVersion}", e.getMessage)
+  }
+
+  @Test
+  def testCurrentStateTimestampForAllGroupMetadataVersions(): Unit = {
+val generation = 1
+val protocol = "range"
+val memberId = "memberId"
+
+for (apiVersion <- ApiVersion.allVersions) {
+  val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, apiVersion = apiVersion)
+
+  val deserializedGroupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, groupMetadataRecord.value(), time)
+  // GROUP_METADATA_VALUE_SCHEMA_V2 or higher should correctly set the currentStateTimestamp
+  if (apiVersion >= KAFKA_2_1_IV0)
+assertEquals(s"the apiVersion $apiVersion doesn't set the currentStateTimestamp correctly.",
+  time.milliseconds(), deserializedGroupMetadata.currentStateTimestamp.getOrElse(-1))

Review comment:
   Good suggestion! Thanks.
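The change derives the current value-schema version as `GROUP_VALUE_SCHEMAS.size - 1`. The new test corrupts the leading version field with `putShort`, then rewinds with `position(0)` before decoding. The same buffer technique can be sketched in plain Java. `VersionedValueDecoder`, its schema table and its payload layout are hypothetical, and the sketch throws `IllegalStateException` where the Scala code throws `KafkaException`.

```java
import java.nio.ByteBuffer;

class VersionedValueDecoder {
    // Hypothetical schema table: index i holds the schema for version i,
    // standing in for GROUP_VALUE_SCHEMAS.
    static final String[] VALUE_SCHEMAS = {"v0", "v1", "v2", "v3"};

    // Same derivation as CURRENT_GROUP_METADATA_VALUE_SCHEMA_VERSION:
    // the newest version is the last index of the schema table.
    static final short CURRENT_VERSION = (short) (VALUE_SCHEMAS.length - 1);

    // Writes a 2-byte version prefix followed by a 4-byte payload, then flips
    // the buffer so it is ready to be read from position 0.
    static ByteBuffer encode(short version, int payload) {
        ByteBuffer buf = ByteBuffer.allocate(Short.BYTES + Integer.BYTES);
        buf.putShort(version).putInt(payload);
        buf.flip();
        return buf;
    }

    // Reads the version prefix and rejects versions with no known schema.
    static String schemaFor(ByteBuffer value) {
        short version = value.getShort();
        if (version < 0 || version >= VALUE_SCHEMAS.length) {
            throw new IllegalStateException("Unknown group metadata version " + version);
        }
        return VALUE_SCHEMAS[version];
    }
}
```

To reproduce the test's steps:

1. Encode a valid value with `encode(CURRENT_VERSION, 42)`.
2. Overwrite the prefix with `buf.putShort(Short.MIN_VALUE)`. This writes at position 0 and advances the position to 2.
3. Rewind with `buf.position(0)`.
4. Call `schemaFor(buf)`, which then fails with `Unknown group metadata version -32768`.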









[GitHub] [kafka] showuon commented on pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

2020-09-17 Thread GitBox


showuon commented on pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#issuecomment-694267556


   @ijuma, thanks for the comments. I've addressed them in this commit: 
https://github.com/apache/kafka/pull/9202/commits/97fe0a53d450d103e25dae0669c3ac803d7afe40.
 Thank you!







[jira] [Comment Edited] (KAFKA-10313) Out of range offset errors leading to offset reset

2020-09-17 Thread Varsha Abhinandan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17197722#comment-17197722
 ] 

Varsha Abhinandan edited comment on KAFKA-10313 at 9/17/20, 2:26 PM:
-

The issue mentioned in KAFKA-9543 seems to coincide with segment rollover 
and to affect versions from 2.4.0 onward. Unfortunately, we are facing this issue in 
version 2.2.2, and according to the logs it does not happen around the same time as a 
segment rollover. 


was (Author: varsha.abhinandan):
The issue mentioned in KAFKA-9543 seems to coincide with segment rollover 
and to affect versions from 2.4.0 onward. Unfortunately, we are facing this issue in 
version 2.2.2, and according to the logs it does not happen around the time of a segment rollover. 


[jira] [Commented] (KAFKA-10313) Out of range offset errors leading to offset reset

2020-09-17 Thread Varsha Abhinandan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17197722#comment-17197722
 ] 

Varsha Abhinandan commented on KAFKA-10313:
---

The issue mentioned in KAFKA-9543 seems to coincide with segment rollover 
and to affect versions from 2.4.0 onward. Unfortunately, we are facing this issue in 
version 2.2.2, and according to the logs it does not happen around the time of a segment rollover. 

> Out of range offset errors leading to offset reset
> --
>
> Key: KAFKA-10313
> URL: https://issues.apache.org/jira/browse/KAFKA-10313
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.2.2
>Reporter: Varsha Abhinandan
>Priority: Major
>
> Hi,
>   
>  We have occasionally been noticing offset resets on the Kafka 
> consumer because of offset-out-of-range errors. However, I don't see any 
> errors in the broker logs: no logs related to leader election, replica lag, 
> Kafka broker pod restarts, or anything else (only info logs were enabled in the 
> prod environment).
>   
>  It appeared from the logs that the out-of-range error occurred because the 
> fetch offset was larger than the offset range on the broker. We noticed this 
> happening multiple times on different consumers and stream apps in the prod 
> environment, so it seems less like an application bug and more like a bug 
> in the KafkaConsumer. We would like to understand the cause of such errors.
>   
>  Also, none of the offset reset options are desirable. Choosing "earliest" 
> creates a sudden huge lag (we have a retention of 24 hours) and choosing 
> "latest" leads to data loss (the records produced between the out-of-range 
> error and the offset reset on the consumer). So, I wonder whether it would be 
> better for the Kafka client to apply the 'auto.offset.reset' config only when 
> no offset is found. For an out-of-range error, the Kafka client could 
> automatically reset the offset to latest if the fetch offset is higher than the 
> log end offset, to prevent data loss, and to earliest if the fetch 
> offset is lower than the log start offset. 
>   
>  Following are the logs on the consumer side :
> {noformat}
> [2020-07-17T08:46:00,322Z] [INFO ] [pipeline-thread-12 
> ([prd453-19-event-upsert]-bo-pipeline-12)] 
> [o.a.k.c.consumer.internals.Fetcher] [Consumer 
> clientId=bo-indexer-group-prd453-19-on-c19-bo-indexer-upsert-blue-5d665bcbb7-dnvkh-pid-1-kafka-message-source-id-544,
>  groupId=bo-indexer-group-prd453-19] Fetch offset 476383711 is out of range 
> for partition prd453-19-event-upsert-32, resetting offset
> [2020-07-17T08:46:00,330Z] [INFO ] [pipeline-thread-12 
> ([prd453-19-event-upsert]-bo-pipeline-12)] 
> [o.a.k.c.consumer.internals.Fetcher] [Consumer 
> clientId=bo-indexer-group-prd453-19-on-c19-bo-indexer-upsert-blue-5d665bcbb7-dnvkh-pid-1-kafka-message-source-id-544,
>  groupId=bo-indexer-group-prd453-19] Resetting offset for partition 
> prd453-19-event-upsert-32 to offset 453223789.
>   {noformat}
> Broker logs for the partition :
> {noformat}
> [2020-07-17T07:40:12,082Z]  [INFO ]  [kafka-scheduler-4]  [kafka.log.Log]  
> [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Found deletable 
> segments with base offsets [452091893] due to retention time 8640ms breach
>  [2020-07-17T07:40:12,082Z]  [INFO ]  [kafka-scheduler-4]  [kafka.log.Log]  
> [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Scheduling log 
> segment [baseOffset 452091893, size 1073741693] for deletion.
>  [2020-07-17T07:40:12,083Z]  [INFO ]  [kafka-scheduler-4]  [kafka.log.Log]  
> [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Incrementing log 
> start offset to 453223789
>  [2020-07-17T07:41:12,083Z]  [INFO ]  [kafka-scheduler-7]  [kafka.log.Log]  
> [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Deleting segment 
> 452091893
>  [2020-07-17T07:41:12,114Z]  [INFO ]  [kafka-scheduler-7]  
> [kafka.log.LogSegment]  Deleted log 
> /data/kafka/prd453-19-event-upsert-32/000452091893.log.deleted.
>  [2020-07-17T07:41:12,114Z]  [INFO ]  [kafka-scheduler-7]  
> [kafka.log.LogSegment]  Deleted offset index 
> /data/kafka/prd453-19-event-upsert-32/000452091893.index.deleted.
>  [2020-07-17T07:41:12,114Z]  [INFO ]  [kafka-scheduler-7]  
> [kafka.log.LogSegment]  Deleted time index 
> /data/kafka/prd453-19-event-upsert-32/000452091893.timeindex.deleted.
>  [2020-07-17T07:52:31,836Z]  [INFO ]  [data-plane-kafka-request-handler-3]  
> [kafka.log.ProducerStateManager]  [ProducerStateManager 
> partition=prd453-19-event-upsert-32] Writing producer snapshot at offset 
> 475609786
>  [2020-07-17T07:52:31,836Z]  [INFO ]  [data-plane-kafka-request-handler-3]  
> [kafka.log.Log]  [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] 
> Roll
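The proposal in this issue amounts to clamping the fetch offset into the broker's valid range instead of consulting `auto.offset.reset`. The following is a minimal sketch of that idea. It assumes the client knows the partition's log start and log end offsets at the time of the error. The class and method names are hypothetical, and this is the reporter's suggested policy, not the consumer behavior described in the report.

```java
class OutOfRangeResetSketch {
    // Hypothetical policy from the issue proposal: clamp an out-of-range
    // fetch offset into [logStartOffset, logEndOffset] instead of applying
    // auto.offset.reset.
    static long resetOffset(long fetchOffset, long logStartOffset, long logEndOffset) {
        if (fetchOffset > logEndOffset) {
            // Ahead of the log: jump to the latest offset.
            return logEndOffset;
        }
        if (fetchOffset < logStartOffset) {
            // Behind the log, e.g. after retention deleted segments: jump to the earliest offset.
            return logStartOffset;
        }
        // Actually in range (fetching at exactly the log end offset is valid): keep the position.
        return fetchOffset;
    }
}
```

Both bounds can move between the failed fetch and the reset. Retention advances the start offset, and producers advance the end offset. A real implementation would therefore need fresh offsets from the broker rather than cached values.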

[GitHub] [kafka] ouyangnengda commented on pull request #9297: KAFKA-10495:Fix spelling mistake

2020-09-17 Thread GitBox


ouyangnengda commented on pull request #9297:
URL: https://github.com/apache/kafka/pull/9297#issuecomment-694282470


   ok







[GitHub] [kafka] edoardocomar commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

2020-09-17 Thread GitBox


edoardocomar commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r490335096



##
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -283,49 +296,140 @@ public void testReplication() throws InterruptedException {
 
 waitForCondition(() -> {
 try {
-return primaryClient.remoteConsumerOffsets("consumer-group-1", "backup",
+return primaryClient.remoteConsumerOffsets(consumerGroupName, "backup",
 Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new TopicPartition("test-topic-1", 0));
 } catch (Throwable e) {
 return false;
 }
 }, CHECKPOINT_DURATION_MS, "Offsets not translated upstream to primary cluster.");
 
-Map primaryOffsets = primaryClient.remoteConsumerOffsets("consumer-group-1", "backup",
+Map primaryOffsets = primaryClient.remoteConsumerOffsets(consumerGroupName, "backup",
 Duration.ofMillis(CHECKPOINT_DURATION_MS));
- 
+
 // Failback consumer group to primary cluster
-consumer2 = primary.kafka().createConsumer(Collections.singletonMap("group.id", "consumer-group-1"));
-consumer2.assign(primaryOffsets.keySet());
-primaryOffsets.forEach(consumer2::seek);
-consumer2.poll(Duration.ofMillis(500));
-
-assertTrue("Consumer failedback to zero upstream offset.", consumer2.position(new TopicPartition("test-topic-1", 0)) > 0);
-assertTrue("Consumer failedback to zero downstream offset.", consumer2.position(new TopicPartition("backup.test-topic-1", 0)) > 0);
-assertTrue("Consumer failedback beyond expected upstream offset.", consumer2.position(
-new TopicPartition("test-topic-1", 0)) <= NUM_RECORDS_PRODUCED);
-assertTrue("Consumer failedback beyond expected downstream offset.", consumer2.position(
-new TopicPartition("backup.test-topic-1", 0)) <= NUM_RECORDS_PRODUCED);
-
-consumer2.close();
-  
+primaryConsumer = primary.kafka().createConsumer(consumerProps);
+primaryConsumer.assign(allPartitions("test-topic-1", "backup.test-topic-1"));
+seek(primaryConsumer, primaryOffsets);
+consumeAllMessages(primaryConsumer, 0);
+
+assertTrue("Consumer failedback to zero upstream offset.", primaryConsumer.position(new TopicPartition("test-topic-1", 0)) > 0);
+assertTrue("Consumer failedback to zero downstream offset.", primaryConsumer.position(new TopicPartition("backup.test-topic-1", 0)) > 0);
+assertTrue("Consumer failedback beyond expected upstream offset.", primaryConsumer.position(
+new TopicPartition("test-topic-1", 0)) <= NUM_RECORDS_PER_PARTITION);
+assertTrue("Consumer failedback beyond expected downstream offset.", primaryConsumer.position(
+new TopicPartition("backup.test-topic-1", 0)) <= NUM_RECORDS_PER_PARTITION);
+
+Map>> messages2 = consumeAllMessages(primaryConsumer, 0);
+// If offset translation was successful we expect no messages to be consumed after failback
+assertEquals("Data was consumed from partitions: " + messages2.keySet() + ".", 0, messages2.size());
+primaryConsumer.close();
+
 // create more matching topics
 primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
 backup.kafka().createTopic("test-topic-3", NUM_PARTITIONS);
 
-for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
-primary.kafka().produce("test-topic-2", 0, "key", "message-2-" + i);
-backup.kafka().produce("test-topic-3", 0, "key", "message-3-" + i);
+produceMessages(primary, "test-topic-2", "message-3-", 1);
+produceMessages(backup, "test-topic-3", "message-4-", 1);
+
+assertEquals("Records were not produced to primary cluster.", NUM_RECORDS_PER_PARTITION,
+primary.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, "test-topic-2").count());
+assertEquals("Records were not produced to backup cluster.", NUM_RECORDS_PER_PARTITION,
+backup.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, "test-topic-3").count());
+
+assertEquals("New topic was not replicated to primary cluster.", NUM_RECORDS_PER_PARTITION,
+primary.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, "backup.test-topic-3").count());
+assertEquals("New topic was not replicated to backup cluster.", NUM_RECORDS_PER_PARTITION,
+backup.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count());
+}
+
+@Test
+public void testReplicationWithEmptyPartition() throws InterruptedException {
+String consumerGroupName = "consumer-group-testRepli

[GitHub] [kafka] ijuma opened a new pull request #9299: MINOR: Use `Map.foreachKv` to avoid tuple allocation in Scala 2.13

2020-09-17 Thread GitBox


ijuma opened a new pull request #9299:
URL: https://github.com/apache/kafka/pull/9299


   `foreachKv` invokes `foreachEntry` in Scala 2.13 and falls back to
   `foreach` in Scala 2.12.
   
   This change requires a newer version of scala-collection-compat, so
   update it to the latest version (2.2.0).
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] ijuma commented on a change in pull request #9299: MINOR: Use `Map.foreachKv` to avoid tuple allocation in Scala 2.13

2020-09-17 Thread GitBox


ijuma commented on a change in pull request #9299:
URL: https://github.com/apache/kafka/pull/9299#discussion_r490344276



##
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##
@@ -133,7 +133,7 @@ object GetOffsetShell {
 }
 }
 
-partitionOffsets.toSeq.sortBy { case (tp, _) => tp.partition }.foreach { 
case (tp, offset) =>
+partitionOffsets.toArray.sortBy { case (tp, _) => tp.partition }.foreach { 
case (tp, offset) =>

Review comment:
   Unrelated clean-up I noticed.









[GitHub] [kafka] mimaison commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

2020-09-17 Thread GitBox


mimaison commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r490344407



##
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -201,26 +213,24 @@ public void close() {
 
 @Test
 public void testReplication() throws InterruptedException {
+String consumerGroupName = "consumer-group-testReplication";
+Map consumerProps  = new HashMap() {{
+put("group.id", consumerGroupName);
+put("auto.offset.reset", "latest");

Review comment:
   `latest` is the default, why are we setting it?

##
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -136,10 +141,19 @@ public void setup() throws InterruptedException {
 backup.kafka().createTopic("primary.test-topic-1", 1);
 backup.kafka().createTopic("heartbeats", 1);
 
-for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
-primary.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-1-" + i);
-backup.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-2-" + i);
-}
+// produce to all partitions of test-topic-1
+produceMessages(primary, "test-topic-1", "message-1-");
+produceMessages(backup, "test-topic-1", "message-2-");
+
+// Generate some consumer activity on both clusters to ensure the checkpoint connector always starts promptly
+Map dummyProps = new HashMap();

Review comment:
   We can use `Collections.singletonMap()` here
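The two review comments point to simpler ways to build the consumer configuration. Since `auto.offset.reset` already defaults to `latest`, a single `group.id` entry is enough, and `Collections.singletonMap()` expresses that without the double-brace `HashMap` subclass. The following sketch shows both options; the class and method names are hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class ConsumerPropsSketch {
    // Relies on auto.offset.reset defaulting to "latest", so group.id is
    // the only property that has to be set. The returned map is immutable.
    static Map<String, Object> minimalProps(String groupId) {
        return Collections.singletonMap("group.id", groupId);
    }

    // If the reset policy should stay explicit, a plain HashMap avoids the
    // anonymous subclass created by double-brace initialization.
    static Map<String, Object> explicitProps(String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put("group.id", groupId);
        props.put("auto.offset.reset", "latest");
        return props;
    }
}
```

`Collections.singletonMap()` returns an immutable map. It therefore only fits when the test never needs to add further properties.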


[GitHub] [kafka] jolshan commented on pull request #9294: MINOR: Fix now that kafka.apache.org resolves to 3 IP addresses

2020-09-17 Thread GitBox


jolshan commented on pull request #9294:
URL: https://github.com/apache/kafka/pull/9294#issuecomment-694319833


   I also like @dajac's suggestion. I've updated the PR so that the tests 
assert more than one address, as @mumrah said. 







[GitHub] [kafka] jolshan commented on pull request #9294: MINOR: Fix now that kafka.apache.org resolves to 3 IP addresses

2020-09-17 Thread GitBox


jolshan commented on pull request #9294:
URL: https://github.com/apache/kafka/pull/9294#issuecomment-694325685


   Also created the issue https://issues.apache.org/jira/browse/KAFKA-10496







[jira] [Created] (KAFKA-10496) Create an in-memory DNS server for ClientUtilsTest and ClusterConnectionStatesTest

2020-09-17 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-10496:
--

 Summary: Create an in-memory DNS server for ClientUtilsTest and 
ClusterConnectionStatesTest
 Key: KAFKA-10496
 URL: https://issues.apache.org/jira/browse/KAFKA-10496
 Project: Kafka
  Issue Type: Improvement
Reporter: Justine Olshan


ClientUtilsTest and ClusterConnectionStatesTest currently use an external DNS 
server to resolve the IP addresses for kafka.apache.org. Until recently there were 
only two IPv4 addresses, but now there is a third. 

These tests are fragile because they rely on an outside source, so it would 
make sense to create an in-memory DNS server. This is what Netty does for similar 
tests: 
[https://github.com/netty/netty/blob/master/resolver-dns/src/test/java/io/netty/resolver/dns/TestDnsServer.java].
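One way to take the external lookup out of such tests is to route resolution through a small interface and give the tests a fixed table. This is sketched here as a lighter alternative to a full in-memory DNS server like Netty's. `HostResolver` and `StaticHostResolver` are hypothetical names for this sketch, and the addresses are private placeholders, not kafka.apache.org's real records.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical seam: production code would delegate to InetAddress.getAllByName.
interface HostResolver {
    InetAddress[] resolve(String host) throws UnknownHostException;
}

// Test double that answers from a fixed table and never touches the network.
class StaticHostResolver implements HostResolver {
    private final Map<String, InetAddress[]> table = new HashMap<>();

    // Each raw address must be 4 bytes (IPv4) or 16 bytes (IPv6).
    StaticHostResolver add(String host, byte[]... rawAddresses) throws UnknownHostException {
        InetAddress[] addrs = new InetAddress[rawAddresses.length];
        for (int i = 0; i < rawAddresses.length; i++) {
            // getByAddress builds the InetAddress locally; no DNS lookup happens.
            addrs[i] = InetAddress.getByAddress(host, rawAddresses[i]);
        }
        table.put(host, addrs);
        return this;
    }

    @Override
    public InetAddress[] resolve(String host) throws UnknownHostException {
        InetAddress[] addrs = table.get(host);
        if (addrs == null) {
            throw new UnknownHostException(host);
        }
        return addrs.clone();
    }
}
```

Because `InetAddress.getByAddress(String, byte[])` involves no network access, a test can assert an exact count, such as three, instead of just "more than one".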



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10496) Create an in-memory DNS server for ClientUtilsTest and ClusterConnectionStatesTest

2020-09-17 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10496?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan updated KAFKA-10496:
---
Description: 
ClientUtilsTest and ClusterConnectionStatesTest currently use an external DNS 
to resolve the IP addresses for kafka.apache.org. Until recently there were 
only two IPv4 addresses, but now there is a third. 

At first the tests were changed to just check for multiple addresses. 
[https://github.com/apache/kafka/pull/9294]

However, these tests are pretty fragile when they rely on outside sources, so 
it would make sense to create an in-memory DNS.  This is what netty does for 
similar tests. 

 
[https://github.com/netty/netty/blob/master/resolver-dns/src/test/java/io/netty/resolver/dns/TestDnsServer.java].

  was:
ClientUtilsTest and ClusterConnectionStatesTest currently use an external DNS 
to resolve the IP addresses for kafka.apache.org. Until recently there were 
only two IPv4 addresses, but now there is a third. 

At first the tests were changed to just check for multiple addresses. 
https://github.com/apache/kafka/pull/9294

However, these tests are pretty fragile when they rely on outside sources, so 
it would make sense to create an in-memory DNS.  This is what netty does for 
similar tests.  
[https://github.com/netty/netty/blob/master/resolver-dns/src/test/java/io/netty/resolver/dns/TestDnsServer.java].


> Create an in-memory DNS server for ClientUtilsTest and 
> ClusterConnectionStatesTest
> --
>
> Key: KAFKA-10496
> URL: https://issues.apache.org/jira/browse/KAFKA-10496
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Justine Olshan
>Priority: Minor
>
> ClientUtilsTest and ClusterConnectionStatesTest currently use an external DNS 
> to resolve the IP addresses for kafka.apache.org. Until recently there were 
> only two IPv4 addresses, but now there is a third. 
> At first the tests were changed to just check for multiple addresses. 
> [https://github.com/apache/kafka/pull/9294]
> However, these tests are pretty fragile when they rely on outside sources, so 
> it would make sense to create an in-memory DNS.  This is what netty does for 
> similar tests. 
>  
> [https://github.com/netty/netty/blob/master/resolver-dns/src/test/java/io/netty/resolver/dns/TestDnsServer.java].





[jira] [Updated] (KAFKA-10496) Create an in-memory DNS server for ClientUtilsTest and ClusterConnectionStatesTest

2020-09-17 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10496?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan updated KAFKA-10496:
---
Description: 
ClientUtilsTest and ClusterConnectionStatesTest currently use an external DNS 
to resolve the IP addresses for kafka.apache.org. Until recently there were 
only two IPv4 addresses, but now there is a third. 

At first the tests were changed to just check for multiple addresses. 
https://github.com/apache/kafka/pull/9294

However, these tests are pretty fragile when they rely on outside sources, so 
it would make sense to create an in-memory DNS.  This is what netty does for 
similar tests.  
[https://github.com/netty/netty/blob/master/resolver-dns/src/test/java/io/netty/resolver/dns/TestDnsServer.java].

  was:
ClientUtilsTest and ClusterConnectionStatesTest currently use an external DNS 
to resolve the IP addresses for kafka.apache.org. Until recently there were 
only two IPv4 addresses, but now there is a third. 

These tests are pretty fragile when they rely on outside sources, so it would 
make sense to create an in-memory DNS.  This is what netty does for similar 
tests.  
[https://github.com/netty/netty/blob/master/resolver-dns/src/test/java/io/netty/resolver/dns/TestDnsServer.java].


> Create an in-memory DNS server for ClientUtilsTest and 
> ClusterConnectionStatesTest
> --
>
> Key: KAFKA-10496
> URL: https://issues.apache.org/jira/browse/KAFKA-10496
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Justine Olshan
>Priority: Minor
>
> ClientUtilsTest and ClusterConnectionStatesTest currently use an external DNS 
> to resolve the IP addresses for kafka.apache.org. Until recently there were 
> only two IPv4 addresses, but now there is a third. 
> At first the tests were changed to just check for multiple addresses. 
> https://github.com/apache/kafka/pull/9294
> However, these tests are pretty fragile when they rely on outside sources, so 
> it would make sense to create an in-memory DNS.  This is what netty does for 
> similar tests.  
> [https://github.com/netty/netty/blob/master/resolver-dns/src/test/java/io/netty/resolver/dns/TestDnsServer.java].
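A lighter-weight alternative to running an in-memory DNS server like netty's TestDnsServer is to make the resolution step injectable, so a test can supply a fixed answer. The sketch below is a hypothetical illustration only. `HostResolver`, `SystemHostResolver` and `StaticHostResolver` are invented names, not existing Kafka APIs. The addresses are documentation-range IPs chosen for the example.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical abstraction over name resolution (not an existing Kafka interface).
interface HostResolver {
    InetAddress[] resolve(String host) throws UnknownHostException;
}

// Production-style resolver: delegates to the JVM/OS resolver (real DNS).
class SystemHostResolver implements HostResolver {
    @Override
    public InetAddress[] resolve(String host) throws UnknownHostException {
        return InetAddress.getAllByName(host);
    }
}

// Test resolver backed by a fixed in-memory host table: no network access.
class StaticHostResolver implements HostResolver {
    private final Map<String, InetAddress[]> table = new HashMap<>();

    StaticHostResolver add(String host, String... ipLiterals) {
        InetAddress[] addrs = new InetAddress[ipLiterals.length];
        try {
            for (int i = 0; i < ipLiterals.length; i++) {
                // For an IP literal, getByName only parses the text; no lookup happens.
                byte[] raw = InetAddress.getByName(ipLiterals[i]).getAddress();
                // Attach the hostname so the address reports it without a reverse lookup.
                addrs[i] = InetAddress.getByAddress(host, raw);
            }
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("Invalid IP literal for " + host, e);
        }
        table.put(host, addrs);
        return this;
    }

    @Override
    public InetAddress[] resolve(String host) throws UnknownHostException {
        InetAddress[] addrs = table.get(host);
        if (addrs == null) {
            throw new UnknownHostException(host);
        }
        return addrs.clone();
    }
}
```

With such a resolver, a test could assert on exactly three addresses instead of "more than one", because the answer no longer depends on the live DNS record. Wiring it into the client code would require adding an injection point, which this sketch leaves out.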





[jira] [Updated] (KAFKA-10372) [JAVA 11] Unrecognized VM option PrintGCDateStamps

2020-09-17 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-10372:
---
Fix Version/s: (was: 2.6.1)

> [JAVA 11] Unrecognized VM option PrintGCDateStamps
> --
>
> Key: KAFKA-10372
> URL: https://issues.apache.org/jira/browse/KAFKA-10372
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.6.0
>Reporter: Kevin Tibi
>Priority: Blocker
>  Labels: bug
>
> Hello,
> I can't start kafka with JAVA 11. 
>  
> {code:java}
> kafka-server-start.sh[2721]: Unrecognized VM option 'PrintGCDateStamps'
> kafka-server-start.sh[2721]: Error: Could not create the Java Virtual Machine.
> kafka-server-start.sh[2721]: Error: A fatal exception has occurred. Program 
> will exit.{code}
> This flag (in kafka-run-class.sh L302) was removed in Java 9, which replaced the old GC logging flags with unified JVM logging (-Xlog), so Java 11 rejects it.
> Solution:
> {code:java}
> -Xlog:::time,uptime,level,tags" # Equivalent to PrintGCTimeStamps, Uptime and 
> PrintGCDateStamps{code}
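The pre-Java 9 GC logging flags such as -XX:+PrintGCDateStamps no longer exist on newer JVMs. A launcher therefore has to pick its GC logging options based on the Java major version. The Java sketch below only illustrates that selection. The class and method names and the exact flag strings, including the rotation settings, are illustrative assumptions, not a copy of kafka-run-class.sh.

```java
import java.util.Arrays;
import java.util.List;

class GcLogOptions {
    // Hypothetical helper: picks GC logging flags for a given Java major version.
    static List<String> forJavaVersion(int javaMajorVersion, String gcLogFile) {
        if (javaMajorVersion >= 9) {
            // Unified logging: the "time" decorator roughly replaces PrintGCDateStamps,
            // the "uptime" decorator roughly replaces PrintGCTimeStamps.
            return Arrays.asList(
                "-Xlog:gc*:file=" + gcLogFile + ":time,uptime,tags:filecount=10,filesize=100M");
        }
        // Legacy flags; several (e.g. -XX:+PrintGCDateStamps) were removed in Java 9
        // and make newer JVMs fail with "Unrecognized VM option".
        return Arrays.asList(
            "-Xloggc:" + gcLogFile,
            "-verbose:gc",
            "-XX:+PrintGCDetails",
            "-XX:+PrintGCDateStamps",
            "-XX:+PrintGCTimeStamps",
            "-XX:+UseGCLogFileRotation",
            "-XX:NumberOfGCLogFiles=10",
            "-XX:GCLogFileSize=100M");
    }
}
```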





[jira] [Assigned] (KAFKA-10491) Check authorizations before other criteria in KafkaApis

2020-09-17 Thread Ron Dagostino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Dagostino reassigned KAFKA-10491:
-

Assignee: Ron Dagostino

> Check authorizations before other criteria in KafkaApis
> ---
>
> Key: KAFKA-10491
> URL: https://issues.apache.org/jira/browse/KAFKA-10491
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Arthur
>Assignee: Ron Dagostino
>Priority: Minor
>
> In KafkaApis#handleAlterUserScramCredentialsRequest we check if the current 
> broker is the controller before checking if the request is authorized. This 
> is a potential information leak about details of the system (i.e., who is the 
> controller). We should fix this to check the authz first.
> [~hachikuji] pointed this out during the review for AlterIsr since I had 
> followed the pattern in handleAlterUserScramCredentialsRequest. 
> We should fix handleAlterUserScramCredentialsRequest and audit the rest of 
> KafkaApis for similar patterns.
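The ordering fix can be sketched as a handler that returns an authorization error before it reveals anything else. Everything below is hypothetical and simplified: the class, method and enum names are invented, and this is not KafkaApis code. The error names only mirror Kafka's CLUSTER_AUTHORIZATION_FAILED and NOT_CONTROLLER codes.

```java
// Simplified error codes for the sketch.
enum ErrorCode { NONE, CLUSTER_AUTHORIZATION_FAILED, NOT_CONTROLLER }

class AlterCredentialsHandler {
    private final boolean isController;

    AlterCredentialsHandler(boolean isController) {
        this.isController = isController;
    }

    ErrorCode handle(boolean callerIsAuthorized) {
        // 1. Authorization first: an unauthorized caller learns nothing about
        //    which broker is the controller.
        if (!callerIsAuthorized) {
            return ErrorCode.CLUSTER_AUTHORIZATION_FAILED;
        }
        // 2. Only authorized callers get the NOT_CONTROLLER hint.
        if (!isController) {
            return ErrorCode.NOT_CONTROLLER;
        }
        return ErrorCode.NONE;
    }
}
```

The property that matters is that an unauthorized caller gets the same error from every broker. The response therefore no longer reveals which broker is the controller.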





[GitHub] [kafka] dongjinleekr commented on a change in pull request #9266: KAFKA-10469: Resolve logger levels hierarchically

2020-09-17 Thread GitBox


dongjinleekr commented on a change in pull request #9266:
URL: https://github.com/apache/kafka/pull/9266#discussion_r490383726



##
File path: core/src/test/scala/kafka/utils/LoggingTest.scala
##
@@ -58,4 +60,20 @@ class LoggingTest extends Logging {
 
 assertEquals(logging.getClass.getName, logging.log.underlying.getName)
   }
+
+  @Test
+  def testLoggerLevelIsResolved(): Unit = {
+val controller = new Log4jController()
+val previousLevel = controller.getLogLevel("kafka")
+try {
+  controller.setLogLevel("kafka", "TRACE")
+  
Logger(LoggerFactory.getLogger("kafka.utils.Log4jControllerTest")).trace("test")

Review comment:
   I verified that it works without 
`com.typesafe.scalalogging.Logger#apply` here, i.e.,  
`LoggerFactory.getLogger("kafka.utils.Log4jControllerTest").trace("test")`. 
Other test cases like `RestServerTest` don't invoke it. It would be better to 
remove it.
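KAFKA-10469 is about resolving a logger's effective level from its ancestors. In a Log4j-style hierarchy, a logger such as kafka.utils.Log4jControllerTest that has no level of its own inherits the level of its nearest configured ancestor (here kafka), and falls back to the root level. The Java sketch below models only that lookup rule with a plain map. It is not Log4jController's implementation, and the names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class LevelResolver {
    private final Map<String, String> configured = new HashMap<>();
    private final String rootLevel;

    LevelResolver(String rootLevel) {
        this.rootLevel = rootLevel;
    }

    void setLevel(String logger, String level) {
        configured.put(logger, level);
    }

    // Walks "a.b.c" -> "a.b" -> "a" -> root until a configured level is found.
    String effectiveLevel(String logger) {
        String name = logger;
        while (true) {
            String level = configured.get(name);
            if (level != null) {
                return level;
            }
            int dot = name.lastIndexOf('.');
            if (dot < 0) {
                return rootLevel;
            }
            name = name.substring(0, dot);
        }
    }
}
```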









[GitHub] [kafka] chia7712 commented on pull request #9296: MINOR: remove unused scala files from core module

2020-09-17 Thread GitBox


chia7712 commented on pull request #9296:
URL: https://github.com/apache/kafka/pull/9296#issuecomment-694355722


   @dongjinleekr thanks for review!







[jira] [Created] (KAFKA-10497) Convert group/transaction coordinator metadata schemas to use generated protocol

2020-09-17 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-10497:
---

 Summary: Convert group/transaction coordinator metadata schemas to 
use generated protocol
 Key: KAFKA-10497
 URL: https://issues.apache.org/jira/browse/KAFKA-10497
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson
Assignee: Chia-Ping Tsai


We need to convert the internal schemas used for representing transaction/group 
metadata to the generated protocol. This opens the door for flexible version 
support on the next bump. 





[GitHub] [kafka] chia7712 commented on a change in pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually

2020-09-17 Thread GitBox


chia7712 commented on a change in pull request #9262:
URL: https://github.com/apache/kafka/pull/9262#discussion_r490405191



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -280,7 +280,7 @@ synchronized void unlock(final TaskId taskId) throws 
IOException {
 public synchronized void clean() {
 // remove task dirs
 try {
-cleanRemovedTasks(0, true);
+cleanRemovedTasksCalledByUser();
 } catch (final Exception e) {
 // this is already logged within cleanRemovedTasks

Review comment:
   The previous comment is now invalid because the method "cleanRemovedTasks" was 
removed; its replacement is "cleanRemovedTasksCalledByUser".









[jira] [Assigned] (KAFKA-10283) Consolidate client-level and consumer-level assignment within ClientState

2020-09-17 Thread highluck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

highluck reassigned KAFKA-10283:


Assignee: highluck

> Consolidate client-level and consumer-level assignment within ClientState
> -
>
> Key: KAFKA-10283
> URL: https://issues.apache.org/jira/browse/KAFKA-10283
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: highluck
>Priority: Major
>  Labels: newbie++
>
> In StreamsPartitionAssignor, we do a two-level assignment: first at the 
> client level, and then, once that assignment is done, we further decide within 
> each client how to distribute its tasks among its consumers, if it has more than one.
> The {{ClientState}} class is used for book-keeping the assigned tasks, 
> but it is only used for the first level; the second level is 
> done outside of the class, and we only keep track of the results in a few maps 
> for logging purposes. This leaves us with a bunch of hierarchical maps, e.g. 
> some on the client level and some on the consumer level.
> We would like to consolidate some of these maps into a single data structure 
> to keep better track of the assignment information, and also to make the code 
> less bug-prone, since such bugs can leave the assignment information inconsistent. 
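One possible shape for the consolidated structure is a single per-client object that owns both levels. The consumer-level view is then always checked against the client-level one. The sketch below is hypothetical: the class name, the String task ids and the method names are assumptions, not ClientState's API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class TwoLevelAssignment {
    // Client-level assignment: all tasks owned by this client.
    private final Set<String> clientTasks = new HashSet<>();
    // Consumer-level assignment within the client: consumer id -> tasks.
    private final Map<String, Set<String>> consumerTasks = new HashMap<>();

    void assignToClient(String taskId) {
        clientTasks.add(taskId);
    }

    // Second level: a task may only go to a consumer if the client owns it,
    // and it is moved (not duplicated) if another consumer already has it.
    void assignToConsumer(String consumerId, String taskId) {
        if (!clientTasks.contains(taskId)) {
            throw new IllegalStateException("Task " + taskId + " is not assigned to this client");
        }
        for (Set<String> tasks : consumerTasks.values()) {
            tasks.remove(taskId);
        }
        consumerTasks.computeIfAbsent(consumerId, k -> new HashSet<>()).add(taskId);
    }

    Set<String> tasksFor(String consumerId) {
        return Collections.unmodifiableSet(
            consumerTasks.getOrDefault(consumerId, Collections.emptySet()));
    }

    // Tasks owned by the client that no consumer has received yet.
    Set<String> unassignedAtConsumerLevel() {
        Set<String> remaining = new HashSet<>(clientTasks);
        for (Set<String> tasks : consumerTasks.values()) {
            remaining.removeAll(tasks);
        }
        return remaining;
    }
}
```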





[jira] [Created] (KAFKA-10498) Consumer should do offset/epoch validation through `Fetch` when possible

2020-09-17 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-10498:
---

 Summary: Consumer should do offset/epoch validation through 
`Fetch` when possible
 Key: KAFKA-10498
 URL: https://issues.apache.org/jira/browse/KAFKA-10498
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson
Assignee: Chia-Ping Tsai


Currently the consumer has logic to detect truncations (as a result of unclean 
leader election for example) using the OffsetsForLeaderEpoch API. It is a 
rather cumbersome and expensive process since we have to check for the need to 
send this request on every poll(). We should be able to do better now that 
KIP-595 has built support for truncation detection directly into the `Fetch` 
protocol. This should allow us to skip validation when we know that the `Fetch` 
version is high enough. 
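The proposed shortcut amounts to a version check. If the negotiated Fetch version already carries truncation detection, the separate OffsetsForLeaderEpoch round trip can be skipped. In the sketch below, the threshold is assumed to be Fetch v12 and should be verified against FetchRequest.json. The class, constant and method names are hypothetical.

```java
class OffsetValidationPolicy {
    // Assumption: first Fetch version whose response can report a diverging epoch.
    static final short MIN_FETCH_VERSION_WITH_TRUNCATION_DETECTION = 12;

    // True if the consumer still needs an explicit OffsetsForLeaderEpoch request
    // before fetching from a partition whose position must be validated.
    static boolean needsOffsetsForLeaderEpoch(short negotiatedFetchVersion,
                                              boolean positionNeedsValidation) {
        if (!positionNeedsValidation) {
            return false;
        }
        // Older Fetch versions cannot signal truncation, so validate explicitly.
        return negotiatedFetchVersion < MIN_FETCH_VERSION_WITH_TRUNCATION_DETECTION;
    }
}
```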





[GitHub] [kafka] mumrah commented on a change in pull request #9294: MINOR: Fix now that kafka.apache.org resolves to 3 IP addresses

2020-09-17 Thread GitBox


mumrah commented on a change in pull request #9294:
URL: https://github.com/apache/kafka/pull/9294#discussion_r490417685



##
File path: 
clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
##
@@ -273,22 +273,21 @@ public void testMultipleIPsWithDefault() throws 
UnknownHostException {
 
 @Test
 public void testMultipleIPsWithUseAll() throws UnknownHostException {
-assertEquals(2, ClientUtils.resolve(hostTwoIps, 
ClientDnsLookup.USE_ALL_DNS_IPS).size());
+assertTrue(ClientUtils.resolve(hostTwoIps, 
ClientDnsLookup.USE_ALL_DNS_IPS).size() > 1);
 
 connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps, 
ClientDnsLookup.USE_ALL_DNS_IPS);
 InetAddress addr1 = connectionStates.currentAddress(nodeId1);
 connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps, 
ClientDnsLookup.USE_ALL_DNS_IPS);
 InetAddress addr2 = connectionStates.currentAddress(nodeId1);
 assertNotSame(addr1, addr2);
-
 connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps, 
ClientDnsLookup.USE_ALL_DNS_IPS);
 InetAddress addr3 = connectionStates.currentAddress(nodeId1);
-assertSame(addr1, addr3);

Review comment:
   Hmm, so previously (and also with this change) we are assuming that we 
will get different resolved IPs for each connection to kafka.apache.org? This 
seems to rely on round-robin DNS resolution that we can't really control. I 
think these assertions will always be prone to failure unless we can control 
the DNS server, as @dajac suggested.
   
   I guess we can commit this to unblock the tests, but we should definitely 
prioritize a better fix.
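Suppose each connection attempt simply moves on to the next resolved address and wraps around. Under that assumption, the old `assertSame(addr1, addr3)` only holds when the resolved list has exactly two entries. The hypothetical model below is not ClusterConnectionStates itself. With two addresses the third attempt is back at the first one, and with three it is not. The model deliberately ignores re-resolution and any reordering by the DNS server, which is the part these tests cannot control.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class AddressCycler {
    private final List<String> addresses;
    private int index = -1; // no connection attempt made yet

    AddressCycler(String... addresses) {
        if (addresses.length == 0) {
            throw new IllegalArgumentException("no addresses");
        }
        this.addresses = new ArrayList<>(Arrays.asList(addresses));
    }

    // Each connection attempt advances to the next address, wrapping around.
    String nextAttempt() {
        index = (index + 1) % addresses.size();
        return addresses.get(index);
    }
}
```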

##
File path: clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
##
@@ -102,20 +102,20 @@ public void testResolveUnknownHostException() throws 
UnknownHostException {
 
 @Test
 public void testResolveDnsLookup() throws UnknownHostException {
-// Note that kafka.apache.org resolves to 2 IP addresses
+// Note that kafka.apache.org resolves to at least 2 IP addresses

Review comment:
   The comments here and below still refer to 2 IP addresses.









[GitHub] [kafka] jolshan commented on a change in pull request #9294: MINOR: Fix now that kafka.apache.org resolves to 3 IP addresses

2020-09-17 Thread GitBox


jolshan commented on a change in pull request #9294:
URL: https://github.com/apache/kafka/pull/9294#discussion_r490423860



##
File path: 
clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
##
@@ -273,22 +273,21 @@ public void testMultipleIPsWithDefault() throws 
UnknownHostException {
 
 @Test
 public void testMultipleIPsWithUseAll() throws UnknownHostException {
-assertEquals(2, ClientUtils.resolve(hostTwoIps, 
ClientDnsLookup.USE_ALL_DNS_IPS).size());
+assertTrue(ClientUtils.resolve(hostTwoIps, 
ClientDnsLookup.USE_ALL_DNS_IPS).size() > 1);
 
 connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps, 
ClientDnsLookup.USE_ALL_DNS_IPS);
 InetAddress addr1 = connectionStates.currentAddress(nodeId1);
 connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps, 
ClientDnsLookup.USE_ALL_DNS_IPS);
 InetAddress addr2 = connectionStates.currentAddress(nodeId1);
 assertNotSame(addr1, addr2);
-
 connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps, 
ClientDnsLookup.USE_ALL_DNS_IPS);
 InetAddress addr3 = connectionStates.currentAddress(nodeId1);
-assertSame(addr1, addr3);

Review comment:
   Yeah. I was wondering if we should even keep this test in the current 
form, and I'm hoping that a DNS we can control would resolve this issue.









[GitHub] [kafka] tombentley commented on pull request #9266: KAFKA-10469: Resolve logger levels hierarchically

2020-09-17 Thread GitBox


tombentley commented on pull request #9266:
URL: https://github.com/apache/kafka/pull/9266#issuecomment-694377628


   @dongjinleekr done.







[jira] [Updated] (KAFKA-10493) Ktable out-of-order updates are not being ignored

2020-09-17 Thread Pedro Gontijo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pedro Gontijo updated KAFKA-10493:
--
Affects Version/s: (was: 2.3.0)

> Ktable out-of-order updates are not being ignored
> -
>
> Key: KAFKA-10493
> URL: https://issues.apache.org/jira/browse/KAFKA-10493
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Pedro Gontijo
>Priority: Major
> Attachments: KTableOutOfOrderBug.java
>
>
> On a materialized KTable, out-of-order records for a given key (records whose 
> timestamp is older than the current value in the store) are not being ignored; 
> instead they are used to update the local store value and are also forwarded.
> I believe the bug is here: 
> [https://github.com/apache/kafka/blob/2.6.0/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java#L77]
>  It should return true, not false (see javadoc)
> The bug impacts here: 
> [https://github.com/apache/kafka/blob/2.6.0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java#L142-L148]
> I have attached a simple stream app that shows the issue happening.
> Thank you!
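The behavior the reporter expects can be stated as a small update rule: a materialized table drops an update whose timestamp is older than the one already stored for that key. The sketch below models that rule with a hypothetical in-memory store. It is not the logic of Kafka Streams' KTableSource or ValueAndTimestampSerializer. Accepting updates with an equal timestamp is an assumption of the sketch.

```java
import java.util.HashMap;
import java.util.Map;

class TimestampedTable<K, V> {
    // Minimal value-plus-timestamp holder.
    static final class ValueAndTs<T> {
        final T value;
        final long timestamp;

        ValueAndTs(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Map<K, ValueAndTs<V>> store = new HashMap<>();

    // Returns true if the update was applied (and would be forwarded downstream),
    // false if it was dropped as out-of-order.
    boolean put(K key, V value, long timestamp) {
        ValueAndTs<V> current = store.get(key);
        if (current != null && timestamp < current.timestamp) {
            return false; // older than the stored value: ignore it
        }
        store.put(key, new ValueAndTs<>(value, timestamp));
        return true;
    }

    V get(K key) {
        ValueAndTs<V> current = store.get(key);
        return current == null ? null : current.value;
    }
}
```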





[GitHub] [kafka] rondagostino opened a new pull request #9300: KAFKA-10491: Check authorizations first in KafkaApis

2020-09-17 Thread GitBox


rondagostino opened a new pull request #9300:
URL: https://github.com/apache/kafka/pull/9300


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] dongjinleekr commented on a change in pull request #9284: KAFKA-10479 Throw exception if users try to update configs of existen…

2020-09-17 Thread GitBox


dongjinleekr commented on a change in pull request #9284:
URL: https://github.com/apache/kafka/pull/9284#discussion_r490449947



##
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##
@@ -910,6 +903,11 @@ class DynamicListenerConfig(server: KafkaServer) extends 
BrokerReconfigurable wi
 if 
(!newListeners.keySet.subsetOf(newConfig.listenerSecurityProtocolMap.keySet))
   throw new ConfigException(s"Listeners '$newListeners' must be subset of 
listener map '${newConfig.listenerSecurityProtocolMap}'")
 newListeners.keySet.intersect(oldListeners.keySet).foreach { listenerName 
=>
+  def immutableListenerConfigs(kafkaConfig: KafkaConfig, prefix: String): 
Map[String, AnyRef] = {
+kafkaConfig.originals.asScala.filter { case (key, _) =>
+  key.startsWith(prefix) && !DynamicSecurityConfigs.contains(key)
+}
+  }
   val prefix = listenerName.configPrefix
   val newListenerProps = immutableListenerConfigs(newConfig, prefix)

Review comment:
   Right, this line does not work correctly at present.
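The helper under review keeps the configs that match a listener's prefix and are not dynamically reconfigurable. Below is a hypothetical Java rendering of that filter; the key names and the dynamic-key set in the example are illustrative only. One detail to check in such a filter: if the dynamic-key set holds unprefixed names (e.g. `ssl.keystore.location`) while the filtered keys carry the listener prefix, a plain `contains(key)` never matches. This sketch therefore strips the prefix before the lookup. Whether that applies to `DynamicSecurityConfigs` is not established here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class ListenerConfigFilter {
    // Keeps the configs for one listener (keys starting with `prefix`) that are
    // NOT dynamically reconfigurable. `dynamicKeys` is assumed to hold unprefixed
    // names, so the listener prefix is stripped before the lookup.
    static Map<String, Object> immutableListenerConfigs(Map<String, Object> originals,
                                                        String prefix,
                                                        Set<String> dynamicKeys) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, Object> e : originals.entrySet()) {
            String key = e.getKey();
            if (key.startsWith(prefix)
                    && !dynamicKeys.contains(key.substring(prefix.length()))) {
                result.put(key, e.getValue());
            }
        }
        return result;
    }
}
```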









[jira] [Commented] (KAFKA-10497) Convert group/transaction coordinator metadata schemas to use generated protocol

2020-09-17 Thread David Jacot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17197881#comment-17197881
 ] 

David Jacot commented on KAFKA-10497:
-

I already have a PR for the consumer protocol part: 
https://github.com/apache/kafka/pull/8897

> Convert group/transaction coordinator metadata schemas to use generated 
> protocol
> 
>
> Key: KAFKA-10497
> URL: https://issues.apache.org/jira/browse/KAFKA-10497
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> We need to convert the internal schemas used for representing 
> transaction/group metadata to the generated protocol. This opens the door for 
> flexible version support on the next bump. 





[jira] [Commented] (KAFKA-10497) Convert group/transaction coordinator metadata schemas to use generated protocol

2020-09-17 Thread David Jacot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17197885#comment-17197885
 ] 

David Jacot commented on KAFKA-10497:
-

[~hachikuji]  [~chia7712] FYI

> Convert group/transaction coordinator metadata schemas to use generated 
> protocol
> 
>
> Key: KAFKA-10497
> URL: https://issues.apache.org/jira/browse/KAFKA-10497
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> We need to convert the internal schemas used for representing 
> transaction/group metadata to the generated protocol. This opens the door for 
> flexible version support on the next bump. 





[GitHub] [kafka] dajac commented on pull request #8897: MINOR; Use the automated protocol for the Consumer Protocol's subscriptions and assignments

2020-09-17 Thread GitBox


dajac commented on pull request #8897:
URL: https://github.com/apache/kafka/pull/8897#issuecomment-694410433


   @hachikuji @chia7712 FYI







[GitHub] [kafka] jolshan commented on a change in pull request #9294: MINOR: Fix now that kafka.apache.org resolves to 3 IP addresses

2020-09-17 Thread GitBox


jolshan commented on a change in pull request #9294:
URL: https://github.com/apache/kafka/pull/9294#discussion_r490461933



##
File path: clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
##
@@ -102,20 +102,20 @@ public void testResolveUnknownHostException() throws 
UnknownHostException {
 
 @Test
 public void testResolveDnsLookup() throws UnknownHostException {
-// Note that kafka.apache.org resolves to 2 IP addresses
+// Note that kafka.apache.org resolves to at least 2 IP addresses

Review comment:
   Are you saying to change it back? I thought it was fair to say "at least 
2" to reflect we are asserting greater than 1.









[GitHub] [kafka] tombentley commented on pull request #8878: MINOR: Generator config-specific HTML ids

2020-09-17 Thread GitBox


tombentley commented on pull request #8878:
URL: https://github.com/apache/kafka/pull/8878#issuecomment-694425788


   @omkreddy I fixed a failing test. The rest look unrelated. Could you trigger 
the build again please?







[GitHub] [kafka] abbccdda commented on pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol

2020-09-17 Thread GitBox


abbccdda commented on pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#issuecomment-694430943


   @mimaison Sorry, I was on vacation. Will take another look.







[GitHub] [kafka] hachikuji commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-09-17 Thread GitBox


hachikuji commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r490464756



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -157,6 +158,44 @@ case class OngoingReassignmentState(addingReplicas: 
Seq[Int],
 
 case class SimpleAssignmentState(replicas: Seq[Int]) extends AssignmentState
 
+
+
+sealed trait IsrState {
+  /**
+   * Includes only the in-sync replicas which have been committed to ZK.
+   */
+  def isr: Set[Int]
+
+  /**
+   * This set may include un-committed ISR members following an expansion. 
This "effective" ISR is used for advancing
+   * the high watermark as well as determining which replicas are required for 
acks=all produce requests.
+   *
+   * Only applicable as of IBP 2.7-IV2, for older versions this will return 
the committed ISR
+   *
+   */
+  def maximalIsr: Set[Int]
+
+  /**
+   * Indicates if we have an AlterIsr request inflight.
+   */
+  def inflight: Boolean

Review comment:
   nit: `hasInflight`?
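The three states behind `IsrState` can be modeled as below. Only a pending expansion makes `maximalIsr` larger than the committed `isr`, and any pending state blocks a second AlterIsr. This is a hypothetical Java translation for illustration. The class names follow the Scala diff, `hasInflight` is the rename suggested above, and `canAddToIsr` mirrors the check proposed later in this review. For a pending shrink, `maximalIsr` is assumed to stay equal to the committed ISR.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

abstract class IsrState {
    final Set<Integer> isr; // committed ISR (already written to ZK)

    IsrState(Set<Integer> isr) {
        this.isr = Collections.unmodifiableSet(new HashSet<>(isr));
    }

    // ISR used for high-watermark and acks=all decisions.
    abstract Set<Integer> maximalIsr();

    // True while an AlterIsr request is outstanding.
    abstract boolean hasInflight();

    // A follower may be added only if nothing is in flight and it is not already committed.
    boolean canAddToIsr(int followerId) {
        return !hasInflight() && !isr.contains(followerId);
    }
}

final class CommittedIsr extends IsrState {
    CommittedIsr(Set<Integer> isr) { super(isr); }
    @Override Set<Integer> maximalIsr() { return isr; }
    @Override boolean hasInflight() { return false; }
}

final class PendingExpandIsr extends IsrState {
    final int newInSyncReplicaId;

    PendingExpandIsr(Set<Integer> isr, int newInSyncReplicaId) {
        super(isr);
        this.newInSyncReplicaId = newInSyncReplicaId;
    }

    // The new replica already counts toward the maximal ISR before the commit.
    @Override Set<Integer> maximalIsr() {
        Set<Integer> maximal = new HashSet<>(isr);
        maximal.add(newInSyncReplicaId);
        return maximal;
    }

    @Override boolean hasInflight() { return true; }
}

final class PendingShrinkIsr extends IsrState {
    final Set<Integer> outOfSyncReplicaIds;

    PendingShrinkIsr(Set<Integer> isr, Set<Integer> outOfSyncReplicaIds) {
        super(isr);
        this.outOfSyncReplicaIds = new HashSet<>(outOfSyncReplicaIds);
    }

    // Replicas being removed still count until the shrink is committed.
    @Override Set<Integer> maximalIsr() { return isr; }
    @Override boolean hasInflight() { return true; }
}
```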

##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -700,19 +764,16 @@ class Partition(val topicPartition: TopicPartition,
   inWriteLock(leaderIsrUpdateLock) {
 // check if this replica needs to be added to the ISR
 if (needsExpandIsr(followerReplica)) {
-  val newInSyncReplicaIds = inSyncReplicaIds + followerReplica.brokerId
-  info(s"Expanding ISR from ${inSyncReplicaIds.mkString(",")} to 
${newInSyncReplicaIds.mkString(",")}")
-  // update ISR in ZK and cache
-  expandIsr(newInSyncReplicaIds)
+  expandIsr(followerReplica.brokerId)
 }
   }
 }
   }
 
   private def needsExpandIsr(followerReplica: Replica): Boolean = {
-leaderLogIfLocal.exists { leaderLog =>
+!hasInFlightAlterIsr && leaderLogIfLocal.exists { leaderLog =>

Review comment:
   I think we can refactor this a little bit to avoid some duplication and 
inconsistency. We have the following logic above when updating follower state:
   ```scala
   if (!isrState.maximalIsr.contains(followerId))
 maybeExpandIsr(followerReplica, followerFetchTimeMs)
   ```
   This is a little inconsistent because here we are checking `isrState.isr`. 
I'd suggest splitting this method into something like the following:
   
   ```scala
   def hasReachedHighWatermark(follower: Replica): Boolean = {
 leaderLogIfLocal.exists { leaderLog =>
   val leaderHighwatermark = leaderLog.highWatermark
   isFollowerInSync(follower, leaderHighwatermark)
 }
   }
   
   def canAddToIsr(followerId: Int): Boolean = {
 val current = isrState
 !current.inflight && !current.isr.contains(followerId)
   }
   
   def needsExpandIsr(follower: Replica): Boolean = {
 canAddToIsr(follower.brokerId) && hasReachedHighWatermark(follower)
   }
   ```
   
   Then we can change the logic in `maybeExpandIsr` to the following:
   ```scala
   val needsIsrUpdate = canAddToIsr(followerReplica.brokerId) && inReadLock(leaderIsrUpdateLock) {
   ...
   ```
   
   

##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -1246,6 +1364,50 @@ class Partition(val topicPartition: TopicPartition,
 }
   }
 
+  private def sendAlterIsrRequest(): Boolean = {
+val isrToSend: Option[Set[Int]] = isrState match {
+  case PendingExpandIsr(isr, newInSyncReplicaId) => Some(isr + 
newInSyncReplicaId)
+  case PendingShrinkIsr(isr, outOfSyncReplicaIds) => Some(isr -- 
outOfSyncReplicaIds)
+  case CommittedIsr(_) =>
+error(s"Asked to send AlterIsr but there are no pending updates")
+None
+}
+if (isrToSend.isDefined) {

Review comment:
   nit: can probably rework this as `exists`
   ```scala
   isrToSendOpt.exists { isrToSend =>
   ...
   }
   ```
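In Java terms, the `exists` suggestion corresponds to `Optional.map(...).orElse(false)`. The send runs only when there is a proposed ISR, and the method otherwise reports `false` without a separate `isDefined` branch. The sketch below is a hypothetical translation. The `added`/`removed` parameters stand in for the pending-state fields, and the `send` predicate is a placeholder for actually enqueuing the request.

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;

class AlterIsrSender {
    // Proposed ISR for a pending expansion (isr + added) or shrink (isr -- removed);
    // empty when nothing is pending.
    static Optional<Set<Integer>> isrToSend(Set<Integer> isr, Integer added, Set<Integer> removed) {
        if (added != null) {
            Set<Integer> proposed = new HashSet<>(isr);
            proposed.add(added);
            return Optional.of(proposed);
        }
        if (removed != null) {
            Set<Integer> proposed = new HashSet<>(isr);
            proposed.removeAll(removed);
            return Optional.of(proposed);
        }
        return Optional.empty();
    }

    // Equivalent of Scala's `isrToSendOpt.exists { isrToSend => ... }`:
    // runs `send` only when a proposal exists, otherwise returns false.
    static boolean sendAlterIsrRequest(Optional<Set<Integer>> isrToSendOpt,
                                       Predicate<Set<Integer>> send) {
        return isrToSendOpt.map(send::test).orElse(false);
    }
}
```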

##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -618,9 +682,9 @@ class Partition(val topicPartition: TopicPartition,
 // since the replica's logStartOffset may have incremented
 val leaderLWIncremented = newLeaderLW > oldLeaderLW
 
-// check if we need to expand ISR to include this replica
-// if it is not in the ISR yet
-if (!inSyncReplicaIds.contains(followerId))
+// Check if this in-sync replica needs to be added to the ISR. We look 
at the "maximal" ISR here so we don't
+// send an additional Alter ISR request for the same replica

Review comment:
   Another possibility is that the replica is pending removal in which case 
another `AlterIsr` will be needed. I think it might be more intuitive to make 
this check:
   
   ```scala
   if (!isrState.inflight && !isrState.isr.contains(followerId))
   ```

##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -858,10 +925,10 @@ class Partition(val topicPartition: TopicPartition,
 case Some(leaderLog) =>
   val outOfSyncReplicaIds = getOutOfS

[GitHub] [kafka] abbccdda commented on a change in pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol

2020-09-17 Thread GitBox


abbccdda commented on a change in pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#discussion_r490504884



##
File path: clients/src/main/resources/common/message/ListOffsetResponse.json
##
@@ -48,7 +48,7 @@
   "about": "The timestamp associated with the returned offset." },
 { "name": "Offset", "type": "int64", "versions": "1+", "default": 
"-1", "ignorable": false,
   "about": "The returned offset." },
-{ "name": "LeaderEpoch", "type": "int32", "versions": "4+" }
+{ "name": "LeaderEpoch", "type": "int32", "versions": "4+", "default": 
"-1" }

Review comment:
   Why do we need this default?









[GitHub] [kafka] abbccdda commented on a change in pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol

2020-09-17 Thread GitBox


abbccdda commented on a change in pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#discussion_r490506434



##
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##
@@ -3732,41 +3734,43 @@ public void testListOffsets() throws Exception {
 Collections.emptySet(),
 node0);
 
-final TopicPartition tp1 = new TopicPartition("foo", 0);
-final TopicPartition tp2 = new TopicPartition("bar", 0);
-final TopicPartition tp3 = new TopicPartition("baz", 0);
+final TopicPartition tp0 = new TopicPartition("foo", 0);

Review comment:
   I will leave it up to you, as long as you make sure the tests themselves are 
changed correctly; it's not easy to eyeball whether such a change is a no-op.









[GitHub] [kafka] abbccdda commented on a change in pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol

2020-09-17 Thread GitBox


abbccdda commented on a change in pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#discussion_r490507674



##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
##
@@ -2496,46 +2501,81 @@ public void 
testGetOffsetByTimeWithPartitionsRetryCouldTriggerMetadataUpdate() {
 client.updateMetadata(initialUpdateResponse);
 
 final long fetchTimestamp = 10L;
-Map<TopicPartition, ListOffsetResponse.PartitionData> allPartitionData = new HashMap<>();
-allPartitionData.put(tp0, new ListOffsetResponse.PartitionData(
-Errors.NONE, fetchTimestamp, 4L, Optional.empty()));
-allPartitionData.put(tp1, new ListOffsetResponse.PartitionData(
-retriableError, ListOffsetRequest.LATEST_TIMESTAMP, -1L, 
Optional.empty()));
+List<ListOffsetTopicResponse> topics = Collections.singletonList(
+new ListOffsetTopicResponse()
+.setName(tp0.topic())
+.setPartitions(Arrays.asList(
+new ListOffsetPartitionResponse()

Review comment:
   Not sure why we need an 8-space indent here?









[GitHub] [kafka] abbccdda commented on a change in pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol

2020-09-17 Thread GitBox


abbccdda commented on a change in pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#discussion_r490509357



##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
##
@@ -2496,46 +2501,81 @@ public void 
testGetOffsetByTimeWithPartitionsRetryCouldTriggerMetadataUpdate() {
 client.updateMetadata(initialUpdateResponse);
 
 final long fetchTimestamp = 10L;
-Map<TopicPartition, ListOffsetResponse.PartitionData> allPartitionData = new HashMap<>();
-allPartitionData.put(tp0, new ListOffsetResponse.PartitionData(
-Errors.NONE, fetchTimestamp, 4L, Optional.empty()));
-allPartitionData.put(tp1, new ListOffsetResponse.PartitionData(
-retriableError, ListOffsetRequest.LATEST_TIMESTAMP, -1L, 
Optional.empty()));
+List topics = Collections.singletonList(
+new ListOffsetTopicResponse()
+.setName(tp0.topic())
+.setPartitions(Arrays.asList(
+new ListOffsetPartitionResponse()
+.setPartitionIndex(tp0.partition())
+.setErrorCode(Errors.NONE.code())
+.setTimestamp(fetchTimestamp)
+.setOffset(4L),
+new ListOffsetPartitionResponse()
+.setPartitionIndex(tp1.partition())
+.setErrorCode(retriableError.code())
+.setTimestamp(ListOffsetRequest.LATEST_TIMESTAMP)
+.setOffset(-1L))));
+ListOffsetResponseData data = new ListOffsetResponseData()
+.setThrottleTimeMs(0)
+.setTopics(topics);
 
 client.prepareResponseFrom(body -> {
 boolean isListOffsetRequest = body instanceof ListOffsetRequest;
 if (isListOffsetRequest) {
 ListOffsetRequest request = (ListOffsetRequest) body;
-Map<TopicPartition, ListOffsetRequest.PartitionData> expectedTopicPartitions = new HashMap<>();
-expectedTopicPartitions.put(tp0, new ListOffsetRequest.PartitionData(
-fetchTimestamp, Optional.empty()));
-expectedTopicPartitions.put(tp1, new ListOffsetRequest.PartitionData(
-fetchTimestamp, Optional.empty()));
-
-return request.partitionTimestamps().equals(expectedTopicPartitions);
+List<ListOffsetTopic> expectedTopics = Collections.singletonList(
+new ListOffsetTopic()
+.setName(tp0.topic())
+.setPartitions(Arrays.asList(
+new ListOffsetPartition()
+.setPartitionIndex(tp1.partition())
+.setTimestamp(fetchTimestamp)
+.setCurrentLeaderEpoch(ListOffsetResponse.UNKNOWN_EPOCH),
+new ListOffsetPartition()
+.setPartitionIndex(tp0.partition())
+.setTimestamp(fetchTimestamp)
+.setCurrentLeaderEpoch(ListOffsetResponse.UNKNOWN_EPOCH))));
+return request.topics().equals(expectedTopics);
 } else {
 return false;
 }
-}, new ListOffsetResponse(allPartitionData), originalLeader);
+}, new ListOffsetResponse(data), originalLeader);
 
 client.prepareMetadataUpdate(updatedMetadata);
 
 // If the metadata wasn't updated before retrying, the fetcher would consult the original leader and hit a NOT_LEADER exception.
 // We will count the answered future response in the end to verify if this is the case.
-Map<TopicPartition, ListOffsetResponse.PartitionData> paritionDataWithFatalError = new HashMap<>(allPartitionData);
-paritionDataWithFatalError.put(tp1, new ListOffsetResponse.PartitionData(
-Errors.NOT_LEADER_OR_FOLLOWER, ListOffsetRequest.LATEST_TIMESTAMP, -1L, Optional.empty()));
-client.prepareResponseFrom(new ListOffsetResponse(paritionDataWithFatalError), originalLeader);
+List<ListOffsetTopicResponse> topicsWithFatalError = Collections.singletonList(
+new ListOffsetTopicResponse()
+.setName(tp0.topic())
+.setPartitions(Arrays.asList(
+new ListOffsetPartitionResponse()

Review comment:
   Could we reuse the struct in L2508?

##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -910,136 +913,161 @@ class KafkaApis(val requestChannel: RequestChannel,

[jira] [Created] (KAFKA-10499) 4 Unit Tests are breaking after addition of a new A record to "apache.org"

2020-09-17 Thread Prateek Agarwal (Jira)
Prateek Agarwal created KAFKA-10499:
---

 Summary: 4 Unit Tests are breaking after addition of a new A record to "apache.org"
 Key: KAFKA-10499
 URL: https://issues.apache.org/jira/browse/KAFKA-10499
 Project: Kafka
  Issue Type: Bug
  Components: unit tests
Affects Versions: 2.5.1
Reporter: Prateek Agarwal


{{apache.org}} previously resolved to only 2 A records: 95.216.24.32 and 40.79.78.1.

With the addition of a new A record, 95.216.26.30, 4 unit tests that expect DNS resolution to return 2 addresses have started failing, because it now returns 3.

 
{code:java}
org.apache.kafka.clients.ClusterConnectionStatesTest > testMultipleIPsWithUseAll FAILED
    java.lang.AssertionError: expected:<2> but was:<3>
        at org.junit.Assert.fail(Assert.java:88)
        at org.junit.Assert.failNotEquals(Assert.java:834)
        at org.junit.Assert.assertEquals(Assert.java:645)
        at org.junit.Assert.assertEquals(Assert.java:631)
        at org.apache.kafka.clients.ClusterConnectionStatesTest.testMultipleIPsWithUseAll(ClusterConnectionStatesTest.java:241)

org.apache.kafka.clients.ClusterConnectionStatesTest > testHostResolveChange FAILED
    java.lang.AssertionError: expected:<2> but was:<3>
        at org.junit.Assert.fail(Assert.java:88)
        at org.junit.Assert.failNotEquals(Assert.java:834)
        at org.junit.Assert.assertEquals(Assert.java:645)
        at org.junit.Assert.assertEquals(Assert.java:631)
        at org.apache.kafka.clients.ClusterConnectionStatesTest.testHostResolveChange(ClusterConnectionStatesTest.java:256)

org.apache.kafka.clients.ClusterConnectionStatesTest > testMultipleIPsWithDefault FAILED
    java.lang.AssertionError: expected:<2> but was:<3>
        at org.junit.Assert.fail(Assert.java:88)
        at org.junit.Assert.failNotEquals(Assert.java:834)
        at org.junit.Assert.assertEquals(Assert.java:645)
        at org.junit.Assert.assertEquals(Assert.java:631)
        at org.apache.kafka.clients.ClusterConnectionStatesTest.testMultipleIPsWithDefault(ClusterConnectionStatesTest.java:231)

org.apache.kafka.clients.ClientUtilsTest > testResolveDnsLookupAllIps FAILED
    java.lang.AssertionError: expected:<2> but was:<3>
        at org.junit.Assert.fail(Assert.java:88)
        at org.junit.Assert.failNotEquals(Assert.java:834)
        at org.junit.Assert.assertEquals(Assert.java:645)
        at org.junit.Assert.assertEquals(Assert.java:631)
        at org.apache.kafka.clients.ClientUtilsTest.testResolveDnsLookupAllIps(ClientUtilsTest.java:87)
 {code}
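One way to keep tests like these independent of whatever `apache.org` happens to resolve to is to route lookups through an injectable resolver and stub it in tests. The Java sketch below is only an illustration under that assumption: `HostResolver`, `StaticHostResolver` and `countAddresses` are hypothetical names, not existing Kafka classes; only `InetAddress.getAllByName` and `InetAddress.getByAddress` are real JDK calls.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.List;

public class ResolverSketch {
    // Hypothetical abstraction over DNS lookups so tests can avoid the network.
    interface HostResolver {
        InetAddress[] resolve(String host) throws UnknownHostException;
    }

    // Production-style resolver: delegates to the JDK, which performs a real lookup.
    static final class JdkHostResolver implements HostResolver {
        @Override
        public InetAddress[] resolve(String host) throws UnknownHostException {
            return InetAddress.getAllByName(host);
        }
    }

    // Test resolver: returns a fixed list of IPv4 addresses without querying DNS.
    static final class StaticHostResolver implements HostResolver {
        private final List<String> ips;

        StaticHostResolver(String... ips) {
            this.ips = Arrays.asList(ips);
        }

        @Override
        public InetAddress[] resolve(String host) throws UnknownHostException {
            InetAddress[] result = new InetAddress[ips.size()];
            for (int i = 0; i < ips.size(); i++) {
                // getByAddress only wraps the raw bytes; no name service is consulted.
                result[i] = InetAddress.getByAddress(host, toBytes(ips.get(i)));
            }
            return result;
        }

        private static byte[] toBytes(String ipv4) {
            String[] parts = ipv4.split("\\.");
            byte[] bytes = new byte[4];
            for (int i = 0; i < 4; i++) {
                bytes[i] = (byte) Integer.parseInt(parts[i]);
            }
            return bytes;
        }
    }

    // Code under test: how many addresses a host resolves to.
    static int countAddresses(HostResolver resolver, String host) throws UnknownHostException {
        return resolver.resolve(host).length;
    }

    public static void main(String[] args) throws UnknownHostException {
        HostResolver stub = new StaticHostResolver("95.216.24.32", "40.79.78.1", "95.216.26.30");
        System.out.println(countAddresses(stub, "apache.org")); // prints 3
    }
}
```

With the stub in place, an assertion such as `assertEquals(3, countAddresses(stub, "apache.org"))` no longer changes when the real DNS records do.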



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-09-17 Thread GitBox


hachikuji commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r490527627



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -1246,6 +1364,50 @@ class Partition(val topicPartition: TopicPartition,
 }
   }
 
+  private def sendAlterIsrRequest(): Boolean = {
+val isrToSend: Option[Set[Int]] = isrState match {
+  case PendingExpandIsr(isr, newInSyncReplicaId) => Some(isr + newInSyncReplicaId)
+  case PendingShrinkIsr(isr, outOfSyncReplicaIds) => Some(isr -- outOfSyncReplicaIds)
+  case CommittedIsr(_) =>
+error(s"Asked to send AlterIsr but there are no pending updates")
+None
+}
+if (isrToSend.isDefined) {
+  val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, isrToSend.get.toList, zkVersion)
+  val callbackPartial = handleAlterIsrResponse(isrToSend.get, _ : Either[Errors, LeaderAndIsr])
+  alterIsrManager.enqueue(AlterIsrItem(topicPartition, newLeaderAndIsr, callbackPartial))
+} else {
+  false
+}
+  }
+
+  private def handleAlterIsrResponse(proposedIsr: Set[Int], result: Either[Errors, LeaderAndIsr]): Unit = {
+inWriteLock(leaderIsrUpdateLock) {
+  result match {
+case Left(error: Errors) => error match {
+  case Errors.UNKNOWN_TOPIC_OR_PARTITION =>
+debug(s"Controller failed to update ISR to ${proposedIsr.mkString(",")} since it doesn't know about this topic or partition. Giving up.")
+  case Errors.FENCED_LEADER_EPOCH =>
+debug(s"Controller failed to update ISR to ${proposedIsr.mkString(",")} since we sent an old leader epoch. Giving up.")
+  case _ =>

Review comment:
   Since `INVALID_UPDATE_VERSION` is one of the expected errors at this level, can we add a separate case for it? For unexpected errors, we might want to log at warn level.
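A rough Java analog of the structure suggested above, with a dedicated branch for `INVALID_UPDATE_VERSION` and a warn-level fallback for anything unexpected. The `AlterIsrError` enum and the returned log-line strings are hypothetical stand-ins for Kafka's `Errors` and its logger; the message for `INVALID_UPDATE_VERSION` is an assumption, not text from the PR.

```java
public class AlterIsrErrorHandlingSketch {
    // Hypothetical stand-in for the subset of Kafka's Errors discussed in the review.
    enum AlterIsrError {
        UNKNOWN_TOPIC_OR_PARTITION, FENCED_LEADER_EPOCH, INVALID_UPDATE_VERSION, UNKNOWN_SERVER_ERROR
    }

    // Returns the log line that would be emitted for a failed AlterIsr response.
    static String logLineFor(AlterIsrError error, String proposedIsr) {
        String prefix = "Controller failed to update ISR to " + proposedIsr;
        switch (error) {
            case UNKNOWN_TOPIC_OR_PARTITION:
                return "DEBUG " + prefix + " since it doesn't know about this topic or partition. Giving up.";
            case FENCED_LEADER_EPOCH:
                return "DEBUG " + prefix + " since we sent an old leader epoch. Giving up.";
            case INVALID_UPDATE_VERSION:
                // Expected error, so it gets its own case; this wording is an assumption.
                return "DEBUG " + prefix + " since we sent an outdated ISR version.";
            default:
                // Anything not anticipated above is surfaced at warn level.
                return "WARN " + prefix + " due to unexpected error " + error + ".";
        }
    }

    public static void main(String[] args) {
        System.out.println(logLineFor(AlterIsrError.INVALID_UPDATE_VERSION, "1,2"));
        System.out.println(logLineFor(AlterIsrError.UNKNOWN_SERVER_ERROR, "1,2"));
    }
}
```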

##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1763,6 +1768,143 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: AlterIsrResponseData => Unit): Unit = {
+val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+alterIsrRequest.topics.forEach { topicReq =>
+  topicReq.partitions.forEach { partitionReq =>
+val tp = new TopicPartition(topicReq.name, partitionReq.partitionIndex)
+val newIsr = partitionReq.newIsr().asScala.toList.map(_.toInt)
+isrsToAlter.put(tp, new LeaderAndIsr(alterIsrRequest.brokerId, partitionReq.leaderEpoch, newIsr, partitionReq.currentIsrVersion))
+  }
+}
+
+def responseCallback(results: Either[Map[TopicPartition, Either[Errors, LeaderAndIsr]], Errors]): Unit = {
+  val resp = new AlterIsrResponseData()
+  results match {
+case Right(error) =>
+  resp.setErrorCode(error.code)
+case Left(partitionResults) =>
+  resp.setTopics(new util.ArrayList())
+  partitionResults.groupBy(_._1.topic).foreach { entry =>
+val topicResp = new AlterIsrResponseData.TopicData()
+  .setName(entry._1)
+  .setPartitions(new util.ArrayList())
+resp.topics.add(topicResp)
+entry._2.foreach { partitionEntry =>
+  partitionEntry._2 match {
+case Left(error) => topicResp.partitions.add(
+  new AlterIsrResponseData.PartitionData()
+.setPartitionIndex(partitionEntry._1.partition)
+.setErrorCode(error.code))
+case Right(leaderAndIsr) => topicResp.partitions.add(
+  new AlterIsrResponseData.PartitionData()
+.setPartitionIndex(partitionEntry._1.partition)
+.setLeaderId(leaderAndIsr.leader)
+.setLeaderEpoch(leaderAndIsr.leaderEpoch)
+.setIsr(leaderAndIsr.isr.map(Integer.valueOf).asJava)
+.setCurrentIsrVersion(leaderAndIsr.zkVersion))
+  }
+}
+  }
+  }
+  callback.apply(resp)
+}
+
+eventManager.put(AlterIsrReceived(alterIsrRequest.brokerId, alterIsrRequest.brokerEpoch, isrsToAlter, responseCallback))
+  }
+
+  private def processAlterIsr(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr],
+  callback: AlterIsrCallback): Unit = {
+
+// Handle a few short-circuits
+if (!isActive) {
+  callback.apply(Right(Errors.NOT_CONTROLLER))
+  return
+}
+
+val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
+if (brokerEpochOpt.isEmpty) {
+  info(s"Ignoring AlterIsr due to unknown broker $brokerId")
+  callback.apply(Right(Errors.STALE_BROKER_EPOCH))
+  return
+}
+
+if (!brokerEpochOpt.contains(brokerEpoch)) {
+  info(s"Ignoring AlterIsr due to stale broker epoch $brokerEpo

[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-09-17 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r490542749



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -200,9 +241,11 @@ class Partition(val topicPartition: TopicPartition,
   // defined when this broker is leader for partition
   @volatile private var leaderEpochStartOffsetOpt: Option[Long] = None
   @volatile var leaderReplicaIdOpt: Option[Int] = None
-  @volatile var inSyncReplicaIds = Set.empty[Int]
+  @volatile var isrState: IsrState = CommittedIsr(Set.empty)

Review comment:
   Yeah, I was thinking we should move the ISR to a separate public accessor. I'll change this.









[GitHub] [kafka] hachikuji commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-09-17 Thread GitBox


hachikuji commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r490530496



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1763,6 +1768,143 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: AlterIsrResponseData => Unit): Unit = {
+val isrsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]()
+
+alterIsrRequest.topics.forEach { topicReq =>
+  topicReq.partitions.forEach { partitionReq =>
+val tp = new TopicPartition(topicReq.name, partitionReq.partitionIndex)
+val newIsr = partitionReq.newIsr().asScala.toList.map(_.toInt)
+isrsToAlter.put(tp, new LeaderAndIsr(alterIsrRequest.brokerId, partitionReq.leaderEpoch, newIsr, partitionReq.currentIsrVersion))
+  }
+}
+
+def responseCallback(results: Either[Map[TopicPartition, Either[Errors, LeaderAndIsr]], Errors]): Unit = {
+  val resp = new AlterIsrResponseData()
+  results match {
+case Right(error) =>
+  resp.setErrorCode(error.code)
+case Left(partitionResults) =>
+  resp.setTopics(new util.ArrayList())
+  partitionResults.groupBy(_._1.topic).foreach { entry =>

Review comment:
   nit: can we avoid using `_1` and `_2`? It's a lot easier to follow if they are named.









[GitHub] [kafka] mumrah merged pull request #9294: MINOR: Fix now that kafka.apache.org resolves to 3 IP addresses

2020-09-17 Thread GitBox


mumrah merged pull request #9294:
URL: https://github.com/apache/kafka/pull/9294


   







[GitHub] [kafka] mumrah commented on pull request #9294: MINOR: Fix now that kafka.apache.org resolves to 3 IP addresses

2020-09-17 Thread GitBox


mumrah commented on pull request #9294:
URL: https://github.com/apache/kafka/pull/9294#issuecomment-694490391


   Thanks for the patch @jolshan!







[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-09-17 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r490561028



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -920,7 +986,7 @@ class Partition(val topicPartition: TopicPartition,
  * is violated, that replica is considered to be out of sync
  *
  **/
-val candidateReplicaIds = inSyncReplicaIds - localBrokerId
+val candidateReplicaIds = isrState.maximalIsr - localBrokerId

Review comment:
   Makes sense; that will also satisfy your other comment about not checking for in-flight requests within the write lock.









[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-09-17 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r490562630



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -920,7 +986,7 @@ class Partition(val topicPartition: TopicPartition,
  * is violated, that replica is considered to be out of sync
  *
  **/
-val candidateReplicaIds = inSyncReplicaIds - localBrokerId
+val candidateReplicaIds = isrState.maximalIsr - localBrokerId

Review comment:
   Also, yes, it's confusing to refer to `maximalIsr` here even though it should always equal the committed ISR at this point (assuming we check for in-flight requests first).
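To make the `maximalIsr` discussion concrete, here is a Java analog of the `IsrState` hierarchy named in the diff (`CommittedIsr`, `PendingExpandIsr`, `PendingShrinkIsr`). How `maximalIsr` behaves for each state is an assumption of this sketch: while a request is in flight it returns the largest set that could end up committed. `shrinkCandidates` is a hypothetical helper that checks for an in-flight request first, so wherever it reads `maximalIsr`, that set equals the committed ISR.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class IsrStateSketch {
    // Java analog of the Scala IsrState sealed trait from the diff; semantics are assumed.
    interface IsrState {
        Set<Integer> maximalIsr();   // largest ISR that may be in effect while an update is pending
        boolean isInflight();        // true while an AlterIsr request is outstanding
    }

    static final class CommittedIsr implements IsrState {
        private final Set<Integer> isr;
        CommittedIsr(Set<Integer> isr) { this.isr = new HashSet<>(isr); }
        @Override public Set<Integer> maximalIsr() { return Collections.unmodifiableSet(isr); }
        @Override public boolean isInflight() { return false; }
    }

    static final class PendingExpandIsr implements IsrState {
        private final Set<Integer> isr;
        private final int newInSyncReplicaId;
        PendingExpandIsr(Set<Integer> isr, int newInSyncReplicaId) {
            this.isr = new HashSet<>(isr);
            this.newInSyncReplicaId = newInSyncReplicaId;
        }
        // Treat the joining replica as in sync already, so it is not proposed twice.
        @Override public Set<Integer> maximalIsr() {
            Set<Integer> maximal = new HashSet<>(isr);
            maximal.add(newInSyncReplicaId);
            return maximal;
        }
        @Override public boolean isInflight() { return true; }
    }

    static final class PendingShrinkIsr implements IsrState {
        private final Set<Integer> isr;
        private final Set<Integer> outOfSyncReplicaIds;
        PendingShrinkIsr(Set<Integer> isr, Set<Integer> outOfSyncReplicaIds) {
            this.isr = new HashSet<>(isr);
            this.outOfSyncReplicaIds = new HashSet<>(outOfSyncReplicaIds);
        }
        // Keep the larger, not-yet-shrunk set until the controller commits the change.
        @Override public Set<Integer> maximalIsr() { return Collections.unmodifiableSet(isr); }
        @Override public boolean isInflight() { return true; }
        // The ISR that would be proposed to the controller.
        Set<Integer> proposedIsr() {
            Set<Integer> proposed = new HashSet<>(isr);
            proposed.removeAll(outOfSyncReplicaIds);
            return proposed;
        }
    }

    // Returns no candidates while a change is pending; otherwise every ISR member except the leader.
    static Set<Integer> shrinkCandidates(IsrState state, int localBrokerId) {
        if (state.isInflight()) {
            return Collections.emptySet();
        }
        Set<Integer> candidates = new HashSet<>(state.maximalIsr());
        candidates.remove(localBrokerId);
        return candidates;
    }

    public static void main(String[] args) {
        IsrState committed = new CommittedIsr(new HashSet<>(Arrays.asList(1, 2, 3)));
        System.out.println(shrinkCandidates(committed, 1));
        IsrState expanding = new PendingExpandIsr(new HashSet<>(Arrays.asList(1, 2)), 3);
        System.out.println(expanding.maximalIsr());
        System.out.println(shrinkCandidates(expanding, 1));
    }
}
```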









[GitHub] [kafka] hachikuji commented on pull request #9284: KAFKA-10479 Throw exception if users try to update configs of existen…

2020-09-17 Thread GitBox


hachikuji commented on pull request #9284:
URL: https://github.com/apache/kafka/pull/9284#issuecomment-694503921


   @chia7712 Can you take a look at DynamicConnectionsQuotaTest? Looks like this patch is causing it to fail.







[GitHub] [kafka] cmccabe commented on pull request #9279: MINOR: Fix common struct `JsonConverter` and `Schema` generation

2020-09-17 Thread GitBox


cmccabe commented on pull request #9279:
URL: https://github.com/apache/kafka/pull/9279#issuecomment-694510055


   LGTM







[GitHub] [kafka] hachikuji merged pull request #9279: MINOR: Fix common struct `JsonConverter` and `Schema` generation

2020-09-17 Thread GitBox


hachikuji merged pull request #9279:
URL: https://github.com/apache/kafka/pull/9279


   







[GitHub] [kafka] hachikuji commented on a change in pull request #9299: MINOR: Use `Map.foreachKv` to avoid tuple allocation in Scala 2.13

2020-09-17 Thread GitBox


hachikuji commented on a change in pull request #9299:
URL: https://github.com/apache/kafka/pull/9299#discussion_r490586608



##
File path: core/src/main/scala/kafka/utils/Implicits.scala
##
@@ -46,4 +47,21 @@ object Implicits {
 
   }
 
+  /**
+   * Exposes `foreachKv` which maps to `foreachEntry` in Scala 2.13 and `foreach` in Scala 2.12
+   * (with the help of scala.collection.compat). `foreachEntry` avoids the tuple allocation and
+   * is more efficient.
+   *
+   * This was not named `foreachEntry` to avoid `unused import` warnings in Scala 2.13 (the implicit
+   * would not be triggered in Scala 2.13 since `Map.foreachEntry` would have precedence).
+   */
+  @nowarn("cat=unused-imports")
+  implicit class MapExtensionMethods[K, V](private val self: scala.collection.Map[K, V]) extends AnyVal {
+import scala.collection.compat._
+def foreachKv[U](f: (K, V) => U): Unit = {

Review comment:
   The name reads a tad awkwardly. I wonder if `foreachKeyValue` would be too verbose. Or maybe `foreachMapEntry`?









[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-09-17 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r490594335



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -618,9 +682,9 @@ class Partition(val topicPartition: TopicPartition,
 // since the replica's logStartOffset may have incremented
 val leaderLWIncremented = newLeaderLW > oldLeaderLW
 
-// check if we need to expand ISR to include this replica
-// if it is not in the ISR yet
-if (!inSyncReplicaIds.contains(followerId))
+// Check if this in-sync replica needs to be added to the ISR. We look at the "maximal" ISR here so we don't
+// send an additional Alter ISR request for the same replica

Review comment:
   Yeah, checking the maximal set isn't needed anymore since we added the sealed trait. I'll update this to simply call `maybeExpandIsr`, which will do the check you propose here.









[GitHub] [kafka] hachikuji commented on a change in pull request #9300: KAFKA-10491: Check authorizations first in KafkaApis

2020-09-17 Thread GitBox


hachikuji commented on a change in pull request #9300:
URL: https://github.com/apache/kafka/pull/9300#discussion_r490604721



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1732,45 +1732,57 @@ class KafkaApis(val requestChannel: RequestChannel,
   sendResponseMaybeThrottle(controllerMutationQuota, request, createResponse, onComplete = None)
 }
 
+// be sure to check authorization first, before checking if this is the controller, to avoid leaking
+// information about the system (i.e. who is the controller) to principals unauthorized for that information
+
 val createTopicsRequest = request.body[CreateTopicsRequest]
 val results = new CreatableTopicResultCollection(createTopicsRequest.data.topics.size)
-if (!controller.isActive) {
-  createTopicsRequest.data.topics.forEach { topic =>
-results.add(new CreatableTopicResult().setName(topic.name)
-  .setErrorCode(Errors.NOT_CONTROLLER.code))
-  }
-  sendResponseCallback(results)
-} else {
-  createTopicsRequest.data.topics.forEach { topic =>
-results.add(new CreatableTopicResult().setName(topic.name))
-  }
-  val hasClusterAuthorization = authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME,
-logIfDenied = false)
-  val topics = createTopicsRequest.data.topics.asScala.map(_.name)
-  val authorizedTopics =
-if (hasClusterAuthorization) topics.toSet
-else filterByAuthorized(request.context, CREATE, TOPIC, topics)(identity)
-  val authorizedForDescribeConfigs = filterByAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC,
-topics, logIfDenied = false)(identity).map(name => name -> results.find(name)).toMap
+createTopicsRequest.data.topics.forEach { topic =>
+  results.add(new CreatableTopicResult().setName(topic.name))
+}
+val hasClusterAuthorization = authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME,
+  logIfDenied = false)
+val topics = createTopicsRequest.data.topics.asScala.map(_.name)
+val authorizedTopics =
+  if (hasClusterAuthorization) topics.toSet
+  else filterByAuthorized(request.context, CREATE, TOPIC, topics)(identity)
+val authorizedForDescribeConfigs = filterByAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC,

Review comment:
   Might not matter since `CreateTopics` requests are infrequent, but the two passes for authorization are a bit vexing. Feels like we are missing a good intermediate type between this handler and `AdminManager`. Maybe we can replace the 3 maps that we pass to `AdminManager.createTopic` with a single map which contains all the state we need for each topic.
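One possible shape for the intermediate type suggested above is a single map from topic name to a per-topic state object, built in one pass with authorization decided before the controller check. `TopicCreationState` and `buildStates` are hypothetical, the authorizer is modeled as a plain `BiPredicate`, and error names are strings rather than Kafka's `Errors`; this is a sketch of the idea, not the PR's code.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

public class CreateTopicsStateSketch {
    // Hypothetical per-topic state that replaces several parallel maps.
    static final class TopicCreationState {
        final String name;
        final boolean authorizedToCreate;
        final boolean authorizedToDescribeConfigs;
        String error; // null means no error has been assigned

        TopicCreationState(String name, boolean authorizedToCreate, boolean authorizedToDescribeConfigs) {
            this.name = name;
            this.authorizedToCreate = authorizedToCreate;
            this.authorizedToDescribeConfigs = authorizedToDescribeConfigs;
        }
    }

    // Builds all per-topic state in one pass; the authorizer takes (operation, topic).
    static Map<String, TopicCreationState> buildStates(List<String> topics,
                                                       boolean hasClusterCreate,
                                                       BiPredicate<String, String> authorizer,
                                                       boolean isActiveController) {
        Map<String, TopicCreationState> states = new LinkedHashMap<>();
        for (String topic : topics) {
            boolean canCreate = hasClusterCreate || authorizer.test("CREATE", topic);
            boolean canDescribeConfigs = authorizer.test("DESCRIBE_CONFIGS", topic);
            TopicCreationState state = new TopicCreationState(topic, canCreate, canDescribeConfigs);
            // Authorization is decided before the controller check, so an unauthorized
            // principal never learns whether this broker is the controller.
            if (!canCreate) {
                state.error = "TOPIC_AUTHORIZATION_FAILED";
            } else if (!isActiveController) {
                state.error = "NOT_CONTROLLER";
            }
            states.put(topic, state);
        }
        return states;
    }

    public static void main(String[] args) {
        BiPredicate<String, String> onlyTopicA = (op, topic) -> op.equals("CREATE") && topic.equals("a");
        Map<String, TopicCreationState> states = buildStates(Arrays.asList("a", "b"), false, onlyTopicA, false);
        states.forEach((name, state) -> System.out.println(name + " -> " + state.error));
    }
}
```

Passing one map of these objects downstream would let a single lookup answer every per-topic question, instead of consulting three maps that must be kept in sync.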


[GitHub] [kafka] hachikuji commented on a change in pull request #8725: KAFKA-9608: Transaction Event Simulation Test

2020-09-17 Thread GitBox


hachikuji commented on a change in pull request #8725:
URL: https://github.com/apache/kafka/pull/8725#discussion_r490627471



##
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionEventSimulationTest.java
##
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This test tries to test out the EOS robustness on the client side. It features a {@link TransactionSimulationCoordinator}
+ * which handles the incoming transactional produce/metadata requests and gives basic feedback through {@link MockClient}.
+ *
+ * Each iteration the transaction manager will append one record through accumulator and commit offset at the same time. The request
+ * being transmitted is not guaranteed to be processed or processed correctly, so a state checking loop is enforced to make the client
+ * and the coordinator interact with each other and ensure the state could be eventually clean using {@link TransactionManager#isReady}.
+ * By the end of the test we will check whether all the committed transactions are successfully materialized on the coordinator side.
+ *
+ * Features supported:
+ * 
+ * 1. Randomly abort transaction
+ * 2. Fault injection on response
+ * 3. Random message drop
+ */
+public class TransactionEventSimulationTest {
+
+private TransactionManager transactionManager;
+private TransactionSimulationCoordinator transactionCoordinator;
+private Sender sender;
+private final LogContext logContext = new LogContext();
+
+private final MockTime time = new MockTime();
+private final int requestTimeoutMs = 100;
+private final int retryBackOffMs = 0;
+private final long apiVersion = 0L;
+
+private ProducerMetadata metadata = new ProducerMetadata(0, Long.MAX_VALUE, 10,
+new LogContext(), new ClusterResourceListeners(), time);
+private MockClient client = new MockClient(time, metadata);
+
+@Before
+public void setup() {
+transactionManager = new TransactionManager(logContext, "txn-id",
+requestTimeoutMs, apiVersion, new ApiVersions(), false);
+transactionCoordinator = new TransactionSimulationCoordinator(client);
+}
+
+@Test
+public void simulateTxnEvents() throws InterruptedException {
+final int batchSize = 100;
+final int lingerMs = 0;
+final int deliveryTimeoutMs = 10;
+
+RecordAccumulator accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.GZIP,
+lingerMs, retryBackOffMs, deliveryTimeoutMs, new Metrics(), "accumulator", time, new ApiVersions(), transactionManager,

Review comment:
   nit: make `Metrics` a field
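The simulation described in the Javadoc above (random drops and injected response faults, with the client retrying until the state is clean) can be illustrated in miniature. In this hypothetical Java sketch, unrelated to the PR's `TransactionSimulationCoordinator`, a fake coordinator uses a seeded `Random` either to drop a request or to apply a commit but still return an error, and the client retries each transaction until it is acknowledged. Seeding keeps a failing run reproducible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class FaultInjectionSketch {
    enum Outcome { OK, REQUEST_DROPPED, ERROR_RESPONSE }

    // Hypothetical coordinator that commits transactions but injects faults using a seeded Random.
    static final class FlakyCoordinator {
        private final Random random;
        private final double dropRate;
        private final double errorRate;
        final List<Integer> committed = new ArrayList<>();

        FlakyCoordinator(long seed, double dropRate, double errorRate) {
            this.random = new Random(seed);
            this.dropRate = dropRate;
            this.errorRate = errorRate;
        }

        Outcome commit(int txnId) {
            double roll = random.nextDouble();
            if (roll < dropRate) {
                return Outcome.REQUEST_DROPPED;   // request lost before it was applied
            }
            if (!committed.contains(txnId)) {
                committed.add(txnId);             // idempotent: a retry never commits twice
            }
            if (roll < dropRate + errorRate) {
                return Outcome.ERROR_RESPONSE;    // applied, but the client sees a failure
            }
            return Outcome.OK;
        }
    }

    // Client loop: retry each transaction until it is acknowledged, then move on.
    static int run(FlakyCoordinator coordinator, int numTxns, int maxAttemptsPerTxn) {
        int attempts = 0;
        for (int txn = 0; txn < numTxns; txn++) {
            for (int i = 0; i < maxAttemptsPerTxn; i++) {
                attempts++;
                if (coordinator.commit(txn) == Outcome.OK) {
                    break;
                }
            }
        }
        return attempts;
    }

    public static void main(String[] args) {
        FlakyCoordinator coordinator = new FlakyCoordinator(42L, 0.3, 0.3);
        int attempts = run(coordinator, 50, 1_000);
        System.out.println("attempts=" + attempts + ", committed=" + coordinator.committed.size());
    }
}
```

Because a commit can be applied even when the client sees an error, the coordinator has to treat retries idempotently; the end-of-run check is that every transaction appears in `committed` exactly once and in order.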

##
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionSimulationCoordinator.java
##
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ 

[GitHub] [kafka] showuon commented on pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

2020-09-17 Thread GitBox


showuon commented on pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#issuecomment-694592201


   Merged the latest trunk to pick up the fix for the flaky tests (addressed in https://github.com/apache/kafka/pull/9294).







[GitHub] [kafka] showuon opened a new pull request #9301: KAFKA-10482: fix initialConnectionCount variable race condition issue

2020-09-17 Thread GitBox


showuon opened a new pull request #9301:
URL: https://github.com/apache/kafka/pull/9301


   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] ouyangnengda closed pull request #9297: KAFKA-10495:Fix spelling mistake

2020-09-17 Thread GitBox


ouyangnengda closed pull request #9297:
URL: https://github.com/apache/kafka/pull/9297


   







[jira] [Commented] (KAFKA-10495) Fix spelling mistake

2020-09-17 Thread Jira


[ https://issues.apache.org/jira/browse/KAFKA-10495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17198095#comment-17198095 ]

欧阳能达 commented on KAFKA-10495:
--

[~showuon] you are right!

> Fix spelling mistake
> 
>
> Key: KAFKA-10495
> URL: https://issues.apache.org/jira/browse/KAFKA-10495
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: 欧阳能达
>Priority: Trivial
>  Labels: newbie
>
> In the trunk branch:
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll (ConsumerCoordinator.java, line 465): @return true *iff* the operation succeeded
> The word *iff* is a spelling mistake.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10495) Fix spelling mistake

2020-09-17 Thread Jira


 [ https://issues.apache.org/jira/browse/KAFKA-10495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

欧阳能达 resolved KAFKA-10495.
--
Resolution: Won't Fix






[GitHub] [kafka] ijuma commented on pull request #9294: MINOR: Fix now that kafka.apache.org resolves to 3 IP addresses

2020-09-17 Thread GitBox


ijuma commented on pull request #9294:
URL: https://github.com/apache/kafka/pull/9294#issuecomment-694647346


   Have we cherry-picked this to the older branches? It seems like they will be broken otherwise.







[GitHub] [kafka] ijuma commented on a change in pull request #9294: MINOR: Fix now that kafka.apache.org resolves to 3 IP addresses

2020-09-17 Thread GitBox


ijuma commented on a change in pull request #9294:
URL: https://github.com/apache/kafka/pull/9294#discussion_r490699517



##
File path: clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
##
@@ -273,22 +273,21 @@ public void testMultipleIPsWithDefault() throws UnknownHostException {
 
     @Test
     public void testMultipleIPsWithUseAll() throws UnknownHostException {
-        assertEquals(2, ClientUtils.resolve(hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS).size());
+        assertTrue(ClientUtils.resolve(hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS).size() > 1);
 
         connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS);
         InetAddress addr1 = connectionStates.currentAddress(nodeId1);
         connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS);
         InetAddress addr2 = connectionStates.currentAddress(nodeId1);
         assertNotSame(addr1, addr2);
-
         connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS);
         InetAddress addr3 = connectionStates.currentAddress(nodeId1);
-        assertSame(addr1, addr3);

Review comment:
   The test is verifying the round robin that _our_ code does, not the DNS server. We should ensure we still have that coverage in the meantime.
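The review point is that these assertions exercise the client's own round robin over a node's resolved addresses. That behavior does not depend on how many IPs the DNS server returns. Below is a hypothetical, simplified model of it. The class `AddressRoundRobin` and its `connecting()` and `currentAddress()` methods are illustrative names, not Kafka's `ClusterConnectionStates` API. The model assumes that each connection attempt advances to the next address and wraps around at the end of the list.

It is generic over the address type so it can be demonstrated with strings; in the client the list would hold `InetAddress` values. With n resolved addresses, attempt n + 1 returns to the first address. That is why `assertSame(addr1, addr3)` held only while `hostTwoIps` resolved to exactly two addresses.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch (not Kafka's ClusterConnectionStates): cycles through a
 * node's resolved addresses, one step per connection attempt.
 */
final class AddressRoundRobin<A> {
    private final List<A> addresses;
    private int index = -1; // no connection attempt made yet

    AddressRoundRobin(List<A> addresses) {
        if (addresses.isEmpty()) {
            throw new IllegalArgumentException("at least one address is required");
        }
        this.addresses = new ArrayList<>(addresses); // defensive copy
    }

    /** Advances to the next address, wrapping around, and returns it. */
    A connecting() {
        index = (index + 1) % addresses.size();
        return addresses.get(index);
    }

    /** Returns the address used by the most recent connection attempt. */
    A currentAddress() {
        if (index < 0) {
            throw new IllegalStateException("no connection attempt yet");
        }
        return addresses.get(index);
    }
}
```

Under this model, a test can keep the round-robin coverage without assuming a fixed number of DNS results. It would advance `resolved.size()` attempts past `addr1` and then assert that the current address is `addr1` again.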









[GitHub] [kafka] ijuma commented on pull request #9218: MINOR: Fix shouldNotResetEpochHistoryHeadIfUndefinedPassed

2020-09-17 Thread GitBox


ijuma commented on pull request #9218:
URL: https://github.com/apache/kafka/pull/9218#issuecomment-694648140


   Can you please rebase so that we get the new CI running?







[GitHub] [kafka] abbccdda commented on a change in pull request #8725: KAFKA-9608: Transaction Event Simulation Test

2020-09-17 Thread GitBox


abbccdda commented on a change in pull request #8725:
URL: https://github.com/apache/kafka/pull/8725#discussion_r490713327



##
File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionSimulationCoordinator.java
##
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
+import org.apache.kafka.common.message.EndTxnResponseData;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.message.InitProducerIdResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.AddOffsetsToTxnRequest;
+import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
+import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
+import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
+import org.apache.kafka.common.requests.EndTxnRequest;
+import org.apache.kafka.common.requests.EndTxnResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.requests.InitProducerIdRequest;
+import org.apache.kafka.common.requests.InitProducerIdResponse;
+import org.apache.kafka.common.requests.ProduceRequest;
+import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse;
+import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
+import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+
+import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_EPOCH;
+import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_ID;
+
+/**
+ * A reduced functionality of a combination of transaction coordinator and group coordinator.
+ * It provides basic event handling from {@link Sender} with transaction turned on.
+ *
+ * Random fault injection is supported as well, which will return a retriable error to
+ * the client.
+ */
+class TransactionSimulationCoordinator {
+
+    private final Map> pendingPartitionData;
+    private final Map pendingOffsets;
+    private boolean offsetsAddedToTxn = false;
+
+    private long currentProducerId = 0L;
+    private short currentEpoch = 0;
+    private Random faultInjectRandom = new Random();
+
+    public Map> persistentPartitionData() {
+        return persistentPartitionData;
+    }
+
+    public Map committedOffsets() {
+        return committedOffsets;
+    }
+
+    private final Map> persistentPartitionData;
+    private final Map committedOffsets;
+
+    private final MockClient networkClient;
+    private final int throttleTimeMs = 10;
+
+    TransactionSimulationCoordinator(MockClient networkClient) {
+        this.networkClient = networkClient;
+        this.pendingPartitionData = new HashMap<>();
+        this.pendingOffsets = new HashMap<>();
+        this.persistentPartitionData = new HashMap<>();
+        this.committedOffsets = new HashMap<>();
+    }
+
+    void runOnce(boolean dropMessage) {

Review comment:
   so you are suggesting `maybeDisconnect`?
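The class Javadoc quoted above describes random fault injection that answers the client with a retriable error. The following isolates that pattern into a small helper, as an illustration only. `FaultInjector`, its `Outcome` enum, and the `faultProbability` parameter are invented here and are not part of the PR.

The sketch assumes a seeded `java.util.Random`, so that a failing simulation run can be replayed. The quoted code instead uses an unseeded `new Random()`.

```java
import java.util.Random;

/** Hypothetical helper: decides per request whether to inject a retriable fault. */
final class FaultInjector {
    /** Simplified stand-in for the outcome of a simulated coordinator request. */
    enum Outcome { SUCCESS, RETRIABLE_ERROR }

    private final Random random;
    private final double faultProbability;

    FaultInjector(long seed, double faultProbability) {
        if (faultProbability < 0.0 || faultProbability > 1.0) {
            throw new IllegalArgumentException("faultProbability must be in [0, 1]");
        }
        this.random = new Random(seed); // seeded so the same run can be reproduced
        this.faultProbability = faultProbability;
    }

    /** Returns RETRIABLE_ERROR with probability faultProbability, otherwise SUCCESS. */
    Outcome nextOutcome() {
        // nextDouble() is uniform in [0, 1), so p = 0 never faults and p = 1 always does.
        return random.nextDouble() < faultProbability ? Outcome.RETRIABLE_ERROR : Outcome.SUCCESS;
    }
}
```

In a simulation loop, each incoming coordinator request would first consult `nextOutcome()`. On `RETRIABLE_ERROR`, the simulator would reply with a retriable error instead of handling the request.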









[GitHub] [kafka] bob-barrett opened a new pull request #9302: KAFKA-10149: Allow auto preferred leader election when partitions are reassigning

2020-09-17 Thread GitBox


bob-barrett opened a new pull request #9302:
URL: https://github.com/apache/kafka/pull/9302


   This patch removes the check for reassigning partitions when determining whether to trigger automatic preferred leader election. That check can cause problems during long-running reassignments: a broker that crashes during the reassignment can leave partition leadership in an unexpectedly unbalanced state, and automatic leader election will not rebalance it until the reassignment finishes.
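The following is a simplified, hypothetical model of the per-broker imbalance check behind automatic preferred leader election. All class and method names are illustrative, not the controller's code.

The model assumes that a broker's imbalance ratio is the fraction of partitions whose preferred replica (the first assigned replica) is that broker but which it does not currently lead. It compares that ratio against a percentage threshold such as the broker config `leader.imbalance.per.broker.percentage` (default 10). The `skipWhileAnyReassigning` flag models the check that this patch removes.

```java
import java.util.Collection;
import java.util.List;

/** Hypothetical, simplified model of the automatic leader rebalance decision. */
final class LeaderImbalanceSketch {

    /** Illustrative partition view: assigned replicas (first = preferred leader). */
    static final class PartitionView {
        final List<Integer> replicas;
        final int leader;
        final boolean reassigning;

        PartitionView(List<Integer> replicas, int leader, boolean reassigning) {
            this.replicas = replicas;
            this.leader = leader;
            this.reassigning = reassigning;
        }
    }

    /** Fraction of the partitions that prefer brokerId which brokerId does not lead. */
    static double imbalanceRatio(int brokerId, Collection<PartitionView> partitions) {
        int preferred = 0;
        int notLed = 0;
        for (PartitionView p : partitions) {
            if (p.replicas.get(0) == brokerId) { // Integer unboxed for the comparison
                preferred++;
                if (p.leader != brokerId) {
                    notLed++;
                }
            }
        }
        return preferred == 0 ? 0.0 : (double) notLed / preferred;
    }

    /** Whether an automatic preferred leader election would be triggered for brokerId. */
    static boolean shouldTriggerElection(int brokerId, Collection<PartitionView> partitions,
                                         int imbalancePercentage, boolean skipWhileAnyReassigning) {
        // Models the removed check: any ongoing reassignment blocks automatic election entirely.
        if (skipWhileAnyReassigning && partitions.stream().anyMatch(p -> p.reassigning)) {
            return false;
        }
        return imbalanceRatio(brokerId, partitions) > imbalancePercentage / 100.0;
    }
}
```

Consider a broker that restarts during an unrelated reassignment and loses leadership of every partition it prefers. Its ratio is then 1.0. With the old check the election is skipped until the reassignment completes; without it, the election fires immediately.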
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] showuon commented on pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

2020-09-17 Thread GitBox


showuon commented on pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#issuecomment-694684653


   `tests/Build/JDK 15`: pass
   `tests/Build/JDK 8`: pass
   `tests/Build/JDK 11`: 1 test case failed, unrelated to my change:
   `kafka.network.DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota`







[jira] [Commented] (KAFKA-10482) Fix flaky testDynamicListenerConnectionCreationRateQuota

2020-09-17 Thread Luke Chen (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-10482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17198153#comment-17198153 ]

Luke Chen commented on KAFKA-10482:
---

https://ci-builds.apache.org/blue/rest/organizations/jenkins/pipelines/Kafka/pipelines/kafka-trunk-jdk15/runs/69/log/?start=0

> Fix flaky testDynamicListenerConnectionCreationRateQuota
> 
>
> Key: KAFKA-10482
> URL: https://issues.apache.org/jira/browse/KAFKA-10482
> Project: Kafka
>  Issue Type: Test
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> [https://ci-builds.apache.org/blue/rest/organizations/jenkins/pipelines/Kafka/pipelines/kafka-trunk-jdk11/runs/64/log/?start=0]
>  
> [https://ci-builds.apache.org/blue/rest/organizations/jenkins/pipelines/Kafka/pipelines/kafka-trunk-jdk15/runs/63/log/?start=0]
> kafka.network.DynamicConnectionQuotaTest > testDynamicListenerConnectionCreationRateQuota FAILED
>  java.util.concurrent.ExecutionException: org.scalatest.exceptions.TestFailedException: Connections not closed (initial = 2 current = 1)
>  at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
>  at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:205)
>  at kafka.network.DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota(DynamicConnectionQuotaTest.scala:219)
> Caused by:
>  org.scalatest.exceptions.TestFailedException: Connections not closed (initial = 2 current = 1)
>  at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)
>  at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529)
>  at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389)
>  at org.scalatest.Assertions.fail(Assertions.scala:1091)
>  at org.scalatest.Assertions.fail$(Assertions.scala:1087)
>  at org.scalatest.Assertions$.fail(Assertions.scala:1389)
>  at kafka.network.DynamicConnectionQuotaTest.verifyConnectionRate(DynamicConnectionQuotaTest.scala:349)
>  at kafka.network.DynamicConnectionQuotaTest.$anonfun$testDynamicListenerConnectionCreationRateQuota$5(DynamicConnectionQuotaTest.scala:217)





[GitHub] [kafka] showuon commented on pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

2020-09-17 Thread GitBox


showuon commented on pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#issuecomment-694693926


   @ijuma, the test results are listed in the previous comment. I think it's good to merge. Thanks.


