[GitHub] [kafka] jolshan commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets

2021-04-08 Thread GitBox


jolshan commented on a change in pull request #9590:
URL: https://github.com/apache/kafka/pull/9590#discussion_r610185971



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -2393,6 +2407,10 @@ class Log(@volatile private var _dir: File,
   }
   // okay we are safe now, remove the swap suffix
   sortedNewSegments.foreach(_.changeFileSuffixes(Log.SwapFileSuffix, ""))
+
+  // If not recovered swap file we need to increment logStartOffset here. 
Otherwise, we do this when loading the log.
+  if (!isRecoveredSwapFile)

Review comment:
   Or rather, my test remembers. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12636) Ensure retention still enforced for compacted topics if cleaning is not enabled

2021-04-08 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-12636:
---

 Summary: Ensure retention still enforced for compacted topics if 
cleaning is not enabled
 Key: KAFKA-12636
 URL: https://issues.apache.org/jira/browse/KAFKA-12636
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson


We rely on a periodic task in LogManager to delete old segments of 
non-compacted topics which either have breached retention time or which have 
been explicitly deleted by a call to DeleteRecords. For compacted topics, we 
rely on the cleaning task itself to do the same since a compacted topic may 
also be configured with "delete" retention.

If log cleaning is not enabled, we still need to enforce retention semantics 
for compacted topics, but the current logic in LogManager excludes them from 
consideration:

{code}
// clean current logs.
val deletableLogs = {
  if (cleaner != null) {
// prevent cleaner from working on same partitions when changing 
cleanup policy
cleaner.pauseCleaningForNonCompactedPartitions()
  } else {
currentLogs.filter {
  case (_, log) => !log.config.compact
}
  }
}
{code}

It seems to me that we should remove the filtering when log cleaning is not 
enabled. The logic in `deleteOldSegments` will ensure that only the appropriate 
retention checks are made based on the topic configuration.

Of course it's kind of weird for a user to have a compacted topic when the 
cleaner is not enabled in the first place.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-12502) Quorum controller should return topic configs in CreateTopic response

2021-04-08 Thread Ryan Dielhenn (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan Dielhenn reassigned KAFKA-12502:
-

Assignee: Ryan Dielhenn

> Quorum controller should return topic configs in CreateTopic response
> -
>
> Key: KAFKA-12502
> URL: https://issues.apache.org/jira/browse/KAFKA-12502
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Ryan Dielhenn
>Priority: Major
>  Labels: kip-500
>
> Configs were added to the response in version 5. 
> {code}
>   { "name": "Configs", "type": "[]CreatableTopicConfigs", "versions": 
> "5+", "nullableVersions": "5+", "ignorable": true,
> "about": "Configuration of the topic.", "fields": [
> { "name": "Name", "type": "string", "versions": "5+",
>   "about": "The configuration name." },
> { "name": "Value", "type": "string", "versions": "5+", 
> "nullableVersions": "5+",
>   "about": "The configuration value." },
> { "name": "ReadOnly", "type": "bool", "versions": "5+",
>   "about": "True if the configuration is read-only." },
> { "name": "ConfigSource", "type": "int8", "versions": "5+", 
> "default": "-1", "ignorable": true,
>   "about": "The configuration source." },
> { "name": "IsSensitive", "type": "bool", "versions": "5+",
>   "about": "True if this configuration is sensitive." }
>   ]}
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on a change in pull request #10462: KAFKA-10847: Fix spurious results on left/outer stream-stream joins

2021-04-08 Thread GitBox


guozhangwang commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r610209134



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -38,20 +48,36 @@
 private final String otherWindowName;
 private final long joinBeforeMs;
 private final long joinAfterMs;
+private final long joinGraceMs;
 
 private final ValueJoinerWithKey joiner;
 private final boolean outer;
+private final Optional outerJoinWindowName;
+private final boolean thisJoin;
+
+// Observed time is AtomicLong because this time is shared between the 
left and side processor nodes. However,
+// this time is not updated in parallel, so we can call get() several 
times without worry about getting different
+// times.
+private final AtomicLong maxObservedStreamTime;

Review comment:
   I personally was on the side of always using task stream time everywhere 
but more people feel that we should use processor stream time :P Anyways, all 
I'm trying to say is that we need to make an educated decision here, and if we 
concluded that either 1) we rely on task time here, but still use processor 
time on other expiration logic, or 2) we rely on processor time on all logic, 
or 3) we rely on task time on all logic, we have a good rationale for whichever 
we choose.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang opened a new pull request #10508: KAFKA-12633: Remove deprecated APIs in TopologyTestDriver

2021-04-08 Thread GitBox


guozhangwang opened a new pull request #10508:
URL: https://github.com/apache/kafka/pull/10508


   As well as related test classes.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #10508: KAFKA-12633: Remove deprecated APIs in TopologyTestDriver

2021-04-08 Thread GitBox


guozhangwang commented on pull request #10508:
URL: https://github.com/apache/kafka/pull/10508#issuecomment-816328916


   ping @vvcephei for reviews.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState

2021-04-08 Thread GitBox


hachikuji commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r610235224



##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##
@@ -202,6 +203,25 @@ private void completeCurrentBatch() {
 currentBatch = null;
 }
 
