[jira] [Assigned] (KAFKA-12430) emit.heartbeats.enabled = false should disable heartbeats topic creation

2021-05-24 Thread Ivan Yurchenko (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Yurchenko reassigned KAFKA-12430:
--

Assignee: (was: Ivan Yurchenko)

> emit.heartbeats.enabled = false should disable heartbeats topic creation
> 
>
> Key: KAFKA-12430
> URL: https://issues.apache.org/jira/browse/KAFKA-12430
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Reporter: Ivan Yurchenko
>Priority: Minor
>
> Currently, MirrorMaker 2's {{MirrorHeartbeatConnector}} emits heartbeats or 
> not based on {{emit.heartbeats.enabled}} setting. However, {{heartbeats}} 
> topic is created unconditionally. It seems that the same setting should 
> really disable the topic creation as well.
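
For illustration, a minimal MirrorMaker 2 config sketch of the behavior this
ticket describes (only the property name is taken from the ticket; the comments
state the proposal, not current behavior):

{noformat}
# mm2.properties (sketch)
# Today this stops heartbeat records from being emitted, but the heartbeats
# topic is still created; this ticket proposes it should also suppress the
# topic creation.
emit.heartbeats.enabled = false
{noformat}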



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12430) emit.heartbeats.enabled = false should disable heartbeats topic creation

2021-05-24 Thread Ivan Yurchenko (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Yurchenko updated KAFKA-12430:
---
Description: Currently, whether MirrorMaker 2's 
{{MirrorHeartbeatConnector}} emits heartbeats or not is based on 
{{emit.heartbeats.enabled}} setting. However, {{heartbeats}} topic is created 
unconditionally. It seems that the same setting should really disable the topic 
creation as well.  (was: Currently, MirrorMaker 2's 
{{MirrorHeartbeatConnector}} emits heartbeats or not based on 
{{emit.heartbeats.enabled}} setting. However, {{heartbeats}} topic is created 
unconditionally. It seems that the same setting should really disable the topic 
creation as well.)

> emit.heartbeats.enabled = false should disable heartbeats topic creation
> 
>
> Key: KAFKA-12430
> URL: https://issues.apache.org/jira/browse/KAFKA-12430
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Reporter: Ivan Yurchenko
>Priority: Minor
>
> Currently, whether MirrorMaker 2's {{MirrorHeartbeatConnector}} emits 
> heartbeats or not is based on {{emit.heartbeats.enabled}} setting. However, 
> {{heartbeats}} topic is created unconditionally. It seems that the same 
> setting should really disable the topic creation as well.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vahidhashemian commented on a change in pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance

2021-05-24 Thread GitBox


vahidhashemian commented on a change in pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#discussion_r638437845



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -637,19 +709,27 @@ private void assignPartition(TopicPartition partition,
 }
 }
 
-    private boolean canParticipateInReassignment(TopicPartition partition,
-                                                 Map<TopicPartition, List<String>> partition2AllPotentialConsumers) {
+    private boolean canParticipateInReassignment(String topic,
+                                                 Map<String, List<String>> topic2AllPotentialConsumers) {
         // if a partition has two or more potential consumers it is subject to reassignment.

Review comment:
   Comment needs an update.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -637,19 +709,27 @@ private void assignPartition(TopicPartition partition,
 }
 }
 
-    private boolean canParticipateInReassignment(TopicPartition partition,
-                                                 Map<TopicPartition, List<String>> partition2AllPotentialConsumers) {
+    private boolean canParticipateInReassignment(String topic,
+                                                 Map<String, List<String>> topic2AllPotentialConsumers) {
         // if a partition has two or more potential consumers it is subject to reassignment.
-        return partition2AllPotentialConsumers.get(partition).size() >= 2;
+        return topic2AllPotentialConsumers.get(topic).size() >= 2;
     }
 
     private boolean canParticipateInReassignment(String consumer,
                                                  Map<String, List<TopicPartition>> currentAssignment,
-                                                 Map<String, List<TopicPartition>> consumer2AllPotentialPartitions,
-                                                 Map<TopicPartition, List<String>> partition2AllPotentialConsumers) {
+                                                 Map<String, List<String>> consumer2AllPotentialTopics,
+                                                 Map<String, List<String>> topic2AllPotentialConsumers,
+                                                 Map<String, Integer> partitionsPerTopic,
+                                                 int totalPartitionCount) {
         List<TopicPartition> currentPartitions = currentAssignment.get(consumer);
         int currentAssignmentSize = currentPartitions.size();
-        int maxAssignmentSize = consumer2AllPotentialPartitions.get(consumer).size();
+        List<String> allSubscribedTopics = consumer2AllPotentialTopics.get(consumer);
+        int maxAssignmentSize;
+        if (allSubscribedTopics.size() == partitionsPerTopic.size()) {
+            maxAssignmentSize = totalPartitionCount;
+        } else {
+            maxAssignmentSize = allSubscribedTopics.stream().map(topic -> partitionsPerTopic.get(topic)).reduce(0, Integer::sum);
+        }

Review comment:
   The same code block appears in lines 638-644. Is it possible to somehow 
factor it out?
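
   For reference, one possible shape of the factored-out helper (a sketch using 
the names visible in this diff; not code from the PR):
   ```java
   // Hypothetical helper that both call sites could share.
   private int maxAssignmentSize(final String consumer,
                                 final Map<String, List<String>> consumer2AllPotentialTopics,
                                 final Map<String, Integer> partitionsPerTopic,
                                 final int totalPartitionCount) {
       final List<String> allSubscribedTopics = consumer2AllPotentialTopics.get(consumer);
       // subscribed to every topic: every partition is potentially assignable
       if (allSubscribedTopics.size() == partitionsPerTopic.size())
           return totalPartitionCount;
       // otherwise, sum the partition counts of just the subscribed topics
       return allSubscribedTopics.stream().mapToInt(partitionsPerTopic::get).sum();
   }
   ```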

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -469,73 +426,190 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
         TreeSet<String> sortedCurrentSubscriptions = new TreeSet<>(new SubscriptionComparator(currentAssignment));
         sortedCurrentSubscriptions.addAll(currentAssignment.keySet());
 
-        balance(currentAssignment, prevAssignment, sortedPartitions, unassignedPartitions, sortedCurrentSubscriptions,
-            consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer, revocationRequired);
+        balance(currentAssignment, prevAssignment, sortedAllPartitions, unassignedPartitions, sortedCurrentSubscriptions,
+            consumer2AllPotentialTopics, topic2AllPotentialConsumers, currentPartitionConsumer, revocationRequired,
+            partitionsPerTopic, totalPartitionsCount);
+
+        if (log.isDebugEnabled()) {
+            log.debug("final assignment: {}", currentAssignment);
+        }
+
         return currentAssignment;
     }
 
+    /**
+     * get the unassigned partition list by computing the difference set of the sortedPartitions(all partitions)
+     * and sortedAssignedPartitions. If no assigned partitions, we'll just return all sorted topic partitions.
+     * This is used in generalAssign method
+     *
+     * We loop the sortedPartitions, and compare the ith element in sortedAssignedPartitions(i start from 0):
+     *   - if not equal to the ith element, add to unassignedPartitions
+     *   - if equal to the ith element, get next element from sortedAssignedPartitions
+     *
+     * @param sortedAllPartitions:          sorted all partitions
+     * @param sortedAssignedPartitions:     sorted partitions, all are included in the sortedPartitions
+     * @param topic2AllPotentialConsumers:  topics mapped to all consumers that subscribed to it
+     * @return                              partitions that aren't 

[GitHub] [kafka] KahnCheny commented on pull request #10746: MINOR: remove unnecessary public keyword from ProducerInterceptor/ConsumerInterceptor interface

2021-05-24 Thread GitBox


KahnCheny commented on pull request #10746:
URL: https://github.com/apache/kafka/pull/10746#issuecomment-847490202


   > @KahnCheny , thanks for the PR. LGTM! Could you also remove the `public` 
keyword in `ConsumerInterceptor` ? Thank you.
   
Of course.
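
   For context, interface members are implicitly public in Java, which is why 
the keyword is redundant; a minimal sketch (hypothetical interface, not the 
actual one):
   ```java
   interface ExampleInterceptor {
       public void onSend();  // redundant 'public' modifier
       void onClose();        // identical visibility, idiomatic form
   }
   ```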


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10755: MINOR: deprecate TaskMetadata constructor, replace TaskId#parse and add tests

2021-05-24 Thread GitBox


showuon commented on a change in pull request #10755:
URL: https://github.com/apache/kafka/pull/10755#discussion_r638426531



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java
##
@@ -40,6 +40,18 @@
 
 private final Optional timeCurrentIdlingStarted;
 
+/**
+ * @deprecated since 3.0, please use the constructor that accepts a TaskId 
object instead of a String

Review comment:
   nit: It would be clearer to put a constructor link here, e.g.:
   ```
   @deprecated since 3.0, please use {@link #TaskMetadata(TaskId, Set, Map, 
Map, Optional)} instead
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10749: KAFKA-12773: Use UncheckedIOException when wrapping IOException

2021-05-24 Thread GitBox


showuon commented on a change in pull request #10749:
URL: https://github.com/apache/kafka/pull/10749#discussion_r638422416



##
File path: raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java
##
@@ -91,14 +92,22 @@ private QuorumStateData readStateFromFile(File file) throws IOException {
 
             final short dataVersion = dataVersionNode.shortValue();
             return QuorumStateDataJsonConverter.read(dataObject, dataVersion);
+        } catch (IOException e) {
+            throw new UncheckedIOException(
+                String.format(
+                    "Read the Quorum status exception from the file %s",
+                    file
+                ),
+                e

Review comment:
   Also, could you rephrase the error message, e.g. `Error while reading 
the Quorum status from the file`?
   
   Same comments to the other places.

##
File path: raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java
##
@@ -91,14 +92,22 @@ private QuorumStateData readStateFromFile(File file) throws IOException {
 
             final short dataVersion = dataVersionNode.shortValue();
             return QuorumStateDataJsonConverter.read(dataObject, dataVersion);
+        } catch (IOException e) {
+            throw new UncheckedIOException(
+                String.format(
+                    "Read the Quorum status exception from the file %s",
+                    file
+                ),
+                e

Review comment:
   The indent is not consistent with the others. Also, do we need to break the 
exception across 6 lines? I think this could be enough:
   ```java
   throw new UncheckedIOException(
       String.format("Read the Quorum status exception from the file %s", file), e);
   ```
   
   Same comments to other places.

##
File path: raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java
##
@@ -67,7 +68,7 @@ public FileBasedStateStore(final File stateFile) {
 this.stateFile = stateFile;
 }
 
-private QuorumStateData readStateFromFile(File file) throws IOException {
+private QuorumStateData readStateFromFile(File file) {

Review comment:
   Should we replace `throws IOException` with `throws 
UncheckedIOException` here?
   
   Same comments to other places.
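
   For context, a minimal sketch of the wrapping pattern under discussion 
(simplified stand-ins for the parsing logic; not the PR's code). 
`UncheckedIOException` is a `RuntimeException`, so the method compiles without 
a throws clause, though it can still be declared for documentation:
   ```java
   import java.io.File;
   import java.io.IOException;
   import java.io.UncheckedIOException;
   import java.nio.file.Files;

   class StateStoreSketch {
       // Wraps the checked IOException so callers are not forced to handle it.
       private byte[] readStateFromFile(final File file) throws UncheckedIOException {
           try {
               return Files.readAllBytes(file.toPath());
           } catch (final IOException e) {
               throw new UncheckedIOException(
                   String.format("Error while reading the Quorum status from the file %s", file), e);
           }
       }
   }
   ```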




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10755: MINOR: deprecate TaskMetadata constructor, replace TaskId#parse and add tests

2021-05-24 Thread GitBox


ableegoldman commented on pull request #10755:
URL: https://github.com/apache/kafka/pull/10755#issuecomment-847495713


   All tests passed except for unrelated flaky 
`kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testRemovingWorker()`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12845) Rollback change which requires join key to be non null on KStream->GlobalKTable

2021-05-24 Thread Pedro Gontijo (Jira)
Pedro Gontijo created KAFKA-12845:
-

 Summary: Rollback change which requires join key to be non null on 
KStream->GlobalKTable
 Key: KAFKA-12845
 URL: https://issues.apache.org/jira/browse/KAFKA-12845
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 2.7.0
Reporter: Pedro Gontijo


As part of [KAFKA-10277|https://issues.apache.org/jira/browse/KAFKA-10277] the 
behavior for KStream->GlobalKTable joins was changed to require non-null join 
keys.

But it seems reasonable that not every record will have an existing 
relationship (and hence a key) with the joined GlobalKTable. Think about 
User>Car for instance, or PageView>Product. An empty/zero key could be returned 
by the KeyMapper, but that would trigger a totally unnecessary lookup in the store.

I do not think that requirement makes sense for any GlobalKTable join (inner or 
left), but for a left join it seems even stranger.
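
To illustrate, a sketch of the kind of join this affects (PageView, Product and 
EnrichedView are hypothetical types; the leftJoin signature is the standard 
KStream API):

{code:java}
StreamsBuilder builder = new StreamsBuilder();
KStream<String, PageView> views = builder.stream("page-views");
GlobalKTable<String, Product> products = builder.globalTable("products");

// A page view may have no associated product, so the key mapper can
// legitimately produce a null join key; since KAFKA-10277 this fails
// instead of simply yielding an unjoined (left) result.
KStream<String, EnrichedView> enriched = views.leftJoin(
    products,
    (viewKey, view) -> view.productId(),                  // may be null
    (view, product) -> new EnrichedView(view, product));  // product is null on no match
{code}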

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #10609: KAFKA-12648: Pt. 1 - Add NamedTopology to protocol and state directory structure

2021-05-24 Thread GitBox


ableegoldman commented on a change in pull request #10609:
URL: https://github.com/apache/kafka/pull/10609#discussion_r638396898



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -387,18 +422,18 @@ public synchronized void cleanRemovedTasks(final long 
cleanupDelayMs) {
 }
 
 private void cleanRemovedTasksCalledByCleanerThread(final long 
cleanupDelayMs) {
-for (final File taskDir : listNonEmptyTaskDirectories()) {

Review comment:
Just want to call this out since it's a change in behavior unrelated to 
this PR -- actually just something we could/should have cleaned up after 
removing the file-based locking. Previously the cleaner thread could never 
delete empty task dirs (due to that Windows bug); now it can, so we should 
not exclude empty dirs here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman edited a comment on pull request #10609: KAFKA-12648: Pt. 1 - Add NamedTopology to protocol and state directory structure

2021-05-24 Thread GitBox


ableegoldman edited a comment on pull request #10609:
URL: https://github.com/apache/kafka/pull/10609#issuecomment-845638871


   Rebased after the TaskId changes in KIP-470, and responded to all comments. 
Not much has changed since the last review, just cleaning up here and there. 
~It's pretty much done except for StateDirectoryTest, which I can always do in 
a quick followup PR to unblock other downstream work with this~ edit: tests are 
done, this PR is fully ready for review and merge


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] sasukerui closed pull request #10751: MINOR: update java doc for ConsumerCoordinator

2021-05-24 Thread GitBox


sasukerui closed pull request #10751:
URL: https://github.com/apache/kafka/pull/10751


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10755: MINOR: deprecate TaskMetadata constructor, replace TaskId#parse and add tests

2021-05-24 Thread GitBox


ableegoldman commented on pull request #10755:
URL: https://github.com/apache/kafka/pull/10755#issuecomment-847442114


   cc @mjsax 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman merged pull request #10690: MINOR: clarify message ordering with max in-flight requests and idempotent producer

2021-05-24 Thread GitBox


ableegoldman merged pull request #10690:
URL: https://github.com/apache/kafka/pull/10690


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12835) Topic IDs can mismatch on brokers (after interbroker protocol version update)

2021-05-24 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17350684#comment-17350684
 ] 

Justine Olshan commented on KAFKA-12835:


Hi [~ivanyu]. Previously we could lose topic IDs in the ZNode when reassigning 
partitions. This could occur if we switched from a controller with IBP 2.8 to 
one with IBP 2.7 (where reassigning partitions overwrites the TopicZNode) and 
then back to 2.8. If the partition.metadata file still existed, we would have 
the old ID in it, but the new controller would see a topic ID missing in the 
ZNode and assign a new one. 

I've opened a PR to prevent this loss of topic IDs regardless of the IBP of the 
controller.

> Topic IDs can mismatch on brokers (after interbroker protocol version update)
> -
>
> Key: KAFKA-12835
> URL: https://issues.apache.org/jira/browse/KAFKA-12835
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.8.0
>Reporter: Ivan Yurchenko
>Assignee: Justine Olshan
>Priority: Major
>
> We had a Kafka cluster running 2.8 version with interbroker protocol set to 
> 2.7. It had a number of topics and everything was fine.
> Then we decided to update the interbroker protocol to 2.8 by the following 
> procedure:
> 1. Run new brokers with the interbroker protocol set to 2.8.
> 2. Move the data from the old brokers to the new ones (normal partition 
> reassignment API).
> 3. Decommission the old brokers.
> At stage 2 we hit a problem: old brokers started failing on 
> {{LeaderAndIsrRequest}} handling with
> {code:java}
> ERROR [Broker id=<...>] Topic Id in memory: <...> does not match the topic Id 
> for partition <...> provided in the request: <...>. (state.change.logger)
> {code}
> for multiple topics. Topics were not recreated.
> We checked the {{partition.metadata}} files and the IDs there were indeed different 
> from the values in ZooKeeper. It was fixed by deleting the metadata files 
> (and letting them be recreated).
>  
> The logs, unfortunately, didn't show anything that might point to the cause 
> of the issue (or it happened longer ago than we keep the logs).
> We also tried to reproduce this, but without success.
> If the community can point out what to check or beware of in the future, that 
> would be great. We'll be happy to provide additional information if needed. 
> Thank you! 
> Sorry that this ticket might not be very actionable; we hope to at least 
> raise awareness of this issue.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman edited a comment on pull request #10690: MINOR: clarify message ordering with max in-flight requests and idempotent producer

2021-05-24 Thread GitBox


ableegoldman edited a comment on pull request #10690:
URL: https://github.com/apache/kafka/pull/10690#issuecomment-847413919


   Some unrelated test failures in `RaftClusterTest`, 
`connect.integration.RebalanceSourceConnectorsIntegrationTest.testDeleteConnector()`,
 and 
   `kafka.api.PlaintextConsumerTest.testPartitionsFor()`
   
   Will merge to trunk


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10690: MINOR: clarify message ordering with max in-flight requests and idempotent producer

2021-05-24 Thread GitBox


ableegoldman commented on pull request #10690:
URL: https://github.com/apache/kafka/pull/10690#issuecomment-847413919


   Some unrelated test failures in `RaftClusterTest`, 
`connect.integration.RebalanceSourceConnectorsIntegrationTest.testDeleteConnector()`,
 and 
   `kafka.api.PlaintextConsumerTest.testPartitionsFor()`
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma closed pull request #10497: KAFKA-12342; Merge RaftClient and MetaLogManager interfaces and remove shim

2021-05-24 Thread GitBox


ijuma closed pull request #10497:
URL: https://github.com/apache/kafka/pull/10497


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10755: MINOR: deprecate TaskMetadata constructor, replace TaskId#parse and add tests

2021-05-24 Thread GitBox


ableegoldman commented on a change in pull request #10755:
URL: https://github.com/apache/kafka/pull/10755#discussion_r638336725



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
##
@@ -80,6 +83,35 @@ public String toString() {
 return namedTopology != null ? namedTopology + "_" + topicGroupId + 
"_" + partition : topicGroupId + "_" + partition;
 }
 
+/**
+ * @throws TaskIdFormatException if the taskIdStr is not a valid {@link 
TaskId}
+ */
+public static TaskId parse(final String taskIdStr) {

Review comment:
   I noticed this has been (re)moved since 2.8 so I put it back with 
updates to handle named topologies, plus tests which it did not seem to have




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman opened a new pull request #10755: MINOR: deprecate TaskMetadata constructor, replace TaskId#parse and add tests

2021-05-24 Thread GitBox


ableegoldman opened a new pull request #10755:
URL: https://github.com/apache/kafka/pull/10755


   Quick followup to KIP-740. I also noticed the TaskId#parse method had been 
modified previously, and should be re-added to the public TaskId class. It also 
had no tests, so now it does. 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12844) KIP-740 follow up: clean up TaskId

2021-05-24 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-12844:
--

 Summary: KIP-740 follow up: clean up TaskId
 Key: KAFKA-12844
 URL: https://issues.apache.org/jira/browse/KAFKA-12844
 Project: Kafka
  Issue Type: Task
  Components: streams
Reporter: A. Sophie Blee-Goldman
 Fix For: 4.0.0


See 
[KIP-740|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306557]
 – for the TaskId class, we need to remove the following deprecated APIs:
 # The public partition and topicGroupId fields should be "removed", i.e. made 
private (we can also now rename topicGroupId to subtopology to match the 
getter); see the sketch below
 # The two #readFrom and two #writeTo methods can be removed (they have already 
been converted to internal utility methods we now use instead, so just remove 
them)
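
For illustration, roughly what the cleaned-up class could look like (a sketch 
based only on the two items above; everything else is assumed):

{code:java}
public class TaskId {
    // 1. formerly-public fields made private (topicGroupId renamed to subtopology)
    private final int subtopology;
    private final int partition;

    public TaskId(final int subtopology, final int partition) {
        this.subtopology = subtopology;
        this.partition = partition;
    }

    public int subtopology() { return subtopology; }
    public int partition() { return partition; }

    // 2. the deprecated readFrom/writeTo methods are simply gone; their
    //    replacements already live in internal utility classes
}
{code}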



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12843) KIP-740 follow up: clean up TaskMetadata

2021-05-24 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-12843:
--

 Summary: KIP-740 follow up: clean up TaskMetadata
 Key: KAFKA-12843
 URL: https://issues.apache.org/jira/browse/KAFKA-12843
 Project: Kafka
  Issue Type: Task
  Components: streams
Reporter: A. Sophie Blee-Goldman
 Fix For: 4.0.0


See 
[KIP-740|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306557]
 – for the TaskMetadata class, we need to:
 # Deprecate the TaskMetadata#getTaskId method
 # "Remove" the deprecated TaskMetadata#taskId method, then re-add a taskId() 
API that returns a TaskId instead of a String
 # Remove the deprecated constructor (see the sketch below)
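
For illustration, a sketch of the target API shape described above (method 
bodies and the constructor are assumed):

{code:java}
public class TaskMetadata {
    private final TaskId taskId;

    public TaskMetadata(final TaskId taskId) {  // 3. the deprecated constructor is gone
        this.taskId = taskId;
    }

    /** 1. deprecated in favor of taskId() */
    @Deprecated
    public String getTaskId() { return taskId.toString(); }

    /** 2. re-added, now returning a TaskId instead of a String */
    public TaskId taskId() { return taskId; }
}
{code}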



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12838) Kafka Broker - Request threads inefficiently blocking during produce

2021-05-24 Thread Ryan Cabral (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17350654#comment-17350654
 ] 

Ryan Cabral commented on KAFKA-12838:
-

Yes, increasing the number of partitions can help reduce contention, but won't 
eliminate it. It is similar to increasing the number of request/io threads, in 
that it helps mitigate the issue a bit. The core of the problem, though, is 
that requests for the same partition are dispatched simultaneously, tying up 
request/io threads and causing requests for other partitions to wait behind 
them, even though those depend on a different lock. That means just two 
producers writing to the same partition can affect an entire broker's overall 
throughput rather than just that partition's throughput. 
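
To sketch the funneling idea from the description (purely illustrative, not 
Kafka code): route appends for the same partition through a single-threaded 
lane so that at most one handler thread is tied up per partition:

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.kafka.common.TopicPartition;

class PartitionLanes {
    private final List<ExecutorService> lanes = new ArrayList<>();

    PartitionLanes(final int numLanes) {
        for (int i = 0; i < numLanes; i++)
            lanes.add(Executors.newSingleThreadExecutor());
    }

    // Appends for the same partition always land on the same single-threaded
    // lane, so they serialize without parking a request handler on a lock.
    CompletableFuture<Void> submit(final TopicPartition tp, final Runnable append) {
        final int lane = Math.floorMod(tp.hashCode(), lanes.size());
        return CompletableFuture.runAsync(append, lanes.get(lane));
    }
}
{code}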

> Kafka Broker - Request threads inefficiently blocking during produce
> 
>
> Key: KAFKA-12838
> URL: https://issues.apache.org/jira/browse/KAFKA-12838
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.7.0, 2.8.0
>Reporter: Ryan Cabral
>Priority: Major
>
> Hello, I have been using Kafka brokers for a bit and have run into a problem 
> with the way a kafka broker handles produce requests. If there are multiple 
> producers to the same topic and partition, any request handler threads 
> handling the produce for that topic and partition become blocked until all 
> requests before it are done. Request handler threads for the entire broker 
> can become exhausted waiting on the same partition lock, blocking requests 
> for other partitions that would not have needed the same lock.
> Once that starts happening, requests start to back up, queued requests can 
> reach its maximum and network threads begin to be paused cascading the 
> problem a bit more. Overall performance ends up being degraded. I'm not so 
> focused on the cascade at the moment as I am on the initial contention. 
> Intuitively I would expect locking contention on a single partition to ONLY 
> affect throughput on that partition and not the entire broker.
>  
> The append call within the request handler originates here:
> [https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/server/KafkaApis.scala#L638]
> Further down the stack the lock during append is created here: 
> [https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/log/Log.scala#L1165]
> At this point the first request will hold the lock during append and future 
> requests on the same partition will block, waiting for the lock, tying up an 
> io thread (request handler).
> At first glance, it seems like it would make the most sense to (via config?) 
> be able to funnel (produce) requests for the same partition through its own 
> request queue of sorts and dispatch them such that at most one io thread is 
> tied up at a time for a given partition. There are a number of reasons the 
> lock could be held elsewhere too but this should at least help mitigate the 
> issue a bit. I'm assuming this is easier said than done though and likely 
> requires significant refactoring to properly achieve but hoping this is 
> something that could end up on some sort of long term roadmap.
>  
> Snippet from jstack. Almost all request handlers threads (there are 256 of 
> them, up from 25 to mitigate the issue) in the jstack are blocked waiting on 
> the same lock due to the number of producers we have.
>  
> {noformat}
> "data-plane-kafka-request-handler-254" #335 daemon prio=5 os_prio=0 
> tid=0x7fb1c9f13000 nid=0x53f1 runnable [0x7fad35796000]
>    java.lang.Thread.State: RUNNABLE
>   at 
> org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.(KafkaLZ4BlockOutputStream.java:82)
>   at 
> org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.(KafkaLZ4BlockOutputStream.java:125)
>   at 
> org.apache.kafka.common.record.CompressionType$4.wrapForOutput(CompressionType.java:101)
>   at 
> org.apache.kafka.common.record.MemoryRecordsBuilder.(MemoryRecordsBuilder.java:134)
>   at 
> org.apache.kafka.common.record.MemoryRecordsBuilder.(MemoryRecordsBuilder.java:170)
>   at 
> org.apache.kafka.common.record.MemoryRecords.builder(MemoryRecords.java:508)
>   at 
> kafka.log.LogValidator$.buildRecordsAndAssignOffsets(LogValidator.scala:500)
>   at 
> kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(LogValidator.scala:455)
>   at 
> kafka.log.LogValidator$.validateMessagesAndAssignOffsets(LogValidator.scala:106)
>   at kafka.log.Log.$anonfun$append$2(Log.scala:1126)
>   - locked <0x0004c9a6fd60> (a java.lang.Object)
>   at kafka.log.Log.append(Log.scala:2387)
>   at kafka.log.Log.appendAsLeader(Log.scala:1050)
>   at 
> 

[GitHub] [kafka] jolshan commented on pull request #10754: KAFKA-12835: Topic IDs can mismatch on brokers (after interbroker protocol version update)

2021-05-24 Thread GitBox


jolshan commented on pull request #10754:
URL: https://github.com/apache/kafka/pull/10754#issuecomment-847389546


   Hmm, looks like `KafkaMetadataLogTest.testTopicId` is failing because we set 
`keepPartitionMetadataFile` to false. When I ensure that we only assign the 
topic ID when `keepPartitionMetadataFile` is true, we do not assign a topic ID 
to the log. Since we rely on the assignment in memory and in the file being 
consistent, one option is to write a partition.metadata file for the metadata 
topic. It won't be used like the other partition.metadata files, but it might 
be easier to keep all logs consistent.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9168) Integrate JNI direct buffer support to RocksDBStore

2021-05-24 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17350650#comment-17350650
 ] 

A. Sophie Blee-Goldman commented on KAFKA-9168:
---

Yep, all the rocksdb work waiting on more recent versions has been unblocked. 
Disclaimer: I don't have much context on this particular task or how, or even 
whether, it's something we can do for current features. It may be that the 
direct buffers are only useful for new features that we were considering, and 
not anything in the current codebase. But I really have no idea – just wanted 
to clarify that figuring this out is I guess the first part of this ticket :) 

> Integrate JNI direct buffer support to RocksDBStore
> ---
>
> Key: KAFKA-9168
> URL: https://issues.apache.org/jira/browse/KAFKA-9168
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>  Labels: perfomance
>
> There has been a PR created on rocksdb Java client to support direct 
> ByteBuffers in Java. We can look at integrating it whenever it gets merged. 
> Link to PR: [https://github.com/facebook/rocksdb/pull/2283]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] junrao commented on a change in pull request #10684: MINOR: Improve Log layer segment iteration logic and few other areas

2021-05-24 Thread GitBox


junrao commented on a change in pull request #10684:
URL: https://github.com/apache/kafka/pull/10684#discussion_r638164890



##
File path: core/src/main/scala/kafka/log/LogSegments.scala
##
@@ -210,11 +209,23 @@ class LogSegments(topicPartition: TopicPartition) {
    * @return the entry associated with the greatest offset, if it exists.
    */
   @threadsafe
-  def lastEntry: Option[Map.Entry[JLong, LogSegment]] = Option(segments.lastEntry)
+  def lastEntry: Option[Map.Entry[Long, LogSegment]] = Option(segments.lastEntry)
 
   /**
    * @return the log segment with the greatest offset, if it exists.
    */
   @threadsafe
   def lastSegment: Option[LogSegment] = lastEntry.map(_.getValue)
+
+  /**
+   * @return an iterable with log segments ordered from lowest base offset to highest,
+   *         each segment returned has a base offset strictly greater than the provided baseOffset.
+   */
+  def higherSegments(baseOffset: Long): Iterable[LogSegment] = {
+    val view =
+      Option(segments.higherKey(baseOffset)).map {
+        higherOffset => segments.tailMap(higherOffset, true)
+      }.getOrElse(new ConcurrentSkipListMap[Long, LogSegment]())

Review comment:
   Could we return a constant empty map?

##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -1217,10 +1213,8 @@ class Log(@volatile private var _dir: File,
       fetchDataInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
       if (fetchDataInfo != null) {
         if (includeAbortedTxns)
-          fetchDataInfo = addAbortedTransactions(startOffset, segmentEntry, fetchDataInfo)
-      } else segmentEntryOpt = segments.higherEntry(baseOffset)
-
-      done = fetchDataInfo != null || segmentEntryOpt.isEmpty
+          fetchDataInfo = addAbortedTransactions(startOffset, segment, fetchDataInfo)
+      } else segmentOpt = segmentsIterator.nextOption()

Review comment:
   The old logic supports skipping forward multiple segments to find the 
right data. The new logic seems to only support skipping forward once. It would 
be useful to preserve the original semantic.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan opened a new pull request #10754: KAFKA-12835: Topic IDs can mismatch on brokers (after interbroker protocol version update)

2021-05-24 Thread GitBox


jolshan opened a new pull request #10754:
URL: https://github.com/apache/kafka/pull/10754


   Upon upgrading to IBP 2.8, a topic ID can end up getting reassigned, which can 
cause errors in LeaderAndIsr handling when the partition metadata files from 
the previous ID are still on the broker. 
   
   Topic IDs are stored in the TopicZNode. The behavior of the code before this 
fix is as follows:
   When we have a controller with an older IBP version and we reassign 
partitions, the TopicZNode is overwritten and we lose the topic ID. Upon 
electing a 2.8 IBP controller, we will see the TopicZNode is missing a topic ID 
and will generate a new one. If the broker still has the old partition metadata 
file, we will see an ID mismatch that causes the error.
   
   This PR changes the controller logic so that we maintain the topic ID in the 
controller and the ZNode even when IBP < 2.8. This means that in the scenario 
above, reassigning partitions will no longer result in losing the topic ID.
   
   Topic IDs may be lost when downgrading the code below version 2.8, but upon 
re-upgrading to code version 2.8, before bumping the IBP, all partition 
metadata files will be deleted to prevent any errors.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12838) Kafka Broker - Request threads inefficiently blocking during produce

2021-05-24 Thread Ryanne Dolan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17350606#comment-17350606
 ] 

Ryanne Dolan commented on KAFKA-12838:
--

Would it help to significantly increase the number of partitions you're writing 
to?

> Kafka Broker - Request threads inefficiently blocking during produce
> 
>
> Key: KAFKA-12838
> URL: https://issues.apache.org/jira/browse/KAFKA-12838
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.7.0, 2.8.0
>Reporter: Ryan Cabral
>Priority: Major
>
> Hello, I have been using Kafka brokers for a bit and have run into a problem 
> with the way a kafka broker handles produce requests. If there are multiple 
> producers to the same topic and partition, any request handler threads 
> handling the produce for that topic and partition become blocked until all 
> requests before them are done. Request handler threads for the entire broker 
> can become exhausted waiting on the same partition lock, blocking requests 
> for other partitions that would not have needed the same lock.
> Once that starts happening, requests start to back up, queued requests can 
> reach its maximum and network threads begin to be paused cascading the 
> problem a bit more. Overall performance ends up being degraded. I'm not so 
> focused on the cascade at the moment as I am on the initial contention. 
> Intuitively I would expect locking contention on a single partition to ONLY 
> affect throughput on that partition and not the entire broker.
>  
> The append call within the request handler originates here:
> [https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/server/KafkaApis.scala#L638]
> Further down the stack the lock during append is created here: 
> [https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/log/Log.scala#L1165]
> At this point the first request will hold the lock during append and future 
> requests on the same partition will block, waiting for the lock, tying up an 
> io thread (request handler).
> At first glance, it seems like it would make the most sense to (via config?) 
> be able to funnel (produce) requests for the same partition through its own 
> request queue of sorts and dispatch them such that at most one io thread is 
> tied up at a time for a given partition. There are a number of reasons the 
> lock could be held elsewhere too but this should at least help mitigate the 
> issue a bit. I'm assuming this is easier said than done though and likely 
> requires significant refactoring to properly achieve but hoping this is 
> something that could end up on some sort of long term roadmap.
>  
> Snippet from jstack. Almost all request handlers threads (there are 256 of 
> them, up from 25 to mitigate the issue) in the jstack are blocked waiting on 
> the same lock due to the number of producers we have.
>  
> {noformat}
> "data-plane-kafka-request-handler-254" #335 daemon prio=5 os_prio=0 
> tid=0x7fb1c9f13000 nid=0x53f1 runnable [0x7fad35796000]
>    java.lang.Thread.State: RUNNABLE
>   at 
> org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.(KafkaLZ4BlockOutputStream.java:82)
>   at 
> org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.(KafkaLZ4BlockOutputStream.java:125)
>   at 
> org.apache.kafka.common.record.CompressionType$4.wrapForOutput(CompressionType.java:101)
>   at 
> org.apache.kafka.common.record.MemoryRecordsBuilder.(MemoryRecordsBuilder.java:134)
>   at 
> org.apache.kafka.common.record.MemoryRecordsBuilder.(MemoryRecordsBuilder.java:170)
>   at 
> org.apache.kafka.common.record.MemoryRecords.builder(MemoryRecords.java:508)
>   at 
> kafka.log.LogValidator$.buildRecordsAndAssignOffsets(LogValidator.scala:500)
>   at 
> kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(LogValidator.scala:455)
>   at 
> kafka.log.LogValidator$.validateMessagesAndAssignOffsets(LogValidator.scala:106)
>   at kafka.log.Log.$anonfun$append$2(Log.scala:1126)
>   - locked <0x0004c9a6fd60> (a java.lang.Object)
>   at kafka.log.Log.append(Log.scala:2387)
>   at kafka.log.Log.appendAsLeader(Log.scala:1050)
>   at 
> kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1079)
>   at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1067)
>   at 
> kafka.server.ReplicaManager.$anonfun$appendToLocalLog$4(ReplicaManager.scala:953)
>   at kafka.server.ReplicaManager$$Lambda$1078/1017241486.apply(Unknown 
> Source)
>   at 
> scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
>   at 
> scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
>   at 

[GitHub] [kafka] fqaiser94 commented on a change in pull request #10747: KAFKA-12446: Define KGroupedTable#aggregate subtractor + adder order of execution

2021-05-24 Thread GitBox


fqaiser94 commented on a change in pull request #10747:
URL: https://github.com/apache/kafka/pull/10747#discussion_r637657442



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
##
@@ -45,34 +45,30 @@ public void setIfUnset(final Serializer defaultKeySerializer, final Serial
 }
 
 /**
- * @throws StreamsException if both old and new values of data are null, or if
- * both values are not null
+ * @throws StreamsException if both old and new values of data are null.
  */
 @Override
 public byte[] serialize(final String topic, final Headers headers, final Change<T> data) {
-    final byte[] serializedKey;
+    final boolean oldValueIsNull = data.oldValue == null;
+    final boolean newValueIsNull = data.newValue == null;
 
-    // only one of the old / new values would be not null
-    if (data.newValue != null) {
-        if (data.oldValue != null) {
-            throw new StreamsException("Both old and new values are not null (" + data.oldValue
-                + " : " + data.newValue + ") in ChangeSerializer, which is not allowed.");
-        }
-
-        serializedKey = inner.serialize(topic, headers, data.newValue);
+    // both old and new values cannot be null
+    if (oldValueIsNull && newValueIsNull) {
+        throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed.");
     } else {
-        if (data.oldValue == null) {
-            throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed.");
-        }
-
-        serializedKey = inner.serialize(topic, headers, data.oldValue);
+        final byte[] newData = newValueIsNull ? new byte[0] : inner.serialize(topic, headers, data.newValue);
+        final byte[] oldData = oldValueIsNull ? new byte[0] : inner.serialize(topic, headers, data.oldValue);
+
+        final int newDataLength = newData.length;
+        final int capacity = NEW_DATA_LENGTH_BYTES_SIZE + newDataLength + oldData.length;
+
+        return ByteBuffer
+            .allocate(capacity)
+            .putInt(newDataLength)
+            .put(newData)
+            .put(oldData)
+            .array();

Review comment:
   With the change to the `ChangedSerializer` and `ChangedDeserializer` 
classes, I don’t think users will be able to just upgrade from a previous 
version of Kafka Streams easily. Any "inflight" messages written by older 
library versions will fail to deserialize correctly after the upgrade. 
   
   Not sure how these types of “breaking” changes are typically handled. 
   
   1. Is it simply a matter of noting this in the relevant upgrade doc i.e. 
users need to do an application-reset? 
   2. Or do we want to write more code to handle upgrade scenarios? 
   3. Or find a more backwards compatible way of writing this serde?
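
   For reference (re: option 3), the reading side of the buffer layout in this 
diff would look roughly like this (a sketch of the format only, not the actual 
ChangedDeserializer); old messages lack the length prefix, which is why 
in-flight records from a previous version would mis-deserialize:
   ```java
   import java.nio.ByteBuffer;

   // Layout written above: [int newDataLength][newData][oldData],
   // with a zero-length array standing in for a null value.
   static byte[][] splitChange(final byte[] bytes) {
       final ByteBuffer buffer = ByteBuffer.wrap(bytes);
       final byte[] newData = new byte[buffer.getInt()];
       buffer.get(newData);
       final byte[] oldData = new byte[buffer.remaining()];
       buffer.get(oldData);
       return new byte[][]{newData, oldData};
   }
   ```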




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe opened a new pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-05-24 Thread GitBox


cmccabe opened a new pull request #10753:
URL: https://github.com/apache/kafka/pull/10753


   Support the KIP-455 reassignment API when in KRaft mode. Reassignments
   which merely rearrange partitions complete immediately. Those that only
   remove a partition complete immediately if the ISR would be non-empty
   after the specified removals. Reassignments that add one or more
   partitions follow the KIP-455 pattern of adding all the adding replicas
   to the replica set, and then waiting for the ISR to include all the new
   partitions before completing. Changes to the partition sets are
   accomplished via PartitionChangeRecord.
   
   Add support for the ReassigningPartitions metric, which tracks the
   number of partitions which are currently reassigning. This metric is
   only exposed when running in standalone mode, to avoid conflicting with
   the broker metric.
   
   In TimelineInteger and TimelineLong, replace increment with
   incrementAndGet, and add an addAndGet function. Similar for decrement /
   subtraction. This makes updating metrics more convenient.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah opened a new pull request #10752: KAFKA-12620 Allocate Producer IDs in KRaft controller

2021-05-24 Thread GitBox


mumrah opened a new pull request #10752:
URL: https://github.com/apache/kafka/pull/10752


   This is part 2 of 
[KIP-730](https://cwiki.apache.org/confluence/display/KAFKA/KIP-730%3A+Producer+ID+generation+in+KRaft+mode);
 part 1 was in #10504.
   
   This PR adds support in the KRaft controller for handling 
AllocateProducerIDs requests, managing the state of the latest producer ID 
block in the controller, and committing this state to the metadata log.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao merged pull request #10742: MINOR: Add log identifier/prefix printing in Log layer static functions

2021-05-24 Thread GitBox


junrao merged pull request #10742:
URL: https://github.com/apache/kafka/pull/10742


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-8522) Tombstones can survive forever

2021-05-24 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17350551#comment-17350551
 ] 

Jun Rao commented on KAFKA-8522:


latest PR link

> Tombstones can survive forever
> --
>
> Key: KAFKA-8522
> URL: https://issues.apache.org/jira/browse/KAFKA-8522
> Project: Kafka
>  Issue Type: Improvement
>  Components: log cleaner
>Reporter: Evelyn Bayes
>Priority: Minor
>
> This is a bit grey zone as to whether it's a "bug" but it is certainly 
> unintended behaviour.
>  
> Under specific conditions tombstones effectively survive forever:
>  * Small amount of throughput;
>  * min.cleanable.dirty.ratio near or at 0; and
>  * Other parameters at default.
> What happens is that all the data continuously gets cycled into the oldest 
> segment. Old records get compacted away, but the new records continuously 
> update the timestamp of the oldest segment, resetting the countdown for 
> deleting tombstones.
> So tombstones build up in the oldest segment forever.
>  
> While you could "fix" this by reducing the segment size, this can be 
> undesirable as a sudden change in throughput could cause a dangerous number 
> of segments to be created.
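
For reference, the conditions described above, as topic configs (property names 
are standard; values are illustrative only):

{noformat}
cleanup.policy=compact
min.cleanable.dirty.ratio=0.01   # cleaner runs almost continuously
segment.bytes=1073741824         # default; low throughput never fills the segment
delete.retention.ms=86400000     # countdown that keeps restarting while fresh
                                 # records are rewritten into the oldest segment
{noformat}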



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dejan2609 commented on pull request #10698: KAFKA-12770: introduce `checkstyleVersion` build option (for overriding CheckStyle project-defined dependency version)

2021-05-24 Thread GitBox


dejan2609 commented on pull request #10698:
URL: https://github.com/apache/kafka/pull/10698#issuecomment-847201180


   The CheckStyle team (@romani) needs this in order to add the Kafka project to 
their regression suite here: 
   
https://github.com/checkstyle/contribution/blob/master/checkstyle-tester/projects-to-test-on.properties
   
   @showuon was so kind as to provide his assistance, but we now need approval 
from someone who has write access (@mumrah, @guozhangwang, @hachikuji, @ijuma, 
@junrao, @cmccabe or someone else).
   
   Note: the changes are small and simple (and carry no risk).
   
   

   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dejan2609 edited a comment on pull request #10698: KAFKA-12770: introduce `checkstyleVersion` build option (for overriding CheckStyle project-defined dependency version)

2021-05-24 Thread GitBox


dejan2609 edited a comment on pull request #10698:
URL: https://github.com/apache/kafka/pull/10698#issuecomment-847174293


   If I may ask you @showuon: do we need to ping someone else for another 
review (or for the merge into trunk)? 
   
   _Edit (just to answer my own question): there are two types of approvals, and 
hence this PR does need additional review(s)... in any case: thanks @showuon!_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dejan2609 commented on pull request #10698: KAFKA-12770: introduce `checkstyleVersion` build option (for overriding CheckStyle project-defined dependency version)

2021-05-24 Thread GitBox


dejan2609 commented on pull request #10698:
URL: https://github.com/apache/kafka/pull/10698#issuecomment-847174293


   If I may ask you @showuon: do we need to ping someone else for another 
review (or for the merge into trunk)? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on pull request #10751: MINOR: update java doc for ConsumerCoordinator

2021-05-24 Thread GitBox


dengziming commented on pull request #10751:
URL: https://github.com/apache/kafka/pull/10751#issuecomment-847135527


   iff means "if and only if"
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #10744: KAFKA-8410: KTableProcessor migration groundwork

2021-05-24 Thread GitBox


vvcephei commented on pull request #10744:
URL: https://github.com/apache/kafka/pull/10744#issuecomment-847118811


   Filed ticket for Connect test: 
https://issues.apache.org/jira/browse/KAFKA-12842
   Commented on ticket for Raft test: 
https://issues.apache.org/jira/browse/KAFKA-12629


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12629) Failing Test: RaftClusterTest

2021-05-24 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-12629:
-
Summary: Failing Test: RaftClusterTest  (was: Flaky Test RaftClusterTest)

> Failing Test: RaftClusterTest
> -
>
> Key: KAFKA-12629
> URL: https://issues.apache.org/jira/browse/KAFKA-12629
> Project: Kafka
>  Issue Type: Test
>  Components: core, unit tests
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Blocker
>  Labels: flaky-test
> Fix For: 3.0.0
>
>
> {quote} {{java.util.concurrent.ExecutionException: 
> java.lang.ClassNotFoundException: 
> org.apache.kafka.controller.NoOpSnapshotWriterBuilder
>   at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
>   at 
> kafka.testkit.KafkaClusterTestKit.startup(KafkaClusterTestKit.java:364)
>   at 
> kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions(RaftClusterTest.scala:181)}}{quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-12629) Flaky Test RaftClusterTest

2021-05-24 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17350483#comment-17350483
 ] 

John Roesler edited comment on KAFKA-12629 at 5/24/21, 3:22 PM:


Failed also on: 
[https://github.com/apache/kafka/pull/10744/checks?check_run_id=2643450959]

When this build fails in a PR build, it means that it failed twice in a row. 
I'm inclined to merge [~showuon]'s PR. Perpetually failing tests provide 
negative value to the codebase. If we can't be bothered to fix the test, then we 
obviously don't care about the logic under test.

Rather than do anything rash, I've upgraded this ticket and the proposed cause 
(KAFKA-12677) to blockers for 3.0. If a lot more time passes with no action 
here, I think we should go ahead and ignore the tests. The blocker tickets 
should be sufficient to get it looked at for 3.0.
{noformat}
 java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.UnknownServerException: The server experienced 
an unexpected error when processing the request.
at 
org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at 
org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at 
kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions(RaftClusterTest.scala:197)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at 
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
at 
org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at 
org.junit.jupiter.engine.extension.TimeoutInvocation.proceed(TimeoutInvocation.java:46)
at 
org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
at 
org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
at 
org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at 
org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
at 
org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
at 
org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
at 

[jira] [Updated] (KAFKA-12677) The raftCluster always send to the wrong active controller and never update

2021-05-24 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-12677:
-
Fix Version/s: 3.0.0

> The raftCluster always send to the wrong active controller and never update
> ---
>
> Key: KAFKA-12677
> URL: https://issues.apache.org/jira/browse/KAFKA-12677
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.8.0
>Reporter: Luke Chen
>Assignee: Colin McCabe
>Priority: Blocker
> Fix For: 3.0.0
>
>
> KIP-500 introduces a self-managed metadata quorum. We should always have 
> exactly one active controller, and all RPCs should be sent to the active 
> controller. But there is a chance that the active controller has already 
> changed while RPCs are still being sent to the old one.
> In the attached log, we can see:
> {code:java}
> [Controller 3002] Becoming active at controller epoch 1. 
> ...
> [Controller 3000] Becoming active at controller epoch 2. 
> {code}
> So the latest active controller should be 3000, but the create-topic RPCs are 
> all being sent to controller 3002:
> {code:java}
> "errorMessage":"The active controller appears to be node 3000"
> {code}
> This bug causes the RaftClusterTests to be flaky.
>  
> Debug log while running testCreateClusterAndCreateListDeleteTopic test: 
> https://drive.google.com/file/d/1WVUgy1Erjx8mHyofiP9MVvQGb0LcDYt3/view?usp=sharing



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12677) The raftCluster always send to the wrong active controller and never update

2021-05-24 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-12677:
-
Priority: Blocker  (was: Major)

> The raftCluster always send to the wrong active controller and never update
> ---
>
> Key: KAFKA-12677
> URL: https://issues.apache.org/jira/browse/KAFKA-12677
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.8.0
>Reporter: Luke Chen
>Assignee: Colin McCabe
>Priority: Blocker
>
> KIP-500 introduces a self-managed metadata quorum. We should always have 
> exactly one active controller, and all RPCs should be sent to the active 
> controller. But there is a chance that the active controller has already 
> changed while RPCs are still being sent to the old one.
> In the attached log, we can see:
> {code:java}
> [Controller 3002] Becoming active at controller epoch 1. 
> ...
> [Controller 3000] Becoming active at controller epoch 2. 
> {code}
> So the latest active controller should be 3000, but the create-topic RPCs are 
> all being sent to controller 3002:
> {code:java}
> "errorMessage":"The active controller appears to be node 3000"
> {code}
> This bug causes the RaftClusterTests to be flaky.
>  
> Debug log while running testCreateClusterAndCreateListDeleteTopic test: 
> https://drive.google.com/file/d/1WVUgy1Erjx8mHyofiP9MVvQGb0LcDYt3/view?usp=sharing



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12629) Flaky Test RaftClusterTest

2021-05-24 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-12629:
-
Priority: Blocker  (was: Critical)

> Flaky Test RaftClusterTest
> --
>
> Key: KAFKA-12629
> URL: https://issues.apache.org/jira/browse/KAFKA-12629
> Project: Kafka
>  Issue Type: Test
>  Components: core, unit tests
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Blocker
>  Labels: flaky-test
>
> {quote} {{java.util.concurrent.ExecutionException: 
> java.lang.ClassNotFoundException: 
> org.apache.kafka.controller.NoOpSnapshotWriterBuilder
>   at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
>   at 
> kafka.testkit.KafkaClusterTestKit.startup(KafkaClusterTestKit.java:364)
>   at 
> kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions(RaftClusterTest.scala:181)}}{quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12629) Flaky Test RaftClusterTest

2021-05-24 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-12629:
-
Fix Version/s: 3.0.0

> Flaky Test RaftClusterTest
> --
>
> Key: KAFKA-12629
> URL: https://issues.apache.org/jira/browse/KAFKA-12629
> Project: Kafka
>  Issue Type: Test
>  Components: core, unit tests
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Blocker
>  Labels: flaky-test
> Fix For: 3.0.0
>
>
> {quote} {{java.util.concurrent.ExecutionException: 
> java.lang.ClassNotFoundException: 
> org.apache.kafka.controller.NoOpSnapshotWriterBuilder
>   at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
>   at 
> kafka.testkit.KafkaClusterTestKit.startup(KafkaClusterTestKit.java:364)
>   at 
> kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions(RaftClusterTest.scala:181)}}{quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12629) Flaky Test RaftClusterTest

2021-05-24 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17350483#comment-17350483
 ] 

John Roesler commented on KAFKA-12629:
--

Failed also on: 
[https://github.com/apache/kafka/pull/10744/checks?check_run_id=2643450959]

When this build fails in a PR build, it means that it failed twice in a row. 
I'm inclined to merge [~showuon]'s PR. Perpetually failing tests provide 
negative value to the codebase. If we can't be bothered to fix the test, then we 
obviously don't care about the logic under test.
{noformat}
 java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.UnknownServerException: The server experienced 
an unexpected error when processing the request.
at 
org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at 
org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at 
kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions(RaftClusterTest.scala:197)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at 
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
at 
org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at 
org.junit.jupiter.engine.extension.TimeoutInvocation.proceed(TimeoutInvocation.java:46)
at 
org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
at 
org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
at 
org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at 
org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
at 
org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
at 
org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)

[jira] [Created] (KAFKA-12842) Failing test: org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testSourceTaskNotBlockedOnShutdownWithNonExistentTopic

2021-05-24 Thread John Roesler (Jira)
John Roesler created KAFKA-12842:


 Summary: Failing test: 
org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testSourceTaskNotBlockedOnShutdownWithNonExistentTopic
 Key: KAFKA-12842
 URL: https://issues.apache.org/jira/browse/KAFKA-12842
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: John Roesler
 Fix For: 3.0.0


This test failed during a PR build, which means that it failed twice in a row, 
due to the test-retry logic in PR builds.

 

[https://github.com/apache/kafka/pull/10744/checks?check_run_id=2643417209]

 
{noformat}
java.lang.NullPointerException
at 
java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936)
at org.reflections.Store.getAllIncluding(Store.java:82)
at org.reflections.Store.getAll(Store.java:93)
at org.reflections.Reflections.getSubTypesOf(Reflections.java:404)
at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:352)
at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:337)
at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268)
at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:216)
at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:209)
at 
org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:61)
at 
org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:93)
at 
org.apache.kafka.connect.util.clusters.WorkerHandle.start(WorkerHandle.java:50)
at 
org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.addWorker(EmbeddedConnectCluster.java:174)
at 
org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.startConnect(EmbeddedConnectCluster.java:260)
at 
org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.start(EmbeddedConnectCluster.java:141)
at 
org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testSourceTaskNotBlockedOnShutdownWithNonExistentTopic(ConnectWorkerIntegrationTest.java:303)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
at 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
at 

[GitHub] [kafka] sasukerui opened a new pull request #10751: MINOR: update java doc for ConsumerCoordinator

2021-05-24 Thread GitBox


sasukerui opened a new pull request #10751:
URL: https://github.com/apache/kafka/pull/10751


   fix typo


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance

2021-05-24 Thread GitBox


showuon commented on pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#issuecomment-847042383


   @vahidhashemian, thanks for your comments. I've updated; please take a look 
again. Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance

2021-05-24 Thread GitBox


showuon commented on a change in pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#discussion_r637943535



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -80,9 +80,7 @@ public MemberData(List<TopicPartition> partitions, Optional<Integer> generation)
 log.debug("Detected that all not consumers were subscribed to same 
set of topics, falling back to the "
   + "general case assignment algorithm");

Review comment:
   Updated. Thanks.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -469,73 +426,190 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
 TreeSet<String> sortedCurrentSubscriptions = new TreeSet<>(new 
SubscriptionComparator(currentAssignment));
 sortedCurrentSubscriptions.addAll(currentAssignment.keySet());
 
-balance(currentAssignment, prevAssignment, sortedPartitions, 
unassignedPartitions, sortedCurrentSubscriptions,
-consumer2AllPotentialPartitions, partition2AllPotentialConsumers, 
currentPartitionConsumer, revocationRequired);
+balance(currentAssignment, prevAssignment, sortedAllPartitions, 
unassignedPartitions, sortedCurrentSubscriptions,
+consumer2AllPotentialTopics, topic2AllPotentialConsumers, 
currentPartitionConsumer, revocationRequired,
+partitionsPerTopic, totalPartitionsCount);
+
+if (log.isDebugEnabled()) {
+log.debug("final assignment: {}", currentAssignment);
+}
+
 return currentAssignment;
 }
 
+/**
+ * Get the unassigned partition list by computing the difference set of 
sortedAllPartitions (all partitions)
+ * and sortedAssignedPartitions. If there are no assigned partitions, we'll just 
return all sorted topic partitions.
+ * This is used in the generalAssign method.
+ *
+ * We loop over sortedAllPartitions, and compare with the ith element in 
sortedAssignedPartitions (i starts from 0):
+ *   - if not equal to the ith element, add to unassignedPartitions
+ *   - if equal to the ith element, get the next element from 
sortedAssignedPartitions
+ *
+ * @param sortedAllPartitions:  sorted all partitions
+ * @param sortedAssignedPartitions: sorted assigned partitions, all 
included in sortedAllPartitions
+ * @param topic2AllPotentialConsumers:  topics mapped to all consumers 
that subscribed to them
+ * @return  the partitions not assigned to any 
current consumer

Review comment:
   Updated. Thanks.
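
   For reference, a minimal sketch (with assumed types and names, not the exact 
PR code) of the single-pass difference-set walk that the javadoc above 
describes; since both lists are sorted by the same comparator, one linear scan 
over sortedAllPartitions is enough:

       import java.util.ArrayList;
       import java.util.Iterator;
       import java.util.List;
       import org.apache.kafka.common.TopicPartition;

       static List<TopicPartition> difference(List<TopicPartition> sortedAllPartitions,
                                              List<TopicPartition> sortedAssignedPartitions) {
           List<TopicPartition> unassignedPartitions =
               new ArrayList<>(sortedAllPartitions.size() - sortedAssignedPartitions.size());
           Iterator<TopicPartition> assignedIter = sortedAssignedPartitions.iterator();
           TopicPartition nextAssigned = assignedIter.hasNext() ? assignedIter.next() : null;
           for (TopicPartition partition : sortedAllPartitions) {
               if (partition.equals(nextAssigned)) {
                   // the ith assigned partition matched: advance to the next one
                   nextAssigned = assignedIter.hasNext() ? assignedIter.next() : null;
               } else {
                   // present in all partitions but never assigned
                   unassignedPartitions.add(partition);
               }
           }
           return unassignedPartitions;
       }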




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance

2021-05-24 Thread GitBox


showuon commented on a change in pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#discussion_r637941574



##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
##
@@ -598,6 +555,43 @@ public void 
testLargeAssignmentAndGroupWithUniformSubscription() {
 assignor.assign(partitionsPerTopic, subscriptions);
 }
 
+@Timeout(40)
+@Test
+public void testLargeAssignmentAndGroupWithNonEqualSubscription() {
+// 1 million partitions!
+int topicCount = 500;
+int partitionCount = 2_000;
+int consumerCount = 2_000;
+
+List<String> topics = new ArrayList<>();
+Map<String, Integer> partitionsPerTopic = new HashMap<>();
+for (int i = 0; i < topicCount; i++) {
+String topicName = getTopicName(i, topicCount);
+topics.add(topicName);
+partitionsPerTopic.put(topicName, partitionCount);
+}
+for (int i = 0; i < consumerCount; i++) {
+if (i == consumerCount - 1) {
+subscriptions.put(getConsumerName(i, consumerCount), new 
Subscription(topics.subList(0, 1)));
+} else {
+subscriptions.put(getConsumerName(i, consumerCount), new 
Subscription(topics));
+}
+}
+
+Map<String, List<TopicPartition>> assignment = 
assignor.assign(partitionsPerTopic, subscriptions);
+

Review comment:
   It can, but this test, 
`testLargeAssignmentAndGroupWithNonEqualSubscription`, is mainly meant to test 
**performance**, not functionality. We should cover the functional testing in 
other tests. The same purpose applies to the one above: 
`testLargeAssignmentAndGroupWithUniformSubscription`. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance

2021-05-24 Thread GitBox


showuon commented on a change in pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#discussion_r637939950



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -384,37 +326,39 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
  *
  * @param partitionsPerTopic The number of partitions for each 
subscribed topic.
  * @param subscriptions  Map from the member id to their 
respective topic subscription
+ * @param currentAssignment  Each consumer's previously owned and 
still-subscribed partitions
  *
  * @return   Map from each member to the list of 
partitions assigned to them.
  */
 private Map<String, List<TopicPartition>> generalAssign(Map<String, Integer> partitionsPerTopic,
-Map<String, Subscription> subscriptions) {
-Map<String, List<TopicPartition>> currentAssignment = new HashMap<>();
+Map<String, Subscription> subscriptions,
+Map<String, List<TopicPartition>> currentAssignment) {
+if (log.isDebugEnabled()) {
+log.debug("performing general assign. partitionsPerTopic: {}, 
subscriptions: {}, currentAssignment: {}",
+partitionsPerTopic, subscriptions, currentAssignment);
+}
+
 Map<TopicPartition, ConsumerGenerationPair> prevAssignment = new HashMap<>();
 partitionMovements = new PartitionMovements();
 
-prepopulateCurrentAssignments(subscriptions, currentAssignment, 
prevAssignment);
+prepopulateCurrentAssignments(subscriptions, prevAssignment);
 
-// a mapping of all topic partitions to all consumers that can be 
assigned to them
-final Map<TopicPartition, List<String>> 
partition2AllPotentialConsumers = new HashMap<>();
-// a mapping of all consumers to all potential topic partitions that 
can be assigned to them
-final Map<String, List<TopicPartition>> 
consumer2AllPotentialPartitions = new HashMap<>();
+// a mapping of all topics to all consumers that can be assigned to 
them
+final Map<String, List<String>> topic2AllPotentialConsumers = new 
HashMap<>(partitionsPerTopic.keySet().size());
+// a mapping of all consumers to all potential topics that can be 
assigned to them
+final Map<String, List<String>> consumer2AllPotentialTopics = new 
HashMap<>(subscriptions.keySet().size());
 
-// initialize partition2AllPotentialConsumers and 
consumer2AllPotentialPartitions in the following two for loops
-for (Entry<String, Integer> entry: partitionsPerTopic.entrySet()) {
-for (int i = 0; i < entry.getValue(); ++i)
-partition2AllPotentialConsumers.put(new 
TopicPartition(entry.getKey(), i), new ArrayList<>());
-}
+// initialize topic2AllPotentialConsumers and 
consumer2AllPotentialTopics
+partitionsPerTopic.keySet().stream().forEach(
+topicName -> topic2AllPotentialConsumers.put(topicName, new 
ArrayList<>()));
 
 for (Entry<String, Subscription> entry: subscriptions.entrySet()) {
 String consumerId = entry.getKey();
-consumer2AllPotentialPartitions.put(consumerId, new ArrayList<>());
+List<String> subscribedTopics = new 
ArrayList<>(entry.getValue().topics().size());
+consumer2AllPotentialTopics.put(consumerId, subscribedTopics);
 entry.getValue().topics().stream().filter(topic -> 
partitionsPerTopic.get(topic) != null).forEach(topic -> {
-for (int i = 0; i < partitionsPerTopic.get(topic); ++i) {
-TopicPartition topicPartition = new TopicPartition(topic, 
i);
-
consumer2AllPotentialPartitions.get(consumerId).add(topicPartition);
-
partition2AllPotentialConsumers.get(topicPartition).add(consumerId);
-}
+subscribedTopics.add(topic);

Review comment:
   No, it just creates a List with the capacity `topics().size()`. We 
cannot just create a List with all the topics directly, because we need to 
filter out topics that are not in `partitionsPerTopic`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance

2021-05-24 Thread GitBox


showuon commented on a change in pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#discussion_r637936879



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -444,23 +392,32 @@ private boolean allSubscriptionsEqual(Set 
allTopics,
 // otherwise (the consumer still exists)
 for (Iterator<TopicPartition> partitionIter = 
entry.getValue().iterator(); partitionIter.hasNext();) {
 TopicPartition partition = partitionIter.next();
-if 
(!partition2AllPotentialConsumers.containsKey(partition)) {
-// if this topic partition of this consumer no longer 
exists remove it from currentAssignment of the consumer
+if 
(!topic2AllPotentialConsumers.containsKey(partition.topic())) {
+// if this topic partition of this consumer no longer 
exists, remove it from currentAssignment of the consumer
 partitionIter.remove();
 currentPartitionConsumer.remove(partition);
-} else if 
(!subscriptions.get(entry.getKey()).topics().contains(partition.topic())) {
-// if this partition cannot remain assigned to its 
current consumer because the consumer
-// is no longer subscribed to its topic remove it from 
currentAssignment of the consumer
+} else if 
(!consumerSubscription.topics().contains(partition.topic())) {
+// because the consumer is no longer subscribed to its 
topic, remove it from currentAssignment of the consumer
 partitionIter.remove();
 revocationRequired = true;
-} else
+} else {
 // otherwise, remove the topic partition from those 
that need to be assigned only if
 // its current consumer is still subscribed to its 
topic (because it is already assigned
 // and we would want to preserve that assignment as 
much as possible)
-unassignedPartitions.remove(partition);
+assignedPartitions.add(partition);
+}
 }
 }
 }
+
+// all partitions that needed to be assigned
+List<TopicPartition> unassignedPartitions = 
getUnassignedPartitions(sortedAllPartitions, assignedPartitions, 
topic2AllPotentialConsumers);
+assignedPartitions = null;

Review comment:
   Yes, it just tells the GC that this memory can be freed, to avoid OOM. I 
know that by this step we should have already allocated all the memory we need, 
but it's just in case. What do you think?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12333) KafkaMetadataLog and MockLock should validate that appended epochs are monotonically

2021-05-24 Thread loboxu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17350413#comment-17350413
 ] 

loboxu commented on KAFKA-12333:


[~jsancio] The problem seems to have been fixed, right?
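
For reference, a minimal sketch (hypothetical, not the actual patch) of the 
epoch-monotonicity guard the ticket asks for:
{code:java}
// Hypothetical guard, kept deliberately simple: reject any append whose
// epoch is lower than the highest epoch appended so far.
public final class EpochValidator {
    private int lastAppendedEpoch = 0;

    public void validateEpoch(int epoch) {
        if (epoch < lastAppendedEpoch) {
            throw new IllegalArgumentException("Append failed: epoch " + epoch
                + " is smaller than the last appended epoch " + lastAppendedEpoch);
        }
        lastAppendedEpoch = epoch;
    }
}
{code}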

> KafkaMetadataLog and MockLock should validate that appended epochs are 
> monotonically
> 
>
> Key: KAFKA-12333
> URL: https://issues.apache.org/jira/browse/KAFKA-12333
> Project: Kafka
>  Issue Type: Sub-task
>  Components: replication
>Reporter: Jose Armando Garcia Sancio
>Assignee: loboxu
>Priority: Major
>
> Both the MockLog and KafkaMetadataLog should only allow appendAsLeader and 
> appendAsFollower with monotonically increasing epochs. In other words the 
> following test in KafkaMetadataLogTest should fail:
> {code:java}
>   @Test
>   def testOutOfOrderEpoch(): Unit = {
> val topicPartition = new TopicPartition("cluster-metadata", 0)
> val log = buildMetadataLog(tempDir, mockTime, topicPartition)
> val recordFoo = new SimpleRecord("foo".getBytes())
> val currentEpoch = 3
> val initialOffset = log.endOffset().offset
>
> log.appendAsLeader(
>   MemoryRecords.withRecords(initialOffset, CompressionType.NONE, 
> currentEpoch, recordFoo),
>   currentEpoch
> )
>
> // An out-of-order epoch should throw an exception
> log.appendAsLeader(
>   MemoryRecords.withRecords(
> initialOffset + 1, CompressionType.NONE, currentEpoch - 1, recordFoo
>   ),
>   currentEpoch - 1
> )
>
> log.appendAsFollower(
>   MemoryRecords.withRecords(
> initialOffset + 2, CompressionType.NONE, currentEpoch - 2, recordFoo
>   )
> )
>   } {code}
> The same for MockLogTest.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] fqaiser94 commented on a change in pull request #10747: KAFKA-12446: Define KGroupedTable#aggregate subtractor + adder order of execution

2021-05-24 Thread GitBox


fqaiser94 commented on a change in pull request #10747:
URL: https://github.com/apache/kafka/pull/10747#discussion_r637657442



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
##
@@ -45,34 +45,30 @@ public void setIfUnset(final Serializer 
defaultKeySerializer, final Serial
 }
 
 /**
- * @throws StreamsException if both old and new values of data are null, 
or if
- * both values are not null
+ * @throws StreamsException if both old and new values of data are null.
  */
 @Override
 public byte[] serialize(final String topic, final Headers headers, final Change<T> data) {
-final byte[] serializedKey;
+final boolean oldValueIsNull = data.oldValue == null;
+final boolean newValueIsNull = data.newValue == null;
 
-// only one of the old / new values would be not null
-if (data.newValue != null) {
-if (data.oldValue != null) {
-throw new StreamsException("Both old and new values are not 
null (" + data.oldValue
-+ " : " + data.newValue + ") in ChangeSerializer, which is 
not allowed.");
-}
-
-serializedKey = inner.serialize(topic, headers, data.newValue);
+// both old and new values cannot be null
+if (oldValueIsNull && newValueIsNull) {
+throw new StreamsException("Both old and new values are null in 
ChangeSerializer, which is not allowed.");
 } else {
-if (data.oldValue == null) {
-throw new StreamsException("Both old and new values are null 
in ChangeSerializer, which is not allowed.");
-}
-
-serializedKey = inner.serialize(topic, headers, data.oldValue);
+final byte[] newData = newValueIsNull ? new byte[0] : 
inner.serialize(topic, headers, data.newValue);
+final byte[] oldData = oldValueIsNull ? new byte[0] : 
inner.serialize(topic, headers, data.oldValue);
+
+final int newDataLength = newData.length;
+final int capacity = NEW_DATA_LENGTH_BYTES_SIZE + newDataLength + 
oldData.length;
+
+return ByteBuffer
+.allocate(capacity)
+.putInt(newDataLength)
+.put(newData)
+.put(oldData)
+.array();

Review comment:
   With the change to the `ChangedSerializer` and `ChangedDeserializer` 
classes, I don’t think users will be able to just upgrade from a previous 
version of Kafka Streams easily. Any "inflight" messages from older library 
versions will fail to deserialize correctly. 
   
   Not sure how these types of “breaking” changes are typically handled. 
   
   1. Is it simply a matter of noting this in the relevant upgrade doc? 
   2. Or do we want to write more code to handle upgrade scenarios? 
   3. Or find a more backwards compatible way of writing this serde?
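
   For context, a rough sketch (assumed names, not actual project code) of how a 
matching deserializer has to read the new layout shown above: a 4-byte length 
prefix for the new value, the new-value bytes, then the old-value bytes, with a 
zero length standing in for null. Bytes produced by the previous single-value 
format carry no such prefix, which is exactly the in-flight compatibility risk:

       import java.nio.ByteBuffer;
       import org.apache.kafka.common.serialization.Deserializer;

       // Change<T> is assumed to be the Streams-internal (newValue, oldValue) pair
       static <T> Change<T> deserializeChange(String topic, byte[] data, Deserializer<T> inner) {
           ByteBuffer buffer = ByteBuffer.wrap(data);
           int newDataLength = buffer.getInt();           // length prefix written by the serializer
           byte[] newData = new byte[newDataLength];
           buffer.get(newData);
           byte[] oldData = new byte[buffer.remaining()]; // everything left is the old value
           buffer.get(oldData);
           T newValue = newData.length == 0 ? null : inner.deserialize(topic, newData);
           T oldValue = oldData.length == 0 ? null : inner.deserialize(topic, oldData);
           return new Change<>(newValue, oldValue);
       }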




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] fqaiser94 commented on a change in pull request #10747: KAFKA-12446: Define KGroupedTable#aggregate subtractor + adder order of execution

2021-05-24 Thread GitBox


fqaiser94 commented on a change in pull request #10747:
URL: https://github.com/apache/kafka/pull/10747#discussion_r637657442



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
##
@@ -45,34 +45,30 @@ public void setIfUnset(final Serializer 
defaultKeySerializer, final Serial
 }
 
 /**
- * @throws StreamsException if both old and new values of data are null, 
or if
- * both values are not null
+ * @throws StreamsException if both old and new values of data are null.
  */
 @Override
 public byte[] serialize(final String topic, final Headers headers, final Change<T> data) {
-final byte[] serializedKey;
+final boolean oldValueIsNull = data.oldValue == null;
+final boolean newValueIsNull = data.newValue == null;
 
-// only one of the old / new values would be not null
-if (data.newValue != null) {
-if (data.oldValue != null) {
-throw new StreamsException("Both old and new values are not 
null (" + data.oldValue
-+ " : " + data.newValue + ") in ChangeSerializer, which is 
not allowed.");
-}
-
-serializedKey = inner.serialize(topic, headers, data.newValue);
+// both old and new values cannot be null
+if (oldValueIsNull && newValueIsNull) {
+throw new StreamsException("Both old and new values are null in 
ChangeSerializer, which is not allowed.");
 } else {
-if (data.oldValue == null) {
-throw new StreamsException("Both old and new values are null 
in ChangeSerializer, which is not allowed.");
-}
-
-serializedKey = inner.serialize(topic, headers, data.oldValue);
+final byte[] newData = newValueIsNull ? new byte[0] : 
inner.serialize(topic, headers, data.newValue);
+final byte[] oldData = oldValueIsNull ? new byte[0] : 
inner.serialize(topic, headers, data.oldValue);
+
+final int newDataLength = newData.length;
+final int capacity = NEW_DATA_LENGTH_BYTES_SIZE + newDataLength + 
oldData.length;
+
+return ByteBuffer
+.allocate(capacity)
+.putInt(newDataLength)
+.put(newData)
+.put(oldData)
+.array();

Review comment:
   With the change to the `ChangedSerializer` and `ChangedDeserializer` 
classes, I don’t think users will be able to just upgrade from a previous 
version of Kafka Streams easily. Any "inflight" messages from older library 
versions will fail to deserialize correctly. Not sure how these types of 
“breaking” changes are typically handled. Is it simply a matter of noting this 
in the relevant upgrade doc? Or do we want to write more code to handle upgrade 
scenarios? Or find a more backwards compatible way of writing this serde?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] fqaiser94 commented on a change in pull request #10747: KAFKA-12446: Define KGroupedTable#aggregate subtractor + adder order of execution

2021-05-24 Thread GitBox


fqaiser94 commented on a change in pull request #10747:
URL: https://github.com/apache/kafka/pull/10747#discussion_r637657657



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
##
@@ -90,14 +90,19 @@ public void process(final K key, final Change<V> change) {
 
 // if the selected repartition key or value is null, skip
 // forward oldPair first, to be consistent with reduce and 
aggregate
-if (oldPair != null && oldPair.key != null && oldPair.value != 
null) {
-context().forward(oldPair.key, new Change<>(null, 
oldPair.value));
+final boolean oldPairNotNull = oldPair != null && oldPair.key != 
null && oldPair.value != null;
+final boolean newPairNotNull = newPair != null && newPair.key != 
null && newPair.value != null;
+if (oldPairNotNull && newPairNotNull && oldPair.key == 
newPair.key) {

Review comment:
   As noted by Matthias on the mailing list thread, this fix depends on a 
correct implementation of the `.equals()` method for the key type. Would we need 
to document this assumption somewhere for users?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ayoukhananov commented on pull request #10728: NPE from the provided metadata in client callback in case of ApiException.

2021-05-24 Thread GitBox


ayoukhananov commented on pull request #10728:
URL: https://github.com/apache/kafka/pull/10728#issuecomment-846970648


   @showuon  Thanks for your feedback.
   
   1. The NPE happened in our prod env. 
   I missed the Jira-ticket part and have now added this 
[Ticket](https://issues.apache.org/jira/browse/KAFKA-12841)
   2. Will add Kafka
   3. Will try to add tests soon


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12841) NPE from the provided metadata in client callback in case of ApiException

2021-05-24 Thread Avi Youkhananov (Jira)
Avi Youkhananov created KAFKA-12841:
---

 Summary: NPE from the provided metadata in client callback in case 
of ApiException
 Key: KAFKA-12841
 URL: https://issues.apache.org/jira/browse/KAFKA-12841
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.6.0
 Environment: Prod
Reporter: Avi Youkhananov
 Attachments: NPE.production

1. The org.apache.kafka.clients.producer.Callback interface has the method 
onCompletion(...), whose documentation says:

"The metadata for the record that was sent (i.e. the partition and offset). An 
empty metadata with -1 value for all fields except for topicPartition will be 
returned if an error occurred."


We got an NPE from the doSend(...) method in 
org.apache.kafka.clients.producer.KafkaProducer, which can occur in case an 
ApiException is thrown: for an ApiException it uses the regular callback 
instead of the InterceptorCallback, which would also cover the NPE.

2. Moreover, RecordMetadata has the method partition(), which returns an int 
but can also throw an NPE because the TopicPartition might be null.

Stack trace attached.
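
For illustration, a defensive callback sketch (hypothetical; {{producer}}, 
{{record}}, and {{log}} are placeholders) that guards against the incomplete 
metadata described above:
{code:java}
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // On error, metadata may be empty (-1 fields) or, per this report,
        // carry a null TopicPartition, so don't touch metadata.partition() here.
        log.error("Send failed", exception);
        return;
    }
    log.info("Sent to partition {} at offset {}", metadata.partition(), metadata.offset());
});
{code}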

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] fqaiser94 commented on a change in pull request #10747: KAFKA-12446: Define KGroupedTable#aggregate subtractor + adder order of execution

2021-05-24 Thread GitBox


fqaiser94 commented on a change in pull request #10747:
URL: https://github.com/apache/kafka/pull/10747#discussion_r637657442



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
##
@@ -45,34 +45,30 @@ public void setIfUnset(final Serializer 
defaultKeySerializer, final Serial
 }
 
 /**
- * @throws StreamsException if both old and new values of data are null, 
or if
- * both values are not null
+ * @throws StreamsException if both old and new values of data are null.
  */
 @Override
 public byte[] serialize(final String topic, final Headers headers, final Change<T> data) {
-final byte[] serializedKey;
+final boolean oldValueIsNull = data.oldValue == null;
+final boolean newValueIsNull = data.newValue == null;
 
-// only one of the old / new values would be not null
-if (data.newValue != null) {
-if (data.oldValue != null) {
-throw new StreamsException("Both old and new values are not 
null (" + data.oldValue
-+ " : " + data.newValue + ") in ChangeSerializer, which is 
not allowed.");
-}
-
-serializedKey = inner.serialize(topic, headers, data.newValue);
+// both old and new values cannot be null
+if (oldValueIsNull && newValueIsNull) {
+throw new StreamsException("Both old and new values are null in 
ChangeSerializer, which is not allowed.");
 } else {
-if (data.oldValue == null) {
-throw new StreamsException("Both old and new values are null 
in ChangeSerializer, which is not allowed.");
-}
-
-serializedKey = inner.serialize(topic, headers, data.oldValue);
+final byte[] newData = newValueIsNull ? new byte[0] : 
inner.serialize(topic, headers, data.newValue);
+final byte[] oldData = oldValueIsNull ? new byte[0] : 
inner.serialize(topic, headers, data.oldValue);
+
+final int newDataLength = newData.length;
+final int capacity = NEW_DATA_LENGTH_BYTES_SIZE + newDataLength + 
oldData.length;
+
+return ByteBuffer
+.allocate(capacity)
+.putInt(newDataLength)
+.put(newData)
+.put(oldData)
+.array();

Review comment:
   With the change to the `ChangedSerializer` and `ChangedDeserializer` 
classes, I don’t think users will be able to just upgrade from a previous 
version of Kafka Streams easily. Any messages that were "inflight" prior to the 
upgrade will fail to deserialize correctly. Not sure how these types of 
“breaking” changes are typically handled. Is it simply a matter of noting this 
in the relevant upgrade doc? Or do we want to write more code to handle upgrade 
scenarios?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] DuongPTIT opened a new pull request #10750: KAFKA-8120 Getting NegativeArraySizeException when using Kafka Connect to send data to Kafka

2021-05-24 Thread GitBox


DuongPTIT opened a new pull request #10750:
URL: https://github.com/apache/kafka/pull/10750


   Getting NegativeArraySizeException when using Kafka Connect to send data to 
Kafka on Kafka version 2.5
   
   PTAL @huxihx @kkonstantine. Many thanks.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12840) Removing `compact` cleaning on a topic should abort on-going compactions

2021-05-24 Thread David Jacot (Jira)
David Jacot created KAFKA-12840:
---

 Summary: Removing `compact` cleaning on a topic should abort 
on-going compactions
 Key: KAFKA-12840
 URL: https://issues.apache.org/jira/browse/KAFKA-12840
 Project: Kafka
  Issue Type: Improvement
Reporter: David Jacot
Assignee: David Jacot


When `compact` is removed from the `cleanup.policy` of a topic, the compactions 
of that topic should be aborted.
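
For reference, this is the kind of config change that should trigger the abort; 
a minimal sketch (topic name and {{admin}} client are placeholders) using the 
admin API:
{code:java}
import java.util.Collections;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
// Overwrite cleanup.policy so that `compact` is no longer present
AlterConfigOp removeCompact = new AlterConfigOp(
    new ConfigEntry("cleanup.policy", "delete"), AlterConfigOp.OpType.SET);
admin.incrementalAlterConfigs(
    Collections.singletonMap(topic, Collections.singleton(removeCompact))).all().get();
{code}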



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12835) Topic IDs can mismatch on brokers (after interbroker protocol version update)

2021-05-24 Thread Ivan Yurchenko (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17350330#comment-17350330
 ] 

Ivan Yurchenko commented on KAFKA-12835:


bq. I was curious about your upgrade process. Is there a reason you moved from 
old brokers to new brokers rather than doing a rolling restart of the same 
brokers?

We try to keep machines immutable; it's nothing specific to Kafka itself.

bq. I believe I found the cause to this issue and will be working on a fix.

Great to hear this! Could you please share the idea at a higher level?

> Topic IDs can mismatch on brokers (after interbroker protocol version update)
> -
>
> Key: KAFKA-12835
> URL: https://issues.apache.org/jira/browse/KAFKA-12835
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.8.0
>Reporter: Ivan Yurchenko
>Assignee: Justine Olshan
>Priority: Major
>
> We had a Kafka cluster running 2.8 version with interbroker protocol set to 
> 2.7. It had a number of topics and everything was fine.
> Then we decided to update the interbroker protocol to 2.8 by the following 
> procedure:
> 1. Run new brokers with the interbroker protocol set to 2.8.
> 2. Move the data from the old brokers to the new ones (normal partition 
> reassignment API).
> 3. Decommission the old brokers.
> At the stage 2 we had the problem: old brokers started failing on 
> {{LeaderAndIsrRequest}} handling with
> {code:java}
> ERROR [Broker id=<...>] Topic Id in memory: <...> does not match the topic Id 
> for partition <...> provided in the request: <...>. (state.change.logger)
> {code}
> for multiple topics. Topics were not recreated.
> We checked the {{partition.metadata}} files and the IDs there were indeed different 
> from the values in ZooKeeper. It was fixed by deleting the metadata files 
> (and letting them be recreated).
>  
> The logs, unfortunately, didn't show anything that might point to the cause 
> of the issue (or it happened longer ago than we store the logs).
> We tried to reproduce this also, but no success.
> If the community can point out what to check or beware of in future, it will 
> be great. We'll be happy to provide additional information if needed. Thank 
> you! 
> Sorry that the ticket might not be very actionable. We hope to at least 
> raise awareness of this issue.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (KAFKA-8120) Getting NegativeArraySizeException when using Kafka Connect to send data to Kafka

2021-05-24 Thread Pham Huy Hoang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pham Huy Hoang updated KAFKA-8120:
--
Comment: was deleted

(was: Hi [~wj1918], I tried to test following your steps above. However, I got a 
puzzling consumer error. When I added "batch.size=1" in 
config/connect-file-source.properties, my console-consumer received only 
1985 of 2000 records (with records generated from your mongodump script), and 
it received only 96 of 100 records (with your test.txt file). Sometimes 
it received only 1 record after each restart of kafka-connect. Please tell me 
why.

Many thanks. )

> Getting NegativeArraySizeException when using Kafka Connect to send data to 
> Kafka
> -
>
> Key: KAFKA-8120
> URL: https://issues.apache.org/jira/browse/KAFKA-8120
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.1.1, 2.5.0
> Environment: Ubuntu 16.04 LTS
>Reporter: Prashant Shahi
>Assignee: Jun Wang
>Priority: Major
> Attachments: gen-mongodump.sh.txt, test.txt
>
>
>  
> I have a large MongoDump JSON which I tried pushing to Kafka using Kafka 
> Connect.
> I am getting the following Exception after around 16k messages been pushed. 
> After the exception, the program doesn't get killed or exit, but now no more 
> messages are pushed.
> {code:java}
> [2019-03-15 08:48:13,812] ERROR WorkerSourceTask{id=od-test18-0} Task threw 
> an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask:177) 
> java.lang.NegativeArraySizeException at 
> org.apache.kafka.connect.file.FileStreamSourceTask.poll(FileStreamSourceTask.java:141)
>  at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:244)
>  at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:220)
>  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175) at 
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219) at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748) [2019-03-15 08:48:13,814] ERROR 
> WorkerSourceTask{id=od-test18-0} Task is being killed and will not recover 
> until manually restarted 
> (org.apache.kafka.connect.runtime.WorkerTask:178){code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-12461) Extend LogManager to cover the metadata topic

2021-05-24 Thread loboxu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

loboxu reassigned KAFKA-12461:
--

Assignee: loboxu

> Extend LogManager to cover the metadata topic
> -
>
> Key: KAFKA-12461
> URL: https://issues.apache.org/jira/browse/KAFKA-12461
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Assignee: loboxu
>Priority: Major
>
> The `@metadata` topic is not managed by `LogManager` since it uses a new 
> snapshot-based retention policy. This means that it is not covered by the 
> recovery and high watermark checkpoints. It would be useful to fix this. We 
> can either extend `LogManager` so that it is aware of the snapshotting 
> semantics implemented by the `@metadata` topic, or we can create something 
> like a `RaftLogManager`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8120) Getting NegativeArraySizeException when using Kafka Connect to send data to Kafka

2021-05-24 Thread Pham Huy Hoang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17350289#comment-17350289
 ] 

Pham Huy Hoang commented on KAFKA-8120:
---

Hi [~wj1918], I tried to test following your steps above. However, I got a 
puzzling consumer error. When I added "batch.size=1" in 
config/connect-file-source.properties, my console-consumer received only 
1985 of 2000 records (with records generated from your mongodump script), and 
it received only 96 of 100 records (with your test.txt file). Sometimes 
it received only 1 record after each restart of kafka-connect. Please tell me 
why.

Many thanks. 

> Getting NegativeArraySizeException when using Kafka Connect to send data to 
> Kafka
> -
>
> Key: KAFKA-8120
> URL: https://issues.apache.org/jira/browse/KAFKA-8120
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.1.1, 2.5.0
> Environment: Ubuntu 16.04 LTS
>Reporter: Prashant Shahi
>Assignee: Jun Wang
>Priority: Major
> Attachments: gen-mongodump.sh.txt, test.txt
>
>
>  
> I have a large MongoDump JSON which I tried pushing to Kafka using Kafka 
> Connect.
> I am getting the following Exception after around 16k messages have been pushed. 
> After the exception, the program doesn't get killed or exit, but no more 
> messages are pushed.
> {code:java}
> [2019-03-15 08:48:13,812] ERROR WorkerSourceTask{id=od-test18-0} Task threw 
> an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask:177) 
> java.lang.NegativeArraySizeException at 
> org.apache.kafka.connect.file.FileStreamSourceTask.poll(FileStreamSourceTask.java:141)
>  at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:244)
>  at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:220)
>  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175) at 
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219) at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748) [2019-03-15 08:48:13,814] ERROR 
> WorkerSourceTask{id=od-test18-0} Task is being killed and will not recover 
> until manually restarted 
> (org.apache.kafka.connect.runtime.WorkerTask:178){code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12805) Aborted send could have a different exception than DisconnectException

2021-05-24 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17350279#comment-17350279
 ] 

Luke Chen commented on KAFKA-12805:
---

[~nicolas.guyomar], I tried to set {{request.timeout.ms}} to a small value to 
reproduce your issue, but I only got the exception:
{code:java}
 org.apache.kafka.common.errors.TimeoutException: The request timed out.{code}
Could you tell me how I can reproduce your error? Thank you.

> Aborted send could have a different exception than DisconnectException
> --
>
> Key: KAFKA-12805
> URL: https://issues.apache.org/jira/browse/KAFKA-12805
> Project: Kafka
>  Issue Type: Wish
>  Components: network
>Reporter: Nicolas Guyomar
>Assignee: Luke Chen
>Priority: Minor
>
> Right now a timeout in the network client is treated as a disconnection, 
> which hides legitimate timeouts, i.e. cases where increasing 
> {{request.timeout.ms}} would be an acceptable fix because there is no "real" 
> network disconnection:
>  Caused by: org.apache.kafka.common.errors.TimeoutException: 
> Call(callName=describeConfigs, deadlineMs=1616147081029) timed out at 
> 1616147081039 after 2 attempt(s)
>  Caused by: org.apache.kafka.common.errors.DisconnectException: Cancelled 
> describeConfigs request with correlation id 8 due to node 1 being disconnected
>   
> The {{DisconnectException}} is thrown because the disconnect flag is set to 
> true in 
> [https://github.com/apache/kafka/blob/3d0b4d910b681df7d873c8a0285eaca01d6c173a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L352]
> Could we have a different path from 
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L793]
> that propagates the fact that the connection timed out because 
> {{request.timeout.ms}} expired, and adjust the exception later thrown in 
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L1195]
> so that it is not a {{DisconnectException}}?
>  
> Thank you
>  
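
As a thought experiment on the wish above, here is a minimal sketch; every name 
in it is hypothetical except the two Kafka exception classes, and it is not the 
actual NetworkClient code path. The idea is to record why an in-flight request 
was aborted and map that reason to a distinct exception:

{code:java}
// Hypothetical sketch, not the actual NetworkClient abort path: record why
// an in-flight request failed and map the reason to a distinct exception
// instead of always surfacing a DisconnectException.
enum AbortReason { REQUEST_TIMED_OUT, CONNECTION_CLOSED }

static RuntimeException exceptionFor(AbortReason reason, String detail) {
    switch (reason) {
        case REQUEST_TIMED_OUT:
            // Signals the caller that raising request.timeout.ms may be a
            // legitimate fix.
            return new org.apache.kafka.common.errors.TimeoutException(detail);
        case CONNECTION_CLOSED:
        default:
            return new org.apache.kafka.common.errors.DisconnectException(detail);
    }
}
{code}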



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12773) Use UncheckedIOException when wrapping IOException

2021-05-24 Thread loboxu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17350274#comment-17350274
 ] 

loboxu commented on KAFKA-12773:


[~jagsancio], could you please tell me your GitHub username? Please review the 
code for me:

[https://github.com/apache/kafka/pull/10749] 

> Use UncheckedIOException when wrapping IOException
> --
>
> Key: KAFKA-12773
> URL: https://issues.apache.org/jira/browse/KAFKA-12773
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jose Armando Garcia Sancio
>Assignee: loboxu
>Priority: Major
>  Labels: kip-500, newbie
>
> The {{raft}} module may not be fully consistent on this but in general in 
> that module we have decided to not throw the checked {{IOException}}. We have 
> been avoiding checked {{IOException}} exceptions by wrapping them in 
> {{RuntimeException}}. The {{raft}} module should instead wrap {{IOException}} 
> in {{UncheckedIOException}}. This change should be limited to the {{raft}} 
> module.
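
For illustration, the intended pattern is the JDK's 
{{java.io.UncheckedIOException}} wrapper. A minimal sketch follows; the class 
and method names are made up, only the wrapping idiom is the point:

{code:java}
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

public final class SnapshotSizes {
    // Instead of `throw new RuntimeException(e)`, wrap the checked
    // IOException in the JDK's dedicated unchecked wrapper. Callers can
    // still catch IO failures specifically as UncheckedIOException, and
    // getCause() returns the original IOException.
    public static long sizeOf(Path snapshotPath) {
        try {
            return Files.size(snapshotPath);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
{code}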



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] socutes opened a new pull request #10749: KAFKA-12773: Use UncheckedIOException when wrapping IOException

2021-05-24 Thread GitBox


socutes opened a new pull request #10749:
URL: https://github.com/apache/kafka/pull/10749


   The raft module may not be fully consistent on this but in general in that 
module we have decided to not throw the checked IOException. We have been 
avoiding checked IOException exceptions by wrapping them in RuntimeException. 
The raft module should instead wrap IOException in UncheckedIOException. This 
change should be limited to the raft module.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12534) kafka-configs does not work with ssl enabled kafka broker.

2021-05-24 Thread kaushik srinivas (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17350264#comment-17350264
 ] 

kaushik srinivas commented on KAFKA-12534:
--

Hi [~cricket007]

To add more detail: the test works fine when the certificate expiry is 
extended and the same sequence of steps is performed to regenerate the broker 
keystore and apply it with the kafka-configs.sh script.

The script fails only when the keystore password is changed.

-Kaushik.

> kafka-configs does not work with ssl enabled kafka broker.
> --
>
> Key: KAFKA-12534
> URL: https://issues.apache.org/jira/browse/KAFKA-12534
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.6.1
>Reporter: kaushik srinivas
>Priority: Critical
>
> We are trying to change the truststore password on the fly, using the 
> kafka-configs script, for an SSL-enabled Kafka broker.
> Below is the command used:
> kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers 
> --entity-name 1001 --alter --add-config 'ssl.truststore.password=xxx'
> But we see the following error in the broker logs when the command is run:
> {"type":"log", "host":"kf-2-0", "level":"INFO", 
> "neid":"kafka-cfd5ccf2af7f47868e83473408", "system":"kafka", 
> "time":"2021-03-23T12:14:40.055", "timezone":"UTC", 
> "log":\{"message":"data-plane-kafka-network-thread-1002-ListenerName(SSL)-SSL-2
>  - org.apache.kafka.common.network.Selector - [SocketServer brokerId=1002] 
> Failed authentication with /127.0.0.1 (SSL handshake failed)"}}
> How can one configure SSL certificates for the kafka-configs script so that 
> the SSL handshake succeeds in this case?
> Note:
> We are trying with a single listener, i.e. SSL.
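
For what it's worth, the admin client used by kafka-configs.sh needs its own 
SSL settings to complete the handshake; these are normally supplied through 
{{--command-config}}. A minimal sketch, with illustrative paths and passwords:

{code}
# client-ssl.properties (illustrative values only)
security.protocol=SSL
ssl.truststore.location=/etc/kafka/secrets/client.truststore.jks
ssl.truststore.password=changeit
# keystore entries are only needed if the broker requires client authentication
ssl.keystore.location=/etc/kafka/secrets/client.keystore.jks
ssl.keystore.password=changeit
{code}

{code}
kafka-configs.sh --bootstrap-server localhost:9092 \
  --command-config client-ssl.properties \
  --entity-type brokers --entity-name 1001 \
  --alter --add-config 'ssl.truststore.password=xxx'
{code}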



--
This message was sent by Atlassian Jira
(v8.3.4#803005)