[GitHub] [kafka] showuon closed pull request #9791: KAFKA-10873: ignore warning messages if connector/task start failed

2021-01-28 Thread GitBox


showuon closed pull request #9791:
URL: https://github.com/apache/kafka/pull/9791


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #9791: KAFKA-10873: ignore warning messages if connector/task start failed

2021-01-28 Thread GitBox


showuon commented on pull request #9791:
URL: https://github.com/apache/kafka/pull/9791#issuecomment-769632410


   @kkonstantine, thanks for the comments. Looks good to me. Closing this PR. Thank 
you.







[jira] [Assigned] (KAFKA-12246) Remove redundant suppression in KafkaAdminClient

2021-01-28 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-12246:
--

Assignee: YI-CHEN WANG

> Remove redundant suppression in KafkaAdminClient
> 
>
> Key: KAFKA-12246
> URL: https://issues.apache.org/jira/browse/KAFKA-12246
> Project: Kafka
>  Issue Type: Improvement
>Reporter: YI-CHEN WANG
>Assignee: YI-CHEN WANG
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on a change in pull request #10000: KAFKA-9274: handle TimeoutException on task reset

2021-01-28 Thread GitBox


mjsax commented on a change in pull request #10000:
URL: https://github.com/apache/kafka/pull/10000#discussion_r566621619



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -227,6 +230,27 @@ public void initializeIfNeeded() {
 }
 }
 
+private void initOffsetsIfNeeded(final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
+final Map<TopicPartition, OffsetAndMetadata> committed = mainConsumer.committed(resetOffsetsForPartitions);
+for (final Map.Entry<TopicPartition, OffsetAndMetadata> committedEntry : committed.entrySet()) {
+final OffsetAndMetadata offsetAndMetadata = committedEntry.getValue();
+if (offsetAndMetadata != null) {
+mainConsumer.seek(committedEntry.getKey(), offsetAndMetadata);
+resetOffsetsForPartitions.remove(committedEntry.getKey());
+}
+}
+
+if (!resetOffsetsForPartitions.isEmpty()) {

Review comment:
   this `if` is not strictly required; however, it allows us to just pass 
`null` as `offsetResetter` in tests, so it might be worth it.









[GitHub] [kafka] mjsax commented on a change in pull request #10000: KAFKA-9274: handle TimeoutException on task reset

2021-01-28 Thread GitBox


mjsax commented on a change in pull request #10000:
URL: https://github.com/apache/kafka/pull/10000#discussion_r566621365



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -227,6 +230,27 @@ public void initializeIfNeeded() {
 }
 }
 
+private void initOffsetsIfNeeded(final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {

Review comment:
   I was considering merging this method into `initMetadata()`, but it might 
convolute the different code paths, and we should execute this method rarely 
anyway, so I don't think we need to worry about calling `mainConsumer.committed` 
twice in rare cases.
   
   Let me know what you think.









[GitHub] [kafka] chia7712 commented on pull request #10000: KAFKA-9274: handle TimeoutException on task reset

2021-01-28 Thread GitBox


chia7712 commented on pull request #10000:
URL: https://github.com/apache/kafka/pull/10000#issuecomment-769626443


   congratulations on PR 10,000 :)







[GitHub] [kafka] tombentley commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

2021-01-28 Thread GitBox


tombentley commented on pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#issuecomment-769621007


   Thanks, and no worries about the wait.







[GitHub] [kafka] guozhangwang commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

2021-01-28 Thread GitBox


guozhangwang commented on pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#issuecomment-769618984


   Yes! I will review it again. Thanks for hanging in there, and my apologies... 
Review has always been a bit overwhelming for me :) 







[GitHub] [kafka] guozhangwang commented on pull request #9268: KAFKA-10442; Add transaction admin APIs for KIP-664

2021-01-28 Thread GitBox


guozhangwang commented on pull request #9268:
URL: https://github.com/apache/kafka/pull/9268#issuecomment-769618616


   Sorry man...







[jira] [Commented] (KAFKA-10793) Race condition in FindCoordinatorFuture permanently severs connection to group coordinator

2021-01-28 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17274187#comment-17274187
 ] 

Guozhang Wang commented on KAFKA-10793:
---

cc [~hachikuji] [~ijuma] Hopefully we nailed it this time! :)

> Race condition in FindCoordinatorFuture permanently severs connection to 
> group coordinator
> --
>
> Key: KAFKA-10793
> URL: https://issues.apache.org/jira/browse/KAFKA-10793
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, streams
>Affects Versions: 2.5.0
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Critical
> Fix For: 2.8.0, 2.7.1
>
>
> Pretty much as soon as we started actively monitoring the 
> _last-rebalance-seconds-ago_ metric in our Kafka Streams test environment, we 
> started seeing something weird. Every so often one of the StreamThreads (ie a 
> single Consumer instance) would appear to permanently fall out of the group, 
> as evidenced by a monotonically increasing _last-rebalance-seconds-ago._ We 
> inject artificial network failures every few hours at most, so the group 
> rebalances quite often. But the one consumer never rejoins, with no other 
> symptoms (besides a slight drop in throughput since the remaining threads had 
> to take over this member's work). We're confident that the problem exists in 
> the client layer, since the logs confirmed that the unhealthy consumer was 
> still calling poll. It was also calling Consumer#committed in its main poll 
> loop, which was consistently failing with a TimeoutException.
> When I attached a remote debugger to an instance experiencing this issue, the 
> network client's connection to the group coordinator (the one that uses 
> MAX_VALUE - node.id as the coordinator id) was in the DISCONNECTED state. But 
> for some reason it never tried to re-establish this connection, although it 
> did successfully connect to that same broker through the "normal" connection 
> (ie the one that just uses node.id).
> The tl;dr is that the AbstractCoordinator's FindCoordinatorRequest has failed 
> (presumably due to a disconnect), but the _findCoordinatorFuture_ is non-null 
> so a new request is never sent. This shouldn't be possible since the 
> FindCoordinatorResponseHandler is supposed to clear the 
> _findCoordinatorFuture_ when the future is completed. But somehow that didn't 
> happen, so the consumer continues to assume there's still a FindCoordinator 
> request in flight and never even notices that it's dropped out of the group.
> These are the only confirmed findings so far, however we have some guesses 
> which I'll leave in the comments. Note that we only noticed this due to the 
> newly added _last-rebalance-seconds-ago_ metric, and there's no reason to 
> believe this bug hasn't been flying under the radar since the Consumer's 
> inception.
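
For illustration, here is a minimal sketch of the failure mode described above: a guard on a never-cleared future that suppresses all further coordinator lookups. The class and field names are illustrative, not the actual AbstractCoordinator code.

```java
import java.util.concurrent.CompletableFuture;

// Minimal sketch of the suspected race; names are illustrative only.
class CoordinatorLookupSketch {
    // null means "no FindCoordinator request in flight"
    private CompletableFuture<Integer> findCoordinatorFuture;

    void ensureCoordinatorReady() {
        // If the response handler that should clear this field never runs
        // (the race), this guard permanently suppresses any new request,
        // matching the "never rejoins" symptom above.
        if (findCoordinatorFuture != null) {
            return;
        }
        findCoordinatorFuture = sendFindCoordinatorRequest();
        findCoordinatorFuture.whenComplete((coordinatorId, error) ->
            // Clearing the field atomically with completion closes the gap.
            findCoordinatorFuture = null);
    }

    private CompletableFuture<Integer> sendFindCoordinatorRequest() {
        return new CompletableFuture<>();
    }
}
```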





[GitHub] [kafka] hachikuji commented on a change in pull request #10001: MINOR: AbstractCoordinatorTest should close coordinator explicitly

2021-01-28 Thread GitBox


hachikuji commented on a change in pull request #10001:
URL: https://github.com/apache/kafka/pull/10001#discussion_r566613107



##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
##
@@ -102,6 +104,11 @@
 private final String leaderId = "leaderId";
 private final int defaultGeneration = -1;
 
+@AfterEach
+public void closeCoordinator() {
+Utils.closeQuietly(coordinator, "close coordinator");

Review comment:
   While we're at it, shall we close `consumerClient`?









[GitHub] [kafka] guozhangwang commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

2021-01-28 Thread GitBox


guozhangwang commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-769616246


   Awesome!!







[GitHub] [kafka] chia7712 opened a new pull request #10001: MINOR: AbstractCoordinatorTest should close coordinator explicitly

2021-01-28 Thread GitBox


chia7712 opened a new pull request #10001:
URL: https://github.com/apache/kafka/pull/10001


   I noticed this issue when digging into some flaky tests with a JVM profiler. 
```AbstractCoordinatorTest``` does not close the coordinator, so it can leave a 
lot of idle heartbeat threads behind in the following tests.
   
   ```
   "kafka-coordinator-heartbeat-thread | dummy-group" #239 daemon prio=5 
os_prio=0 cpu=4.40ms elapsed=29.26s tid=0x7f4798c34000 nid=0x11b6 in 
Object.wait()  [0x7f471dbf5000]
  java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(java.base@11.0.9.1/Native Method)
- waiting on 
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1398)
- waiting to re-lock in wait() <0x8152b250> (a 
org.apache.kafka.clients.consumer.internals.AbstractCoordinatorTest$DummyCoordinator)
   
   "kafka-coordinator-heartbeat-thread | dummy-group" #240 daemon prio=5 
os_prio=0 cpu=4.15ms elapsed=29.16s tid=0x7f4798c36800 nid=0x11b7 in 
Object.wait()  [0x7f471d7f4000]
  java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(java.base@11.0.9.1/Native Method)
- waiting on 
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1398)
- waiting to re-lock in wait() <0x8152e9c0> (a 
org.apache.kafka.clients.consumer.internals.AbstractCoordinatorTest$DummyCoordinator)
   
   "kafka-coordinator-heartbeat-thread | dummy-group" #242 daemon prio=5 
os_prio=0 cpu=0.23ms elapsed=29.04s tid=0x7f4798c39000 nid=0x11b9 in 
Object.wait()  [0x7f471d3f3000]
  java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(java.base@11.0.9.1/Native Method)
- waiting on 
at java.lang.Object.wait(java.base@11.0.9.1/Object.java:328)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1355)
- waiting to re-lock in wait() <0x815107f8> (a 
org.apache.kafka.clients.consumer.internals.AbstractCoordinatorTest$DummyCoordinator)
   
   "kafka-coordinator-heartbeat-thread | dummy-group" #244 daemon prio=5 
os_prio=0 cpu=3.62ms elapsed=29.03s tid=0x7f4798c3b000 nid=0x11bb in 
Object.wait()  [0x7f471cff2000]
  java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(java.base@11.0.9.1/Native Method)
- waiting on 
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1398)
- waiting to re-lock in wait() <0x815330b0> (a 
org.apache.kafka.clients.consumer.internals.AbstractCoordinatorTest$DummyCoordinator)
   
   "kafka-coordinator-heartbeat-thread | dummy-group" #245 daemon prio=5 
os_prio=0 cpu=4.09ms elapsed=28.93s tid=0x7f4798c3d800 nid=0x11bc in 
Object.wait()  [0x7f471cbf1000]
  java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(java.base@11.0.9.1/Native Method)
- waiting on 
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1398)
- waiting to re-lock in wait() <0x815387e8> (a 
org.apache.kafka.clients.consumer.internals.AbstractCoordinatorTest$DummyCoordinator)
   
   "kafka-coordinator-heartbeat-thread | dummy-group" #246 daemon prio=5 
os_prio=0 cpu=4.14ms elapsed=28.83s tid=0x7f4798c3f000 nid=0x11bd in 
Object.wait()  [0x7f471c7f]
  java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(java.base@11.0.9.1/Native Method)
- waiting on 
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1398)
- waiting to re-lock in wait() <0x815389d8> (a 
org.apache.kafka.clients.consumer.internals.AbstractCoordinatorTest$DummyCoordinator)
   
   "kafka-coordinator-heartbeat-thread | dummy-group" #247 daemon prio=5 
os_prio=0 cpu=4.08ms elapsed=28.72s tid=0x7f4798c41800 nid=0x11be in 
Object.wait()  [0x7f46e3ffe000]
  java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(java.base@11.0.9.1/Native Method)
- waiting on 
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1398)
- waiting to re-lock in wait() <0x81538bc8> (a 
org.apache.kafka.clients.consumer.internals.AbstractCoordinatorTest$DummyCoordinator)
   ```
   
   I haven't observed a direct relationship between this issue and the flakiness. 
However, it seems to me that explicitly releasing idle resources is always a good 
pattern.
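
For illustration, a minimal JUnit 5 sketch of the cleanup pattern (including closing the `consumerClient`, as suggested in the review above); the fields stand in for the real test fixtures and are assumptions, not the actual test code:

```java
import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.AfterEach;

class CoordinatorCleanupSketch {
    private AutoCloseable coordinator;     // stands in for the test's DummyCoordinator
    private AutoCloseable consumerClient;  // also worth closing, per the review above

    @AfterEach
    void closeResources() {
        // closeQuietly logs instead of throwing, so teardown failures
        // never mask the actual test result.
        Utils.closeQuietly(coordinator, "coordinator");
        Utils.closeQuietly(consumerClient, "consumer network client");
    }
}
```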
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)

[jira] [Commented] (KAFKA-12169) Consumer can not know paritions change when client leader restart with static membership protocol

2021-01-28 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17274181#comment-17274181
 ] 

Guozhang Wang commented on KAFKA-12169:
---

[~zoushengfu] by "restart with unknown member id", do you mean you bounced the 
client leader at the same time? Also, which broker version are you running with?

If you did bounced the leader and the brokers are on older versions (i.e. on 
older versions the broker would not trigger rebalance on non-leader joins with 
different metadata), there might indeed have a race condition here if we bounce 
the leader at the same time, such as:

T0: topic partitions metadata changes from 1000 to 2000, but have not been 
propagated to the consumer group leader.
T1: Leader is bounced, and then rejoined the group with a known instance.id, at 
that time its metadata is already at 2000 partitions, but the group coordinator 
would still give it the old assignment which only contains 1000 partitions.
T2: Since then the leader would not try to resend the join-group since its 
"join-group metadata snapshot" is the same as the refreshed metadata.

> Consumer can not know paritions change when client leader restart with static 
> membership protocol
> -
>
> Key: KAFKA-12169
> URL: https://issues.apache.org/jira/browse/KAFKA-12169
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.5.1, 2.6.1
>Reporter: zou shengfu
>Priority: Major
>
> Background: 
>  Kafka consumer services run with static membership and the cooperative 
> rebalance protocol on kubernetes, and services often restart because of 
> operations. When we added partitions from 1000 to 2000 for the topic, the 
> client leader restarted with an unknown member id at the same time, and we 
> found the consumers did not trigger a rebalance and still consumed 1000 
> partitions.
>  





[GitHub] [kafka] mjsax opened a new pull request #10000: KAFKA-9274: handle TimeoutException on task reset

2021-01-28 Thread GitBox


mjsax opened a new pull request #10000:
URL: https://github.com/apache/kafka/pull/10000


- part of KIP-572







[GitHub] [kafka] hachikuji opened a new pull request #9999: MINOR: Ensure `InterBrokerSendThread` closes `NetworkClient`

2021-01-28 Thread GitBox


hachikuji opened a new pull request #9999:
URL: https://github.com/apache/kafka/pull/9999


   We should ensure `NetworkClient` is closed properly when 
`InterBrokerSendThread` is shut down. Also use `initiateShutdown` instead of 
`wakeup()` to alert the polling thread.
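
A hedged sketch of the shutdown ordering this implies -- stop the polling loop first, then close the client -- with simplified stand-ins rather than the real `InterBrokerSendThread`/`NetworkClient` classes:

```java
class SendThreadShutdownSketch {
    private final AutoCloseable networkClient; // stands in for NetworkClient
    private volatile boolean running = true;

    SendThreadShutdownSketch(final AutoCloseable networkClient) {
        this.networkClient = networkClient;
    }

    void shutdown() throws Exception {
        initiateShutdown();    // flip the running flag and wake the poller
        awaitShutdown();       // wait for the polling loop to exit...
        networkClient.close(); // ...and only then release the client's resources
    }

    private void initiateShutdown() { running = false; /* plus a wakeup */ }
    private void awaitShutdown() { /* join the polling thread */ }
}
```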
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[jira] [Updated] (KAFKA-12169) Consumer can not know paritions change when client leader restart with static membership protocol

2021-01-28 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-12169:
--
Summary: Consumer can not know paritions change when client leader restart 
with static membership protocol  (was: Consumer can not know paritions chage 
when client leader restart with static membership protocol)

> Consumer can not know paritions change when client leader restart with static 
> membership protocol
> -
>
> Key: KAFKA-12169
> URL: https://issues.apache.org/jira/browse/KAFKA-12169
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.5.1, 2.6.1
>Reporter: zou shengfu
>Priority: Major
>
> Background: 
>  Kafka consumer services run with static membership and the cooperative 
> rebalance protocol on kubernetes, and services often restart because of 
> operations. When we added partitions from 1000 to 2000 for the topic, the 
> client leader restarted with an unknown member id at the same time, and we 
> found the consumers did not trigger a rebalance and still consumed 1000 
> partitions.
>  





[GitHub] [kafka] kkonstantine commented on pull request #9791: KAFKA-10873: ignore warning messages if connector/task start failed

2021-01-28 Thread GitBox


kkonstantine commented on pull request #9791:
URL: https://github.com/apache/kafka/pull/9791#issuecomment-769611695


   The former, I guess. I don't see a strong case for skipping the messages based 
on the number of occurrences. It seems we can use this information in the log, 
and it should be present in abnormal circumstances (maybe with a severity that 
varies).







[GitHub] [kafka] guozhangwang commented on pull request #9969: MINOR: updated upgrade and architecture for KIP-663, KIP-696, and KIP-671

2021-01-28 Thread GitBox


guozhangwang commented on pull request #9969:
URL: https://github.com/apache/kafka/pull/9969#issuecomment-769608173


   LGTM.







[GitHub] [kafka] g1geordie commented on pull request #9906: KAFKA-10885 Refactor MemoryRecordsBuilderTest/MemoryRecordsTest to avoid a lot of…

2021-01-28 Thread GitBox


g1geordie commented on pull request #9906:
URL: https://github.com/apache/kafka/pull/9906#issuecomment-769603195


   @chia7712  
   Thanks for your patch.
   I followed the style and changed `MemoryRecordsBuilderTest`.
   
   Can you help me take a look? :)







[GitHub] [kafka] guozhangwang commented on a change in pull request #9253: KAFKA-10366 & KAFKA-9649: Implement KIP-659 to allow TimeWindowedDeserializer and TimeWindowedSerde to handle window size

2021-01-28 Thread GitBox


guozhangwang commented on a change in pull request #9253:
URL: https://github.com/apache/kafka/pull/9253#discussion_r566597020



##
File path: 
streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala
##
@@ -41,8 +43,19 @@ object Serdes {
   implicit def Integer: Serde[Int] = JSerdes.Integer().asInstanceOf[Serde[Int]]
   implicit def JavaInteger: Serde[java.lang.Integer] = JSerdes.Integer()
 
-  implicit def timeWindowedSerde[T](implicit tSerde: Serde[T]): 
WindowedSerdes.TimeWindowedSerde[T] =
-new WindowedSerdes.TimeWindowedSerde[T](tSerde)
+  implicit def timeWindowedSerde[T](implicit inner: Serde[T]): 
Serde[Windowed[T]] =
+new JSerdes.WrapperSerde[Windowed[T]](
+  new TimeWindowedSerializer[T](inner.serializer),
+  new TimeWindowedDeserializer[T](inner.deserializer) {
+override def deserialize(topic: String, data: Array[Byte]): 
Windowed[T] = {

Review comment:
   Thinking about this once again: if users did use the `timeWindowedSerde`, then 
we would log a WARN on each record, which would soon flood the log files. So 
instead of logging in the `deserialize` function, maybe it's better to just log 
once in `timeWindowedSerde` itself?
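
A small Java sketch of the "log once" alternative suggested here; the factory shape and names are illustrative, not the Scala `Serdes` API:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class WindowedSerdeFactorySketch {
    private static final Logger LOG = LoggerFactory.getLogger(WindowedSerdeFactorySketch.class);

    static Object timeWindowedSerde(final Object innerSerde) {
        // One WARN per serde instance created, instead of one per record
        // deserialized -- this keeps the log files readable.
        LOG.warn("timeWindowedSerde is deprecated; construct the serde with an explicit window size instead.");
        return innerSerde; // placeholder: the real code would wrap the inner serde
    }
}
```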

##
File path: 
streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/SerdesUnitTest.scala
##
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala
+
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender
+import org.apache.kafka.streams.kstream.WindowedSerdes.TimeWindowedSerde
+import org.junit.Assert.assertFalse
+import org.junit.Test
+
+class SerdesUnitTest {
+
+  @Test
+  def shouldLogMessageWhenTimeWindowedSerdeIsUsed(): Unit = {
+
+Serdes.timeWindowedSerde(new TimeWindowedSerde[String]())
+val appender = LogCaptureAppender.createAndRegister()
+val warning = appender.getMessages()
+assertFalse("There should be a warning about TimeWindowedDeserializer", 
warning.isEmpty)

Review comment:
   Personally I'm not a big fan of adding test coverage for deprecated functions 
just to make sure the expected warning is generated :) Also, is this test 
correct? Currently `Serdes.timeWindowedSerde` itself would not log any warning; 
we would only log when `deserialize` is called, right (see my other comment)?
   
   Anyways, I'm actually okay if you would like to just remove this test. But on 
the other hand, we should try to avoid a WARN at per-record granularity too.









[GitHub] [kafka] hachikuji opened a new pull request #9998: KAFKA-12250; Add metadata record serde for KIP-631

2021-01-28 Thread GitBox


hachikuji opened a new pull request #9998:
URL: https://github.com/apache/kafka/pull/9998


   This patch adds a `RecordSerde` implementation for the metadata record 
format expected by KIP-631. 
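
As a rough, hypothetical sketch of what such a record serde contract could look like (the actual `RecordSerde` interface in the patch may differ):

```java
import java.nio.ByteBuffer;

// Hypothetical contract; method names and signatures are assumptions.
interface MetadataRecordSerdeSketch<T> {
    int recordSize(T record);             // bytes needed to encode the record
    void write(T record, ByteBuffer out); // encode, e.g. frame version + type + version + payload
    T read(ByteBuffer in, int size);      // decode a single record of the given size
}
```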
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[jira] [Created] (KAFKA-12250) Add metadata record serde logic for KIP-631

2021-01-28 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-12250:
---

 Summary: Add metadata record serde logic for KIP-631
 Key: KAFKA-12250
 URL: https://issues.apache.org/jira/browse/KAFKA-12250
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson
Assignee: Jason Gustafson


KIP-631 specifies the schema for records written to the log. We need to write 
an instance of `RecordSerde` for this format.





[GitHub] [kafka] rohitrmd commented on a change in pull request #9744: KAFKA-10062: Add a method to retrieve the current timestamp as known by the Streams app

2021-01-28 Thread GitBox


rohitrmd commented on a change in pull request #9744:
URL: https://github.com/apache/kafka/pull/9744#discussion_r566587951



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
##
@@ -45,7 +45,6 @@
 private boolean initialized;
 protected ProcessorRecordContext recordContext;
 protected ProcessorNode currentNode;
-private long currentSystemTimeMs;

Review comment:
   @mjsax do you recommend adding a method to AbstractProcessorContext to set 
cachedSystemTimeMs (setCachedSystemTimeMs), or do you want me to add 
setSystemTimeMs back to 
[InternalProcessorContext](https://github.com/apache/kafka/pull/9744/files#diff-34daeb287c7e79c8ccd757daa4e17d6ab585d54844f6e5e8676853762a08cedcL49)
 and set the system time like it was done before? 









[GitHub] [kafka] hachikuji commented on a change in pull request #9967: KAFKA-12236; New meta.properties logic for KIP-500

2021-01-28 Thread GitBox


hachikuji commented on a change in pull request #9967:
URL: https://github.com/apache/kafka/pull/9967#discussion_r566587032



##
File path: core/src/main/scala/kafka/server/Server.scala
##
@@ -46,6 +46,22 @@ object Server {
 new Metrics(metricConfig, reporters, time, true, metricsContext)
   }
 
+  def initializeMetrics(
+config: KafkaConfig,
+time: Time,
+metaProps: MetaProperties

Review comment:
   @chia7712 We need the clusterId from `MetaProperties`. I modified this 
to pass it through directly to avoid the duplicate methods.









[GitHub] [kafka] mjsax commented on a change in pull request #9997: KAFKA-9274: Add timeout handling for `StreamPartitioner`

2021-01-28 Thread GitBox


mjsax commented on a change in pull request #9997:
URL: https://github.com/apache/kafka/pull/9997#discussion_r566586589



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -714,19 +720,34 @@ public boolean process(final long wallClockTime) {
 if (recordInfo.queue().size() == maxBufferedSize) {
 mainConsumer.resume(singleton(partition));
 }
-} catch (final StreamsException e) {
-throw e;
+
+record = null;
+} catch (final TimeoutException timeoutException) {
+if (!eosEnabled) {
+throw timeoutException;
+} else {
+record = null;
+throw new TaskCorruptedException(Collections.singletonMap(id, 
changelogPartitions()));

Review comment:
   Note that we don't trigger `task.timeout.ms` for this case atm. Because 
we need to restore state, which might take some time, it seems questionable 
whether we should trigger `task.timeout.ms` for this case or not.
   
   Cf. TaskManager#process(), which catches `TimeoutException` and triggers the 
timeout, but does not catch `TaskCorruptedException`.









[GitHub] [kafka] mjsax commented on a change in pull request #9997: KAFKA-9274: Add timeout handling for `StreamPartitioner`

2021-01-28 Thread GitBox


mjsax commented on a change in pull request #9997:
URL: https://github.com/apache/kafka/pull/9997#discussion_r566580878



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -714,19 +720,34 @@ public boolean process(final long wallClockTime) {
 if (recordInfo.queue().size() == maxBufferedSize) {
 mainConsumer.resume(singleton(partition));
 }
-} catch (final StreamsException e) {
-throw e;
+
+record = null;
+} catch (final TimeoutException timeoutException) {
+if (!eosEnabled) {
+throw timeoutException;
+} else {
+record = null;
+throw new TaskCorruptedException(Collections.singletonMap(id, 
changelogPartitions()));

Review comment:
   For this case, we don't need the cached record, as we need to reset the 
task anyway to clean up the potentially "corrupted" state store.









[GitHub] [kafka] mjsax commented on a change in pull request #9997: KAFKA-9274: Add timeout handling for `StreamPartitioner`

2021-01-28 Thread GitBox


mjsax commented on a change in pull request #9997:
URL: https://github.com/apache/kafka/pull/9997#discussion_r566580717



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -663,18 +666,21 @@ public boolean isProcessable(final long wallClockTime) {
  */
 @SuppressWarnings("unchecked")
 public boolean process(final long wallClockTime) {
-if (!isProcessable(wallClockTime)) {
-return false;
-}
+if (record == null) {

Review comment:
   If we have a cached record, it implies we failed to process it before -- 
thus we don't pull a new record from the buffer but retry processing the cached 
record.









[GitHub] [kafka] mjsax commented on a change in pull request #9997: KAFKA-9274: Add timeout handling for `StreamPartitioner`

2021-01-28 Thread GitBox


mjsax commented on a change in pull request #9997:
URL: https://github.com/apache/kafka/pull/9997#discussion_r566580568



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -99,6 +100,7 @@
 private final InternalProcessorContext processorContext;
 private final RecordQueueCreator recordQueueCreator;
 
+private StampedRecord record;

Review comment:
   We pull this variable out of the method to use it as a "cache" -- if 
we fail on send(), we can process the record a second time.
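
A minimal sketch of this cache-and-retry pattern (names are illustrative, not the actual StreamTask code):

```java
class RecordRetrySketch {
    private Object record; // cached record; null means "fetch the next one"

    boolean process() {
        if (record == null) {
            record = pollNextRecord(); // only pull from the buffer if nothing is pending
            if (record == null) {
                return false;
            }
        }
        try {
            send(record);
            record = null;                   // success: clear the cache
        } catch (final RuntimeException e) { // e.g. a retriable TimeoutException
            return false;                    // keep the cached record for the next attempt
        }
        return true;
    }

    private Object pollNextRecord() { return null; }
    private void send(final Object r) { }
}
```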









[GitHub] [kafka] mjsax opened a new pull request #9997: KAFKA-9274: Add timeout handling for `StreamPartitioner`

2021-01-28 Thread GitBox


mjsax opened a new pull request #9997:
URL: https://github.com/apache/kafka/pull/9997


- part of KIP-572
   
   When a custom `StreamPartitioner` is used, we need to get the number of 
partitions of the output topics from the producer.
   This `partitionsFor(topic)` call may throw a `TimeoutException` that we now 
handle gracefully.
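
For illustration, a hedged sketch of that handling -- treating the producer's partition lookup timeout as retriable rather than fatal; error handling here is simplified relative to the actual patch:

```java
import java.util.List;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.TimeoutException;

class PartitionCountLookupSketch {
    // Returns null to mean "unknown for now"; the caller can retry later
    // instead of crashing the stream thread (the KIP-572 behavior).
    static Integer tryGetPartitionCount(final Producer<?, ?> producer, final String topic) {
        try {
            final List<PartitionInfo> partitions = producer.partitionsFor(topic);
            return partitions.size();
        } catch (final TimeoutException swallow) {
            return null;
        }
    }
}
```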







[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts

2021-01-28 Thread GitBox


ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r566576655



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -133,6 +152,72 @@ private void configurePermissions(final File file) {
 }
 }
 
+/**
+ * @return true if the state directory was successfully locked
+ */
+private boolean lockStateDirectory() {
+final File lockFile = new File(stateDir, LOCK_FILE_NAME);
+try {
+stateDirLockChannel = FileChannel.open(lockFile.toPath(), 
StandardOpenOption.CREATE, StandardOpenOption.WRITE);
+stateDirLock = tryLock(stateDirLockChannel);
+} catch (final IOException e) {
+log.error("Unable to lock the state directory due to unexpected 
exception", e);
+throw new ProcessorStateException("Failed to lock the state 
directory during startup", e);
+}
+
+return stateDirLock != null;
+}
+
+public UUID initializeProcessId() {

Review comment:
   Oh my god. I hate json lol









[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts

2021-01-28 Thread GitBox


ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r566559281



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -133,6 +152,72 @@ private void configurePermissions(final File file) {
 }
 }
 
+/**
+ * @return true if the state directory was successfully locked
+ */
+private boolean lockStateDirectory() {
+final File lockFile = new File(stateDir, LOCK_FILE_NAME);
+try {
+stateDirLockChannel = FileChannel.open(lockFile.toPath(), 
StandardOpenOption.CREATE, StandardOpenOption.WRITE);
+stateDirLock = tryLock(stateDirLockChannel);
+} catch (final IOException e) {
+log.error("Unable to lock the state directory due to unexpected 
exception", e);
+throw new ProcessorStateException("Failed to lock the state 
directory during startup", e);
+}
+
+return stateDirLock != null;
+}
+
+public UUID initializeProcessId() {

Review comment:
   I'll try it out with json. If we do use json, then we don't even need 
the version number, right? As long as we only ever add fields, it should 
always be compatible. At least that's my understanding; I'm not a json expert.
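
A sketch of why JSON with unknown-field tolerance may make an explicit version number unnecessary, assuming a Jackson-based reader; the file and field names are illustrative, not the actual StateDirectory code:

```java
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
import java.io.IOException;
import java.util.UUID;

// Old readers simply ignore fields added by newer versions.
@JsonIgnoreProperties(ignoreUnknown = true)
class ProcessFileSketch {
    public UUID processId;

    static UUID readOrCreate(final File file) throws IOException {
        final ObjectMapper mapper = new ObjectMapper();
        if (file.exists()) {
            return mapper.readValue(file, ProcessFileSketch.class).processId;
        }
        final ProcessFileSketch fresh = new ProcessFileSketch();
        fresh.processId = UUID.randomUUID();
        mapper.writeValue(file, fresh); // persist for a stable processId across restarts
        return fresh.processId;
    }
}
```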









[GitHub] [kafka] tang7526 commented on pull request #9991: MINOR: Reorder the modifiers and Replace Map.get with Map.computeIfAbsent

2021-01-28 Thread GitBox


tang7526 commented on pull request #9991:
URL: https://github.com/apache/kafka/pull/9991#issuecomment-769547448


   > @tang7526 Could you merge trunk to trigger QA again?
   
   Done







[GitHub] [kafka] chia7712 commented on pull request #9991: MINOR: Reorder the modifiers and Replace Map.get with Map.computeIfAbsent

2021-01-28 Thread GitBox


chia7712 commented on pull request #9991:
URL: https://github.com/apache/kafka/pull/9991#issuecomment-769546117


   @tang7526 Could you merge trunk to trigger QA again?







[GitHub] [kafka] dengziming commented on a change in pull request #9769: KAFKA-10774; Support Describe topic using topic IDs

2021-01-28 Thread GitBox


dengziming commented on a change in pull request #9769:
URL: https://github.com/apache/kafka/pull/9769#discussion_r566550612



##
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##
@@ -1879,6 +1879,109 @@ class KafkaApisTest {
 
assertTrue(response.topicsByError(Errors.UNKNOWN_TOPIC_OR_PARTITION).isEmpty)
   }
 
+  @Test
+  def testUnauthorizedTopicMetadataRequest(): Unit = {
+
+// 1. Set up broker information
+val plaintextListener = 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
+val broker = new UpdateMetadataBroker()
+  .setId(0)
+  .setRack("rack")
+  .setEndpoints(Seq(
+new UpdateMetadataEndpoint()
+  .setHost("broker0")
+  .setPort(9092)
+  .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
+  .setListener(plaintextListener.value)
+  ).asJava)
+
+// 2. Set up authorizer
+val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+val unauthorizedTopic = "unauthorized-topic"
+val authorizedTopic = "authorized-topic"
+
+val expectedActions = Seq(
+  new Action(AclOperation.DESCRIBE, new 
ResourcePattern(ResourceType.TOPIC, unauthorizedTopic, PatternType.LITERAL), 1, 
true, true),
+  new Action(AclOperation.DESCRIBE, new 
ResourcePattern(ResourceType.TOPIC, authorizedTopic, PatternType.LITERAL), 1, 
true, true)
+)
+
+val expectedAuthorizeResult = Seq(AuthorizationResult.DENIED, 
AuthorizationResult.ALLOWED).asJava
+
+EasyMock.expect(authorizer.authorize(anyObject[RequestContext], 
EasyMock.eq(expectedActions.asJava)))
+  .andReturn(expectedAuthorizeResult)
+  .times(2)
+
+// 3. Set up MetadataCache
+val authorizedTopicId = Uuid.randomUuid();
+val unauthorizedTopicId = Uuid.randomUuid();
+
+val topicIds = new util.HashMap[String, Uuid]()
+topicIds.put(authorizedTopic, authorizedTopicId)
+topicIds.put(unauthorizedTopic, unauthorizedTopicId)
+
+def createDummyPartitionStates(topic: String) = {
+  new UpdateMetadataPartitionState()
+.setTopicName(topic)
+.setPartitionIndex(0)
+.setControllerEpoch(1)
+.setLeader(0)
+.setLeaderEpoch(1)
+.setReplicas(Collections.singletonList(0))
+.setZkVersion(0)
+.setIsr(Collections.singletonList(0))
+}
+
+// Send UpdateMetadataReq to update MetadataCache
+val partitionStates = Seq(unauthorizedTopic, 
authorizedTopic).map(createDummyPartitionStates)
+
+val updateMetadataRequest = new 
UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0,
+  0, 0, partitionStates.asJava, Seq(broker).asJava, topicIds).build()
+metadataCache.updateMetadata(correlationId = 0, updateMetadataRequest)
+
+// 4. Send TopicMetadataReq using topicId
+val capturedMetadataByTopicIdResp = expectNoThrottling()
+EasyMock.replay(clientRequestQuotaManager, requestChannel, authorizer)
+
+val metadataReqByTopicId = new 
MetadataRequest.Builder(util.Arrays.asList(authorizedTopicId, 
unauthorizedTopicId)).build()
+createKafkaApis(authorizer = 
Some(authorizer)).handleTopicMetadataRequest(buildRequest(metadataReqByTopicId, 
plaintextListener))
+val metadataByTopicIdResp = readResponse(metadataReqByTopicId, 
capturedMetadataByTopicIdResp).asInstanceOf[MetadataResponse]
+
+val metadataByTopicId = 
metadataByTopicIdResp.data().topics().asScala.groupBy(_.topicId()).view.mapValues(_.head).toMap

Review comment:
   Thank you, Done.









[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts

2021-01-28 Thread GitBox


ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r566550049



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -133,6 +152,72 @@ private void configurePermissions(final File file) {
 }
 }
 
+/**
+ * @return true if the state directory was successfully locked
+ */
+private boolean lockStateDirectory() {
+final File lockFile = new File(stateDir, LOCK_FILE_NAME);
+try {
+stateDirLockChannel = FileChannel.open(lockFile.toPath(), 
StandardOpenOption.CREATE, StandardOpenOption.WRITE);
+stateDirLock = tryLock(stateDirLockChannel);
+} catch (final IOException e) {
+log.error("Unable to lock the state directory due to unexpected 
exception", e);
+throw new ProcessorStateException("Failed to lock the state 
directory during startup", e);
+}
+
+return stateDirLock != null;
+}
+
+public UUID initializeProcessId() {
+if (!hasPersistentStores) {
+return UUID.randomUUID();
+}
+
+if (!lockStateDirectory()) {
+log.error("Unable to obtain lock as state directory is already 
locked by another process");
+throw new StreamsException("Unable to initialize state, this can 
happen if multiple instances of " +
+   "Kafka Streams are running in the 
same state directory");
+}
+
+final File processFile = new File(stateDir, PROCESS_FILE_NAME);
+try {
+if (processFile.exists()) {
+try (final BufferedReader reader = 
Files.newBufferedReader(processFile.toPath())) {
+// only field in version 0 is the UUID
+final int version = Integer.parseInt(reader.readLine());
+if (version > 0) {

Review comment:
   Oh yeah definitely, thanks









[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts

2021-01-28 Thread GitBox


ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r566549210



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1208,24 +1227,28 @@ private Thread shutdownHelper(final boolean error) {
 }
 
 private boolean close(final long timeoutMs) {
-if (state == State.ERROR) {
-log.info("Streams client is already in the terminal state ERROR, 
all resources are closed and the client has stopped.");
+if (state == State.ERROR || state == State.NOT_RUNNING) {
+log.info("Streams client is already in the terminal {} state, all 
resources are closed and the client has stopped.", state);
 return true;
 }
-if (state == State.PENDING_ERROR) {
-log.info("Streams client is in PENDING_ERROR, all resources are 
being closed and the client will be stopped.");
-if (waitOnState(State.ERROR, timeoutMs)) {
+if (state == State.PENDING_ERROR || state == State.PENDING_SHUTDOWN) {
+log.info("Streams client is in {}, all resources are being closed 
and the client will be stopped.", state);
+if (state == State.PENDING_ERROR && waitOnState(State.ERROR, 
timeoutMs)) {
 log.info("Streams client stopped to ERROR completely");
 return true;
+} else if (state == State.PENDING_SHUTDOWN && 
waitOnState(State.NOT_RUNNING, timeoutMs)) {
+log.info("Streams client stopped to NOT_RUNNING completely");
+return true;
 } else {
-log.info("Streams client cannot transition to ERROR completely 
within the timeout");
+log.warn("Streams client cannot transition to {}} completely 
within the timeout", state);

Review comment:
   Ah good catch









[GitHub] [kafka] ijuma commented on a change in pull request #9589: KAFKA-10710 - Mirror Maker 2 - Create herders only if source->target.enabled=true

2021-01-28 Thread GitBox


ijuma commented on a change in pull request #9589:
URL: https://github.com/apache/kafka/pull/9589#discussion_r566540886



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
##
@@ -256,6 +257,80 @@ public void testWorkerConfigs() {
 "secret2", bProps.get("producer.ssl.key.password"));
 }
 
+@Test
+public void testClusterPairsWithDefaultSettings() {
+MirrorMakerConfig mirrorConfig = new MirrorMakerConfig(makeProps(
+"clusters", "a, b, c"));
+// implicit configuration associated
+// a->b.enabled=false
+// a->b.emit.heartbeat.enabled=true
+// a->c.enabled=false
+// a->c.emit.heartbeat.enabled=true
+// b->a.enabled=false
+// b->a.emit.heartbeat.enabled=true
+// b->c.enabled=false
+// b->c.emit.heartbeat.enabled=true
+// c->a.enabled=false
+// c->a.emit.heartbeat.enabled=true
+// c->b.enabled=false
+// c->b.emit.heartbeat.enabled=true
+List<SourceAndTarget> clusterPairs = mirrorConfig.clusterPairs();
+assertEquals("clusterPairs count should match all combinations count",

Review comment:
   I fixed the build via 
https://github.com/apache/kafka/commit/bf4afae8f53471ab6403cbbfcd2c4e427bdd4568









[GitHub] [kafka] aloknnikhil opened a new pull request #9996: KAFKA-12249: Add client-side Decommission Broker RPC

2021-01-28 Thread GitBox


aloknnikhil opened a new pull request #9996:
URL: https://github.com/apache/kafka/pull/9996


   ### Committer Checklist (excluded from commit message)
   - [x] Verify design and implementation 
   - [x] Verify test coverage and CI build status
   - [x] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] aloknnikhil commented on a change in pull request #9589: KAFKA-10710 - Mirror Maker 2 - Create herders only if source->target.enabled=true

2021-01-28 Thread GitBox


aloknnikhil commented on a change in pull request #9589:
URL: https://github.com/apache/kafka/pull/9589#discussion_r566536740



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
##
@@ -256,6 +257,80 @@ public void testWorkerConfigs() {
 "secret2", bProps.get("producer.ssl.key.password"));
 }
 
+@Test
+public void testClusterPairsWithDefaultSettings() {
+MirrorMakerConfig mirrorConfig = new MirrorMakerConfig(makeProps(
+"clusters", "a, b, c"));
+// implicit configuration associated
+// a->b.enabled=false
+// a->b.emit.heartbeat.enabled=true
+// a->c.enabled=false
+// a->c.emit.heartbeat.enabled=true
+// b->a.enabled=false
+// b->a.emit.heartbeat.enabled=true
+// b->c.enabled=false
+// b->c.emit.heartbeat.enabled=true
+// c->a.enabled=false
+// c->a.emit.heartbeat.enabled=true
+// c->b.enabled=false
+// c->b.emit.heartbeat.enabled=true
+List<SourceAndTarget> clusterPairs = mirrorConfig.clusterPairs();
+assertEquals("clusterPairs count should match all combinations count",

Review comment:
   @twobeeb Could you migrate these to use the new JUnit 5 `assertEquals` 
API? Looks like the build actually failed here - 
https://github.com/apache/kafka/runs/1782294502
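
For reference, the migration mostly moves the message argument: JUnit 4's `assertEquals(message, expected, actual)` becomes JUnit 5's `assertEquals(expected, actual, message)`. A minimal sketch (the expected count of 6 assumes the three-cluster example above):

```java
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.List;

class AssertMigrationSketch {
    void check(final List<String> clusterPairs) {
        // JUnit 4: assertEquals("message", 6, clusterPairs.size());
        // JUnit 5: the message moves to the last position.
        assertEquals(6, clusterPairs.size(),
            "clusterPairs count should match all combinations count");
    }
}
```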









[GitHub] [kafka] junrao commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-28 Thread GitBox


junrao commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r566532815



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java
##
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * SnapshottableHashTable implements a hash table that supports creating 
point-in-time
+ * snapshots.  Each snapshot is immutable once it is created; the past cannot 
be changed.
+ * We handle divergences between the current state and historical state by 
copying a
+ * reference to elements that have been deleted or overwritten into the 
snapshot tiers
+ * in which they still exist.  Each tier has its own hash table.
+ *
+ * In order to retrieve an object from epoch E, we only have to check two 
tiers: the
+ * current tier, and the tier associated with the snapshot from epoch E.  This 
design
+ * makes snapshot reads a little faster and simpler, at the cost of requiring 
us to copy
+ * references into multiple snapshot tiers sometimes when altering the current 
state.
+ * In general, we don't expect there to be many snapshots at any given point 
in time,
+ * though.  We expect to use about 2 snapshots at most.
+ *
+ * The current tier's data is stored in the fields inherited from 
BaseHashTable.  It
+ * would be conceptually simpler to have a separate BaseHashTable object, but 
since Java
+ * doesn't have value types, subclassing is the only way to avoid another 
pointer
+ * indirection and the associated extra memory cost.
+ *
+ * In contrast, the data for snapshot tiers is stored in the Snapshot object 
itself.
+ * We access it by looking up our object reference in the Snapshot's 
IdentityHashMap.
+ * This design ensures that we can remove snapshots in O(1) time, simply by 
deleting the
+ * Snapshot object from the SnapshotRegistry.
+ *
+ * As mentioned before, an element only exists in a snapshot tier if the 
element was
+ * overwritten or removed from a later tier.  If there are no changes between 
then and
+ * now, there is no data at all stored for the tier.  We don't even store a 
hash table
+ * object for a tier unless there is at least one change between then and now.
+ *
+ * The class hierarchy looks like this:
+ *
+ *Revertable   BaseHashTable
+ *  ↑  ↑
+ *   SnapshottableHashTable → SnapshotRegistry → Snapshot
+ *   ↑ ↑
+ *   TimelineHashSet   TimelineHashMap
+ *
+ * BaseHashTable is a simple hash table that uses separate chaining.  The 
interface is
+ * pretty bare-bones since this class is not intended to be used directly by 
end-users.
+ *
+ * This class, SnapshottableHashTable, has the logic for snapshotting and 
iterating over
+ * snapshots.  This is the core of the snapshotted hash table code and handles 
the
+ * tiering.
+ *
+ * TimelineHashSet and TimelineHashMap are mostly wrappers around this
+ * SnapshottableHashTable class.  They implement standard Java APIs for Set 
and Map,
+ * respectively.  There's a fair amount of boilerplate for this, but it's 
necessary so
+ * that timeline data structures can be used while writing idiomatic Java code.
+ * The accessor APIs have two versions -- one that looks at the current state, 
and one
+ * that looks at a historical snapshotted state.  Mutation APIs only ever 
mutate the
+ * current state.
+ *
+ * One very important feature of SnapshottableHashTable is that we support 
iterating
+ * over a snapshot even while changes are being made to the current state.  
See the
+ * Javadoc for the iterator for more information about how this is 
accomplished.
+ *
+ * All of these classes require external synchronization, and don't support 
null keys or
+ * values.
+ */
+class SnapshottableHashTable<T extends SnapshottableHashTable.ElementWithStartEpoch>
+extends BaseHashTable<T> implements Revertable {
+interface ElementWithStartEpoch {
+void setStartEpoch(long startEpoch);
+long startEpoch();
+}
+
+static class HashTier {
+private final int size;
+private BaseH
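
For readers following along in the archive, a minimal usage sketch of the structures this Javadoc describes may be easier to digest than the diff. It relies only on signatures visible in this PR (SnapshotRegistry(startEpoch), TimelineHashMap(registry, expectedSize), put, the epoch-parameterized get, createSnapshot, revertToSnapshot); the keys, values, and epochs are made up:

```java
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;

SnapshotRegistry registry = new SnapshotRegistry(0);
TimelineHashMap<String, Integer> offsets = new TimelineHashMap<>(registry, 16);

offsets.put("partition-0", 100);   // mutation lands in the current tier
registry.createSnapshot(10);       // freeze the state as of epoch 10

offsets.put("partition-0", 200);   // the overwritten value is copied into
                                   // the epoch-10 snapshot tier
offsets.get("partition-0");        // 200 -- current state
offsets.get("partition-0", 10);    // 100 -- historical read at epoch 10

registry.revertToSnapshot(10);     // roll the current state back to epoch 10
offsets.get("partition-0");        // 100 again
```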

[jira] [Commented] (KAFKA-10853) Replication protocol deficiencies with workloads requiring high durability guarantees

2021-01-28 Thread Kyle Ambroff-Kao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17274114#comment-17274114
 ] 

Kyle Ambroff-Kao commented on KAFKA-10853:
--

I've been discussing this with my team at LinkedIn, and so far we don't see a 
better alternative than the largestAckedOffset approach plus requiring 
acks=all via a topic-level config. Your suggestions have been really helpful, 
but I think our original approach still sounds better.

We might just prototype and test this out in our Kafka fork.

> Replication protocol deficiencies with workloads requiring high durability 
> guarantees
> -
>
> Key: KAFKA-10853
> URL: https://issues.apache.org/jira/browse/KAFKA-10853
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0
>Reporter: Kyle Ambroff-Kao
>Priority: Major
>
> *tl;dr: The definition of ISR and the consistency model from the perspective 
> of the producer seem a bit out of sync*
> We have many systems in production that trade off availability in order to 
> provide stronger consistency guarantees. Most of these configurations look 
> like this:
> Topic configuration:
>  * replication factor 3
>  * min.insync.replicas=2
>  * unclean.leader.election.enable=false
> Producer configuration:
>  * acks=all
> Broker configuration:
>  * replica.lag.time.max.ms=1
> So the goal here is to reduce the chance of ever dropping a message that the 
> leader has acknowledged to the producer.
> This works great, except that we've found some situations in production where 
> we are forced to enable unclean leader election to recover, which we never 
> want to do. These situations all seem totally avoidable with some small 
> tweaks to the replication protocol.
> *A scenario we've seen many times*
> The following sequence of events are in time order: A replica set for a 
> topic-partition TP with leader L and replicas R1 and R2. All three replicas 
> are in ISR.
>  # Producer sends ProduceRequest R with acks=all that contains a message 
> batch to the leader L.
>  # L receives R and appends the batch it contains to the active segment of TP 
> but does not ack to the producer yet because the request was acks=all
>  # A storage fault occurs on L which makes all IOPS take a long time but 
> doesn't cause a hard failure.
>  # R1 and R2 send follower fetch requests to L which are infinitely delayed 
> due to the storage fault on L.
>  # 10 seconds after appending the batch and appending it to the log, L 
> shrinks the ISR, removing R1 and R2. This is because ISR is defined as at 
> most replica.lag.time.max.ms milliseconds behind the log append time of the 
> leader end offset. The leader end offset is a message that has not been 
> replicated yet.
> The storage fault example in step 3 could easily be another kind of fault. 
> Say for example, L is partitioned from R1 and R2 but not from ZooKeeper or 
> the producer.
> The producer never receives acknowledgement of the ProduceRequest because the 
> min.insync.replicas constraint was never satisfied. So in terms of data 
> consistency, everything is working fine.
> The problem is recovering from this situation. If the fault on L is not a 
> temporary blip, then L needs to be replaced. But since L shrunk the ISR, the 
> only way that leadership can move to either R1 or R2 is to set 
> unclean.leader.election.enable=true.
> This works but it is a potentially unsafe way to recover and move leadership. 
> It would be better to have other options.
> *Recovery could be automatic in this scenario.*
> If you think about it, from the perspective of the producer, the write was 
> not acknowledged, and therefore, L, R1 and R2 are actually in-sync. So it 
> should actually be totally safe for leadership to transition to either R1 or 
> R2.
> It seems that the producer and the leader don't have fully compatible 
> definitions for what it means for the replica set to be in-sync. If the 
> leader L used different rules for defining ISR, it could allow self-healing 
> in this or similar scenarios, since the ISR would not shrink.
>  
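
For context, here is a sketch of the configuration the report describes, written against the public AdminClient and producer config keys; the topic name, partition count, and bootstrap address are illustrative:

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;

public class DurableTopicSetup {
    public static void main(String[] args) throws Exception {
        // Topic side: replication factor 3, at least 2 in-sync copies,
        // and no unclean leader election -- the setup from the report.
        // Broker side (server.properties): replica.lag.time.max.ms=10000.
        try (Admin admin = Admin.create(
                Map.<String, Object>of("bootstrap.servers", "broker:9092"))) {
            NewTopic topic = new NewTopic("durable-topic", 12, (short) 3)
                .configs(Map.of(
                    "min.insync.replicas", "2",
                    "unclean.leader.election.enable", "false"));
            admin.createTopics(List.of(topic)).all().get();
        }

        // Producer side: acks=all, so an acknowledgement implies the write
        // reached at least min.insync.replicas replicas.
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
    }
}
```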



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] junrao commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-28 Thread GitBox


junrao commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r566532467



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java
##
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * SnapshottableHashTable implements a hash table that supports creating 
point-in-time
+ * snapshots.  Each snapshot is immutable once it is created; the past cannot 
be changed.
+ * We handle divergences between the current state and historical state by 
copying a
+ * reference to elements that have been deleted or overwritten into the 
snapshot tiers
+ * in which they still exist.  Each tier has its own hash table.
+ *
+ * In order to retrieve an object from epoch E, we only have to check two 
tiers: the
+ * current tier, and the tier associated with the snapshot from epoch E.  This 
design
+ * makes snapshot reads a little faster and simpler, at the cost of requiring 
us to copy
+ * references into multiple snapshot tiers sometimes when altering the current 
state.
+ * In general, we don't expect there to be many snapshots at any given point 
in time,
+ * though.  We expect to use about 2 snapshots at most.
+ *
+ * The current tier's data is stored in the fields inherited from 
BaseHashTable.  It
+ * would be conceptually simpler to have a separate BaseHashTable object, but 
since Java
+ * doesn't have value types, subclassing is the only way to avoid another 
pointer
+ * indirection and the associated extra memory cost.
+ *
+ * In contrast, the data for snapshot tiers is stored in the Snapshot object 
itself.
+ * We access it by looking up our object reference in the Snapshot's 
IdentityHashMap.
+ * This design ensures that we can remove snapshots in O(1) time, simply by 
deleting the
+ * Snapshot object from the SnapshotRegistry.
+ *
+ * As mentioned before, an element only exists in a snapshot tier if the 
element was
+ * overwritten or removed from a later tier.  If there are no changes between 
then and
+ * now, there is no data at all stored for the tier.  We don't even store a 
hash table
+ * object for a tier unless there is at least one change between then and now.
+ *
+ * The class hierarchy looks like this:
+ *
+ *Revertable   BaseHashTable
+ *  ↑  ↑
+ *   SnapshottableHashTable → SnapshotRegistry → Snapshot
+ *   ↑ ↑
+ *   TimelineHashSet   TimelineHashMap
+ *
+ * BaseHashTable is a simple hash table that uses separate chaining.  The 
interface is
+ * pretty bare-bones since this class is not intended to be used directly by 
end-users.
+ *
+ * This class, SnapshottableHashTable, has the logic for snapshotting and 
iterating over
+ * snapshots.  This is the core of the snapshotted hash table code and handles 
the
+ * tiering.
+ *
+ * TimelineHashSet and TimelineHashMap are mostly wrappers around this
+ * SnapshottableHashTable class.  They implement standard Java APIs for Set 
and Map,
+ * respectively.  There's a fair amount of boilerplate for this, but it's 
necessary so
+ * that timeline data structures can be used while writing idiomatic Java code.
+ * The accessor APIs have two versions -- one that looks at the current state, 
and one
+ * that looks at a historical snapshotted state.  Mutation APIs only ever 
mutate the
+ * current state.
+ *
+ * One very important feature of SnapshottableHashTable is that we support 
iterating
+ * over a snapshot even while changes are being made to the current state.  
See the
+ * Javadoc for the iterator for more information about how this is 
accomplished.
+ *
+ * All of these classes require external synchronization, and don't support 
null keys or
+ * values.
+ */
+class SnapshottableHashTable<T extends SnapshottableHashTable.ElementWithStartEpoch>
+extends BaseHashTable<T> implements Revertable {
+interface ElementWithStartEpoch {
+void setStartEpoch(long startEpoch);
+long startEpoch();
+}
+
+static class HashTier {
+private final int size;
+private BaseH

[GitHub] [kafka] junrao commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-28 Thread GitBox


junrao commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r566532372



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/TimelineHashMap.java
##
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.AbstractCollection;
+import java.util.AbstractSet;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * This is a hash map which can be snapshotted.
+ *
+ * See {@link SnapshottableHashTable} for more details about the implementation.
+ *
+ * This class requires external synchronization.  Null keys and values are not 
supported.
+ *
+ * @param <K> The key type of the map.
+ * @param <V> The value type of the map.
+ */
+public class TimelineHashMap<K, V>
+extends SnapshottableHashTable<TimelineHashMap.TimelineHashMapEntry<K, V>>
+implements Map<K, V> {
+static class TimelineHashMapEntry<K, V>
+implements SnapshottableHashTable.ElementWithStartEpoch, Map.Entry<K, V> {
+private final K key;
+private final V value;
+private long startEpoch;
+
+TimelineHashMapEntry(K key, V value) {
+this.key = key;
+this.value = value;
+this.startEpoch = Long.MAX_VALUE;
+}
+
+@Override
+public K getKey() {
+return key;
+}
+
+@Override
+public V getValue() {
+return value;
+}
+
+@Override
+public V setValue(V value) {
+// This would be inefficient to support since we'd need a 
back-reference
+// to the enclosing map in each Entry object.  There would also be
+// complications if this entry object was sourced from a 
historical iterator;
+// we don't support modifying the past.  Since we don't really 
need this API,
+// let's just not support it.
+throw new UnsupportedOperationException();
+}
+
+@Override
+public void setStartEpoch(long startEpoch) {
+this.startEpoch = startEpoch;
+}
+
+@Override
+public long startEpoch() {
+return startEpoch;
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public boolean equals(Object o) {
+if (!(o instanceof TimelineHashMapEntry)) return false;
+TimelineHashMapEntry other = (TimelineHashMapEntry) o;
+return key.equals(other.key);
+}
+
+@Override
+public int hashCode() {
+return key.hashCode();
+}
+}
+
+public TimelineHashMap(SnapshotRegistry snapshotRegistry, int 
expectedSize) {
+super(snapshotRegistry, expectedSize);
+}
+
+@Override
+public int size() {
+return size(Long.MAX_VALUE);
+}
+
+public int size(long epoch) {
+return snapshottableSize(epoch);
+}
+
+@Override
+public boolean isEmpty() {
+return isEmpty(Long.MAX_VALUE);
+}
+
+public boolean isEmpty(long epoch) {
+return snapshottableSize(epoch) == 0;
+}
+
+@Override
+public boolean containsKey(Object key) {
+return containsKey(key, Long.MAX_VALUE);
+}
+
+public boolean containsKey(Object key, long epoch) {
+return snapshottableGet(new TimelineHashMapEntry<>(key, null), epoch) 
!= null;
+}
+
+@Override
+public boolean containsValue(Object value) {
+Iterator<Entry<K, V>> iter = entrySet().iterator();
+while (iter.hasNext()) {
+Entry<K, V> e = iter.next();
+if (value.equals(e.getValue())) {
+return true;
+}
+}
+return false;
+}
+
+@Override
+public V get(Object key) {
+return get(key, Long.MAX_VALUE);
+}
+
+public V get(Object key, long epoch) {
+Entry entry =
+snapshottableGet(new TimelineHashMapEntry<>(key, null), epoch);
+if (entry == null) {
+return null;
+}
+return entry.getValue();
+}
+
+@Override
+public V put(K key, V value) {
+Objects.requireNonNull(key);
+
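
One convention in this listing is worth spelling out: every read method comes as a pair, with the current-state form delegating to an epoch-taking form using Long.MAX_VALUE. A short sketch, assuming a TimelineHashMap<String, Integer> named offsets and a snapshot at epoch 10 as in the example earlier in this digest:

```java
int currentSize = offsets.size();                  // same as size(Long.MAX_VALUE)
int sizeAtTen   = offsets.size(10);                // size as of snapshot epoch 10
boolean hadKey  = offsets.containsKey("partition-0", 10);
Integer oldVal  = offsets.get("partition-0", 10);  // null if absent at that epoch
```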

[jira] [Created] (KAFKA-12249) KIP-500: Add client-side Decommission Broker RPC

2021-01-28 Thread Alok Nikhil (Jira)
Alok Nikhil created KAFKA-12249:
---

 Summary: KIP-500: Add client-side Decommission Broker RPC
 Key: KAFKA-12249
 URL: https://issues.apache.org/jira/browse/KAFKA-12249
 Project: Kafka
  Issue Type: Task
  Components: clients, core
Affects Versions: 2.8.0
Reporter: Alok Nikhil
Assignee: Alok Nikhil






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] aloknnikhil commented on pull request #9994: KAFKA-12248: Add BrokerHeartbeat/BrokerRegistration RPCs for KIP-500

2021-01-28 Thread GitBox


aloknnikhil commented on pull request #9994:
URL: https://github.com/apache/kafka/pull/9994#issuecomment-769516430


   @hachikuji Fixed it up to merge only the Broker-Controller RPCs 
(Heartbeat/Registration)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12248) KIP-500: Add broker heartbeat and registration RPCs

2021-01-28 Thread Alok Nikhil (Jira)
Alok Nikhil created KAFKA-12248:
---

 Summary: KIP-500: Add broker heartbeat and registration RPCs
 Key: KAFKA-12248
 URL: https://issues.apache.org/jira/browse/KAFKA-12248
 Project: Kafka
  Issue Type: Task
  Components: core
Affects Versions: 2.8.0
Reporter: Alok Nikhil
Assignee: Alok Nikhil


For KIP-500, we need to support broker heartbeats and registration with the 
Quorum Controller (as described in KIP-631). This task tracks the addition of 
the RPC message types to support these.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-7263) Container exception java.lang.IllegalStateException: Coordinator selected invalid assignment protocol: null

2021-01-28 Thread Jot Zhao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17274089#comment-17274089
 ] 

Jot Zhao edited comment on KAFKA-7263 at 1/29/21, 1:32 AM:
---

[~soodvarun25] this issue has been fixed by KAFKA-8104 and the PR is 
[here|https://github.com/apache/kafka/pull/7460]


was (Author: jot.zhao):
[~soodvarun25] this issue has been fixed by KAFKA-8104 and the PR is 
[this|https://github.com/apache/kafka/pull/7460]

> Container exception java.lang.IllegalStateException: Coordinator selected 
> invalid assignment protocol: null
> ---
>
> Key: KAFKA-7263
> URL: https://issues.apache.org/jira/browse/KAFKA-7263
> Project: Kafka
>  Issue Type: Bug
>Reporter: laomei
>Priority: Major
> Fix For: 2.4.0
>
>
> We are using  spring-kafka and we get an infinite loop error in 
> ConsumerCoordinator.java;
> kafka cluster version: 1.0.0
> kafka-client version: 1.0.0
>  
> 2018-08-08 15:24:46,120 ERROR 
> [org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer]
>  - Container exception
>  java.lang.IllegalStateException: Coordinator selected invalid assignment 
> protocol: null
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:217)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1138)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103)
>   at 
> org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:556)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.lang.Thread.run(Thread.java:745)
>  2018-08-08 15:24:46,132 INFO 
> [org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer]
>  - Consumer stopped
>  2018-08-08 15:24:46,230 INFO 
> [org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer]
>  - Consumer stopped
>  2018-08-08 15:24:46,234 INFO [org.springfram



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-7263) Container exception java.lang.IllegalStateException: Coordinator selected invalid assignment protocol: null

2021-01-28 Thread Jot Zhao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17274089#comment-17274089
 ] 

Jot Zhao edited comment on KAFKA-7263 at 1/29/21, 1:32 AM:
---

[~soodvarun25] this issue has been fixed by KAFKA-8104 and the PR is 
[this|https://github.com/apache/kafka/pull/7460]


was (Author: jot.zhao):
[~soodvarun25] this issue has been fixed by 
[KAFKA-8104|https://issues.apache.org/jira/browse/KAFKA-8104] and the PR is 
[this|https://github.com/apache/kafka/pull/7460]

> Container exception java.lang.IllegalStateException: Coordinator selected 
> invalid assignment protocol: null
> ---
>
> Key: KAFKA-7263
> URL: https://issues.apache.org/jira/browse/KAFKA-7263
> Project: Kafka
>  Issue Type: Bug
>Reporter: laomei
>Priority: Major
> Fix For: 2.4.0
>
>
> We are using  spring-kafka and we get an infinite loop error in 
> ConsumerCoordinator.java;
> kafka cluster version: 1.0.0
> kafka-client version: 1.0.0
>  
> 2018-08-08 15:24:46,120 ERROR 
> [org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer]
>  - Container exception
>  java.lang.IllegalStateException: Coordinator selected invalid assignment 
> protocol: null
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:217)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1138)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103)
>   at 
> org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:556)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.lang.Thread.run(Thread.java:745)
>  2018-08-08 15:24:46,132 INFO 
> [org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer]
>  - Consumer stopped
>  2018-08-08 15:24:46,230 INFO 
> [org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer]
>  - Consumer stopped
>  2018-08-08 15:24:46,234 INFO [org.springfram



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7263) Container exception java.lang.IllegalStateException: Coordinator selected invalid assignment protocol: null

2021-01-28 Thread Jot Zhao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17274089#comment-17274089
 ] 

Jot Zhao commented on KAFKA-7263:
-

[~soodvarun25] this issue has been fixed by 
[KAFKA-8104|https://issues.apache.org/jira/browse/KAFKA-8104] and the PR is 
[this|https://github.com/apache/kafka/pull/7460]

> Container exception java.lang.IllegalStateException: Coordinator selected 
> invalid assignment protocol: null
> ---
>
> Key: KAFKA-7263
> URL: https://issues.apache.org/jira/browse/KAFKA-7263
> Project: Kafka
>  Issue Type: Bug
>Reporter: laomei
>Priority: Major
> Fix For: 2.4.0
>
>
> We are using  spring-kafka and we get an infinite loop error in 
> ConsumerCoordinator.java;
> kafka cluster version: 1.0.0
> kafka-client version: 1.0.0
>  
> 2018-08-08 15:24:46,120 ERROR 
> [org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer]
>  - Container exception
>  java.lang.IllegalStateException: Coordinator selected invalid assignment 
> protocol: null
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:217)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1138)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103)
>   at 
> org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:556)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.lang.Thread.run(Thread.java:745)
>  2018-08-08 15:24:46,132 INFO 
> [org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer]
>  - Consumer stopped
>  2018-08-08 15:24:46,230 INFO 
> [org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer]
>  - Consumer stopped
>  2018-08-08 15:24:46,234 INFO [org.springfram



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji merged pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses

2021-01-28 Thread GitBox


hachikuji merged pull request #9985:
URL: https://github.com/apache/kafka/pull/9985


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] sudoa commented on pull request #9995: Add log about broker info when getting UNKNOWN_TOPIC_OR_PARTITION error

2021-01-28 Thread GitBox


sudoa commented on pull request #9995:
URL: https://github.com/apache/kafka/pull/9995#issuecomment-769495037


   done by mistake



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] sudoa closed pull request #9995: Add log about broker info when getting UNKNOWN_TOPIC_OR_PARTITION error

2021-01-28 Thread GitBox


sudoa closed pull request #9995:
URL: https://github.com/apache/kafka/pull/9995


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] sudoa opened a new pull request #9995: Add log about broker info when getting UNKNOWN_TOPIC_OR_PARTITION error

2021-01-28 Thread GitBox


sudoa opened a new pull request #9995:
URL: https://github.com/apache/kafka/pull/9995


   Description: This patch adds additional logs in the `NetworkClient` to print 
metadata info such as leader id and epoch for partitions that fail metadata 
fetch with UNKNOWN_TOPIC_OR_PARTITION error. This will help with a Venice 
investigation: https://jira01.corp.linkedin.com:8443/browse/LIKAFKA-33540
   Testing: N/A, since it's for logs.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #9994: MINOR: Add BrokerHeartbeat/BrokerRegistration/DecommissionBroker RPCs for KIP-500

2021-01-28 Thread GitBox


hachikuji commented on pull request #9994:
URL: https://github.com/apache/kafka/pull/9994#issuecomment-769479261


   Thanks for the PR. Just to make this a little more manageable, could we 
split this into two PRs? One for the register/heartbeat controller APIs? And a 
separate one for decommission, which is a client API. 
   
   Also, it would be nice to have JIRAs since these are significant additions. 
Thanks!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] aloknnikhil opened a new pull request #9994: MINOR: Add BrokerHeartbeat/BrokerRegistration/DecommissionBroker RPCs for KIP-500

2021-01-28 Thread GitBox


aloknnikhil opened a new pull request #9994:
URL: https://github.com/apache/kafka/pull/9994


   ### Committer Checklist (excluded from commit message)
   - [x] Verify design and implementation 
   - [x] Verify test coverage and CI build status
   - [x] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9684: KAFKA-10764: Add support for returning topic IDs on create, supplying topic IDs for delete

2021-01-28 Thread GitBox


jolshan commented on a change in pull request #9684:
URL: https://github.com/apache/kafka/pull/9684#discussion_r566487288



##
File path: 
core/src/test/scala/unit/kafka/server/TopicIdWithOldInterBrokerProtocolTest.scala
##
@@ -15,12 +15,13 @@
  * limitations under the License.
  */
 
-package kafka.server
+package unit.kafka.server

Review comment:
   Got it. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #9984: KAFKA-12247: add timeout and static group rebalance to remove thread

2021-01-28 Thread GitBox


ableegoldman commented on pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#issuecomment-769472481


   FYI
   ```
   15:42:08  Execution failed for task ':streams:checkstyleMain'.
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-28 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r566481305



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/TimelineHashSet.java
##
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * This is a hash set which can be snapshotted.
+ *
+ * See {@link SnapshottableHashTable} for more details about the implementation.
+ *
+ * This class requires external synchronization.  Null values are not 
supported.
+ *
+ * @param <T> The value type of the set.
+ */
+public class TimelineHashSet<T>
+extends SnapshottableHashTable<TimelineHashSet.TimelineHashSetEntry<T>>
+implements Set<T> {
+static class TimelineHashSetEntry<T>
+implements SnapshottableHashTable.ElementWithStartEpoch {
+private final T value;
+private long startEpoch;
+
+TimelineHashSetEntry(T value) {
+this.value = value;
+this.startEpoch = Long.MAX_VALUE;
+}
+
+public T getValue() {
+return value;
+}
+
+@Override
+public void setStartEpoch(long startEpoch) {
+this.startEpoch = startEpoch;
+}
+
+@Override
+public long startEpoch() {
+return startEpoch;
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public boolean equals(Object o) {
+if (!(o instanceof TimelineHashSetEntry)) return false;
+TimelineHashSetEntry other = (TimelineHashSetEntry) o;
+return value.equals(other.value);
+}
+
+@Override
+public int hashCode() {
+return value.hashCode();
+}
+}
+
+public TimelineHashSet(SnapshotRegistry snapshotRegistry, int 
expectedSize) {
+super(snapshotRegistry, expectedSize);
+}
+
+@Override
+public int size() {
+return size(Long.MAX_VALUE);
+}
+
+public int size(long epoch) {
+return snapshottableSize(epoch);
+}
+
+@Override
+public boolean isEmpty() {
+return isEmpty(Long.MAX_VALUE);
+}
+
+public boolean isEmpty(long epoch) {
+return snapshottableSize(epoch) == 0;
+}
+
+@Override
+public boolean contains(Object key) {
+return contains(key, Long.MAX_VALUE);
+}
+
+public boolean contains(Object object, long epoch) {
+return snapshottableGet(new TimelineHashSetEntry<>(object), epoch) != 
null;
+}
+
+final class ValueIterator implements Iterator<T> {
+private final Iterator<TimelineHashSetEntry<T>> iter;
+
+ValueIterator(long epoch) {
+this.iter = snapshottableIterator(epoch);
+}
+
+@Override
+public boolean hasNext() {
+return iter.hasNext();
+}
+
+@Override
+public T next() {
+return iter.next().value;
+}
+
+@Override
+public void remove() {
+iter.remove();
+}
+}
+
+@Override
+public Iterator iterator() {
+return iterator(Long.MAX_VALUE);
+}
+
+public Iterator iterator(long epoch) {
+return new ValueIterator(epoch);
+}
+
+@Override
+public Object[] toArray() {
+Object[] result = new Object[size()];
+Iterator iter = iterator();
+int i = 0;
+while (iter.hasNext()) {
+result[i++] = iter.next();
+}
+return result;
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public <R> R[] toArray(R[] a) {
+int size = size();
+if (size <= a.length) {
+Iterator iter = iterator();
+int i = 0;
+while (iter.hasNext()) {
+a[i++] = (R) iter.next();
+}
+while (i < a.length) {
+a[i++] = null;
+}
+return a;
+} else {
+return (R[]) toArray();
+}
+}
+
+@Override
+public boolean add(T newValue) {
+Objects.requireNonNull(newValue);
+return snapshottableAddUnlessPresent(new 
TimelineHa
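
A sketch, not from the PR, of the snapshot-iteration behavior the class comments promise: reading an old epoch while the current state changes. It uses only members visible in this listing (the constructor, add, contains, iterator(epoch)) plus the Set-interface remove:

```java
import java.util.Iterator;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashSet;

SnapshotRegistry registry = new SnapshotRegistry(0);
TimelineHashSet<String> live = new TimelineHashSet<>(registry, 16);
live.add("a");
live.add("b");
registry.createSnapshot(5);             // freeze membership as of epoch 5

live.remove("b");                       // mutates only the current tier

Iterator<String> atFive = live.iterator(5);
while (atFive.hasNext()) {
    System.out.println(atFive.next());  // prints both "a" and "b"
}
System.out.println(live.contains("b")); // false against the current state
```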

[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-28 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r566481190



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/TimelineHashMap.java
##
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.AbstractCollection;
+import java.util.AbstractSet;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * This is a hash map which can be snapshotted.
+ *
+ * See {@link SnapshottableHashTable} for more details about the implementation.
+ *
+ * This class requires external synchronization.  Null keys and values are not 
supported.
+ *
+ * @param <K> The key type of the map.
+ * @param <V> The value type of the map.
+ */
+public class TimelineHashMap<K, V>
+extends SnapshottableHashTable<TimelineHashMap.TimelineHashMapEntry<K, V>>
+implements Map<K, V> {
+static class TimelineHashMapEntry<K, V>
+implements SnapshottableHashTable.ElementWithStartEpoch, Map.Entry<K, V> {
+private final K key;
+private final V value;
+private long startEpoch;
+
+TimelineHashMapEntry(K key, V value) {
+this.key = key;
+this.value = value;
+this.startEpoch = Long.MAX_VALUE;
+}
+
+@Override
+public K getKey() {
+return key;
+}
+
+@Override
+public V getValue() {
+return value;
+}
+
+@Override
+public V setValue(V value) {
+// This would be inefficient to support since we'd need a 
back-reference
+// to the enclosing map in each Entry object.  There would also be
+// complications if this entry object was sourced from a 
historical iterator;
+// we don't support modifying the past.  Since we don't really 
need this API,
+// let's just not support it.
+throw new UnsupportedOperationException();
+}
+
+@Override
+public void setStartEpoch(long startEpoch) {
+this.startEpoch = startEpoch;
+}
+
+@Override
+public long startEpoch() {
+return startEpoch;
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public boolean equals(Object o) {
+if (!(o instanceof TimelineHashMapEntry)) return false;
+TimelineHashMapEntry other = (TimelineHashMapEntry) o;
+return key.equals(other.key);
+}
+
+@Override
+public int hashCode() {
+return key.hashCode();
+}
+}
+
+public TimelineHashMap(SnapshotRegistry snapshotRegistry, int 
expectedSize) {
+super(snapshotRegistry, expectedSize);
+}
+
+@Override
+public int size() {
+return size(Long.MAX_VALUE);
+}
+
+public int size(long epoch) {
+return snapshottableSize(epoch);
+}
+
+@Override
+public boolean isEmpty() {
+return isEmpty(Long.MAX_VALUE);
+}
+
+public boolean isEmpty(long epoch) {
+return snapshottableSize(epoch) == 0;
+}
+
+@Override
+public boolean containsKey(Object key) {
+return containsKey(key, Long.MAX_VALUE);
+}
+
+public boolean containsKey(Object key, long epoch) {
+return snapshottableGet(new TimelineHashMapEntry<>(key, null), epoch) 
!= null;
+}
+
+@Override
+public boolean containsValue(Object value) {
+Iterator<Entry<K, V>> iter = entrySet().iterator();
+while (iter.hasNext()) {
+Entry<K, V> e = iter.next();
+if (value.equals(e.getValue())) {
+return true;
+}
+}
+return false;
+}
+
+@Override
+public V get(Object key) {
+return get(key, Long.MAX_VALUE);
+}
+
+public V get(Object key, long epoch) {
+Entry entry =
+snapshottableGet(new TimelineHashMapEntry<>(key, null), epoch);
+if (entry == null) {
+return null;
+}
+return entry.getValue();
+}
+
+@Override
+public V put(K key, V value) {
+Objects.requireNonNull(key);
+   

[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-28 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r566481017



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java
##
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * SnapshottableHashTable implements a hash table that supports creating 
point-in-time
+ * snapshots.  Each snapshot is immutable once it is created; the past cannot 
be changed.
+ * We handle divergences between the current state and historical state by 
copying a
+ * reference to elements that have been deleted or overwritten into the 
snapshot tiers
+ * in which they still exist.  Each tier has its own hash table.
+ *
+ * In order to retrieve an object from epoch E, we only have to check two 
tiers: the
+ * current tier, and the tier associated with the snapshot from epoch E.  This 
design
+ * makes snapshot reads a little faster and simpler, at the cost of requiring 
us to copy
+ * references into multiple snapshot tiers sometimes when altering the current 
state.
+ * In general, we don't expect there to be many snapshots at any given point 
in time,
+ * though.  We expect to use about 2 snapshots at most.
+ *
+ * The current tier's data is stored in the fields inherited from 
BaseHashTable.  It
+ * would be conceptually simpler to have a separate BaseHashTable object, but 
since Java
+ * doesn't have value types, subclassing is the only way to avoid another 
pointer
+ * indirection and the associated extra memory cost.
+ *
+ * In contrast, the data for snapshot tiers is stored in the Snapshot object 
itself.
+ * We access it by looking up our object reference in the Snapshot's 
IdentityHashMap.
+ * This design ensures that we can remove snapshots in O(1) time, simply by 
deleting the
+ * Snapshot object from the SnapshotRegistry.
+ *
+ * As mentioned before, an element only exists in a snapshot tier if the 
element was
+ * overwritten or removed from a later tier.  If there are no changes between 
then and
+ * now, there is no data at all stored for the tier.  We don't even store a 
hash table
+ * object for a tier unless there is at least one change between then and now.
+ *
+ * The class hierarchy looks like this:
+ *
+ *Revertable   BaseHashTable
+ *  ↑  ↑
+ *   SnapshottableHashTable → SnapshotRegistry → Snapshot
+ *   ↑ ↑
+ *   TimelineHashSet   TimelineHashMap
+ *
+ * BaseHashTable is a simple hash table that uses separate chaining.  The 
interface is
+ * pretty bare-bones since this class is not intended to be used directly by 
end-users.
+ *
+ * This class, SnapshottableHashTable, has the logic for snapshotting and 
iterating over
+ * snapshots.  This is the core of the snapshotted hash table code and handles 
the
+ * tiering.
+ *
+ * TimelineHashSet and TimelineHashMap are mostly wrappers around this
+ * SnapshottableHashTable class.  They implement standard Java APIs for Set 
and Map,
+ * respectively.  There's a fair amount of boilerplate for this, but it's 
necessary so
+ * that timeline data structures can be used while writing idiomatic Java code.
+ * The accessor APIs have two versions -- one that looks at the current state, 
and one
+ * that looks at a historical snapshotted state.  Mutation APIs only ever 
mutate the
+ * current state.
+ *
+ * One very important feature of SnapshottableHashTable is that we support 
iterating
+ * over a snapshot even while changes are being made to the current state.  
See the
+ * Javadoc for the iterator for more information about how this is 
accomplished.
+ *
+ * All of these classes require external synchronization, and don't support 
null keys or
+ * values.
+ */
+class SnapshottableHashTable<T extends SnapshottableHashTable.ElementWithStartEpoch>
+extends BaseHashTable<T> implements Revertable {
+interface ElementWithStartEpoch {
+void setStartEpoch(long startEpoch);
+long startEpoch();
+}
+
+static class HashTier {
+private final int size;
+private Base

[GitHub] [kafka] ableegoldman commented on a change in pull request #9984: KAFKA-12247: add timeout and static group rebalance to remove thread

2021-01-28 Thread GitBox


ableegoldman commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r566478398



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -69,6 +73,7 @@
 import org.apache.kafka.streams.state.internals.QueryableStoreProvider;
 import 
org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter;
 import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
+import org.apache.kafka.common.errors.TimeoutException;

Review comment:
   nit: put this import above with the others (IDE often misplaces these 
since we follow a weird import ordering in places)

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##
@@ -180,6 +182,31 @@ public void shouldRemoveStreamThread() throws Exception {
 }
 }
 
+@Test
+public void shouldRemoveStreamThreadWithStaticMembership() throws 
Exception {
+properties.put("group.instance.id", "test");

Review comment:
   ```suggestion
   properties.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "member-A");
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-28 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r566469662



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/TimelineHashMap.java
##
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.AbstractCollection;
+import java.util.AbstractSet;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * This is a hash map which can be snapshotted.
+ *
+ * See {@link SnapshottableHashTable} for more details about the implementation.
+ *
+ * This class requires external synchronization.  Null keys and values are not 
supported.
+ *
+ * @param <K> The key type of the map.
+ * @param <V> The value type of the map.
+ */
+public class TimelineHashMap<K, V>
+extends SnapshottableHashTable<TimelineHashMap.TimelineHashMapEntry<K, V>>
+implements Map<K, V> {
+static class TimelineHashMapEntry<K, V>
+implements SnapshottableHashTable.ElementWithStartEpoch, Map.Entry<K, V> {
+private final K key;
+private final V value;
+private long startEpoch;
+
+TimelineHashMapEntry(K key, V value) {
+this.key = key;
+this.value = value;
+this.startEpoch = Long.MAX_VALUE;
+}
+
+@Override
+public K getKey() {
+return key;
+}
+
+@Override
+public V getValue() {
+return value;
+}
+
+@Override
+public V setValue(V value) {
+// This would be inefficient to support since we'd need a 
back-reference
+// to the enclosing map in each Entry object.  There would also be
+// complications if this entry object was sourced from a 
historical iterator;
+// we don't support modifying the past.  Since we don't really 
need this API,
+// let's just not support it.
+throw new UnsupportedOperationException();
+}
+
+@Override
+public void setStartEpoch(long startEpoch) {
+this.startEpoch = startEpoch;
+}
+
+@Override
+public long startEpoch() {
+return startEpoch;
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public boolean equals(Object o) {
+if (!(o instanceof TimelineHashMapEntry)) return false;
+TimelineHashMapEntry other = (TimelineHashMapEntry) o;
+return key.equals(other.key);
+}
+
+@Override
+public int hashCode() {
+return key.hashCode();
+}
+}
+
+public TimelineHashMap(SnapshotRegistry snapshotRegistry, int 
expectedSize) {
+super(snapshotRegistry, expectedSize);
+}
+
+@Override
+public int size() {
+return size(Long.MAX_VALUE);
+}
+
+public int size(long epoch) {
+return snapshottableSize(epoch);
+}
+
+@Override
+public boolean isEmpty() {
+return isEmpty(Long.MAX_VALUE);
+}
+
+public boolean isEmpty(long epoch) {
+return snapshottableSize(epoch) == 0;
+}
+
+@Override
+public boolean containsKey(Object key) {
+return containsKey(key, Long.MAX_VALUE);
+}
+
+public boolean containsKey(Object key, long epoch) {
+return snapshottableGet(new TimelineHashMapEntry<>(key, null), epoch) 
!= null;
+}
+
+@Override
+public boolean containsValue(Object value) {
+Iterator<Entry<K, V>> iter = entrySet().iterator();
+while (iter.hasNext()) {
+Entry<K, V> e = iter.next();
+if (value.equals(e.getValue())) {
+return true;
+}
+}
+return false;
+}
+
+@Override
+public V get(Object key) {
+return get(key, Long.MAX_VALUE);
+}
+
+public V get(Object key, long epoch) {
+Entry entry =
+snapshottableGet(new TimelineHashMapEntry<>(key, null), epoch);
+if (entry == null) {
+return null;
+}
+return entry.getValue();
+}
+
+@Override
+public V put(K key, V value) {
+Objects.requireNonNull(key);
+   

[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-28 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r566469311



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/TimelineHashMap.java
##
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.AbstractCollection;
+import java.util.AbstractSet;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * This is a hash map which can be snapshotted.
+ *
+ * See {@link SnapshottableHashTable} for more details about the implementation.
+ *
+ * This class requires external synchronization.  Null keys and values are not 
supported.
+ *
+ * @param <K> The key type of the map.
+ * @param <V> The value type of the map.
+ */
+public class TimelineHashMap<K, V>
+extends SnapshottableHashTable<TimelineHashMap.TimelineHashMapEntry<K, V>>
+implements Map<K, V> {
+static class TimelineHashMapEntry<K, V>
+implements SnapshottableHashTable.ElementWithStartEpoch, Map.Entry<K, V> {
+private final K key;
+private final V value;
+private long startEpoch;
+
+TimelineHashMapEntry(K key, V value) {
+this.key = key;
+this.value = value;
+this.startEpoch = Long.MAX_VALUE;
+}
+
+@Override
+public K getKey() {
+return key;
+}
+
+@Override
+public V getValue() {
+return value;
+}
+
+@Override
+public V setValue(V value) {
+// This would be inefficient to support since we'd need a 
back-reference
+// to the enclosing map in each Entry object.  There would also be
+// complications if this entry object was sourced from a 
historical iterator;
+// we don't support modifying the past.  Since we don't really 
need this API,
+// let's just not support it.
+throw new UnsupportedOperationException();
+}
+
+@Override
+public void setStartEpoch(long startEpoch) {
+this.startEpoch = startEpoch;
+}
+
+@Override
+public long startEpoch() {
+return startEpoch;
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public boolean equals(Object o) {
+if (!(o instanceof TimelineHashMapEntry)) return false;
+TimelineHashMapEntry other = (TimelineHashMapEntry) o;
+return key.equals(other.key);
+}
+
+@Override
+public int hashCode() {
+return key.hashCode();
+}
+}
+
+public TimelineHashMap(SnapshotRegistry snapshotRegistry, int 
expectedSize) {
+super(snapshotRegistry, expectedSize);
+}
+
+@Override
+public int size() {
+return size(Long.MAX_VALUE);
+}
+
+public int size(long epoch) {
+return snapshottableSize(epoch);
+}
+
+@Override
+public boolean isEmpty() {
+return isEmpty(Long.MAX_VALUE);
+}
+
+public boolean isEmpty(long epoch) {
+return snapshottableSize(epoch) == 0;
+}
+
+@Override
+public boolean containsKey(Object key) {
+return containsKey(key, Long.MAX_VALUE);
+}
+
+public boolean containsKey(Object key, long epoch) {
+return snapshottableGet(new TimelineHashMapEntry<>(key, null), epoch) 
!= null;
+}
+
+@Override
+public boolean containsValue(Object value) {
+Iterator<Entry<K, V>> iter = entrySet().iterator();
+while (iter.hasNext()) {
+Entry<K, V> e = iter.next();
+if (value.equals(e.getValue())) {
+return true;
+}
+}
+return false;
+}
+
+@Override
+public V get(Object key) {
+return get(key, Long.MAX_VALUE);
+}
+
+public V get(Object key, long epoch) {
+Entry entry =
+snapshottableGet(new TimelineHashMapEntry<>(key, null), epoch);
+if (entry == null) {
+return null;
+}
+return entry.getValue();
+}
+
+@Override
+public V put(K key, V value) {
+Objects.requireNonNull(key);
+   

[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-28 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r566468092



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshotRegistry.java
##
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+/**
+ * A registry containing snapshots of timeline data structures.
+ * We generally expect a small number of snapshots-- perhaps 1 or 2 at a time.
+ * Therefore, we use ArrayLists here rather than a data structure with higher 
overhead.
+ */
+public class SnapshotRegistry {
+private final Logger log;
+
+/**
+ * The current epoch.  All snapshot epochs are lower than this number.
+ */
+private long curEpoch;
+
+/**
+ * An ArrayList of snapshots, kept in sorted order.
+ */
+private final ArrayList<Snapshot> snapshots;
+
+public SnapshotRegistry(long startEpoch) {
+this(new LogContext(), startEpoch);
+}
+
+public SnapshotRegistry(LogContext logContext, long startEpoch) {
+this.log = logContext.logger(SnapshotRegistry.class);
+this.curEpoch = startEpoch;
+this.snapshots = new ArrayList<>(5);
+}
+
+/**
+ * Returns an iterator that moves through snapshots from the lowest to the 
highest epoch.
+ */
+public Iterator<Snapshot> snapshots() {
+return snapshots.iterator();
+}
+
+/**
+ * Gets the snapshot for a specific epoch.
+ */
+public Snapshot get(long epoch) {
+for (Snapshot snapshot : snapshots) {
+if (snapshot.epoch() == epoch) {
+return snapshot;
+}
+}
+throw new RuntimeException("No snapshot for epoch " + epoch);
+}
+
+/**
+ * Creates a new snapshot at the given epoch.
+ *
+ * @param epoch The epoch to create the snapshot at.  The 
current epoch
+ *  will be advanced to one past this epoch.
+ */
+public Snapshot createSnapshot(long epoch) {
+if (epoch < curEpoch) {
+throw new RuntimeException("Can't create a new snapshot at epoch " 
+ epoch +
+" because the current epoch is " + curEpoch);
+}
+Snapshot snapshot = new Snapshot(epoch);
+snapshots.add(snapshot);
+curEpoch = epoch + 1;
+log.debug("Creating snapshot {}", epoch);
+return snapshot;
+}
+
+/**
+ * Deletes the snapshot with the given epoch.
+ *
+ * @param epoch The epoch of the snapshot to delete.
+ */
+public void deleteSnapshot(long epoch) {
+Iterator<Snapshot> iter = snapshots.iterator();
+while (iter.hasNext()) {
+Snapshot snapshot = iter.next();
+if (snapshot.epoch() == epoch) {
+log.debug("Deleting snapshot {}", epoch);
+iter.remove();
+return;
+}
+}
+throw new RuntimeException(String.format(
+"No snapshot at epoch %d found. Snapshot epochs are %s.", epoch,
+snapshots.stream().map(snapshot -> 
String.valueOf(snapshot.epoch())).
+collect(Collectors.joining(", "))));
+}
+
+/**
+ * Reverts the state of all data structures to the state at the given 
epoch.
+ *
+ * @param epoch The epoch of the snapshot to revert to.
+ */
+public void revertToSnapshot(long epoch) {

Review comment:
   It does do that.  I'll add a line to the JavaDoc.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-28 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r566467584



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java
##
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * SnapshottableHashTable implements a hash table that supports creating 
point-in-time
+ * snapshots.  Each snapshot is immutable once it is created; the past cannot 
be changed.
+ * We handle divergences between the current state and historical state by 
copying a
+ * reference to elements that have been deleted or overwritten into the 
snapshot tiers
+ * in which they still exist.  Each tier has its own hash table.
+ *
+ * In order to retrieve an object from epoch E, we only have to check two 
tiers: the
+ * current tier, and the tier associated with the snapshot from epoch E.  This 
design
+ * makes snapshot reads a little faster and simpler, at the cost of requiring 
us to copy
+ * references into multiple snapshot tiers sometimes when altering the current 
state.
+ * In general, we don't expect there to be many snapshots at any given point 
in time,
+ * though.  We expect to use about 2 snapshots at most.
+ *
+ * The current tier's data is stored in the fields inherited from 
BaseHashTable.  It
+ * would be conceptually simpler to have a separate BaseHashTable object, but 
since Java
+ * doesn't have value types, subclassing is the only way to avoid another 
pointer
+ * indirection and the associated extra memory cost.
+ *
+ * In contrast, the data for snapshot tiers is stored in the Snapshot object 
itself.
+ * We access it by looking up our object reference in the Snapshot's 
IdentityHashMap.
+ * This design ensures that we can remove snapshots in O(1) time, simply by 
deleting the
+ * Snapshot object from the SnapshotRegistry.
+ *
+ * As mentioned before, an element only exists in a snapshot tier if the 
element was
+ * overwritten or removed from a later tier.  If there are no changes between 
then and
+ * now, there is no data at all stored for the tier.  We don't even store a 
hash table
+ * object for a tier unless there is at least one change between then and now.
+ *
+ * The class hierarchy looks like this:
+ *
+ *Revertable   BaseHashTable
+ *  ↑  ↑
+ *   SnapshottableHashTable → SnapshotRegistry → Snapshot
+ *   ↑ ↑
+ *   TimelineHashSet   TimelineHashMap
+ *
+ * BaseHashTable is a simple hash table that uses separate chaining.  The 
interface is
+ * pretty bare-bones since this class is not intended to be used directly by 
end-users.
+ *
+ * This class, SnapshottableHashTable, has the logic for snapshotting and 
iterating over
+ * snapshots.  This is the core of the snapshotted hash table code and handles 
the
+ * tiering.
+ *
+ * TimelineHashSet and TimelineHashMap are mostly wrappers around this
+ * SnapshottableHashTable class.  They implement standard Java APIs for Set 
and Map,
+ * respectively.  There's a fair amount of boilerplate for this, but it's 
necessary so
+ * that timeline data structures can be used while writing idiomatic Java code.
+ * The accessor APIs have two versions -- one that looks at the current state, 
and one
+ * that looks at a historical snapshotted state.  Mutation APIs only ever 
mutate the
+ * current state.
+ *
+ * One very important feature of SnapshottableHashTable is that we support 
iterating
+ * over a snapshot even while changes are being made to the current state.  
See the
+ * Javadoc for the iterator for more information about how this is 
accomplished.
+ *
+ * All of these classes require external synchronization, and don't support 
null keys or
+ * values.
+ */
+class SnapshottableHashTable<T extends SnapshottableHashTable.ElementWithStartEpoch>
+extends BaseHashTable<T> implements Revertable {
+interface ElementWithStartEpoch {
+void setStartEpoch(long startEpoch);
+long startEpoch();
+}
+
+static class HashTier<T extends ElementWithStartEpoch> {
+private final int size;
+private Base
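(Message truncated above.) The tiering rules in the Javadoc imply a two-step historical read. A conceptual sketch, with hypothetical helper names that are not part of the patch:

```java
// Sketch of the two-tier read described in the Javadoc above.
// snapshotTierFor, currentTier and startEpochOf are hypothetical helpers.
V readAtEpoch(K key, long epoch) {
    // A key overwritten or deleted after the snapshot at `epoch` was taken
    // has its old value copied into that snapshot's tier.
    V snapshotted = snapshotTierFor(epoch).get(key);
    if (snapshotted != null) {
        return snapshotted;
    }
    // Otherwise the current tier is authoritative, provided the element
    // already existed when the snapshot was taken.
    V current = currentTier().get(key);
    return (current != null && startEpochOf(current) <= epoch) ? current : null;
}
```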

[GitHub] [kafka] jsancio commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset

2021-01-28 Thread GitBox


jsancio commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r566463935



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -983,48 +999,88 @@ private FetchResponseData tryCompleteFetchRequest(
 FetchRequestData.FetchPartition request,
 long currentTimeMs
 ) {
-Optional<Errors> errorOpt = 
validateLeaderOnlyRequest(request.currentLeaderEpoch());
-if (errorOpt.isPresent()) {
-return buildEmptyFetchResponse(errorOpt.get(), Optional.empty());
-}
+try {
+Optional<Errors> errorOpt = 
validateLeaderOnlyRequest(request.currentLeaderEpoch());
+if (errorOpt.isPresent()) {
+return buildEmptyFetchResponse(errorOpt.get(), 
Optional.empty());
+}
 
-long fetchOffset = request.fetchOffset();
-int lastFetchedEpoch = request.lastFetchedEpoch();
-LeaderState state = quorum.leaderStateOrThrow();
-Optional<OffsetAndEpoch> divergingEpochOpt = 
validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
-
-if (divergingEpochOpt.isPresent()) {
-Optional<FetchResponseData.EpochEndOffset> divergingEpoch =
-divergingEpochOpt.map(offsetAndEpoch -> new 
FetchResponseData.EpochEndOffset()
-.setEpoch(offsetAndEpoch.epoch)
-.setEndOffset(offsetAndEpoch.offset));
-return buildFetchResponse(Errors.NONE, MemoryRecords.EMPTY, 
divergingEpoch, state.highWatermark());
-} else {
-LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED);
+long fetchOffset = request.fetchOffset();
+int lastFetchedEpoch = request.lastFetchedEpoch();
+LeaderState state = quorum.leaderStateOrThrow();
+ValidatedFetchOffsetAndEpoch validatedOffsetAndEpoch = 
validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
+
+final Records records;
+if (validatedOffsetAndEpoch.type() == 
ValidatedFetchOffsetAndEpoch.Type.VALID) {
+LogFetchInfo info = log.read(fetchOffset, 
Isolation.UNCOMMITTED);
 
-if (state.updateReplicaState(replicaId, currentTimeMs, 
info.startOffsetMetadata)) {
-onUpdateLeaderHighWatermark(state, currentTimeMs);
+if (state.updateReplicaState(replicaId, currentTimeMs, 
info.startOffsetMetadata)) {
+onUpdateLeaderHighWatermark(state, currentTimeMs);
+}
+
+records = info.records;
+} else {
+records = MemoryRecords.EMPTY;
 }
 
-return buildFetchResponse(Errors.NONE, info.records, 
Optional.empty(), state.highWatermark());
+return buildFetchResponse(Errors.NONE, records, 
validatedOffsetAndEpoch, state.highWatermark());
+} catch (Exception e) {
+logger.error("Caught unexpected error in fetch completion of 
request {}", request, e);
+return buildEmptyFetchResponse(Errors.UNKNOWN_SERVER_ERROR, 
Optional.empty());
 }
 }
 
 /**
  * Check whether a fetch offset and epoch is valid. Return the diverging 
epoch, which
  * is the largest epoch such that subsequent records are known to diverge.
  */
-private Optional<OffsetAndEpoch> validateFetchOffsetAndEpoch(long 
fetchOffset, int lastFetchedEpoch) {
-if (fetchOffset == 0 && lastFetchedEpoch == 0) {
-return Optional.empty();
+private ValidatedFetchOffsetAndEpoch validateFetchOffsetAndEpoch(long 
fetchOffset, int lastFetchedEpoch) {
+if (log.startOffset() == 0 && fetchOffset == 0) {
+if (lastFetchedEpoch != 0) {
+logger.warn(
+"Replica sent a zero fetch offset ({}) but the last 
fetched epoch ({}) was not zero",
+fetchOffset,
+lastFetchedEpoch
+);
+}
+return ValidatedFetchOffsetAndEpoch.valid(new 
OffsetAndEpoch(fetchOffset, lastFetchedEpoch));

Review comment:
   Done.
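
For readers skimming the diff above, the diverging-epoch validation can be followed with a concrete (hypothetical) example:

```java
// Hypothetical leader log: epoch 1 covers offsets [0, 10), epoch 2 covers [10, 20).
// A follower fetches at offset 15 claiming lastFetchedEpoch = 1.
// endOffsetForEpoch(1) on the leader returns (epoch = 1, endOffset = 10);
// since 10 < 15, the logs have diverged, so the leader responds with
// divergingEpoch = (epoch = 1, endOffset = 10) and the follower truncates
// back to offset 10 before fetching again.
```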





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset

2021-01-28 Thread GitBox


jsancio commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r566463607



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -147,18 +223,102 @@ class KafkaMetadataLog(
   }
 
   override def createSnapshot(snapshotId: raft.OffsetAndEpoch): 
RawSnapshotWriter = {
-FileRawSnapshotWriter.create(log.dir.toPath, snapshotId)
+// Do not let the state machine create snapshots older than the latest 
snapshot
+latestSnapshotId().ifPresent { latest =>
+  if (latest.epoch > snapshotId.epoch || latest.offset > 
snapshotId.offset) {
+// Since snapshots are less than the high-watermark, absolute offset 
comparison is okay.
+throw new IllegalArgumentException(
+  s"Attemting to create a snapshot ($snapshotId) that is not greater 
than the latest snapshot ($latest)"
+)
+  }
+}
+
+FileRawSnapshotWriter.create(log.dir.toPath, snapshotId, Optional.of(this))
   }
 
   override def readSnapshot(snapshotId: raft.OffsetAndEpoch): 
Optional[RawSnapshotReader] = {
 try {
-  Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+  if (snapshotIds.contains(snapshotId)) {
+Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+  } else {
+Optional.empty()
+  }
+} catch {
+  case _: NoSuchFileException =>
+Optional.empty()
+}
+  }
+
+  override def latestSnapshotId(): Optional[raft.OffsetAndEpoch] = {
+try {
+  Optional.of(snapshotIds.last)
 } catch {
-  case e: NoSuchFileException => Optional.empty()
+  case _: NoSuchElementException =>
+Optional.empty()
+}
+  }
+
+  override def oldestSnapshotId(): Optional[raft.OffsetAndEpoch] = {
+oldestSnapshotId
+  }
+
+  override def onSnapshotFrozen(snapshotId: raft.OffsetAndEpoch): Unit = {
+snapshotIds.add(snapshotId)
+  }
+
+  override def deleteToNewOldestSnapshotId(logStartSnapshotId: 
raft.OffsetAndEpoch): Boolean = {

Review comment:
   Picked `deleteBeforeSnapshot`.

##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -113,6 +160,22 @@ class KafkaMetadataLog(
 log.truncateTo(offset)
   }
 
+  override def maybeTruncateFullyToLatestSnapshot(): Boolean = {

Review comment:
   Done.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji merged pull request #9589: KAFKA-10710 - Mirror Maker 2 - Create herders only if source->target.enabled=true

2021-01-28 Thread GitBox


hachikuji merged pull request #9589:
URL: https://github.com/apache/kafka/pull/9589


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset

2021-01-28 Thread GitBox


jsancio commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r566460558



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -1910,7 +1931,7 @@ class Log(@volatile private var _dir: File,
 in the header.
   */
   appendInfo.firstOffset match {
-case Some(firstOffset) => roll(Some(firstOffset))
+case Some(firstOffset) => roll(Some(firstOffset.messageOffset))

Review comment:
   Done.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-28 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r566457016



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java
##
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * SnapshottableHashTable implements a hash table that supports creating 
point-in-time
+ * snapshots.  Each snapshot is immutable once it is created; the past cannot 
be changed.
+ * We handle divergences between the current state and historical state by 
copying a
+ * reference to elements that have been deleted or overwritten into the 
snapshot tiers
+ * in which they still exist.  Each tier has its own hash table.
+ *
+ * In order to retrieve an object from epoch E, we only have to check two 
tiers: the
+ * current tier, and the tier associated with the snapshot from epoch E.  This 
design
+ * makes snapshot reads a little faster and simpler, at the cost of requiring 
us to copy
+ * references into multiple snapshot tiers sometimes when altering the current 
state.
+ * In general, we don't expect there to be many snapshots at any given point 
in time,
+ * though.  We expect to use about 2 snapshots at most.
+ *
+ * The current tier's data is stored in the fields inherited from 
BaseHashTable.  It
+ * would be conceptually simpler to have a separate BaseHashTable object, but 
since Java
+ * doesn't have value types, subclassing is the only way to avoid another 
pointer
+ * indirection and the associated extra memory cost.
+ *
+ * In contrast, the data for snapshot tiers is stored in the Snapshot object 
itself.
+ * We access it by looking up our object reference in the Snapshot's 
IdentityHashMap.
+ * This design ensures that we can remove snapshots in O(1) time, simply by 
deleting the
+ * Snapshot object from the SnapshotRegistry.
+ *
+ * As mentioned before, an element only exists in a snapshot tier if the 
element was
+ * overwritten or removed from a later tier.  If there are no changes between 
then and
+ * now, there is no data at all stored for the tier.  We don't even store a 
hash table
+ * object for a tier unless there is at least one change between then and now.
+ *
+ * The class hierarchy looks like this:
+ *
+ *Revertable   BaseHashTable
+ *  ↑  ↑
+ *   SnapshottableHashTable → SnapshotRegistry → Snapshot
+ *   ↑ ↑
+ *   TimelineHashSet   TimelineHashMap
+ *
+ * BaseHashTable is a simple hash table that uses separate chaining.  The 
interface is
+ * pretty bare-bones since this class is not intended to be used directly by 
end-users.
+ *
+ * This class, SnapshottableHashTable, has the logic for snapshotting and 
iterating over
+ * snapshots.  This is the core of the snapshotted hash table code and handles 
the
+ * tiering.
+ *
+ * TimelineHashSet and TimelineHashMap are mostly wrappers around this
+ * SnapshottableHashTable class.  They implement standard Java APIs for Set 
and Map,
+ * respectively.  There's a fair amount of boilerplate for this, but it's 
necessary so
+ * that timeline data structures can be used while writing idiomatic Java code.
+ * The accessor APIs have two versions -- one that looks at the current state, 
and one
+ * that looks at a historical snapshotted state.  Mutation APIs only ever 
mutate the
+ * current state.
+ *
+ * One very important feature of SnapshottableHashTable is that we support 
iterating
+ * over a snapshot even while changes are being made to the current state.  
See the
+ * Javadoc for the iterator for more information about how this is 
accomplished.
+ *
+ * All of these classes require external synchronization, and don't support 
null keys or
+ * values.
+ */
+class SnapshottableHashTable<T extends SnapshottableHashTable.ElementWithStartEpoch>
+extends BaseHashTable<T> implements Revertable {
+interface ElementWithStartEpoch {
+void setStartEpoch(long startEpoch);
+long startEpoch();
+}
+
+static class HashTier<T extends ElementWithStartEpoch> {
+private final int size;
+private Base

[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-28 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r566454521



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java
##
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * SnapshottableHashTable implements a hash table that supports creating 
point-in-time
+ * snapshots.  Each snapshot is immutable once it is created; the past cannot 
be changed.
+ * We handle divergences between the current state and historical state by 
copying a
+ * reference to elements that have been deleted or overwritten into the 
snapshot tiers
+ * in which they still exist.  Each tier has its own hash table.
+ *
+ * In order to retrieve an object from epoch E, we only have to check two 
tiers: the
+ * current tier, and the tier associated with the snapshot from epoch E.  This 
design
+ * makes snapshot reads a little faster and simpler, at the cost of requiring 
us to copy
+ * references into multiple snapshot tiers sometimes when altering the current 
state.
+ * In general, we don't expect there to be many snapshots at any given point 
in time,
+ * though.  We expect to use about 2 snapshots at most.
+ *
+ * The current tier's data is stored in the fields inherited from 
BaseHashTable.  It
+ * would be conceptually simpler to have a separate BaseHashTable object, but 
since Java
+ * doesn't have value types, subclassing is the only way to avoid another 
pointer
+ * indirection and the associated extra memory cost.
+ *
+ * In contrast, the data for snapshot tiers is stored in the Snapshot object 
itself.
+ * We access it by looking up our object reference in the Snapshot's 
IdentityHashMap.
+ * This design ensures that we can remove snapshots in O(1) time, simply by 
deleting the
+ * Snapshot object from the SnapshotRegistry.
+ *
+ * As mentioned before, an element only exists in a snapshot tier if the 
element was
+ * overwritten or removed from a later tier.  If there are no changes between 
then and
+ * now, there is no data at all stored for the tier.  We don't even store a 
hash table
+ * object for a tier unless there is at least one change between then and now.
+ *
+ * The class hierarchy looks like this:
+ *
+ *Revertable   BaseHashTable
+ *  ↑  ↑
+ *   SnapshottableHashTable → SnapshotRegistry → Snapshot
+ *   ↑ ↑
+ *   TimelineHashSet   TimelineHashMap
+ *
+ * BaseHashTable is a simple hash table that uses separate chaining.  The 
interface is
+ * pretty bare-bones since this class is not intended to be used directly by 
end-users.
+ *
+ * This class, SnapshottableHashTable, has the logic for snapshotting and 
iterating over
+ * snapshots.  This is the core of the snapshotted hash table code and handles 
the
+ * tiering.
+ *
+ * TimelineHashSet and TimelineHashMap are mostly wrappers around this
+ * SnapshottableHashTable class.  They implement standard Java APIs for Set 
and Map,
+ * respectively.  There's a fair amount of boilerplate for this, but it's 
necessary so
+ * that timeline data structures can be used while writing idiomatic Java code.
+ * The accessor APIs have two versions -- one that looks at the current state, 
and one
+ * that looks at a historical snapshotted state.  Mutation APIs only ever 
mutate the
+ * current state.
+ *
+ * One very important feature of SnapshottableHashTable is that we support 
iterating
+ * over a snapshot even while changes are being made to the current state.  
See the
+ * Javadoc for the iterator for more information about how this is 
accomplished.
+ *
+ * All of these classes require external synchronization, and don't support 
null keys or
+ * values.
+ */
+class SnapshottableHashTable<T extends SnapshottableHashTable.ElementWithStartEpoch>
+extends BaseHashTable<T> implements Revertable {
+interface ElementWithStartEpoch {
+void setStartEpoch(long startEpoch);
+long startEpoch();
+}
+
+static class HashTier<T extends ElementWithStartEpoch> {
+private final int size;
+private Base

[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-28 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r566452138



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java
##
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * SnapshottableHashTable implements a hash table that supports creating 
point-in-time
+ * snapshots.  Each snapshot is immutable once it is created; the past cannot 
be changed.
+ * We handle divergences between the current state and historical state by 
copying a
+ * reference to elements that have been deleted or overwritten into the 
snapshot tiers
+ * in which they still exist.  Each tier has its own hash table.
+ *
+ * In order to retrieve an object from epoch E, we only have to check two 
tiers: the
+ * current tier, and the tier associated with the snapshot from epoch E.  This 
design
+ * makes snapshot reads a little faster and simpler, at the cost of requiring 
us to copy
+ * references into multiple snapshot tiers sometimes when altering the current 
state.
+ * In general, we don't expect there to be many snapshots at any given point 
in time,
+ * though.  We expect to use about 2 snapshots at most.
+ *
+ * The current tier's data is stored in the fields inherited from 
BaseHashTable.  It
+ * would be conceptually simpler to have a separate BaseHashTable object, but 
since Java
+ * doesn't have value types, subclassing is the only way to avoid another 
pointer
+ * indirection and the associated extra memory cost.
+ *
+ * In contrast, the data for snapshot tiers is stored in the Snapshot object 
itself.
+ * We access it by looking up our object reference in the Snapshot's 
IdentityHashMap.
+ * This design ensures that we can remove snapshots in O(1) time, simply by 
deleting the
+ * Snapshot object from the SnapshotRegistry.
+ *
+ * As mentioned before, an element only exists in a snapshot tier if the 
element was
+ * overwritten or removed from a later tier.  If there are no changes between 
then and
+ * now, there is no data at all stored for the tier.  We don't even store a 
hash table
+ * object for a tier unless there is at least one change between then and now.
+ *
+ * The class hierarchy looks like this:
+ *
+ *Revertable   BaseHashTable
+ *  ↑  ↑
+ *   SnapshottableHashTable → SnapshotRegistry → Snapshot
+ *   ↑ ↑
+ *   TimelineHashSet   TimelineHashMap
+ *
+ * BaseHashTable is a simple hash table that uses separate chaining.  The 
interface is
+ * pretty bare-bones since this class is not intended to be used directly by 
end-users.
+ *
+ * This class, SnapshottableHashTable, has the logic for snapshotting and 
iterating over
+ * snapshots.  This is the core of the snapshotted hash table code and handles 
the
+ * tiering.
+ *
+ * TimelineHashSet and TimelineHashMap are mostly wrappers around this
+ * SnapshottableHashTable class.  They implement standard Java APIs for Set 
and Map,
+ * respectively.  There's a fair amount of boilerplate for this, but it's 
necessary so
+ * that timeline data structures can be used while writing idiomatic Java code.
+ * The accessor APIs have two versions -- one that looks at the current state, 
and one
+ * that looks at a historical snapshotted state.  Mutation APIs only ever 
mutate thte
+ * current state.
+ *
+ * One very important feature of SnapshottableHashTable is that we support 
iterating
+ * over a snapshot even while changes are being made to the current state.  
See the
+ * Javadoc for the iterator for more information about how this is 
accomplished.
+ *
+ * All of these classes require external synchronization, and don't support 
null keys or
+ * values.
+ */
+class SnapshottableHashTable<T extends SnapshottableHashTable.ElementWithStartEpoch>
+extends BaseHashTable<T> implements Revertable {
+interface ElementWithStartEpoch {
+void setStartEpoch(long startEpoch);
+long startEpoch();
+}
+
+static class HashTier<T extends ElementWithStartEpoch> {
+private final int size;
+private Base

[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-28 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r566451017



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshotRegistry.java
##
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+/**
+ * A registry containing snapshots of timeline data structures.
+ * We generally expect a small number of snapshots-- perhaps 1 or 2 at a time.
+ * Therefore, we use ArrayLists here rather than a data structure with higher 
overhead.
+ */
+public class SnapshotRegistry {
+private final Logger log;
+
+/**
+ * The current epoch.  All snapshot epochs are lower than this number.
+ */
+private long curEpoch;
+
+/**
+ * An ArrayList of snapshots, kept in sorted order.
+ */
+private final ArrayList<Snapshot> snapshots;
+
+public SnapshotRegistry(long startEpoch) {
+this(new LogContext(), startEpoch);
+}
+
+public SnapshotRegistry(LogContext logContext, long startEpoch) {
+this.log = logContext.logger(SnapshotRegistry.class);
+this.curEpoch = startEpoch;
+this.snapshots = new ArrayList<>(5);
+}
+
+/**
+ * Returns an iterator that moves through snapshots from the lowest to the 
highest epoch.
+ */
+public Iterator<Snapshot> snapshots() {
+return snapshots.iterator();
+}
+
+/**
+ * Gets the snapshot for a specific epoch.
+ */
+public Snapshot get(long epoch) {
+for (Snapshot snapshot : snapshots) {
+if (snapshot.epoch() == epoch) {
+return snapshot;
+}
+}
+throw new RuntimeException("No snapshot for epoch " + epoch);
+}
+
+/**
+ * Creates a new snapshot at the given epoch.
+ *
+ * @param epoch The epoch to create the snapshot at.  The 
current epoch
+ *  will be advanced to one past this epoch.
+ */
+public Snapshot createSnapshot(long epoch) {

Review comment:
   We want the epoch to be equal to the offset in the metadata log.  So, it 
needs to be provided externally.
   
   (For example, a snapshot at epoch 123 would have the state of the metadata 
data structures after replaying all the metadata records from 0 to 123.)
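
A sketch of that convention with illustrative numbers (createSnapshot and revertToSnapshot are the methods from the diff above; the offsets are made up):

```java
// Epochs are metadata-log offsets under this convention.
long committedOffset = 123;
snapshotRegistry.createSnapshot(committedOffset);  // state after records 0..123

// If the log is later truncated back to offset 123, every timeline data
// structure can be rolled back to exactly that point:
snapshotRegistry.revertToSnapshot(committedOffset);
```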





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-28 Thread GitBox


cmccabe commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r566450389



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java
##
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * SnapshottableHashTable implements a hash table that supports creating 
point-in-time
+ * snapshots.  Each snapshot is immutable once it is created; the past cannot 
be changed.
+ * We handle divergences between the current state and historical state by 
copying a
+ * reference to elements that have been deleted or overwritten into the 
snapshot tiers
+ * in which they still exist.  Each tier has its own hash table.
+ *
+ * In order to retrieve an object from epoch E, we only have to check two 
tiers: the
+ * current tier, and the tier associated with the snapshot from epoch E.  This 
design
+ * makes snapshot reads a little faster and simpler, at the cost of requiring 
us to copy
+ * references into multiple snapshot tiers sometimes when altering the current 
state.
+ * In general, we don't expect there to be many snapshots at any given point 
in time,
+ * though.  We expect to use about 2 snapshots at most.
+ *
+ * The current tier's data is stored in the fields inherited from 
BaseHashTable.  It
+ * would be conceptually simpler to have a separate BaseHashTable object, but 
since Java
+ * doesn't have value types, subclassing is the only way to avoid another 
pointer
+ * indirection and the associated extra memory cost.
+ *
+ * In contrast, the data for snapshot tiers is stored in the Snapshot object 
itself.
+ * We access it by looking up our object reference in the Snapshot's 
IdentityHashMap.
+ * This design ensures that we can remove snapshots in O(1) time, simply by 
deleting the
+ * Snapshot object from the SnapshotRegistry.
+ *
+ * As mentioned before, an element only exists in a snapshot tier if the 
element was
+ * overwritten or removed from a later tier.  If there are no changes between 
then and
+ * now, there is no data at all stored for the tier.  We don't even store a 
hash table
+ * object for a tier unless there is at least one change between then and now.
+ *
+ * The class hierarchy looks like this:
+ *
+ *Revertable   BaseHashTable
+ *  ↑  ↑
+ *   SnapshottableHashTable → SnapshotRegistry → Snapshot
+ *   ↑ ↑
+ *   TimelineHashSet   TimelineHashMap
+ *
+ * BaseHashTable is a simple hash table that uses separate chaining.  The 
interface is
+ * pretty bare-bones since this class is not intended to be used directly by 
end-users.
+ *
+ * This class, SnapshottableHashTable, has the logic for snapshotting and 
iterating over
+ * snapshots.  This is the core of the snapshotted hash table code and handles 
the
+ * tiering.
+ *
+ * TimelineHashSet and TimelineHashMap are mostly wrappers around this
+ * SnapshottableHashTable class.  They implement standard Java APIs for Set 
and Map,
+ * respectively.  There's a fair amount of boilerplate for this, but it's 
necessary so
+ * that timeline data structures can be used while writing idiomatic Java code.
+ * The accessor APIs have two versions -- one that looks at the current state, 
and one
+ * that looks at a historical snapshotted state.  Mutation APIs only ever 
mutate the
+ * current state.
+ *
+ * One very important feature of SnapshottableHashTable is that we support 
iterating
+ * over a snapshot even while changes are being made to the current state.  
See the
+ * Javadoc for the iterator for more information about how this is 
accomplished.
+ *
+ * All of these classes require external synchronization, and don't support 
null keys or
+ * values.
+ */
+class SnapshottableHashTable<T extends SnapshottableHashTable.ElementWithStartEpoch>
+extends BaseHashTable<T> implements Revertable {
+interface ElementWithStartEpoch {
+void setStartEpoch(long startEpoch);
+long startEpoch();
+}
+
+static class HashTier<T extends ElementWithStartEpoch> {
+private final int size;
+private Base

[GitHub] [kafka] rajinisivaram commented on a change in pull request #9684: KAFKA-10764: Add support for returning topic IDs on create, supplying topic IDs for delete

2021-01-28 Thread GitBox


rajinisivaram commented on a change in pull request #9684:
URL: https://github.com/apache/kafka/pull/9684#discussion_r566442295



##
File path: 
core/src/test/scala/unit/kafka/server/TopicIdWithOldInterBrokerProtocolTest.scala
##
@@ -15,12 +15,13 @@
  * limitations under the License.
  */
 
-package kafka.server
+package unit.kafka.server

Review comment:
   We use `kafka.server` for the other unit tests in this directory, so we 
should do the same here (in Scala, the package name doesn't need to match the 
directory structure).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset

2021-01-28 Thread GitBox


jsancio commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r566430269



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -983,48 +999,88 @@ private FetchResponseData tryCompleteFetchRequest(
 FetchRequestData.FetchPartition request,
 long currentTimeMs
 ) {
-Optional<Errors> errorOpt = 
validateLeaderOnlyRequest(request.currentLeaderEpoch());
-if (errorOpt.isPresent()) {
-return buildEmptyFetchResponse(errorOpt.get(), Optional.empty());
-}
+try {
+Optional<Errors> errorOpt = 
validateLeaderOnlyRequest(request.currentLeaderEpoch());
+if (errorOpt.isPresent()) {
+return buildEmptyFetchResponse(errorOpt.get(), 
Optional.empty());
+}
 
-long fetchOffset = request.fetchOffset();
-int lastFetchedEpoch = request.lastFetchedEpoch();
-LeaderState state = quorum.leaderStateOrThrow();
-Optional<OffsetAndEpoch> divergingEpochOpt = 
validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
-
-if (divergingEpochOpt.isPresent()) {
-Optional<FetchResponseData.EpochEndOffset> divergingEpoch =
-divergingEpochOpt.map(offsetAndEpoch -> new 
FetchResponseData.EpochEndOffset()
-.setEpoch(offsetAndEpoch.epoch)
-.setEndOffset(offsetAndEpoch.offset));
-return buildFetchResponse(Errors.NONE, MemoryRecords.EMPTY, 
divergingEpoch, state.highWatermark());
-} else {
-LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED);
+long fetchOffset = request.fetchOffset();
+int lastFetchedEpoch = request.lastFetchedEpoch();
+LeaderState state = quorum.leaderStateOrThrow();
+ValidatedFetchOffsetAndEpoch validatedOffsetAndEpoch = 
validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
+
+final Records records;
+if (validatedOffsetAndEpoch.type() == 
ValidatedFetchOffsetAndEpoch.Type.VALID) {
+LogFetchInfo info = log.read(fetchOffset, 
Isolation.UNCOMMITTED);
 
-if (state.updateReplicaState(replicaId, currentTimeMs, 
info.startOffsetMetadata)) {
-onUpdateLeaderHighWatermark(state, currentTimeMs);
+if (state.updateReplicaState(replicaId, currentTimeMs, 
info.startOffsetMetadata)) {
+onUpdateLeaderHighWatermark(state, currentTimeMs);
+}
+
+records = info.records;
+} else {
+records = MemoryRecords.EMPTY;
 }
 
-return buildFetchResponse(Errors.NONE, info.records, 
Optional.empty(), state.highWatermark());
+return buildFetchResponse(Errors.NONE, records, 
validatedOffsetAndEpoch, state.highWatermark());
+} catch (Exception e) {
+logger.error("Caught unexpected error in fetch completion of 
request {}", request, e);
+return buildEmptyFetchResponse(Errors.UNKNOWN_SERVER_ERROR, 
Optional.empty());
 }
 }
 
 /**
  * Check whether a fetch offset and epoch is valid. Return the diverging 
epoch, which
  * is the largest epoch such that subsequent records are known to diverge.
  */
-private Optional<OffsetAndEpoch> validateFetchOffsetAndEpoch(long 
fetchOffset, int lastFetchedEpoch) {
-if (fetchOffset == 0 && lastFetchedEpoch == 0) {
-return Optional.empty();
+private ValidatedFetchOffsetAndEpoch validateFetchOffsetAndEpoch(long 
fetchOffset, int lastFetchedEpoch) {
+if (log.startOffset() == 0 && fetchOffset == 0) {
+if (lastFetchedEpoch != 0) {
+logger.warn(
+"Replica sent a zero fetch offset ({}) but the last 
fetched epoch ({}) was not zero",
+fetchOffset,
+lastFetchedEpoch
+);
+}
+return ValidatedFetchOffsetAndEpoch.valid(new 
OffsetAndEpoch(fetchOffset, lastFetchedEpoch));
 }
 
-OffsetAndEpoch endOffsetAndEpoch = 
log.endOffsetForEpoch(lastFetchedEpoch)
-.orElse(new OffsetAndEpoch(-1L, -1));
-if (endOffsetAndEpoch.epoch != lastFetchedEpoch || 
endOffsetAndEpoch.offset < fetchOffset) {
-return Optional.of(endOffsetAndEpoch);
+
+OffsetAndEpoch endOffsetAndEpoch = 
log.endOffsetForEpoch(lastFetchedEpoch).orElseThrow(() -> {
+return new IllegalStateException(
+String.format(
+"Expected to find an end offset for epoch %s since it must 
be less than the current epoch %s",
+lastFetchedEpoch,
+quorum.epoch()
+)
+);
+});
+
+if (log.oldestSnapshotId().isPresent() &&
+((fetchOffset < log.startOffset()) ||
+ (fetchOffset == log.startOffset() && lastFetchedEpoch != 
log.oldestSnapshotId().get().epoch) ||
+  

[GitHub] [kafka] wangwalton opened a new pull request #9993: Schema Exception Interface fix

2021-01-28 Thread GitBox


wangwalton opened a new pull request #9993:
URL: https://github.com/apache/kafka/pull/9993


   Bug fix. I'm passing in a SchemaBuilder for schema, and this will always 
throw an exception since it's doing an equals comparison. The reason that I 
need to pass in a builder is because my schema contains possibilities for a 
graph, and the only way to represent this is with SchemaBuilder since 
ConnectSchema is not mutable.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] aloknnikhil commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses

2021-01-28 Thread GitBox


aloknnikhil commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r566406761



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -99,7 +99,7 @@
 public final InetSocketAddress address;
 
 public InetAddressSpec(InetSocketAddress address) {
-if (address.equals(NON_ROUTABLE_ADDRESS)) {
+if (address != null && address.equals(NON_ROUTABLE_ADDRESS)) {

Review comment:
   Right. Makes sense. Fixed.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses

2021-01-28 Thread GitBox


hachikuji commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r566405275



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -99,7 +99,7 @@
 public final InetSocketAddress address;
 
 public InetAddressSpec(InetSocketAddress address) {
-if (address.equals(NON_ROUTABLE_ADDRESS)) {
+if (address != null && address.equals(NON_ROUTABLE_ADDRESS)) {

Review comment:
   Currently our equals method assumes non-null addresses. I can't think of 
a case where we would want it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts

2021-01-28 Thread GitBox


lct45 commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r566398284



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -133,6 +152,72 @@ private void configurePermissions(final File file) {
 }
 }
 
+/**
+ * @return true if the state directory was successfully locked
+ */
+private boolean lockStateDirectory() {
+final File lockFile = new File(stateDir, LOCK_FILE_NAME);
+try {
+stateDirLockChannel = FileChannel.open(lockFile.toPath(), 
StandardOpenOption.CREATE, StandardOpenOption.WRITE);
+stateDirLock = tryLock(stateDirLockChannel);
+} catch (final IOException e) {
+log.error("Unable to lock the state directory due to unexpected 
exception", e);
+throw new ProcessorStateException("Failed to lock the state 
directory during startup", e);
+}
+
+return stateDirLock != null;
+}
+
+public UUID initializeProcessId() {
+if (!hasPersistentStores) {
+return UUID.randomUUID();
+}
+
+if (!lockStateDirectory()) {
+log.error("Unable to obtain lock as state directory is already 
locked by another process");
+throw new StreamsException("Unable to initialize state, this can 
happen if multiple instances of " +
+   "Kafka Streams are running in the 
same state directory");
+}
+
+final File processFile = new File(stateDir, PROCESS_FILE_NAME);
+try {
+if (processFile.exists()) {
+try (final BufferedReader reader = 
Files.newBufferedReader(processFile.toPath())) {
+// only field in version 0 is the UUID
+final int version = Integer.parseInt(reader.readLine());
+if (version > 0) {

Review comment:
   Do we want to make this set to PROCESS_FILE_VERSION on the off chance 
anyone needs to increment it?
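
   A sketch of the guard being suggested, assuming PROCESS_FILE_VERSION is (or will be) a constant defined next to PROCESS_FILE_NAME:
   
   ```java
   // Reject process files written by a newer version instead of misreading
   // them; PROCESS_FILE_VERSION is the constant name suggested above.
   final int version = Integer.parseInt(reader.readLine());
   if (version > PROCESS_FILE_VERSION) {
       throw new IllegalStateException("Unrecognized process file version: " + version);
   }
   ```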





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on pull request #9872: KAFKA-10759: ARM support for Kafka

2021-01-28 Thread GitBox


mumrah commented on pull request #9872:
URL: https://github.com/apache/kafka/pull/9872#issuecomment-769382560


   Ok, the ARM build compiles at least 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-9992/1/console
   
   I'll try running the unit test suite next.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] aloknnikhil commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses

2021-01-28 Thread GitBox


aloknnikhil commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r566394059



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -99,7 +99,7 @@
 public final InetSocketAddress address;
 
 public InetAddressSpec(InetSocketAddress address) {
-if (address.equals(NON_ROUTABLE_ADDRESS)) {
+if (address != null && address.equals(NON_ROUTABLE_ADDRESS)) {

Review comment:
   Yea, I thought of that. But would there ever be a case where we'd need 
to construct the address spec with a null? Probably not. Might as well use a 
nullptr for the address spec at that point I guess. Fair enough, will change.
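
   Folding the guard suggested in the next message of this digest into the constructor gives roughly this shape (a sketch, not the merged diff):
   
   ```java
   public InetAddressSpec(InetSocketAddress address) {
       // Reject both null and the sentinel non-routable address.
       if (address == null || address.equals(NON_ROUTABLE_ADDRESS)) {
           throw new IllegalArgumentException("Invalid address: " + address);
       }
       this.address = address;
   }
   ```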





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses

2021-01-28 Thread GitBox


hachikuji commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r566390451



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -99,7 +99,7 @@
 public final InetSocketAddress address;
 
 public InetAddressSpec(InetSocketAddress address) {
-if (address.equals(NON_ROUTABLE_ADDRESS)) {
+if (address != null && address.equals(NON_ROUTABLE_ADDRESS)) {

Review comment:
   We also don't want to accept null addresses, right? I was thinking we 
could do this:
   ```java
   if (address == null || address.equals(NON_ROUTABLE_ADDRESS)) {
 throw new IllegalArgumentException("Invalid address " + address);
   }
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on a change in pull request #9253: KAFKA-10366 & KAFKA-9649: Implement KIP-659 to allow TimeWindowedDeserializer and TimeWindowedSerde to handle window size

2021-01-28 Thread GitBox


lct45 commented on a change in pull request #9253:
URL: https://github.com/apache/kafka/pull/9253#discussion_r564906839



##
File path: 
streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/SerdesUnitTest.scala
##
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala
+
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender
+import org.apache.kafka.streams.kstream.WindowedSerdes.TimeWindowedSerde
+import org.junit.Assert.assertFalse
+import org.junit.Test
+
+class SerdesUnitTest {
+
+  @Test
+  def shouldLogMessageWhenTimeWindowedSerdeIsUsed(): Unit = {
+
+    // Register the appender before creating the serde so the warning is captured
+    val appender = LogCaptureAppender.createAndRegister()
+    Serdes.timeWindowedSerde(new TimeWindowedSerde[String]())
+    val warning = appender.getMessages()
+    assertFalse("There should be a warning about TimeWindowedDeserializer", warning.isEmpty)
Review comment:
   This test also appears to fail locally, since it uses the constructor we 
deprecate in this PR, and from a quick search it doesn't look like Scala 
suppresses these warnings as easily as Java does. Any thoughts on whether this 
test is vital, or on a way to make it work? cc @guozhangwang 
   
   If I suppress the warnings, the test fails (: thoughts on viability?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts

2021-01-28 Thread GitBox


ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r566379864



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -133,6 +152,72 @@ private void configurePermissions(final File file) {
 }
 }
 
+/**
+ * @return true if the state directory was successfully locked
+ */
+private boolean lockStateDirectory() {
+final File lockFile = new File(stateDir, LOCK_FILE_NAME);
+try {
+stateDirLockChannel = FileChannel.open(lockFile.toPath(), 
StandardOpenOption.CREATE, StandardOpenOption.WRITE);
+stateDirLock = tryLock(stateDirLockChannel);

Review comment:
   Nope, we just hold it until the KafkaStreams is closed
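   For context, a minimal sketch of the release side of that lifecycle, assuming field names matching the diff above (`stateDirLock`, `stateDirLockChannel`); this is a sketch, not necessarily the exact method in the PR:
   ```java
   // Counterpart to lockStateDirectory() above, invoked when KafkaStreams closes.
   private void unlockStateDirectory() throws IOException {
       if (stateDirLock != null) {
           stateDirLock.release();      // release the OS-level file lock
           stateDirLockChannel.close(); // then close the underlying channel
           stateDirLock = null;
           stateDirLockChannel = null;
       }
   }
   ```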





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] aloknnikhil commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses

2021-01-28 Thread GitBox


aloknnikhil commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r566379088



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -89,8 +92,23 @@
 private final int appendLingerMs;
 private final Map<Integer, AddressSpec> voterConnections;
 
-public static abstract class AddressSpec {
-   public abstract InetSocketAddress address();
+public interface AddressSpec {
+}
+
+public static class InetAddressSpec implements AddressSpec {
+public final InetSocketAddress address;
+
+public InetAddressSpec(InetSocketAddress address) {
+if (address.equals(NON_ROUTABLE_ADDRESS)) {

Review comment:
   Good catch! Added.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] aloknnikhil commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses

2021-01-28 Thread GitBox


aloknnikhil commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r566378930



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -102,31 +120,23 @@ public boolean equals(Object obj) {
 return false;
 }
 
-final AddressSpec that = (AddressSpec) obj;
-return that.address().equals(address());
+final InetAddressSpec that = (InetAddressSpec) obj;
+return that.address.equals(address);
 }
 }
 
-public static class InetAddressSpec extends AddressSpec {
-private final InetSocketAddress address;
-
-public InetAddressSpec(InetSocketAddress address) {
-if (address.equals(UNROUTABLE_ADDRESS)) {
-throw new IllegalArgumentException("Address not routable");
-}
-this.address = address;
+public static class UnknownAddressSpec implements AddressSpec {
+private UnknownAddressSpec() {

Review comment:
   Yeah, that's true actually. It should work fine for the map-equality test 
case containing an instance of UnknownAddressSpec. Removed.
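   To spell out why no override is needed: with a private constructor there is exactly one instance, so `Object`'s reference-based `equals`/`hashCode` already behave correctly, including for maps that contain the singleton. A minimal sketch, assuming the names used in this PR (the static instance lives on `RaftConfig`):
   ```java
   public static class UnknownAddressSpec implements AddressSpec {
       private UnknownAddressSpec() {
       }
   }

   public static final UnknownAddressSpec UNKNOWN_ADDRESS_SPEC_INSTANCE =
       new UnknownAddressSpec();

   // Both maps hold the same singleton reference, so the default
   // equals() is sufficient and map equality works as expected:
   // voters1.put(1, UNKNOWN_ADDRESS_SPEC_INSTANCE);
   // voters2.put(1, UNKNOWN_ADDRESS_SPEC_INSTANCE);
   // voters1.equals(voters2)  // -> true
   ```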





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] aloknnikhil commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses

2021-01-28 Thread GitBox


aloknnikhil commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r566378383



##
File path: core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
##
@@ -1001,6 +1001,12 @@ class KafkaConfigTest {
 assertEquals(expectedVoters, raftConfig.quorumVoterConnections())
   }
 
+  @Test
+  def testNonRoutableAddressSpecException(): Unit = {
+assertThrows(classOf[IllegalArgumentException],
+  () => new InetAddressSpec(new InetSocketAddress("0.0.0.0", 0)))

Review comment:
   Yeah, the only thing this is testing is that `InetAddressSpec` never 
accepts `0.0.0.0:0` as a parameter. We could remove it instead.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] aloknnikhil commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses

2021-01-28 Thread GitBox


aloknnikhil commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r566377621



##
File path: core/src/main/scala/kafka/raft/RaftManager.scala
##
@@ -126,9 +126,13 @@ class KafkaRaftManager[T](
 case spec: InetAddressSpec => {
   netChannel.updateEndpoint(voterAddressEntry.getKey, spec)
 }
+case _: UnknownAddressSpec => {

Review comment:
   Ah, makes sense. Removed.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses

2021-01-28 Thread GitBox


hachikuji commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r566373256



##
File path: core/src/main/scala/kafka/raft/RaftManager.scala
##
@@ -126,9 +126,13 @@ class KafkaRaftManager[T](
 case spec: InetAddressSpec => {
   netChannel.updateEndpoint(voterAddressEntry.getKey, spec)
 }
+case _: UnknownAddressSpec => {

Review comment:
   nit: the braces in these `case` matches are not needed

##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -102,31 +120,23 @@ public boolean equals(Object obj) {
 return false;
 }
 
-final AddressSpec that = (AddressSpec) obj;
-return that.address().equals(address());
+final InetAddressSpec that = (InetAddressSpec) obj;
+return that.address.equals(address);
 }
 }
 
-public static class InetAddressSpec extends AddressSpec {
-private final InetSocketAddress address;
-
-public InetAddressSpec(InetSocketAddress address) {
-if (address.equals(UNROUTABLE_ADDRESS)) {
-throw new IllegalArgumentException("Address not routable");
-}
-this.address = address;
+public static class UnknownAddressSpec implements AddressSpec {
+private UnknownAddressSpec() {

Review comment:
   Since we have a private constructor, I think we will only ever have the 
single `UNKNOWN_ADDRESS_SPEC_INSTANCE` instance. So can we rely on the default 
equals/hashCode?

##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -89,8 +92,23 @@
 private final int appendLingerMs;
 private final Map<Integer, AddressSpec> voterConnections;
 
-public static abstract class AddressSpec {
-   public abstract InetSocketAddress address();
+public interface AddressSpec {
+}
+
+public static class InetAddressSpec implements AddressSpec {
+public final InetSocketAddress address;
+
+public InetAddressSpec(InetSocketAddress address) {
+if (address.equals(NON_ROUTABLE_ADDRESS)) {

Review comment:
   Maybe worth adding a null check here?

##
File path: core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
##
@@ -1001,6 +1001,12 @@ class KafkaConfigTest {
 assertEquals(expectedVoters, raftConfig.quorumVoterConnections())
   }
 
+  @Test
+  def testNonRoutableAddressSpecException(): Unit = {
+assertThrows(classOf[IllegalArgumentException],
+  () => new InetAddressSpec(new InetSocketAddress("0.0.0.0", 0)))

Review comment:
   nit: I still think `RaftConfigTest` is a better home for this. It 
doesn't involve `KafkaConfig` at all. We could also just delete the test since 
it probably isn't buying us much.
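   If the test is kept and relocated, a minimal JUnit sketch of how it might look in `RaftConfigTest` (the test name here is made up, and JUnit 5 is assumed):
   ```java
   import static org.junit.jupiter.api.Assertions.assertThrows;

   import java.net.InetSocketAddress;
   import org.apache.kafka.raft.RaftConfig.InetAddressSpec;
   import org.junit.jupiter.api.Test;

   public class RaftConfigTest {
       @Test
       public void testInetAddressSpecRejectsNonRoutableAddress() {
           assertThrows(IllegalArgumentException.class,
               () -> new InetAddressSpec(new InetSocketAddress("0.0.0.0", 0)));
       }
   }
   ```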





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-8940) Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance

2021-01-28 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-8940:
--
Description: 
The test does not properly account for windowing. See [this 
comment|https://issues.apache.org/jira/browse/KAFKA-8940?focusedCommentId=17214850&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17214850]
 for full details.

We can patch this test by fixing the timestamps of the input data to avoid 
crossing over a window boundary, or account for this when verifying the output. 
Since we have access to the input data it should be possible to compute 
whether/when we do cross a window boundary, and adjust the expected output 
accordingly

  was:The test does not properly account for windowing. See [this 
comment|https://issues.apache.org/jira/browse/KAFKA-8940?focusedCommentId=17214850&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17214850]
 for full details


> Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
> -
>
> Key: KAFKA-8940
> URL: https://issues.apache.org/jira/browse/KAFKA-8940
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: flaky-test, newbie++
>
> The test does not properly account for windowing. See [this 
> comment|https://issues.apache.org/jira/browse/KAFKA-8940?focusedCommentId=17214850&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17214850]
>  for full details.
> We can patch this test by fixing the timestamps of the input data to avoid 
> crossing over a window boundary, or account for this when verifying the 
> output. Since we have access to the input data it should be possible to 
> compute whether/when we do cross a window boundary, and adjust the expected 
> output accordingly
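
As a rough illustration of the suggested fix, a hedged Java sketch of the boundary check (window size and timestamps are made-up values; a real test would derive them from its generated input):

```java
import java.time.Duration;

public class WindowBoundaryCheck {
    public static void main(String[] args) {
        long windowSizeMs = Duration.ofSeconds(30).toMillis();
        long firstTimestamp = 1_000L;  // hypothetical input timestamps
        long lastTimestamp = 31_000L;

        // Tumbling windows are aligned to the epoch, so two timestamps fall
        // into the same window iff they share the same window start.
        long firstWindowStart = firstTimestamp - (firstTimestamp % windowSizeMs);
        long lastWindowStart = lastTimestamp - (lastTimestamp % windowSizeMs);

        if (firstWindowStart != lastWindowStart) {
            System.out.println("Input crosses a window boundary; adjust the expected output");
        }
    }
}
```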



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-8940) Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance

2021-01-28 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-8940:
--
Description: The test does not properly account for windowing. See [this 
comment|https://issues.apache.org/jira/browse/KAFKA-8940?focusedCommentId=17214850&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17214850]
 for full details  (was: The test does not properly account for windowing. See)

> Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
> -
>
> Key: KAFKA-8940
> URL: https://issues.apache.org/jira/browse/KAFKA-8940
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: flaky-test, newbie++
>
> The test does not properly account for windowing. See [this 
> comment|https://issues.apache.org/jira/browse/KAFKA-8940?focusedCommentId=17214850&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17214850]
>  for full details



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-8940) Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance

2021-01-28 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-8940:
--
Description: The test does not properly account for windowing. See  (was: I 
lost the screen shot unfortunately... it reports the set of expected records 
does not match the received records.)

> Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
> -
>
> Key: KAFKA-8940
> URL: https://issues.apache.org/jira/browse/KAFKA-8940
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: flaky-test, newbie++
>
> The test does not properly account for windowing. See



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-8940) Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance

2021-01-28 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-8940:
--
Labels: flaky-test newbie++  (was: flaky-test)

> Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
> -
>
> Key: KAFKA-8940
> URL: https://issues.apache.org/jira/browse/KAFKA-8940
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: flaky-test, newbie++
>
> I lost the screen shot unfortunately... it reports the set of expected 
> records does not match the received records.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mumrah opened a new pull request #9992: KAFKA-10759 Add ARM build stage

2021-01-28 Thread GitBox


mumrah opened a new pull request #9992:
URL: https://github.com/apache/kafka/pull/9992


   Copy of https://github.com/apache/kafka/pull/9872, but opened by a committer 
so Jenkins will actually apply the Jenkinsfile changes.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on pull request #9872: KAFKA-10759: ARM support for Kafka

2021-01-28 Thread GitBox


mumrah commented on pull request #9872:
URL: https://github.com/apache/kafka/pull/9872#issuecomment-769342004


   I believe we have Jenkins configured so that it will only apply Jenkinsfile 
changes from a PR if the author is a committer. I'll open a PR shortly to see if 
this works.
   
   @ijuma looks like a few different ARM images are provided by Apache Infra: 
https://cwiki.apache.org/confluence/display/INFRA/ci-builds.apache.org



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