+public void addControlBatch(MemoryRecords records) {
+appendLock.lock();
+try {
+drainStatus = DrainStatus.STARTED;
+completed.add(new CompletedBatch<>(
+nextOffset,
+null,

Review comment:
   Why don't we use `Optional.empty`?

##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##
@@ -202,6 +203,25 @@ private void completeCurrentBatch() {
 currentBatch = null;
 }
 
+public void addControlBatch(MemoryRecords records) {
+appendLock.lock();
+try {
+drainStatus = DrainStatus.STARTED;

Review comment:
   It would be useful to factor out a `flush()` API. We may have additional 
use cases in the future.
   ```java
   public void flush() {
   appendLock.lock();
   try {
   drainStatus = DrainStatus.STARTED;
   maybeCompleteDrain();
   } finally {
   appendLock.unlock();
   }
   }
   ```

##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -1876,12 +1836,12 @@ private void appendBatch(
 }
 
 private long maybeAppendBatches(
-LeaderState state,
+LeaderState state,
 long currentTimeMs
 ) {
-long timeUnitFlush = accumulator.timeUntilDrain(currentTimeMs);
+long timeUnitFlush = state.accumulator().timeUntilDrain(currentTimeMs);

Review comment:
   While we're here, can we fix the name? It should be `timeUntilFlush`.

##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -,9 +2182,12 @@ public Long scheduleAtomicAppend(int epoch, List 
records) {
 return append(epoch, records, true);
 }
 
+@SuppressWarnings("unchecked")

Review comment:
   Seems this is not needed?

##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##
@@ -202,6 +203,25 @@ private void completeCurrentBatch() {
 currentBatch = null;
 }
 
+public void addControlBatch(MemoryRecords records) {

Review comment:
   Can we change this to `appendLeaderChangeMessage(LeaderChangeMessage)`? 
This would provide a stronger contract since it ensures that the batch is 
indeed a control batch, that its base offset is set consistently with 
`nextOffset`, and that it contains only one record as expected. It also would 
allow us to allocate the buffer used for the control batch from the 
`MemoryPool`. Then we wouldn't need to use the `null` values below when 
constructing the `CompletedBatch`, which avoids NPE potential.

##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##
@@ -202,6 +203,25 @@ private void completeCurrentBatch() {
 currentBatch = null;
 }
 
+public void addControlBatch(MemoryRecords records) {
+appendLock.lock();
+try {
+drainStatus = DrainStatus.STARTED;
+completed.add(new CompletedBatch<>(

Review comment:
   I think we are assuming here that `currentBatch` is null. Although that 
is guaranteed to be the case for this specific usage in `KafkaRaftClient`, we 
should try to give `BatchAccumulator` a stronger contract. To fix this, we can 
just call `maybeCompleteDrain` before doing anything else.
   
   By the way, we should also have unit tests for these scenarios.

##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -1859,15 +1819,15 @@ private void appendBatch(
 offsetAndEpoch.offset + 1, Integer.MAX_VALUE);
 
 future.whenComplete((commitTimeMs, exception) -> {
-int numRecords = batch.records.size();
+int numRecords = batch.records.get().size();

Review comment:
   We should try to avoid blind calls to `get()`. I'm honestly not too sure 
how this even works for the case of a control batch. Maybe we are just ignoring 
the error?
   

##
File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java
##
@@ -67,6 +75,39 @@ protected LeaderState(
 }
 this.grantingVoters.addAll(grantingVoters);
 this.log = logContext.logger(LeaderState.class);
+this.accumulator = Objects.requireNonNull(accumulator, "accumulator 
must be non-null");
+}
+
+public BatchAccumulator accumulator() {
+return this.accumulator;
+}
+
+private static List convertToVoters(Set voterIds) {
+return voterIds.stream()
+.map(follower -> new Voter().setVot

[GitHub] [kafka] hachikuji commented on a change in pull request #10343: KAFKA-12471: Implement createPartitions in KIP-500 mode

2021-04-08 Thread GitBox


hachikuji commented on a change in pull request #10343:
URL: https://github.com/apache/kafka/pull/10343#discussion_r610263157



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -1007,6 +999,128 @@ int bestLeader(int[] replicas, int[] isr, boolean 
unclean) {
 return ControllerResult.of(records, null);
 }
 
+ControllerResult>
+createPartitions(List topics) {
+List records = new ArrayList<>();
+List results = new ArrayList<>();
+for (CreatePartitionsTopic topic : topics) {
+ApiError apiError = ApiError.NONE;
+try {
+createPartitions(topic, records);
+} catch (ApiException e) {
+apiError = ApiError.fromThrowable(e);
+} catch (Exception e) {
+log.error("Unexpected createPartitions error for {}", topic, 
e);
+apiError = ApiError.fromThrowable(e);
+}
+results.add(new CreatePartitionsTopicResult().
+setName(topic.name()).
+setErrorCode(apiError.error().code()).
+setErrorMessage(apiError.message()));
+}
+return new ControllerResult<>(records, results, true);
+}
+
+void createPartitions(CreatePartitionsTopic topic,
+  List records) {
+Uuid topicId = topicsByName.get(topic.name());
+if (topicId == null) {
+throw new UnknownTopicOrPartitionException();
+}
+TopicControlInfo topicInfo = topics.get(topicId);
+if (topicInfo == null) {
+throw new UnknownTopicOrPartitionException();
+}
+if (topic.count() == topicInfo.parts.size()) {

Review comment:
   I guess this logic is consistent with the current implementation. It 
might have been nice to make this an idempotent operation.

##
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##
@@ -538,4 +542,65 @@ class ControllerApis(val requestChannel: RequestChannel,
 }
   })
   }
+
+  def handleCreatePartitions(request: RequestChannel.Request): Unit = {
+val future = createPartitions(request.body[CreatePartitionsRequest].data,
+  authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME),
+  names => authHelper.filterByAuthorized(request.context, CREATE, TOPIC, 
names)(n => n))
+future.whenComplete((responses, exception) => {
+  if (exception != null) {
+requestHelper.handleError(request, exception)
+  } else {
+requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+  val responseData = new CreatePartitionsResponseData().
+setResults(responses).
+setThrottleTimeMs(requestThrottleMs)
+  new CreatePartitionsResponse(responseData)
+})
+  }
+})
+  }
+
+  def createPartitions(request: CreatePartitionsRequestData,
+   hasClusterAuth: Boolean,
+   getCreatableTopics: Iterable[String] => Set[String])
+   : 
CompletableFuture[util.List[CreatePartitionsTopicResult]] = {
+val responses = new util.ArrayList[CreatePartitionsTopicResult]()
+val duplicateTopicNames = new util.HashSet[String]()
+val topicNames = new util.HashSet[String]()
+request.topics().forEach {
+  topic =>
+if (!topicNames.add(topic.name())) {
+  duplicateTopicNames.add(topic.name())
+}
+}
+duplicateTopicNames.forEach { topicName =>
+  responses.add(new CreatePartitionsTopicResult().
+setName(topicName).
+setErrorCode(INVALID_REQUEST.code()).
+setErrorMessage("Duplicate topic name."))
+topicNames.remove(topicName)
+}
+val authorizedTopicNames = {
+  if (hasClusterAuth) {
+topicNames.asScala
+  } else {
+getCreatableTopics(topicNames.asScala)
+  }
+}
+val topics = new util.ArrayList[CreatePartitionsTopic]
+topicNames.forEach {

Review comment:
   nit: `topicNames.forEach { topicName =>`
   

##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -1007,6 +999,128 @@ int bestLeader(int[] replicas, int[] isr, boolean 
unclean) {
 return ControllerResult.of(records, null);
 }
 
+ControllerResult>
+createPartitions(List topics) {
+List records = new ArrayList<>();
+List results = new ArrayList<>();
+for (CreatePartitionsTopic topic : topics) {
+ApiError apiError = ApiError.NONE;
+try {
+createPartitions(topic, records);
+} catch (ApiException e) {
+apiError = ApiError.fromThrowable(e);
+} catch (Exception e) {
+log.error("Unexpected createPartitions error for {}", topic, 
e);
+apiError = ApiError.fromThrowable(e

[GitHub] [kafka] dielhennr commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState

2021-04-08 Thread GitBox


dielhennr commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r610269446



##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##
@@ -202,6 +203,25 @@ private void completeCurrentBatch() {
 currentBatch = null;
 }
 
+public void addControlBatch(MemoryRecords records) {
+appendLock.lock();
+try {
+drainStatus = DrainStatus.STARTED;
+completed.add(new CompletedBatch<>(
+nextOffset,
+null,

Review comment:
   In constructor I used `Optional.ofNullable(records);`. I will change it 
so it reads easier.
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dielhennr commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState

2021-04-08 Thread GitBox


dielhennr commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r610269446



##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##
@@ -202,6 +203,25 @@ private void completeCurrentBatch() {
 currentBatch = null;
 }
 
+public void addControlBatch(MemoryRecords records) {
+appendLock.lock();
+try {
+drainStatus = DrainStatus.STARTED;
+completed.add(new CompletedBatch<>(
+nextOffset,
+null,

Review comment:
   In constructor I used `Optional.ofNullable(records);`. I will change it 
so it reads easier.
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dielhennr commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState

2021-04-08 Thread GitBox


dielhennr commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r610280789



##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##
@@ -202,6 +203,25 @@ private void completeCurrentBatch() {
 currentBatch = null;
 }
 
+public void addControlBatch(MemoryRecords records) {
+appendLock.lock();
+try {
+drainStatus = DrainStatus.STARTED;
+completed.add(new CompletedBatch<>(
+nextOffset,
+null,

Review comment:
   If `Optional.empty` is used, the type argument can not be inferred for 
CompletedBatch. I got around this by using `Optional.ofNullable` in the 
constructor of CompletedBatch.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dielhennr commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState

2021-04-08 Thread GitBox


dielhennr commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r610281681



##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##
@@ -202,6 +203,25 @@ private void completeCurrentBatch() {
 currentBatch = null;
 }
 
+public void addControlBatch(MemoryRecords records) {
+appendLock.lock();
+try {
+drainStatus = DrainStatus.STARTED;
+completed.add(new CompletedBatch<>(
+nextOffset,
+null,

Review comment:
   Using `Optional.ofNullable` saved the time of having to change every 
other time a CompletedBatch is constructed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dielhennr commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState

2021-04-08 Thread GitBox


dielhennr commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r610281681



##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##
@@ -202,6 +203,25 @@ private void completeCurrentBatch() {
 currentBatch = null;
 }
 
+public void addControlBatch(MemoryRecords records) {
+appendLock.lock();
+try {
+drainStatus = DrainStatus.STARTED;
+completed.add(new CompletedBatch<>(
+nextOffset,
+null,

Review comment:
   Using `Optional.ofNullable` saved the time of having to change every 
other instantiation of CompletedBatch.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dielhennr commented on a change in pull request #10480: KAFKA-12265: Move the BatchAccumulator in KafkaRaftClient to LeaderState

2021-04-08 Thread GitBox


dielhennr commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r610281681



##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##
@@ -202,6 +203,25 @@ private void completeCurrentBatch() {
 currentBatch = null;
 }
 
+public void addControlBatch(MemoryRecords records) {
+appendLock.lock();
+try {
+drainStatus = DrainStatus.STARTED;
+completed.add(new CompletedBatch<>(
+nextOffset,
+null,

Review comment:
   Using `Optional.ofNullable` in the constructor saved the time of having 
to change every other instantiation of CompletedBatch.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12464) Enhance constrained sticky Assign algorithm

2021-04-08 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-12464:
--
Description: 
In KAFKA-9987, we did a great improvement for the case when all consumers were 
subscribed to same set of topics. The algorithm contains 4 phases:
 # Reassign as many previously owned partitions as possible, up to the maxQuota
 # Fill remaining members up to minQuota
 # If we ran out of unassigned partitions before filling all consumers, we need 
to start stealing partitions from the over-full consumers at max capacity
 # Otherwise we may have run out of unfilled consumers before assigning all 
partitions, in which case we should just distribute one partition each to all 
consumers at min capacity

 

Take an example for better understanding:

*example:*

Current status: 2 consumers (C0, C1), and 10 topic partitions: t1p0, t1p1, ... 
t1p9

Suppose, current assignment is:

_C0: t1p0, t1p1, t1p2, t1p3, t1p4_
 _C1: t1p5, t1p6, t1p7, t1p8, t1p9_

Now, new consumer added: C2, so we'll do:
 # Reassign as many previously owned partitions as possible, up to the maxQuota
 After this phase, the assignment will be: (maxQuota will be 4)

_C0: t1p0, t1p1, t1p2, t1p3_
 _C1: t1p5, t1p6, t1p7, t1p8_
 # Fill remaining members up to minQuota
 After this phase, the assignment will be:

_C0: t1p0, t1p1, t1p2, t1p3_
 _C1: t1p5, t1p6, t1p7, t1p8_
 _C2: t1p4, t1p9_
 # If we ran out of unassigned partitions before filling all consumers, we need 
to start stealing partitions from the over-full consumers at max capacity
 After this phase, the assignment will be:

_C0: t1p0, t1p1, t1p2_ 
 _C1: t1p5, t1p6, t1p7, t1p8_
 _C2: t1p4, t1p9,_ _t1p3_
 # Otherwise we may have run out of unfilled consumers before assigning all 
partitions, in which case we should just distribute one partition each to all 
consumers at min capacity

 

 

As we can see, we need 3 phases to complete the assignment. But we can actually 
completed with 2 phases. Here's the updated algorithm:
 # Reassign as many previously owned partitions as possible, up to the 
maxQuota, and also considering the numMaxQuota by the remainder of (Partitions 
/ Consumers)
 # Fill remaining members up to maxQuota if current maxQuotaMember < 
numMaxQuota, otherwise, to minQuota

 

By considering the numMaxQuota, the original step 1 won't be too aggressive to 
assign too many partitions to consumers, and the step 2 won't be too 
conservative to assign not enough partitions to consumers, so that we don't 
need step 3 and step 4 to balance them.

 

{{So, the updated Pseudo-code sketch of the algorithm:}}

C_f := (P/N)_floor, the floor capacity 
 C_c := (P/N)_ceil, the ceiling capacity

*C_r := (P%N) the allowed number of members with C_c partitions assigned*
 *num_max_capacity_members := current number of members with C_c partitions 
assigned (default to 0)*

members := the sorted set of all consumers
 partitions := the set of all partitions
 unassigned_partitions := the set of partitions not yet assigned, initialized 
to be all partitions
 unfilled_members := the set of consumers not yet at capacity, initialized to 
empty
 -max_capacity_members := the set of members with exactly C_c partitions 
assigned, initialized to empty-
 member.owned_partitions := the set of previously owned partitions encoded in 
the Subscription

// Reassign as many previously owned partitions as possible, *by considering 
the num_max_capacity_members*
 for member : members
  remove any partitions that are no longer in the subscription from its 
owned partitions
  remove all owned_partitions if the generation is old
  if member.owned_partitions.size < C_f
  assign all owned partitions to member and remove from 
unassigned_partitions
  add member to unfilled_members
  -else if member.owned_partitions.size == C_f-
  -assign first C_f owned_partitions to member and remove from 
unassigned_partitions-
  else if member.owned_partitions.size >= C_c *&& num_max_capacity_members 
< C_r*
  *assign first C_c owned_partitions to member and remove from 
unassigned_partitions*

 *num_max_capacity_members++*
  a-dd member to max_capacity_members-

 *else*
   *assign first C_f owned_partitions to member and remove from 
unassigned_partitions*

sort unassigned_partitions in partition order, ie t0_p0, t1_p0, t2_p0, t0_p1, 
t1_p0  (for data parallelism)
 sort unfilled_members by memberId (for determinism)

// Fill remaining members *up to the C_r numbers of C_c, otherwise, to C_f*
 for member : unfilled_members
  compute the remaining capacity as -C = C_f - num_assigned_partitions-

 if num_max_capacity_members < C_r:

    C = C_c - num_assigned_partitions

    num_max_capacity_members++
  else

   C = C_f - num_assigned_partitions
 pop the first C partitions from unassig

[jira] [Created] (KAFKA-12637) Remove deprecated PartitionAssignor interface

2021-04-08 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-12637:
--

 Summary: Remove deprecated PartitionAssignor interface
 Key: KAFKA-12637
 URL: https://issues.apache.org/jira/browse/KAFKA-12637
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Reporter: A. Sophie Blee-Goldman
 Fix For: 3.0.0


In KIP-429, we deprecated the existing PartitionAssignor interface in order to 
move it out of the internals package and better align the name with other 
pluggable Consumer interfaces. We added an adapter to convert from existing 
o.a.k.clients.consumer.internals.PartitionAssignor to the new 
o.a.k.clients.consumer.ConsumerPartitionAssignor and support the deprecated 
interface. This was deprecated in 2.4, so we should be ok to remove it and the 
adaptor in 3.0



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12637) Remove deprecated PartitionAssignor interface

2021-04-08 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-12637:
---
Labels: newbie newbie++  (was: )

> Remove deprecated PartitionAssignor interface
> -
>
> Key: KAFKA-12637
> URL: https://issues.apache.org/jira/browse/KAFKA-12637
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Priority: Blocker
>  Labels: newbie, newbie++
> Fix For: 3.0.0
>
>
> In KIP-429, we deprecated the existing PartitionAssignor interface in order 
> to move it out of the internals package and better align the name with other 
> pluggable Consumer interfaces. We added an adapter to convert from existing 
> o.a.k.clients.consumer.internals.PartitionAssignor to the new 
> o.a.k.clients.consumer.ConsumerPartitionAssignor and support the deprecated 
> interface. This was deprecated in 2.4, so we should be ok to remove it and 
> the adaptor in 3.0



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12637) Remove deprecated PartitionAssignor interface

2021-04-08 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-12637:
---
Description: In KIP-429, we deprecated the existing PartitionAssignor 
interface in order to move it out of the internals package and better align the 
name with other pluggable Consumer interfaces. We added an adapter to convert 
from existing o.a.k.clients.consumer.internals.PartitionAssignor to the new 
o.a.k.clients.consumer.ConsumerPartitionAssignor and support the deprecated 
interface. This was deprecated in 2.4, so we should be ok to remove it and the 
PartitionAssignorAdaptor in 3.0  (was: In KIP-429, we deprecated the existing 
PartitionAssignor interface in order to move it out of the internals package 
and better align the name with other pluggable Consumer interfaces. We added an 
adapter to convert from existing 
o.a.k.clients.consumer.internals.PartitionAssignor to the new 
o.a.k.clients.consumer.ConsumerPartitionAssignor and support the deprecated 
interface. This was deprecated in 2.4, so we should be ok to remove it and the 
adaptor in 3.0)

> Remove deprecated PartitionAssignor interface
> -
>
> Key: KAFKA-12637
> URL: https://issues.apache.org/jira/browse/KAFKA-12637
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Priority: Blocker
>  Labels: newbie, newbie++
> Fix For: 3.0.0
>
>
> In KIP-429, we deprecated the existing PartitionAssignor interface in order 
> to move it out of the internals package and better align the name with other 
> pluggable Consumer interfaces. We added an adapter to convert from existing 
> o.a.k.clients.consumer.internals.PartitionAssignor to the new 
> o.a.k.clients.consumer.ConsumerPartitionAssignor and support the deprecated 
> interface. This was deprecated in 2.4, so we should be ok to remove it and 
> the PartitionAssignorAdaptor in 3.0



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kowshik commented on a change in pull request #10218: KAFKA-12368: Added inmemory implementations for RemoteStorageManager and RemoteLogMetadataManager.

2021-04-08 Thread GitBox


kowshik commented on a change in pull request #10218:
URL: https://github.com/apache/kafka/pull/10218#discussion_r610293058



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java
##
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+/**
+ * This class represents the in-memory state of segments associated with a 
leader epoch. This includes the mapping of offset to
+ * segment ids and unreferenced segments which are not mapped to any offset 
but they exist in remote storage.
+ * 
+ * This is used by {@link RemoteLogMetadataCache} to track the segments for 
each leader epoch.
+ */
+class RemoteLogLeaderEpochState {
+
+// It contains offset to segment ids mapping with the segment state as 
COPY_SEGMENT_FINISHED.
+private final NavigableMap offsetToId = new 
ConcurrentSkipListMap<>();
+
+/**
+ * It represents unreferenced segments for this leader epoch. It contains 
the segments still in COPY_SEGMENT_STARTED
+ * and DELETE_SEGMENT_STARTED state or these have been replaced by callers 
with other segments having the same
+ * start offset for the leader epoch. These will be returned by {@link 
RemoteLogMetadataCache#listAllRemoteLogSegments()}
+ * and {@link RemoteLogMetadataCache#listRemoteLogSegments(int 
leaderEpoch)} so that callers can clean them up if
+ * they still exist. These will be cleaned from the cache once they reach 
DELETE_SEGMENT_FINISHED state.
+ */
+private final Set unreferencedSegmentIds = 
ConcurrentHashMap.newKeySet();
+
+// It represents the highest log offset of the segments that were updated 
with updateHighestLogOffset.
+private volatile Long highestLogOffset;
+
+/**
+ * Returns all the segments associated with this leader epoch sorted by 
start offset in ascending order.
+ *
+ * @param idToSegmentMetadata mapping of id to segment metadata. This will 
be used to get RemoteLogSegmentMetadata
+ *for an id to be used for sorting.
+ */
+Iterator 
listAllRemoteLogSegments(Map 
idToSegmentMetadata) {
+// Return all the segments including unreferenced metadata.
+int size = offsetToId.size() + unreferencedSegmentIds.size();
+if (size == 0) {
+return Collections.emptyIterator();
+}
+
+ArrayList metadataList = new 
ArrayList<>(size);
+for (RemoteLogSegmentId id : offsetToId.values()) {
+metadataList.add(idToSegmentMetadata.get(id));

Review comment:
   Hmm here we assume that `id` should be present in the provided 
`idToSegmentMetadata`. Due to programming error, or other reasons, the caller 
may not be able to ensure this. Would it be safer if we instead threw whenever 
`id` is absent in `idToSegmentMetadata`  to catch that case?

##
File path: clients/src/test/java/org/apache/kafka/test/TestUtils.java
##
@@ -535,4 +536,46 @@ public static void setFieldValue(Object obj, String 
fieldName, Object value) thr
 field.setAccessible(true);
 field.set(obj, value);
 }
+
+/**
+ * Returns true if both iterators have same elements in the same order.
+ *
+ * @param iterator1 first iterator.
+ * @param iterator2 second iterator.
+ * @paramtype of element in the iterators.
+ */
+public static  boolean sameElementsWithOrder(Iterator iterator1,

Review comment:
   Here is a slightly simpler version:
   ```
while (iterator1.hasNext() && iterator2.hasNext()) {
if (!Objects.equals(iterator1.next(), iterator2.next())) {
   return false;
   }
   }
   
   return !iterator1.hasNext

[GitHub] [kafka] kowshik commented on a change in pull request #10218: KAFKA-12368: Added inmemory implementations for RemoteStorageManager and RemoteLogMetadataManager.

2021-04-08 Thread GitBox


kowshik commented on a change in pull request #10218:
URL: https://github.com/apache/kafka/pull/10218#discussion_r610305248



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
##
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import 
org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * This class provides an in-memory cache of remote log segment metadata. This 
maintains the lineage of segments
+ * with respect to leader epochs.
+ * 
+ * Remote log segment can go through the state transitions as mentioned in 
{@link RemoteLogSegmentState}.
+ * 
+ * This class will have all the segments which did not reach terminal state 
viz DELETE_SEGMENT_FINISHED. That means,any
+ * segment reaching the terminal state will get cleared from this instance.
+ * This class provides different methods to fetch segment metadata like {@link 
#remoteLogSegmentMetadata(int, long)},
+ * {@link #highestOffsetForEpoch(int)}, {@link #listRemoteLogSegments(int)}, 
{@link #listAllRemoteLogSegments()}. Those
+ * methods have different semantics to fetch the segment based on its state.
+ * 
+ * 
+ * 
+ * {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}:
+ * 
+ * Segment in this state indicates it is not yet copied successfully. So, 
these segments will not be
+ * accessible for reads but these are considered for cleanups when a partition 
is deleted.
+ * 
+ * 
+ * {@link RemoteLogSegmentState#COPY_SEGMENT_FINISHED}:
+ * 
+ * Segment in this state indicates it is successfully copied and it is 
available for reads. So, these segments
+ * will be accessible for reads. But this should be available for any cleanup 
activity like deleting segments by the
+ * caller of this class.
+ * 
+ * 
+ * {@link RemoteLogSegmentState#DELETE_SEGMENT_STARTED}:
+ * Segment in this state indicates it is getting deleted. That means, it is 
not available for reads. But it should be
+ * available for any cleanup activity like deleting segments by the caller of 
this class.
+ * 
+ * 
+ * {@link RemoteLogSegmentState#DELETE_SEGMENT_FINISHED}:
+ * Segment in this state indicate it is already deleted. That means, it is not 
available for any activity including
+ * reads or cleanup activity. This cache will clear entries containing this 
state.
+ * 
+ * 
+ *
+ * 
+ *  The below table summarizes whether the segment with the respective state 
are available for the given methods.
+ * 
+ * 
+-+--++-+-+
+ * |  Method / SegmentState  | COPY_SEGMENT_STARTED | 
COPY_SEGMENT_FINISHED  | DELETE_SEGMENT_STARTED  | DELETE_SEGMENT_STARTED  |
+ * 
|-+--++-+-|
+ * | remoteLogSegmentMetadata|No|   Yes
  |  No |   No|
+ * | (int leaderEpoch, long offset)  |  |  
  | | |
+ * 
|-+--++-+-|
+ * | listRemoteLogSegments   |Yes   |   Yes
  |  Yes|   No|
+ * | (int leaderEpoch)   |  

[GitHub] [kafka] kowshik commented on a change in pull request #10218: KAFKA-12368: Added inmemory implementations for RemoteStorageManager and RemoteLogMetadataManager.

2021-04-08 Thread GitBox


kowshik commented on a change in pull request #10218:
URL: https://github.com/apache/kafka/pull/10218#discussion_r610305248



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
##
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import 
org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * This class provides an in-memory cache of remote log segment metadata. This 
maintains the lineage of segments
+ * with respect to leader epochs.
+ * 
+ * Remote log segment can go through the state transitions as mentioned in 
{@link RemoteLogSegmentState}.
+ * 
+ * This class will have all the segments which did not reach terminal state 
viz DELETE_SEGMENT_FINISHED. That means,any
+ * segment reaching the terminal state will get cleared from this instance.
+ * This class provides different methods to fetch segment metadata like {@link 
#remoteLogSegmentMetadata(int, long)},
+ * {@link #highestOffsetForEpoch(int)}, {@link #listRemoteLogSegments(int)}, 
{@link #listAllRemoteLogSegments()}. Those
+ * methods have different semantics to fetch the segment based on its state.
+ * 
+ * 
+ * 
+ * {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}:
+ * 
+ * Segment in this state indicates it is not yet copied successfully. So, 
these segments will not be
+ * accessible for reads but these are considered for cleanups when a partition 
is deleted.
+ * 
+ * 
+ * {@link RemoteLogSegmentState#COPY_SEGMENT_FINISHED}:
+ * 
+ * Segment in this state indicates it is successfully copied and it is 
available for reads. So, these segments
+ * will be accessible for reads. But this should be available for any cleanup 
activity like deleting segments by the
+ * caller of this class.
+ * 
+ * 
+ * {@link RemoteLogSegmentState#DELETE_SEGMENT_STARTED}:
+ * Segment in this state indicates it is getting deleted. That means, it is 
not available for reads. But it should be
+ * available for any cleanup activity like deleting segments by the caller of 
this class.
+ * 
+ * 
+ * {@link RemoteLogSegmentState#DELETE_SEGMENT_FINISHED}:
+ * Segment in this state indicate it is already deleted. That means, it is not 
available for any activity including
+ * reads or cleanup activity. This cache will clear entries containing this 
state.
+ * 
+ * 
+ *
+ * 
+ *  The below table summarizes whether the segment with the respective state 
are available for the given methods.
+ * 
+ * 
+-+--++-+-+
+ * |  Method / SegmentState  | COPY_SEGMENT_STARTED | 
COPY_SEGMENT_FINISHED  | DELETE_SEGMENT_STARTED  | DELETE_SEGMENT_STARTED  |
+ * 
|-+--++-+-|
+ * | remoteLogSegmentMetadata|No|   Yes
  |  No |   No|
+ * | (int leaderEpoch, long offset)  |  |  
  | | |
+ * 
|-+--++-+-|
+ * | listRemoteLogSegments   |Yes   |   Yes
  |  Yes|   No|
+ * | (int leaderEpoch)   |  

[jira] [Created] (KAFKA-12638) Remove default implementation of ConsumerRebalanceListener#onPartitionsLost

2021-04-08 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-12638:
--

 Summary: Remove default implementation of 
ConsumerRebalanceListener#onPartitionsLost
 Key: KAFKA-12638
 URL: https://issues.apache.org/jira/browse/KAFKA-12638
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Reporter: A. Sophie Blee-Goldman


When we added the #onPartitionsLost callback to the ConsumerRebalanceListener 
in KIP-429, we gave it a default implementation that just invoked the existing 
#onPartitionsRevoked method for backwards compatibility. This is somewhat 
inconvenient, since we generally want to invoke #onPartitionsLost in order to 
skip the committing of offsets on revoked partitions, which is exactly what 
#onPartitionsRevoked does.

I don't think we can just remove it in 3.0 since we haven't indicated that we 
"deprecated" the default implementation or logged a warning that we intend to 
remove the default in a future release (as we did for the 
RocksDBConfigSetter#close method in Streams, for example). We should try to add 
such a warning now, so we can remove it in a future release.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dengziming commented on pull request #10488: MINOR: Remove some unnecessary cyclomatic complexity suppressions

2021-04-08 Thread GitBox


dengziming commented on pull request #10488:
URL: https://github.com/apache/kafka/pull/10488#issuecomment-816368204


   @chia7712 , Hello, PTAL, Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12638) Remove default implementation of ConsumerRebalanceListener#onPartitionsLost

2021-04-08 Thread Ben Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17317601#comment-17317601
 ] 

Ben Chen commented on KAFKA-12638:
--

Just curious. For such issue tagged with Majority, who can work on it? 
Especially there's some time constraints. Also do we have some mechanism to 
earn credits so that someone with enough "credits" can work on certain things?

> Remove default implementation of ConsumerRebalanceListener#onPartitionsLost
> ---
>
> Key: KAFKA-12638
> URL: https://issues.apache.org/jira/browse/KAFKA-12638
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>
> When we added the #onPartitionsLost callback to the ConsumerRebalanceListener 
> in KIP-429, we gave it a default implementation that just invoked the 
> existing #onPartitionsRevoked method for backwards compatibility. This is 
> somewhat inconvenient, since we generally want to invoke #onPartitionsLost in 
> order to skip the committing of offsets on revoked partitions, which is 
> exactly what #onPartitionsRevoked does.
> I don't think we can just remove it in 3.0 since we haven't indicated that we 
> "deprecated" the default implementation or logged a warning that we intend to 
> remove the default in a future release (as we did for the 
> RocksDBConfigSetter#close method in Streams, for example). We should try to 
> add such a warning now, so we can remove it in a future release.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] C0urante commented on a change in pull request #10503: KAFKA-9988: Suppress uncaught exceptions in log messages during task shutdown

2021-04-08 Thread GitBox


C0urante commented on a change in pull request #10503:
URL: https://github.com/apache/kafka/pull/10503#discussion_r610316462



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
##
@@ -185,8 +185,12 @@ private void doRun() throws InterruptedException {
 
 execute();
 } catch (Throwable t) {
-log.error("{} Task threw an uncaught and unrecoverable exception. 
Task is being killed and will not recover until manually restarted", this, t);
-throw t;
+if (!stopping && !cancelled) {

Review comment:
   I was thinking we could log different messages based on whether only 
`stopping` or both `stopping` and `cancelled` were true. If `cancelled` is 
true, we should make sure to let people know that there might be a newer 
instance of this task already running, and that the log message isn't 
indicative that that newer instance has failed. If only `stopping` is true, 
then the existing log message should suffice. Does that make sense?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-8295) Optimize count() using RocksDB merge operator

2021-04-08 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17317613#comment-17317613
 ] 

Sagar Rao commented on KAFKA-8295:
--

Thanks Sophie for the background. I do plan to run some benchmarks by extending 
the rocksjava benchmarks for my use case. I can probably share the performance 
numbers here which might help us to decide if it should be considered or not.

Yeah, this merge operator mayn't be applicable for most other state stores so 
if it comes to a KIP for this, I would look at 617- which I have seen before 
while implementing KIP-614. Thanks for sharing that!

> Optimize count() using RocksDB merge operator
> -
>
> Key: KAFKA-8295
> URL: https://issues.apache.org/jira/browse/KAFKA-8295
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>
> In addition to regular put/get/delete RocksDB provides a fourth operation, 
> merge. This essentially provides an optimized read/update/write path in a 
> single operation. One of the built-in (C++) merge operators exposed over the 
> Java API is a counter. We should be able to leverage this for a more 
> efficient implementation of count()
>  
> (Note: Unfortunately it seems unlikely we can use this to optimize general 
> aggregations, even if RocksJava allowed for a custom merge operator, unless 
> we provide a way for the user to specify and connect a C++ implemented 
> aggregator – otherwise we incur too much cost crossing the jni for a net 
> performance benefit)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kpatelatwork commented on a change in pull request #10503: KAFKA-9988: Suppress uncaught exceptions in log messages during task shutdown

2021-04-08 Thread GitBox


kpatelatwork commented on a change in pull request #10503:
URL: https://github.com/apache/kafka/pull/10503#discussion_r610322098



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
##
@@ -185,8 +185,12 @@ private void doRun() throws InterruptedException {
 
 execute();
 } catch (Throwable t) {
-log.error("{} Task threw an uncaught and unrecoverable exception. 
Task is being killed and will not recover until manually restarted", this, t);
-throw t;
+if (!stopping && !cancelled) {

Review comment:
   are these flags internal or can we just add "stopped={}, cancelled={} " 
to the message instead of adding more if/else to the code?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kpatelatwork commented on a change in pull request #10503: KAFKA-9988: Suppress uncaught exceptions in log messages during task shutdown

2021-04-08 Thread GitBox


kpatelatwork commented on a change in pull request #10503:
URL: https://github.com/apache/kafka/pull/10503#discussion_r610322098



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
##
@@ -185,8 +185,12 @@ private void doRun() throws InterruptedException {
 
 execute();
 } catch (Throwable t) {
-log.error("{} Task threw an uncaught and unrecoverable exception. 
Task is being killed and will not recover until manually restarted", this, t);
-throw t;
+if (!stopping && !cancelled) {

Review comment:
   @C0urante are these flags meant to be hidden or wdyt if we just add 
"stopped={}, cancelled={} " to the message instead of adding more if/else to 
the code?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon opened a new pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-04-08 Thread GitBox


showuon opened a new pull request #10509:
URL: https://github.com/apache/kafka/pull/10509


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12464) Enhance constrained sticky Assign algorithm

2021-04-08 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17317615#comment-17317615
 ] 

Luke Chen commented on KAFKA-12464:
---

[~ableegoldman], good news, after my enhancement, the 
_testLargeAssignmentAndGroupWithUniformSubscription_ test time down from 28xx 
ms, to 18xx ms. Improved 33% of performance. PR is submitted. Thank you.

> Enhance constrained sticky Assign algorithm
> ---
>
> Key: KAFKA-12464
> URL: https://issues.apache.org/jira/browse/KAFKA-12464
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Affects Versions: 2.7.0
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>  Labels: perfomance
>
> In KAFKA-9987, we did a great improvement for the case when all consumers 
> were subscribed to same set of topics. The algorithm contains 4 phases:
>  # Reassign as many previously owned partitions as possible, up to the 
> maxQuota
>  # Fill remaining members up to minQuota
>  # If we ran out of unassigned partitions before filling all consumers, we 
> need to start stealing partitions from the over-full consumers at max capacity
>  # Otherwise we may have run out of unfilled consumers before assigning all 
> partitions, in which case we should just distribute one partition each to all 
> consumers at min capacity
>  
> Take an example for better understanding:
> *example:*
> Current status: 2 consumers (C0, C1), and 10 topic partitions: t1p0, t1p1, 
> ... t1p9
> Suppose, current assignment is:
> _C0: t1p0, t1p1, t1p2, t1p3, t1p4_
>  _C1: t1p5, t1p6, t1p7, t1p8, t1p9_
> Now, new consumer added: C2, so we'll do:
>  # Reassign as many previously owned partitions as possible, up to the 
> maxQuota
>  After this phase, the assignment will be: (maxQuota will be 4)
> _C0: t1p0, t1p1, t1p2, t1p3_
>  _C1: t1p5, t1p6, t1p7, t1p8_
>  # Fill remaining members up to minQuota
>  After this phase, the assignment will be:
> _C0: t1p0, t1p1, t1p2, t1p3_
>  _C1: t1p5, t1p6, t1p7, t1p8_
>  _C2: t1p4, t1p9_
>  # If we ran out of unassigned partitions before filling all consumers, we 
> need to start stealing partitions from the over-full consumers at max capacity
>  After this phase, the assignment will be:
> _C0: t1p0, t1p1, t1p2_ 
>  _C1: t1p5, t1p6, t1p7, t1p8_
>  _C2: t1p4, t1p9,_ _t1p3_
>  # Otherwise we may have run out of unfilled consumers before assigning all 
> partitions, in which case we should just distribute one partition each to all 
> consumers at min capacity
>  
>  
> As we can see, we need 3 phases to complete the assignment. But we can 
> actually completed with 2 phases. Here's the updated algorithm:
>  # Reassign as many previously owned partitions as possible, up to the 
> maxQuota, and also considering the numMaxQuota by the remainder of 
> (Partitions / Consumers)
>  # Fill remaining members up to maxQuota if current maxQuotaMember < 
> numMaxQuota, otherwise, to minQuota
>  
> By considering the numMaxQuota, the original step 1 won't be too aggressive 
> to assign too many partitions to consumers, and the step 2 won't be too 
> conservative to assign not enough partitions to consumers, so that we don't 
> need step 3 and step 4 to balance them.
>  
> {{So, the updated Pseudo-code sketch of the algorithm:}}
> C_f := (P/N)_floor, the floor capacity 
>  C_c := (P/N)_ceil, the ceiling capacity
> *C_r := (P%N) the allowed number of members with C_c partitions assigned*
>  *num_max_capacity_members := current number of members with C_c partitions 
> assigned (default to 0)*
> members := the sorted set of all consumers
>  partitions := the set of all partitions
>  unassigned_partitions := the set of partitions not yet assigned, initialized 
> to be all partitions
>  unfilled_members := the set of consumers not yet at capacity, initialized to 
> empty
>  -max_capacity_members := the set of members with exactly C_c partitions 
> assigned, initialized to empty-
>  member.owned_partitions := the set of previously owned partitions encoded in 
> the Subscription
> // Reassign as many previously owned partitions as possible, *by considering 
> the num_max_capacity_members*
>  for member : members
>   remove any partitions that are no longer in the subscription from its 
> owned partitions
>   remove all owned_partitions if the generation is old
>   if member.owned_partitions.size < C_f
>   assign all owned partitions to member and remove from 
> unassigned_partitions
>   add member to unfilled_members
>   -else if member.owned_partitions.size == C_f-
>   -assign first C_f owned_partitions to member and remove from 
> unassigned_partitions-
>   else if member.owned_partitions.size >= C_c *&& 
> num_max_capacity_members < C_r*
>   *assign first C_c owned_partiti

[GitHub] [kafka] showuon commented on pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-04-08 Thread GitBox


showuon commented on pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#issuecomment-816379867


   @ableegoldman , please help review this PR. The 
testLargeAssignmentAndGroupWithUniformSubscription test time down from 28xx ms, 
to 18xx ms. Improved 33% of performance. Yeah~
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-04-08 Thread GitBox


showuon commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r610324204



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -163,117 +160,95 @@ private boolean allSubscriptionsEqual(Set 
allTopics,
  */
 private Map> constrainedAssign(Map partitionsPerTopic,
 Map> consumerToOwnedPartitions) {
+log.debug(String.format("performing constrained assign. 
partitionsPerTopic: %s, consumerToOwnedPartitions: %s",
+partitionsPerTopic, consumerToOwnedPartitions));
+
 SortedSet unassignedPartitions = 
getTopicPartitions(partitionsPerTopic);
 
 Set allRevokedPartitions = new HashSet<>();
 
-// Each consumer should end up in exactly one of the below
-// the consumers not yet at capacity
+// the consumers not yet at expected capacity
 List unfilledMembers = new LinkedList<>();
-// the members with exactly maxQuota partitions assigned
-Queue maxCapacityMembers = new LinkedList<>();
-// the members with exactly minQuota partitions assigned
-Queue minCapacityMembers = new LinkedList<>();
 
 int numberOfConsumers = consumerToOwnedPartitions.size();
 int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
 int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
+// the expected number of members with maxQuota assignment
+int numExpectedMaxCapacityMembers = unassignedPartitions.size() % 
numberOfConsumers;
+// the number of members with exactly maxQuota partitions assigned
+int numMaxCapacityMembers = 0;
 
-// initialize the assignment map with an empty array of size minQuota 
for all members
+// initialize the assignment map with an empty array of size maxQuota 
for all members
 Map> assignment = new HashMap<>(
-
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(minQuota;
+
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(maxQuota;

Review comment:
   we should make the capacity to maxQuota to avoid memory reallocation.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-04-08 Thread GitBox


showuon commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r610325577



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -163,117 +160,95 @@ private boolean allSubscriptionsEqual(Set 
allTopics,
  */
 private Map> constrainedAssign(Map partitionsPerTopic,
 Map> consumerToOwnedPartitions) {
+log.debug(String.format("performing constrained assign. 
partitionsPerTopic: %s, consumerToOwnedPartitions: %s",
+partitionsPerTopic, consumerToOwnedPartitions));
+
 SortedSet unassignedPartitions = 
getTopicPartitions(partitionsPerTopic);
 
 Set allRevokedPartitions = new HashSet<>();
 
-// Each consumer should end up in exactly one of the below
-// the consumers not yet at capacity
+// the consumers not yet at expected capacity
 List unfilledMembers = new LinkedList<>();
-// the members with exactly maxQuota partitions assigned
-Queue maxCapacityMembers = new LinkedList<>();
-// the members with exactly minQuota partitions assigned
-Queue minCapacityMembers = new LinkedList<>();

Review comment:
   We don't need to keep the `maxCapacityMembers`/`minCapacityMembers` 
anymore because we can precisely know how many members can have max capacity 
now, by this
   ```
   int numExpectedMaxCapacityMembers = unassignedPartitions.size() % 
numberOfConsumers;
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-04-08 Thread GitBox


showuon commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r610325987



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -163,117 +160,95 @@ private boolean allSubscriptionsEqual(Set 
allTopics,
  */
 private Map> constrainedAssign(Map partitionsPerTopic,
 Map> consumerToOwnedPartitions) {
+log.debug(String.format("performing constrained assign. 
partitionsPerTopic: %s, consumerToOwnedPartitions: %s",
+partitionsPerTopic, consumerToOwnedPartitions));
+
 SortedSet unassignedPartitions = 
getTopicPartitions(partitionsPerTopic);
 
 Set allRevokedPartitions = new HashSet<>();
 
-// Each consumer should end up in exactly one of the below
-// the consumers not yet at capacity
+// the consumers not yet at expected capacity
 List unfilledMembers = new LinkedList<>();
-// the members with exactly maxQuota partitions assigned
-Queue maxCapacityMembers = new LinkedList<>();
-// the members with exactly minQuota partitions assigned
-Queue minCapacityMembers = new LinkedList<>();
 
 int numberOfConsumers = consumerToOwnedPartitions.size();
 int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
 int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) 
/ numberOfConsumers);
+// the expected number of members with maxQuota assignment
+int numExpectedMaxCapacityMembers = unassignedPartitions.size() % 
numberOfConsumers;
+// the number of members with exactly maxQuota partitions assigned
+int numMaxCapacityMembers = 0;
 
-// initialize the assignment map with an empty array of size minQuota 
for all members
+// initialize the assignment map with an empty array of size maxQuota 
for all members
 Map> assignment = new HashMap<>(
-
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(minQuota;
+
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c 
-> new ArrayList<>(maxQuota;
 
 // Reassign as many previously owned partitions as possible
 for (Map.Entry> consumerEntry : 
consumerToOwnedPartitions.entrySet()) {
 String consumer = consumerEntry.getKey();
 List ownedPartitions = consumerEntry.getValue();
 
 List consumerAssignment = assignment.get(consumer);
-int i = 0;
-// assign the first N partitions up to the max quota, and mark the 
remaining as being revoked
-for (TopicPartition tp : ownedPartitions) {
-if (i < maxQuota) {
-consumerAssignment.add(tp);
-unassignedPartitions.remove(tp);
-} else {
-allRevokedPartitions.add(tp);
-}
-++i;
-}
 
 if (ownedPartitions.size() < minQuota) {
+// the expected assignment size is more than consumer have 
now, so keep all the owned partitions
+// and put this member into unfilled member list
+consumerAssignment.addAll(ownedPartitions);
+unassignedPartitions.removeAll(ownedPartitions);
 unfilledMembers.add(consumer);
+} else if (ownedPartitions.size() >= maxQuota && 
numMaxCapacityMembers++ <= numExpectedMaxCapacityMembers) {
+// consumer owned the "maxQuota" of partitions or more, and we 
still under the number of expected max capacity members
+// so keep "maxQuota" of the owned partitions, and revoke the 
rest of the partitions
+consumerAssignment.addAll(ownedPartitions.subList(0, 
maxQuota));
+unassignedPartitions.removeAll(ownedPartitions.subList(0, 
maxQuota));
+allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota, 
ownedPartitions.size()));
 } else {
-// It's possible for a consumer to be at both min and max 
capacity if minQuota == maxQuota
-if (consumerAssignment.size() == minQuota)
-minCapacityMembers.add(consumer);
-if (consumerAssignment.size() == maxQuota)
-maxCapacityMembers.add(consumer);
+// consumer owned the "minQuota" of partitions or more
+// so keep "minQuota" of the owned partitions, and revoke the 
rest of the partitions
+consumerAssignment.addAll(ownedPartitions.subList(0, 
minQuota));
+unassignedPartitions.removeAll(ownedPartitions.subList(0, 
minQuota));
+allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, 
ownedPartitions.size()));

[GitHub] [kafka] ableegoldman commented on pull request #10486: KAFKA-12492: Fix the formatting of example RocksDBConfigSetter

2021-04-08 Thread GitBox


ableegoldman commented on pull request #10486:
URL: https://github.com/apache/kafka/pull/10486#issuecomment-816382826


   Just some unrelated test failures in `kafka.server.RaftClusterTest`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-04-08 Thread GitBox


showuon commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r610326449



##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
##
@@ -215,8 +215,8 @@ public void testTwoConsumersTwoTopicsSixPartitions() {
 subscriptions.put(consumer2, new Subscription(topics(topic1, topic2)));
 
 Map> assignment = 
assignor.assign(partitionsPerTopic, subscriptions);
-assertEquals(partitions(tp(topic1, 0), tp(topic1, 2), tp(topic2, 1)), 
assignment.get(consumer1));
-assertEquals(partitions(tp(topic1, 1), tp(topic2, 0), tp(topic2, 2)), 
assignment.get(consumer2));
+assertEquals(partitions(tp(topic1, 0), tp(topic1, 1), tp(topic1, 2)), 
assignment.get(consumer1));
+assertEquals(partitions(tp(topic2, 0), tp(topic2, 1), tp(topic2, 2)), 
assignment.get(consumer2));

Review comment:
   I don't do round-robin assign partitions in step 2 now, but I think this 
should not a big deal, right?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman merged pull request #10486: KAFKA-12492: Fix the formatting of example RocksDBConfigSetter

2021-04-08 Thread GitBox


ableegoldman merged pull request #10486:
URL: https://github.com/apache/kafka/pull/10486


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12492) Formatting of example RocksDBConfigSetter is messed up

2021-04-08 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-12492:
---
Fix Version/s: 3.0.0

> Formatting of example RocksDBConfigSetter is messed up
> --
>
> Key: KAFKA-12492
> URL: https://issues.apache.org/jira/browse/KAFKA-12492
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Ben Chen
>Priority: Trivial
>  Labels: docs, newbie
> Fix For: 3.0.0
>
>
> See the example implementation class CustomRocksDBConfig in the docs for the 
> rocksdb.config.setter
> https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#rocksdb-config-setter



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12634) Should checkpoint after restore finished

2021-04-08 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17317632#comment-17317632
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12634:


I don't think bulk loading should affect the checkpoint, the data up to an 
offset is either in the state store or it isn't. I'd be fine with just doing a 
small KIP to fix in 3.0+ though

> Should checkpoint after restore finished
> 
>
> Key: KAFKA-12634
> URL: https://issues.apache.org/jira/browse/KAFKA-12634
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Matthias J. Sax
>Priority: Major
>
> For state stores, Kafka Streams maintains local checkpoint files to track the 
> offsets of the state store changelog topics. The checkpoint is updated on 
> commit or when a task is closed cleanly.
> However, after a successful restore, the checkpoint is not written. Thus, if 
> an instance crashes after restore but before committing, even if the state is 
> on local disk the checkpoint file is missing (indicating that there is no 
> state) and thus state would be restored from scratch.
> While for most cases, the time between restore end and next commit is small, 
> there are cases when this time could be large, for example if there is no new 
> input data to be processed (if there is no input data, the commit would be 
> skipped).
> Thus, we should write the checkpoint file after a successful restore to close 
> this gap (or course, only for at-least-once processing).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] C0urante commented on a change in pull request #10503: KAFKA-9988: Suppress uncaught exceptions in log messages during task shutdown

2021-04-08 Thread GitBox


C0urante commented on a change in pull request #10503:
URL: https://github.com/apache/kafka/pull/10503#discussion_r610330282



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
##
@@ -185,8 +185,12 @@ private void doRun() throws InterruptedException {
 
 execute();
 } catch (Throwable t) {
-log.error("{} Task threw an uncaught and unrecoverable exception. 
Task is being killed and will not recover until manually restarted", this, t);
-throw t;
+if (!stopping && !cancelled) {

Review comment:
   Hmm... I don't believe "cancelled" is a term we've used in public-facing 
surfaces in the past. For example, when a task takes too long to shut down now 
and we have to cancel it, we log the message that "Graceful stop... failed": 
https://github.com/apache/kafka/blob/5964401bf9aab611bd4a072941bd1c927e044258/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L866
   
   Personally I think the additional code complexity is worth it; the original 
ticket mentions a case where these messages confuse users because they're 
generated for cancelled tasks, so I'd rather err on the side of making things 
as obvious as possible to them.
   
   It might be possible to keep things simple and eliminate branches by 
tweaking the message to make it clear that newer task instances won't be 
impacted by this failure, though. A possible downside to this is that it might 
be confusing if there are no newer instances that will be brought up on the 
worker (because the connector has been deleted, the number of tasks has been 
reduced, or the task has been reassigned to another worker). But with some 
careful wording we might be able to avoid misleading people into thinking that 
this message implies there's already another instance running.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12492) Formatting of example RocksDBConfigSetter is messed up

2021-04-08 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17317629#comment-17317629
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12492:


Merged, thanks for the PR! Unfortunately we just barely missed the 2.8 release, 
as John cut the RC earlier today. If you want to see this fix in the 2.8 docs 
then you'll need to submit this exact PR against the kafka-site repo as we 
discussed, but the 2.8 RC is still under vote at the moment so you'd need to 
wait for that to be released at which point the docs in kafka/2.8 will be 
copied over to a new 28 subdirectory in kafka-site. 

> Formatting of example RocksDBConfigSetter is messed up
> --
>
> Key: KAFKA-12492
> URL: https://issues.apache.org/jira/browse/KAFKA-12492
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Ben Chen
>Priority: Trivial
>  Labels: docs, newbie
> Fix For: 3.0.0
>
>
> See the example implementation class CustomRocksDBConfig in the docs for the 
> rocksdb.config.setter
> https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#rocksdb-config-setter



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12638) Remove default implementation of ConsumerRebalanceListener#onPartitionsLost

2021-04-08 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17317635#comment-17317635
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12638:


Nope, no such things as credits in Kafka -- technically anyone can pick up 
anything, and the major deciding factor is just your own confidence and 
familiarity with Kafka. If you're relatively new to Kafka and pick up something 
large that you need a lot of help with, you might struggle to get it done just 
because everyone who works on this is always busy (not because they don't want 
to help -- they do). We try to label things with newbie and/or newbie++ to 
indicate good entry-level tickets. 

That said, it never hurts to ask before picking something up -- often if it's a 
blocker ticket, or maybe critical, then it's likely that the person who 
reported it or someone they know already plan to work on it. But you can always 
ask -- and "Major" is the default priority which many tickets are just left at, 
so don't let that stop you.

> Remove default implementation of ConsumerRebalanceListener#onPartitionsLost
> ---
>
> Key: KAFKA-12638
> URL: https://issues.apache.org/jira/browse/KAFKA-12638
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>
> When we added the #onPartitionsLost callback to the ConsumerRebalanceListener 
> in KIP-429, we gave it a default implementation that just invoked the 
> existing #onPartitionsRevoked method for backwards compatibility. This is 
> somewhat inconvenient, since we generally want to invoke #onPartitionsLost in 
> order to skip the committing of offsets on revoked partitions, which is 
> exactly what #onPartitionsRevoked does.
> I don't think we can just remove it in 3.0 since we haven't indicated that we 
> "deprecated" the default implementation or logged a warning that we intend to 
> remove the default in a future release (as we did for the 
> RocksDBConfigSetter#close method in Streams, for example). We should try to 
> add such a warning now, so we can remove it in a future release.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12638) Remove default implementation of ConsumerRebalanceListener#onPartitionsLost

2021-04-08 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17317638#comment-17317638
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12638:


If you're interested in this ticket, we can't do the whole thing because of the 
compatibility concerns I mentioned but feel free to pick up the first part, and 
just log a warning if the user has not implemented the #onPartitionsLost 
callback. Something like this: 
https://github.com/apache/kafka/blob/2.8/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java#L61

> Remove default implementation of ConsumerRebalanceListener#onPartitionsLost
> ---
>
> Key: KAFKA-12638
> URL: https://issues.apache.org/jira/browse/KAFKA-12638
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>
> When we added the #onPartitionsLost callback to the ConsumerRebalanceListener 
> in KIP-429, we gave it a default implementation that just invoked the 
> existing #onPartitionsRevoked method for backwards compatibility. This is 
> somewhat inconvenient, since we generally want to invoke #onPartitionsLost in 
> order to skip the committing of offsets on revoked partitions, which is 
> exactly what #onPartitionsRevoked does.
> I don't think we can just remove it in 3.0 since we haven't indicated that we 
> "deprecated" the default implementation or logged a warning that we intend to 
> remove the default in a future release (as we did for the 
> RocksDBConfigSetter#close method in Streams, for example). We should try to 
> add such a warning now, so we can remove it in a future release.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] C0urante commented on pull request #10315: KAFKA-12463: Update default sink task partition assignor to cooperative sticky assignor

2021-04-08 Thread GitBox


C0urante commented on pull request #10315:
URL: https://github.com/apache/kafka/pull/10315#issuecomment-816388437


   Closing as this will likely be accomplished by KIP-726; can reopen if 
necessary.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante closed pull request #10315: KAFKA-12463: Update default sink task partition assignor to cooperative sticky assignor

2021-04-08 Thread GitBox


C0urante closed pull request #10315:
URL: https://github.com/apache/kafka/pull/10315


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12453) Guidance on whether a topology is eligible for optimisation

2021-04-08 Thread Patrick O'Keeffe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17317651#comment-17317651
 ] 

Patrick O'Keeffe commented on KAFKA-12453:
--

Thanks [~mjsax], I'm happy to raise a PR to update the docs. Just a couple of 
questions:
 # Why does the input topic need to be configured with log compaction?
 # I was going to update the section on optimisation in "config-streams.html" 
and the javadoc for StreamsBuilder.table  - do any other docs spring to mind?

> Guidance on whether a topology is eligible for optimisation
> ---
>
> Key: KAFKA-12453
> URL: https://issues.apache.org/jira/browse/KAFKA-12453
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Patrick O'Keeffe
>Priority: Major
>
> Since the introduction of KStream.toTable() in Kafka 2.6.x, the decision 
> about whether a topology is eligible for optimisation is no longer a simple 
> one, and is related to whether toTable() operations are preceded by key 
> changing operators.
> This decision requires expert level knowledge, and there are serious 
> implications associated with getting it wrong in terms of fault tolerance
> Some ideas spring to mind around how to guide developers to make the correct 
> decision:
>  # Topology.describe() could indicate whether this topology is eligible for 
> optimisation
>  # Topologies could be automatically optimised - note this may have an impact 
> at deployment time, in that an application reset may be required. The 
> developer would need to made aware of this and adjust the deployment plan 
> accordingly
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-12638) Remove default implementation of ConsumerRebalanceListener#onPartitionsLost

2021-04-08 Thread Ben Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Chen reassigned KAFKA-12638:


Assignee: Ben Chen

> Remove default implementation of ConsumerRebalanceListener#onPartitionsLost
> ---
>
> Key: KAFKA-12638
> URL: https://issues.apache.org/jira/browse/KAFKA-12638
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Assignee: Ben Chen
>Priority: Major
>
> When we added the #onPartitionsLost callback to the ConsumerRebalanceListener 
> in KIP-429, we gave it a default implementation that just invoked the 
> existing #onPartitionsRevoked method for backwards compatibility. This is 
> somewhat inconvenient, since we generally want to invoke #onPartitionsLost in 
> order to skip the committing of offsets on revoked partitions, which is 
> exactly what #onPartitionsRevoked does.
> I don't think we can just remove it in 3.0 since we haven't indicated that we 
> "deprecated" the default implementation or logged a warning that we intend to 
> remove the default in a future release (as we did for the 
> RocksDBConfigSetter#close method in Streams, for example). We should try to 
> add such a warning now, so we can remove it in a future release.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12638) Remove default implementation of ConsumerRebalanceListener#onPartitionsLost

2021-04-08 Thread Ben Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17317654#comment-17317654
 ] 

Ben Chen commented on KAFKA-12638:
--

Really appreciate it!

> Remove default implementation of ConsumerRebalanceListener#onPartitionsLost
> ---
>
> Key: KAFKA-12638
> URL: https://issues.apache.org/jira/browse/KAFKA-12638
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Assignee: Ben Chen
>Priority: Major
>
> When we added the #onPartitionsLost callback to the ConsumerRebalanceListener 
> in KIP-429, we gave it a default implementation that just invoked the 
> existing #onPartitionsRevoked method for backwards compatibility. This is 
> somewhat inconvenient, since we generally want to invoke #onPartitionsLost in 
> order to skip the committing of offsets on revoked partitions, which is 
> exactly what #onPartitionsRevoked does.
> I don't think we can just remove it in 3.0 since we haven't indicated that we 
> "deprecated" the default implementation or logged a warning that we intend to 
> remove the default in a future release (as we did for the 
> RocksDBConfigSetter#close method in Streams, for example). We should try to 
> add such a warning now, so we can remove it in a future release.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12492) Formatting of example RocksDBConfigSetter is messed up

2021-04-08 Thread Ben Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17317658#comment-17317658
 ] 

Ben Chen commented on KAFKA-12492:
--

Sounds good to me. I will do a follow-up PR.

> Formatting of example RocksDBConfigSetter is messed up
> --
>
> Key: KAFKA-12492
> URL: https://issues.apache.org/jira/browse/KAFKA-12492
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Ben Chen
>Priority: Trivial
>  Labels: docs, newbie
> Fix For: 3.0.0
>
>
> See the example implementation class CustomRocksDBConfig in the docs for the 
> rocksdb.config.setter
> https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#rocksdb-config-setter



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12638) Remove default implementation of ConsumerRebalanceListener#onPartitionsLost

2021-04-08 Thread dengziming (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17317670#comment-17317670
 ] 

dengziming commented on KAFKA-12638:


[~ben.c] Feel free to take as many issues as you can as long as you have enough 
time.

> Remove default implementation of ConsumerRebalanceListener#onPartitionsLost
> ---
>
> Key: KAFKA-12638
> URL: https://issues.apache.org/jira/browse/KAFKA-12638
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Assignee: Ben Chen
>Priority: Major
>
> When we added the #onPartitionsLost callback to the ConsumerRebalanceListener 
> in KIP-429, we gave it a default implementation that just invoked the 
> existing #onPartitionsRevoked method for backwards compatibility. This is 
> somewhat inconvenient, since we generally want to invoke #onPartitionsLost in 
> order to skip the committing of offsets on revoked partitions, which is 
> exactly what #onPartitionsRevoked does.
> I don't think we can just remove it in 3.0 since we haven't indicated that we 
> "deprecated" the default implementation or logged a warning that we intend to 
> remove the default in a future release (as we did for the 
> RocksDBConfigSetter#close method in Streams, for example). We should try to 
> add such a warning now, so we can remove it in a future release.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dengziming opened a new pull request #10510: KAFKA-12607: Test case for resigned state vote granting

2021-04-08 Thread GitBox


dengziming opened a new pull request #10510:
URL: https://github.com/apache/kafka/pull/10510


   *More detailed description of your change*
   As discussed in the Jira,  `ResignedState` will transition to `VotedState` 
and grant vote, just add some unit tests to verify this transition.
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




<    1   2