[jira] [Updated] (KAFKA-13008) Stream will stop processing data for a long time while waiting for the partition lag

2021-07-08 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-13008:
---
Fix Version/s: 3.0.0

> Stream will stop processing data for a long time while waiting for the 
> partition lag
> 
>
> Key: KAFKA-13008
> URL: https://issues.apache.org/jira/browse/KAFKA-13008
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0
>Reporter: Luke Chen
>Priority: Major
> Fix For: 3.0.0
>
> Attachments: image-2021-07-07-11-19-55-630.png
>
>
> In KIP-695, we improved the task idling mechanism by checking the partition lag. 
> It's a good improvement for timestamp synchronization, but I found it can cause the 
> stream to stop processing data for a long time while waiting for the 
> partition metadata.
>  
> I've been investigating this case for a while, and found that the issue 
> happens in the following situation (or similar ones):
>  # Start 2 streams (each with 1 thread) consuming from topicA (with 3 
> partitions: A-0, A-1, A-2).
>  # After the 2 streams have started, the partition assignment is (I skipped some 
> other processing-related partitions for simplicity):
>  stream1-thread1: A-0, A-1 
>  stream2-thread1: A-2
>  # Start processing some data, and assume the positions and high watermarks are:
>  A-0: offset: 2, highWM: 2
>  A-1: offset: 2, highWM: 2
>  A-2: offset: 2, highWM: 2
>  # Now stream3 joins, which triggers a rebalance with this assignment:
>  stream1-thread1: A-0 
>  stream2-thread1: A-2
>  stream3-thread1: A-1
>  # Suddenly stream3 leaves, so we rebalance again, back to the step-2 
> assignment:
>  stream1-thread1: A-0, *A-1* 
>  stream2-thread1: A-2
>  (note: after initialization, the state of A-1 will be: position: null, 
> highWM: null)
>  # Note that partition A-1 used to be assigned to stream1-thread1, 
> and now it's back. Also assume that partition A-1 has slow input (ex: 1 
> record per 30 mins), while partition A-0 has fast input (ex: 10K records / 
> sec). Now stream1-thread1 won't process any data until it gets input 
> from partition A-1 (even if partition A-0 has a lot of data buffered and 
> {{max.task.idle.ms}} is set to 0).
>  
> The reason stream1-thread1 won't process any data is that we can't 
> get the lag of partition A-1. Why can't we get the lag?
>  # In KIP-695, we use the consumer's cached metadata to get the partition lag, to 
> avoid a remote call.
>  # The lag for a partition is cleared if the assignment in this round 
> doesn't include that partition; see 
> [here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L272].
>  So, in the above example, the metadata cache for partition A-1 is 
> cleared in step 4 and re-initialized (to null) in step 5.
>  # In KIP-227, we introduced fetch sessions for incremental fetch 
> requests/responses. That is, if a session exists, the client (consumer) only 
> gets an update when the fetched partition has one (ex: new data). 
> So, in the above case, where partition A-1 has slow input (ex: 1 record per 30 
> mins), it won't have an update until the next record arrives in up to 30 mins, 
> or until the fetch session has been inactive for (default) 2 mins and is evicted. 
> Either way, the metadata won't be updated for a while.
>  
> In KIP-695, if we can't get the partition lag, we can't determine the 
> partition's data status for timestamp synchronization, so we keep waiting and don't 
> process any data. That's why this issue happens. Roughly, the check looks like the 
> sketch below.
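> A minimal, purely illustrative sketch of that readiness check (the class, method, and 
> parameter names below are mine, not the actual Streams internals):
> {code:java}
> import java.util.OptionalLong;
>
> class Kip695ReadinessSketch {
>     // OptionalLong.empty() models "no cached lag in the consumer", which is the
>     // state partition A-1 is stuck in after the rebalance in step 5 above.
>     boolean readyToProcess(OptionalLong cachedLag, boolean hasBufferedRecords) {
>         if (hasBufferedRecords) {
>             return true;                    // data already fetched for this partition
>         }
>         if (!cachedLag.isPresent()) {
>             return false;                   // lag unknown -> keep waiting (this issue)
>         }
>         return cachedLag.getAsLong() == 0;  // caught up -> no need to wait for it
>     }
> }
> {code}
> While the cached lag stays unknown, the method never returns true, so the task idles 
> regardless of {{max.task.idle.ms}}.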
>  
> *Proposed solution:*
>  # If we don't have the current lag for a partition, or the current lag is > 0, 
> start waiting for {{max.task.idle.ms}}, and reset the deadline once we do get the 
> partition lag, like what we did previously in KIP-353 (see the sketch after this list)
>  # Introduce a waiting-time config for the case where there is no partition lag, or 
> the lag stays > 0 (needs a KIP)
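> A rough, hypothetical sketch of option 1 (the field and method names are mine and the 
> real implementation may differ):
> {code:java}
> import java.util.OptionalLong;
>
> class IdleDeadlineSketch {
>     static final long NO_DEADLINE = -1L;
>     private long deadlineMs = NO_DEADLINE;
>     private boolean lagWasUnknown = true;
>
>     // Returns true when the task may proceed without input from the lagging partition.
>     boolean mayProceed(OptionalLong cachedLag, long nowMs, long maxTaskIdleMs) {
>         if (cachedLag.isPresent() && cachedLag.getAsLong() == 0) {
>             deadlineMs = NO_DEADLINE;            // caught up: nothing to wait for
>             lagWasUnknown = false;
>             return true;
>         }
>         if (cachedLag.isPresent() && lagWasUnknown) {
>             deadlineMs = nowMs + maxTaskIdleMs;  // lag just became known: reset the countdown
>         }
>         lagWasUnknown = !cachedLag.isPresent();
>         if (deadlineMs == NO_DEADLINE) {
>             deadlineMs = nowMs + maxTaskIdleMs;  // start waiting instead of blocking forever
>         }
>         return nowMs >= deadlineMs;              // max.task.idle.ms caps the wait
>     }
> }
> {code}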
> [~vvcephei] [~guozhang], any suggestions?
>  
> cc [~ableegoldman] [~mjsax], this is the root cause of the behavior we discussed in 
> [https://github.com/apache/kafka/pull/10736], where we thought 
> there was a data-loss situation. FYI.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9295) KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable

2021-07-08 Thread Konstantine Karantasis (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1738#comment-1738
 ] 

Konstantine Karantasis commented on KAFKA-9295:
---

Thanks for the update [~ableegoldman]. Based on that I've added 3.0 as a target 
fix version for https://issues.apache.org/jira/browse/KAFKA-13008 and have 
marked that ticket as blocker. I'll be looking for any updates on that issue or 
its severity. 

> KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
> --
>
> Key: KAFKA-9295
> URL: https://issues.apache.org/jira/browse/KAFKA-9295
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.4.0, 2.6.0
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Blocker
>  Labels: flaky-test
> Fix For: 3.0.0
>
>
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/27106/testReport/junit/org.apache.kafka.streams.integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest/shouldInnerJoinMultiPartitionQueryable/]
> {quote}java.lang.AssertionError: Did not receive all 1 records from topic 
> output- within 6 ms Expected: is a value equal to or greater than <1> 
> but: <0> was less than <1> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:515)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:511)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:489)
>  at 
> org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.verifyKTableKTableJoin(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:200)
>  at 
> org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:183){quote}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-4327) Move Reset Tool from core to streams

2021-07-08 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-4327:
--
Fix Version/s: (was: 3.0.0)
   4.0.0

> Move Reset Tool from core to streams
> 
>
> Key: KAFKA-4327
> URL: https://issues.apache.org/jira/browse/KAFKA-4327
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Blocker
>  Labels: kip
> Fix For: 4.0.0
>
>
> This is a follow-up on https://issues.apache.org/jira/browse/KAFKA-4008
> Currently, the Kafka Streams Application Reset Tool is part of the {{core}} module 
> due to a ZK dependency. After KIP-4 got merged, this dependency can be dropped 
> and the Reset Tool can be moved to the {{streams}} module.
> This should also update {{InternalTopicManager#filterExistingTopics}}, which 
> refers to the Reset Tool in an exception message:
>  {{"Use 'kafka.tools.StreamsResetter' tool"}}
>  -> {{"Use '" + kafka.tools.StreamsResetter.getClass().getName() + "' tool"}}
> Doing this JIRA also requires updating the docs with regard to broker 
> backward compatibility – not all brokers support the "topic delete request", and 
> thus the reset tool will not be backward compatible with all broker versions.
> KIP-756: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-756%3A+Move+StreamsResetter+tool+outside+of+core]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-4327) Move Reset Tool from core to streams

2021-07-08 Thread Konstantine Karantasis (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1739#comment-1739
 ] 

Konstantine Karantasis commented on KAFKA-4327:
---

Based on your last comment, [~mjsax], I've updated the target version for this 
issue to 4.0.

> Move Reset Tool from core to streams
> 
>
> Key: KAFKA-4327
> URL: https://issues.apache.org/jira/browse/KAFKA-4327
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Blocker
>  Labels: kip
> Fix For: 4.0.0
>
>
> This is a follow-up on https://issues.apache.org/jira/browse/KAFKA-4008
> Currently, the Kafka Streams Application Reset Tool is part of the {{core}} module 
> due to a ZK dependency. After KIP-4 got merged, this dependency can be dropped 
> and the Reset Tool can be moved to the {{streams}} module.
> This should also update {{InternalTopicManager#filterExistingTopics}}, which 
> refers to the Reset Tool in an exception message:
>  {{"Use 'kafka.tools.StreamsResetter' tool"}}
>  -> {{"Use '" + kafka.tools.StreamsResetter.getClass().getName() + "' tool"}}
> Doing this JIRA also requires updating the docs with regard to broker 
> backward compatibility – not all brokers support the "topic delete request", and 
> thus the reset tool will not be backward compatible with all broker versions.
> KIP-756: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-756%3A+Move+StreamsResetter+tool+outside+of+core]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12487) Sink connectors do not work with the cooperative consumer rebalance protocol

2021-07-08 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-12487:
---
Priority: Blocker  (was: Major)

> Sink connectors do not work with the cooperative consumer rebalance protocol
> 
>
> Key: KAFKA-12487
> URL: https://issues.apache.org/jira/browse/KAFKA-12487
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.0.0, 2.4.2, 2.5.2, 2.8.0, 2.7.1, 2.6.2
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Blocker
> Fix For: 3.0.0
>
>
> The {{ConsumerRebalanceListener}} used by the framework to respond to 
> rebalance events in consumer groups for sink tasks is hard-coded with the 
> assumption that the consumer performs rebalances eagerly. In other words, it 
> assumes that whenever {{onPartitionsRevoked}} is called, all partitions have 
> been revoked from that consumer, and whenever {{onPartitionsAssigned}} is 
> called, the partitions passed in to that method comprise the complete set of 
> topic partitions assigned to that consumer.
> See the [WorkerSinkTask.HandleRebalance 
> class|https://github.com/apache/kafka/blob/b96fc7892f1e885239d3290cf509e1d1bb41e7db/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L669-L730]
>  for the specifics.
>  
> One issue this can cause is silently ignoring to-be-committed offsets 
> provided by sink tasks, since the framework ignores offsets provided by tasks 
> in their {{preCommit}} method if it does not believe that the consumer for 
> that task is currently assigned the topic partition for that offset. See 
> these lines in the [WorkerSinkTask::commitOffsets 
> method|https://github.com/apache/kafka/blob/b96fc7892f1e885239d3290cf509e1d1bb41e7db/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L429-L430]
>  for reference.
>  
> This may not be the only issue caused by configuring a sink connector's 
> consumer to use cooperative rebalancing. Rigorous unit and integration 
> testing should be added before claiming that the Connect framework supports 
> the use of cooperative consumers with sink connectors.
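> For illustration only, here's a minimal, generic sketch (not the framework's actual fix; 
> the class name and the cumulative-set approach are just one way to write a listener that 
> behaves correctly under both the eager and cooperative protocols):
> {code:java}
> import java.util.Collection;
> import java.util.HashSet;
> import java.util.Set;
> import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
> import org.apache.kafka.common.TopicPartition;
>
> // Treats each callback as an incremental update instead of assuming it
> // carries the full assignment, so it also works with eager rebalancing.
> class IncrementalAwareRebalanceListener implements ConsumerRebalanceListener {
>     private final Set<TopicPartition> currentAssignment = new HashSet<>();
>
>     @Override
>     public void onPartitionsRevoked(Collection<TopicPartition> revoked) {
>         // Under the cooperative protocol this is only the subset being taken away,
>         // so offsets should be committed for 'revoked' only.
>         currentAssignment.removeAll(revoked);
>     }
>
>     @Override
>     public void onPartitionsAssigned(Collection<TopicPartition> added) {
>         // Under the cooperative protocol this is only the newly added subset.
>         currentAssignment.addAll(added);
>     }
>
>     @Override
>     public void onPartitionsLost(Collection<TopicPartition> lost) {
>         // Partitions were reassigned without a clean revocation; drop them
>         // without committing their offsets.
>         currentAssignment.removeAll(lost);
>     }
> }
> {code}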



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12487) Sink connectors do not work with the cooperative consumer rebalance protocol

2021-07-08 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-12487:
---
Fix Version/s: 3.0.0

> Sink connectors do not work with the cooperative consumer rebalance protocol
> 
>
> Key: KAFKA-12487
> URL: https://issues.apache.org/jira/browse/KAFKA-12487
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.0.0, 2.4.2, 2.5.2, 2.8.0, 2.7.1, 2.6.2
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.0.0
>
>
> The {{ConsumerRebalanceListener}} used by the framework to respond to 
> rebalance events in consumer groups for sink tasks is hard-coded with the 
> assumption that the consumer performs rebalances eagerly. In other words, it 
> assumes that whenever {{onPartitionsRevoked}} is called, all partitions have 
> been revoked from that consumer, and whenever {{onPartitionsAssigned}} is 
> called, the partitions passed in to that method comprise the complete set of 
> topic partitions assigned to that consumer.
> See the [WorkerSinkTask.HandleRebalance 
> class|https://github.com/apache/kafka/blob/b96fc7892f1e885239d3290cf509e1d1bb41e7db/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L669-L730]
>  for the specifics.
>  
> One issue this can cause is silently ignoring to-be-committed offsets 
> provided by sink tasks, since the framework ignores offsets provided by tasks 
> in their {{preCommit}} method if it does not believe that the consumer for 
> that task is currently assigned the topic partition for that offset. See 
> these lines in the [WorkerSinkTask::commitOffsets 
> method|https://github.com/apache/kafka/blob/b96fc7892f1e885239d3290cf509e1d1bb41e7db/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L429-L430]
>  for reference.
>  
> This may not be the only issue caused by configuring a sink connector's 
> consumer to use cooperative rebalancing. Rigorous unit and integration 
> testing should be added before claiming that the Connect framework supports 
> the use of cooperative consumers with sink connectors.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12487) Sink connectors do not work with the cooperative consumer rebalance protocol

2021-07-08 Thread Konstantine Karantasis (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17377785#comment-17377785
 ] 

Konstantine Karantasis commented on KAFKA-12487:


Just a note that for tickets that need to target a specific version, it's 
highly recommended (if not required) to set the fix versions field. I marked the 
issue as a blocker for 3.0 and will be taking a look before code freeze, which 
is approaching quickly. 

> Sink connectors do not work with the cooperative consumer rebalance protocol
> 
>
> Key: KAFKA-12487
> URL: https://issues.apache.org/jira/browse/KAFKA-12487
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.0.0, 2.4.2, 2.5.2, 2.8.0, 2.7.1, 2.6.2
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Blocker
> Fix For: 3.0.0
>
>
> The {{ConsumerRebalanceListener}} used by the framework to respond to 
> rebalance events in consumer groups for sink tasks is hard-coded with the 
> assumption that the consumer performs rebalances eagerly. In other words, it 
> assumes that whenever {{onPartitionsRevoked}} is called, all partitions have 
> been revoked from that consumer, and whenever {{onPartitionsAssigned}} is 
> called, the partitions passed in to that method comprise the complete set of 
> topic partitions assigned to that consumer.
> See the [WorkerSinkTask.HandleRebalance 
> class|https://github.com/apache/kafka/blob/b96fc7892f1e885239d3290cf509e1d1bb41e7db/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L669-L730]
>  for the specifics.
>  
> One issue this can cause is silently ignoring to-be-committed offsets 
> provided by sink tasks, since the framework ignores offsets provided by tasks 
> in their {{preCommit}} method if it does not believe that the consumer for 
> that task is currently assigned the topic partition for that offset. See 
> these lines in the [WorkerSinkTask::commitOffsets 
> method|https://github.com/apache/kafka/blob/b96fc7892f1e885239d3290cf509e1d1bb41e7db/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L429-L430]
>  for reference.
>  
> This may not be the only issue caused by configuring a sink connector's 
> consumer to use cooperative rebalancing. Rigorous unit and integration 
> testing should be added before claiming that the Connect framework supports 
> the use of cooperative consumers with sink connectors.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13051) Require Principal Serde to be defined for 3.0

2021-07-08 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-13051:
---
Fix Version/s: 3.0.0

> Require Principal Serde to be defined for 3.0
> -
>
> Key: KAFKA-13051
> URL: https://issues.apache.org/jira/browse/KAFKA-13051
> Project: Kafka
>  Issue Type: Task
>Reporter: Ryan Dielhenn
>Assignee: Ryan Dielhenn
>Priority: Major
> Fix For: 3.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13051) Require Principal Serde to be defined for 3.0

2021-07-08 Thread Konstantine Karantasis (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17377792#comment-17377792
 ] 

Konstantine Karantasis commented on KAFKA-13051:


[~rdielhenn], judging from the title, I'm targeting 3.0.0 for this issue. Is it 
also a release blocker (priority)?

> Require Principal Serde to be defined for 3.0
> -
>
> Key: KAFKA-13051
> URL: https://issues.apache.org/jira/browse/KAFKA-13051
> Project: Kafka
>  Issue Type: Task
>Reporter: Ryan Dielhenn
>Assignee: Ryan Dielhenn
>Priority: Major
> Fix For: 3.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] satishd commented on a change in pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.

2021-07-08 Thread GitBox


satishd commented on a change in pull request #10579:
URL: https://github.com/apache/kafka/pull/10579#discussion_r97373



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
##
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import 
org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * This is the {@link RemoteLogMetadataManager} implementation with storage as 
an internal topic with name {@link 
TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME}.
+ * This is used to publish and fetch {@link RemoteLogMetadata} for the 
registered user topic partitions with
+ * {@link #onPartitionLeadershipChanges(Set, Set)}. Each broker will have an 
instance of this class and it subscribes
+ * to metadata updates for the registered user topic partitions.
+ */
+public class TopicBasedRemoteLogMetadataManager implements 
RemoteLogMetadataManager {
+private static final Logger log = 
LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManager.class);
+private static final long INITIALIZATION_RETRY_INTERVAL_MS = 3L;
+
+private volatile boolean configured = false;
+
+// It indicates whether the close process of this instance is started or 
not via #close() method.
+// Using AtomicBoolean instead of volatile as it may encounter 
http://findbugs.sourceforge.net/bugDescriptions.html#SP_SPIN_ON_FIELD
+// if the field is read but not updated in a spin loop like in 
#initializeResources() method.
+private final AtomicBoolean closing = new AtomicBoolean(false);
+private final AtomicBoolean initialized = new AtomicBoolean(false);
+private final Time time = Time.SYSTEM;
+
+private Thread initializationThread;
+private volatile ProducerManager producerManager;
+private volatile ConsumerManager consumerManager;
+
+// This allows to gracefully close this instance using {@link #close()} 
method while there are some pending or new
+// requests calling different methods which use the resources like 
producer/consumer managers.
+private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+private final RemotePartitionMetadataStore remotePartitionMetadataStore = 
new RemotePartitionMetadataStore();
+private volatile TopicBasedRemoteLogMetadataManagerConfig rlmmConfig;
+private volatile RemoteLogMetadataTopicPartitioner rlmmTopicPartitioner;
+private volatile Set pendingAssignPartitions = 
Collections.synchronizedSet(new HashSet<>());
+
+@Override
+public void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata 
remoteL

[GitHub] [kafka] hachikuji commented on a change in pull request #10990: MINOR: the broker should use metadata.log.max.record.bytes.between.snapshots

2021-07-08 Thread GitBox


hachikuji commented on a change in pull request #10990:
URL: https://github.com/apache/kafka/pull/10990#discussion_r666512156



##
File path: 
core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
##
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server.metadata
+
+import java.util.concurrent.RejectedExecutionException
+
+import kafka.utils.Logging
+import org.apache.kafka.image.MetadataImage
+import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
+import org.apache.kafka.server.common.ApiMessageAndVersion
+import org.apache.kafka.snapshot.SnapshotWriter
+
+
+trait SnapshotWriterBuilder {
+  def build(committedOffset: Long,
+committedEpoch: Int,
+lastContainedLogTime: Long): SnapshotWriter[ApiMessageAndVersion]
+}
+
+class BrokerMetadataSnapshotter(
+  brokerId: Int,
+  val time: Time,
+  threadNamePrefix: Option[String],
+  writerBuilder: SnapshotWriterBuilder
+) extends Logging with MetadataSnapshotter {
+  private val logContext = new LogContext(s"[BrokerMetadataSnapshotter 
id=${brokerId}] ")
+  private val log = logContext.logger(classOf[BrokerMetadataSnapshotter])

Review comment:
   nit: we shouldn't need this. instead of `log.info`, you can use `info` 
since we are extending `Logging`

##
File path: 
core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
##
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server.metadata
+
+import java.util.concurrent.RejectedExecutionException
+
+import kafka.utils.Logging
+import org.apache.kafka.image.MetadataImage
+import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
+import org.apache.kafka.server.common.ApiMessageAndVersion
+import org.apache.kafka.snapshot.SnapshotWriter
+
+
+trait SnapshotWriterBuilder {
+  def build(committedOffset: Long,
+committedEpoch: Int,
+lastContainedLogTime: Long): SnapshotWriter[ApiMessageAndVersion]
+}
+
+class BrokerMetadataSnapshotter(
+  brokerId: Int,
+  val time: Time,
+  threadNamePrefix: Option[String],
+  writerBuilder: SnapshotWriterBuilder
+) extends Logging with MetadataSnapshotter {
+  private val logContext = new LogContext(s"[BrokerMetadataSnapshotter 
id=${brokerId}] ")
+  private val log = logContext.logger(classOf[BrokerMetadataSnapshotter])
+  logIdent = logContext.logPrefix()
+
+  /**
+   * The offset of the snapshot in progress, or -1 if there isn't one. 
Accessed only under
+   * the object lock.
+   */
+  private var _currentSnapshotOffset = -1L
+
+  /**
+   * The event queue which runs this listener.
+   */
+  val eventQueue = new KafkaEventQueue(time, logContext, 
threadNamePrefix.getOrElse(""))
+
+  override def maybeStartSnapshot(committedOffset: Long,
+  committedEpoch: Int,
+  lastContainedLogTime: Long,
+  image: MetadataImage): Boolean = 
synchronized {
+if (_currentSnapshotOffset == -1L) {
+  val writer = writerBuilder.build(committedOffset, committedEpoch, 
lastContainedLogTime)
+  _currentSnapshotOffset = committedOffset
+  info(s"Creating a new snapshot at offset ${committedOffset}...")
+  eventQueue.append(new CreateSnapshotEvent(image, writer))
+  true
+ 
