[GitHub] [kafka] Fleshgrinder edited a comment on pull request #9374: MINOR: Fix NPE in KafkaAdminClient.describeUserScramCredentials

2020-10-05 Thread GitBox


Fleshgrinder edited a comment on pull request #9374:
URL: https://github.com/apache/kafka/pull/9374#issuecomment-704057098


   @chia7712 the server gets null right now, and the code you propose is more or less what I had originally. However, at least @rondagostino was directly confused by the ternary and its operator precedence, which is why we rewrote it to be easier to understand and easier to debug (it is now possible to set individual breakpoints). Modern languages like Kotlin and Rust have no ternary for a good reason. 😉 Imho it's better as is, despite being longer, especially because length doesn't translate directly to complexity.
   
   We'd also still be collecting `null` usernames. The field is nullable in the `UserName` class, but collecting them is still a bad idea.
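   
   For illustration, a hedged sketch of the explicit form being defended here (the names mirror the surrounding discussion, but this is not the exact PR code):
   
   ```java
   // Each branch is its own statement, so a breakpoint can target either path.
   final List<DescribeUserScramCredentialsRequestData.UserName> userNames;
   if (users == null) {
       userNames = null; // preserved so the server still receives null
   } else {
       userNames = users.stream()
           .filter(Objects::nonNull) // avoid collecting null usernames
           .map(user -> new DescribeUserScramCredentialsRequestData.UserName().setName(user))
           .collect(Collectors.toList());
   }
   ```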
   
   I need to check the server side.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] Fleshgrinder commented on pull request #9374: MINOR: Fix NPE in KafkaAdminClient.describeUserScramCredentials

2020-10-05 Thread GitBox


Fleshgrinder commented on pull request #9374:
URL: https://github.com/apache/kafka/pull/9374#issuecomment-704057098


   @chia7712 the server gets null right now, and the code you propose is more or less what I had originally. However, at least @rondagostino was directly confused by the ternary and its operator precedence, which is why we rewrote it to be easier to understand and easier to debug (it is now possible to set individual breakpoints). Modern languages like Kotlin and Rust have no ternary for a good reason. 😉 Imho it's better as is, despite being longer, especially because length doesn't translate directly to complexity.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lkokhreidze commented on a change in pull request #9237: KAFKA-10454 / Update copartitionSourceGroups when optimization algorithm is triggered

2020-10-05 Thread GitBox


lkokhreidze commented on a change in pull request #9237:
URL: https://github.com/apache/kafka/pull/9237#discussion_r500023844



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
##
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(value = Parameterized.class)
+@Category({IntegrationTest.class})
+public class StreamTableJoinTopologyOptimizationIntegrationTest {
+    private static final int NUM_BROKERS = 1;
+
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+
+    private String tableTopic;
+    private String inputTopic;
+    private String outputTopic;
+    private String applicationId;
+
+    private Properties streamsConfiguration;
+
+    @Rule
+    public TestName testName = new TestName();
+
+    @Parameterized.Parameter
+    public String topologyOptimization;
+
+    @Parameterized.Parameters(name = "Optimization = {0}")
+    public static Collection topologyOptimization() {
+        return Arrays.asList(new String[][]{
+            {StreamsConfig.OPTIMIZE},
+            {StreamsConfig.NO_OPTIMIZATION}
+        });
+    }
+
+    @Before
+    public void before() throws InterruptedException {
+        streamsConfiguration = new Properties();
+
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+
+        tableTopic = "table-topic" + safeTestName;
+        inputTopic = "stream-topic-" + safeTestName;
+        outputTopic = "output-topic-" + safeTestName;
+        applicationId = "app-" + safeTestName;
+
+        CLUSTER.createTopic(inputTopic, 4, 1);
+        CLUSTER.createTopic(tableTopic, 2, 1);
+        CLUSTER.createTopic(outputTopic, 4, 1);
+
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BY

[GitHub] [kafka] chia7712 commented on pull request #9374: MINOR: Fix NPE in KafkaAdminClient.describeUserScramCredentials

2020-10-05 Thread GitBox


chia7712 commented on pull request #9374:
URL: https://github.com/apache/kafka/pull/9374#issuecomment-704046977


   Nice finding!
   
   Is there also a potential NPE on the server side? According to the protocol schema (https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/DescribeUserScramCredentialsRequest.json#L23), the field ```users``` is nullable. Does the server have to handle null as well? (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L3068)
   
   Additionally, if the server is able to handle null, it should be fine to set ```users``` to null. Hence, we can simplify the client code to the following style.
   
   ```java
   @Override
   public DescribeUserScramCredentialsRequest.Builder createRequest(int timeoutMs) {
       return new DescribeUserScramCredentialsRequest.Builder(
           new DescribeUserScramCredentialsRequestData().setUsers(users == null ? null :
               users.stream().map(user ->
                   new DescribeUserScramCredentialsRequestData.UserName().setName(user))
                   .collect(Collectors.toList())));
   }
   ```
   
   please let me know what you think :)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-8318) Session Window Aggregations generate an extra tombstone

2020-10-05 Thread Ilia Pasynkov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17208497#comment-17208497
 ] 

Ilia Pasynkov commented on KAFKA-8318:
--

Hello, [~vvcephei].

Right now I'm looking at the SessionWindowedKStreamImpl and SuppressScenarioTest 
classes. Could you please help me figure out what exactly is meant by 
"tombstone" in this issue?

> Session Window Aggregations generate an extra tombstone
> ---
>
> Key: KAFKA-8318
> URL: https://issues.apache.org/jira/browse/KAFKA-8318
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: Ilia Pasynkov
>Priority: Minor
>  Labels: newbie++
>
> See the discussion 
> https://github.com/apache/kafka/pull/6654#discussion_r280231439
> The session merging logic generates a tombstone in addition to an update when 
> the session window already exists. It's not a correctness issue, just a small 
> performance hit, because that tombstone is immediately invalidated by the 
> update.
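
A minimal illustration of the tombstone in question (the record shapes below are illustrative only, not actual output):
{code:java}
// When an existing session [t1, t2] is merged/updated into [t1, t2'], the
// store/changelog currently sees both a delete and the new aggregate:
//   Windowed(key, [t1, t2])  -> null           // the extra tombstone
//   Windowed(key, [t1, t2']) -> newAggregate   // immediately invalidates it
{code}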



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on a change in pull request #8988: KAFKA-10199: Separate restore threads

2020-10-05 Thread GitBox


guozhangwang commented on a change in pull request #8988:
URL: https://github.com/apache/kafka/pull/8988#discussion_r51027



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -659,13 +665,12 @@ void runOnce() {
            }
        }

-       // we can always let changelog reader try restoring in order to initialize the changelogs;
-       // if there's no active restoring or standby updating it would not try to fetch any data
-       changelogReader.restore();
-
-       // TODO: we should record the restore latency and its relative time spent ratio after
-       //       we figure out how to move this method out of the stream thread
-       advanceNowAndComputeLatency();
+       // check if restore thread has encountered TaskCorrupted exception; if yes
+       // rethrow it to trigger the handling logic
+       final TaskCorruptedException e = restoreThread.nextCorruptedException();

Review comment:
   Updating the fields of TaskCorruptedException could be risky since they can be read by the other (main) thread concurrently. I think a better way would be to keep its fields immutable, but drain all the exceptions (which is thread-safe) and then create a new one aggregating their tasks.
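   
   A hedged sketch of that drain-and-aggregate pattern (the `corruptedTasks()` accessor and the constructor shape are hypothetical, not the actual class API):
   
   ```java
   // LinkedBlockingDeque#drainTo is thread-safe, so the restore thread can keep
   // enqueueing while the stream thread drains.
   final List<TaskCorruptedException> drained = new ArrayList<>();
   corruptedExceptions.drainTo(drained);
   if (!drained.isEmpty()) {
       final Set<TaskId> allCorruptedTasks = new HashSet<>();
       for (final TaskCorruptedException exception : drained) {
           allCorruptedTasks.addAll(exception.corruptedTasks()); // hypothetical accessor
       }
       // one new immutable exception aggregating all corrupted tasks
       throw new TaskCorruptedException(allCorruptedTasks);      // hypothetical constructor
   }
   ```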

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
##
@@ -115,13 +114,16 @@ public boolean isClosed() {
 @Override
 public void revive() {
 if (state == CLOSED) {
+// clear all the stores since they should be re-registered

Review comment:
   `ProcessorStateManager#changelogPartitions` relies on its `changelogOffsets`, which relies on the `stores` map. If the `stores` map gets cleared, then `changelogPartitions` would return nothing. So in order to get the task's changelog partitions to send to the restore thread, we need to get them first, before clearing the `stores` map, i.e. we need to "materialize" that `changelogPartitions` set first --- maybe we did not use the right term here, sorry.
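   
   A minimal sketch of that ordering (helper names are hypothetical, not the actual Streams internals):
   
   ```java
   // Materialize the changelog partitions before clearing the stores map that
   // backs them; after the clear, changelogPartitions() would return nothing.
   final Set<TopicPartition> changelogs = new HashSet<>(stateMgr.changelogPartitions());
   stateMgr.clearStores();                       // hypothetical: empties the stores map
   restoreThread.removeChangelogs(changelogs);   // hypothetical: hand over the captured set
   ```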

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -623,16 +613,46 @@ void runOnce() {
            return;
        }

-       initializeAndRestorePhase();
+       // we need to first add closed tasks and then created tasks to work with those revived / recycled tasks
+       restoreThread.addClosedTasks(taskManager.drainRemovedTasks());
+
+       // try to initialize created tasks that are either newly assigned or re-created from corrupted tasks
+       final List initializedTasks;
+       if (!(initializedTasks = taskManager.tryInitializeNewTasks()).isEmpty()) {
+           if (log.isDebugEnabled()) {
+               log.debug("Initializing newly created tasks {} under state {}",
+                   initializedTasks.stream().map(AbstractTask::id).collect(Collectors.toList()), state);
+           }
+

Review comment:
   Ack.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestoreThread.java
##
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TaskCorruptedException;
+import org.apache.kafka.streams.processor.StateRestoreListener;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atom

[jira] [Updated] (KAFKA-10577) StreamThread should be able to process any processible tasks regardless of its state

2020-10-05 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10577:

Component/s: streams

> StreamThread should be able to process any processible tasks regardless of 
> its state
> 
>
> Key: KAFKA-10577
> URL: https://issues.apache.org/jira/browse/KAFKA-10577
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>
> After KAFKA-10199 is done, we should allow active task processing even if we 
> are not yet in RUNNING. More generally speaking, we would no longer rely on 
> the thread's RUNNING state to start processing any tasks, but would just 
> always process any processible tasks.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10555) Improve client state machine

2020-10-05 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17208492#comment-17208492
 ] 

Matthias J. Sax commented on KAFKA-10555:
-

{quote}I thought we were only considering to transit to ERROR if the last 
thread died, but to transit to NOT_RUNNING if the last thread was removed by 
the user. This seems consistent with the current behavior and maintains the 
same semantic meaning of the ERROR state, imo.
{quote}
This would be the state after the KIP (without addressing this ticket).

Transiting to NOT_RUNNING might be an option, but it would also be a change to 
the state machine, as currently NOT_RUNNING is a terminal state after the 
client was closed. Thus, it won't be possible to add new threads when in 
NOT_RUNNING state following the current proposal of the KIP.

However, I don't agree that it makes sense to go to ERROR state when the last 
thread dies _and_ to disallow adding new threads when in ERROR state. IMHO, 
there are two options:
 # go to ERROR state when any thread dies and disallow adding/removing threads 
in this case (as if a thread dies, something went wrong and we want to "lock" 
the client).
 # go to ERROR state only when the last thread dies, but allow adding new 
threads and thus allow transiting from ERROR back to RUNNING (via REBALANCING 
of course); in this case, ERROR means that we stopped processing due to an 
error; for this semantic interpretation of ERROR state, there is no reason to 
disallow adding new threads IMHO (in contrast to (1), for which we say: 
something bad happened, so we want to lock the client as we think it's unsafe 
to add/remove threads any longer).

I personally prefer (2) over (1), as I don't think that there is a good reason 
to lock down the client after a thread dies (also not after the last thread 
died). Also note, even if we stay in RUNNING state with zero threads, it might 
be ok, as users can consult `localThreadMetadata` and/or the `num-thread-alive` 
metric to inspect whether there are any running threads. Ie, stopping the last 
running thread via `removeThread()` could be treated the same as if the last 
thread just died.
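
To make option (2) concrete, a hedged sketch of the transition rules it implies (the state names and method are illustrative, not the actual KafkaStreams.State implementation):
{code:java}
enum ClientState { CREATED, REBALANCING, RUNNING, NOT_RUNNING, ERROR }

static boolean isValidTransition(final ClientState from, final ClientState to) {
    switch (from) {
        case CREATED:
        case RUNNING:
        case REBALANCING:
            return to != ClientState.CREATED;
        case ERROR:
            // option (2): addStreamThread() may take ERROR -> REBALANCING -> RUNNING
            return to == ClientState.REBALANCING || to == ClientState.NOT_RUNNING;
        case NOT_RUNNING:
            return false; // terminal after close()
        default:
            return false;
    }
}
{code}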

> Improve client state machine
> 
>
> Key: KAFKA-10555
> URL: https://issues.apache.org/jira/browse/KAFKA-10555
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
>
> The KafkaStreams client exposes its state to the user for monitoring purpose 
> (ie, RUNNING, REBALANCING etc). The state of the client depends on the 
> state(s) of the internal StreamThreads that have their own states.
> Furthermore, the client state has impact on what the user can do with the 
> client. For example, active task can only be queried in RUNNING state and 
> similar.
> With KIP-671 and KIP-663 we improved error handling capabilities and allow 
> adding/removing stream threads dynamically. We allow adding/removing threads 
> only in RUNNING and REBALANCING state. This puts us in a "weird" position, 
> because if we enter ERROR state (ie, if the last thread dies), we cannot add 
> new threads any longer. However, if we have multiple threads and one dies, we 
> don't enter ERROR state and do allow recovering the thread.
> Before the KIPs the definition of ERROR state was clear; however, with both 
> KIPs it seems that we should revisit its semantics.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10577) StreamThread should be able to process any processible tasks regardless of its state

2020-10-05 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-10577:
-

 Summary: StreamThread should be able to process any processible 
tasks regardless of its state
 Key: KAFKA-10577
 URL: https://issues.apache.org/jira/browse/KAFKA-10577
 Project: Kafka
  Issue Type: Improvement
Reporter: Guozhang Wang


After KAFKA-10199 is done, we should allow active task processing even if we 
are not yet in RUNNING. More generally speaking, we would no longer rely on the 
thread's RUNNING state to start processing any tasks, but would just always 
process any processible tasks.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mikebin commented on pull request #9373: KAFKA-10564: only process non-empty task directories when internally cleaning obsolete state stores

2020-10-05 Thread GitBox


mikebin commented on pull request #9373:
URL: https://github.com/apache/kafka/pull/9373#issuecomment-704032434


   Thanks for the review @ableegoldman! Added a unit test. And thanks @vvcephei for making it easier to cherry-pick this fix back to 2.6.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10576) Different behavior of commitSync and commitAsync

2020-10-05 Thread Yuriy Badalyantc (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuriy Badalyantc updated KAFKA-10576:
-
Description: 
It looks like the consumer's {{commitSync}} and {{commitAsync}} methods have 
different semantics.
{code:java}
public class TestKafka {
    public static void main(String[] args) {
        String id = "dev_test";
        Map<String, Object> settings = new HashMap<>();
        settings.put("bootstrap.servers", "localhost:9094");
        settings.put("key.deserializer", StringDeserializer.class);
        settings.put("value.deserializer", StringDeserializer.class);
        settings.put("client.id", id);
        settings.put("group.id", id);

        String topic = "test";
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(new TopicPartition(topic, 0), new OffsetAndMetadata(1));

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(settings)) {
            consumer.commitSync(offsets);
        }
    }
}
{code}
In the example above I created a consumer and used {{commitSync}} to commit 
offsets. This code works as expected — all offsets are committed to Kafka.

But in the case of {{commitAsync}} it will not work:
{code:java}
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(settings)) {
    CompletableFuture<Boolean> result = new CompletableFuture<>();
    consumer.commitAsync(offsets, new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
                               Exception exception) {
            if (exception != null) {
                result.completeExceptionally(exception);
            } else {
                result.complete(true);
            }
        }
    });
    result.get(15L, TimeUnit.SECONDS);
}
{code}
The {{result}} future failed with a timeout.

This behavior is pretty surprising. From the naming and documentation, it looks 
like the {{commitSync}} and {{commitAsync}} methods should behave identically, 
apart from the blocking/non-blocking aspect. But in reality, there are some 
differences.

I can assume that the {{commitAsync}} method somehow depends on the {{poll}} 
calls. But I didn't find any explicit information about it in 
{{KafkaConsumer}}'s javadoc or kafka documentation page.

So, I believe there are the following options:
 # It's a bug and not expected behavior: {{commitSync}} and {{commitAsync}} 
should have identical semantics.
 # It's expected but not well-documented behavior: in that case, this behavior 
should be explicitly documented.
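
If the {{poll}} dependency assumed above is right, a hedged workaround sketch (illustrative only, reusing the names from the example) is to keep driving {{poll}} until the callback fires:
{code:java}
// commitAsync only enqueues the commit; its callback is invoked while the
// consumer does network I/O, e.g. inside subsequent poll() calls.
consumer.assign(Collections.singleton(new TopicPartition(topic, 0))); // poll() needs an assignment
consumer.commitAsync(offsets, callback);
while (!result.isDone()) {
    consumer.poll(Duration.ofMillis(100)); // drives I/O and fires the callback
}
{code}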



> Different behavior of commitSync and commitAsync
> 
>
> Key: KAFKA-10576
> URL: h

[jira] [Created] (KAFKA-10576) Different behavior of commitSync and commitAsync

2020-10-05 Thread Yuriy Badalyantc (Jira)
Yuriy Badalyantc created KAFKA-10576:


 Summary: Different behavior of commitSync and commitAsync
 Key: KAFKA-10576
 URL: https://issues.apache.org/jira/browse/KAFKA-10576
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Yuriy Badalyantc


It looks like the consumer's {{commitSync}} and {{commitAsync}} methods have 
different semantics.
{code:java}
public class TestKafka {
    public static void main(String[] args) {
        String id = "dev_test";
        Map<String, Object> settings = new HashMap<>();
        settings.put("bootstrap.servers", "localhost:9094");
        settings.put("key.deserializer", StringDeserializer.class);
        settings.put("value.deserializer", StringDeserializer.class);
        settings.put("client.id", id);
        settings.put("group.id", id);

        String topic = "test";
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(new TopicPartition(topic, 0), new OffsetAndMetadata(1));

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(settings)) {
            consumer.commitSync(offsets);
        }
    }
}
{code}
In the example above I created a consumer and used {{commitSync}} to commit 
offsets. This code works as expected — all offsets are committed to Kafka.

But in the case of {{commitAsync}} it will not work:
{code:java}
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(settings)) {
    CompletableFuture<Boolean> result = new CompletableFuture<>();
    consumer.commitAsync(offsets, new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
                               Exception exception) {
            if (exception != null) {
                result.completeExceptionally(exception);
            } else {
                result.complete(true);
            }
        }
    });
    result.get(15L, TimeUnit.SECONDS);
}
{code}
The {{result}} future failed with a timeout.

This behavior is pretty surprising. From the naming and documentation, it looks 
like the {{commitSync}} and {{commitAsync}} methods should behave identically, 
apart from the blocking/non-blocking aspect. But in reality, there are some 
differences.

I can assume that the {{commitAsync}} method somehow depends on the {{poll}} 
calls. But I didn't find any explicit information about it in 
{{KafkaConsumer}}'s javadoc or kafka documentation page.

So, I believe there are the following options:
# It's a bug and not expected behavior: {{commitSync}} and {{commitAsync}} 
should have identical semantics.
# It's expected but not well-documented behavior: in that case, this behavior 
should be explicitly documented.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #9139: KAFKA-9929: Support backward iterator on SessionStore

2020-10-05 Thread GitBox


ableegoldman commented on a change in pull request #9139:
URL: https://github.com/apache/kafka/pull/9139#discussion_r46879



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
##
@@ -359,7 +431,11 @@ private void getNextSegmentIterator() {
            setCacheKeyRange(currentSegmentBeginTime(), currentSegmentLastTime());

            current.close();
-           current = context.cache().range(cacheName, cacheKeyFrom, cacheKeyTo);
+           if (forward) {
+               current = context.cache().range(cacheName, cacheKeyFrom, cacheKeyTo);
+           } else {
+               current = context.cache().reverseRange(cacheName, cacheKeyFrom, cacheKeyTo);
+           }
        }

        private void setCacheKeyRange(final long lowerRangeEndTime, final long upperRangeEndTime) {

Review comment:
   Can you fix the `keyFrom == keyTo` to use `.equals` on the side (down on 
line 370)

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
##
@@ -359,7 +431,11 @@ private void getNextSegmentIterator() {
            setCacheKeyRange(currentSegmentBeginTime(), currentSegmentLastTime());

            current.close();
-           current = context.cache().range(cacheName, cacheKeyFrom, cacheKeyTo);
+           if (forward) {

Review comment:
   I think we're going to need some additional changes in this class 
similar to what we had in CachingWindowStore. Definitely at least in 
`getNextSegmentIterator()`. Let's make sure to have some cross-segment test 
coverage here as well, especially because the iteration logic of session store 
range queries is the hardest to wrap your head around out of all the stores (at 
least, it is for me)

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
##
@@ -201,7 +247,26 @@ public void remove(final Windowed sessionKey) {

        removeExpiredSegments();

-       return registerNewIterator(key, key, Long.MAX_VALUE, endTimeMap.entrySet().iterator());
+       return registerNewIterator(
+           key,
+           key,
+           Long.MAX_VALUE, endTimeMap.entrySet().iterator(),

Review comment:
   missing newline

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
##
@@ -382,9 +478,20 @@ private boolean setInnerIterators() {
            currentKey = nextKeyEntry.getKey();

            if (latestSessionStartTime == Long.MAX_VALUE) {
-               recordIterator = nextKeyEntry.getValue().entrySet().iterator();
+               final Set<Map.Entry<Long, byte[]>> entries;
+               if (forward) entries = nextKeyEntry.getValue().descendingMap().entrySet();
+               else entries = nextKeyEntry.getValue().entrySet();
+               recordIterator = entries.iterator();
            } else {
-               recordIterator = nextKeyEntry.getValue().headMap(latestSessionStartTime, true).entrySet().iterator();
+               final Set<Map.Entry<Long, byte[]>> entries;
+               if (forward) entries = nextKeyEntry.getValue()

Review comment:
   If/else needs brackets





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10575) StateRestoreListener#onRestoreEnd should always be triggered

2020-10-05 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-10575:
-

 Summary: StateRestoreListener#onRestoreEnd should always be 
triggered
 Key: KAFKA-10575
 URL: https://issues.apache.org/jira/browse/KAFKA-10575
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang


Today we only trigger `StateRestoreListener#onRestoreEnd` when we complete the 
restoration of an active task and transit it to the running state. However, the 
restoration can also be stopped when the restoring task gets closed (because it 
gets migrated to another client, for example). We should also trigger the 
callback, indicating the restoration progress, when the restoration is stopped 
in any of these scenarios.
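
A hedged sketch of the shape such a fix could take (the surrounding names are illustrative; only the `StateRestoreListener#onRestoreEnd` signature is the real API):
{code:java}
try {
    restoreChangelog(changelogMetadata); // illustrative restore step
} finally {
    // today this callback only fires when restoration completes and the task
    // transits to running; the proposal is to also fire it when the restoring
    // task is closed, e.g. because it migrated to another client
    userRestoreListener.onRestoreEnd(partition, storeName, totalRestored);
}
{code}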



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on pull request #9373: KAFKA-10564: only process non-empty task directories when internally cleaning obsolete state stores

2020-10-05 Thread GitBox


ableegoldman commented on pull request #9373:
URL: https://github.com/apache/kafka/pull/9373#issuecomment-704010744


   By the way, John was so kind as to cherrypick the trunk PR that rearranged 
things in this method back to the 2.6 branch -- so we should be able to 
cherrypick this PR smoothly, no need for a separate one for 2.6
   
   Looks like just a handful of unrelated flaky test failures in the builds, 
hopefully we'll have better luck on the next run
   ```
   Build / JDK 11 / kafka.api.MetricsTest.testMetrics
   Build / JDK 8 / 
kafka.network.ConnectionQuotasTest.testNoConnectionLimitsByDefault
   Build / JDK 15 / 
kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
   Build / JDK 15 / 
kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete
   Build / JDK 15 / 
org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable
   ```
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman edited a comment on pull request #9373: KAFKA-10564: only process non-empty task directories when internally cleaning obsolete state stores

2020-10-05 Thread GitBox


ableegoldman edited a comment on pull request #9373:
URL: https://github.com/apache/kafka/pull/9373#issuecomment-704010744


   By the way, John was so kind as to cherrypick the trunk PR that rearranged 
things in this method back to the 2.6 branch -- so we should be able to 
cherrypick this PR smoothly, no need for a separate one for 2.6
   
   Looks like just a handful of unrelated flaky test failures in the builds, 
hopefully we'll have better luck on the next run
   ```
   Build / JDK 11 / kafka.api.MetricsTest.testMetrics
   Build / JDK 8 / 
kafka.network.ConnectionQuotasTest.testNoConnectionLimitsByDefault
   Build / JDK 15 / 
kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
   Build / JDK 15 / 
kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete
   Build / JDK 15 / 
org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable
   ```
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #8988: KAFKA-10199: Separate restore threads

2020-10-05 Thread GitBox


guozhangwang commented on a change in pull request #8988:
URL: https://github.com/apache/kafka/pull/8988#discussion_r499988582



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestoreThread.java
##
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TaskCorruptedException;
+import org.apache.kafka.streams.processor.StateRestoreListener;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+
+/**
+ * This is the thread responsible for restoring state stores for both active
+ * and standby tasks
+ */
+public class StateRestoreThread extends Thread {
+
+    private final Time time;
+    private final Logger log;
+    private final ChangelogReader changelogReader;
+    private final AtomicBoolean isRunning = new AtomicBoolean(true);
+    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+    private final LinkedBlockingDeque taskItemQueue;
+    private final AtomicReference<Set<TopicPartition>> completedChangelogs;
+    private final LinkedBlockingDeque<TaskCorruptedException> corruptedExceptions;
+
+    public boolean isRunning() {
+        return isRunning.get();
+    }
+
+    public StateRestoreThread(final Time time,
+                              final StreamsConfig config,
+                              final String threadClientId,
+                              final Admin adminClient,
+                              final String groupId,
+                              final Consumer restoreConsumer,
+                              final StateRestoreListener userStateRestoreListener) {
+        this(time, threadClientId, new StoreChangelogReader(time, config, threadClientId,
+                adminClient, groupId, restoreConsumer, userStateRestoreListener));
+    }
+
+    // for testing only
+    public StateRestoreThread(final Time time,
+                              final String threadClientId,
+                              final ChangelogReader changelogReader) {
+        super(threadClientId);
+
+        final String logPrefix = String.format("state-restore-thread [%s] ", threadClientId);
+        final LogContext logContext = new LogContext(logPrefix);
+
+        this.time = time;
+        this.log = logContext.logger(getClass());
+        this.taskItemQueue = new LinkedBlockingDeque<>();
+        this.corruptedExceptions = new LinkedBlockingDeque<>();
+        this.completedChangelogs = new AtomicReference<>(Collections.emptySet());
+
+        this.changelogReader = changelogReader;
+    }
+
+    private synchronized void waitIfAllChangelogsCompleted() {
+        final Set<TopicPartition> allChangelogs = changelogReader.allChangelogs();
+        if (allChangelogs.equals(changelogReader.completedChangelogs())) {
+            log.debug("All changelogs {} have completed restoration so far, will wait " +
+                    "until new changelogs are registered", allChangelogs);
+
+            while (isRunning.get() && taskItemQueue.isEmpty()) {
+                try {
+                    wait();
+                } catch (final InterruptedException e) {
+                    // do nothing
+                }
+            }
+        }
+    }
+
+    public synchronized void addInitializedTasks(final List tasks) {
+        if (!tasks.isEmpty()) {
+            for (fi

[GitHub] [kafka] vvcephei commented on a change in pull request #9368: KAFKA-9274: Add timeout handling for state restore

2020-10-05 Thread GitBox


vvcephei commented on a change in pull request #9368:
URL: https://github.com/apache/kafka/pull/9368#discussion_r499812565



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##
@@ -205,4 +207,9 @@ default boolean maybePunctuateSystemTime() {
        return false;
    }

+   void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs,
+                                    final TimeoutException timeoutException,
+                                    final Logger log) throws StreamsException;

Review comment:
   StreamsException is unchecked, right?
   
   It's better to document unchecked exceptions in the `@throws` javadoc tag. The `throws` keyword is for telling the compiler that you want callers, instead of yourself, to handle a _checked_ exception. I honestly have no idea why the Java team chose to say "this is poor style" instead of just making it a compiler error, but that's the rationale. https://www.oracle.com/technical-resources/articles/java/javadoc-tool.html#throwstag
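   
   A sketch of the suggested style (hedged; the javadoc text is illustrative):
   
   ```java
   /**
    * Initializes the task timeout or rethrows.
    *
    * @throws StreamsException if the task timeout expires (unchecked, so no
    *         {@code throws} clause is needed for the compiler)
    */
   void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs,
                                    final TimeoutException timeoutException,
                                    final Logger log);
   ```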

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##
@@ -431,6 +432,9 @@ public void restore() {
            // in order to make sure we call the main consumer#poll in time.
            // TODO: once we move ChangelogReader to a separate thread this may no longer be a concern
            polledRecords = restoreConsumer.poll(state == ChangelogReaderState.STANDBY_UPDATING ? Duration.ZERO : pollTime);
+
+           // TODO (?) If we cannot fetch records during restore, should we trigger `task.timeout.ms` ?
+           // TODO (?) If we cannot fetch records for standby task, should we trigger `task.timeout.ms` ?

Review comment:
   Thanks. I agree we can afford to leave this for future improvements.
   
   It does seem like we should have some kind of improvement in the future, 
though. Having a restore or standby-update fail indefinitely would be just as 
damaging to an application's robustness as having the main consumer fail 
indefinitely.
   
   Perhaps we can make some improvements to the Consumer first, though, so that 
we don't have to do so much guesswork to distinguish between "no records" and 
"no fetch".

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
##
@@ -137,4 +146,48 @@ public void update(final Set 
topicPartitions, final Map deadlineMs) {
+        final String errorMessage = String.format(
+            "Task %s did not make progress within %d ms. Adjust `%s` if needed.",
+            id,
+            currentWallClockMs - deadlineMs + taskTimeoutMs,
+            StreamsConfig.TASK_TIMEOUT_MS_CONFIG
+        );
+
+        if (timeoutException != null) {
+            throw new TimeoutException(errorMessage, timeoutException);
+        } else {
+            throw new TimeoutException(errorMessage);
+        }
+    }
+
+    if (timeoutException != null) {
+        log.debug(
+            "Timeout exception. Remaining time to deadline {}; retrying.",
+            deadlineMs - currentWallClockMs,
+            timeoutException
+        );

Review comment:
   This is the wrong format for this log message. The exception won't be 
logged. You have to format the string first:
   ```suggestion
           log.debug(
               String.format("Timeout exception. Remaining time to deadline %d; retrying.",
                   deadlineMs - currentWallClockMs),
               timeoutException
           );
   ```

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
##
@@ -137,4 +146,48 @@ public void update(final Set 
topicPartitions, final Map

[jira] [Resolved] (KAFKA-10439) Connect's Values class loses precision for integers, larger than 64 bits

2020-10-05 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis resolved KAFKA-10439.

Resolution: Fixed

Merged and backported to the branches listed under "Fix Version".

> Connect's Values class loses precision for integers, larger than 64 bits
> 
>
> Key: KAFKA-10439
> URL: https://issues.apache.org/jira/browse/KAFKA-10439
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Oleksandr Diachenko
>Assignee: Oleksandr Diachenko
>Priority: Major
> Fix For: 2.0.2, 2.1.2, 2.2.3, 2.3.2, 2.4.2, 2.7.0, 2.5.2, 2.6.1
>
>
> The `org.apache.kafka.connect.data.Values#parse` method parses integers, 
> which are larger than `Long.MAX_VALUE` as `double` with `
> Schema.FLOAT64_SCHEMA`.
>  
> That means it loses precision for these larger integers.
> For example:
> {code:java}
> SchemaAndValue schemaAndValue = Values.parseString("9223372036854775808");
> {code}
> returns:
> {code:java}
> SchemaAndValue{schema=Schema{FLOAT64}, value=9.223372036854776E18}
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10439) Connect's Values class loses precision for integers, larger than 64 bits

2020-10-05 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-10439:
---
Fix Version/s: 2.6.1
   2.5.2
   2.4.2
   2.3.2
   2.2.3
   2.1.2
   2.0.2

> Connect's Values class loses precision for integers, larger than 64 bits
> 
>
> Key: KAFKA-10439
> URL: https://issues.apache.org/jira/browse/KAFKA-10439
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Oleksandr Diachenko
>Assignee: Oleksandr Diachenko
>Priority: Major
> Fix For: 2.0.2, 2.1.2, 2.2.3, 2.3.2, 2.4.2, 2.7.0, 2.5.2, 2.6.1
>
>
> The `org.apache.kafka.connect.data.Values#parse` method parses integers, 
> which are larger than `Long.MAX_VALUE` as `double` with `
> Schema.FLOAT64_SCHEMA`.
>  
> That means it loses precision for these larger integers.
> For example:
> {code:java}
> SchemaAndValue schemaAndValue = Values.parseString("9223372036854775808");
> {code}
> returns:
> {code:java}
> SchemaAndValue{schema=Schema{FLOAT64}, value=9.223372036854776E18}
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kkonstantine opened a new pull request #9379: MINOR: Annotate test BlockingConnectorTest as integration test

2020-10-05 Thread GitBox


kkonstantine opened a new pull request #9379:
URL: https://github.com/apache/kafka/pull/9379


   Currently `BlockingConnectorTest` incorrectly runs as a unit test. 
   Categorize this test correctly as an integration test by adding the appropriate 
annotation.
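   
   For reference, a sketch of the annotation in question, matching how other integration tests in the code base are categorized:
   
   ```java
   import org.apache.kafka.test.IntegrationTest;
   import org.junit.experimental.categories.Category;
   
   @Category(IntegrationTest.class)
   public class BlockingConnectorTest {
       // ... existing test methods unchanged ...
   }
   ```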
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino opened a new pull request #9378: MINOR: ACLs for secured cluster system tests

2020-10-05 Thread GitBox


rondagostino opened a new pull request #9378:
URL: https://github.com/apache/kafka/pull/9378


   This PR adds missing broker ACLs required to create topics and SCRAM credentials when ACLs are enabled for a system test. These ACLs were missed for system tests in the PR for KAFKA-10131 (https://github.com/apache/kafka/pull/9274/). This PR also adds support for using PLAINTEXT as the inter-broker security protocol when using SCRAM from the client in a system test with a secured cluster -- without this, it would always be necessary to set both the inter-broker and client mechanisms to a SCRAM mechanism.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] gardnervickers commented on pull request #7929: KAFKA-9393: DeleteRecords may cause extreme lock contention for large partition directories

2020-10-05 Thread GitBox


gardnervickers commented on pull request #7929:
URL: https://github.com/apache/kafka/pull/7929#issuecomment-703966954


   @junrao I was able to get some more time to pick this back up, apologies for 
the long gap in time since my last update.
   
   In the most recent commit, I've moved away from tracking producer state 
snapshot files on the segment and instead maintain an in-memory cache using the 
ProducerStateManager.
   
   There were a few reasons for doing this. Firstly, it became a bit awkward trying to track the producer state snapshot file which we emit during clean shutdown, since it does not really have an associated segment file. Secondly, the producer state truncation/restore logic would need to be moved into the `Log`, since a complete view of all producer state snapshot files is required. This is further complicated by the way we handle corrupt snapshot files. Lastly, because we use a "fake" `ProducerStateManager` during segment recovery, we'd have to duplicate a lot of the same logic as exists today to handle truncation/loading outside of the segment lifecycle.
   
   Instead, I opted to take an approach similar to the way the `Log` manages 
segments, by having a `ConcurrentNavigableMap` which tracks snapshot files in 
memory. As a result, the logic for truncation and restore largely remains the 
same, but instead of scanning the log directory on every operation we query the 
in-memory map instead. Deletions are handled in the same way that segment 
deletions are handled, where the snapshot file is deleted asynchronously along 
with the segment file. Because we scan the logdir at startup for "stray" 
snapshot files, it's unnecessary to rename the snapshot files pending deletion 
with the `.deleted` suffix.
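   
   A hedged sketch of that in-memory tracking idea (the class and method names are hypothetical, not the actual Kafka code):
   
   ```java
   import java.io.File;
   import java.util.Map;
   import java.util.Optional;
   import java.util.concurrent.ConcurrentNavigableMap;
   import java.util.concurrent.ConcurrentSkipListMap;
   
   final class SnapshotFiles {
       // base offset -> snapshot file, mirroring how Log tracks its segments
       private final ConcurrentNavigableMap<Long, File> snapshots = new ConcurrentSkipListMap<>();
   
       void add(final long baseOffset, final File file) {
           snapshots.put(baseOffset, file);
       }
   
       // latest snapshot at or below the given offset, for truncation/restore
       Optional<File> latestUpTo(final long offset) {
           return Optional.ofNullable(snapshots.floorEntry(offset)).map(Map.Entry::getValue);
       }
   
       // deletion by offset range instead of re-scanning the log directory
       void removeUpTo(final long offset) {
           snapshots.headMap(offset).clear();
       }
   }
   ```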
   
   This approach has two downsides which I think are relatively minor.
   
   1. When a broker shuts down cleanly and emits a snapshot file, the emitted 
snapshot file is considered "stray" on the next broker startup. While we will 
clean all "stray" snapshot files except the most recent, we still keep the most 
recent snapshot file around until the next broker restart. This will result in 
a single "stray" snapshot file remaining until the next broker restart, at 
which point the "stray" snapshot file will be deleted.
   2. Because we construct a temporary `ProducerStateManager` during segment 
recovery, and it may delete/create some snapshot files, we need to re-scan the 
log directory for snapshot files after segment loading is completed but before 
we load producer state. This is to ensure that the in-memory mapping for the 
"real" `ProducerStateManager` is up to date.
   
   Snapshot file deletion is triggered via `Log.deleteSegmentFiles` when 
deletion occurs due to retention. When swapping log segments into the log (like 
with compaction), it appears we have the additional limitation that we don't 
want to delete snapshot files purely based on the base offset of the deleted 
segment file. To handle this case, we check to see if the segment file which is 
being deleted due to the swap has a counterpart new segment which is being 
swapped in, if it does, we do not delete the snapshot file.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine merged pull request #9320: KAFKA-10439: Connect's Values to parse BigInteger as Decimal with zero scale.

2020-10-05 Thread GitBox


kkonstantine merged pull request #9320:
URL: https://github.com/apache/kafka/pull/9320


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException

2020-10-05 Thread GitBox


hachikuji commented on a change in pull request #9280:
URL: https://github.com/apache/kafka/pull/9280#discussion_r499939576



##
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
##
@@ -2358,6 +2359,41 @@ public void testForceShutdownWithIncompleteTransaction() {
        }
    }

+   @Test
+   public void testTransactionAbortedExceptionOnAbortWithoutError() throws InterruptedException, ExecutionException {
+       ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
+       TransactionManager txnManager = new TransactionManager(logContext, "testTransactionAbortedExceptionOnAbortWithoutError", 6, 100, apiVersions, false);
+
+       setupWithTransactionState(txnManager, false, null);
+       doInitTransactions(txnManager, producerIdAndEpoch);
+       // Begin the transaction
+       txnManager.beginTransaction();
+       txnManager.maybeAddPartitionToTransaction(tp0);
+       client.prepareResponse(new AddPartitionsToTxnResponse(0, Collections.singletonMap(tp0, Errors.NONE)));
+       // Run it once so that the partition is added to the transaction.
+       sender.runOnce();
+       // Append a record to the accumulator.
+       FutureRecordMetadata metadata1 = appendToAccumulator(tp0, time.milliseconds(), "key1", "value1");
+       // Now abort the transaction manually.
+       txnManager.beginAbort();
+       // Try to send.
+       // This should abort the existing transaction and
+       // drain all the unsent batches with a TransactionAbortedException.
+       sender.runOnce();
+       // Now attempt to fetch the result for the record.
+       try {
+           // This should fail since we aborted the transaction.
+           metadata1.get();

Review comment:
   We have a helper for this pattern. See `TestUtils.assertFutureThrows`
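   
   For example, a hedged usage sketch (assuming `TransactionAbortedException` is the expected failure here):
   
   ```java
   // replaces the try/catch around metadata1.get()
   TestUtils.assertFutureThrows(metadata1, TransactionAbortedException.class);
   ```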





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji merged pull request #9377: MINOR: Move `RaftRequestHandler` to `tools` package

2020-10-05 Thread GitBox


hachikuji merged pull request #9377:
URL: https://github.com/apache/kafka/pull/9377


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter

2020-10-05 Thread GitBox


bbejeck commented on pull request #9099:
URL: https://github.com/apache/kafka/pull/9099#issuecomment-703947094


   >Do I need to do anything on this?
   
   @badaiaqrandista, nope as soon as we can get a green build, I'll merge it.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] badaiaqrandista commented on pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter

2020-10-05 Thread GitBox


badaiaqrandista commented on pull request #9099:
URL: https://github.com/apache/kafka/pull/9099#issuecomment-703939623


   @bbejeck Do I need to do anything on this?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter

2020-10-05 Thread GitBox


bbejeck commented on pull request #9099:
URL: https://github.com/apache/kafka/pull/9099#issuecomment-703928924


   Java 11 and Java 15 passed.
   
   Java 8 failed with 
`org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState`, 
a known flaky test.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10405) Flaky Test org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState

2020-10-05 Thread Bill Bejeck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17208393#comment-17208393
 ] 

Bill Bejeck commented on KAFKA-10405:
-

Saw the same error again - 
https://github.com/apache/kafka/pull/9099/checks?check_run_id=1210606325

> Flaky Test 
> org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState
> ---
>
> Key: KAFKA-10405
> URL: https://issues.apache.org/jira/browse/KAFKA-10405
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Reporter: Bill Bejeck
>Priority: Major
>
> From build [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/1979/]
>  
> {noformat}
> org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest > 
> shouldRestoreState FAILED
> 14:25:19 java.lang.AssertionError: Condition not met within timeout 
> 6. Repartition topic 
> restore-test-KSTREAM-AGGREGATE-STATE-STORE-02-repartition not purged 
> data after 6 ms.
> 14:25:19 at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:388)
> 14:25:19 at 
> org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState(PurgeRepartitionTopicIntegrationTest.java:206){noformat}





[GitHub] [kafka] bbejeck commented on pull request #9237: KAFKA-10454 / Update copartitionSourceGroups when optimization algorithm is triggered

2020-10-05 Thread GitBox


bbejeck commented on pull request #9237:
URL: https://github.com/apache/kafka/pull/9237#issuecomment-703926657


   Note the tests for JDK 11 passed 
(https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-9237/12/?cloudbees-analytics-link=scm-reporting%2Fstage%2Fpending)
 but the job isn't exiting.







[GitHub] [kafka] bbejeck commented on a change in pull request #9237: KAFKA-10454 / Update copartitionSourceGroups when optimization algorithm is triggered

2020-10-05 Thread GitBox


bbejeck commented on a change in pull request #9237:
URL: https://github.com/apache/kafka/pull/9237#discussion_r499908753



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
##
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(value = Parameterized.class)
+@Category({IntegrationTest.class})
+public class StreamTableJoinTopologyOptimizationIntegrationTest {
+private static final int NUM_BROKERS = 1;
+
+@ClassRule
+public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS);
+
+private String tableTopic;
+private String inputTopic;
+private String outputTopic;
+private String applicationId;
+
+private Properties streamsConfiguration;
+
+@Rule
+public TestName testName = new TestName();
+
+@Parameterized.Parameter
+public String topologyOptimization;
+
+@Parameterized.Parameters(name = "Optimization = {0}")
+public static Collection topologyOptimization() {
+return Arrays.asList(new String[][]{
+{StreamsConfig.OPTIMIZE},
+{StreamsConfig.NO_OPTIMIZATION}
+});
+}
+
+@Before
+public void before() throws InterruptedException {
+streamsConfiguration = new Properties();
+
+final String safeTestName = safeUniqueTestName(getClass(), testName);
+
+tableTopic = "table-topic" + safeTestName;
+inputTopic = "stream-topic-" + safeTestName;
+outputTopic = "output-topic-" + safeTestName;
+applicationId = "app-" + safeTestName;
+
+CLUSTER.createTopic(inputTopic, 4, 1);
+CLUSTER.createTopic(tableTopic, 2, 1);
+CLUSTER.createTopic(outputTopic, 4, 1);
+
+streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, 
applicationId);
+streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
+
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_

[GitHub] [kafka] guozhangwang merged pull request #9342: MINOR: Update doc for raft state metrics

2020-10-05 Thread GitBox


guozhangwang merged pull request #9342:
URL: https://github.com/apache/kafka/pull/9342


   







[GitHub] [kafka] guozhangwang commented on pull request #9377: MINOR: Move `RaftRequestHandler` to `tools` package

2020-10-05 Thread GitBox


guozhangwang commented on pull request #9377:
URL: https://github.com/apache/kafka/pull/9377#issuecomment-703906935


   LGTM!







[jira] [Commented] (KAFKA-7421) Deadlock in Kafka Connect

2020-10-05 Thread Kyle Leiby (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17208344#comment-17208344
 ] 

Kyle Leiby commented on KAFKA-7421:
---

Hi all, we've been encountering a similar deadlock (I think the same as the one 
[~xakassi] is seeing). We are running a single Debezium JAR inside a 
{{confluentinc/cp-kafka-connect-base:5.5.1-1-deb8}} container. We tried several 
5.x Debian 8 images, and encounter the deadlocks in all of them.

Here's the relevant portion from an example thread dump:
{code:java}
Found one Java-level deadlock:
=============================
"StartAndStopExecutor-connect-1-2":
  waiting to lock monitor 0x7f2b68001d58 (object 0xc118c3c8, a 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader),
  which is held by "StartAndStopExecutor-connect-1-1"
"StartAndStopExecutor-connect-1-1":
  waiting to lock monitor 0x7f2b68001eb8 (object 0xc510, a 
org.apache.kafka.connect.runtime.isolation.PluginClassLoader),
  which is held by "StartAndStopExecutor-connect-1-2"

Java stack information for the threads listed above:
===================================================
"StartAndStopExecutor-connect-1-2":
at java.lang.ClassLoader.loadClass(ClassLoader.java:404)
- waiting to lock <0xc118c3c8> (a 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.loadClass(DelegatingClassLoader.java:397)
at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
- locked <0xc6a9e908> (a java.lang.Object)
at 
org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
- locked <0xc6a9e908> (a java.lang.Object)
- locked <0xc510> (a 
org.apache.kafka.connect.runtime.isolation.PluginClassLoader)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at 
org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:719)
at 
org.apache.kafka.connect.runtime.ConnectorConfig.enrich(ConnectorConfig.java:311)
at 
org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:215)
at 
org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:209)
at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:432)
at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1186)
at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:127)
at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder$12.call(DistributedHerder.java:1201)
at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder$12.call(DistributedHerder.java:1197)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
"StartAndStopExecutor-connect-1-1":
at 
org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:91)
- waiting to lock <0xc510> (a 
org.apache.kafka.connect.runtime.isolation.PluginClassLoader)
at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.loadClass(DelegatingClassLoader.java:394)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at 
org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:719)
at 
org.apache.kafka.connect.runtime.ConnectorConfig.enrich(ConnectorConfig.java:311)
at 
org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:215)
at 
org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:209)
at 
org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:251)
at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1229)
at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:127)
at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder$14.call(DistributedHerder.java:1245)
at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder$14.call(DistributedHerder.java:1241)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(Thre
{code}
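
The dump is cut off here in the archive, but the pattern is already clear: a 
classic lock-order inversion, where one executor thread holds the 
DelegatingClassLoader monitor while waiting on the PluginClassLoader monitor, 
and the other thread holds them in the opposite order. A minimal, 
self-contained Java sketch of the same pattern (hypothetical lock objects 
standing in for the two classloaders; this is not Connect code):

{code:java}
public final class LockOrderInversionDemo {

    // Stand-ins for the two classloader monitors in the dump above.
    private static final Object DELEGATING_LOADER_LOCK = new Object();
    private static final Object PLUGIN_LOADER_LOCK = new Object();

    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            synchronized (DELEGATING_LOADER_LOCK) {       // holds the "delegating" monitor ...
                pause();
                synchronized (PLUGIN_LOADER_LOCK) { }     // ... then waits for the "plugin" one
            }
        }, "worker-1");

        Thread t2 = new Thread(() -> {
            synchronized (PLUGIN_LOADER_LOCK) {           // holds the "plugin" monitor ...
                pause();
                synchronized (DELEGATING_LOADER_LOCK) { } // ... then waits for the "delegating" one
            }
        }, "worker-2");

        t1.start();
        t2.start();
        // Run this and attach jstack: it reports "Found one Java-level deadlock"
        // with the same held/waiting shape as the dump above.
    }

    private static void pause() {
        try {
            Thread.sleep(100); // widen the race window so the deadlock is reliable
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
{code}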

[GitHub] [kafka] hachikuji opened a new pull request #9377: MINOR: Move `RaftRequestHandler` to `tools` package

2020-10-05 Thread GitBox


hachikuji opened a new pull request #9377:
URL: https://github.com/apache/kafka/pull/9377


   To avoid confusion, since it is only used by `TestRaftServer`, this PR moves 
`RaftRequestHandler` to the `tools` package.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] hachikuji merged pull request #9349: MINOR: add proper checks to KafkaConsumer.groupMetadata

2020-10-05 Thread GitBox


hachikuji merged pull request #9349:
URL: https://github.com/apache/kafka/pull/9349


   







[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2020-10-05 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17208316#comment-17208316
 ] 

Sophie Blee-Goldman commented on KAFKA-5998:


[~sandeep.lakdaw...@gmail.com] are you running all three instances on the same 
machine with a shared state directory? And/or using /tmp as the state directory?
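
For context, Kafka Streams writes its .checkpoint files under the configured 
state directory, so instances that share one directory, or use an OS-cleaned 
/tmp, can race on or lose .checkpoint.tmp. A configuration sketch that gives 
each instance its own durable directory; the paths and broker address are 
illustrative, not from this issue:

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StateDirConfig {
    public static Properties streamsProps(int instanceId) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "checkpoint-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder address
        // One durable directory per instance, outside /tmp, so .checkpoint and
        // .checkpoint.tmp are neither shared between instances nor removed by tmp cleaners.
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kstreams/instance-" + instanceId);
        return props;
    }
}
{code}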

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Assignee: John Roesler
>Priority: Critical
> Fix For: 2.2.2, 2.4.0, 2.3.1
>
> Attachments: 5998.v1.txt, 5998.v2.txt, Kafka5998.zip, Topology.txt, 
> exc.txt, props.txt, streams.txt
>
>
> I have one Kafka broker and one Kafka Streams instance running. It has been 
> running for two days under a load of around 2500 msgs per second. On the third 
> day I am getting the exception below for some of the partitions. I have 16 
> partitions; only 0_0 and 0_1 give this error:
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.intern

[GitHub] [kafka] hachikuji opened a new pull request #9376: MINOR: Remove `TargetVoters` from `DescribeQuorum`

2020-10-05 Thread GitBox


hachikuji opened a new pull request #9376:
URL: https://github.com/apache/kafka/pull/9376


   This field is left over from the early days of the KIP, when it covered 
reassignment. Since the API is not exposed yet, there should be no harm in 
updating the first version.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler

2020-10-05 Thread GitBox


wcarlson5 commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r499871426



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -364,6 +368,62 @@ public void setUncaughtExceptionHandler(final 
Thread.UncaughtExceptionHandler eh
 }
 }
 
+/**
+ * Set the handler invoked when a {@link 
StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} abruptly
+ * terminates due to an uncaught exception.
+ *
+ * @param eh the uncaught exception handler of type {@link 
StreamsUncaughtExceptionHandler} for all internal threads; {@code null} deletes 
the current handler
+ * @throws IllegalStateException if this {@code KafkaStreams} instance is 
not in state {@link State#CREATED CREATED}.
+ */
+public void setUncaughtExceptionHandler(final 
StreamsUncaughtExceptionHandler eh) {
+final StreamsUncaughtExceptionHandler handler = exception -> 
handleStreamsUncaughtException(exception, eh);
+synchronized (stateLock) {
+if (state == State.CREATED) {
+for (final StreamThread thread : threads) {
+if (eh != null)  {
+thread.setStreamsUncaughtExceptionHandler(handler);
+} else {
+final StreamsUncaughtExceptionHandler defaultHandler = 
exception ->
+
StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse.SHUTDOWN_STREAM_THREAD;
+
thread.setStreamsUncaughtExceptionHandler(defaultHandler);
+}
+}
+} else {
+throw new IllegalStateException("Can only set 
UncaughtExceptionHandler in CREATED state. " +
+"Current state is: " + state);
+}
+}
+}
+
+private 
StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse 
handleStreamsUncaughtException(final Exception e,
+   
final StreamsUncaughtExceptionHandler 
streamsUncaughtExceptionHandler) {
+final 
StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse action 
= streamsUncaughtExceptionHandler.handle(e);
+switch (action) {
+case SHUTDOWN_STREAM_THREAD:
+log.error("Encountered the following exception during 
processing " +
+"and the thread is going to shut down: ", e);
+break;
+case REPLACE_STREAM_THREAD:
+log.error("Encountered the following exception during 
processing " +
+"and the the stream thread will be replaced: ", e); 
//TODO: add then remove, wait until 663 is merged
+break;
+case SHUTDOWN_KAFKA_STREAMS_CLIENT:
+log.error("Encountered the following exception during 
processing " +
+"and the client is going to shut down: ", e);
+for (final StreamThread streamThread: threads) {
+streamThread.shutdown();
+}

Review comment:
   I am okay with renaming, but I will wait for everything else to be cleared 
up to see if it is still necessary.
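
For readers following the diff above, a usage sketch against the draft API as 
it appears in this PR; KIP-671 was still under review at this point, so names 
may change, and the topology and properties here are placeholders:

{code:java}
import java.util.Properties;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;

public class HandlerUsage {
    public static void main(String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic"); // trivial topology for the sketch
        final Properties props = new Properties();        // broker/app-id config omitted here
        final KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // Per the diff above, this must be registered while the instance is in CREATED state.
        streams.setUncaughtExceptionHandler((StreamsUncaughtExceptionHandler) exception -> {
            if (exception instanceof ProducerFencedException) {
                // A fenced producer usually indicates a zombie thread; replacing it is cheap.
                return StreamsUncaughtExceptionHandler
                        .StreamsUncaughtExceptionHandlerResponse.REPLACE_STREAM_THREAD;
            }
            // For anything unexpected, stop the whole client rather than limp along.
            return StreamsUncaughtExceptionHandler
                    .StreamsUncaughtExceptionHandlerResponse.SHUTDOWN_KAFKA_STREAMS_CLIENT;
        });
        streams.start();
    }
}
{code}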









[GitHub] [kafka] wcarlson5 commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler

2020-10-05 Thread GitBox


wcarlson5 commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r499871031



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -436,6 +496,8 @@ private void maybeSetError() {
 }
 
 if (setState(State.ERROR)) {
+metrics.close();

Review comment:
   We did have it this way in the KIP. If we stick to this for now, I think 
we can clear this up easily when we decide what we want to do with the states 
in general, when we take care of the discussion in KIP-663.








[GitHub] [kafka] wcarlson5 commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler

2020-10-05 Thread GitBox


wcarlson5 commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r499868891



##
File path: 
streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java
##
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+public interface StreamsUncaughtExceptionHandler {
+/**
+ * Inspect a record and the exception received.
+ * @param exception the actual exception
+ */
+StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse 
handle(final Exception exception);

Review comment:
   That seems to be a fine solution. It will still attempt the shutdown, but it 
may fail. As long as we warn, I guess it will work.









[GitHub] [kafka] vvcephei commented on pull request #8353: KAFKA-9764: Remove stream simple benchmark suite

2020-10-05 Thread GitBox


vvcephei commented on pull request #8353:
URL: https://github.com/apache/kafka/pull/8353#issuecomment-703874939


   Cherry-picked to 2.4 and 2.3







[GitHub] [kafka] kowshik edited a comment on pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-05 Thread GitBox


kowshik edited a comment on pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#issuecomment-703873486


   @junrao Thanks for the review! I've addressed the latest comments in 
e55358fd1a00f12ef98fc4d2d649a297ddf146da . The PR is ready for another pass.







[GitHub] [kafka] kowshik commented on pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-05 Thread GitBox


kowshik commented on pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#issuecomment-703873486


   @junrao Thanks for the review! I've addressed the latest comments in 
e55358fd1a00f12ef98fc4d2d649a297ddf146da .







[GitHub] [kafka] vvcephei commented on pull request #8353: KAFKA-9764: Remove stream simple benchmark suite

2020-10-05 Thread GitBox


vvcephei commented on pull request #8353:
URL: https://github.com/apache/kafka/pull/8353#issuecomment-703869908


   cherry-picked to 2.5







[GitHub] [kafka] vvcephei commented on pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually

2020-10-05 Thread GitBox


vvcephei commented on pull request #9262:
URL: https://github.com/apache/kafka/pull/9262#issuecomment-703867931


   Cherry-picked to 2.6







[GitHub] [kafka] vvcephei commented on pull request #9323: KAFKA-10514: Fix unit test for state directory cleanup

2020-10-05 Thread GitBox


vvcephei commented on pull request #9323:
URL: https://github.com/apache/kafka/pull/9323#issuecomment-703867815


   Cherry-picked to 2.6







[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-05 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r499847021



##
File path: 
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
##
@@ -715,7 +747,58 @@ class ControllerIntegrationTest extends 
ZooKeeperTestHarness {
 doAnswer((_: InvocationOnMock) => {
   latch.countDown()
 }).doCallRealMethod().when(spyThread).awaitShutdown()
-controller.shutdown() 
+controller.shutdown()
+  }
+
+  private def testControllerFeatureZNodeSetup(initialZNode: 
Option[FeatureZNode],
+  interBrokerProtocolVersion: 
ApiVersion): Unit = {
+val versionBeforeOpt = initialZNode match {
+  case Some(node) =>
+zkClient.createFeatureZNode(node)
+Some(zkClient.getDataAndVersion(FeatureZNode.path)._2)
+  case None =>
+Option.empty
+}
+servers = makeServers(1, interBrokerProtocolVersion = 
Some(interBrokerProtocolVersion))
+TestUtils.waitUntilControllerElected(zkClient)

Review comment:
   Done. Please take a look at the fix. I've added logic to wait for 
processing on a dummy event just after waiting for controller election. I'm 
hoping this will make sure the controller failover logic is completed before 
the test proceeds further to make assertions.









[GitHub] [kafka] vvcephei commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler

2020-10-05 Thread GitBox


vvcephei commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r499843780



##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -616,7 +616,7 @@ public void 
shouldThrowExceptionSettingUncaughtExceptionHandlerNotInCreateState(
 final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
 streams.start();
 try {
-streams.setUncaughtExceptionHandler(null);
+
streams.setUncaughtExceptionHandler((Thread.UncaughtExceptionHandler) null);

Review comment:
   Ah, gotcha. Thanks.









[GitHub] [kafka] vvcephei commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler

2020-10-05 Thread GitBox


vvcephei commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r499842547



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -436,6 +496,8 @@ private void maybeSetError() {
 }
 
 if (setState(State.ERROR)) {
+metrics.close();

Review comment:
   It certainly might. I'm just wary of how far into the uncanny valley 
we're going here. Streams is going to be put into a state that's very similar 
to the one that `close()` produces, but not identical. What will then happen 
when they _do_ call close? What will happen when we realize that something else 
needs to be done as part of closing the instance (will we even remember that we 
should consider doing it here as well)?
   
   OTOH, we could instead change direction on the "error-vs-shutdown" debate 
and just make all these methods call `close(ZERO)` instead. Then, the _real_ 
close method will be invoked, and Streams will go through a well-known 
transition through `PENDING_SHUTDOWN` to `NOT_RUNNING`.
   
   It would then be a problem for a later date (after KIP-663) if someone 
wanted to request that instead the app should stop all running threads so they 
can manually call "addThread" later.
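
For reference, a sketch of the `close(ZERO)` behavior this comment leans on, 
assuming an existing `KafkaStreams` instance; the wrapper class and method name 
are hypothetical:

{code:java}
import java.time.Duration;
import org.apache.kafka.streams.KafkaStreams;

final class NonBlockingStop {
    // Sketch: trigger the real shutdown path without blocking the calling thread.
    static boolean stopNow(final KafkaStreams streams) {
        // Returns immediately; false means the instance is not yet NOT_RUNNING.
        // Internally this still drives the normal PENDING_SHUTDOWN -> NOT_RUNNING
        // transition, which is the appeal of close(ZERO) over a bespoke ERROR state.
        return streams.close(Duration.ZERO);
    }
}
{code}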









[GitHub] [kafka] vvcephei commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler

2020-10-05 Thread GitBox


vvcephei commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r499842547



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -436,6 +496,8 @@ private void maybeSetError() {
 }
 
 if (setState(State.ERROR)) {
+metrics.close();

Review comment:
   It certainly might. I'm just wary of how far into the uncanny valley 
we're going here. Streams is going to be put into a state that's very similar 
to the one that `close()` produces, but not identical. What will then happen 
when they _do_ call close?
   
   OTOH, we could instead change direction on the "error-vs-shutdown" debate 
and just make all these methods call `close(ZERO)` instead. Then, the _real_ 
close method will be invoked, and Streams will go through a well-known 
transition through `PENDING_SHUTDOWN` to `NOT_RUNNING`.
   
   It would then be a problem for a later date (after KIP-663) if someone 
wanted to request that instead the app should stop all running threads so they 
can manually call "addThread" later.









[GitHub] [kafka] vvcephei commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler

2020-10-05 Thread GitBox


vvcephei commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r499838819



##
File path: 
streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java
##
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+public interface StreamsUncaughtExceptionHandler {
+/**
+ * Inspect a record and the exception received.
+ * @param exception the actual exception
+ */
+StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse 
handle(final Exception exception);

Review comment:
   That's a good question. Maybe we could just document that that option is 
an unsuitable response to an Error, and also log an `ERROR` message if you 
select it in response to an Error. It's not _always_ bad to ignore an Error, 
but it usually is. We can leave it to users to decide what they want to do.
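
A hypothetical sketch of the documented-warning approach, under the assumption 
(discussed in this thread) that the callback is widened to accept `Throwable`; 
the guard class and method are illustrative and not part of the PR:

{code:java}
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class HandlerResponseGuard {
    private static final Logger log = LoggerFactory.getLogger(HandlerResponseGuard.class);

    // Don't forbid the user's choice; just warn loudly when it looks unsafe.
    static StreamsUncaughtExceptionHandlerResponse checked(
            final Throwable t,
            final StreamsUncaughtExceptionHandlerResponse chosen) {
        if (t instanceof Error
                && chosen != StreamsUncaughtExceptionHandlerResponse.SHUTDOWN_KAFKA_STREAMS_CLIENT) {
            log.error("Handler chose {} in response to a JVM Error; "
                    + "continuing after an Error is rarely safe.", chosen);
        }
        return chosen;
    }
}
{code}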









[GitHub] [kafka] vvcephei commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler

2020-10-05 Thread GitBox


vvcephei commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r499836489



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -364,6 +368,62 @@ public void setUncaughtExceptionHandler(final 
Thread.UncaughtExceptionHandler eh
 }
 }
 
+/**
+ * Set the handler invoked when a {@link 
StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} abruptly
+ * terminates due to an uncaught exception.
+ *
+ * @param eh the uncaught exception handler of type {@link 
StreamsUncaughtExceptionHandler} for all internal threads; {@code null} deletes 
the current handler
+ * @throws IllegalStateException if this {@code KafkaStreams} instance is 
not in state {@link State#CREATED CREATED}.
+ */
+public void setUncaughtExceptionHandler(final 
StreamsUncaughtExceptionHandler eh) {

Review comment:
   Oh, sure. Now I know why you picked this name :) 









[jira] [Updated] (KAFKA-10531) KafkaBasedLog can sleep for negative values

2020-10-05 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-10531:
--
Fix Version/s: 2.5.2
   2.7.0

> KafkaBasedLog can sleep for negative values
> ---
>
> Key: KAFKA-10531
> URL: https://issues.apache.org/jira/browse/KAFKA-10531
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.6.0
>Reporter: Vikas Singh
>Assignee: Vikas Singh
>Priority: Major
> Fix For: 2.7.0, 2.5.2, 2.6.1
>
>
> {{time.milliseconds}} is not monotonic, so this code can throw :
> {{java.lang.IllegalArgumentException: timeout value is negative}}
>  
> {code:java}
> long started = time.milliseconds();
> while (partitionInfos == null && time.milliseconds() - started < 
> CREATE_TOPIC_TIMEOUT_MS) {
> partitionInfos = consumer.partitionsFor(topic);
> Utils.sleep(Math.min(time.milliseconds() - started, 1000));
> }
> {code}
> We need to check for negative value before sleeping.
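
A sketch of the guard the description calls for, reusing the names from the 
snippet above; the actual change landed via PR #9347:

{code:java}
// Clamp the computed value so a backwards clock step can never hand
// Thread.sleep a negative timeout.
long started = time.milliseconds();
while (partitionInfos == null && time.milliseconds() - started < CREATE_TOPIC_TIMEOUT_MS) {
    partitionInfos = consumer.partitionsFor(topic);
    long elapsed = time.milliseconds() - started;
    Utils.sleep(Math.max(0, Math.min(elapsed, 1000)));
    // (A monotonic source such as System.nanoTime() would avoid the issue entirely.)
}
{code}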





[GitHub] [kafka] dima5rr commented on pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition

2020-10-05 Thread GitBox


dima5rr commented on pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#issuecomment-703838454


   Hi @guozhangwang, can you trigger a new build? It looks like flaky tests.







[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-05 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r499816619



##
File path: core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala
##
@@ -0,0 +1,580 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.{Optional, Properties}
+import java.util.concurrent.ExecutionException
+
+import kafka.api.KAFKA_2_7_IV0
+import kafka.utils.TestUtils
+import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion}
+import kafka.utils.TestUtils.waitUntilTrue
+import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, 
FeatureUpdate, UpdateFeaturesOptions, UpdateFeaturesResult}
+import org.apache.kafka.common.errors.InvalidRequestException
+import org.apache.kafka.common.feature.FinalizedVersionRange
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
+import org.apache.kafka.common.message.UpdateFeaturesRequestData
+import 
org.apache.kafka.common.message.UpdateFeaturesRequestData.FeatureUpdateKeyCollection
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{UpdateFeaturesRequest, 
UpdateFeaturesResponse}
+import org.apache.kafka.common.utils.Utils
+import org.junit.Test
+import org.junit.Assert.{assertEquals, assertFalse, assertNotEquals, 
assertNotNull, assertTrue}
+import org.scalatest.Assertions.intercept
+
+import scala.jdk.CollectionConverters._
+import scala.reflect.ClassTag
+import scala.util.matching.Regex
+
+class UpdateFeaturesTest extends BaseRequestTest {
+
+  override def brokerCount = 3
+
+  override def brokerPropertyOverrides(props: Properties): Unit = {
+props.put(KafkaConfig.InterBrokerProtocolVersionProp, 
KAFKA_2_7_IV0.toString)
+  }
+
+  private def defaultSupportedFeatures(): Features[SupportedVersionRange] = {
+Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new 
SupportedVersionRange(1, 3
+  }
+
+  private def defaultFinalizedFeatures(): Features[FinalizedVersionRange] = {
+Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new 
FinalizedVersionRange(1, 2
+  }
+
+  private def updateSupportedFeatures(
+features: Features[SupportedVersionRange], targetServers: 
Set[KafkaServer]): Unit = {
+targetServers.foreach(s => {
+  s.brokerFeatures.setSupportedFeatures(features)
+  s.zkClient.updateBrokerInfo(s.createBrokerInfo)
+})
+
+// Wait until updates to all BrokerZNode supported features propagate to 
the controller.
+val brokerIds = targetServers.map(s => s.config.brokerId)
+waitUntilTrue(
+  () => servers.exists(s => {
+if (s.kafkaController.isActive) {
+  s.kafkaController.controllerContext.liveOrShuttingDownBrokers
+.filter(b => brokerIds.contains(b.id))
+.forall(b => {
+  b.features.equals(features)
+})
+} else {
+  false
+}
+  }),
+  "Controller did not get broker updates")
+  }
+
+  private def updateSupportedFeaturesInAllBrokers(features: 
Features[SupportedVersionRange]): Unit = {
+updateSupportedFeatures(features, Set[KafkaServer]() ++ servers)
+  }
+
+  private def updateFeatureZNode(features: Features[FinalizedVersionRange]): 
Int = {
+val server = serverForId(0).get
+val newNode = new FeatureZNode(FeatureZNodeStatus.Enabled, features)
+val newVersion = server.zkClient.updateFeatureZNode(newNode)
+servers.foreach(s => {
+  s.featureCache.waitUntilEpochOrThrow(newVersion, 
s.config.zkConnectionTimeoutMs)
+})
+newVersion
+  }
+
+  private def getFeatureZNode(): FeatureZNode = {
+val (mayBeFeatureZNodeBytes, version) = 
serverForId(0).get.zkClient.getDataAndVersion(FeatureZNode.path)
+assertNotEquals(version, ZkVersion.UnknownVersion)
+FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+  }
+
+  private def finalizedFeatures(features: java.util.Map[String, 
org.apache.kafka.clients.admin.FinalizedVersionRange]): 
Features[FinalizedVersionRange] = {
+Features.finalizedFeatures(features.asScala.map {
+  case(name, versionRange) =>
+(name, new FinalizedVersionRange(versionRange.minVersionLevel(), 
versionRan

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-05 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r499816373



##
File path: core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala
##
@@ -0,0 +1,580 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.{Optional, Properties}
+import java.util.concurrent.ExecutionException
+
+import kafka.api.KAFKA_2_7_IV0
+import kafka.utils.TestUtils
+import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion}
+import kafka.utils.TestUtils.waitUntilTrue
+import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, 
FeatureUpdate, UpdateFeaturesOptions, UpdateFeaturesResult}
+import org.apache.kafka.common.errors.InvalidRequestException
+import org.apache.kafka.common.feature.FinalizedVersionRange
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
+import org.apache.kafka.common.message.UpdateFeaturesRequestData
+import 
org.apache.kafka.common.message.UpdateFeaturesRequestData.FeatureUpdateKeyCollection
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{UpdateFeaturesRequest, 
UpdateFeaturesResponse}
+import org.apache.kafka.common.utils.Utils
+import org.junit.Test
+import org.junit.Assert.{assertEquals, assertFalse, assertNotEquals, 
assertNotNull, assertTrue}
+import org.scalatest.Assertions.intercept
+
+import scala.jdk.CollectionConverters._
+import scala.reflect.ClassTag
+import scala.util.matching.Regex
+
+class UpdateFeaturesTest extends BaseRequestTest {
+
+  override def brokerCount = 3
+
+  override def brokerPropertyOverrides(props: Properties): Unit = {
+props.put(KafkaConfig.InterBrokerProtocolVersionProp, 
KAFKA_2_7_IV0.toString)
+  }
+
+  private def defaultSupportedFeatures(): Features[SupportedVersionRange] = {
+Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new 
SupportedVersionRange(1, 3
+  }
+
+  private def defaultFinalizedFeatures(): Features[FinalizedVersionRange] = {
+Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new 
FinalizedVersionRange(1, 2
+  }
+
+  private def updateSupportedFeatures(
+features: Features[SupportedVersionRange], targetServers: 
Set[KafkaServer]): Unit = {
+targetServers.foreach(s => {
+  s.brokerFeatures.setSupportedFeatures(features)
+  s.zkClient.updateBrokerInfo(s.createBrokerInfo)
+})
+
+// Wait until updates to all BrokerZNode supported features propagate to 
the controller.
+val brokerIds = targetServers.map(s => s.config.brokerId)
+waitUntilTrue(
+  () => servers.exists(s => {
+if (s.kafkaController.isActive) {
+  s.kafkaController.controllerContext.liveOrShuttingDownBrokers
+.filter(b => brokerIds.contains(b.id))
+.forall(b => {
+  b.features.equals(features)
+})
+} else {
+  false
+}
+  }),
+  "Controller did not get broker updates")
+  }
+
+  private def updateSupportedFeaturesInAllBrokers(features: 
Features[SupportedVersionRange]): Unit = {
+updateSupportedFeatures(features, Set[KafkaServer]() ++ servers)
+  }
+
+  private def updateFeatureZNode(features: Features[FinalizedVersionRange]): 
Int = {
+val server = serverForId(0).get
+val newNode = new FeatureZNode(FeatureZNodeStatus.Enabled, features)
+val newVersion = server.zkClient.updateFeatureZNode(newNode)
+servers.foreach(s => {
+  s.featureCache.waitUntilEpochOrThrow(newVersion, 
s.config.zkConnectionTimeoutMs)
+})
+newVersion
+  }
+
+  private def getFeatureZNode(): FeatureZNode = {
+val (mayBeFeatureZNodeBytes, version) = 
serverForId(0).get.zkClient.getDataAndVersion(FeatureZNode.path)
+assertNotEquals(version, ZkVersion.UnknownVersion)
+FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+  }
+
+  private def finalizedFeatures(features: java.util.Map[String, 
org.apache.kafka.clients.admin.FinalizedVersionRange]): 
Features[FinalizedVersionRange] = {
+Features.finalizedFeatures(features.asScala.map {
+  case(name, versionRange) =>
+(name, new FinalizedVersionRange(versionRange.minVersionLevel(), 
versionRan

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-05 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r499812076



##
File path: core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala
##
@@ -0,0 +1,580 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.{Optional, Properties}
+import java.util.concurrent.ExecutionException
+
+import kafka.api.KAFKA_2_7_IV0
+import kafka.utils.TestUtils
+import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion}
+import kafka.utils.TestUtils.waitUntilTrue
+import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, 
FeatureUpdate, UpdateFeaturesOptions, UpdateFeaturesResult}
+import org.apache.kafka.common.errors.InvalidRequestException
+import org.apache.kafka.common.feature.FinalizedVersionRange
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
+import org.apache.kafka.common.message.UpdateFeaturesRequestData
+import 
org.apache.kafka.common.message.UpdateFeaturesRequestData.FeatureUpdateKeyCollection
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{UpdateFeaturesRequest, 
UpdateFeaturesResponse}
+import org.apache.kafka.common.utils.Utils
+import org.junit.Test
+import org.junit.Assert.{assertEquals, assertFalse, assertNotEquals, 
assertNotNull, assertTrue}
+import org.scalatest.Assertions.intercept
+
+import scala.jdk.CollectionConverters._
+import scala.reflect.ClassTag
+import scala.util.matching.Regex
+
+class UpdateFeaturesTest extends BaseRequestTest {
+
+  override def brokerCount = 3
+
+  override def brokerPropertyOverrides(props: Properties): Unit = {
+props.put(KafkaConfig.InterBrokerProtocolVersionProp, 
KAFKA_2_7_IV0.toString)
+  }
+
+  private def defaultSupportedFeatures(): Features[SupportedVersionRange] = {
+Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new 
SupportedVersionRange(1, 3
+  }
+
+  private def defaultFinalizedFeatures(): Features[FinalizedVersionRange] = {
+Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new 
FinalizedVersionRange(1, 2
+  }
+
+  private def updateSupportedFeatures(
+features: Features[SupportedVersionRange], targetServers: 
Set[KafkaServer]): Unit = {
+targetServers.foreach(s => {
+  s.brokerFeatures.setSupportedFeatures(features)
+  s.zkClient.updateBrokerInfo(s.createBrokerInfo)
+})
+
+// Wait until updates to all BrokerZNode supported features propagate to 
the controller.
+val brokerIds = targetServers.map(s => s.config.brokerId)
+waitUntilTrue(
+  () => servers.exists(s => {
+if (s.kafkaController.isActive) {
+  s.kafkaController.controllerContext.liveOrShuttingDownBrokers
+.filter(b => brokerIds.contains(b.id))
+.forall(b => {
+  b.features.equals(features)
+})
+} else {
+  false
+}
+  }),
+  "Controller did not get broker updates")
+  }
+
+  private def updateSupportedFeaturesInAllBrokers(features: 
Features[SupportedVersionRange]): Unit = {
+updateSupportedFeatures(features, Set[KafkaServer]() ++ servers)
+  }
+
+  private def updateFeatureZNode(features: Features[FinalizedVersionRange]): 
Int = {
+val server = serverForId(0).get
+val newNode = new FeatureZNode(FeatureZNodeStatus.Enabled, features)
+val newVersion = server.zkClient.updateFeatureZNode(newNode)
+servers.foreach(s => {
+  s.featureCache.waitUntilEpochOrThrow(newVersion, 
s.config.zkConnectionTimeoutMs)
+})
+newVersion
+  }
+
+  private def getFeatureZNode(): FeatureZNode = {
+val (mayBeFeatureZNodeBytes, version) = 
serverForId(0).get.zkClient.getDataAndVersion(FeatureZNode.path)
+assertNotEquals(version, ZkVersion.UnknownVersion)
+FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+  }
+
+  private def finalizedFeatures(features: java.util.Map[String, 
org.apache.kafka.clients.admin.FinalizedVersionRange]): 
Features[FinalizedVersionRange] = {
+Features.finalizedFeatures(features.asScala.map {
+  case(name, versionRange) =>
+(name, new FinalizedVersionRange(versionRange.minVersionLevel(), 
versionRan

[GitHub] [kafka] rhauch merged pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

2020-10-05 Thread GitBox


rhauch merged pull request #9347:
URL: https://github.com/apache/kafka/pull/9347


   







[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-05 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r499811752



##
File path: core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
##
@@ -185,7 +185,7 @@ class BrokerEndPointTest {
   "endpoints":["CLIENT://host1:9092", "REPLICATION://host1:9093"],
   "listener_security_protocol_map":{"CLIENT":"SSL", 
"REPLICATION":"PLAINTEXT"},
   "rack":"dc1",
-  "features": {"feature1": {"min_version": 1, "max_version": 2}, 
"feature2": {"min_version": 2, "max_version": 4}}
+  "features": {"feature1": {"min_version": 1, "first_active_version": 1, 
"max_version": 2}, "feature2": {"min_version": 2, "first_active_version": 2, 
"max_version": 4}}

Review comment:
   Done. Nice catch!









[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-05 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r499811265



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -272,6 +281,161 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+    info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: $newNode")
+    zkClient.createFeatureZNode(newNode)
+    val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+    newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+    info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: $updatedNode")
+    zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each 
feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (string) and a range of 
versions (defined by a
+   * SupportedVersionRange). It refers to a feature that a particular broker 
advertises support for.
+   * Each broker advertises the version ranges of its own supported features 
in its own
+   * BrokerIdZNode. The contents of the advertisement are specific to the 
particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the 
feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (string) and a range of 
version levels (defined
+   * by a FinalizedVersionRange). Whenever the feature versioning system 
(KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common 
FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a 
finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in 
the cluster for a
+   * specified range of version levels. Also, the controller is the only 
entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means 
that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should 
be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater 
than or equal to
+   * KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *A new Kafka cluster (i.e. it is deployed first time) is almost always 
started with IBP config
+   *setting greater than or equal to KAFKA_2_7_IV0. We would like to start 
the cluster with all
+   *the possible supported features finalized immediately. Assuming this 
is the case, the
+   *controller will start up and notice that the FeatureZNode is absent in 
the new cluster,
+   *it will then create a FeatureZNode (with enabled status) containing 
the entire list of
+   *supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *Imagine there was an existing Kafka cluster with IBP config less than 
KAFKA_2_7_IV0, and the
+   *broker binary has now been upgraded to a newer version that supports 
the feature versioning
+   *system (KIP-584). But the IBP config is still set to lower than 
KAFKA_2_7_IV0, and may be
+   *set to a higher value later. In this case, we want to start with no 
finalized features and
+   *allow the user to finalize them whenever they are ready i.e. in the 
future whenever the
+   *user sets IBP config to be greater than or equal to KAFKA_2_7_IV0, 
then the user could start
+   *finalizing the features. This process ensures we do not enable all the 
possible features
+   *immediately after an upgrade, which could be harmful to Kafka.
+   *This is how we handle such a case:
+   *  - Before the IBP config upgrade (i.e. IBP config set to less than 
KAFKA_2_7_IV0), the
+   *controller will start up and check if the FeatureZNode is absent.
+   *- If the node is absent, it will react by creating a FeatureZNode 
with disabled status
+   *  and empty finalized features.
+   *- Otherwise, if a node already exists in enabled status then the 
controller will just
+   *  flip the status to disabled and clear the finalized features.
+   *  - After the IBP config upgrade (i.e. IBP config set to greater than 
or equal to
+   *KAFKA_2_7_IV0), when the controller starts up it will check if the 
FeatureZNode exists
+   *and whether it is disabled.
+   * - If the node is in disabled status, the controller won’t upgrade 
all features immediately.
+   *   

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-05 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r499810462



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -272,6 +281,161 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+    info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: $newNode")
+    zkClient.createFeatureZNode(newNode)
+    val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+    newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+    info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: $updatedNode")
+    zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each 
feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (string) and a range of 
versions (defined by a
+   * SupportedVersionRange). It refers to a feature that a particular broker 
advertises support for.
+   * Each broker advertises the version ranges of its own supported features 
in its own
+   * BrokerIdZNode. The contents of the advertisement are specific to the 
particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the 
feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (string) and a range of 
version levels (defined
+   * by a FinalizedVersionRange). Whenever the feature versioning system 
(KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common 
FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a 
finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in 
the cluster for a
+   * specified range of version levels. Also, the controller is the only 
entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means 
that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should 
be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater 
than or equal to
+   * KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *A new Kafka cluster (i.e. it is deployed first time) is almost always 
started with IBP config
+   *setting greater than or equal to KAFKA_2_7_IV0. We would like to start 
the cluster with all
+   *the possible supported features finalized immediately. Assuming this 
is the case, the
+   *controller will start up and notice that the FeatureZNode is absent in 
the new cluster,
+   *it will then create a FeatureZNode (with enabled status) containing 
the entire list of
+   *supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *Imagine there was an existing Kafka cluster with IBP config less than 
KAFKA_2_7_IV0, and the
+   *broker binary has now been upgraded to a newer version that supports 
the feature versioning
+   *system (KIP-584). But the IBP config is still set to lower than 
KAFKA_2_7_IV0, and may be
+   *set to a higher value later. In this case, we want to start with no 
finalized features and
+   *allow the user to finalize them whenever they are ready i.e. in the 
future whenever the
+   *user sets IBP config to be greater than or equal to KAFKA_2_7_IV0, 
then the user could start
+   *finalizing the features. This process ensures we do not enable all the 
possible features
+   *immediately after an upgrade, which could be harmful to Kafka.
+   *This is how we handle such a case:
+   *  - Before the IBP config upgrade (i.e. IBP config set to less than 
KAFKA_2_7_IV0), the
+   *controller will start up and check if the FeatureZNode is absent.
+   *- If the node is absent, it will react by creating a FeatureZNode 
with disabled status
+   *  and empty finalized features.
+   *- Otherwise, if a node already exists in enabled status then the 
controller will just
+   *  flip the status to disabled and clear the finalized features.
+   *  - After the IBP config upgrade (i.e. IBP config set to greater than 
or equal to
+   *KAFKA_2_7_IV0), when the controller starts up it will check if the 
FeatureZNode exists
+   *and whether it is disabled.
+   * - If the node is in disabled status, the controller won’t upgrade 
all features immediately.
+   *   

[jira] [Assigned] (KAFKA-10559) Don't shutdown the entire app upon TimeoutException during internal topic validation

2020-10-05 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman reassigned KAFKA-10559:
---

Assignee: Sagar Rao

> Don't shutdown the entire app upon TimeoutException during internal topic 
> validation
> 
>
> Key: KAFKA-10559
> URL: https://issues.apache.org/jira/browse/KAFKA-10559
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Assignee: Sagar Rao
>Priority: Blocker
> Fix For: 2.7.0
>
>
> During some of the KIP-572 work, we made things pretty brittle by changing 
> the StreamsPartitionAssignor to send the `INCOMPLETE_SOURCE_TOPIC_METADATA` 
> error code and shut down the entire application if a TimeoutException is hit 
> during the internal topic creation/validation.
> Internal topic validation occurs during every rebalance, and we have seen it 
> time out on topic discovery in unstable environments. So shutting down the 
> entire application seems like a step in the wrong direction, and antithetical 
> to the goal of KIP-572 (improving the resiliency of Streams in the face of 
> TimeoutExceptions)
> I'm not totally sure what the previous behavior was, but it seems to me we 
> have three options:
>  # Rethrow the TimeoutException and allow it to kill the thread
>  # Swallow the TimeoutException and retry the rebalance indefinitely
>  # Some combination of the above: swallow the TimeoutException but don't 
> retry indefinitely:
>  ## Start a timer and allow retrying rebalances for up to the configured 
> task.timeout.ms, the timeout config introduced in KIP-572
>  ## Retry for some constant number of rebalances
> I think if we go with option 3, then shutting down the entire application is 
> relatively more palatable, as we have given the environment a chance to 
> stabilize.
> But, killing the thread still seems preferable, given the two new features 
> that are coming out soon: the ability to start up new threads, and the 
> improved exception handler that allows the user to choose to shut down the 
> entire application if that's really what they want. Once users have this 
> level of control over the application, we should allow them to decide how 
> they want to handle exceptional cases like this, rather than forcing an 
> option on them (eg shutdown everything) 
>  
> Imo we should fix this before 2.7 comes out, even if it's just a partial fix 
> (eg we do option 1 in 2.7, but plan to implement option 3 eventually)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
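
To make option 3 above concrete: swallow the TimeoutException during internal topic validation but bound the retries by task.timeout.ms. The sketch below is hypothetical (these names do not exist in StreamsPartitionAssignor); it only illustrates the timer-based budget:

{code}
import org.apache.kafka.common.errors.TimeoutException;

// Hypothetical retry budget for internal topic validation (option 3):
// swallow timeouts and keep retrying rebalances until task.timeout.ms elapses.
class TopicValidationRetryBudget {
    private final long taskTimeoutMs;
    private long firstFailureMs = -1L;

    TopicValidationRetryBudget(long taskTimeoutMs) {
        this.taskTimeoutMs = taskTimeoutMs;
    }

    // Returns true while another rebalance attempt is still within budget.
    boolean shouldRetry(TimeoutException cause, long nowMs) {
        if (firstFailureMs < 0) {
            firstFailureMs = nowMs; // start the timer on the first timeout
        }
        return nowMs - firstFailureMs < taskTimeoutMs;
    }

    // Call after a successful validation to clear the timer.
    void reset() {
        firstFailureMs = -1L;
    }
}
{code}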


[GitHub] [kafka] piotrrzysko commented on pull request #9371: KAFKA-10510: Validate replication factor consistency on reassignment

2020-10-05 Thread GitBox


piotrrzysko commented on pull request #9371:
URL: https://github.com/apache/kafka/pull/9371#issuecomment-703823226


   Hi @stanislavkozlovski, would you mind taking a look at this PR?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10559) Don't shutdown the entire app upon TimeoutException during internal topic validation

2020-10-05 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17208257#comment-17208257
 ] 

Sophie Blee-Goldman commented on KAFKA-10559:
-

[~sagarrao] Yeah, go ahead! This should be a pretty small PR so it would be 
great if we could knock it out in the next week or two. Just ping me when it's 
ready.

For the PR itself, I think it sounds reasonable to just rethrow the 
TimeoutException to kill the thread. The "add/recover stream thread" 
functionality will probably slip 2.7, but it'll be implemented soon. So we 
don't really need to go out of our way to save a single thread from death in 
rare circumstances imo

> Don't shutdown the entire app upon TimeoutException during internal topic 
> validation
> 
>
> Key: KAFKA-10559
> URL: https://issues.apache.org/jira/browse/KAFKA-10559
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 2.7.0
>
>
> During some of the KIP-572 work, we made things pretty brittle by changing 
> the StreamsPartitionAssignor to send the `INCOMPLETE_SOURCE_TOPIC_METADATA` 
> error code and shut down the entire application if a TimeoutException is hit 
> during the internal topic creation/validation.
> Internal topic validation occurs during every rebalance, and we have seen it 
> time out on topic discovery in unstable environments. So shutting down the 
> entire application seems like a step in the wrong direction, and antithetical 
> to the goal of KIP-572 (improving the resiliency of Streams in the face of 
> TimeoutExceptions)
> I'm not totally sure what the previous behavior was, but it seems to me we 
> have three options:
>  # Rethrow the TimeoutException and allow it to kill the thread
>  # Swallow the TimeoutException and retry the rebalance indefinitely
>  # Some combination of the above: swallow the TimeoutException but don't 
> retry indefinitely:
>  ## Start a timer and allow retrying rebalances for up to the configured 
> task.timeout.ms, the timeout config introduced in KIP-572
>  ## Retry for some constant number of rebalances
> I think if we go with option 3, then shutting down the entire application is 
> relatively more palatable, as we have given the environment a chance to 
> stabilize.
> But, killing the thread still seems preferable, given the two new features 
> that are coming out soon: the ability to start up new threads, and the 
> improved exception handler that allows the user to choose to shut down the 
> entire application if that's really what they want. Once users have this 
> level of control over the application, we should allow them to decide how 
> they want to handle exceptional cases like this, rather than forcing an 
> option on them (eg shutdown everything) 
>  
> Imo we should fix this before 2.7 comes out, even if it's just a partial fix 
> (eg we do option 1 in 2.7, but plan to implement option 3 eventually)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] soondenana commented on pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

2020-10-05 Thread GitBox


soondenana commented on pull request #9347:
URL: https://github.com/apache/kafka/pull/9347#issuecomment-703805880


   There was an error when building `streams.examples`:
   
   ```
   [2020-10-05T08:40:05.722Z] [ERROR] Failed to execute goal 
org.apache.maven.plugins:maven-archetype-plugin:3.2.0:generate (default-cli) on 
project standalone-pom: A Maven project already exists in the directory 
/home/jenkins/workspace/Kafka_kafka-pr_PR-9347/streams/quickstart/test-streams-archetype/streams.examples
 -> [Help 1]
   ```
   
   The failure is not related to this PR.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on pull request #9237: KAFKA-10454 / Update copartitionSourceGroups when optimization algorithm is triggered

2020-10-05 Thread GitBox


bbejeck commented on pull request #9237:
URL: https://github.com/apache/kafka/pull/9237#issuecomment-703803731


   @lkokhreidze, thanks for the quick update. I'll make another pass soon.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-05 Thread GitBox


junrao commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r499753420



##
File path: core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
##
@@ -185,7 +185,7 @@ class BrokerEndPointTest {
   "endpoints":["CLIENT://host1:9092", "REPLICATION://host1:9093"],
   "listener_security_protocol_map":{"CLIENT":"SSL", 
"REPLICATION":"PLAINTEXT"},
   "rack":"dc1",
-  "features": {"feature1": {"min_version": 1, "max_version": 2}, 
"feature2": {"min_version": 2, "max_version": 4}}
+  "features": {"feature1": {"min_version": 1, "first_active_version": 1, 
"max_version": 2}, "feature2": {"min_version": 2, "first_active_version": 2, 
"max_version": 4}}

Review comment:
   Should we revert the changes here?

##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -272,6 +281,161 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+    info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: $newNode")
+    zkClient.createFeatureZNode(newNode)
+    val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+    newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+    info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: $updatedNode")
+    zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each 
feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (string) and a range of 
versions (defined by a
+   * SupportedVersionRange). It refers to a feature that a particular broker 
advertises support for.
+   * Each broker advertises the version ranges of its own supported features 
in its own
+   * BrokerIdZNode. The contents of the advertisement are specific to the 
particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the 
feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (string) and a range of 
version levels (defined
+   * by a FinalizedVersionRange). Whenever the feature versioning system 
(KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common 
FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a 
finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in 
the cluster for a
+   * specified range of version levels. Also, the controller is the only 
entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means 
that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should 
be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater 
than or equal to
+   * KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *A new Kafka cluster (i.e. it is deployed first time) is almost always 
started with IBP config
+   *setting greater than or equal to KAFKA_2_7_IV0. We would like to start 
the cluster with all
+   *the possible supported features finalized immediately. Assuming this 
is the case, the
+   *controller will start up and notice that the FeatureZNode is absent in 
the new cluster,
+   *it will then create a FeatureZNode (with enabled status) containing 
the entire list of
+   *supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *Imagine there was an existing Kafka cluster with IBP config less than 
KAFKA_2_7_IV0, and the
+   *broker binary has now been upgraded to a newer version that supports 
the feature versioning
+   *system (KIP-584). But the IBP config is still set to lower than 
KAFKA_2_7_IV0, and may be
+   *set to a higher value later. In this case, we want to start with no 
finalized features and
+   *allow the user to finalize them whenever they are ready i.e. in the 
future whenever the
+   *user sets IBP config to be greater than or equal to KAFKA_2_7_IV0, 
then the user could start
+   *finalizing the features. This process ensures we do not enable all the 
possible features
+   *immediately after an upgrade, which could be harmful to Kafka.
+   *This is how we handle such a case:
+   *  - Before the IBP config upgrade (i.e. IBP config set to less than 
KAFKA_2_7_IV0), the
+   *controller will start up and check if the FeatureZNode is absen

[jira] [Resolved] (KAFKA-9585) Flaky Test: LagFetchIntegrationTest#shouldFetchLagsDuringRebalancingWithOptimization

2020-10-05 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-9585.
--
Resolution: Cannot Reproduce

> Flaky Test: 
> LagFetchIntegrationTest#shouldFetchLagsDuringRebalancingWithOptimization
> 
>
> Key: KAFKA-9585
> URL: https://issues.apache.org/jira/browse/KAFKA-9585
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: flaky-test
>
> Failed for me locally with 
> {noformat}
> java.lang.AssertionError: Condition not met within timeout 12. Should 
> obtain non-empty lag information eventually
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10555) Improve client state machine

2020-10-05 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17208231#comment-17208231
 ] 

Sophie Blee-Goldman commented on KAFKA-10555:
-

Just to clarify, I do agree with Matthias that we shouldn't transition to ERROR if 
the last stream thread is removed via the new removeStreamThread() method.

I thought we were only considering transitioning to ERROR if the last thread died, 
but transitioning to NOT_RUNNING if the last thread was removed by the user. This 
seems consistent with the current behavior and maintains the same semantic 
meaning of the ERROR state, imo. I don't think we can say that "transitioning to 
ERROR if the last thread is removed" follows the current behavior, because 
there is no way to remove a thread at the moment. So, we should just do what 
makes the most sense for this case. Personally I think that would be to transition 
to NOT_RUNNING, since this is not an error or exceptional case but rather a 
valid user action.

I also agree with something that [~vvcephei] suggested earlier, which is that 
this should be part of the KIP discussion. At the very least, we should raise 
the final proposal on the discussion thread in case there are any objections.

> Improve client state machine
> 
>
> Key: KAFKA-10555
> URL: https://issues.apache.org/jira/browse/KAFKA-10555
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
>
> The KafkaStreams client exposes its state to the user for monitoring purpose 
> (ie, RUNNING, REBALANCING etc). The state of the client depends on the 
> state(s) of the internal StreamThreads that have their own states.
> Furthermore, the client state has impact on what the user can do with the 
> client. For example, active task can only be queried in RUNNING state and 
> similar.
> With KIP-671 and KIP-663 we improved error handling capabilities and allow 
> adding/removing stream threads dynamically. We allow adding/removing threads only 
> in RUNNING and REBALANCING state. This puts us in a "weird" position, because 
> if we enter ERROR state (ie, if the last thread dies), we cannot add new 
> threads any longer. However, if we have multiple threads and one dies, we 
> don't enter ERROR state and do allow recovering the thread.
> Before the KIPs the definition of ERROR state was clear; however, with both 
> KIPs it seems that we should revisit its semantics.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
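
The crux of the discussion above is what the client should do when its last thread goes away. A minimal sketch of the proposed distinction (the handler is hypothetical; only the state names mirror the real KafkaStreams.State enum):

{code}
// Hypothetical illustration of the proposal above, not the actual
// KafkaStreams state machine.
class ClientStateSketch {
    enum State { RUNNING, REBALANCING, ERROR, NOT_RUNNING }

    static State onLastThreadGone(boolean diedFromException) {
        // Thread died from an unhandled exception -> ERROR (exceptional case).
        // Thread removed via removeStreamThread()  -> NOT_RUNNING (user action).
        return diedFromException ? State.ERROR : State.NOT_RUNNING;
    }
}
{code}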


[jira] [Resolved] (KAFKA-10530) kafka-streams-application-reset misses some internal topics

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-10530.
--
Resolution: Duplicate

Closing now, since this seems like a duplicate report, and visual code 
inspection indicates it should have been fixed.

If you do still see this [~oweiler] , please feel free to re-open the ticket.

> kafka-streams-application-reset misses some internal topics
> ---
>
> Key: KAFKA-10530
> URL: https://issues.apache.org/jira/browse/KAFKA-10530
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, tools
>Affects Versions: 2.6.0
>Reporter: Oliver Weiler
>Priority: Major
>
> While the {{kafka-streams-application-reset}} tool works in most cases, it 
> misses some internal topics when using {{Foreign Key Table-Table Joins}}.
> After execution, there are still two internal topics left which were not 
> deleted
> {code}
> bv4-indexer-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-06-topic
> bbv4-indexer-717e6cc5-acb2-498d-9d08-4814aaa71c81-StreamThread-1-consumer 
> bbv4-indexer-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-14-topic
> {code}
> The reason seems to be the {{StreamsResetter.isInternalTopic}} which requires 
> the internal topic to end with {{-changelog}} or {{-repartition}} (which the 
> mentioned topics don't).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
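
The suffix check described above is easy to picture; a simplified sketch (not the actual StreamsResetter source) shows why the foreign-key join topics slip through:

{code}
class ResetterSketch {
    // Simplified form of the check described in the ticket: only topics ending
    // in "-changelog" or "-repartition" are treated as internal.
    static boolean isInternalTopic(String topicName) {
        return topicName.endsWith("-changelog") || topicName.endsWith("-repartition");
    }
}
{code}

A topic such as {{...-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-06-topic}} ends in {{-topic}}, matches neither suffix, and is therefore never deleted by the reset tool.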


[jira] [Updated] (KAFKA-10562) KIP-478: Delegate the store wrappers to the new init method

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10562:
-
Summary: KIP-478: Delegate the store wrappers to the new init method  (was: 
Delegate the store wrappers to the new init method)

> KIP-478: Delegate the store wrappers to the new init method
> ---
>
> Key: KAFKA-10562
> URL: https://issues.apache.org/jira/browse/KAFKA-10562
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10546) KIP-478: Deprecate old PAPI

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10546:
-
Component/s: streams

> KIP-478: Deprecate old PAPI
> ---
>
> Key: KAFKA-10546
> URL: https://issues.apache.org/jira/browse/KAFKA-10546
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>
> Can't be done until after the DSL internals are migrated.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10544) Convert KTable aggregations to new PAPI

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10544:
-
Component/s: streams

> Convert KTable aggregations to new PAPI
> ---
>
> Key: KAFKA-10544
> URL: https://issues.apache.org/jira/browse/KAFKA-10544
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10562) Delegate the store wrappers to the new init method

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10562:
-
Component/s: streams

> Delegate the store wrappers to the new init method
> --
>
> Key: KAFKA-10562
> URL: https://issues.apache.org/jira/browse/KAFKA-10562
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10539) Convert KStreamImpl joins to new PAPI

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10539:
-
Component/s: streams

> Convert KStreamImpl joins to new PAPI
> -
>
> Key: KAFKA-10539
> URL: https://issues.apache.org/jira/browse/KAFKA-10539
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10538) Convert KStreamImpl maps to new PAPI

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10538:
-
Component/s: streams

> Convert KStreamImpl maps to new PAPI
> 
>
> Key: KAFKA-10538
> URL: https://issues.apache.org/jira/browse/KAFKA-10538
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10540) Convert KStream aggregations to new PAPI

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10540:
-
Component/s: streams

> Convert KStream aggregations to new PAPI
> 
>
> Key: KAFKA-10540
> URL: https://issues.apache.org/jira/browse/KAFKA-10540
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10542) Convert KTable maps to new PAPI

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10542:
-
Component/s: streams

> Convert KTable maps to new PAPI
> ---
>
> Key: KAFKA-10542
> URL: https://issues.apache.org/jira/browse/KAFKA-10542
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10543) Convert KTable joins to new PAPI

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10543:
-
Component/s: streams

> Convert KTable joins to new PAPI
> 
>
> Key: KAFKA-10543
> URL: https://issues.apache.org/jira/browse/KAFKA-10543
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10541) Convert KTable filters to new PAPI

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10541:
-
Component/s: streams

> Convert KTable filters to new PAPI
> --
>
> Key: KAFKA-10541
> URL: https://issues.apache.org/jira/browse/KAFKA-10541
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10537) Convert KStreamImpl filters to new PAPI

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10537?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10537:
-
Component/s: streams

> Convert KStreamImpl filters to new PAPI
> ---
>
> Key: KAFKA-10537
> URL: https://issues.apache.org/jira/browse/KAFKA-10537
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10536) KIP-478: Implement KStream changes

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10536:
-
Fix Version/s: 2.7.0

> KIP-478: Implement KStream changes
> --
>
> Key: KAFKA-10536
> URL: https://issues.apache.org/jira/browse/KAFKA-10536
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.7.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10536) KIP-478: Implement KStream changes

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10536:
-
Component/s: streams

> KIP-478: Implement KStream changes
> --
>
> Key: KAFKA-10536
> URL: https://issues.apache.org/jira/browse/KAFKA-10536
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10535) KIP-478: Implement StateStoreContext and Record

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10535:
-
Fix Version/s: 2.7.0

> KIP-478: Implement StateStoreContext and Record
> ---
>
> Key: KAFKA-10535
> URL: https://issues.apache.org/jira/browse/KAFKA-10535
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.7.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10535) KIP-478: Implement StateStoreContext and Record

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-10535.
--
Resolution: Fixed

> KIP-478: Implement StateStoreContext and Record
> ---
>
> Key: KAFKA-10535
> URL: https://issues.apache.org/jira/browse/KAFKA-10535
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10437) KIP-478: Implement test-utils changes

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10437:
-
Fix Version/s: 2.7.0

> KIP-478: Implement test-utils changes
> -
>
> Key: KAFKA-10437
> URL: https://issues.apache.org/jira/browse/KAFKA-10437
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.7.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10535) KIP-478: Implement StateStoreContext and Record

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10535:
-
Component/s: streams

> KIP-478: Implement StateStoreContext and Record
> ---
>
> Key: KAFKA-10535
> URL: https://issues.apache.org/jira/browse/KAFKA-10535
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10437) KIP-478: Implement test-utils changes

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10437:
-
Component/s: streams

> KIP-478: Implement test-utils changes
> -
>
> Key: KAFKA-10437
> URL: https://issues.apache.org/jira/browse/KAFKA-10437
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10437) KIP-478: Implement test-utils changes

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10437:
-
Component/s: (was: streams-test-utils)

> KIP-478: Implement test-utils changes
> -
>
> Key: KAFKA-10437
> URL: https://issues.apache.org/jira/browse/KAFKA-10437
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10437) KIP-478: Implement test-utils changes

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10437:
-
Component/s: streams-test-utils

> KIP-478: Implement test-utils changes
> -
>
> Key: KAFKA-10437
> URL: https://issues.apache.org/jira/browse/KAFKA-10437
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, streams-test-utils
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10436) Implement KIP-478 Topology changes

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10436?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10436:
-
Component/s: streams

> Implement KIP-478 Topology changes
> --
>
> Key: KAFKA-10436
> URL: https://issues.apache.org/jira/browse/KAFKA-10436
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] C0urante commented on pull request #9375: KAFKA-10574: Fix infinite loop in Values::parseString

2020-10-05 Thread GitBox


C0urante commented on pull request #9375:
URL: https://github.com/apache/kafka/pull/9375#issuecomment-703775022


   Thanks @kkonstantine, I've added a comment and addressed the Checkstyle 
issues.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on a change in pull request #9375: KAFKA-10574: Fix infinite loop in Values::parseString

2020-10-05 Thread GitBox


C0urante commented on a change in pull request #9375:
URL: https://github.com/apache/kafka/pull/9375#discussion_r499754411



##
File path: 
connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
##
@@ -73,6 +76,15 @@
 INT_LIST.add(-987654321);
 }
 
+    @Test(timeout = 5000)
+    public void shouldNotEncounterInfiniteLoop() {
+        byte[] bytes = new byte[] { -17, -65, -65 };

Review comment:
   👍   good call, will add 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on a change in pull request #9375: KAFKA-10574: Fix infinite loop in Values::parseString

2020-10-05 Thread GitBox


kkonstantine commented on a change in pull request #9375:
URL: https://github.com/apache/kafka/pull/9375#discussion_r499750739



##
File path: 
connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
##
@@ -73,6 +76,15 @@
 INT_LIST.add(-987654321);
 }
 
+    @Test(timeout = 5000)
+    public void shouldNotEncounterInfiniteLoop() {
+        byte[] bytes = new byte[] { -17, -65, -65 };

Review comment:
   we need a comment here to explain things. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
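
For the record, the three bytes in the test are the signed-byte form of 0xEF 0xBF 0xBF, the UTF-8 encoding of the noncharacter U+FFFF, which is the input that reproduces the loop; a short standalone snippet verifies the decoding:

```java
import java.nio.charset.StandardCharsets;

public class Utf8ProbeSketch {
    public static void main(String[] args) {
        // -17, -65, -65 are the signed values of 0xEF, 0xBF, 0xBF: UTF-8 for U+FFFF.
        byte[] bytes = new byte[] { -17, -65, -65 };
        String decoded = new String(bytes, StandardCharsets.UTF_8);
        System.out.println(decoded.length());                       // prints 1
        System.out.println(Integer.toHexString(decoded.charAt(0))); // prints ffff
    }
}
```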




[jira] [Updated] (KAFKA-7334) Suggest changing config for state.dir in case of FileNotFoundException

2020-10-05 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-7334:
---
Labels: newbie  (was: )

> Suggest changing config for state.dir in case of FileNotFoundException
> --
>
> Key: KAFKA-7334
> URL: https://issues.apache.org/jira/browse/KAFKA-7334
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ted Yu
>Priority: Major
>  Labels: newbie
>
> Quoting stack trace from KAFKA-5998 :
> {code}
> WARN [2018-08-22 03:17:03,745] 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager: task 
> [0_45] Failed to write offset checkpoint file to 
> /tmp/kafka-streams//0_45/.checkpoint: {}
> ! java.nio.file.NoSuchFileException: /tmp/kafka-streams//0_45/.checkpoint.tmp
> ! at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
> ! at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
> {code}
> When state.dir is left at default configuration, there is a chance that 
> certain files under the state directory are cleaned by OS since the default 
> dir starts with /tmp/kafka-streams.
> [~mjsax] and I proposed to suggest user, through exception message, to change 
> the location for state.dir .



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
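
Until the exception message suggests it, the practical workaround is to move state.dir off /tmp; a minimal sketch (application id, bootstrap servers, and path are placeholders):

{code}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StateDirConfigSketch {
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // Keep state outside /tmp so the OS cleaner cannot remove checkpoints.
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");
        return props;
    }
}
{code}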


[jira] [Commented] (KAFKA-10559) Don't shutdown the entire app upon TimeoutException during internal topic validation

2020-10-05 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17208194#comment-17208194
 ] 

Sagar Rao commented on KAFKA-10559:
---

hey [~ableegoldman], I can pick this one up if needed. Is there anything more that 
you would want to add apart from the nicely worded description? 

> Don't shutdown the entire app upon TimeoutException during internal topic 
> validation
> 
>
> Key: KAFKA-10559
> URL: https://issues.apache.org/jira/browse/KAFKA-10559
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 2.7.0
>
>
> During some of the KIP-572 work, we made things pretty brittle by changing 
> the StreamsPartitionAssignor to send the `INCOMPLETE_SOURCE_TOPIC_METADATA` 
> error code and shut down the entire application if a TimeoutException is hit 
> during the internal topic creation/validation.
> Internal topic validation occurs during every rebalance, and we have seen it 
> time out on topic discovery in unstable environments. So shutting down the 
> entire application seems like a step in the wrong direction, and antithetical 
> to the goal of KIP-572 (improving the resiliency of Streams in the face of 
> TimeoutExceptions)
> I'm not totally sure what the previous behavior was, but it seems to me we 
> have three options:
>  # Rethrow the TimeoutException and allow it to kill the thread
>  # Swallow the TimeoutException and retry the rebalance indefinitely
>  # Some combination of the above: swallow the TimeoutException but don't 
> retry indefinitely:
>  ## Start a timer and allow retrying rebalances for up to the configured 
> task.timeout.ms, the timeout config introduced in KIP-572
>  ## Retry for some constant number of rebalances
> I think if we go with option 3, then shutting down the entire application is 
> relatively more palatable, as we have given the environment a chance to 
> stabilize.
> But, killing the thread still seems preferable, given the two new features 
> that are coming out soon: the ability to start up new threads, and the 
> improved exception handler that allows the user to choose to shut down the 
> entire application if that's really what they want. Once users have this 
> level of control over the application, we should allow them to decide how 
> they want to handle exceptional cases like this, rather than forcing an 
> option on them (eg shutdown everything) 
>  
> Imo we should fix this before 2.7 comes out, even if it's just a partial fix 
> (eg we do option 1 in 2.7, but plan to implement option 3 eventually)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] lkokhreidze commented on a change in pull request #9237: KAFKA-10454 / Update copartitionSourceGroups when optimization algorithm is triggered

2020-10-05 Thread GitBox


lkokhreidze commented on a change in pull request #9237:
URL: https://github.com/apache/kafka/pull/9237#discussion_r485886423



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
##
@@ -0,0 +1,242 @@
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(value = Parameterized.class)
+@Category({IntegrationTest.class})
+public class StreamTableJoinTopologyOptimizationIntegrationTest {

Review comment:
   There's already another `StreamTableIntegrationTest` present, but it 
works with `TopologyTestDriver` so I thought it would be better and easier to 
keep them separate.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
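
For readers following the review, the parameterized shape of the test under discussion looks roughly like the skeleton below (class name is illustrative; the real test also starts an embedded Kafka cluster):

```java
import java.util.Arrays;
import java.util.Collection;
import org.apache.kafka.streams.StreamsConfig;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

// Illustrative skeleton only.
@RunWith(Parameterized.class)
public class TopologyOptimizationTestSketch {

    @Parameterized.Parameters(name = "optimization={0}")
    public static Collection<String> parameters() {
        // Run each test once with and once without topology optimization.
        return Arrays.asList(StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION);
    }

    @Parameterized.Parameter
    public String topologyOptimization;
}
```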



