[GitHub] [kafka] dengziming commented on a change in pull request #10483: KAFKA-12586; Add `DescribeTransactions` Admin API

2021-04-06 Thread GitBox


dengziming commented on a change in pull request #10483:
URL: https://github.com/apache/kafka/pull/10483#discussion_r607561841



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.Objects;
+import java.util.Set;
+
+public class CoordinatorStrategy implements AdminApiLookupStrategy<CoordinatorKey> {

Review comment:
   It seems that the generic type is useless here?

##
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.Objects;
+import java.util.Set;
+
+public class CoordinatorStrategy implements AdminApiLookupStrategy<CoordinatorKey> {
+    private final Logger log;
+
+    public CoordinatorStrategy(
+        LogContext logContext
+    ) {
+        this.log = logContext.logger(CoordinatorStrategy.class);
+    }
+
+    @Override
+    public ApiRequestScope lookupScope(CoordinatorKey key) {
+        // The `FindCoordinator` API does not support batched lookups, so we use a
+        // separate lookup context for each coordinator key we need to look up
+        return new LookupRequestScope(key);

Review comment:
   Will we support batching lookups?
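
   For reference, a minimal sketch of what such a per-key scope could look 
like (this nested class body is an illustrative assumption, not the PR's 
actual code):

   // Hypothetical sketch: one request scope per coordinator key, so the
   // driver never batches two keys into one FindCoordinator request.
   private static class LookupRequestScope implements ApiRequestScope {
       final CoordinatorKey key;

       LookupRequestScope(CoordinatorKey key) {
           this.key = key;
       }

       @Override
       public boolean equals(Object o) {
           if (this == o) return true;
           if (o == null || getClass() != o.getClass()) return false;
           return key.equals(((LookupRequestScope) o).key);
       }

       @Override
       public int hashCode() {
           return key.hashCode();
       }
   }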

##
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorKey.java
##
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.

[GitHub] [kafka] satishd commented on a change in pull request #10218: KAFKA-12368: Added inmemory implementations for RemoteStorageManager and RemoteLogMetadataManager.

2021-04-06 Thread GitBox


satishd commented on a change in pull request #10218:
URL: https://github.com/apache/kafka/pull/10218#discussion_r607580027



##
File path: remote-storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogLeaderEpochState.java
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+/**
+ * This class represents the in-memory state of segments associated with a leader epoch. This includes the mapping of
+ * offsets to segment ids, and unreferenced segments which are not mapped to any offset but exist in remote storage.
+ *
+ * This is used by {@link RemoteLogMetadataCache} to track the segments for each leader epoch.
+ */
+class RemoteLogLeaderEpochState {
+
+    // It contains the offset to segment id mapping for segments whose state is COPY_SEGMENT_FINISHED.
+    private final NavigableMap<Long, RemoteLogSegmentId> offsetToId = new ConcurrentSkipListMap<>();
+
+    /**
+     * It represents unreferenced segments for this leader epoch. It contains the segments still in COPY_SEGMENT_STARTED
+     * and DELETE_SEGMENT_STARTED state, or segments that have been replaced by callers with other segments having the same
+     * start offset for the leader epoch. These will be returned by {@link RemoteLogMetadataCache#listAllRemoteLogSegments()}
+     * and {@link RemoteLogMetadataCache#listRemoteLogSegments(int leaderEpoch)} so that callers can clean them up if
+     * they still exist. These will be cleared from the cache once they reach the DELETE_SEGMENT_FINISHED state.
+     */
+    private final Set<RemoteLogSegmentId> unreferencedSegmentIds = ConcurrentHashMap.newKeySet();
+
+    // It represents the highest log offset of the segments that were updated with updateHighestLogOffset.
+    private volatile Long highestLogOffset;
+
+    /**
+     * Returns all the segments associated with this leader epoch sorted by start offset in ascending order.
+     *
+     * @param idToSegmentMetadata mapping of id to segment metadata. This will be used to get RemoteLogSegmentMetadata
+     *                            for an id to be used for sorting.
+     * @return an iterator over the segment metadata, sorted by start offset.
+     */
+    Iterator<RemoteLogSegmentMetadata> listAllRemoteLogSegments(Map<RemoteLogSegmentId, RemoteLogSegmentMetadata> idToSegmentMetadata) {
+        // Return all the segments including unreferenced metadata.
+        int size = offsetToId.size() + unreferencedSegmentIds.size();
+        if (size == 0) {
+            return Collections.emptyIterator();
+        }
+
+        ArrayList<RemoteLogSegmentMetadata> metadataList = new ArrayList<>(size);
+        for (RemoteLogSegmentId id : offsetToId.values()) {
+            metadataList.add(idToSegmentMetadata.get(id));
+        }
+
+        if (!unreferencedSegmentIds.isEmpty()) {
+            for (RemoteLogSegmentId id : unreferencedSegmentIds) {
+                metadataList.add(idToSegmentMetadata.get(id));
+            }
+
+            // Sort only when unreferenced entries exist, as the entries in offsetToId are already sorted.
+            metadataList.sort(Comparator.comparingLong(RemoteLogSegmentMetadata::startOffset));
+        }
+
+        return metadataList.iterator();
+    }
+
+    void handleSegmentWithCopySegmentFinishedState(Long startOffset, RemoteLogSegmentId remoteLogSegmentId,
+                                                   Long leaderEpochEndOffset) {
+        // Add the segment epochs mapping as the segment is copied successfully.
+        RemoteLogSegmentId oldEntry = offsetToId.put(startOffset, remoteLogSegmentId);
+
+        // Remove the metadata from unreferenced entries as it is successfully copied and added to the offset mapping.
+        unreferencedSegmentIds.remove(remoteLogSegmentId);
+
+        // Add the old entry to unreferenced entries as the mapping is removed for the old entry.
+        if (oldEntry != null) {
+            unreferencedSegmentIds.add(oldEntry);

Review comment:
   Yes, in the case of unclean leader election, the leader will remove the 
o

[GitHub] [kafka] satishd commented on a change in pull request #10218: KAFKA-12368: Added inmemory implementations for RemoteStorageManager and RemoteLogMetadataManager.

2021-04-06 Thread GitBox


satishd commented on a change in pull request #10218:
URL: https://github.com/apache/kafka/pull/10218#discussion_r607597875



##
File path: remote-storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataCache.java
##
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * This class provides an in-memory cache of remote log segment metadata. This maintains the lineage of segments
+ * with respect to leader epochs.
+ *
+ * Remote log segments can go through the state transitions as mentioned in {@link RemoteLogSegmentState}.
+ *
+ * This class will have all the segments which did not reach the terminal state viz DELETE_SEGMENT_FINISHED. That means,
+ * any segment reaching the terminal state will get cleared from this instance.
+ * This class provides different methods to fetch segment metadata like {@link #remoteLogSegmentMetadata(int, long)},
+ * {@link #highestOffsetForEpoch(int)}, {@link #listRemoteLogSegments(int)}, {@link #listAllRemoteLogSegments()}. Those
+ * methods have different semantics to fetch the segment based on its state.
+ *
+ * {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}:
+ * Segments in this state indicate they are not yet copied successfully. So, these segments will not be
+ * accessible for reads, but they are considered for cleanups when a partition is deleted.
+ *
+ * {@link RemoteLogSegmentState#COPY_SEGMENT_FINISHED}:
+ * Segments in this state indicate they are successfully copied and available for reads. They should also be
+ * available for any cleanup activity, like deleting segments, by the caller of this class.
+ *
+ * {@link RemoteLogSegmentState#DELETE_SEGMENT_STARTED}:
+ * Segments in this state indicate they are being deleted. That means they are not available for reads, but they
+ * should be available for any cleanup activity, like deleting segments, by the caller of this class.
+ *
+ * {@link RemoteLogSegmentState#DELETE_SEGMENT_FINISHED}:
+ * Segments in this state indicate they are already deleted. That means they are not available for any activity,
+ * including reads or cleanup. This cache will clear entries containing this state.
+ *
+ * The table below summarizes whether a segment in each state is returned by the respective fetch method:
+ *
+ * +--------------------------------------------------------+----------------------+-----------------------+------------------------+-------------------------+
+ * | Method                                                 | COPY_SEGMENT_STARTED | COPY_SEGMENT_FINISHED | DELETE_SEGMENT_STARTED | DELETE_SEGMENT_FINISHED |
+ * +--------------------------------------------------------+----------------------+-----------------------+------------------------+-------------------------+
+ * | remoteLogSegmentMetadata(int leaderEpoch, long offset) | No                   | Yes                   | No                     | No                      |
+ * | listRemoteLogSegments(int leaderEpoch)                 | Yes                  | Yes                   | Yes                    | No                      |
+ * | listAllRemoteLogSegments()                             | Yes                  | Yes                   | Yes                    | No                      |
+ * +--------------------------------------------------------+----------------------+-----------------------+------------------------+-------------------------+
+ */
+public class RemoteLogMetadataCache {
+
+    private static final Logger log = LoggerFactory.getLogger(RemoteLogMetadataCache.class);
+
+    // It contains all the segment-id to metadata mappings which did not reach the terminal state viz DELETE_SEGMENT_FINISHED.
+    private final ConcurrentMap<RemoteLogSegmentId, RemoteLogSegmentMetadata> idToSegmentMetadata
+            = new ConcurrentHashMap<>();
+
+    // It contains leader epoch to the respective entry containing the state.
+    private final ConcurrentMap<Integer, RemoteLogLeaderEpochState> leaderEpochEntries = new ConcurrentHashMap<>();

Review comment:
   One way to do that is to clear the entry when the respective 
`RemoteLogLeaderEpochState` is empty. That means all the segments reached 
`DELETE_SEGMENT_FINISHED` state. 
   This is not currently addressed. I plan to look into it when we integrate 
these APIs with RemoteLogManager by exploring other options too. 
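
   A minimal sketch of that cleanup idea, assuming `RemoteLogLeaderEpochState` 
exposes an emptiness check over both its offset mapping and its unreferenced 
segments (the `isEmpty()` helper is an assumption, not part of this PR):

   // Hypothetical sketch: drop the per-epoch entry once all of its segments
   // have reached DELETE_SEGMENT_FINISHED and the state is empty.
   private void maybeRemoveEpochEntry(int leaderEpoch) {
       leaderEpochEntries.computeIfPresent(leaderEpoch,
           (epoch, state) -> state.isEmpty() ? null : state);
   }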
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on pull request #10218: KAFKA-12368: Added inmemory implementations for RemoteStorageManager and RemoteLogMetadataManager.

2021-04-06 Thread GitBox


satishd commented on pull request #10218:
URL: https://github.com/apache/kafka/pull/10218#issuecomment-813897020


   Thanks @junrao for the review.  Addressed them with the latest 
[commit](https://github.com/apache/kafka/pull/10218/commits/83050f3e4c03c5ac58c0ba2a9d8fcbf49661a1a9).
 






[GitHub] [kafka] chia7712 commented on pull request #9229: MINOR: Reduce allocations in requests via buffer caching

2021-04-06 Thread GitBox


chia7712 commented on pull request #9229:
URL: https://github.com/apache/kafka/pull/9229#issuecomment-813899837


   > I introduced RequestLocal as discussed. Does this seem reasonable to you? 
If so, I propose the following next steps:
   
   That LGTM. On another note, the memory utility used by `KafkaRequestHandler` 
(BufferSupplier) is different from the one used by `Processor` (MemoryPool). 
Could we unify the interfaces? It seems to me `RequestLocal` could be applied 
to `Processor` as well in the future.
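
   For illustration, one hedged shape such a unified abstraction could take 
(the name `BufferAllocator` and its methods are purely hypothetical, not 
existing Kafka APIs):

   import java.nio.ByteBuffer;

   // Hypothetical sketch of a common abstraction over BufferSupplier
   // (per-thread, not thread-safe) and MemoryPool (shared, thread-safe).
   public interface BufferAllocator {
       ByteBuffer allocate(int size);   // may return a cached/pooled buffer
       void release(ByteBuffer buffer); // hand the buffer back for reuse
   }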
   






[GitHub] [kafka] chia7712 commented on pull request #10389: MINOR: stabilize ListOffsetsRequestTest#testResponseIncludesLeaderEpoch

2021-04-06 Thread GitBox


chia7712 commented on pull request #10389:
URL: https://github.com/apache/kafka/pull/10389#issuecomment-813901559


   Unrelated error. Merging trunk to trigger QA again.






[GitHub] [kafka] showuon commented on pull request #10442: MINOR: update getMagic java doc

2021-04-06 Thread GitBox


showuon commented on pull request #10442:
URL: https://github.com/apache/kafka/pull/10442#issuecomment-813902397


   @chia7712, could you help review it? It just updates a one-line Javadoc. Thanks.






[GitHub] [kafka] chia7712 merged pull request #10442: MINOR: update getMagic java doc

2021-04-06 Thread GitBox


chia7712 merged pull request #10442:
URL: https://github.com/apache/kafka/pull/10442


   






[GitHub] [kafka] chia7712 commented on a change in pull request #10465: MINOR: add unit test for ControllerApis#createTopics

2021-04-06 Thread GitBox


chia7712 commented on a change in pull request #10465:
URL: https://github.com/apache/kafka/pull/10465#discussion_r607623340



##
File path: core/src/test/java/kafka/test/MockController.java
##
@@ -77,8 +82,28 @@ private MockController(Collection<MockTopic> initialTopics) {
     }
 
     @Override
-    public CompletableFuture<CreateTopicsResponseData> createTopics(CreateTopicsRequestData request) {
-        throw new UnsupportedOperationException();
+    synchronized public CompletableFuture<CreateTopicsResponseData>
+            createTopics(CreateTopicsRequestData request) {
+        CreateTopicsResponseData response = new CreateTopicsResponseData();
+        for (CreatableTopic topic : request.topics()) {
+            if (topics.containsKey(topic.name())) {
+                response.topics().add(new CreatableTopicResult().
+                    setName(topic.name()).
+                    setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code()));
+            } else {
+                long topicId = nextTopicId.getAndAdd(1);
+                Uuid topicUuid = new Uuid(0, topicId);
+                topicNameToId.put(topic.name(), topicUuid);
+                topics.put(topicUuid, new MockTopic(topic.name(), topicUuid));
+                response.topics().add(new CreatableTopicResult().
+                    setName(topic.name()).
+                    setErrorCode(Errors.NONE.code()).
+                    setTopicId(topicUuid));
+                // For a better mock, we might want to return configs, replication

Review comment:
   Agreed. Why not include such information?

##
File path: core/src/test/java/kafka/test/MockController.java
##
@@ -77,8 +82,28 @@ private MockController(Collection<MockTopic> initialTopics) {
     }
 
     @Override
-    public CompletableFuture<CreateTopicsResponseData> createTopics(CreateTopicsRequestData request) {
-        throw new UnsupportedOperationException();
+    synchronized public CompletableFuture<CreateTopicsResponseData>
+            createTopics(CreateTopicsRequestData request) {
+        CreateTopicsResponseData response = new CreateTopicsResponseData();
+        for (CreatableTopic topic : request.topics()) {
+            if (topics.containsKey(topic.name())) {

Review comment:
   The key type of `topics` is `Uuid`, and hence this check is weird. Maybe 
it should be replaced by `topicNameToId`.
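
   A hedged sketch of the fix being suggested (assuming `topicNameToId` maps 
topic names to their `Uuid`s, as the surrounding diff implies):

   // Check existence by name against the name-to-id map; a name lookup in
   // the Uuid-keyed `topics` map can never match.
   if (topicNameToId.containsKey(topic.name())) {
       response.topics().add(new CreatableTopicResult().
           setName(topic.name()).
           setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code()));
   }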

##
File path: core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
##
@@ -32,10 +32,13 @@ import org.apache.kafka.common.Uuid.ZERO_UUID
 import org.apache.kafka.common.errors.{InvalidRequestException, NotControllerException, TopicDeletionDisabledException}
 import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
+import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection}
+import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
 import org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState
 import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult
-import org.apache.kafka.common.message.{BrokerRegistrationRequestData, DeleteTopicsRequestData}
+import org.apache.kafka.common.message.{BrokerRegistrationRequestData, CreateTopicsRequestData, DeleteTopicsRequestData}
 import org.apache.kafka.common.network.{ClientInformation, ListenerName}
+import org.apache.kafka.common.protocol.Errors.{INVALID_REQUEST, NONE, TOPIC_AUTHORIZATION_FAILED}

Review comment:
   The code `Errors.TOPIC_AUTHORIZATION_FAILED` still exists in this file. 
Could you remove that redundant prefix?








[jira] [Commented] (KAFKA-12564) KTable#filter-method called twice after aggregation

2021-04-06 Thread Jess J. (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315343#comment-17315343
 ] 

Jess J. commented on KAFKA-12564:
-

[~mjsax] Thanks for your explanation and time. (y)

> KTable#filter-method called twice after aggregation
> ---
>
> Key: KAFKA-12564
> URL: https://issues.apache.org/jira/browse/KAFKA-12564
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: Jess J.
>Priority: Major
>
> Libraries from build.sbt:
> {{"org.apache.kafka" % "kafka_2.13" % "2.7.0",}}
> {{"org.apache.kafka" % "kafka-streams" % "2.7.0",}}
> {{"org.apache.kafka" % "kafka-clients" % "2.7.0",}}
> {{"org.apache.kafka" % "kafka-streams-scala_2.13" % "2.7.0",}}
> h4.  
> h4. Feed the Stream "issue_stream" with:
> {{(1->"A")}}
>  {{(1->"B")}}
> h4.  
> h4. Topology:
> {{// #1}}
>  {{val issueStream:KStream[Int,String] = 
> builder.stream[Int,String]("issue_stream")}}
>  
> {{// #2}}
>  {{val aggTable:KTable[Int,String] =}}
>  {{issueStream}}
>  {{.groupBy((k,v)=>k)}}
>  {{.aggregate[String]("EMPTY")((k,v,agg)=>s"$agg+$v")}}
>  
> {{// #3}}
>  {{aggTable}}
>  {{.toStream}}
>  {{.print(Printed.toSysOut)}}
>  
> {{// #4}}
>  {{aggTable.filter((k,v)=> {}}
>  {{  println(s"filter($k, $v) at ${System.nanoTime()}")}}
>  {{  true}}
>  {{})}}
>  {{.toStream}}
>  {{.print(Printed.toSysOut)}}
> h4.  
> h4. First Tuple: (1->"A")
> #3 Output as expected, the aggregated tuple ("EMPTY"+"+A")
> {{[KTABLE-TOSTREAM-44]: 1, EMPTY+A}}
>  
> #4 The filter-method is called twice.
>  First call with the expected tuple.
> {{filter(1, EMPTY+A) at 211379567071847}}
> The second call with the empty initialized aggregate.
> {{filter(1, EMPTY) at 211379567120375}}
> Output contains the correct tuple
> {{[KTABLE-TOSTREAM-47]: 1, EMPTY+A}}
> h4.  
> h4. Second Tuple: (1->"B")
> #3 Output as expected the aggregated tuple ("EMPTY"+"+A"+"+B")
> {{[KTABLE-TOSTREAM-44]: 1, EMPTY+A+B}}
> #4 Again a second unexpected call to filter(...) with the previous tuple 
> before aggregation
>  First call:
> {{filter(1, EMPTY+A+B) at 211379567498482}}
> Second call:
> {{filter(1, EMPTY+A) at 211379567524475}}
> But the output contains only one tuple as expected
> {{[KTABLE-TOSTREAM-47]: 1, EMPTY+A+B}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] yunwan opened a new pull request #10487: Kafka producer Partitioner add canAbortOnNewBatch

2021-04-06 Thread GitBox


yunwan opened a new pull request #10487:
URL: https://github.com/apache/kafka/pull/10487


   Add a `canAbortOnNewBatch` method to `Partitioner` to reduce unnecessary 
aborting of new batches.






[jira] [Updated] (KAFKA-10721) Rewrite topology to allow for overlapping unequal topic subscriptions

2021-04-06 Thread Salavat Zainullin (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Salavat Zainullin updated KAFKA-10721:
--
Description: 
Minor followup improvement to KAFKA-6687 in which we rewrite the topology to 
make it possible for a user to subscribe multiple KStream/KTables to the same 
topic or identical set of topics. We could further extend this to make it 
possible for multiple KStream/KTables to be subscribed to overlapping but not 
identical sets of topics, ie
{code:java}
KStream streamA = builder.stream("topic");
KStream streamB = builder.stream("topic", "other-topic"); {code}
One way to do this would be to break up multiple-topic source nodes into 
multiple single-topic sources that get merged together in the child node.

See 
https://github.com/apache/kafka/pull/9582/files#diff-ac1bf2b23b80784dec20b00fdc42f2df7e5a5133d6c68978fa44aea11e950c3aR347-R349

  was:
Minor followup improvement to KAFKA-6687 in which we rewrite the topology to 
make it possible for a user to subscribe multiple KStream/KTables to the same 
topic or identical set of topics. We could further extend this to make it 
possible for multiple KStream/KTables to be subscribed to overlapping but not 
identical sets of topics, ie
{code:java}
KStream streamA = builder.stream("topic");
KStream streamB = builder.stream("topic, "other-topic"); {code}
One way to do this would be to break up multiple-topic source nodes into 
multiple single-topic sources that get merged together in the child node.

See 
https://github.com/apache/kafka/pull/9582/files#diff-ac1bf2b23b80784dec20b00fdc42f2df7e5a5133d6c68978fa44aea11e950c3aR347-R349


> Rewrite topology to allow for overlapping unequal topic subscriptions
> -
>
> Key: KAFKA-10721
> URL: https://issues.apache.org/jira/browse/KAFKA-10721
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Salavat Zainullin
>Priority: Minor
>  Labels: newbie++
>
> Minor followup improvement to KAFKA-6687 in which we rewrite the topology to 
> make it possible for a user to subscribe multiple KStream/KTables to the same 
> topic or identical set of topics. We could further extend this to make it 
> possible for multiple KStream/KTables to be subscribed to overlapping but not 
> identical sets of topics, ie
> {code:java}
> KStream streamA = builder.stream("topic");
> KStream streamB = builder.stream("topic", "other-topic"); {code}
> One way to do this would be to break up multiple-topic source nodes into 
> multiple single-topic sources that get merged together in the child node.
> See 
> https://github.com/apache/kafka/pull/9582/files#diff-ac1bf2b23b80784dec20b00fdc42f2df7e5a5133d6c68978fa44aea11e950c3aR347-R349
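
As an illustration of the proposed rewrite, a hedged sketch using the public 
Streams DSL (the generic types and the wrapper class are assumptions made for 
a self-contained example):

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class OverlappingSubscriptionSketch {
    // Sketch: rather than one source node over {"topic", "other-topic"},
    // build single-topic sources and merge them in a child node, so the
    // "topic" source can be shared with streamA's subscription.
    static void buildTopology(StreamsBuilder builder) {
        KStream<String, String> streamA = builder.stream("topic");
        KStream<String, String> streamB =
            streamA.merge(builder.stream("other-topic"));
    }
}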





[GitHub] [kafka] ijuma commented on pull request #10487: Kafka producer Partitioner add canAbortOnNewBatch

2021-04-06 Thread GitBox


ijuma commented on pull request #10487:
URL: https://github.com/apache/kafka/pull/10487#issuecomment-814081419


   Thanks for the PR. Since this makes a change to a public interface, a KIP is 
required. Please see 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals.






[GitHub] [kafka] dengziming opened a new pull request #10488: MINOR: Remove some unnecessary cyclomatic complexity suppressions

2021-04-06 Thread GitBox


dengziming opened a new pull request #10488:
URL: https://github.com/apache/kafka/pull/10488


   *More detailed description of your change*
   I found that these classes do not violate the cyclomatic complexity limit, 
so the suppressions are unnecessary.
   
   *Summary of testing strategy (including rationale)*
   QA
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] ijuma commented on a change in pull request #10438: KAFKA-12579: Remove various deprecated clients classes/methods for 3.0

2021-04-06 Thread GitBox


ijuma commented on a change in pull request #10438:
URL: https://github.com/apache/kafka/pull/10438#discussion_r607812331



##
File path: docs/upgrade.html
##
@@ -27,18 +27,30 @@ Notable changes in 3
 or updating the application not to use internal classes.
 The Streams API removed all deprecated APIs that were deprecated in 
version 2.5.0 or earlier.
 For a complete list of removed APIs compare the detailed Kafka Streams 
upgrade notes.
-The deprecated Scala Authorizer, 
SimpleAclAuthorizer and related classes have been removed. Please 
use the Java Authorizer
-and AclAuthorizer instead.
-The deprecated Metric#value() method was removed (<a href="https://issues.apache.org/jira/browse/KAFKA-12573">KAFKA-12573</a>).
-Deprecated security classes were removed: 
PrincipalBuilder, DefaultPrincipalBuilder and 
ResourceFilter.
-Furthermore, deprecated constants and constructors were removed from 
SslConfigs, SaslConfigs,
-AclBinding and AclBindingFilter.
-The deprecated Admin.electedPreferredLeaders() methods 
were removed. Please use Admin.electLeaders instead.
-The deprecated kafka-preferred-replica-election command 
line tool was removed. Please use kafka-leader-election 
instead.
-The deprecated ConfigEntry constructor was removed (<a href="https://issues.apache.org/jira/browse/KAFKA-12577">KAFKA-12577</a>).
-Please use the remaining public constructor instead.
-The deprecated config value default for the client config 
client.dns.lookup has been removed. In the unlikely
-event that you set this config explicitly, we recommend leaving the 
config unset (use_all_dns_ips is used by default).
+A number of deprecated classes and methods have been removed in the 
clients, core and tools modules:

Review comment:
   Addressed.

##
File path: clients/src/main/java/org/apache/kafka/common/MessageFormatter.java
##
@@ -34,33 +33,21 @@
  */
 public interface MessageFormatter extends Configurable, Closeable {
 
-/**
- * Initialises the MessageFormatter
- * @param props Properties to configure the formatter
- * @deprecated Use {@link #configure(Map)} instead, this method is for 
backward compatibility with the older Formatter interface
- */
-@Deprecated

Review comment:
   Good point, done.








[GitHub] [kafka] dengziming commented on a change in pull request #10483: KAFKA-12586; Add `DescribeTransactions` Admin API

2021-04-06 Thread GitBox


dengziming commented on a change in pull request #10483:
URL: https://github.com/apache/kafka/pull/10483#discussion_r607569144



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.Objects;
+import java.util.Set;
+
+public class CoordinatorStrategy implements AdminApiLookupStrategy<CoordinatorKey> {
+    private final Logger log;
+
+    public CoordinatorStrategy(
+        LogContext logContext
+    ) {
+        this.log = logContext.logger(CoordinatorStrategy.class);
+    }
+
+    @Override
+    public ApiRequestScope lookupScope(CoordinatorKey key) {
+        // The `FindCoordinator` API does not support batched lookups, so we use a
+        // separate lookup context for each coordinator key we need to look up
+        return new LookupRequestScope(key);

Review comment:
   Will we support batching lookups in the future?








[GitHub] [kafka] dengziming commented on a change in pull request #10483: KAFKA-12586; Add `DescribeTransactions` Admin API

2021-04-06 Thread GitBox


dengziming commented on a change in pull request #10483:
URL: https://github.com/apache/kafka/pull/10483#discussion_r607563049



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorKey.java
##
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+
+import java.util.Objects;
+
+public class CoordinatorKey {

Review comment:
   This class is similar to `FindCoordinatorRequestData`; can we replace it 
with the auto-generated protocol class?
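
   For context, a plausible shape for such a key class (the fields and the 
factory method are inferred assumptions, not necessarily the PR's exact code):

   // Hypothetical sketch: identifies a coordinator by type (GROUP or
   // TRANSACTION) and id (group id or transactional id).
   public class CoordinatorKey {
       public final FindCoordinatorRequest.CoordinatorType type;
       public final String idValue;

       private CoordinatorKey(FindCoordinatorRequest.CoordinatorType type, String idValue) {
           this.type = type;
           this.idValue = idValue;
       }

       public static CoordinatorKey byTransactionalId(String transactionalId) {
           return new CoordinatorKey(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
       }

       @Override
       public boolean equals(Object o) {
           if (this == o) return true;
           if (o == null || getClass() != o.getClass()) return false;
           CoordinatorKey other = (CoordinatorKey) o;
           return type == other.type && idValue.equals(other.idValue);
       }

       @Override
       public int hashCode() {
           return Objects.hash(type, idValue);
       }
   }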








[GitHub] [kafka] ijuma commented on pull request #9229: MINOR: Reduce allocations in requests via buffer caching

2021-04-06 Thread GitBox


ijuma commented on pull request #9229:
URL: https://github.com/apache/kafka/pull/9229#issuecomment-814090052


   @chia7712 Yes, it's worth exploring. I think `MemoryPool` is intended to be 
a thread-safe cache, so it's not trivial, but it may be possible. Are you 
interested in looking into that?






[GitHub] [kafka] ijuma edited a comment on pull request #9229: MINOR: Reduce allocations in requests via buffer caching

2021-04-06 Thread GitBox


ijuma edited a comment on pull request #9229:
URL: https://github.com/apache/kafka/pull/9229#issuecomment-814090052


   > Could we unify the interface?
   
   @chia7712 Yes, it's worth exploring. I think `MemoryPool` is intended to be 
a thread-safe cache, so it's not trivial, but it may be possible. Are you 
interested in looking into that?






[GitHub] [kafka] ijuma commented on a change in pull request #10471: KAFKA-12597: remove deprecated zookeeper option in ReassignPartitionsCommand

2021-04-06 Thread GitBox


ijuma commented on a change in pull request #10471:
URL: https://github.com/apache/kafka/pull/10471#discussion_r607821137



##
File path: core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
##
@@ -194,33 +192,18 @@ object ReassignPartitionsCommand extends Logging {
 
   def main(args: Array[String]): Unit = {
     val opts = validateAndParseArgs(args)
-    var toClose: Option[AutoCloseable] = None
     var failed = true
 
+    val props = if (opts.options.has(opts.commandConfigOpt))
+      Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))

Review comment:
   Why was this moved outside the `try`?

##
File path: core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
##
@@ -229,9 +212,9 @@ object ReassignPartitionsCommand extends Logging {
 println("Error: " + e.getMessage)
 println(Utils.stackTrace(e))
 } finally {
-  // Close the AdminClient or ZooKeeper client, as appropriate.
+  // Close the AdminClient, as appropriate.

Review comment:
   This comment is redundant IMO

##
File path: core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
##
@@ -1695,14 +1367,12 @@ object ReassignPartitionsCommand extends Logging {
 opts.bootstrapServerOpt,

Review comment:
   Do we need to pass `bootstrapServerOpt` here and the lines below?

##
File path: 
core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala
##
@@ -230,15 +214,11 @@ class ReassignPartitionsCommandArgsTest {
   }
 
   @Test
-  def testInvalidCommandConfigArgumentForLegacyGenerate(): Unit = {
-    val args = Array(
-      "--zookeeper", "localhost:1234",
-      "--generate",
-      "--broker-list", "101,102",
-      "--topics-to-move-json-file", "myfile.json",
-      "--command-config", "/tmp/command-config.properties"
-    )
-    shouldFailWith("You must specify --bootstrap-server when using \"[command-config]\"", args)
+  def shouldPrintHelpTextIfHelpArg(): Unit = {
+    val args: Array[String] = Array(
+      "--help",
+      "--bootstrap-server", "localhost:1234")

Review comment:
   Hmm, we should not require this line to see the help text.








[GitHub] [kafka] ijuma commented on pull request #10389: KAFKA-12384: stabilize ListOffsetsRequestTest#testResponseIncludesLeaderEpoch

2021-04-06 Thread GitBox


ijuma commented on pull request #10389:
URL: https://github.com/apache/kafka/pull/10389#issuecomment-814107295


   @chia7712 Is the test still flaky with the latest changes?






[GitHub] [kafka] ijuma commented on a change in pull request #9229: MINOR: Reduce allocations in requests via buffer caching

2021-04-06 Thread GitBox


ijuma commented on a change in pull request #9229:
URL: https://github.com/apache/kafka/pull/9229#discussion_r607129364



##
File path: core/src/main/scala/kafka/server/RequestLocal.scala
##
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import org.apache.kafka.common.utils.BufferSupplier
+
+case class RequestLocal(bufferSupplier: BufferSupplier) {

Review comment:
   Add documentation.








[GitHub] [kafka] ijuma commented on pull request #9229: MINOR: Reduce allocations in requests via buffer caching

2021-04-06 Thread GitBox


ijuma commented on pull request #9229:
URL: https://github.com/apache/kafka/pull/9229#issuecomment-814141618


   @chia7712 I updated the PR. Please review when you have a chance.






[GitHub] [kafka] showuon commented on a change in pull request #10471: KAFKA-12597: remove deprecated zookeeper option in ReassignPartitionsCommand

2021-04-06 Thread GitBox


showuon commented on a change in pull request #10471:
URL: https://github.com/apache/kafka/pull/10471#discussion_r607879162



##
File path: core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala
##
@@ -230,15 +214,11 @@ class ReassignPartitionsCommandArgsTest {
   }
 
   @Test
-  def testInvalidCommandConfigArgumentForLegacyGenerate(): Unit = {
-    val args = Array(
-      "--zookeeper", "localhost:1234",
-      "--generate",
-      "--broker-list", "101,102",
-      "--topics-to-move-json-file", "myfile.json",
-      "--command-config", "/tmp/command-config.properties"
-    )
-    shouldFailWith("You must specify --bootstrap-server when using \"[command-config]\"", args)
+  def shouldPrintHelpTextIfHelpArg(): Unit = {
+    val args: Array[String] = Array(
+      "--help",
+      "--bootstrap-server", "localhost:1234")

Review comment:
   Well, I agree. Making `--bootstrap-server` a required argument caused this 
side effect. I think we should keep `--bootstrap-server` non-required and do 
the argument check ourselves. I'll update it, and also the other PR: 
https://github.com/apache/kafka/pull/10457 on the `ConfigCommand`, tomorrow. 
Thank you.








[GitHub] [kafka] mumrah commented on a change in pull request #10455: MINOR: Support ExponentialBackoff without jitter

2021-04-06 Thread GitBox


mumrah commented on a change in pull request #10455:
URL: https://github.com/apache/kafka/pull/10455#discussion_r607882775



##
File path: clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java
##
@@ -47,7 +47,8 @@ public long backoff(long attempts) {
         }
         double exp = Math.min(attempts, this.expMax);
         double term = initialInterval * Math.pow(multiplier, exp);
-        double randomFactor = ThreadLocalRandom.current().nextDouble(1 - jitter, 1 + jitter);
+        double randomFactor = jitter < Double.MIN_NORMAL ? 1.0 :

Review comment:
   Gotcha, I misunderstood the docs and thought it was ~1. Let's keep it 
as-is
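
   To make the formula concrete, a small sketch of the deterministic 
(jitter = 0) backoff computation described by the diff (the standalone class 
and parameter values are illustrative assumptions):

   // Sketch: exponential backoff term as in the diff; with jitter disabled
   // the random factor degenerates to 1.0, so the result is deterministic.
   public class BackoffSketch {
       public static void main(String[] args) {
           long initialInterval = 100; // ms, assumed
           double multiplier = 2.0;    // assumed
           double expMax = 10;         // caps the exponent
           for (long attempts = 0; attempts <= 4; attempts++) {
               double exp = Math.min(attempts, expMax);
               double term = initialInterval * Math.pow(multiplier, exp);
               System.out.printf("attempt %d -> backoff %.0f ms%n", attempts, term);
               // prints 100, 200, 400, 800, 1600
           }
       }
   }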








[jira] [Assigned] (KAFKA-9988) Connect incorrectly logs that task has failed when one takes too long to shutdown

2021-04-06 Thread Kalpesh Patel (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kalpesh Patel reassigned KAFKA-9988:


Assignee: Kalpesh Patel

> Connect incorrectly logs that task has failed when one takes too long to 
> shutdown
> -
>
> Key: KAFKA-9988
> URL: https://issues.apache.org/jira/browse/KAFKA-9988
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0, 2.4.0, 2.3.1, 2.2.3, 2.5.0, 2.3.2, 2.4.1, 2.4.2, 
> 2.5.1
>Reporter: Sanjana Kaundinya
>Assignee: Kalpesh Patel
>Priority: Major
>  Labels: newbie
>
> If the OffsetStorageReader is closed while the task is trying to shutdown, 
> and the task is trying to access the offsets from the OffsetStorageReader, 
> then we see the following in the logs.
> {code:java}
> [2020-05-05 05:28:58,937] ERROR WorkerSourceTask{id=connector-18} Task threw 
> an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask)
> org.apache.kafka.connect.errors.ConnectException: Failed to fetch offsets.
> at 
> org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:114)
> at 
> org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)
> at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:205)
> at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
> at 
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.kafka.connect.errors.ConnectException: Offset reader 
> closed while attempting to read offsets. This is likely because the task was 
> been scheduled to stop but has taken longer than the graceful shutdown period 
> to do so.
> at 
> org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:103)
> ... 14 more
> [2020-05-05 05:28:58,937] ERROR WorkerSourceTask{id=connector-18} Task is 
> being killed and will not recover until manually restarted 
> (org.apache.kafka.connect.runtime.WorkerTask)
> {code}
> This is a bit misleading, because the task is already on its way of being 
> shutdown, and doesn't actually need manual intervention to be restarted. We 
> can see that as later on in the logs we see that it throws another 
> unrecoverable exception.
> {code:java}
> [2020-05-05 05:40:39,361] ERROR WorkerSourceTask{id=connector-18} Task threw 
> an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask)
> {code}
> If we know a task is on its way of shutting down, we should not throw a 
> ConnectException and instead log a warning so that we don't log false 
> negatives.
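
A hedged sketch of the suggested behavior (the flag name and log message are 
illustrative assumptions, not Connect's actual code):

// Hypothetical sketch: when the offset read fails because the task is
// already stopping, log a warning instead of raising ConnectException.
if (taskStopping) {
    log.warn("Offset reader closed while the task was already stopping; skipping offset read");
} else {
    throw new ConnectException("Failed to fetch offsets.");
}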





[GitHub] [kafka] dajac commented on a change in pull request #10470: KAFKA-12612: Remove checksum from ConsumerRecord and RecordMetadata for 3.0

2021-04-06 Thread GitBox


dajac commented on a change in pull request #10470:
URL: https://github.com/apache/kafka/pull/10470#discussion_r607914823



##
File path: core/src/main/scala/kafka/tools/ConsoleConsumer.scala
##
@@ -614,18 +613,3 @@ class NoOpMessageFormatter extends MessageFormatter {
   def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], 
output: PrintStream): Unit = {}
 }
 
-class ChecksumMessageFormatter extends MessageFormatter {

Review comment:
   I wonder if we should explicitly mention this one in the upgrade notes. 
It could be considered part of the `console-consumer` tool, which is part of 
our public API. It is in a gray zone. I am pretty sure that it is not used 
anymore, but who knows... What's your take on this?








[GitHub] [kafka] mumrah commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-06 Thread GitBox


mumrah commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r607915497



##
File path: raft/src/main/java/org/apache/kafka/raft/internals/SerdeRecordsIterator.java
##
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import org.apache.kafka.common.protocol.DataInputStreamReadable;
+import org.apache.kafka.common.protocol.Readable;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MutableRecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.raft.BatchReader.Batch;
+import org.apache.kafka.raft.RecordSerde;
+
+public final class SerdeRecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseable {
+    private final Records records;
+    private final RecordSerde<T> serde;
+    private final BufferSupplier bufferSupplier;
+    private final int maxBatchSize;
+
+    private Optional<Iterator<MutableRecordBatch>> nextBatches = Optional.empty();
+    private Optional<Batch<T>> nextBatch = Optional.empty();
+    // Buffer used as the backing store for nextBatches if needed
+    private Optional<ByteBuffer> allocatedBuffer = Optional.empty();
+    // Number of bytes from records that have been read
+    private int bytesRead = 0;
+    private boolean isClosed = false;
+
+    public SerdeRecordsIterator(
+        Records records,
+        RecordSerde<T> serde,
+        BufferSupplier bufferSupplier,
+        int maxBatchSize
+    ) {
+        this.records = records;
+        this.serde = serde;
+        this.bufferSupplier = bufferSupplier;
+        this.maxBatchSize = maxBatchSize;
+    }
+
+    @Override
+    public boolean hasNext() {
+        checkIfClosed();
+
+        if (!nextBatch.isPresent()) {
+            nextBatch = nextBatch();
+        }
+
+        return nextBatch.isPresent();
+    }
+
+    @Override
+    public Batch<T> next() {
+        if (!hasNext()) {
+            throw new NoSuchElementException("Batch iterator doesn't have any more elements");
+        }
+
+        Batch<T> batch = nextBatch.get();
+        nextBatch = Optional.empty();
+
+        return batch;
+    }
+
+    @Override
+    public void close() {
+        isClosed = true;
+        allocatedBuffer.ifPresent(bufferSupplier::release);
+        allocatedBuffer = Optional.empty();
+    }
+
+    private void checkIfClosed() {
+        if (isClosed) {
+            throw new IllegalStateException("Serde record batch iterator was closed");
+        }
+    }
+
+    private Optional<Iterator<MutableRecordBatch>> nextBatches() {
+        int recordSize = records.sizeInBytes();
+        if (bytesRead < recordSize) {
+            final MemoryRecords memoryRecords;
+            if (records instanceof MemoryRecords) {
+                bytesRead = recordSize;
+                memoryRecords = (MemoryRecords) records;
+            } else if (records instanceof FileRecords) {
+                final ByteBuffer buffer;
+                if (allocatedBuffer.isPresent()) {
+                    buffer = allocatedBuffer.get();
+                    buffer.compact();
+
+                    if (!buffer.hasRemaining()) {
+                        // The buffer is not big enough to read an entire batch
+                        throw new IllegalStateException(
+                            String.format(
+                                "Unable to read batch from file records buffer %s with maximum batch %s and record size %s",
+                                buffer,
+                                maxBatchSize,
+                                records.sizeInBytes()
+                            )
+                        );
+                    }
+                } else {
+                    buffer = bufferSupplier.get(Math.min(maxBatchSize, records.sizeInBytes()));
+                    allocatedBuffer = Optional.of(buffer);

[GitHub] [kafka] vvcephei commented on a change in pull request #10474: KAFKA-12602: Fix LICENSE file

2021-04-06 Thread GitBox


vvcephei commented on a change in pull request #10474:
URL: https://github.com/apache/kafka/pull/10474#discussion_r607916726



##
File path: licenses/DWTFYWTPL
##
@@ -0,0 +1,14 @@
+DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE

Review comment:
   Yeah, I'm a little embarrassed to have this in our dependencies. 
¯\_(ツ)_/¯








[jira] [Commented] (KAFKA-5146) Kafka Streams: remove compile dependency on connect-json

2021-04-06 Thread Marco Lotz (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315626#comment-17315626
 ] 

Marco Lotz commented on KAFKA-5146:
---

[~mjsax] would it be possible for you to have a quick look at the PR? Since 
the files it updates are changing quite frequently right now, the PR needs 
constant maintenance to stay merge-conflict free.

> Kafka Streams: remove compile dependency on connect-json
> 
>
> Key: KAFKA-5146
> URL: https://issues.apache.org/jira/browse/KAFKA-5146
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0, 0.10.2.0, 0.10.2.1
>Reporter: Michael G. Noll
>Assignee: Marco Lotz
>Priority: Minor
>
> We currently have a compile-dependency on `connect-json`:
> {code}
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>connect-json</artifactId>
>   <version>0.10.2.0</version>
>   <scope>compile</scope>
> </dependency>
> {code}
> The snippet above is from the generated POM of Kafka Streams as of 0.10.2.0 
> release.
> AFAICT the only reason for that is because the Kafka Streams *examples* 
> showcase some JSON processing, but that’s it.
> First and foremost, we should remove the connect-json dependency, and also 
> figure out a way to set up / structure the examples so we that we can 
> continue showcasing JSON support.  Alternatively, we could consider removing 
> the JSON example (but I don't like that, personally).





[GitHub] [kafka] vvcephei commented on a change in pull request #10474: KAFKA-12602: Fix LICENSE file

2021-04-06 Thread GitBox


vvcephei commented on a change in pull request #10474:
URL: https://github.com/apache/kafka/pull/10474#discussion_r607918616



##
File path: LICENSE-binary
##
@@ -0,0 +1,602 @@
+
+ Apache License
+   Version 2.0, January 2004
+http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+  "License" shall mean the terms and conditions for use, reproduction,
+  and distribution as defined by Sections 1 through 9 of this document.
+
+  "Licensor" shall mean the copyright owner or entity authorized by
+  the copyright owner that is granting the License.
+
+  "Legal Entity" shall mean the union of the acting entity and all
+  other entities that control, are controlled by, or are under common
+  control with that entity. For the purposes of this definition,
+  "control" means (i) the power, direct or indirect, to cause the
+  direction or management of such entity, whether by contract or
+  otherwise, or (ii) ownership of fifty percent (50%) or more of the
+  outstanding shares, or (iii) beneficial ownership of such entity.
+
+  "You" (or "Your") shall mean an individual or Legal Entity
+  exercising permissions granted by this License.
+
+  "Source" form shall mean the preferred form for making modifications,
+  including but not limited to software source code, documentation
+  source, and configuration files.
+
+  "Object" form shall mean any form resulting from mechanical
+  transformation or translation of a Source form, including but
+  not limited to compiled object code, generated documentation,
+  and conversions to other media types.
+
+  "Work" shall mean the work of authorship, whether in Source or
+  Object form, made available under the License, as indicated by a
+  copyright notice that is included in or attached to the work
+  (an example is provided in the Appendix below).
+
+  "Derivative Works" shall mean any work, whether in Source or Object
+  form, that is based on (or derived from) the Work and for which the
+  editorial revisions, annotations, elaborations, or other modifications
+  represent, as a whole, an original work of authorship. For the purposes
+  of this License, Derivative Works shall not include works that remain
+  separable from, or merely link (or bind by name) to the interfaces of,
+  the Work and Derivative Works thereof.
+
+  "Contribution" shall mean any work of authorship, including
+  the original version of the Work and any modifications or additions
+  to that Work or Derivative Works thereof, that is intentionally
+  submitted to Licensor for inclusion in the Work by the copyright owner
+  or by an individual or Legal Entity authorized to submit on behalf of
+  the copyright owner. For the purposes of this definition, "submitted"
+  means any form of electronic, verbal, or written communication sent
+  to the Licensor or its representatives, including but not limited to
+  communication on electronic mailing lists, source code control systems,
+  and issue tracking systems that are managed by, or on behalf of, the
+  Licensor for the purpose of discussing and improving the Work, but
+  excluding communication that is conspicuously marked or otherwise
+  designated in writing by the copyright owner as "Not a Contribution."
+
+  "Contributor" shall mean Licensor and any individual or Legal Entity
+  on behalf of whom a Contribution has been received by Licensor and
+  subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+  this License, each Contributor hereby grants to You a perpetual,
+  worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+  copyright license to reproduce, prepare Derivative Works of,
+  publicly display, publicly perform, sublicense, and distribute the
+  Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+  this License, each Contributor hereby grants to You a perpetual,
+  worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+  (except as stated in this section) patent license to make, have made,
+  use, offer to sell, sell, import, and otherwise transfer the Work,
+  where such license applies only to those patent claims licensable
+  by such Contributor that are necessarily infringed by their
+  Contribution(s) alone or by combination of their Contribution(s)
+  with the Work to which such Contribution(s) was submitted. If You
+  institute patent litigation against any entity (including a
+  cross-claim or counterclaim in a lawsuit) alleging that the Work
+  or a Contributi

[GitHub] [kafka] ijuma commented on a change in pull request #10471: KAFKA-12597: remove deprecated zookeeper option in ReassignPartitionsCommand

2021-04-06 Thread GitBox


ijuma commented on a change in pull request #10471:
URL: https://github.com/apache/kafka/pull/10471#discussion_r607929109



##
File path: 
core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala
##
@@ -230,15 +214,11 @@ class ReassignPartitionsCommandArgsTest {
   }
 
   @Test
-  def testInvalidCommandConfigArgumentForLegacyGenerate(): Unit = {
-val args = Array(
-  "--zookeeper", "localhost:1234",
-  "--generate",
-  "--broker-list", "101,102",
-  "--topics-to-move-json-file", "myfile.json",
-  "--command-config", "/tmp/command-config.properties"
-)
-shouldFailWith("You must specify --bootstrap-server when using 
\"[command-config]\"", args)
+  def shouldPrintHelpTextIfHelpArg(): Unit = {
+val args: Array[String] = Array(
+  "--help",
+  "--bootstrap-server", "localhost:1234")

Review comment:
   Makes sense.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12344) Support SlidingWindows in the Scala API

2021-04-06 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315675#comment-17315675
 ] 

Matthias J. Sax commented on KAFKA-12344:
-

Sounds about right – in the Java code, `windowedBy(SlidingWindows)` returns a 
`TimeWindowedKStream`, and thus the Scala code should do a similar thing.
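
For reference, the Java-side shape being mirrored is roughly the following (a simplified sketch; the real `KGroupedStream` interface carries many more members and overloads):

    // Simplified sketch of the Java API shape referenced above; the Scala
    // wrapper would be expected to expose a matching windowedBy overload.
    public interface KGroupedStream<K, V> {
        TimeWindowedKStream<K, V> windowedBy(SlidingWindows windows);
    }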

> Support SlidingWindows in the Scala API
> ---
>
> Key: KAFKA-12344
> URL: https://issues.apache.org/jira/browse/KAFKA-12344
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: Leah Thomas
>Priority: Major
>  Labels: newbie, scala
>
> in KIP-450 we implemented sliding windows for the Java API but left out a few 
> crucial methods to allow sliding windows to work through the Scala API. We 
> need to add those methods to make the Scala API fully leverage sliding windows



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma commented on pull request #10438: KAFKA-12579: Remove various deprecated clients classes/methods for 3.0

2021-04-06 Thread GitBox


ijuma commented on pull request #10438:
URL: https://github.com/apache/kafka/pull/10438#issuecomment-814233076


   Failures are unrelated, JDK 11 build passed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma merged pull request #10438: KAFKA-12579: Remove various deprecated clients classes/methods for 3.0

2021-04-06 Thread GitBox


ijuma merged pull request #10438:
URL: https://github.com/apache/kafka/pull/10438


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10131: KAFKA-5146 / move kafka-streams example to a new module

2021-04-06 Thread GitBox


mjsax commented on a change in pull request #10131:
URL: https://github.com/apache/kafka/pull/10131#discussion_r607979819



##
File path: docs/upgrade.html
##
@@ -37,6 +37,8 @@ Notable changes in 3
 <li>The deprecated <code>kafka-preferred-replica-election</code> command line tool was removed. Please use <code>kafka-leader-election</code> instead.</li>
 <li>The deprecated <code>ConfigEntry</code> constructor was removed (<a href="https://issues.apache.org/jira/browse/KAFKA-12577">KAFKA-12577</a>). Please use the remaining public constructor instead.</li>
+<li>Kafka Streams no longer has a compile time dependency on the <code>"connect:json"</code> module (<a href="https://issues.apache.org/jira/browse/KAFKA-5146">KAFKA-5146</a>).</li>

Review comment:
   Nit: For the top-level docs, it might be ok to remove the link to the ticket?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10254: KAFKA-12406 Integrate client quotas with raft broker

2021-04-06 Thread GitBox


cmccabe commented on a change in pull request #10254:
URL: https://github.com/apache/kafka/pull/10254#discussion_r607993137



##
File path: core/src/main/scala/kafka/server/metadata/ClientQuotaCache.scala
##
@@ -122,6 +122,14 @@ class ClientQuotaCache {
   entityFilters.put(entityType, entityMatch)
 }
 
+// Special case for non-strict empty filter, match everything

Review comment:
   ok




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10483: KAFKA-12586; Add `DescribeTransactions` Admin API

2021-04-06 Thread GitBox


hachikuji commented on a change in pull request #10483:
URL: https://github.com/apache/kafka/pull/10483#discussion_r607994309



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.Objects;
+import java.util.Set;
+
+public class CoordinatorStrategy<T> implements AdminApiLookupStrategy<CoordinatorKey> {
+private final Logger log;
+
+public CoordinatorStrategy(
+LogContext logContext
+) {
+this.log = logContext.logger(CoordinatorStrategy.class);
+}
+
+@Override
+public ApiRequestScope lookupScope(CoordinatorKey key) {
+// The `FindCoordinator` API does not support batched lookups, so we use a
+// separate lookup context for each coordinator key we need to look up
+return new LookupRequestScope(key);

Review comment:
   Yes, a future version of FindCoordinator could implement that. Then 
we'll just have to decide how to propagate the version to the lookup strategy. 
I don't think it would be too difficult.
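
   A rough sketch of how that might look once batching exists; the shared scope and the `BatchedLookupScope` name are illustrative assumptions, not code from this patch:

    // Hypothetical sketch: with a batched FindCoordinator, every key could map
    // to one shared scope so the driver can coalesce them into a single request.
    class BatchedLookupScope implements ApiRequestScope {
        static final BatchedLookupScope INSTANCE = new BatchedLookupScope();
    }

    @Override
    public ApiRequestScope lookupScope(CoordinatorKey key) {
        return BatchedLookupScope.INSTANCE; // same scope, so keys batch together
    }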




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10254: KAFKA-12406 Integrate client quotas with raft broker

2021-04-06 Thread GitBox


cmccabe commented on a change in pull request #10254:
URL: https://github.com/apache/kafka/pull/10254#discussion_r607994514



##
File path: 
core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala
##
@@ -121,16 +121,16 @@ class ClientQuotaMetadataManager(private[metadata] val 
quotaManagers: QuotaManag
   return
 }
 
-// Update the cache
-quotaCache.updateQuotaCache(ipEntity, quotaRecord.key, quotaRecord.value, 
quotaRecord.remove)
-
 // Convert the value to an appropriate Option for the quota manager
 val newValue = if (quotaRecord.remove()) {
   None
 } else {
   Some(quotaRecord.value).map(_.toInt)
 }
 connectionQuotas.updateIpConnectionRateQuota(inetAddress, newValue)
+
+// Update the cache

Review comment:
   Is that code structured in such a way that it can throw?  If so, then 
yes, I think we want to catch that here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10483: KAFKA-12586; Add `DescribeTransactions` Admin API

2021-04-06 Thread GitBox


hachikuji commented on a change in pull request #10483:
URL: https://github.com/apache/kafka/pull/10483#discussion_r607994863



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.Objects;
+import java.util.Set;
+
+public class CoordinatorStrategy<T> implements AdminApiLookupStrategy<CoordinatorKey> {

Review comment:
   Oh yeah, this is leftover from a previous iteration.
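
   i.e. the follow-up would presumably just drop the unused parameter. A sketch of the cleaned-up declaration, with the key type already fixed by the `lookupScope` signature:

    // Cleaned-up declaration once the leftover type parameter is dropped.
    public class CoordinatorStrategy implements AdminApiLookupStrategy<CoordinatorKey> {
        // ... rest of the class unchanged ...
    }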




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-06 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r607999624



##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/SerdeRecordsIterator.java
##
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import org.apache.kafka.common.protocol.DataInputStreamReadable;
+import org.apache.kafka.common.protocol.Readable;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MutableRecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.raft.BatchReader.Batch;
+import org.apache.kafka.raft.RecordSerde;
+
+public final class SerdeRecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseable {
+private final Records records;
+private final RecordSerde<T> serde;
+private final BufferSupplier bufferSupplier;
+private final int maxBatchSize;
+
+private Optional<Iterator<MutableRecordBatch>> nextBatches = Optional.empty();
+private Optional<Batch<T>> nextBatch = Optional.empty();
+// Buffer used as the backing store for nextBatches if needed
+private Optional<ByteBuffer> allocatedBuffer = Optional.empty();
+// Number of bytes read so far from records
+private int bytesRead = 0;
+private boolean isClosed = false;
+
+public SerdeRecordsIterator(
+Records records,
+RecordSerde<T> serde,
+BufferSupplier bufferSupplier,
+int maxBatchSize
+) {
+this.records = records;
+this.serde = serde;
+this.bufferSupplier = bufferSupplier;
+this.maxBatchSize = maxBatchSize;
+}
+
+@Override
+public boolean hasNext() {
+checkIfClosed();
+
+if (!nextBatch.isPresent()) {
+nextBatch = nextBatch();
+}
+
+return nextBatch.isPresent();
+}
+
+@Override
+public Batch<T> next() {
+if (!hasNext()) {
+throw new NoSuchElementException("Batch iterator doesn't have any more elements");
+}
+
+Batch<T> batch = nextBatch.get();
+nextBatch = Optional.empty();
+
+return batch;
+}
+
+@Override
+public void close() {
+isClosed = true;
+allocatedBuffer.ifPresent(bufferSupplier::release);
+allocatedBuffer = Optional.empty();
+}
+
+private void checkIfClosed() {
+if (isClosed) {
+throw new IllegalStateException("Serde record batch iterator was closed");
+}
+}
+
+private Optional<Iterator<MutableRecordBatch>> nextBatches() {
+int recordSize = records.sizeInBytes();
+if (bytesRead < recordSize) {
+final MemoryRecords memoryRecords;
+if (records instanceof MemoryRecords) {
+bytesRead = recordSize;
+memoryRecords = (MemoryRecords) records;
+} else if (records instanceof FileRecords) {
+final ByteBuffer buffer;
+if (allocatedBuffer.isPresent()) {
+buffer = allocatedBuffer.get();
+buffer.compact();
+
+if (!buffer.hasRemaining()) {
+// The buffer is not big enough to read an entire batch
+throw new IllegalStateException(
+String.format(
+"Unable to read batch from file records buffer 
%s with maximum batch %s and record size %s",
+buffer,
+maxBatchSize,
+records.sizeInBytes()
+)
+);
+}
+} else {
+buffer = bufferSupplier.get(Math.min(maxBatchSize, records.sizeInBytes()));
+allocatedBuffer = Optional.of(buffer);
+

[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-06 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r608006894



##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/SerdeRecordsIterator.java
##
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import org.apache.kafka.common.protocol.DataInputStreamReadable;
+import org.apache.kafka.common.protocol.Readable;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MutableRecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.raft.BatchReader.Batch;
+import org.apache.kafka.raft.RecordSerde;
+
+public final class SerdeRecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseable {
+private final Records records;
+private final RecordSerde<T> serde;
+private final BufferSupplier bufferSupplier;
+private final int maxBatchSize;
+
+private Optional<Iterator<MutableRecordBatch>> nextBatches = Optional.empty();
+private Optional<Batch<T>> nextBatch = Optional.empty();
+// Buffer used as the backing store for nextBatches if needed
+private Optional<ByteBuffer> allocatedBuffer = Optional.empty();
+// Number of bytes read so far from records
+private int bytesRead = 0;
+private boolean isClosed = false;
+
+public SerdeRecordsIterator(
+Records records,
+RecordSerde<T> serde,
+BufferSupplier bufferSupplier,
+int maxBatchSize
+) {
+this.records = records;
+this.serde = serde;
+this.bufferSupplier = bufferSupplier;
+this.maxBatchSize = maxBatchSize;
+}
+
+@Override
+public boolean hasNext() {
+checkIfClosed();
+
+if (!nextBatch.isPresent()) {
+nextBatch = nextBatch();
+}
+
+return nextBatch.isPresent();
+}
+
+@Override
+public Batch<T> next() {
+if (!hasNext()) {
+throw new NoSuchElementException("Batch iterator doesn't have any more elements");
+}
+
+Batch<T> batch = nextBatch.get();
+nextBatch = Optional.empty();
+
+return batch;
+}
+
+@Override
+public void close() {
+isClosed = true;
+allocatedBuffer.ifPresent(bufferSupplier::release);
+allocatedBuffer = Optional.empty();
+}
+
+private void checkIfClosed() {
+if (isClosed) {
+throw new IllegalStateException("Serde record batch iterator was closed");
+}
+}
+
+private Optional<Iterator<MutableRecordBatch>> nextBatches() {
+int recordSize = records.sizeInBytes();
+if (bytesRead < recordSize) {
+final MemoryRecords memoryRecords;
+if (records instanceof MemoryRecords) {
+bytesRead = recordSize;
+memoryRecords = (MemoryRecords) records;
+} else if (records instanceof FileRecords) {
+final ByteBuffer buffer;
+if (allocatedBuffer.isPresent()) {
+buffer = allocatedBuffer.get();
+buffer.compact();
+
+if (!buffer.hasRemaining()) {
+// The buffer is not big enough to read an entire batch
+throw new IllegalStateException(
+String.format(
+"Unable to read batch from file records buffer 
%s with maximum batch %s and record size %s",
+buffer,
+maxBatchSize,
+records.sizeInBytes()
+)
+);
+}

Review comment:
   @hachikuji @mumrah I am not super happy with this handling. Especially 
since it is possible for us to write a batch greater than `maxBatchSize` based 
on how `BatchAccum

[GitHub] [kafka] ijuma commented on a change in pull request #10131: KAFKA-5146 / move kafka-streams example to a new module

2021-04-06 Thread GitBox


ijuma commented on a change in pull request #10131:
URL: https://github.com/apache/kafka/pull/10131#discussion_r602917794



##
File path: build.gradle
##
@@ -1423,18 +1423,8 @@ project(':streams') {
   dependencies {
 api project(':clients')
 
-// use `api` dependency for `connect-json` for compatibility (e.g. users 
who use `JsonSerializer`/`JsonDeserializer`
-// at compile-time without an explicit dependency on `connect-json`)
-// this dependency should be removed after we unify data API
-api(project(':connect:json')) {
-  // this transitive dependency is not used in Streams, and it breaks SBT 
builds
-  exclude module: 'javax.ws.rs-api'
-}
-
-// `org.rocksdb.Options` is part of Kafka Streams public api via 
`RocksDBConfigSetter`
-api libs.rocksDBJni
-
 implementation libs.slf4jApi
+implementation libs.rocksDBJni

Review comment:
   This seems wrong, is it unintentional?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10483: KAFKA-12586; Add `DescribeTransactions` Admin API

2021-04-06 Thread GitBox


hachikuji commented on a change in pull request #10483:
URL: https://github.com/apache/kafka/pull/10483#discussion_r608009360



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorKey.java
##
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+
+import java.util.Objects;
+
+public class CoordinatorKey {

Review comment:
   Hmm.. I think it is a little clearer to use a separate type. If we _do_ 
add batching to FindCoordinator, then we'd just have to bring it back.
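
   For reference, such a separate type is essentially a small value class pairing the coordinator type with the id. A minimal sketch (the constructor shape here is an assumption, not necessarily the patch's):

    // Minimal sketch of a coordinator key value type: a (type, id) pair with
    // value semantics so it can be used as a map key and in request scopes.
    public class CoordinatorKey {
        public final FindCoordinatorRequest.CoordinatorType type;
        public final String idValue;

        public CoordinatorKey(FindCoordinatorRequest.CoordinatorType type, String idValue) {
            this.type = Objects.requireNonNull(type);
            this.idValue = Objects.requireNonNull(idValue);
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            CoordinatorKey that = (CoordinatorKey) o;
            return type == that.type && idValue.equals(that.idValue);
        }

        @Override
        public int hashCode() {
            return Objects.hash(type, idValue);
        }
    }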




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji edited a comment on pull request #10483: KAFKA-12586; Add `DescribeTransactions` Admin API

2021-04-06 Thread GitBox


hachikuji edited a comment on pull request #10483:
URL: https://github.com/apache/kafka/pull/10483#issuecomment-813766742


   @chia7712 @dajac No rush, but when you have time, this is a continuation of 
the previous work which added `AdminApiDriver`. This patch contains 
`CoordinatorStrategy`, which is needed to lookup group/transaction coordinators.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-12457) Implications of KIP-516 for quorum controller

2021-04-06 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan reassigned KAFKA-12457:
--

Assignee: Justine Olshan

> Implications of KIP-516 for quorum controller
> -
>
> Key: KAFKA-12457
> URL: https://issues.apache.org/jira/browse/KAFKA-12457
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Justine Olshan
>Priority: Major
>  Labels: kip-500
>
> KIP-516 introduces topic IDs to Kafka. We are in the process of updating many 
> of the protocols to support them. In most cases, we are dropping the topic 
> name entirely from new API versions. I think there are two open questions for 
> KIP-500 in regard to this:
> 1. Can we assume topic ID existence in KIP-500? 
> I think the answer here is yes, and the existing code already assumes it. The 
> nice thing is that KIP-516 brings with it the logic to create topic IDs for 
> existing topics. We can rely on this ability in the bridge release to ensure 
> that all topics have topic IDs. And we can add it to pre-upgrade validations.
> 2. What topic ID should be used for `@metadata`? 
> There are basically two options for this: either use a sentinel topic ID or 
> let the controller generate a new one and write it into a `TopicRecord` when 
> the cluster first initializes. If we assume long term that we won't be able 
> to use topic names in the inter-broker protocol, then a sentinel might really 
> be a necessity since brokers would need to know the topic ID before they can 
> send fetches. In other words, if we generate a unique ID, then we probably 
> still need some sentinel so that followers can fetch the initial 
> `TopicRecord` which contains the ID.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-04-06 Thread GitBox


hachikuji commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r608014866



##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/SerdeRecordsIterator.java
##
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import org.apache.kafka.common.protocol.DataInputStreamReadable;
+import org.apache.kafka.common.protocol.Readable;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MutableRecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.raft.BatchReader.Batch;
+import org.apache.kafka.raft.RecordSerde;
+
+public final class SerdeRecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseable {
+private final Records records;
+private final RecordSerde<T> serde;
+private final BufferSupplier bufferSupplier;
+private final int maxBatchSize;
+
+private Optional<Iterator<MutableRecordBatch>> nextBatches = Optional.empty();
+private Optional<Batch<T>> nextBatch = Optional.empty();
+// Buffer used as the backing store for nextBatches if needed
+private Optional<ByteBuffer> allocatedBuffer = Optional.empty();
+// Number of bytes read so far from records
+private int bytesRead = 0;
+private boolean isClosed = false;
+
+public SerdeRecordsIterator(
+Records records,
+RecordSerde<T> serde,
+BufferSupplier bufferSupplier,
+int maxBatchSize
+) {
+this.records = records;
+this.serde = serde;
+this.bufferSupplier = bufferSupplier;
+this.maxBatchSize = maxBatchSize;
+}
+
+@Override
+public boolean hasNext() {
+checkIfClosed();
+
+if (!nextBatch.isPresent()) {
+nextBatch = nextBatch();
+}
+
+return nextBatch.isPresent();
+}
+
+@Override
+public Batch<T> next() {
+if (!hasNext()) {
+throw new NoSuchElementException("Batch iterator doesn't have any more elements");
+}
+
+Batch<T> batch = nextBatch.get();
+nextBatch = Optional.empty();
+
+return batch;
+}
+
+@Override
+public void close() {
+isClosed = true;
+allocatedBuffer.ifPresent(bufferSupplier::release);
+allocatedBuffer = Optional.empty();
+}
+
+private void checkIfClosed() {
+if (isClosed) {
+throw new IllegalStateException("Serde record batch iterator was closed");
+}
+}
+
+private Optional<Iterator<MutableRecordBatch>> nextBatches() {
+int recordSize = records.sizeInBytes();
+if (bytesRead < recordSize) {
+final MemoryRecords memoryRecords;
+if (records instanceof MemoryRecords) {
+bytesRead = recordSize;
+memoryRecords = (MemoryRecords) records;
+} else if (records instanceof FileRecords) {
+final ByteBuffer buffer;
+if (allocatedBuffer.isPresent()) {
+buffer = allocatedBuffer.get();
+buffer.compact();
+
+if (!buffer.hasRemaining()) {
+// The buffer is not big enough to read an entire batch
+throw new IllegalStateException(
+String.format(
+"Unable to read batch from file records buffer 
%s with maximum batch %s and record size %s",
+buffer,
+maxBatchSize,
+records.sizeInBytes()
+)
+);
+}

Review comment:
   Can we just let the buffer grow as needed? We have a `BufferSupplier` 
after all.
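
   A minimal sketch of that alternative, assuming the compact-then-refill structure above is kept (the doubling policy is an illustrative choice, not this patch's code):

    // Hypothetical sketch: instead of throwing when the compacted buffer is
    // still full, acquire a larger buffer and carry the unread bytes over.
    if (!buffer.hasRemaining()) {
        ByteBuffer larger = bufferSupplier.get(2 * buffer.capacity());
        buffer.flip();       // switch to read mode so the copy sees the data
        larger.put(buffer);  // carry over the partially consumed bytes
        bufferSupplier.release(buffer);
        allocatedBuffer = Optional.of(larger);
    }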




-- 
This is an automated message from the Apache Git Service.
To respond to the m

[GitHub] [kafka] cmccabe merged pull request #10455: MINOR: Support ExponentialBackoff without jitter

2021-04-06 Thread GitBox


cmccabe merged pull request #10455:
URL: https://github.com/apache/kafka/pull/10455


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on pull request #10218: KAFKA-12368: Added inmemory implementations for RemoteStorageManager and RemoteLogMetadataManager.

2021-04-06 Thread GitBox


satishd commented on pull request #10218:
URL: https://github.com/apache/kafka/pull/10218#issuecomment-814275837


   @junrao I renamed the `remote-storage` module to the `storage` module, as you suggested, in the commit 
[009d7fa](https://github.com/apache/kafka/pull/10218/commits/009d7fa6c861c671f3b2aa85c05fdd5ff467e195).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] MarcoLotz commented on a change in pull request #10131: KAFKA-5146 / move kafka-streams example to a new module

2021-04-06 Thread GitBox


MarcoLotz commented on a change in pull request #10131:
URL: https://github.com/apache/kafka/pull/10131#discussion_r608024597



##
File path: docs/upgrade.html
##
@@ -37,6 +37,8 @@ Notable changes in 3
 <li>The deprecated <code>kafka-preferred-replica-election</code> command line tool was removed. Please use <code>kafka-leader-election</code> instead.</li>
 <li>The deprecated <code>ConfigEntry</code> constructor was removed (<a href="https://issues.apache.org/jira/browse/KAFKA-12577">KAFKA-12577</a>). Please use the remaining public constructor instead.</li>
+<li>Kafka Streams no longer has a compile time dependency on the <code>"connect:json"</code> module (<a href="https://issues.apache.org/jira/browse/KAFKA-5146">KAFKA-5146</a>).</li>

Review comment:
   In docs/upgrade.html they all reference a Kafka Jira ticket with a 
hyperlink. In docs/streams/upgrade-guide.html they all reference a KIP, but 
there were previous upgrades referencing a Jira ticket with a hyperlink. 
It seems consistent to keep the hyperlink in both files.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] MarcoLotz commented on a change in pull request #10131: KAFKA-5146 / move kafka-streams example to a new module

2021-04-06 Thread GitBox


MarcoLotz commented on a change in pull request #10131:
URL: https://github.com/apache/kafka/pull/10131#discussion_r608024857



##
File path: build.gradle
##
@@ -1423,18 +1423,8 @@ project(':streams') {
   dependencies {
 api project(':clients')
 
-// use `api` dependency for `connect-json` for compatibility (e.g. users 
who use `JsonSerializer`/`JsonDeserializer`
-// at compile-time without an explicit dependency on `connect-json`)
-// this dependency should be removed after we unify data API
-api(project(':connect:json')) {
-  // this transitive dependency is not used in Streams, and it breaks SBT 
builds
-  exclude module: 'javax.ws.rs-api'
-}
-
-// `org.rocksdb.Options` is part of Kafka Streams public api via 
`RocksDBConfigSetter`
-api libs.rocksDBJni
-
 implementation libs.slf4jApi
+implementation libs.rocksDBJni

Review comment:
   Indeed unintentional. Fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #10218: KAFKA-12368: Added inmemory implementations for RemoteStorageManager and RemoteLogMetadataManager.

2021-04-06 Thread GitBox


kowshik commented on a change in pull request #10218:
URL: https://github.com/apache/kafka/pull/10218#discussion_r607959239



##
File path: clients/src/test/java/org/apache/kafka/test/TestUtils.java
##
@@ -535,4 +536,48 @@ public static void setFieldValue(Object obj, String 
fieldName, Object value) thr
 field.setAccessible(true);
 field.set(obj, value);
 }
+
+/**
+ * Returns true if both iterators have same elements in the same order.
+ *
+ * @param iterator1 first iterator.
+ * @param iterator2 second iterator.
+ * @param <T> type of element in the iterators.
+ * @return

Review comment:
   nit: remove empty `@return`

##
File path: clients/src/test/java/org/apache/kafka/test/TestUtils.java
##
@@ -535,4 +536,48 @@ public static void setFieldValue(Object obj, String 
fieldName, Object value) thr
 field.setAccessible(true);
 field.set(obj, value);
 }
+
+/**
+ * Returns true if both iterators have same elements in the same order.
+ *
+ * @param iterator1 first iterator.
+ * @param iterator2 second iterator.
+ * @param <T> type of element in the iterators.
+ * @return
+ */
+public static <T> boolean sameElementsWithOrder(Iterator<T> iterator1,
+Iterator<T> iterator2) {
+while (iterator1.hasNext()) {
+if (!iterator2.hasNext()) {
+return false;
+}
+
+Object elem1 = iterator1.next();
+Object elem2 = iterator2.next();
+if (!Objects.equals(elem1, elem2)) {
+return false;
+}
+}
+
+return !iterator2.hasNext();
+}
+
+/**
+ * Returns true if both the iterators have same set of elements 
irrespective of order and duplicates.
+ *
+ * @param iterator1 first iterator.
+ * @param iterator2 second iterator.
+ * @param <T> type of element in the iterators.
+ * @return

Review comment:
   nit: remove empty `@return`

##
File path: 
clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentState.java
##
@@ -87,4 +89,27 @@ public byte id() {
 public static RemoteLogSegmentState forId(byte id) {
 return STATE_TYPES.get(id);
 }
+
+public static boolean isValidTransition(RemoteLogSegmentState srcState, 
RemoteLogSegmentState targetState) {
+Objects.requireNonNull(targetState, "targetState can not be null");
+
+if (srcState == null) {

Review comment:
   Same comment as before: 
https://github.com/apache/kafka/pull/10218/files#r598982742.
   Can srcState be null in practice? If not, this can be defined as an instance 
method.
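
   If it cannot be null, the instance-method shape would look roughly like this (`validTargetStates()` is a hypothetical helper, not part of the patch):

    // Sketch assuming a non-null source state: each enum constant validates
    // transitions from itself instead of taking a nullable srcState parameter.
    public boolean isValidTransition(RemoteLogSegmentState targetState) {
        Objects.requireNonNull(targetState, "targetState can not be null");
        return validTargetStates().contains(targetState); // hypothetical helper
    }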

##
File path: 
remote-storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataCache.java
##
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * This class provides an in-memory cache of remote log segment metadata. This 
maintains the lineage of segments
+ * with respect to leader epochs.
+ * 
+ * Remote log segment can go through the state transitions as mentioned in 
{@link RemoteLogSegmentState}.
+ * 
+ * This class will have all the segments which did not reach the terminal state, 
viz. DELETE_SEGMENT_FINISHED. That means, any
+ * segment reaching the terminal state will get cleared from this instance.
+ * This class provides different methods to fetch segment metadata like {@link 
#remoteLogSegmentMetadata(int, long)},
+ * {@link #highestOffsetForEpoch(int)}, {@link #listRemoteLogSegments(int)}, 
{@link #listAllRemoteLogSegments()}. Those
+ * methods have different semantics to fetch the segment based on its state.
+ * 
+ * 
+ * 
+ * {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}:
+ * 
+ *

[GitHub] [kafka] MarcoLotz commented on pull request #10131: KAFKA-5146 / move kafka-streams example to a new module

2021-04-06 Thread GitBox


MarcoLotz commented on pull request #10131:
URL: https://github.com/apache/kafka/pull/10131#issuecomment-814281424


> Sorry for the delay -- and thanks for updating the docs! Overall LGTM.
   > 
   > Seems there is some conflict -- can you rebase so we can merge this PR?

Sure thing @mjsax! Fixed the merge conflicts and the extra comment from @ijuma.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #10482: KAFKA-12499: add transaction timeout verification

2021-04-06 Thread GitBox


abbccdda commented on a change in pull request #10482:
URL: https://github.com/apache/kafka/pull/10482#discussion_r608033548



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
##
@@ -106,7 +107,7 @@ public StreamsProducer(final StreamsConfig config,
 producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
applicationId + "-" + taskId);
 
 eosBetaProducerConfigs = null;
-
+verifyTransactionTimeoutCompatibility(producerConfigs, config);

Review comment:
   Sounds good!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe merged pull request #10366: KAFKA-12467: Add controller-side snapshot generation

2021-04-06 Thread GitBox


cmccabe merged pull request #10366:
URL: https://github.com/apache/kafka/pull/10366


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on pull request #9826: KAFKA-10816: Initialize REST endpoints only after the herder has started

2021-04-06 Thread GitBox


C0urante commented on pull request #9826:
URL: https://github.com/apache/kafka/pull/9826#issuecomment-814294461


   @tombentley any plans to revisit this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #10389: KAFKA-12384: stabilize ListOffsetsRequestTest#testResponseIncludesLeaderEpoch

2021-04-06 Thread GitBox


chia7712 commented on pull request #10389:
URL: https://github.com/apache/kafka/pull/10389#issuecomment-814296573


   >  Is the test still flaky with the latest changes?
   
   Yep. I looped this patch 100 times and all runs passed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12457) Implications of KIP-516 for quorum controller

2021-04-06 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315739#comment-17315739
 ] 

Justine Olshan commented on KAFKA-12457:


I'm going to start with the sentinel ID for now. I'll be using Uuid(0L, 1L) as 
designated in the Uuid class.
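
For illustration, that sentinel is just the Uuid built from explicit most/least significant bits (the variable name below is made up):

    // Uuid takes (mostSignificantBits, leastSignificantBits), so this is
    // 00000000-0000-0000-0000-000000000001, distinct from Uuid.ZERO_UUID.
    Uuid metadataTopicId = new Uuid(0L, 1L);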

> Implications of KIP-516 for quorum controller
> -
>
> Key: KAFKA-12457
> URL: https://issues.apache.org/jira/browse/KAFKA-12457
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Justine Olshan
>Priority: Major
>  Labels: kip-500
>
> KIP-516 introduces topic IDs to Kafka. We are in the process of updating many 
> of the protocols to support them. In most cases, we are dropping the topic 
> name entirely from new API versions. I think there are two open questions for 
> KIP-500 in regard to this:
> 1. Can we assume topic ID existence in KIP-500? 
> I think the answer here is yes, and the existing code already assumes it. The 
> nice thing is that KIP-516 brings with it the logic to create topic IDs for 
> existing topics. We can rely on this ability in the bridge release to ensure 
> that all topics have topic IDs. And we can add it to pre-upgrade validations.
> 2. What topic ID should be used for `@metadata`? 
> There are basically two options for this: either use a sentinel topic ID or 
> let the controller generate a new one and write it into a `TopicRecord` when 
> the cluster first initializes. If we assume long term that we won't be able 
> to use topic names in the inter-broker protocol, then a sentinel might really 
> be a necessity since brokers would need to know the topic ID before they can 
> send fetches. In other words, if we generate a unique ID, then we probably 
> still need some sentinel so that followers can fetch the initial 
> `TopicRecord` which contains the ID.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] satishd opened a new pull request #10489: MINOR Moved tiered storage API classes from clients module to a new storage-api module.

2021-04-06 Thread GitBox


satishd opened a new pull request #10489:
URL: https://github.com/apache/kafka/pull/10489


   MINOR Moved tiered storage API classes from "clients" module to a new 
"storage-api" module.
Created storage and storage-api modules
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on pull request #10489: MINOR Moved tiered storage API classes from clients module to a new storage-api module.

2021-04-06 Thread GitBox


satishd commented on pull request #10489:
URL: https://github.com/apache/kafka/pull/10489#issuecomment-814306289


   @junrao Raised this PR, as you suggested earlier, to move the client classes 
to a separate module. I created a `storage-api` submodule and moved all the 
remote storage API classes into that module. 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10465: MINOR: add unit test for ControllerApis#createTopics

2021-04-06 Thread GitBox


cmccabe commented on a change in pull request #10465:
URL: https://github.com/apache/kafka/pull/10465#discussion_r608065053



##
File path: core/src/test/java/kafka/test/MockController.java
##
@@ -77,8 +82,28 @@ private MockController(Collection initialTopics) {
 }
 
 @Override
-public CompletableFuture<CreateTopicsResponseData> createTopics(CreateTopicsRequestData request) {
-throw new UnsupportedOperationException();
+synchronized public CompletableFuture<CreateTopicsResponseData>
+createTopics(CreateTopicsRequestData request) {
+CreateTopicsResponseData response = new CreateTopicsResponseData();
+for (CreatableTopic topic : request.topics()) {
+if (topics.containsKey(topic.name())) {
+response.topics().add(new CreatableTopicResult().
+setName(topic.name()).
+setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code()));
+} else {
+long topicId = nextTopicId.getAndAdd(1);
+Uuid topicUuid = new Uuid(0, topicId);
+topicNameToId.put(topic.name(), topicUuid);
+topics.put(topicUuid, new MockTopic(topic.name(), topicUuid));
+response.topics().add(new CreatableTopicResult().
+setName(topic.name()).
+setErrorCode(Errors.NONE.code()).
+setTopicId(topicUuid));
+// For a better mock, we might want to return configs, replication

Review comment:
   It wouldn't be useful for the test in this PR since we're trying to test 
the logic in `ControllerApis.scala`, which doesn't interact with configs or the 
replication factor.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10465: MINOR: add unit test for ControllerApis#createTopics

2021-04-06 Thread GitBox


cmccabe commented on a change in pull request #10465:
URL: https://github.com/apache/kafka/pull/10465#discussion_r608065815



##
File path: core/src/test/java/kafka/test/MockController.java
##
@@ -77,8 +82,28 @@ private MockController(Collection initialTopics) {
 }
 
 @Override
-public CompletableFuture<CreateTopicsResponseData> createTopics(CreateTopicsRequestData request) {
-throw new UnsupportedOperationException();
+synchronized public CompletableFuture<CreateTopicsResponseData>
+createTopics(CreateTopicsRequestData request) {
+CreateTopicsResponseData response = new CreateTopicsResponseData();
+for (CreatableTopic topic : request.topics()) {
+if (topics.containsKey(topic.name())) {

Review comment:
   Good catch.  I'll fix this
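
   Presumably the existence check just needs to go through the name-to-id map, since `topics` is keyed by Uuid rather than by name. A sketch of the fix:

    // The mock stores topics by Uuid, so check for an existing name via
    // topicNameToId instead of the Uuid-keyed topics map.
    if (topicNameToId.containsKey(topic.name())) {
        response.topics().add(new CreatableTopicResult().
            setName(topic.name()).
            setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code()));
    }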




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10465: MINOR: add unit test for ControllerApis#createTopics

2021-04-06 Thread GitBox


cmccabe commented on a change in pull request #10465:
URL: https://github.com/apache/kafka/pull/10465#discussion_r608067488



##
File path: core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
##
@@ -32,10 +32,13 @@ import org.apache.kafka.common.Uuid.ZERO_UUID
 import org.apache.kafka.common.errors.{InvalidRequestException, 
NotControllerException, TopicDeletionDisabledException}
 import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, 
CreatableTopicCollection}
+import 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
 import org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState
 import 
org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult
-import org.apache.kafka.common.message.{BrokerRegistrationRequestData, 
DeleteTopicsRequestData}
+import org.apache.kafka.common.message.{BrokerRegistrationRequestData, 
CreateTopicsRequestData, DeleteTopicsRequestData}
 import org.apache.kafka.common.network.{ClientInformation, ListenerName}
+import org.apache.kafka.common.protocol.Errors.{INVALID_REQUEST, NONE, 
TOPIC_AUTHORIZATION_FAILED}

Review comment:
   Good point.  I will add imports for all the Errors to make the code more 
readable.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] spena commented on a change in pull request #10331: KAFKA-10847: Add a RocksDBTimeOrderedWindowStore to hold records using their timestamp as key prefix

2021-04-06 Thread GitBox


spena commented on a change in pull request #10331:
URL: https://github.com/apache/kafka/pull/10331#discussion_r606336280



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSegmentedBytesStore.java
##
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import java.util.List;
+
+public class RocksDBTimeOrderedSegmentedBytesStore extends 
AbstractRocksDBSegmentedBytesStore {
+private final KeySchema keySchema;
+private final AbstractSegments segments;
+
+RocksDBTimeOrderedSegmentedBytesStore(final String name,

Review comment:
   I'll remove this class. I added it because I was going to override `fetchAll` to use prefixes to fetch data. Now that we don't need to do that, this class is unnecessary. I will re-use the `RocksDBSegmentedBytesStore` and pass the `TimeOrderedKeySchema` to it.
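
   For reference, a minimal sketch of the time-first key layout that a `TimeOrderedKeySchema` implies (the exact field order and the 4-byte sequence number are assumptions, not a copy of the actual schema):

   ```java
   import java.nio.ByteBuffer;
   import org.apache.kafka.common.utils.Bytes;

   final class TimeOrderedKeySketch {
       // Putting the big-endian timestamp first makes RocksDB's lexicographic
       // ordering equivalent to time ordering, which is the point of this store.
       static Bytes toTimeOrderedBinaryKey(final byte[] key, final long timestamp, final int seqnum) {
           final ByteBuffer buf = ByteBuffer.allocate(8 + key.length + 4);
           buf.putLong(timestamp);
           buf.put(key);
           buf.putInt(seqnum); // disambiguates duplicate keys within the same window
           return Bytes.wrap(buf.array());
       }
   }
   ```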

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java
##
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+/**
+ * A persistent (time-key)-value store based on RocksDB.
+ */
+public class RocksDBTimeOrderedWindowStore

Review comment:
   It seems like too much work on the testing side if I keep most of the methods unsupported. By the way, I will remove the `RocksDBTimeOrderedSegmentedBytesStore` because we don't need to do range queries with prefixes anymore. With that, all methods in this class will work as expected, without the range-iterator issues caused by looking up prefixed keys. I think we should keep all the functionality: it is harmless, and it is already tested by `RocksDBTimeOrderedWindowStoreTest`. What do you think?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jthurne opened a new pull request #10490: MINOR: Gradle build optimizations

2021-04-06 Thread GitBox


jthurne opened a new pull request #10490:
URL: https://github.com/apache/kafka/pull/10490


   Applies various build optimizations to take full advantage of Gradle's incremental build and local build caching features. These optimizations help reduce build times for developers running builds locally.
   
   Specific optimizations:
   
 - **Enable the local build cache**
   The local build cache avoids re-running tasks whose inputs have not changed,
   even if the build output has been cleaned or if the build is executed in a
   different directory (see the sketch after this list).
   
 - **Instruct Gradle to ignore version files when doing up-to-date checks**
This can save a lot of developer time when working locally and creating 
a lot of commits. In particular, tests won't be re-executed simply because the 
commit ID changes (unless one of the other test inputs changes).
   
 - **Update `createVersionFile` tasks to support incremental builds and build caching**
The version files are no longer recreated unless one of their inputs 
changes (the commit ID or the version property). The tasks are also marked as 
cacheable (the cached version can be used even on a `clean` build if the inputs 
are the same).

These tasks don't take a lot of time to run, but optimizing them shows 
how other tasks could be optimized if needed.
   
 - **Set the root project name in settings.gradle**
The root project name is used to name the build as a whole. It is used 
in reports and other types of artifacts. If a root project name is not set, 
then the containing directory's name is used as the root project name.
   
It is a best practice to explicitly set the root project name. Otherwise, the root project name could change if the directory containing the project is named differently from the desired name (for example, if someone checks out the repository into a directory named differently from the default).
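
   A minimal sketch of what the build-cache, version-file, and root-project-name items amount to (file contents are illustrative, not a copy of this PR's diff; the version-file path and the `kafka` project name are assumptions):

   ```
   # gradle.properties -- enable Gradle's local build cache
   org.gradle.caching=true
   ```

   ```
   // build.gradle -- ignore the generated version file in up-to-date checks
   // (file name here is an assumption)
   normalization {
     runtimeClasspath {
       ignore 'kafka/kafka-version.properties'
     }
   }
   ```

   ```
   // settings.gradle -- name the build explicitly instead of inheriting
   // the checkout directory's name
   rootProject.name = 'kafka'
   ```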
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma merged pull request #10485: MINOR: Enable scala/java joint compilation consistently for `core` module

2021-04-06 Thread GitBox


ijuma merged pull request #10485:
URL: https://github.com/apache/kafka/pull/10485


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jthurne commented on pull request #10490: MINOR: Gradle build optimizations

2021-04-06 Thread GitBox


jthurne commented on pull request #10490:
URL: https://github.com/apache/kafka/pull/10490#issuecomment-814374883






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jthurne commented on pull request #10490: MINOR: Gradle build optimizations

2021-04-06 Thread GitBox


jthurne commented on pull request #10490:
URL: https://github.com/apache/kafka/pull/10490#issuecomment-814375110


   One last comment: these changes should be compatible with Gradle 7.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jthurne edited a comment on pull request #10490: MINOR: Gradle build optimizations

2021-04-06 Thread GitBox


jthurne edited a comment on pull request #10490:
URL: https://github.com/apache/kafka/pull/10490#issuecomment-814375110


   One last comment: these changes are compatible with Gradle 7.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jthurne edited a comment on pull request #10490: MINOR: Gradle build optimizations

2021-04-06 Thread GitBox


jthurne edited a comment on pull request #10490:
URL: https://github.com/apache/kafka/pull/10490#issuecomment-814375110


   These changes are compatible with Gradle 7.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jthurne opened a new pull request #10491: MINOR: Switch to using the Gradle RAT plugin

2021-04-06 Thread GitBox


jthurne opened a new pull request #10491:
URL: https://github.com/apache/kafka/pull/10491


   The Gradle RAT plugin properly declares inputs and outputs and is also
   cacheable. This also relieves the Kafka developers of maintaining the build
   integration with RAT.
   
   The generated RAT report is identical to the one generated previously. The only
   difference is the RAT report name: the RAT plugin sets the HTML report name to
   `index.html` (still under `build/rat`).
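
   For reference, adopting the plugin looks roughly like this (the plugin id is the community Gradle RAT plugin; the version and exclude list shown are placeholders, not this PR's exact values):

   ```
   // build.gradle -- sketch of applying the Gradle RAT plugin
   plugins {
       id 'org.nosphere.apache.rat' version '0.7.0'
   }

   rat {
       // files that legitimately carry no license header (placeholder list)
       excludes = ['**/build/**', 'gradlew', 'gradlew.bat']
   }
   ```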
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jthurne commented on pull request #10491: MINOR: Switch to using the Gradle RAT plugin

2021-04-06 Thread GitBox


jthurne commented on pull request #10491:
URL: https://github.com/apache/kafka/pull/10491#issuecomment-814377383


   @ijuma another one for you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jthurne commented on pull request #10491: MINOR: Switch to using the Gradle RAT plugin

2021-04-06 Thread GitBox


jthurne commented on pull request #10491:
URL: https://github.com/apache/kafka/pull/10491#issuecomment-814378302


   Full disclosure: I'm an engineer who works for Gradle (on [Gradle Enterprise](https://gradle.com/)), and I found these optimizations while using the project to test out Gradle Enterprise features.
   
   I put this change in a separate PR from [the other build 
optimizations](https://github.com/apache/kafka/pull/10490) since it is a 
slightly larger change, and since the HTML report name changes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12620) Producer IDs generated by the controller

2021-04-06 Thread David Arthur (Jira)
David Arthur created KAFKA-12620:


 Summary: Producer IDs generated by the controller
 Key: KAFKA-12620
 URL: https://issues.apache.org/jira/browse/KAFKA-12620
 Project: Kafka
  Issue Type: New Feature
Reporter: David Arthur
Assignee: David Arthur


This is to track the implementation of 
[KIP-730|https://cwiki.apache.org/confluence/display/KAFKA/KIP-730%3A+Producer+ID+generation+in+KRaft+mode]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12620) Producer IDs generated by the controller

2021-04-06 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-12620:
-
Issue Type: Improvement  (was: New Feature)

> Producer IDs generated by the controller
> 
>
> Key: KAFKA-12620
> URL: https://issues.apache.org/jira/browse/KAFKA-12620
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Major
>
> This is to track the implementation of 
> [KIP-730|https://cwiki.apache.org/confluence/display/KAFKA/KIP-730%3A+Producer+ID+generation+in+KRaft+mode]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] spena commented on a change in pull request #10331: KAFKA-10847: Add a RocksDBTimeOrderedWindowStore to hold records using their timestamp as key prefix

2021-04-06 Thread GitBox


spena commented on a change in pull request #10331:
URL: https://github.com/apache/kafka/pull/10331#discussion_r608133301



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
##
@@ -1173,14 +1173,23 @@ private void putSecondBatch(final WindowStore store,
 store.put(2, "two+6");
 }
 
+long extractStoreTimestamp(final byte[] binaryKey) {

Review comment:
   Added tests cases




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState

2021-04-06 Thread GitBox


hachikuji commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r608140887



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -395,31 +393,35 @@ private void resetConnections() {
 requestManager.resetAll();
 }
 
-private void onBecomeLeader(long currentTimeMs) {
-LeaderState state = quorum.leaderStateOrThrow();
+private void onBecomeLeader(long currentTimeMs) throws IOException {
+long endOffset = log.endOffset().offset;
+
+// Add 1 to the offset that the accumulator tracks since 
appendLeaderChangeMessage 
+// will write a record from the new leader's epoch to advance the high 
watermark below
+BatchAccumulator<T> accumulator = new BatchAccumulator<>(
+quorum.epoch(),
+endOffset + 1,

Review comment:
   Hmm.. This is a little strange. It would be nice if we could refactor 
this so that we can write the LeaderChange message through LeaderState. This is 
a little bit tricky, but it should be doable. Let me try to put this together.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan opened a new pull request #10492: KAFKA-12457: Added sentinel ID to metadata topic

2021-04-06 Thread GitBox


jolshan opened a new pull request #10492:
URL: https://github.com/apache/kafka/pull/10492


   KIP-516 introduces topic IDs to topics, but there is a small issue with how 
the metadata topic will interact with topic IDs. 
   
   For example, https://github.com/apache/kafka/pull/9944 aims to replace topic names in the Fetch request with topic IDs. In order to get these IDs, brokers must fetch from the metadata topic. This leads to a chicken-and-egg problem: how do we find out the metadata topic's own topic ID?
   
   One solution proposed in KIP-516 was to introduce a "sentinel ID" for the metadata topic: a reserved ID used for the metadata topic only. This PR adds the sentinel ID when creating the metadata log.
   More information can be found in the 
[JIRA](https://issues.apache.org/jira/browse/KAFKA-12457) and in 
[KIP-516](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers)
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei merged pull request #10474: KAFKA-12602: Fix LICENSE file

2021-04-06 Thread GitBox


vvcephei merged pull request #10474:
URL: https://github.com/apache/kafka/pull/10474


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-04-06 Thread GitBox


jolshan commented on pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#issuecomment-814408153


   Currently blocked on https://github.com/apache/kafka/pull/10492
   (need to add topic IDs to the metadata topic for fetching).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on pull request #10492: KAFKA-12457: Add sentinel ID to metadata topic

2021-04-06 Thread GitBox


jolshan commented on pull request #10492:
URL: https://github.com/apache/kafka/pull/10492#issuecomment-814417281


   @hachikuji we defined it and have a check, but it is internal. I can add a 
public constant.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12621) Kafka setup with Zookeeper specifying an alternate znode as root fails

2021-04-06 Thread Jibitesh Prasad (Jira)
Jibitesh Prasad created KAFKA-12621:
---

 Summary: Kafka setup with Zookeeper specifying an alternate znode 
as root fails
 Key: KAFKA-12621
 URL: https://issues.apache.org/jira/browse/KAFKA-12621
 Project: Kafka
  Issue Type: Bug
  Components: config
Affects Versions: 2.6.1
 Environment: Linux
OS: 16.04.1-Ubuntu SMP 
Architecture: x86_64
Kernel Version: 4.15.0-1108-azure
Reporter: Jibitesh Prasad


While configuring Kafka with a znode other than "/" as the root, the configuration is created in the wrong znode. For example, I have the following entry in my server.properties:

_zookeeper.connect=10.114.103.207:2181/kafka_secondary_cluster,10.114.103.206:2181/kafka_secondary_cluster,10.114.103.205:2181/kafka_secondary_cluster_

The IPs are the IP addresses of the nodes of zookeeper cluster. I expect the 
kafka server to use _kafka_secondary_cluster_ as the znode in the zookeeper 
nodes. But, the znode which is created is actually

_/kafka_secondary_cluster,10.114.103.206:2181/kafka_secondary_cluster,10.114.103.205:2181/kafka_secondary_cluster_

Executing ls on the above path shows me the necessary znodes being created in 
that path _[zk: localhost:2181(CONNECTED) 1] ls 
/kafka_secondary_cluster,10.114.103.206:2181/kafka_secondary_cluster,10.114.103.205:2181/kafka_secondary_cluster_

Output:
_[admin, brokers, cluster, config, consumers, controller, controller_epoch, 
isr_change_notification, latest_producer_id_block, log_dir_event_notification]_

Shouldn't these configurations be created in _/kafka_secondary_cluster_? It seems the comma-separated values are not being split correctly. Or am I doing something wrong?
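
For reference, Kafka's documented _zookeeper.connect_ syntax takes the chroot path once, after the last host:port pair, so the intended setting would presumably be (a sketch of the documented form, not a verified fix for this report):

{code:java}
zookeeper.connect=10.114.103.207:2181,10.114.103.206:2181,10.114.103.205:2181/kafka_secondary_cluster
{code}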



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12621) Kafka setup with Zookeeper specifying an alternate znode as root fails

2021-04-06 Thread Jibitesh Prasad (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jibitesh Prasad updated KAFKA-12621:

Description: 
While configuring Kafka with a znode other than "/" as the root, the configuration is created in the wrong znode. For example, I have the following entry in my server.properties:

_zookeeper.connect=10.114.103.207:2181/kafka_secondary_cluster,10.114.103.206:2181/kafka_secondary_cluster,10.114.103.205:2181/kafka_secondary_cluster_

The IPs are the IP addresses of the nodes of zookeeper cluster. I expect the 
kafka server to use _kafka_secondary_cluster_ as the znode in the zookeeper 
nodes. But, the znode which is created is actually

_/kafka_secondary_cluster,10.114.103.206:2181/kafka_secondary_cluster,10.114.103.205:2181/kafka_secondary_cluster_

Executing ls on the above path shows me the necessary znodes being created in 
that path

_[zk: localhost:2181(CONNECTED) 1] ls 
/kafka_secondary_cluster,10.114.103.206:2181/kafka_secondary_cluster,10.114.103.205:2181/kafka_secondary_cluster_

Output:
 _[admin, brokers, cluster, config, consumers, controller, controller_epoch, 
isr_change_notification, latest_producer_id_block, log_dir_event_notification]_

Shouldn't these configurations be created in _/kafka_secondary_cluster_? It seems the comma-separated values are not being split correctly. Or am I doing something wrong?

  was:
While configuring Kafka with a znode other than "/" as the root, the configuration is created in the wrong znode. For example, I have the following entry in my server.properties:

_zookeeper.connect=10.114.103.207:2181/kafka_secondary_cluster,10.114.103.206:2181/kafka_secondary_cluster,10.114.103.205:2181/kafka_secondary_cluster_

The IPs are the IP addresses of the nodes of zookeeper cluster. I expect the 
kafka server to use _kafka_secondary_cluster_ as the znode in the zookeeper 
nodes. But, the znode which is created is actually

_/kafka_secondary_cluster,10.114.103.206:2181/kafka_secondary_cluster,10.114.103.205:2181/kafka_secondary_cluster_

Executing ls on the above path shows me the necessary znodes being created in 
that path _[zk: localhost:2181(CONNECTED) 1] ls 
/kafka_secondary_cluster,10.114.103.206:2181/kafka_secondary_cluster,10.114.103.205:2181/kafka_secondary_cluster_

Output:
_[admin, brokers, cluster, config, consumers, controller, controller_epoch, 
isr_change_notification, latest_producer_id_block, log_dir_event_notification]_

Shouldn't these configurations be created in _/kafka_secondary_cluster_? It seems the comma-separated values are not being split correctly. Or am I doing something wrong?


> Kafka setup with Zookeeper specifying an alternate znode as root fails
> --
>
> Key: KAFKA-12621
> URL: https://issues.apache.org/jira/browse/KAFKA-12621
> Project: Kafka
>  Issue Type: Bug
>  Components: config
>Affects Versions: 2.6.1
> Environment: Linux
> OS: 16.04.1-Ubuntu SMP 
> Architecture: x86_64
> Kernel Version: 4.15.0-1108-azure
>Reporter: Jibitesh Prasad
>Priority: Major
>  Labels: config, newbie, server.properties
>
> While configuring Kafka with a znode other than "/" as the root, the configuration is 
> created in the wrong znode. For example, I have the following entry in my 
> server.properties:
> _zookeeper.connect=10.114.103.207:2181/kafka_secondary_cluster,10.114.103.206:2181/kafka_secondary_cluster,10.114.103.205:2181/kafka_secondary_cluster_
> The IPs are the IP addresses of the nodes of zookeeper cluster. I expect the 
> kafka server to use _kafka_secondary_cluster_ as the znode in the zookeeper 
> nodes. But, the znode which is created is actually
> _/kafka_secondary_cluster,10.114.103.206:2181/kafka_secondary_cluster,10.114.103.205:2181/kafka_secondary_cluster_
> Executing ls on the above path shows me the necessary znodes being created in 
> that path
> _[zk: localhost:2181(CONNECTED) 1] ls 
> /kafka_secondary_cluster,10.114.103.206:2181/kafka_secondary_cluster,10.114.103.205:2181/kafka_secondary_cluster_
> Output:
>  _[admin, brokers, cluster, config, consumers, controller, controller_epoch, 
> isr_change_notification, latest_producer_id_block, 
> log_dir_event_notification]_
> Shouldn't these configurations be created in _/kafka_secondary_cluster_? It 
> seems the comma-separated values are not being split correctly. Or am I doing 
> something wrong?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9013) Flaky Test MirrorConnectorsIntegrationTest#testReplication

2021-04-06 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315831#comment-17315831
 ] 

Matthias J. Sax commented on KAFKA-9013:


Timeout issue (observed twice):


{quote} {{java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.TimeoutException: The request timed out.
at 
org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:365)
at 
org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:340)
at 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.createTopics(MirrorConnectorsIntegrationBaseTest.java:609)
at 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.startClusters(MirrorConnectorsIntegrationBaseTest.java:173)
at 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.startClusters(MirrorConnectorsIntegrationSSLTest.java:63)}}{quote}

> Flaky Test MirrorConnectorsIntegrationTest#testReplication
> --
>
> Key: KAFKA-9013
> URL: https://issues.apache.org/jira/browse/KAFKA-9013
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Bruno Cadonna
>Priority: Major
>  Labels: flaky-test
>
> h1. Stacktrace:
> {code:java}
> java.lang.AssertionError: Condition not met within timeout 2. Offsets not 
> translated downstream to primary cluster.
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:354)
>   at 
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication(MirrorConnectorsIntegrationTest.java:239)
> {code}
> h1. Standard Error
> {code}
> Standard Error
> Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource 
> registered in SERVER runtime does not implement any provider interfaces 
> applicable in the SERVER runtime. Due to constraint configuration problems 
> the provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will 
> be ignored. 
> Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.LoggingResource registered in 
> SERVER runtime does not implement any provider interfaces applicable in the 
> SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.LoggingResource will be 
> ignored. 
> Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered 
> in SERVER runtime does not implement any provider interfaces applicable in 
> the SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource will be 
> ignored. 
> Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.RootResource registered in 
> SERVER runtime does not implement any provider interfaces applicable in the 
> SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.RootResource will be ignored. 
> Oct 09, 2019 11:32:01 PM org.glassfish.jersey.internal.Errors logErrors
> WARNING: The following warnings have been detected: WARNING: The 
> (sub)resource method listLoggers in 
> org.apache.kafka.connect.runtime.rest.resources.LoggingResource contains 
> empty path annotation.
> WARNING: The (sub)resource method listConnectors in 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains 
> empty path annotation.
> WARNING: The (sub)resource method createConnector in 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains 
> empty path annotation.
> WARNING: The (sub)resource method listConnectorPlugins in 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource 
> contains empty path annotation.
> WARNING: The (sub)resource method serverInfo in 
> org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty 
> path annotation.
> Oct 09, 2019 11:32:02 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource 
> registered in SERVER runtim

[jira] [Updated] (KAFKA-12621) Kafka setup with Zookeeper- specifying an alternate znode creates the configuration at the wrong znode

2021-04-06 Thread Jibitesh Prasad (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jibitesh Prasad updated KAFKA-12621:

Summary: Kafka setup with Zookeeper- specifying an alternate znode creates 
the configuration at the wrong znode  (was: Kafka setup with Zookeeper 
specifying an alternate znode as root fails)

> Kafka setup with Zookeeper- specifying an alternate znode creates the 
> configuration at the wrong znode
> --
>
> Key: KAFKA-12621
> URL: https://issues.apache.org/jira/browse/KAFKA-12621
> Project: Kafka
>  Issue Type: Bug
>  Components: config
>Affects Versions: 2.6.1
> Environment: Linux
> OS: 16.04.1-Ubuntu SMP 
> Architecture: x86_64
> Kernel Version: 4.15.0-1108-azure
>Reporter: Jibitesh Prasad
>Priority: Major
>  Labels: config, newbie, server.properties
>
> While configuring Kafka with a znode other than "/" as the root, the configuration is 
> created in the wrong znode. For example, I have the following entry in my 
> server.properties:
> _zookeeper.connect=10.114.103.207:2181/kafka_secondary_cluster,10.114.103.206:2181/kafka_secondary_cluster,10.114.103.205:2181/kafka_secondary_cluster_
> The IPs are the IP addresses of the nodes of zookeeper cluster. I expect the 
> kafka server to use _kafka_secondary_cluster_ as the znode in the zookeeper 
> nodes. But, the znode which is created is actually
> _/kafka_secondary_cluster,10.114.103.206:2181/kafka_secondary_cluster,10.114.103.205:2181/kafka_secondary_cluster_
> Executing ls on the above path shows me the necessary znodes being created in 
> that path
> _[zk: localhost:2181(CONNECTED) 1] ls 
> /kafka_secondary_cluster,10.114.103.206:2181/kafka_secondary_cluster,10.114.103.205:2181/kafka_secondary_cluster_
> Output:
>  _[admin, brokers, cluster, config, consumers, controller, controller_epoch, 
> isr_change_notification, latest_producer_id_block, 
> log_dir_event_notification]_
> Shouldn't these configurations be created in _/kafka_secondary_cluster_? It 
> seems the comma-separated values are not being split correctly. Or am I doing 
> something wrong?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12622) Automate LICENSE file validation

2021-04-06 Thread John Roesler (Jira)
John Roesler created KAFKA-12622:


 Summary: Automate LICENSE file validation
 Key: KAFKA-12622
 URL: https://issues.apache.org/jira/browse/KAFKA-12622
 Project: Kafka
  Issue Type: Task
Reporter: John Roesler
 Fix For: 3.0.0, 2.8.1


In https://issues.apache.org/jira/browse/KAFKA-12602, we manually constructed a 
correct license file for 2.8.0. This file will certainly become wrong again in 
later releases, so we need to write some kind of script to automate a check.

It crossed my mind to automate the generation of the file, but it seems to be 
an intractable problem, considering that each dependency may change licenses, 
may package license files, link to them from their poms, link to them from 
their repos, etc. I've also found multiple URLs listed with various delimiters, 
broken links that I have to chase down, etc.

Therefore, it seems like the solution to aim for is simply: list all the jars 
that we package, and print out a report of each jar that's extra or missing vs. 
the ones in our `LICENSE-binary` file.

Here's how I do this manually right now:
{code:java}
// build the binary artifacts
$ ./gradlewAll releaseTarGz

// unpack the binary artifact
$ cd core/build/distributions/
$ tar xf kafka_2.13-X.Y.Z.tgz
$ cd kafka_2.13-X.Y.Z

// list the packaged jars
// (you can ignore the jars for our own modules, like kafka, kafka-clients, etc.)
$ ls libs/

// cross-check the jars with the packaged LICENSE
// make sure all dependencies are listed with the right versions
$ cat LICENSE

// also double-check that all the mentioned license files are present
$ ls licenses {code}
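
A rough sketch of the kind of automated report described above (illustrative only: the module-prefix skip list and the exact matching are assumptions):

{code:java}
#!/usr/bin/env bash
# Run from the root of an unpacked kafka_2.13-X.Y.Z distribution.
# Reports jars in libs/ that have no matching entry in the packaged LICENSE.
for jar in libs/*.jar; do
  dep=$(basename "$jar" .jar)
  case "$dep" in
    kafka*|connect-*|trogdor*) continue ;;  # skip our own modules (assumed prefixes)
  esac
  grep -qF "$dep" LICENSE || echo "missing from LICENSE: $dep"
done
{code}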



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12623) Fix LICENSE in 2.7

2021-04-06 Thread John Roesler (Jira)
John Roesler created KAFKA-12623:


 Summary: Fix LICENSE in 2.7
 Key: KAFKA-12623
 URL: https://issues.apache.org/jira/browse/KAFKA-12623
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler
Assignee: Mickael Maison
 Fix For: 2.7.1


Just splitting this out as a sub-task.

I've fixed the parent ticket on trunk and 2.8.

You'll need to cherry-pick the fix from 2.8 (see 
[https://github.com/apache/kafka/pull/10474]).

Then, you can follow the manual verification steps I detailed here: 
https://issues.apache.org/jira/browse/KAFKA-12622



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12383) Get RaftClusterTest.java and other KIP-500 junit tests working

2021-04-06 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315832#comment-17315832
 ] 

Matthias J. Sax commented on KAFKA-12383:
-

Observed a timeout issue
{quote} {{java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node 
assignment. Call: createTopics
at 
org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at 
org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at 
kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic(RaftClusterTest.scala:92)}}{quote}

> Get RaftClusterTest.java and other KIP-500 junit tests working
> --
>
> Key: KAFKA-12383
> URL: https://issues.apache.org/jira/browse/KAFKA-12383
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0, 2.9
>Reporter: Colin McCabe
>Assignee: David Arthur
>Priority: Blocker
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12624) Fix LICENSE in 2.6

2021-04-06 Thread John Roesler (Jira)
John Roesler created KAFKA-12624:


 Summary: Fix LICENSE in 2.6
 Key: KAFKA-12624
 URL: https://issues.apache.org/jira/browse/KAFKA-12624
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler
Assignee: A. Sophie Blee-Goldman
 Fix For: 2.6.2


Just splitting this out as a sub-task.

I've fixed the parent ticket on trunk and 2.8.

You'll need to cherry-pick the fix from 2.8 (see 
[https://github.com/apache/kafka/pull/10474]).

Then, you can follow the manual verification steps I detailed here: 
https://issues.apache.org/jira/browse/KAFKA-12622



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jolshan edited a comment on pull request #10492: KAFKA-12457: Add sentinel ID to metadata topic

2021-04-06 Thread GitBox


jolshan edited a comment on pull request #10492:
URL: https://github.com/apache/kafka/pull/10492#issuecomment-814417281


   @hachikuji we defined it and have a check, but it is internal/private. I can 
add a public constant.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12602) The LICENSE and NOTICE files don't list everything they should

2021-04-06 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-12602:
-
Fix Version/s: (was: 2.6.2)
   (was: 2.7.1)

> The LICENSE and NOTICE files don't list everything they should
> --
>
> Key: KAFKA-12602
> URL: https://issues.apache.org/jira/browse/KAFKA-12602
> Project: Kafka
>  Issue Type: Bug
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.8.0
>
>
> [~jmclean] raised this on the mailing list: 
> [https://lists.apache.org/thread.html/r2df54c11c10d3d38443054998bc7dd92d34362641733c2fb7c579b50%40%3Cdev.kafka.apache.org%3E]
>  
> We need to make the license file match what we are actually shipping in 
> source and binary distributions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12602) The LICENSE and NOTICE files don't list everything they should

2021-04-06 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315839#comment-17315839
 ] 

John Roesler commented on KAFKA-12602:
--

I created subtasks for 2.7 and 2.6.

Also created https://issues.apache.org/jira/browse/KAFKA-12622 to automate a 
check.

> The LICENSE and NOTICE files don't list everything they should
> --
>
> Key: KAFKA-12602
> URL: https://issues.apache.org/jira/browse/KAFKA-12602
> Project: Kafka
>  Issue Type: Bug
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.8.0
>
>
> [~jmclean] raised this on the mailing list: 
> [https://lists.apache.org/thread.html/r2df54c11c10d3d38443054998bc7dd92d34362641733c2fb7c579b50%40%3Cdev.kafka.apache.org%3E]
>  
> We need to make the license file match what we are actually shipping in 
> source and binary distributions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12622) Automate LICENSE file validation

2021-04-06 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-12622:
-
Description: 
In https://issues.apache.org/jira/browse/KAFKA-12602, we manually constructed a 
correct license file for 2.8.0. This file will certainly become wrong again in 
later releases, so we need to write some kind of script to automate a check.

It crossed my mind to automate the generation of the file, but it seems to be 
an intractable problem, considering that each dependency may change licenses, 
may package license files, link to them from their poms, link to them from 
their repos, etc. I've also found multiple URLs listed with various delimiters, 
broken links that I have to chase down, etc.

Therefore, it seems like the solution to aim for is simply: list all the jars 
that we package, and print out a report of each jar that's extra or missing vs. 
the ones in our `LICENSE-binary` file.

The check should be part of the release script at least, if not part of the 
regular build (so we keep it up to date as dependencies change).

 

Here's how I do this manually right now:
{code:java}
// build the binary artifacts
$ ./gradlewAll releaseTarGz

// unpack the binary artifact
$ cd core/build/distributions/
$ tar xf kafka_2.13-X.Y.Z.tgz
$ cd kafka_2.13-X.Y.Z

// list the packaged jars
// (you can ignore the jars for our own modules, like kafka, kafka-clients, etc.)
$ ls libs/

// cross-check the jars with the packaged LICENSE
// make sure all dependencies are listed with the right versions
$ cat LICENSE

// also double-check that all the mentioned license files are present
$ ls licenses {code}

  was:
In https://issues.apache.org/jira/browse/KAFKA-12602, we manually constructed a 
correct license file for 2.8.0. This file will certainly become wrong again in 
later releases, so we need to write some kind of script to automate a check.

It crossed my mind to automate the generation of the file, but it seems to be 
an intractable problem, considering that each dependency may change licenses, 
may package license files, link to them from their poms, link to them from 
their repos, etc. I've also found multiple URLs listed with various delimiters, 
broken links that I have to chase down, etc.

Therefore, it seems like the solution to aim for is simply: list all the jars 
that we package, and print out a report of each jar that's extra or missing vs. 
the ones in our `LICENSE-binary` file.

Here's how I do this manually right now:
{code:java}
// build the binary artifacts
$ ./gradlewAll releaseTarGz

// unpack the binary artifact
$ cd core/build/distributions/
$ tar xf kafka_2.13-X.Y.Z.tgz
$ cd kafka_2.13-X.Y.Z

// list the packaged jars
// (you can ignore the jars for our own modules, like kafka, kafka-clients, etc.)
$ ls libs/

// cross-check the jars with the packaged LICENSE
// make sure all dependencies are listed with the right versions
$ cat LICENSE

// also double-check that all the mentioned license files are present
$ ls licenses {code}


> Automate LICENSE file validation
> -
>
> Key: KAFKA-12622
> URL: https://issues.apache.org/jira/browse/KAFKA-12622
> Project: Kafka
>  Issue Type: Task
>Reporter: John Roesler
>Priority: Major
> Fix For: 3.0.0, 2.8.1
>
>
> In https://issues.apache.org/jira/browse/KAFKA-12602, we manually constructed 
> a correct license file for 2.8.0. This file will certainly become wrong again 
> in later releases, so we need to write some kind of script to automate a 
> check.
> It crossed my mind to automate the generation of the file, but it seems to be 
> an intractable problem, considering that each dependency may change licenses, 
> may package license files, link to them from their poms, link to them from 
> their repos, etc. I've also found multiple URLs listed with various 
> delimiters, broken links that I have to chase down, etc.
> Therefore, it seems like the solution to aim for is simply: list all the jars 
> that we package, and print out a report of each jar that's extra or missing 
> vs. the ones in our `LICENSE-binary` file.
> The check should be part of the release script at least, if not part of the 
> regular build (so we keep it up to date as dependencies change).
>  
> Here's how I do this manually right now:
> {code:java}
> // build the binary artifacts
> $ ./gradlewAll releaseTarGz
> // unpack the binary artifact
> $ cd core/build/distributions/
> $ tar xf kafka_2.13-X.Y.Z.tgz
> $ cd kafka_2.13-X.Y.Z
> // list the packaged jars 
> // (you can ignore the jars for our own modules, like kafka, kafka-clients, 
> etc.)
> $ ls libs/
> // cross check the jars with the packaged LICENSE
> // make sure all dependencies are listed with the right versions
> $ cat LICENSE
> // also double check all the mentioned license files are present
> $ l

[jira] [Created] (KAFKA-12625) Fix the NOTICE file

2021-04-06 Thread John Roesler (Jira)
John Roesler created KAFKA-12625:


 Summary: Fix the NOTICE file
 Key: KAFKA-12625
 URL: https://issues.apache.org/jira/browse/KAFKA-12625
 Project: Kafka
  Issue Type: Task
Reporter: John Roesler
 Fix For: 3.0.0, 2.8.1


In https://issues.apache.org/jira/browse/KAFKA-12602, we fixed the license 
file, and in the comments, Justin noted that we really should fix the NOTICE 
file as well.

Basically, we need to look through each of the packaged dependencies and 
transmit each of their NOTICEs (for Apache 2.0 deps) or, otherwise, any copyright 
notices they assert.

It would be good to consider automating a check for this as well (see 
https://issues.apache.org/jira/browse/KAFKA-12622)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12602) The LICENSE and NOTICE files don't list everything they should

2021-04-06 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315841#comment-17315841
 ] 

John Roesler commented on KAFKA-12602:
--

Just filed https://issues.apache.org/jira/browse/KAFKA-12625 to fix the NOTICE 
file.

This is the last loose end, so I'll go ahead and close this ticket.

> The LICENSE and NOTICE files don't list everything they should
> --
>
> Key: KAFKA-12602
> URL: https://issues.apache.org/jira/browse/KAFKA-12602
> Project: Kafka
>  Issue Type: Bug
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.8.0
>
>
> [~jmclean] raised this on the mailing list: 
> [https://lists.apache.org/thread.html/r2df54c11c10d3d38443054998bc7dd92d34362641733c2fb7c579b50%40%3Cdev.kafka.apache.org%3E]
>  
> We need to make the license file match what we are actually shipping in 
> source and binary distributions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12602) The LICENSE and NOTICE files don't list everything they should

2021-04-06 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-12602.
--
Resolution: Fixed

> The LICENSE and NOTICE files don't list everything they should
> --
>
> Key: KAFKA-12602
> URL: https://issues.apache.org/jira/browse/KAFKA-12602
> Project: Kafka
>  Issue Type: Bug
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.8.0
>
>
> [~jmclean] raised this on the mailing list: 
> [https://lists.apache.org/thread.html/r2df54c11c10d3d38443054998bc7dd92d34362641733c2fb7c579b50%40%3Cdev.kafka.apache.org%3E]
>  
> We need to make the license file match what we are actually shipping in 
> source and binary distributions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jolshan commented on pull request #10492: KAFKA-12457: Add sentinel ID to metadata topic

2021-04-06 Thread GitBox


jolshan commented on pull request #10492:
URL: https://github.com/apache/kafka/pull/10492#issuecomment-814431608


   @hachikuji I wasn't sure if I should mention the metadata topic/KRaft in the 
javadoc. I did for now, but let me know if I should change it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax opened a new pull request #10493: MINOR: cleanup Jenkins workspace before build

2021-04-06 Thread GitBox


mjsax opened a new pull request #10493:
URL: https://github.com/apache/kafka/pull/10493


   Observed a failing build with
   ```
   Failed to execute goal 
org.apache.maven.plugins:maven-archetype-plugin:3.2.0:generate (default-cli) on 
project standalone-pom: A Maven project already exists in the directory 
/home/jenkins/workspace/Kafka_kafka-pr_PR-10131/streams/quickstart/test-streams-archetype/streams.examples
 -> [Help 1]
   ```
   
   We should clean the workspace before we start the build to avoid this issue. 
\cc @ijuma 
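
   A minimal sketch of the kind of change this implies (Jenkinsfile syntax; `cleanWs()` comes from the Workspace Cleanup plugin, and the stage layout is illustrative rather than Kafka's actual Jenkinsfile):

   ```groovy
   // Illustrative Jenkinsfile fragment: wipe the workspace before checking out
   pipeline {
     agent any
     stages {
       stage('Checkout') {
         steps {
           cleanWs()      // assumes the Workspace Cleanup plugin is installed
           checkout scm
         }
       }
     }
   }
   ```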
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12383) Get RaftClusterTest.java and other KIP-500 junit tests working

2021-04-06 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17315845#comment-17315845
 ] 

Ismael Juma commented on KAFKA-12383:
-

The PR for this Jira was merged, so I closed it. We should probably have a 
different one for the flakiness.

> Get RaftClusterTest.java and other KIP-500 junit tests working
> --
>
> Key: KAFKA-12383
> URL: https://issues.apache.org/jira/browse/KAFKA-12383
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0, 2.9
>Reporter: Colin McCabe
>Assignee: David Arthur
>Priority: Blocker
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #10492: KAFKA-12457: Add sentinel ID to metadata topic

2021-04-06 Thread GitBox


hachikuji commented on a change in pull request #10492:
URL: https://github.com/apache/kafka/pull/10492#discussion_r608177108



##
File path: clients/src/main/java/org/apache/kafka/common/Uuid.java
##
@@ -27,6 +27,10 @@
  */
 public class Uuid implements Comparable<Uuid> {
 
+/**
+ * A UUID for the metadata topic in KRaft mode. Will never be returned by 
the randomUuid method.
+ */
+public static final Uuid SENTINEL_ID = new Uuid(0L, 1L);

Review comment:
   Could we be more explicit and call this `METADATA_TOPIC_ID` or something 
like that? Do we have other contexts where we use it?
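
   For context, a sketch of how `randomUuid` can uphold the "never returned" guarantee (the re-draw loop is an assumed approach, written against `java.util.UUID` rather than Kafka's `Uuid`):

   ```java
   import java.util.UUID;

   final class ReservedUuidExample {
       // Reserved values that random generation must never hand out
       // (the metadata-topic value matches the diff above).
       static final UUID METADATA_TOPIC_ID = new UUID(0L, 1L);
       static final UUID ZERO_UUID = new UUID(0L, 0L);

       // Re-draw until the result is not reserved. A collision is astronomically
       // unlikely, but the explicit check makes the guarantee unconditional.
       static UUID randomNonReservedUuid() {
           UUID candidate = UUID.randomUUID();
           while (candidate.equals(METADATA_TOPIC_ID) || candidate.equals(ZERO_UUID)) {
               candidate = UUID.randomUUID();
           }
           return candidate;
       }
   }
   ```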




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



