[GitHub] [kafka] dajac opened a new pull request, #12317: MINOR: Fix AlterPartitionManager response handling

2022-06-20 Thread GitBox


dajac opened a new pull request, #12317:
URL: https://github.com/apache/kafka/pull/12317

   
https://github.com/apache/kafka/commit/f83d95d9a28267f7ef7a7b1e584dcdb4aa842210 
introduced topic ids in the AlterPartitionRequest/Response, and we just found a 
bug in the response handling logic. The issue is the following.
   
   When the `AlterPartitionManager` receives the response, it builds the 
`partitionResponses` map from `TopicIdPartition` to its result, where each 
`TopicIdPartition` is built from the response. Therefore, if version < 2 is 
used, the `TopicIdPartition` will have the `ZERO` topic id. The 
`AlterPartitionManager` then iterates over the items sent to find their 
responses. If an item has a topic id in its `TopicIdPartition` and version < 2 
was used, the lookup fails because one side has the topic id and the other 
does not.
   
   This patch fixes the issue by using `TopicPartition` as the key in the 
`partitionResponses` map. This ensures that the result can be found regardless 
of whether the topic id is set or not.
   
   Note that the case where version 2 is used is handled correctly because we 
already have logic to get back the topic name from the topic id in order to 
construct the `TopicPartition`.
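   
   To make the mismatch concrete, here is a minimal, self-contained sketch of 
the keying problem (it only assumes kafka-clients on the classpath; it is not 
the actual `AlterPartitionManager` code):
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import org.apache.kafka.common.TopicIdPartition;
   import org.apache.kafka.common.TopicPartition;
   import org.apache.kafka.common.Uuid;
   
   public class AlterPartitionKeySketch {
       public static void main(String[] args) {
           TopicPartition tp = new TopicPartition("foo", 0);
   
           // The item sent by the manager carries a real topic id...
           TopicIdPartition sent = new TopicIdPartition(Uuid.randomUuid(), tp);
           // ...but a response built from a version < 2 exchange only carries the ZERO id.
           TopicIdPartition received = new TopicIdPartition(Uuid.ZERO_UUID, tp);
   
           // Keying by TopicIdPartition: the lookup misses because the topic ids differ.
           Map<TopicIdPartition, Short> byTopicIdPartition = new HashMap<>();
           byTopicIdPartition.put(received, (short) 0);
           System.out.println(byTopicIdPartition.containsKey(sent)); // false
   
           // Keying by TopicPartition (this patch): the lookup succeeds either way.
           Map<TopicPartition, Short> byTopicPartition = new HashMap<>();
           byTopicPartition.put(received.topicPartition(), (short) 0);
           System.out.println(byTopicPartition.containsKey(sent.topicPartition())); // true
       }
   }
   ```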
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] Justinwins commented on pull request #8656: KAFKA-9981; dedicated mm2 cluster lose the update operation.

2022-06-20 Thread GitBox


Justinwins commented on PR #8656:
URL: https://github.com/apache/kafka/pull/8656#issuecomment-1161331513

   > In my case the follower doesn't update its configuration because the 
**changed** flag keeps its **false** value. Neither the task count nor the 
task configs have changed. 
https://github.com/savinov/kafka/blob/3.1.0-KAFKA-9981/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1571
 
https://github.com/savinov/kafka/blob/3.1.0-KAFKA-9981/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1577
   > 
   > I forgot to mention: I want to distribute the workload between two or more 
mm2 instances without any topic/partition changes. This patch isn't enough to 
do that.
   
   @savinov @denglinfan how did you finally work around this issue? I ran into 
the same situation too.
   Thanks a lot for replying.





[GitHub] [kafka] taehwa-song opened a new pull request, #12316: Include kafka record inside RecordDeserializationException

2022-06-20 Thread GitBox


taehwa-song opened a new pull request, #12316:
URL: https://github.com/apache/kafka/pull/12316

   I want to handle, in an error handler, the record for which the 
deserialization exception occurred. Shouldn't the exception include the record?
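   
   For context, a rough sketch of what a caller can do with this exception 
today (assuming the standard kafka-clients consumer API; the class and method 
names below are illustrative, not part of this PR): the partition and offset 
are exposed, so the poison record can be skipped, but the record itself is not 
available, which is what this change asks for.
   ```java
   import java.time.Duration;
   import org.apache.kafka.clients.consumer.Consumer;
   import org.apache.kafka.common.errors.RecordDeserializationException;
   
   public class DeserializationErrorHandlingSketch {
       static void pollSkippingBadRecords(Consumer<String, String> consumer) {
           while (true) {
               try {
                   consumer.poll(Duration.ofSeconds(1))
                           .forEach(record -> System.out.println(record.value()));
               } catch (RecordDeserializationException e) {
                   // Skip past the record that failed to deserialize and keep consuming.
                   consumer.seek(e.topicPartition(), e.offset() + 1);
               }
           }
       }
   }
   ```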
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Updated] (KAFKA-9981) Running a dedicated mm2 cluster with more than one nodes,When the configuration is updated the task is not aware and will lose the update operation.

2022-06-20 Thread Justinwins (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justinwins updated KAFKA-9981:
--
Attachment: (was: image-2022-06-21-12-06-44-010.png)

> Running a dedicated mm2 cluster with more than one nodes,When the 
> configuration is updated the task is not aware and will lose the update 
> operation.
> 
>
> Key: KAFKA-9981
> URL: https://issues.apache.org/jira/browse/KAFKA-9981
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.4.0, 2.5.0, 2.4.1
>Reporter: victor
>Priority: Major
>
> DistributedHerder.reconfigureConnector handles a config update as follows:
> {code:java}
> if (changed) {
> List<Map<String, String>> rawTaskProps = reverseTransform(connName, 
> configState, taskProps);
> if (isLeader()) {
> configBackingStore.putTaskConfigs(connName, rawTaskProps);
> cb.onCompletion(null, null);
> } else {
> // We cannot forward the request on the same thread because this 
> reconfiguration can happen as a result of connector
> // addition or removal. If we blocked waiting for the response from 
> leader, we may be kicked out of the worker group.
> forwardRequestExecutor.submit(new Runnable() {
> @Override
> public void run() {
> try {
> String leaderUrl = leaderUrl();
> if (leaderUrl == null || leaderUrl.trim().isEmpty()) {
> cb.onCompletion(new ConnectException("Request to 
> leader to " +
> "reconfigure connector tasks failed " +
> "because the URL of the leader's REST 
> interface is empty!"), null);
> return;
> }
> String reconfigUrl = RestServer.urlJoin(leaderUrl, 
> "/connectors/" + connName + "/tasks");
> log.trace("Forwarding task configurations for connector 
> {} to leader", connName);
> RestClient.httpRequest(reconfigUrl, "POST", null, 
> rawTaskProps, null, config, sessionKey, requestSignatureAlgorithm);
> cb.onCompletion(null, null);
> } catch (ConnectException e) {
> log.error("Request to leader to reconfigure connector 
> tasks failed", e);
> cb.onCompletion(e, null);
> }
> }
> });
> }
> }
> {code}
> The KafkaConfigBackingStore task checks for configuration updates, such as a 
> topic whitelist update. If the KafkaConfigBackingStore task is not running on 
> the leader node, an HTTP request is sent to notify the leader of the 
> configuration update. However, a dedicated mm2 cluster does not have the HTTP 
> server turned on, so the request fails to be sent, causing the update 
> operation to be lost.





[jira] [Commented] (KAFKA-9981) Running a dedicated mm2 cluster with more than one nodes,When the configuration is updated the task is not aware and will lose the update operation.

2022-06-20 Thread Justinwins (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17556632#comment-17556632
 ] 

Justinwins commented on KAFKA-9981:
---

[~durban] [~vaibhavjaimini] Is any workaround available for this issue? I hit 
this in a production environment too and have been stuck for a few days. 

Thanks.

> Running a dedicated mm2 cluster with more than one nodes,When the 
> configuration is updated the task is not aware and will lose the update 
> operation.
> 
>
> Key: KAFKA-9981
> URL: https://issues.apache.org/jira/browse/KAFKA-9981
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.4.0, 2.5.0, 2.4.1
>Reporter: victor
>Priority: Major
> Attachments: image-2022-06-21-12-06-44-010.png
>
>
> DistributedHerder.reconfigureConnector handles a config update as follows:
> {code:java}
> if (changed) {
> List<Map<String, String>> rawTaskProps = reverseTransform(connName, 
> configState, taskProps);
> if (isLeader()) {
> configBackingStore.putTaskConfigs(connName, rawTaskProps);
> cb.onCompletion(null, null);
> } else {
> // We cannot forward the request on the same thread because this 
> reconfiguration can happen as a result of connector
> // addition or removal. If we blocked waiting for the response from 
> leader, we may be kicked out of the worker group.
> forwardRequestExecutor.submit(new Runnable() {
> @Override
> public void run() {
> try {
> String leaderUrl = leaderUrl();
> if (leaderUrl == null || leaderUrl.trim().isEmpty()) {
> cb.onCompletion(new ConnectException("Request to 
> leader to " +
> "reconfigure connector tasks failed " +
> "because the URL of the leader's REST 
> interface is empty!"), null);
> return;
> }
> String reconfigUrl = RestServer.urlJoin(leaderUrl, 
> "/connectors/" + connName + "/tasks");
> log.trace("Forwarding task configurations for connector 
> {} to leader", connName);
> RestClient.httpRequest(reconfigUrl, "POST", null, 
> rawTaskProps, null, config, sessionKey, requestSignatureAlgorithm);
> cb.onCompletion(null, null);
> } catch (ConnectException e) {
> log.error("Request to leader to reconfigure connector 
> tasks failed", e);
> cb.onCompletion(e, null);
> }
> }
> });
> }
> }
> {code}
> The KafkaConfigBackingStore task checks for configuration updates, such as a 
> topic whitelist update. If the KafkaConfigBackingStore task is not running on 
> the leader node, an HTTP request is sent to notify the leader of the 
> configuration update. However, a dedicated mm2 cluster does not have the HTTP 
> server turned on, so the request fails to be sent, causing the update 
> operation to be lost.





[jira] [Updated] (KAFKA-9981) Running a dedicated mm2 cluster with more than one nodes,When the configuration is updated the task is not aware and will lose the update operation.

2022-06-20 Thread Justinwins (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justinwins updated KAFKA-9981:
--
Attachment: image-2022-06-21-12-06-44-010.png

> Running a dedicated mm2 cluster with more than one nodes,When the 
> configuration is updated the task is not aware and will lose the update 
> operation.
> 
>
> Key: KAFKA-9981
> URL: https://issues.apache.org/jira/browse/KAFKA-9981
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.4.0, 2.5.0, 2.4.1
>Reporter: victor
>Priority: Major
> Attachments: image-2022-06-21-12-06-44-010.png
>
>
> DistributedHerder.reconfigureConnector handles a config update as follows:
> {code:java}
> if (changed) {
> List<Map<String, String>> rawTaskProps = reverseTransform(connName, 
> configState, taskProps);
> if (isLeader()) {
> configBackingStore.putTaskConfigs(connName, rawTaskProps);
> cb.onCompletion(null, null);
> } else {
> // We cannot forward the request on the same thread because this 
> reconfiguration can happen as a result of connector
> // addition or removal. If we blocked waiting for the response from 
> leader, we may be kicked out of the worker group.
> forwardRequestExecutor.submit(new Runnable() {
> @Override
> public void run() {
> try {
> String leaderUrl = leaderUrl();
> if (leaderUrl == null || leaderUrl.trim().isEmpty()) {
> cb.onCompletion(new ConnectException("Request to 
> leader to " +
> "reconfigure connector tasks failed " +
> "because the URL of the leader's REST 
> interface is empty!"), null);
> return;
> }
> String reconfigUrl = RestServer.urlJoin(leaderUrl, 
> "/connectors/" + connName + "/tasks");
> log.trace("Forwarding task configurations for connector 
> {} to leader", connName);
> RestClient.httpRequest(reconfigUrl, "POST", null, 
> rawTaskProps, null, config, sessionKey, requestSignatureAlgorithm);
> cb.onCompletion(null, null);
> } catch (ConnectException e) {
> log.error("Request to leader to reconfigure connector 
> tasks failed", e);
> cb.onCompletion(e, null);
> }
> }
> });
> }
> }
> {code}
> The KafkaConfigBackingStore task checks for configuration updates, such as a 
> topic whitelist update. If the KafkaConfigBackingStore task is not running on 
> the leader node, an HTTP request is sent to notify the leader of the 
> configuration update. However, a dedicated mm2 cluster does not have the HTTP 
> server turned on, so the request fails to be sent, causing the update 
> operation to be lost.





[GitHub] [kafka] dengziming commented on pull request #12274: KAFKA-13959: Controller should unfence Broker with busy metadata log

2022-06-20 Thread GitBox


dengziming commented on PR #12274:
URL: https://github.com/apache/kafka/pull/12274#issuecomment-1161178784

   @hachikuji In this PR I tried your suggestion and it does solve this 
problem. However, it makes the logic in RaftClient very complex, requires 
saving more state in `LeaderState`, and is also difficult to test. I made 
another solution, modeled on how we add a follower replica to the ISR, that 
checks `followerEndOffset >= leaderLog.highWatermark`.
   
   Here is the code change: 
https://github.com/dengziming/kafka/pull/new/KAFKA-13959-2. @showuon @hachikuji 
WDYT?
   
   
   
   





[GitHub] [kafka] C0urante commented on a diff in pull request #12309: KAFKA-14007: Invoking connect headers.close method on shutdown

2022-06-20 Thread GitBox


C0urante commented on code in PR #12309:
URL: https://github.com/apache/kafka/pull/12309#discussion_r902098848


##
connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java:
##
@@ -104,6 +104,7 @@ public SchemaAndValue toConnectHeader(String topic, String 
headerKey, byte[] val
 
 @Override
 public void close() {
-// do nothing
+this.serializer.close();
+this.deserializer.close();

Review Comment:
   Good catch here. Should we do the same in the 
[NumberConverter](https://github.com/apache/kafka/blob/30216ea1c58761e62f51af40033f24e3ae1c5c2a/connect/runtime/src/main/java/org/apache/kafka/connect/converters/NumberConverter.java#L123-L125)
 class?



##
connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java:
##
@@ -104,6 +104,7 @@ public SchemaAndValue toConnectHeader(String topic, String 
headerKey, byte[] val
 
 @Override
 public void close() {
-// do nothing
+this.serializer.close();
+this.deserializer.close();

Review Comment:
   Also, even though these are no-ops right now, if we want to be meticulous 
about the cleanup logic here, should we wrap both of these in 
`Utils::closeQuietly`, so that if one fails, we still get a chance to close the 
other?
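   
   A minimal sketch of that suggestion, assuming the 
`Utils.closeQuietly(AutoCloseable, String)` overload from kafka-clients (this 
is not the actual `StringConverter` change):
   ```java
   import org.apache.kafka.common.serialization.StringDeserializer;
   import org.apache.kafka.common.serialization.StringSerializer;
   import org.apache.kafka.common.utils.Utils;
   
   public class CloseQuietlySketch implements AutoCloseable {
       private final StringSerializer serializer = new StringSerializer();
       private final StringDeserializer deserializer = new StringDeserializer();
   
       @Override
       public void close() {
           // closeQuietly logs and swallows exceptions, so a failure closing the
           // serializer does not prevent the deserializer from being closed.
           Utils.closeQuietly(serializer, "string converter serializer");
           Utils.closeQuietly(deserializer, "string converter deserializer");
       }
   }
   ```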






[GitHub] [kafka] C0urante commented on pull request #12295: KAFKA-13586: Prevent exception thrown during connector update from crashing distributed herder

2022-06-20 Thread GitBox


C0urante commented on PR #12295:
URL: https://github.com/apache/kafka/pull/12295#issuecomment-1161074753

   ### Hanging tests
   
   Phew, you got me worried there for a sec! I wrote the zombie fencing unit 
tests and we just merged them a week or two ago. I did some digging and it 
looks like the tests are hanging during 
`PowerMock.mockStatic(RestClient.class);`, which is due to a known issue with 
PowerMock that has an easy workaround: 
https://github.com/powermock/powermock/issues/610
   
   I was able to fix the tests locally by changing the class-level annotation 
from `@PrepareForTest({DistributedHerder.class, Plugins.class, 
RestClient.class})` to `@PrepareForTest({DistributedHerder.class, 
Plugins.class})`, and adding the annotation 
`@PrepareForTest({DistributedHerder.class, Plugins.class, RestClient.class})` 
to the `testTaskRequestedZombieFencingForwardedToLeader` and 
`testTaskRequestedZombieFencingFailedForwardToLeader` test methods. Can you 
give that a shot?
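   
   For clarity, a condensed sketch of that change (the test class body is 
abbreviated and the class name is made up; only the annotations and the static 
mock are shown):
   ```java
   import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
   import org.apache.kafka.connect.runtime.isolation.Plugins;
   import org.apache.kafka.connect.runtime.rest.RestClient;
   import org.junit.Test;
   import org.junit.runner.RunWith;
   import org.powermock.api.easymock.PowerMock;
   import org.powermock.core.classloader.annotations.PrepareForTest;
   import org.powermock.modules.junit4.PowerMockRunner;
   
   @RunWith(PowerMockRunner.class)
   @PrepareForTest({DistributedHerder.class, Plugins.class}) // RestClient.class removed here
   public class DistributedHerderTestSketch {
   
       @Test
       @PrepareForTest({DistributedHerder.class, Plugins.class, RestClient.class}) // ...and added here
       public void testTaskRequestedZombieFencingForwardedToLeader() {
           PowerMock.mockStatic(RestClient.class);
           // ... rest of the test unchanged ...
       }
   }
   ```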
   
   I'm still not sure exactly why your new test affected these test cases. But, 
as long as your test works and the existing tests work, I'm fine with it.
   
   ### Invoking `onFailure`
   
   Regarding this question:
   > In `getConnectorStartingCallable`, immediate failures are passed to 
`onFailure`; is that something we’d have to account for?
   
   Great point--I do think that calling `onFailure` when exceptions are thrown 
is warranted; in this case, it'd cause us to mark the connector as `FAILED` 
instead of leaving it in whatever state it was in previously, which seems 
accurate. I wonder if we might actually leverage `getConnectorStartingCallable` 
in this method? At this point we're essentially duplicating its logic, and it's 
possible (though admittedly unlikely) that there would be multiple reconfigured 
connectors, in which case we'd be able to parallelize things and move faster. 
   
   I'm imagining something like this, LMKWYT:
   ```java
   private void processConnectorConfigUpdates(Set<String> connectorConfigUpdates) {
   // ...
   Collection<Callable<Void>> connectorsToStart = new ArrayList<>(); // New
   for (String connectorName : connectorConfigUpdates) {
   // ...
   // The update may be a deletion, so verify we actually need to 
restart the connector
   if (remains) {
   
connectorsToStart.add(getConnectorStartingCallable(connectorName)); // Replaces 
existing if-statement body
   }
   }
   startAndStop(connectorsToStart); // New
   }
   ```





[GitHub] [kafka] soarez commented on a diff in pull request #12314: KAFKA-13903: Queue size metric in QuorumController

2022-06-20 Thread GitBox


soarez commented on code in PR #12314:
URL: https://github.com/apache/kafka/pull/12314#discussion_r902057670


##
server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java:
##
@@ -166,6 +171,13 @@ private class EventHandler implements Runnable {
  * A condition variable for waking up the event handler thread.
  */
 private final Condition cond = lock.newCondition();
+private final Consumer<Integer> onQueueSizeChange;
+
+private AtomicInteger size = new AtomicInteger(0);

Review Comment:
   There should never be so many events in the queue that the count can't fit 
in an int. Each event is a runnable, held in a linked list. Note that 
`java.util.LinkedList`, for example, uses `int` for its size.






[GitHub] [kafka] soarez commented on a diff in pull request #12314: KAFKA-13903: Queue size metric in QuorumController

2022-06-20 Thread GitBox


soarez commented on code in PR #12314:
URL: https://github.com/apache/kafka/pull/12314#discussion_r902056110


##
server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java:
##
@@ -178,15 +190,18 @@ public void run() {
 }
 
 private void remove(EventContext eventContext) {
-eventContext.remove();
+boolean removed = eventContext.remove();
 if (eventContext.deadlineNs.isPresent()) {
-deadlineMap.remove(eventContext.deadlineNs.getAsLong());
+removed |= 
deadlineMap.remove(eventContext.deadlineNs.getAsLong()) != null;
 eventContext.deadlineNs = OptionalLong.empty();
 }
 if (eventContext.tag != null) {
 tagToEventContext.remove(eventContext.tag, eventContext);
 eventContext.tag = null;
 }
+if (removed) {
+onQueueSizeChange.accept(size.decrementAndGet());

Review Comment:
   The goal isn't to have an arbitrary consumer; we just need to propagate the 
changes to `ControllerMetrics`. I'm not sure what you mean by "an async 
explicit metric publish event", but I agree it makes sense that we should be 
careful while holding the lock. I think we can invert the logic: instead of 
pushing the size change to `ControllerMetrics`, we can use a Supplier to pull 
it.
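   
   Rough sketch of the pull-based idea (names here are hypothetical, not the 
actual `KafkaEventQueue`/`ControllerMetrics` API): the queue only updates an 
atomic counter while holding its lock, and the metrics layer reads the value 
lazily through a `Supplier` when the gauge is sampled.
   ```java
   import java.util.concurrent.atomic.AtomicInteger;
   import java.util.function.Supplier;
   
   class EventQueueSketch {
       private final AtomicInteger size = new AtomicInteger(0);
   
       void onEnqueue() { size.incrementAndGet(); } // no metrics callback under the lock
       void onRemove()  { size.decrementAndGet(); }
   
       Supplier<Integer> sizeSupplier() { return size::get; }
   }
   
   class ControllerMetricsSketch {
       private final Supplier<Integer> queueSize;
   
       ControllerMetricsSketch(Supplier<Integer> queueSize) {
           this.queueSize = queueSize; // the gauge pulls the current size on demand
       }
   
       int eventQueueSize() { return queueSize.get(); }
   }
   ```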






[GitHub] [kafka] soarez commented on pull request #12314: KAFKA-13903: Queue size metric in QuorumController

2022-06-20 Thread GitBox


soarez commented on PR #12314:
URL: https://github.com/apache/kafka/pull/12314#issuecomment-1160920644

   Hi Divij, thanks for having a look at this. The metric [is 
documented](https://github.com/apache/kafka/blob/89c057397ec5ae320aee7e12e1a984b2c14a7fa5/docs/ops.html#L1424-L1428).
 The goal is to bring it to KRaft.





[GitHub] [kafka] junrao commented on a diff in pull request #12271: KAFKA-13972; Ensure that replicas are stopped after cancelled reassignment

2022-06-20 Thread GitBox


junrao commented on code in PR #12271:
URL: https://github.com/apache/kafka/pull/12271#discussion_r901926241


##
core/src/main/scala/kafka/controller/KafkaController.scala:
##
@@ -776,33 +776,60 @@ class KafkaController(val config: KafkaConfig,
* will be encoded as [3, 4, 2, 1] while the reassignment is in progress. If 
the reassignment
* is cancelled, there is no way to restore the original order.
*
-   * @param topicPartition The reassigning partition
-   * @param reassignment The new reassignment
+   * @param topicPartition The topic partition
+   * @param newAssignment The new assignment
*/
-  private def updateCurrentReassignment(topicPartition: TopicPartition, 
reassignment: ReplicaAssignment): Unit = {
+  private def maybeUpdateCurrentAssignment(topicPartition: TopicPartition, 
newAssignment: ReplicaAssignment): Unit = {
 val currentAssignment = 
controllerContext.partitionFullReplicaAssignment(topicPartition)
+if (currentAssignment != newAssignment) {
+  if (currentAssignment.isBeingReassigned) {
+// Cancel the current reassignment by removing unneeded replicas from 
the ISR
+// and stopping/deleting them. Note that if the controller fails 
before updating
+// the assignment state in Zookeeper below, these replicas may get 
restarted after
+// controller fail-over. We expect the client would retry the 
cancellation in this case.
+cancelReassignment(topicPartition, currentAssignment, newAssignment)
+  }
 
-if (currentAssignment != reassignment) {
-  debug(s"Updating assignment of partition $topicPartition from 
$currentAssignment to $reassignment")
-
+  info(s"Updating assignment of partition $topicPartition from 
$currentAssignment to $newAssignment")
   // U1. Update assignment state in zookeeper
-  updateReplicaAssignmentForPartition(topicPartition, reassignment)
+  updateReplicaAssignmentForPartition(topicPartition, newAssignment)
   // U2. Update assignment state in memory
-  controllerContext.updatePartitionFullReplicaAssignment(topicPartition, 
reassignment)
-
-  // If there is a reassignment already in progress, then some of the 
currently adding replicas
-  // may be eligible for immediate removal, in which case we need to stop 
the replicas.
-  val unneededReplicas = 
currentAssignment.replicas.diff(reassignment.replicas)
-  if (unneededReplicas.nonEmpty)
-stopRemovedReplicasOfReassignedPartition(topicPartition, 
unneededReplicas)
+  controllerContext.updatePartitionFullReplicaAssignment(topicPartition, 
newAssignment)
 }
+  }
 
-if (!isAlterPartitionEnabled) {
-  val reassignIsrChangeHandler = new 
PartitionReassignmentIsrChangeHandler(eventManager, topicPartition)
-  zkClient.registerZNodeChangeHandler(reassignIsrChangeHandler)
+  private def cancelReassignment(

Review Comment:
   Perhaps we could add a comment that the cancellation may fail.






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12297: KAFKA-13846: Follow up PR to address review comments

2022-06-20 Thread GitBox


vamossagar12 commented on code in PR #12297:
URL: https://github.com/apache/kafka/pull/12297#discussion_r901884525


##
docs/upgrade.html:
##
@@ -19,6 +19,56 @@
 
 

[jira] [Assigned] (KAFKA-14012) passing a "method" into the `Utils.closeQuietly` method cause NPE

2022-06-20 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-14012:
-

Assignee: Sagar Rao

> passing a "method" into the `Utils.closeQuietly` method cause NPE
> -
>
> Key: KAFKA-14012
> URL: https://issues.apache.org/jira/browse/KAFKA-14012
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.2.0
>Reporter: Luke Chen
>Assignee: Sagar Rao
>Priority: Major
>
> The Utils.closeQuietly method accepts an `AutoCloseable` object and closes it. 
> But in some places we passed a "method" into Utils.closeQuietly, which causes 
> the object to not get closed as expected. 
> I found it appeared in:
> - WorkerConnector
> - AbstractWorkerSourceTask
> - KafkaConfigBackingStore
>  
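
A hedged illustration of the NPE mentioned in the summary (this is an 
assumption about the failure mode, not code taken from the ticket): evaluating 
a method reference on a null receiver throws immediately, which defeats the 
null check inside closeQuietly.

{code:java}
import org.apache.kafka.common.utils.Utils;

public class CloseQuietlyNpeSketch {
    public static void main(String[] args) {
        AutoCloseable resource = null;

        // Passing the object itself is safe: closeQuietly checks for null internally.
        Utils.closeQuietly(resource, "resource");

        // Passing a method reference is not: `resource::close` dereferences the
        // null receiver and throws NullPointerException before closeQuietly runs.
        Utils.closeQuietly(resource::close, "resource");
    }
}
{code}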





[jira] [Assigned] (KAFKA-14013) Limit the length of the `reason` field sent on the wire

2022-06-20 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot reassigned KAFKA-14013:
---

Assignee: David Jacot  (was: K. Ouafi)

> Limit the length of the `reason` field sent on the wire
> ---
>
> Key: KAFKA-14013
> URL: https://issues.apache.org/jira/browse/KAFKA-14013
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.2.0, 3.3.0
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Blocker
>
> KIP-800 added the `reason` field to the JoinGroupRequest and the 
> LeaveGroupRequest as a means to provide more information to the group 
> coordinator. In https://issues.apache.org/jira/browse/KAFKA-13998, we 
> discovered that the size of the field is limited to 32767 chars by our 
> serialization mechanism. At the moment, the field, whether provided directly by 
> the user or constructed internally, is set directly regardless of its length.
> Given the purpose of this field, it seems acceptable to only send the first 
> 255 chars on the wire. That would prevent us from hitting that limit again 
> while ensuring that the number of bytes sent to the broker remains within an 
> acceptable range.
> We should apply this to the JoinGroupRequest and to the LeaveGroupRequest.





[jira] [Assigned] (KAFKA-14013) Limit the length of the `reason` field sent on the wire

2022-06-20 Thread K. Ouafi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

K. Ouafi reassigned KAFKA-14013:


Assignee: K. Ouafi  (was: David Jacot)

> Limit the length of the `reason` field sent on the wire
> ---
>
> Key: KAFKA-14013
> URL: https://issues.apache.org/jira/browse/KAFKA-14013
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.2.0, 3.3.0
>Reporter: David Jacot
>Assignee: K. Ouafi
>Priority: Blocker
>
> KIP-800 added the `reason` field to the JoinGroupRequest and the 
> LeaveGroupRequest as a means to provide more information to the group 
> coordinator. In https://issues.apache.org/jira/browse/KAFKA-13998, we 
> discovered that the size of the field is limited to 32767 chars by our 
> serialization mechanism. At the moment, the field, whether provided directly by 
> the user or constructed internally, is set directly regardless of its length.
> Given the purpose of this field, it seems acceptable to only send the first 
> 255 chars on the wire. That would prevent us from hitting that limit again 
> while ensuring that the number of bytes sent to the broker remains within an 
> acceptable range.
> We should apply this to the JoinGroupRequest and to the LeaveGroupRequest.





[GitHub] [kafka] vamossagar12 commented on pull request #12309: KAFKA-14007: Invoking connect headers.close method on shutdown

2022-06-20 Thread GitBox


vamossagar12 commented on PR #12309:
URL: https://github.com/apache/kafka/pull/12309#issuecomment-1160690649

   @C0urante, I did run the unit tests before pushing the changes the first 
time around, but instead of running connect:unitTests, I ran them for some 
other sub-task. Sorry about that. 
   I have made the relevant changes to the PR. 





[GitHub] [kafka] jnh5y commented on pull request #12298: KAFKA-13998: JoinGroupRequestData 'reason' can be too large

2022-06-20 Thread GitBox


jnh5y commented on PR #12298:
URL: https://github.com/apache/kafka/pull/12298#issuecomment-1160686032

   > @jnh5y Yeah, you cannot rely on providing the error in the response in 
your case. Instead, you have to find a place where you can actually throw a 
custom exception. That could be with a custom assignor for instance.
   
   I'm still learning the Kafka codebase; is there an example of using a custom 
assignor that is easy to understand? Also, should we go to the trouble of 
testing this particular code path, or is it sufficient to verify that the 
`reason` is limited when the function setting it is called? (I can see either 
argument.)





[GitHub] [kafka] dajac commented on pull request #12298: KAFKA-13998: JoinGroupRequestData 'reason' can be too large

2022-06-20 Thread GitBox


dajac commented on PR #12298:
URL: https://github.com/apache/kafka/pull/12298#issuecomment-1160680527

   > I'm not still seeing how to modify that particular test to pass in an 
arbitrary exception. That said, I'm happy with @dajac's solution to call 
requestRejoin. In the follow-up ticket, we could test that this function works 
as we expect.
   
   @jnh5y Yeah, you cannot rely on providing the error in the response in your 
case. Instead, you have to find a place where you can actually throw a custom 
exception. That could be with a custom assignor for instance.
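   
   In case it helps, a toy example of a custom assignor (this only shows the 
shape of the `ConsumerPartitionAssignor` plug-in point; it does not claim to 
reproduce the oversized-reason scenario end to end):
   ```java
   import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
   import org.apache.kafka.common.Cluster;
   
   public class ThrowingAssignorSketch implements ConsumerPartitionAssignor {
       @Override
       public GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription) {
           // Client-side code that runs during the rebalance and can throw an
           // arbitrary (and arbitrarily long) exception.
           throw new RuntimeException("boom: " + "x".repeat(100_000));
       }
   
       @Override
       public String name() {
           return "throwing-sketch";
       }
   }
   ```
   It would be registered on the consumer via the `partition.assignment.strategy` 
config.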
   
   > If that ticket can wait a little bit, I'd like to find the time to do it. 
I can find time before the July 20th code freeze.
   
   As long as we can get it in before the code freeze, I am fine.





[GitHub] [kafka] jnh5y commented on pull request #12298: KAFKA-13998: JoinGroupRequestData 'reason' can be too large

2022-06-20 Thread GitBox


jnh5y commented on PR #12298:
URL: https://github.com/apache/kafka/pull/12298#issuecomment-1160666884

   @dajac Thanks for fixing the PR and merging it!
   
   If that ticket can wait a little bit, I'd like to find the time to do it.  I 
can find time before the July 20th code freeze.





[GitHub] [kafka] dstelljes commented on pull request #12295: KAFKA-13586: Prevent exception thrown during connector update from crashing distributed herder

2022-06-20 Thread GitBox


dstelljes commented on PR #12295:
URL: https://github.com/apache/kafka/pull/12295#issuecomment-1160666379

   Hey @C0urante, thanks for the comments!
   
   I just added a variation on the update test for this, but it’s causing 
`testTaskRequestedZombieFencingForwardedToLeader` to hang locally. I’m still 
not sure what I messed up, but I’m pushing it up as is since the zombie fencing 
tests are new and you’ll probably be able to spot the problem much faster than 
I will.
   
   Ideally, I think `ConfigException` itself would be checked. That way, 
`ConfigProvider::get` implementations could throw `ConfigException` for invalid 
paths, missing keys, etc., and other runtime exceptions could be assumed to be 
a provider defect or some other worker-level issue. That ship has probably 
sailed though...
   
   It seems fair to wrap everything in `startConnector` and funnel every 
exception to the callback since `reconfigureConnector` does the same thing. In 
`getConnectorStartingCallable`, immediate failures are passed to `onFailure`; 
is that something we’d have to account for?





[GitHub] [kafka] jnh5y commented on pull request #12298: KAFKA-13998: JoinGroupRequestData 'reason' can be too large

2022-06-20 Thread GitBox


jnh5y commented on PR #12298:
URL: https://github.com/apache/kafka/pull/12298#issuecomment-1160665948

   > @jnh5y I do not completely understand what you mean by:
   > 
   > > I tried to find a way to create a unit test for this change, but I 
wasn't able to do so quickly.
   > 
   > I have the impression that the unit test coverage is sufficient in 
`AbstractCoordinator#testRejoinReason`.
   
   Hi @cadonna, good question!  At the point when I said that, I hadn't seen 
`AbstractCoordinator#testRejoinReason`.
   
   I was looking for a test which would call 
`AbstractCoordinator.joinGroupIfNeeded` and pass in an exception which was too 
large.  
   
   I'm still not seeing how to modify that particular test to pass in an 
arbitrary exception.  That said, I'm happy with @dajac's solution to call 
`requestRejoin`.  In the follow-up ticket, we could test that this function 
works as we expect.





[GitHub] [kafka] vamossagar12 commented on pull request #12307: KAFKA-14006: Parameterize WorkerConnectorTest suite

2022-06-20 Thread GitBox


vamossagar12 commented on PR #12307:
URL: https://github.com/apache/kafka/pull/12307#issuecomment-1160663643

   LGTM!





[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12307: KAFKA-14006: Parameterize WorkerConnectorTest suite

2022-06-20 Thread GitBox


vamossagar12 commented on code in PR #12307:
URL: https://github.com/apache/kafka/pull/12307#discussion_r901856096


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java:
##
@@ -142,11 +174,13 @@ public void testFailureIsFinalState() {
 ctx.close();
 expectLastCall();
 
-offsetStorageReader.close();
-expectLastCall();
+if (connectorType == ConnectorType.SOURCE) {
+offsetStorageReader.close();
+expectLastCall();
 
-offsetStore.stop();
-expectLastCall();
+offsetStore.stop();
+expectLastCall();
+}

Review Comment:
   Hmm, I did see the other repeated sections in the tests, but since they are 
out of scope for this PR, I didn't bring them up. I guess we can live with 
these newer repetitions. 






[jira] [Commented] (KAFKA-13974) Rename `INVALID_UPDATE_VERSION` to `INVALID_PARTITION_EPOCH`

2022-06-20 Thread Kvicii.Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17556458#comment-17556458
 ] 

Kvicii.Yu commented on KAFKA-13974:
---

OK. If there is some simple issue, you can tag me on it and I will do it.

> Rename `INVALID_UPDATE_VERSION` to `INVALID_PARTITION_EPOCH`
> 
>
> Key: KAFKA-13974
> URL: https://issues.apache.org/jira/browse/KAFKA-13974
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
>
> Rename `INVALID_UPDATE_VERSION` to `INVALID_PARTITION_EPOCH`. 
> `INVALID_UPDATE_VERSION` is tied to ZK whereas `INVALID_PARTITION_EPOCH` is 
> more generic.





[jira] [Commented] (KAFKA-13974) Rename `INVALID_UPDATE_VERSION` to `INVALID_PARTITION_EPOCH`

2022-06-20 Thread David Jacot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17556452#comment-17556452
 ] 

David Jacot commented on KAFKA-13974:
-

[~Kvicii] Yes. I plan to do it. I am actually not sure that it is possible 
because `InvalidUpdateVersionException` is also used on the client side.

> Rename `INVALID_UPDATE_VERSION` to `INVALID_PARTITION_EPOCH`
> 
>
> Key: KAFKA-13974
> URL: https://issues.apache.org/jira/browse/KAFKA-13974
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
>
> Rename `INVALID_UPDATE_VERSION` to `INVALID_PARTITION_EPOCH`. 
> `INVALID_UPDATE_VERSION` is tied to ZK whereas `INVALID_PARTITION_EPOCH` is 
> more generic.





[jira] [Commented] (KAFKA-13974) Rename `INVALID_UPDATE_VERSION` to `INVALID_PARTITION_EPOCH`

2022-06-20 Thread Kvicii.Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17556428#comment-17556428
 ] 

Kvicii.Yu commented on KAFKA-13974:
---

[~dajac]  hi, have you fixed this? If not, I will do it.

> Rename `INVALID_UPDATE_VERSION` to `INVALID_PARTITION_EPOCH`
> 
>
> Key: KAFKA-13974
> URL: https://issues.apache.org/jira/browse/KAFKA-13974
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
>
> Rename `INVALID_UPDATE_VERSION` to `INVALID_PARTITION_EPOCH`. 
> `INVALID_UPDATE_VERSION` is tied to ZK whereas `INVALID_PARTITION_EPOCH` is 
> more generic.





[GitHub] [kafka] clolov commented on pull request #12307: KAFKA-14006: Parameterize WorkerConnectorTest suite

2022-06-20 Thread GitBox


clolov commented on PR #12307:
URL: https://github.com/apache/kafka/pull/12307#issuecomment-1160569301

   Got it, okay, looks good to me, thanks for the detailed explanation :)





[jira] [Updated] (KAFKA-14013) Limit the length of the `reason` field sent on the wire

2022-06-20 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-14013:

Affects Version/s: 3.2.0
   3.3.0

> Limit the length of the `reason` field sent on the wire
> ---
>
> Key: KAFKA-14013
> URL: https://issues.apache.org/jira/browse/KAFKA-14013
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.2.0, 3.3.0
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
>
> KIP-800 added the `reason` field to the JoinGroupRequest and the 
> LeaveGroupRequest as a means to provide more information to the group 
> coordinator. In https://issues.apache.org/jira/browse/KAFKA-13998, we 
> discovered that the size of the field is limited to 32767 chars by our 
> serialization mechanism. At the moment, the field, whether provided directly by 
> the user or constructed internally, is set directly regardless of its length.
> Given the purpose of this field, it seems acceptable to only send the first 
> 255 chars on the wire. That would prevent us from hitting that limit again 
> while ensuring that the number of bytes sent to the broker remains within an 
> acceptable range.
> We should apply this to the JoinGroupRequest and to the LeaveGroupRequest.





[jira] [Updated] (KAFKA-14013) Limit the length of the `reason` field sent on the wire

2022-06-20 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-14013:

Priority: Blocker  (was: Major)

> Limit the length of the `reason` field sent on the wire
> ---
>
> Key: KAFKA-14013
> URL: https://issues.apache.org/jira/browse/KAFKA-14013
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.2.0, 3.3.0
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Blocker
>
> KIP-800 added the `reason` field to the JoinGroupRequest and the 
> LeaveGroupRequest as a means to provide more information to the group 
> coordinator. In https://issues.apache.org/jira/browse/KAFKA-13998, we 
> discovered that the size of the field is limited to 32767 chars by our 
> serialization mechanism. At the moment, the field, whether provided directly by 
> the user or constructed internally, is set directly regardless of its length.
> Given the purpose of this field, it seems acceptable to only send the first 
> 255 chars on the wire. That would prevent us from hitting that limit again 
> while ensuring that the number of bytes sent to the broker remains within an 
> acceptable range.
> We should apply this to the JoinGroupRequest and to the LeaveGroupRequest.





[GitHub] [kafka] dajac closed pull request #12254: 3.2

2022-06-20 Thread GitBox


dajac closed pull request #12254: 3.2
URL: https://github.com/apache/kafka/pull/12254





[GitHub] [kafka] dajac commented on pull request #12254: 3.2

2022-06-20 Thread GitBox


dajac commented on PR #12254:
URL: https://github.com/apache/kafka/pull/12254#issuecomment-1160551923

   This PR tries to merge the 3.2 branch into trunk. As that seems to be a 
mistake, I am closing it for now.





[GitHub] [kafka] dajac commented on pull request #12298: KAFKA-13998: JoinGroupRequestData 'reason' can be too large

2022-06-20 Thread GitBox


dajac commented on PR #12298:
URL: https://github.com/apache/kafka/pull/12298#issuecomment-1160549735

   I filed a JIRA for the limit part: 
https://issues.apache.org/jira/browse/KAFKA-14013. @jnh5y I am happy to do this 
part if you don't have cycles for it.





[jira] [Created] (KAFKA-14013) Limit the length of the `reason` field sent on the wire

2022-06-20 Thread David Jacot (Jira)
David Jacot created KAFKA-14013:
---

 Summary: Limit the length of the `reason` field sent on the wire
 Key: KAFKA-14013
 URL: https://issues.apache.org/jira/browse/KAFKA-14013
 Project: Kafka
  Issue Type: Improvement
Reporter: David Jacot
Assignee: David Jacot


KIP-800 added the `reason` field to the JoinGroupRequest and the 
LeaveGroupRequest as a means to provide more information to the group 
coordinator. In https://issues.apache.org/jira/browse/KAFKA-13998, we 
discovered that the size of the field is limited to 32767 chars by our 
serialization mechanism. At the moment, the field, whether provided directly by 
the user or constructed internally, is set directly regardless of its length.

Given the purpose of this field, it seems acceptable to only send the first 255 
chars on the wire. That would prevent us from hitting that limit again while 
ensuring that the number of bytes sent to the broker remains within an 
acceptable range.

We should apply this to the JoinGroupRequest and to the LeaveGroupRequest.
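
A minimal sketch of the proposed cap (the 255-char limit is from this ticket; 
the helper name below is hypothetical, not the actual client code):

{code:java}
public final class ReasonTruncationSketch {
    private static final int MAX_REASON_LENGTH = 255;

    static String truncateReasonForWire(String reason) {
        if (reason == null || reason.length() <= MAX_REASON_LENGTH) {
            return reason;
        }
        // Only the first 255 chars go on the wire; the full reason can still be logged locally.
        return reason.substring(0, MAX_REASON_LENGTH);
    }

    public static void main(String[] args) {
        String longReason = "rebalance failed: ".repeat(5000);
        System.out.println(truncateReasonForWire(longReason).length()); // prints 255
    }
}
{code}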





[GitHub] [kafka] C0urante commented on pull request #12307: KAFKA-14006: Parameterize WorkerConnectorTest suite

2022-06-20 Thread GitBox


C0urante commented on PR #12307:
URL: https://github.com/apache/kafka/pull/12307#issuecomment-1160549247

   @clolov I don't think it's necessary to add these tests to that list since 
they're still passing locally with Java versions 16-18 and on Jenkins with Java 
17. It's a little risky to add tests to that list, too, since it causes them to 
be skipped with Java versions >= 16.
   
   FWIW, it seems like the reason these tests still work is that they use the 
`org.easymock.Mock` annotation, instead of 
`org.powermock.api.easymock.annotation.Mock`.





[GitHub] [kafka] kobihikri commented on pull request #12300: Update zookeeper version to 3.8.0

2022-06-20 Thread GitBox


kobihikri commented on PR #12300:
URL: https://github.com/apache/kafka/pull/12300#issuecomment-1160544459

   Ok, I got you now. I will check and update.
   
   On Mon, Jun 20, 2022 at 5:44 PM Ismael Juma ***@***.***>
   wrote:
   
   > That doesn't answer the question since reload4j is fully compatible with
   > log4j. But it doesn't have the vulnerabilities.
   >
   
   
   -- 
   May peace and love be your share.
   Kobi Hikri.
   





[jira] [Resolved] (KAFKA-13998) JoinGroupRequestData 'reason' can be too large

2022-06-20 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-13998.
-
Resolution: Fixed

> JoinGroupRequestData 'reason' can be too large
> --
>
> Key: KAFKA-13998
> URL: https://issues.apache.org/jira/browse/KAFKA-13998
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.2.0
>Reporter: Jim Hughes
>Assignee: Jim Hughes
>Priority: Blocker
> Fix For: 3.3.0, 3.2.1
>
>
> We saw an exception like this: 
> ```org.apache.kafka.streams.errors.StreamsException: 
> java.lang.RuntimeException: 'reason' field is too long to be serialized 3 at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:627)
>  4 at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:551)
>  5Caused by: java.lang.RuntimeException: 'reason' field is too long to be 
> serialized 6 at 
> org.apache.kafka.common.message.JoinGroupRequestData.addSize(JoinGroupRequestData.java:465)
>  7 at 
> org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218) 
> 8 at 
> org.apache.kafka.common.protocol.SendBuilder.buildRequestSend(SendBuilder.java:187)
>  9 at 
> org.apache.kafka.common.requests.AbstractRequest.toSend(AbstractRequest.java:101)
>  10 at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:524) 
> 11 at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:500) 
> 12 at org.apache.kafka.clients.NetworkClient.send(NetworkClient.java:460) 13 
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.trySend(ConsumerNetworkClient.java:499)
>  14 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:255)
>  15 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
>  16 at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
>  17 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:437)
>  18 at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:371)
>  19 at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:542)
>  20 at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1271)
>  21 at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1235) 
> 22 at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1215) 
> 23 at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:969)
>  24 at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:917)
>  25 at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:736)
>  26 at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:589)
>  27 ... 1 more```
> This appears to be caused by the code passing an entire stack trace in the 
> `rejoinReason`.  See 
> https://github.com/apache/kafka/blob/3.2.0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L481





[GitHub] [kafka] dajac commented on pull request #12298: KAFKA-13998: JoinGroupRequestData 'reason' can be too large

2022-06-20 Thread GitBox


dajac commented on PR #12298:
URL: https://github.com/apache/kafka/pull/12298#issuecomment-1160542531

   Merged to trunk and 3.2.





[GitHub] [kafka] dajac merged pull request #12298: KAFKA-13998: JoinGroupRequestData 'reason' can be too large

2022-06-20 Thread GitBox


dajac merged PR #12298:
URL: https://github.com/apache/kafka/pull/12298





[GitHub] [kafka] ijuma commented on pull request #12300: Update zookeeper version to 3.8.0

2022-06-20 Thread GitBox


ijuma commented on PR #12300:
URL: https://github.com/apache/kafka/pull/12300#issuecomment-1160537036

   That doesn't answer the question since reload4j is fully compatible with 
log4j. But it doesn't have the vulnerabilities.





[GitHub] [kafka] kobihikri commented on pull request #12300: Update zookeeper version to 3.8.0

2022-06-20 Thread GitBox


kobihikri commented on PR #12300:
URL: https://github.com/apache/kafka/pull/12300#issuecomment-1160527310

   You can even see in the failing tests of this PR that there are calls to log4j
   ...
   
   On Mon, Jun 20, 2022 at 5:35 PM Ismael Juma ***@***.***>
   wrote:
   
   > Can you clarify? The Kafka release should only include reload4j without
   > this PR. Are you seeing log4j in the release tarball?
   >
   
   
   -- 
   May peace and love be your share.
   Kobi Hikri.
   





[GitHub] [kafka] Kvicii commented on pull request #12277: KAFKA-13971:Atomicity violations caused by improper usage of ConcurrentHashMap

2022-06-20 Thread GitBox


Kvicii commented on PR #12277:
URL: https://github.com/apache/kafka/pull/12277#issuecomment-1160526400

   @hachikuji can you help me review this PR?





[GitHub] [kafka] ijuma commented on pull request #12300: Update zookeeper version to 3.8.0

2022-06-20 Thread GitBox


ijuma commented on PR #12300:
URL: https://github.com/apache/kafka/pull/12300#issuecomment-1160524088

   Can you clarify? The Kafka release should only include reload4j without this 
PR. Are you seeing log4j in the release tarball?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] bozhao12 commented on a diff in pull request #12286: KAFKA-13984: Fix TopicBasedRemoteLogMetadataManager#initializeResources should exit immediately when partition size of __remote_lo

2022-06-20 Thread GitBox


bozhao12 commented on code in PR #12286:
URL: https://github.com/apache/kafka/pull/12286#discussion_r901721861


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java:
##
@@ -68,10 +72,27 @@ protected Map 
overrideRemoteLogMetadataManagerProps() {
 remoteLogMetadataManagerHarness.initialize(Collections.emptySet(), 
true);
 }
 
-private void startTopicBasedRemoteLogMetadataManagerHarness(boolean 
startConsumerThread) {
-
remoteLogMetadataManagerHarness.initializeRemoteLogMetadataManager(Collections.emptySet(),
 startConsumerThread);
+private void startTopicBasedRemoteLogMetadataManagerHarness(boolean 
startConsumerThread,
+int 
remoteLogMetadataTopicPartitionCount) {
+
remoteLogMetadataManagerHarness.initializeRemoteLogMetadataManager(Collections.emptySet(),
 startConsumerThread,
+remoteLogMetadataTopicPartitionCount);
 }
 
+
+@Test
+public void testRLMMInitializeResources() throws Exception {
+assertTrue(topicBasedRlmm().isInitialized());
+
+stopTopicBasedRemoteLogMetadataManagerHarness();
+assertThrows(KafkaException.class, () -> 
startTopicBasedRemoteLogMetadataManagerHarness(false, 4));

Review Comment:
   @divijvaidya Thanks for your suggestion, I updated the code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #12286: KAFKA-13984: Fix TopicBasedRemoteLogMetadataManager#initializeResources should exit immediately when partition size of __remote

2022-06-20 Thread GitBox


divijvaidya commented on code in PR #12286:
URL: https://github.com/apache/kafka/pull/12286#discussion_r901679350


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java:
##
@@ -68,10 +72,27 @@ protected Map 
overrideRemoteLogMetadataManagerProps() {
 remoteLogMetadataManagerHarness.initialize(Collections.emptySet(), 
true);
 }
 
-private void startTopicBasedRemoteLogMetadataManagerHarness(boolean 
startConsumerThread) {
-
remoteLogMetadataManagerHarness.initializeRemoteLogMetadataManager(Collections.emptySet(),
 startConsumerThread);
+private void startTopicBasedRemoteLogMetadataManagerHarness(boolean 
startConsumerThread,
+int 
remoteLogMetadataTopicPartitionCount) {
+
remoteLogMetadataManagerHarness.initializeRemoteLogMetadataManager(Collections.emptySet(),
 startConsumerThread,
+remoteLogMetadataTopicPartitionCount);
 }
 
+
+@Test
+public void testRLMMInitializeResources() throws Exception {
+assertTrue(topicBasedRlmm().isInitialized());
+
+stopTopicBasedRemoteLogMetadataManagerHarness();
+assertThrows(KafkaException.class, () -> 
startTopicBasedRemoteLogMetadataManagerHarness(false, 4));

Review Comment:
   Please replace the hard-coded `4` with `METADATA_TOPIC_PARTITIONS_COUNT + 1`. 
This would make it explicit that we are starting the RLMM with a partition count 
different from the existing one here. Even better would be to get the number of 
partitions of the existing topic and use `that + 1` here instead of 
METADATA_TOPIC_PARTITIONS_COUNT. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe opened a new pull request, #12315: MINOR: the Kafka metadata shell should support connecting to a controller quorum

2022-06-20 Thread GitBox


cmccabe opened a new pull request, #12315:
URL: https://github.com/apache/kafka/pull/12315

   This PR allows us to support connecting the metadata shell to a live KRaft 
quorum via --controller,
   as specified in the original KIPs. In order to do this, the PR factors out a 
"MetadataShellSource"
   interface which is implemented by both a snapshot file and a live quorum.
   
   Previously, the "shell" gradle submodule depended on "core." This extended 
the gradle build time
   because "core" takes a long time to build.  It also tended to reduce the 
encapsulation of the
   shell. This PR makes "core" depend on "shell" instead. In order to support 
this, the new entry
   point for the tool is kafka.tools.MetadataShellTool rather than
   org.apache.kafka.shell.MetadataShell.
   
   RaftManager.scala: make close() idempotent, as it is for nearly everything 
else. This makes cleanup
   logic easier.
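
   For illustration, a rough sketch of the kind of abstraction described above; the 
   actual interface in the PR may look different, and every name below other than 
   `MetadataShellSource` is hypothetical:

   ```java
   // Hypothetical sketch only; not the PR's code.
   public interface MetadataShellSource extends AutoCloseable {
       /** Feed the metadata records that the shell should load into its node tree. */
       void loadRecords(RecordConsumer consumer) throws Exception;

       @FunctionalInterface
       interface RecordConsumer {
           void accept(Object record);
       }
   }

   // Two hypothetical implementations, mirroring the description above:
   //   class SnapshotFileSource implements MetadataShellSource { ... }     // reads a snapshot file
   //   class ControllerQuorumSource implements MetadataShellSource { ... } // connects via --controller
   ```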


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14012) passing a "method" into the `Utils.closeQuietly` method cause a no-op call

2022-06-20 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17556392#comment-17556392
 ] 

Luke Chen commented on KAFKA-14012:
---

You're right, it will be an NPE after passing in the "method" as a parameter. Thanks 
for the correction. I've updated the JIRA.

> passing a "method" into the `Utils.closeQuietly` method cause a no-op call
> --
>
> Key: KAFKA-14012
> URL: https://issues.apache.org/jira/browse/KAFKA-14012
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.2.0
>Reporter: Luke Chen
>Priority: Major
>
> The Utils.closeQuietly method accepts an `AutoCloseable` object and closes it. But 
> there are some places where we passed a "method" into Utils.closeQuietly, which 
> causes the object not to get closed as expected. 
> I found occurrences in:
> - WorkerConnector
> - AbstractWorkerSourceTask
> - KafkaConfigBackingStore
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (KAFKA-14012) passing a "method" into the `Utils.closeQuietly` method cause NPE

2022-06-20 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-14012:
--
Summary: passing a "method" into the `Utils.closeQuietly` method cause NPE  
(was: passing a "method" into the `Utils.closeQuietly` method cause a no-op 
call)

> passing a "method" into the `Utils.closeQuietly` method cause NPE
> -
>
> Key: KAFKA-14012
> URL: https://issues.apache.org/jira/browse/KAFKA-14012
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.2.0
>Reporter: Luke Chen
>Priority: Major
>
> The Utils.closeQuietly method accepts an `AutoCloseable` object and closes it. But 
> there are some places where we passed a "method" into Utils.closeQuietly, which 
> causes the object not to get closed as expected. 
> I found occurrences in:
> - WorkerConnector
> - AbstractWorkerSourceTask
> - KafkaConfigBackingStore
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] C0urante commented on a diff in pull request #11781: KAFKA-10000: Per-connector offsets topics (KIP-618)

2022-06-20 Thread GitBox


C0urante commented on code in PR #11781:
URL: https://github.com/apache/kafka/pull/11781#discussion_r901682079


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java:
##
@@ -63,28 +70,109 @@
 public class KafkaOffsetBackingStore implements OffsetBackingStore {
 private static final Logger log = 
LoggerFactory.getLogger(KafkaOffsetBackingStore.class);
 
-private KafkaBasedLog offsetLog;
-private HashMap data;
+/**
+ * Build a connector-specific offset store with read and write support. 
The producer will be {@link Producer#close(Duration) closed}
+ * and the consumer will be {@link Consumer#close(Duration) closed} when 
this store is {@link #stop() stopped}, but the topic admin
+ * must be {@link TopicAdmin#close(Duration) closed} by the caller.
+ * @param topic the name of the offsets topic to use
+ * @param producer the producer to use for writing to the offsets topic
+ * @param consumer the consumer to use for reading from the offsets topic
+ * @param topicAdmin the topic admin to use for creating and querying 
metadata for the offsets topic
+ * @return an offset store backed by the given topic and Kafka clients
+ */
+public static KafkaOffsetBackingStore forTask(
+String topic,
+Producer producer,
+Consumer consumer,
+TopicAdmin topicAdmin
+) {
+return new KafkaOffsetBackingStore(() -> topicAdmin) {
+@Override
+public void configure(final WorkerConfig config) {
+exactlyOnce = config.exactlyOnceSourceEnabled();
+offsetLog = KafkaBasedLog.withExistingClients(
+topic,
+consumer,
+producer,
+topicAdmin,
+consumedCallback,
+Time.SYSTEM,
+initialize(topic, newTopicDescription(topic, config))
+);
+}
+};
+}
+
+/**
+ * Build a connector-specific offset store with read-only support. The 
consumer will be {@link Consumer#close(Duration) closed}
+ * when this store is {@link #stop() stopped}, but the topic admin must be 
{@link TopicAdmin#close(Duration) closed} by the caller.
+ * @param topic the name of the offsets topic to use
+ * @param consumer the consumer to use for reading from the offsets topic
+ * @param topicAdmin the topic admin to use for creating and querying 
metadata for the offsets topic
+ * @return a read-only offset store backed by the given topic and Kafka 
clients
+ */
+public static KafkaOffsetBackingStore forConnector(
+String topic,
+Consumer consumer,
+TopicAdmin topicAdmin
+) {
+return new KafkaOffsetBackingStore(() -> topicAdmin) {
+@Override
+public void configure(final WorkerConfig config) {
+exactlyOnce = config.exactlyOnceSourceEnabled();
+offsetLog = KafkaBasedLog.withExistingClients(
+topic,
+consumer,
+null,
+topicAdmin,
+consumedCallback,
+Time.SYSTEM,
+initialize(topic, newTopicDescription(topic, config))
+);
+}
+};
+}
+
+protected KafkaBasedLog offsetLog;
+private final HashMap data = new HashMap<>();
 private final Supplier topicAdminSupplier;
 private SharedTopicAdmin ownTopicAdmin;
+protected boolean exactlyOnce;
 
+/**
+ * Create an {@link OffsetBackingStore} backed by a Kafka topic. This 
constructor will cause the
+ * store to instantiate and close its own {@link TopicAdmin} during {@link 
#configure(WorkerConfig)}
+ * and {@link #stop()}, respectively.
+ *
+ * @deprecated use {@link #KafkaOffsetBackingStore(Supplier)} instead
+ */
 @Deprecated
 public KafkaOffsetBackingStore() {
 this.topicAdminSupplier = null;
 }
 
+/**
+ * Create an {@link OffsetBackingStore} backed by a Kafka topic. This 
constructor will use the given
+ * {@link Supplier} to acquire a {@link TopicAdmin} that will be used for 
interactions with the backing
+ * Kafka topic. The caller is expected to manage the lifecycle of that 
object, including
+ * {@link TopicAdmin#close(Duration) closing} it when it is no longer 
needed.
+ * @param topicAdmin a {@link Supplier} for the {@link TopicAdmin} to use 
for this backing store;
+ *   may not be null, and may not return null
+ */
 public KafkaOffsetBackingStore(Supplier topicAdmin) {
 this.topicAdminSupplier = Objects.requireNonNull(topicAdmin);
 }
 
+
 @Override
 public void configure(final WorkerConfig config) {
 String topic = 
config.get

[GitHub] [kafka] C0urante commented on a diff in pull request #11781: KAFKA-10000: Per-connector offsets topics (KIP-618)

2022-06-20 Thread GitBox


C0urante commented on code in PR #11781:
URL: https://github.com/apache/kafka/pull/11781#discussion_r901681609


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -1425,6 +1428,268 @@ public WorkerTask doBuild(Task task,
 }
 }
 
+// Visible for testing
+ConnectorOffsetBackingStore offsetStoreForRegularSourceConnector(
+SourceConnectorConfig sourceConfig,
+String connName,
+Connector connector
+) {
+String connectorSpecificOffsetsTopic = sourceConfig.offsetsTopic();
+
+Map producerProps = baseProducerConfigs(connName, 
"connector-producer-" + connName, config, sourceConfig, connector.getClass(),
+connectorClientConfigOverridePolicy, kafkaClusterId);
+
+// We use a connector-specific store (i.e., a dedicated 
KafkaOffsetBackingStore for this connector)
+// if the worker supports per-connector offsets topics (which may be 
the case in distributed but not standalone mode, for example)
+// and if the connector is explicitly configured with an offsets topic
+final boolean usesConnectorSpecificStore = 
connectorSpecificOffsetsTopic != null
+&& config.connectorOffsetsTopicsPermitted();
+
+if (usesConnectorSpecificStore) {
+Map consumerProps = 
regularSourceOffsetsConsumerConfigs(
+connName, "connector-consumer-" + connName, config, 
sourceConfig, connector.getClass(),
+connectorClientConfigOverridePolicy, kafkaClusterId);
+KafkaConsumer consumer = new 
KafkaConsumer<>(consumerProps);
+
+Map adminOverrides = adminConfigs(connName, 
"connector-adminclient-" + connName, config,
+sourceConfig, connector.getClass(), 
connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE);
+
+TopicAdmin admin = new TopicAdmin(adminOverrides);
+KafkaOffsetBackingStore connectorStore =
+
KafkaOffsetBackingStore.forConnector(connectorSpecificOffsetsTopic, consumer, 
admin);
+
+// If the connector's offsets topic is the same as the 
worker-global offsets topic, there's no need to construct
+// an offset store that has a primary and a secondary store which 
both read from that same topic.
+// So, if the user has explicitly configured the connector with a 
connector-specific offsets topic
+// but we know that that topic is the same as the worker-global 
offsets topic, we ignore the worker-global
+// offset store and build a store backed exclusively by a 
connector-specific offsets store.
+// It may seem reasonable to instead build a store backed 
exclusively by the worker-global offset store, but that
+// would prevent users from being able to customize the config 
properties used for the Kafka clients that
+// access the offsets topic, and we would not be able to establish 
reasonable defaults like setting
+// isolation.level=read_committed for the offsets topic consumer 
for this connector
+if (sameOffsetTopicAsWorker(connectorSpecificOffsetsTopic, 
producerProps)) {
+return ConnectorOffsetBackingStore.withOnlyConnectorStore(
+() -> LoggingContext.forConnector(connName),
+connectorStore,
+connectorSpecificOffsetsTopic,
+admin
+);
+} else {
+return 
ConnectorOffsetBackingStore.withConnectorAndWorkerStores(
+() -> LoggingContext.forConnector(connName),
+globalOffsetBackingStore,
+connectorStore,
+connectorSpecificOffsetsTopic,
+admin
+);
+}
+} else {
+return ConnectorOffsetBackingStore.withOnlyWorkerStore(
+() -> LoggingContext.forConnector(connName),
+globalOffsetBackingStore,
+config.offsetsTopic()
+);
+}
+}
+
+// Visible for testing
+ConnectorOffsetBackingStore offsetStoreForExactlyOnceSourceConnector(
+SourceConnectorConfig sourceConfig,
+String connName,
+Connector connector
+) {
+String connectorSpecificOffsetsTopic = 
Optional.ofNullable(sourceConfig.offsetsTopic()).orElse(config.offsetsTopic());
+
+Map producerProps = baseProducerConfigs(connName, 
"connector-producer-" + connName, config, sourceConfig, connector.getClass(),
+connectorClientConfigOverridePolicy, kafkaClusterId);
+
+Map consumerProps = 
exactlyOnceSourceOffsetsConsumerConfigs(
+connName, "connector-consumer-" + connName, config, 
sourceConfig, connector.getClass(),
+ 

[jira] [Commented] (KAFKA-14012) passing a "method" into the `Utils.closeQuietly` method cause a no-op call

2022-06-20 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17556384#comment-17556384
 ] 

Chris Egerton commented on KAFKA-14012:
---

[~showuon] when does this no-op occur? The bugs that I've found related to 
{{Utils::closeQuietly}} have been NPEs caused by method references of null 
objects; I believe as long as the object is non-null and the correct method is 
referenced, things should go smoothly.
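
A standalone illustration (not Kafka's code; the helper below is a simplified 
stand-in) of the behavior described above: evaluating a bound method reference such 
as `obj::close` on a null receiver throws a NullPointerException immediately, before 
a null-tolerant close helper ever runs.

{code:java}
import java.io.Closeable;

public class CloseQuietlyNpeDemo {
    // Simplified stand-in for a closeQuietly-style helper that tolerates null.
    static void closeQuietly(AutoCloseable closeable, String name) {
        if (closeable == null) {
            return; // nothing to close
        }
        try {
            closeable.close();
        } catch (Throwable t) {
            System.err.println("Failed to close " + name + ": " + t);
        }
    }

    public static void main(String[] args) {
        Closeable maybeNull = null;

        closeQuietly(maybeNull, "resource");        // fine: the helper sees null and returns
        closeQuietly(maybeNull::close, "resource"); // NullPointerException thrown right here,
                                                    // while the method reference is created
    }
}
{code}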

> passing a "method" into the `Utils.closeQuietly` method cause a no-op call
> --
>
> Key: KAFKA-14012
> URL: https://issues.apache.org/jira/browse/KAFKA-14012
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.2.0
>Reporter: Luke Chen
>Priority: Major
>
> The Utils.closeQuietly method accepts an `AutoCloseable` object and closes it. But 
> there are some places where we passed a "method" into Utils.closeQuietly, which 
> causes the object not to get closed as expected. 
> I found occurrences in:
> - WorkerConnector
> - AbstractWorkerSourceTask
> - KafkaConfigBackingStore
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] divijvaidya commented on a diff in pull request #12305: MINOR: Add __cluster_metadata topic to list of internal topics

2022-06-20 Thread GitBox


divijvaidya commented on code in PR #12305:
URL: https://github.com/apache/kafka/pull/12305#discussion_r90178


##
clients/src/main/java/org/apache/kafka/common/internals/Topic.java:
##
@@ -33,7 +33,7 @@ public class Topic {
 public static final String LEGAL_CHARS = "[a-zA-Z0-9._-]";
 
 private static final Set INTERNAL_TOPICS = 
Collections.unmodifiableSet(
-Utils.mkSet(GROUP_METADATA_TOPIC_NAME, 
TRANSACTION_STATE_TOPIC_NAME));
+Utils.mkSet(GROUP_METADATA_TOPIC_NAME, 
TRANSACTION_STATE_TOPIC_NAME, METADATA_TOPIC_NAME));

Review Comment:
   Please correct me if I am wrong here, Andy, but this is true only for KRaft 
mode. I haven't tried it, but in ZooKeeper mode it should be possible to 
create a topic with this name as a non-internal topic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #12314: KAFKA-13903: Queue size metric in QuorumController

2022-06-20 Thread GitBox


divijvaidya commented on code in PR #12314:
URL: https://github.com/apache/kafka/pull/12314#discussion_r901646698


##
server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java:
##
@@ -166,6 +171,13 @@ private class EventHandler implements Runnable {
  * A condition variable for waking up the event handler thread.
  */
 private final Condition cond = lock.newCondition();
+private final Consumer onQueueSizeChange;
+
+private AtomicInteger size = new AtomicInteger(0);

Review Comment:
   Long? Is there a reason we are limiting this to an integer?



##
server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java:
##
@@ -178,15 +190,18 @@ public void run() {
 }
 
 private void remove(EventContext eventContext) {
-eventContext.remove();
+boolean removed = eventContext.remove();
 if (eventContext.deadlineNs.isPresent()) {
-deadlineMap.remove(eventContext.deadlineNs.getAsLong());
+removed |= 
deadlineMap.remove(eventContext.deadlineNs.getAsLong()) != null;
 eventContext.deadlineNs = OptionalLong.empty();
 }
 if (eventContext.tag != null) {
 tagToEventContext.remove(eventContext.tag, eventContext);
 eventContext.tag = null;
 }
+if (removed) {
+onQueueSizeChange.accept(size.decrementAndGet());

Review Comment:
   I am a bit sceptical about running an arbitrary consumer in a synchronized 
block of code (this runs while holding a lock). The caller can potentially 
perform expensive work in the callback, which could block the other threads 
from acquiring the lock and hence block critical execution while performing 
a side effect. Instead, could we make this an explicit asynchronous metric 
publish event?
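
   To make the concern concrete, here is a self-contained sketch (all names are 
illustrative; this is not the PR's code): if the size-change callback runs while 
the queue's lock is held, a slow callback stalls every thread waiting on that 
lock, whereas letting the metrics system poll an AtomicInteger keeps caller code 
out of the critical section.

   ```java
   import java.util.concurrent.atomic.AtomicInteger;
   import java.util.concurrent.locks.ReentrantLock;
   import java.util.function.IntConsumer;

   class QueueSizeExample {
       private final ReentrantLock lock = new ReentrantLock();
       private final AtomicInteger size = new AtomicInteger(0);
       private final IntConsumer onQueueSizeChange;

       QueueSizeExample(IntConsumer onQueueSizeChange) {
           this.onQueueSizeChange = onQueueSizeChange;
       }

       // Pattern being questioned: the callback runs with the lock held,
       // so an expensive callback blocks every other thread waiting on the lock.
       void removeWithCallbackUnderLock() {
           lock.lock();
           try {
               onQueueSizeChange.accept(size.decrementAndGet());
           } finally {
               lock.unlock();
           }
       }

       // Alternative: mutate the counter under the lock, but let the metrics
       // system poll currentSize() asynchronously (e.g. from a gauge) instead
       // of invoking caller code inside the critical section.
       void removeWithPolledGauge() {
           lock.lock();
           try {
               size.decrementAndGet();
           } finally {
               lock.unlock();
           }
       }

       int currentSize() {
           return size.get();
       }
   }
   ```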



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on a diff in pull request #12279: KAFKA-10199: Commit the restoration progress within StateUpdater

2022-06-20 Thread GitBox


cadonna commented on code in PR #12279:
URL: https://github.com/apache/kafka/pull/12279#discussion_r901539086


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##
@@ -603,6 +637,67 @@ public void shouldDrainFailedTasksAndExceptions() throws 
Exception {
 verifyDrainingExceptionsAndFailedTasks(expectedExceptionAndTasks2, 
expectedExceptionAndTasks3, expectedExceptionAndTasks4);
 }
 
+@Test
+public void shouldAutoCommitTasksOnInterval() throws Exception {
+final StreamTask task1 = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+final StreamTask task2 = 
createActiveStatefulTaskInStateRestoring(TASK_0_2, 
Collections.singletonList(TOPIC_PARTITION_B_0));
+final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, 
Collections.singletonList(TOPIC_PARTITION_C_0));
+final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, 
Collections.singletonList(TOPIC_PARTITION_D_0));
+when(changelogReader.completedChangelogs())
+.thenReturn(Collections.emptySet());
+when(changelogReader.allChangelogsCompleted())
+.thenReturn(false);
+
+stateUpdater.add(task1);
+stateUpdater.add(task2);
+stateUpdater.add(task3);
+stateUpdater.add(task4);
+
+sleep(COMMIT_INTERVAL);
+
+verifyExceptionsAndFailedTasks();
+verifyCheckpointTasks(false, task1, task2, task3, task4);
+}
+
+@Test
+public void shouldNotAutoCommitTasksIfIntervalNotElapsed() throws 
Exception {
+final StreamsConfig config = new 
StreamsConfig(configProps(Integer.MAX_VALUE));
+final DefaultStateUpdater stateUpdater = new 
DefaultStateUpdater(config, changelogReader, offsetResetter, Time.SYSTEM);

Review Comment:
   It is better to do this, otherwise verifications like 
`verifyExceptionsAndFailedTasks()` will not work. You do not use them in this 
method, but the change makes the test future-proof.
   ```suggestion
   stateUpdater.shutdown(Duration.ofMinutes(1));
   final StreamsConfig config = new 
StreamsConfig(configProps(Integer.MAX_VALUE));
   stateUpdater = new DefaultStateUpdater(config, changelogReader, 
offsetResetter, Time.SYSTEM);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on a diff in pull request #12279: KAFKA-10199: Commit the restoration progress within StateUpdater

2022-06-20 Thread GitBox


cadonna commented on code in PR #12279:
URL: https://github.com/apache/kafka/pull/12279#discussion_r901395029


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##
@@ -85,7 +86,7 @@ public Collection getUpdatingStandbyTasks() {
 }
 
 public boolean onlyStandbyTasksLeft() {
-return !updatingTasks.isEmpty() && 
updatingTasks.values().stream().allMatch(t -> !t.isActive());
+return !updatingTasks.isEmpty() && 
updatingTasks.values().stream().noneMatch(Task::isActive);

Review Comment:
   I did the same change in https://github.com/apache/kafka/pull/12312 🙂 
   I will revert the change in my PR to avoid merge conflicts.



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##
@@ -603,6 +637,67 @@ public void shouldDrainFailedTasksAndExceptions() throws 
Exception {
 verifyDrainingExceptionsAndFailedTasks(expectedExceptionAndTasks2, 
expectedExceptionAndTasks3, expectedExceptionAndTasks4);
 }
 
+@Test
+public void shouldAutoCommitTasksOnInterval() throws Exception {
+final StreamTask task1 = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+final StreamTask task2 = 
createActiveStatefulTaskInStateRestoring(TASK_0_2, 
Collections.singletonList(TOPIC_PARTITION_B_0));
+final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, 
Collections.singletonList(TOPIC_PARTITION_C_0));
+final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, 
Collections.singletonList(TOPIC_PARTITION_D_0));
+when(changelogReader.completedChangelogs())
+.thenReturn(Collections.emptySet());
+when(changelogReader.allChangelogsCompleted())
+.thenReturn(false);
+
+stateUpdater.add(task1);
+stateUpdater.add(task2);
+stateUpdater.add(task3);
+stateUpdater.add(task4);
+
+sleep(COMMIT_INTERVAL);
+
+verifyExceptionsAndFailedTasks();
+verifyCheckpointTasks(false, task1, task2, task3, task4);
+}
+
+@Test
+public void shouldNotAutoCommitTasksIfIntervalNotElapsed() throws 
Exception {
+final StreamsConfig config = new 
StreamsConfig(configProps(Integer.MAX_VALUE));
+final DefaultStateUpdater stateUpdater = new 
DefaultStateUpdater(config, changelogReader, offsetResetter, Time.SYSTEM);
+
+try {
+final StreamTask task1 = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+final StreamTask task2 = 
createActiveStatefulTaskInStateRestoring(TASK_0_2, 
Collections.singletonList(TOPIC_PARTITION_B_0));
+final StandbyTask task3 = 
createStandbyTaskInStateRunning(TASK_1_0, 
Collections.singletonList(TOPIC_PARTITION_C_0));
+final StandbyTask task4 = 
createStandbyTaskInStateRunning(TASK_1_1, 
Collections.singletonList(TOPIC_PARTITION_D_0));
+when(changelogReader.completedChangelogs())
+.thenReturn(Collections.emptySet());
+when(changelogReader.allChangelogsCompleted())
+.thenReturn(false);
+
+stateUpdater.add(task1);
+stateUpdater.add(task2);
+stateUpdater.add(task3);
+stateUpdater.add(task4);
+
+verifyNeverCheckpointTasks(task1, task2, task3, task4);
+} finally {
+stateUpdater.shutdown(Duration.ofMinutes(1));
+}
+}
+
+private void verifyCheckpointTasks(final boolean enforceCheckpoint, final 
Task... tasks) throws Exception {

Review Comment:
   ```suggestion
   private void verifyCheckpointTasks(final boolean enforceCheckpoint, 
final Task... tasks) {
   ```



##
streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java:
##
@@ -236,6 +236,16 @@ public static String safeUniqueTestName(final Class 
testClass, final TestName
 .replace('=', '_');
 }
 
+public static String safeUniqueClassTestName(final Class testClass) {
+return (testClass.getSimpleName())
+.replace(':', '_')
+.replace('.', '_')
+.replace('[', '_')
+.replace(']', '_')
+.replace(' ', '_')
+.replace('=', '_');
+}
+

Review Comment:
   This should be dead code now that we do not need this method in 
`DefaultStateUpdater`. Could you please remove it?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##
@@ -290,6 +295,23 @@ private void addTaskToRestoredTasks(final StreamTask task) 
{
 restoredActiveTasksLock.unlock();
 }
 }
+
+private void maybeCommitRestoringTasks(final long now) {

Review Comment:
   Could you please rename to `maybe

[GitHub] [kafka] cadonna commented on pull request #12298: KAFKA-13998: JoinGroupRequestData 'reason' can be too large

2022-06-20 Thread GitBox


cadonna commented on PR #12298:
URL: https://github.com/apache/kafka/pull/12298#issuecomment-1160402784

   I am +1 for introducing a limit of 255 characters for the reason in a 
follow-up PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on pull request #12298: KAFKA-13998: JoinGroupRequestData 'reason' can be too large

2022-06-20 Thread GitBox


cadonna commented on PR #12298:
URL: https://github.com/apache/kafka/pull/12298#issuecomment-1160401417

   @jnh5y I do not completely understand what you mean by:
   
   > I tried to find a way to create a unit test for this change, but I wasn't 
able to do so quickly.
   
   I have the impression that the unit test coverage is sufficient in 
`AbstractCoordinator#testRejoinReason`.
   
   So I am fine with merging.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-9837) New RPC for notifying controller of failed replica

2022-06-20 Thread Igor Soarez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17556347#comment-17556347
 ] 

Igor Soarez commented on KAFKA-9837:


An update: I've been working on a proposal to maintain log directory 
information on the controller. There's quite a lot that needs to change to 
support this while keeping compatibility with the current behavior around JBOD 
in ZK mode. There are still some loose ends, but I'm hoping to have a KIP soon. 
If anyone is interested, it would be super useful to have a chat about this.

[~dengziming], thanks for your name change suggestion; I'm taking it.

> New RPC for notifying controller of failed replica
> --
>
> Key: KAFKA-9837
> URL: https://issues.apache.org/jira/browse/KAFKA-9837
> Project: Kafka
>  Issue Type: New Feature
>  Components: controller, core
>Reporter: David Arthur
>Assignee: dengziming
>Priority: Major
>  Labels: kip-500
>
> This is the tracking ticket for 
> [KIP-589|https://cwiki.apache.org/confluence/display/KAFKA/KIP-589+Add+API+to+update+Replica+state+in+Controller].
>  For the bridge release, brokers should no longer use ZooKeeper to notify the 
> controller that a log dir has failed. It should instead use an RPC mechanism.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] soarez opened a new pull request, #12314: KAFKA-13903: Queue size metric in QuorumController

2022-06-20 Thread GitBox


soarez opened a new pull request, #12314:
URL: https://github.com/apache/kafka/pull/12314

   In order to stay in line with the existing queue metrics in the
   ControllerEventManager MBean, we need to include a queue size metric in KRaft mode.
   
   The current size of the underlying queue in QuorumController
   needs to be exposed as:
   
   ```
   kafka.controller:type=ControllerEventManager,name=EventQueueSize
   ```
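
   For illustration only (not this PR's implementation): one way a gauge with the 
   MBean coordinates above could be exposed through the Yammer metrics library that 
   Kafka's broker/controller metrics use; the registration wiring below is an 
   assumption, not the actual patch.

   ```java
   import com.yammer.metrics.Metrics;
   import com.yammer.metrics.core.Gauge;
   import com.yammer.metrics.core.MetricName;
   import java.util.concurrent.atomic.AtomicInteger;

   class EventQueueSizeMetricSketch {
       private final AtomicInteger queueSize = new AtomicInteger(0);

       void register() {
           // The gauge is polled lazily, so no caller code runs on the enqueue/dequeue path.
           Metrics.defaultRegistry().newGauge(
               new MetricName("kafka.controller", "ControllerEventManager", "EventQueueSize"),
               new Gauge<Integer>() {
                   @Override
                   public Integer value() {
                       return queueSize.get();
                   }
               });
       }

       void onEnqueue() {
           queueSize.incrementAndGet();
       }

       void onDequeue() {
           queueSize.decrementAndGet();
       }
   }
   ```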
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
   
   [➡ Link to JIRA](https://issues.apache.org/jira/browse/KAFKA-13903)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-13903) Add queue size metric to QuorumController

2022-06-20 Thread Igor Soarez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igor Soarez reassigned KAFKA-13903:
---

Assignee: Igor Soarez

> Add queue size metric to QuorumController
> -
>
> Key: KAFKA-13903
> URL: https://issues.apache.org/jira/browse/KAFKA-13903
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Reporter: David Arthur
>Assignee: Igor Soarez
>Priority: Major
>  Labels: kip-500
>
> In order to stay in line with the existing queue metrics in the 
> ControllerEventManager MBean, we need to include a queue size metric. The current 
> size of the underlying queue in QuorumController needs to be exposed as: 
> {{kafka.controller:type=ControllerEventManager,name=EventQueueSize}}
> It looks like KafkaEventQueue will need to add support for reporting its size.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Comment Edited] (KAFKA-14011) Reduce retention.ms from default 7 days to 1 day (Make it configurable) for the dead letter topic created for error handling via sink connector

2022-06-20 Thread Jordan Moore (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17556337#comment-17556337
 ] 

Jordan Moore edited comment on KAFKA-14011 at 6/20/22 11:32 AM:


"topic.creation" configs are only available for source connectors, for topics 
that source connectors will write to, not dead letter topics. "admin" prefix 
will only attempt to modify the AdminClient config, which does not include any 
topic configs, such as retention time. 

If you are allowing the broker to auto-create topics with defaults, then it's 
recommended that you disable this, and create all necessary topics ahead of 
time, with the topic settings you need. 

For already existing topics, you can alter their configs. 

Both operations can be done with kafka-topics script 




was (Author: cricket007):
"topic.creation" configs are only available for source connectors, for topics 
that source connectors will write to, not dead letter topics.

If you are allowing the broker to auto-create topics with defaults, then it's 
recommended that you disable this, and create all necessary topics ahead of 
time, with the topic settings you need. 

For already existing topics, you can alter their configs. 

Both operations can be done with kafka-topics script 



> Reduce retention.ms from default 7 days to 1 day (Make it configurable) for 
> the dead letter topic created for error handling via sink connector
> ---
>
> Key: KAFKA-14011
> URL: https://issues.apache.org/jira/browse/KAFKA-14011
> Project: Kafka
>  Issue Type: Improvement
>  Components: config, KafkaConnect
>Affects Versions: 3.0.1
>Reporter: Souvik
>Priority: Major
>
> We are creating a sink connector along with an error handling mechanism, so 
> if there is a bad record it is routed to the error queue. The 
> properties used while creating the sink connector are as follows:
>  
>  'errors.tolerance'='all',
>  'errors.deadletterqueue.topic.name' = 'error_dlq',
>  'errors.deadletterqueue.topic.replication.factor'= -1,
>  'errors.log.include.messages' = true,
>  'errors.log.enable' = true,
>  'errors.deadletterqueue.context.headers.enable' = true
> *Now, is there any way we can configure retention.ms for the dead letter 
> queue topic, i.e. from the default 7 days to 1 day, or is there any way we can 
> change the compaction policy by providing a property while creating the 
> source connector?*
> *we have tried the following properties in the connector*
> {code:java}
> 'topic.creation.default.retention.ms'='3000',
> 'admin.topic.creation.default.retention.ms'='3000',
> 'admin.retention.ms' = '3000',
> 'admin.topic.retention.ms' = '3000',
> 'admin.topic.creation.retention.ms' = '3000',
> 'error.topic.creation.default.retention.ms'='3000',
> 'error.deadletterqueue.topic.creation.default.retention.ms'='3000',
> 'error.deadletterqueue.topic.retention.ms'='3000',
> 'admin.topic.creation.default.retention.ms' = 3000, {code}
> but it did not work.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-14011) Reduce retention.ms from default 7 days to 1 day (Make it configurable) for the dead letter topic created for error handling via sink connector

2022-06-20 Thread Jordan Moore (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17556337#comment-17556337
 ] 

Jordan Moore commented on KAFKA-14011:
--

"topic.creation" configs are only available for source connectors, for topics 
that source connectors will write to, not dead letter topics.

If you are allowing the broker to auto-create topics with defaults, then it's 
recommended that you disable this, and create all necessary topics ahead of 
time, with the topic settings you need. 

For already existing topics, you can alter their configs. 

Both operations can be done with kafka-topics script 
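
For example (a minimal sketch assuming Kafka's Java AdminClient; the topic name 
"error_dlq" and the one-day retention value are just illustrations), the same two 
operations can also be done programmatically:

{code:java}
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.ConfigResource;

public class DlqTopicRetentionExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // 1) Create the dead letter topic ahead of time with the retention you want.
            NewTopic dlq = new NewTopic("error_dlq", 1, (short) 3)
                    .configs(Collections.singletonMap("retention.ms", "86400000")); // 1 day
            admin.createTopics(Collections.singleton(dlq)).all().get();

            // 2) Or alter retention.ms on a topic that already exists.
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "error_dlq");
            AlterConfigOp setRetention = new AlterConfigOp(
                    new ConfigEntry("retention.ms", "86400000"), AlterConfigOp.OpType.SET);
            Map<ConfigResource, Collection<AlterConfigOp>> alterOps =
                    Collections.singletonMap(topic, Collections.singletonList(setRetention));
            admin.incrementalAlterConfigs(alterOps).all().get();
        }
    }
}
{code}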



> Reduce retention.ms from default 7 days to 1 day (Make it configurable) for 
> the dead letter topic created for error handling via sink connector
> ---
>
> Key: KAFKA-14011
> URL: https://issues.apache.org/jira/browse/KAFKA-14011
> Project: Kafka
>  Issue Type: Improvement
>  Components: config, KafkaConnect
>Affects Versions: 3.0.1
>Reporter: Souvik
>Priority: Major
>
> We are creating a sink connector along with an error handling mechanism, so 
> if there is a bad record it is routed to the error queue. The 
> properties used while creating the sink connector are as follows:
>  
>  'errors.tolerance'='all',
>  'errors.deadletterqueue.topic.name' = 'error_dlq',
>  'errors.deadletterqueue.topic.replication.factor'= -1,
>  'errors.log.include.messages' = true,
>  'errors.log.enable' = true,
>  'errors.deadletterqueue.context.headers.enable' = true
> *Now, is there any way we can configure retention.ms for the dead letter 
> queue topic, i.e. from the default 7 days to 1 day, or is there any way we can 
> change the compaction policy by providing a property while creating the 
> source connector?*
> *we have tried the following properties in the connector*
> {code:java}
> 'topic.creation.default.retention.ms'='3000',
> 'admin.topic.creation.default.retention.ms'='3000',
> 'admin.retention.ms' = '3000',
> 'admin.topic.retention.ms' = '3000',
> 'admin.topic.creation.retention.ms' = '3000',
> 'error.topic.creation.default.retention.ms'='3000',
> 'error.deadletterqueue.topic.creation.default.retention.ms'='3000',
> 'error.deadletterqueue.topic.retention.ms'='3000',
> 'admin.topic.creation.default.retention.ms' = 3000, {code}
> but it did not work.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] soarez commented on pull request #12198: MINOR: Avoid possibly resolvable name in tests

2022-06-20 Thread GitBox


soarez commented on PR #12198:
URL: https://github.com/apache/kafka/pull/12198#issuecomment-1160296587

   @hachikuji could you review this quick change? Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #11781: KAFKA-10000: Per-connector offsets topics (KIP-618)

2022-06-20 Thread GitBox


showuon commented on code in PR #11781:
URL: https://github.com/apache/kafka/pull/11781#discussion_r901473434


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java:
##
@@ -63,28 +70,109 @@
 public class KafkaOffsetBackingStore implements OffsetBackingStore {
 private static final Logger log = 
LoggerFactory.getLogger(KafkaOffsetBackingStore.class);
 
-private KafkaBasedLog offsetLog;
-private HashMap data;
+/**
+ * Build a connector-specific offset store with read and write support. 
The producer will be {@link Producer#close(Duration) closed}
+ * and the consumer will be {@link Consumer#close(Duration) closed} when 
this store is {@link #stop() stopped}, but the topic admin
+ * must be {@link TopicAdmin#close(Duration) closed} by the caller.
+ * @param topic the name of the offsets topic to use
+ * @param producer the producer to use for writing to the offsets topic
+ * @param consumer the consumer to use for reading from the offsets topic
+ * @param topicAdmin the topic admin to use for creating and querying 
metadata for the offsets topic
+ * @return an offset store backed by the given topic and Kafka clients
+ */
+public static KafkaOffsetBackingStore forTask(
+String topic,
+Producer producer,
+Consumer consumer,
+TopicAdmin topicAdmin
+) {
+return new KafkaOffsetBackingStore(() -> topicAdmin) {
+@Override
+public void configure(final WorkerConfig config) {
+exactlyOnce = config.exactlyOnceSourceEnabled();
+offsetLog = KafkaBasedLog.withExistingClients(
+topic,
+consumer,
+producer,
+topicAdmin,
+consumedCallback,
+Time.SYSTEM,
+initialize(topic, newTopicDescription(topic, config))
+);
+}
+};
+}
+
+/**
+ * Build a connector-specific offset store with read-only support. The 
consumer will be {@link Consumer#close(Duration) closed}
+ * when this store is {@link #stop() stopped}, but the topic admin must be 
{@link TopicAdmin#close(Duration) closed} by the caller.
+ * @param topic the name of the offsets topic to use
+ * @param consumer the consumer to use for reading from the offsets topic
+ * @param topicAdmin the topic admin to use for creating and querying 
metadata for the offsets topic
+ * @return a read-only offset store backed by the given topic and Kafka 
clients
+ */
+public static KafkaOffsetBackingStore forConnector(
+String topic,
+Consumer consumer,
+TopicAdmin topicAdmin
+) {
+return new KafkaOffsetBackingStore(() -> topicAdmin) {
+@Override
+public void configure(final WorkerConfig config) {
+exactlyOnce = config.exactlyOnceSourceEnabled();
+offsetLog = KafkaBasedLog.withExistingClients(
+topic,
+consumer,
+null,
+topicAdmin,
+consumedCallback,
+Time.SYSTEM,
+initialize(topic, newTopicDescription(topic, config))
+);
+}
+};
+}
+
+protected KafkaBasedLog offsetLog;
+private final HashMap data = new HashMap<>();
 private final Supplier topicAdminSupplier;
 private SharedTopicAdmin ownTopicAdmin;
+protected boolean exactlyOnce;
 
+/**
+ * Create an {@link OffsetBackingStore} backed by a Kafka topic. This 
constructor will cause the
+ * store to instantiate and close its own {@link TopicAdmin} during {@link 
#configure(WorkerConfig)}
+ * and {@link #stop()}, respectively.
+ *
+ * @deprecated use {@link #KafkaOffsetBackingStore(Supplier)} instead
+ */
 @Deprecated
 public KafkaOffsetBackingStore() {
 this.topicAdminSupplier = null;
 }
 
+/**
+ * Create an {@link OffsetBackingStore} backed by a Kafka topic. This 
constructor will use the given
+ * {@link Supplier} to acquire a {@link TopicAdmin} that will be used for 
interactions with the backing
+ * Kafka topic. The caller is expected to manage the lifecycle of that 
object, including
+ * {@link TopicAdmin#close(Duration) closing} it when it is no longer 
needed.
+ * @param topicAdmin a {@link Supplier} for the {@link TopicAdmin} to use 
for this backing store;
+ *   may not be null, and may not return null
+ */
 public KafkaOffsetBackingStore(Supplier topicAdmin) {
 this.topicAdminSupplier = Objects.requireNonNull(topicAdmin);
 }
 
+
 @Override
 public void configure(final WorkerConfig config) {
 String topic = 
config.getS

[jira] [Updated] (KAFKA-14012) passing a "method" into the `Utils.closeQuietly` method cause a no-op call

2022-06-20 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-14012:
--
Summary: passing a "method" into the `Utils.closeQuietly` method cause a 
no-op call  (was: resource leak due to passing a "method" into the 
`Utils.closeQuietly` method)

> passing a "method" into the `Utils.closeQuietly` method cause a no-op call
> --
>
> Key: KAFKA-14012
> URL: https://issues.apache.org/jira/browse/KAFKA-14012
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.2.0
>Reporter: Luke Chen
>Priority: Major
>
> The Utils.closeQuietly method accepts an `AutoCloseable` object and closes it. But 
> there are some places where we passed a "method" into Utils.closeQuietly, which 
> causes the object not to get closed as expected. 
> I found occurrences in:
> - WorkerConnector
> - AbstractWorkerSourceTask
> - KafkaConfigBackingStore
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] cadonna commented on pull request #12298: KAFKA-13998: JoinGroupRequestData 'reason' can be too large

2022-06-20 Thread GitBox


cadonna commented on PR #12298:
URL: https://github.com/apache/kafka/pull/12298#issuecomment-1160246521

   > We should definitely fix the failing tests. The PR is not ready to be 
merged at the moment. I will quickly fix it so we can merge it.
   
   Sounds good!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #12298: KAFKA-13998: JoinGroupRequestData 'reason' can be too large

2022-06-20 Thread GitBox


dajac commented on PR #12298:
URL: https://github.com/apache/kafka/pull/12298#issuecomment-1160230612

   > > I wonder if we should limit the reason when we build 
JoinGroupRequestData. We could pick the first 255 chars for instance. That 
would prevent us from hitting this in the future. I am not sure what a good 
limit would be.
   > 
   > @dajac is there a specific reason you propose 255? Does this have to do with the 
size of the entire request? If yes, what does the size of the request depend 
on besides the reason? Is there a way to calculate how long the reason can be?
   > 
   > I am not familiar with the error that the too-long reason caused, so bear 
with me if these are stupid questions.
   
   No specific reason for the proposed 255. My reasoning is that the reason 
should be rather small. We actually have two upper bounds: 1) the `reason` 
field's length is limited to `0x7fff`, and 2) the request size could also be 
limited on the broker side. I am actually surprised that we hit 1) here.
   
   I think keeping the first 250 (or even 500) characters of the `reason` 
should be informative enough on the broker side, and that would avoid 
hitting 1) again in the future.
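
   A minimal sketch of the client-side truncation being discussed; the 255-character 
cap and the helper name below are taken from this thread, not from any committed 
patch:

   ```java
   final class ReasonTruncation {
       private static final int MAX_REASON_LENGTH = 255;

       // Keep only the first MAX_REASON_LENGTH characters of a (possibly null) reason.
       static String truncateReasonIfNeeded(String reason) {
           if (reason == null || reason.length() <= MAX_REASON_LENGTH) {
               return reason;
           }
           return reason.substring(0, MAX_REASON_LENGTH);
       }
   }
   ```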


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #12298: KAFKA-13998: JoinGroupRequestData 'reason' can be too large

2022-06-20 Thread GitBox


dajac commented on PR #12298:
URL: https://github.com/apache/kafka/pull/12298#issuecomment-1160224594

   > I do not see the need for such a hurry. The build failures seem to be 
related to this PR, so those need to be fixed anyway. Code freeze for 3.3.0 is 
on July 20th, so there is plenty of time to add the length limitation and the unit test. 
If we really cannot wait until @jnh5y is back, we should open a new PR with all 
the stuff in place as soon as possible.
   
   We should definitely fix the failing tests. The PR is not ready to be merged 
at the moment. I will quickly fix it so we can merge it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on pull request #12298: KAFKA-13998: JoinGroupRequestData 'reason' can be too large

2022-06-20 Thread GitBox


cadonna commented on PR #12298:
URL: https://github.com/apache/kafka/pull/12298#issuecomment-1160217093

   > I wonder if we should limit the reason when we build JoinGroupRequestData. 
We could pick the first 255 chars for instance. That would prevent us from 
hitting this in the future. I am not sure what a good limit would be.
   
   @dajac is there a specific reason you propose 255? Does this have to do with the 
size of the entire request? If yes, what does the size of the request depend 
on besides the reason?
   
   I am not familiar with the error that the too-long reason caused, so bear 
with me if these are stupid questions.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on pull request #12298: KAFKA-13998: JoinGroupRequestData 'reason' can be too large

2022-06-20 Thread GitBox


cadonna commented on PR #12298:
URL: https://github.com/apache/kafka/pull/12298#issuecomment-1160210603

   I do not see the need for such a hurry. The build failures seem to be related 
to this PR, so those need to be fixed anyway. Code freeze for 3.3.0 is on July 
20th, so there is plenty of time to add the limitation and the unit test. 
   If we really cannot wait until @jnh5y is back, we should open a new PR with 
all the stuff in place as soon as possible.  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14012) resource leak due to passing a "method" into the `Utils.closeQuietly` method

2022-06-20 Thread Luke Chen (Jira)
Luke Chen created KAFKA-14012:
-

 Summary: resource leak due to passing a "method" into the 
`Utils.closeQuietly` method
 Key: KAFKA-14012
 URL: https://issues.apache.org/jira/browse/KAFKA-14012
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 3.2.0
Reporter: Luke Chen


The Utils.closeQuietly method accepts an `AutoCloseable` object and closes it. But 
there are some places where we passed a "method" into Utils.closeQuietly, which causes 
the object not to get closed as expected. 

I found occurrences in:

- WorkerConnector
- AbstractWorkerSourceTask
- KafkaConfigBackingStore

 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] kobihikri commented on pull request #12300: Update zookeeper version to 3.8.0

2022-06-20 Thread GitBox


kobihikri commented on PR #12300:
URL: https://github.com/apache/kafka/pull/12300#issuecomment-1160188536

   @ijuma @Kvicii 
   
   Can you kindly guide me on how to trigger the CI?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on a diff in pull request #12312: KAFKA-10199: Expose tasks in state updater

2022-06-20 Thread GitBox


cadonna commented on code in PR #12312:
URL: https://github.com/apache/kafka/pull/12312#discussion_r901437357


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##
@@ -652,7 +876,10 @@ private void verifyUpdatingTasks(final Task... tasks) 
throws Exception {
 "Did not get all updating task within the given timeout!"
 );
 assertEquals(expectedUpdatingTasks.size(), updatingTasks.size());
-assertTrue(updatingTasks.stream().allMatch(task -> task.state() == 
State.RESTORING));
+assertTrue(updatingTasks.stream()
+.allMatch(task -> task.isActive() && task.state() == 
State.RESTORING
+||
+!task.isActive() && task.state() == State.RUNNING));

Review Comment:
   I previously missed verifying the updating standby tasks here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on a diff in pull request #12312: KAFKA-10199: Expose tasks in state updater

2022-06-20 Thread GitBox


cadonna commented on code in PR #12312:
URL: https://github.com/apache/kafka/pull/12312#discussion_r901435548


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##
@@ -272,15 +302,19 @@ private void maybeCompleteRestoration(final StreamTask 
task,
 if (restoredChangelogs.containsAll(taskChangelogPartitions)) {
 task.completeRestoration(offsetResetter);
 log.debug("Stateful active task " + task.id() + " completed 
restoration");
-addTaskToRestoredTasks(task);
+addToRestoredTasks(task);
 updatingTasks.remove(task.id());
-if (onlyStandbyTasksLeft()) {
-changelogReader.transitToUpdateStandby();
-}
+transitToUpdateStandbysIfOnlyStandbysLeft();
 }
 }
 
-private void addTaskToRestoredTasks(final StreamTask task) {
+private void transitToUpdateStandbysIfOnlyStandbysLeft() {
+if (onlyStandbyTasksLeft()) {
+changelogReader.transitToUpdateStandby();
+}
+}

Review Comment:
   We missed switching to updating standby tasks when all active tasks are 
removed or have failed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on a diff in pull request #12312: KAFKA-10199: Expose tasks in state updater

2022-06-20 Thread GitBox


cadonna commented on code in PR #12312:
URL: https://github.com/apache/kafka/pull/12312#discussion_r901432897


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##
@@ -327,6 +329,56 @@ public void shouldRestoreActiveStatefulTaskThenUpdateStandbyTaskAndAgainRestoreA
 orderVerifier.verify(changelogReader, times(1)).transitToUpdateStandby();
 }
 
+@Test
+public void shouldUpdateStandbyTaskAfterAllActiveStatefulTasksFailed() throws Exception {
+final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_B_0));
+final StandbyTask standbyTask = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
+final TaskCorruptedException taskCorruptedException =
+new TaskCorruptedException(mkSet(activeTask1.id(), activeTask2.id()));
+final Map updatingTasks1 = mkMap(
+mkEntry(activeTask1.id(), activeTask1),
+mkEntry(activeTask2.id(), activeTask2),
+mkEntry(standbyTask.id(), standbyTask)
+);
+doThrow(taskCorruptedException).doNothing().when(changelogReader).restore(updatingTasks1);
+when(changelogReader.allChangelogsCompleted())
.thenReturn(false);
+
+stateUpdater.add(activeTask1);
+stateUpdater.add(activeTask2);
+stateUpdater.add(standbyTask);
+
+final ExceptionAndTasks expectedExceptionAndTasks =
+new ExceptionAndTasks(mkSet(activeTask1, activeTask2), taskCorruptedException);
+verifyExceptionsAndFailedTasks(expectedExceptionAndTasks);
+final InOrder orderVerifier = inOrder(changelogReader);
+orderVerifier.verify(changelogReader, atLeast(1)).enforceRestoreActive();
+orderVerifier.verify(changelogReader, times(1)).transitToUpdateStandby();
+}
+
+@Test
+public void shouldUpdateStandbyTaskAfterAllActiveStatefulTasksRemoved() throws Exception {
+final StreamTask activeTask1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+final StreamTask activeTask2 = createActiveStatefulTaskInStateRestoring(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_B_0));
+final StandbyTask standbyTask = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
+when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
+when(changelogReader.allChangelogsCompleted())
.thenReturn(false);
+stateUpdater.add(activeTask1);
+stateUpdater.add(activeTask2);
+stateUpdater.add(standbyTask);
+verifyUpdatingTasks(activeTask1, activeTask2, standbyTask);
+
+stateUpdater.remove(activeTask1.id());
+stateUpdater.remove(activeTask2.id());
+
+verifyRemovedTasks(activeTask1, activeTask2);
+final InOrder orderVerifier = inOrder(changelogReader);
+orderVerifier.verify(changelogReader, atLeast(1)).enforceRestoreActive();
+orderVerifier.verify(changelogReader, times(1)).transitToUpdateStandby();
+}
+

Review Comment:
   We missed switching to updating standby tasks when all active tasks are removed or have failed. These are the corresponding unit tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #11781: KAFKA-10000: Per-connector offsets topics (KIP-618)

2022-06-20 Thread GitBox


showuon commented on code in PR #11781:
URL: https://github.com/apache/kafka/pull/11781#discussion_r901426968


##
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java:
##
@@ -381,6 +435,10 @@ Map readEndOffsets(Set assignment, boolean
 } catch (UnsupportedVersionException e) {
 // This may happen with really old brokers that don't support the auto topic creation
 // field in metadata requests
+if (requireAdminForOffsets) {
+// Should be handled by the caller during log startup
+throw e;

Review Comment:
   SGTM. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] tombentley commented on a diff in pull request #12224: KAFKA-13943: Make LocalLogManager implementation consistent with the RaftClient interface contract

2022-06-20 Thread GitBox


tombentley commented on code in PR #12224:
URL: https://github.com/apache/kafka/pull/12224#discussion_r901413660


##
metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java:
##
@@ -226,16 +227,19 @@ synchronized long tryAppend(int nodeId, int epoch, List ba
 }
 
 synchronized long tryAppend(int nodeId, int epoch, LocalBatch batch) {
-if (epoch != leader.epoch()) {
-log.trace("tryAppend(nodeId={}, epoch={}): the provided epoch does not " +
-"match the current leader epoch of {}.", nodeId, epoch, leader.epoch());
-return Long.MAX_VALUE;
-}
 if (!leader.isLeader(nodeId)) {
-log.trace("tryAppend(nodeId={}, epoch={}): the given node id does not " +
-"match the current leader id of {}.", nodeId, epoch, leader.leaderId());
-return Long.MAX_VALUE;
+log.debug("tryAppend(nodeId={}, epoch={}): the given node id does not " +
+"match the current leader id of {}.", nodeId, epoch, leader.leaderId());
+throw new NotLeaderException("Append failed because the replication is not the current leader");
+}
+
+if (epoch < leader.epoch()) {
+throw new NotLeaderException("Append failed because the epoch doesn't match");

Review Comment:
   'doesn't match' => `!=`, which isn't what the check's actually doing. 
Perhaps something like "... because the given epoch is stale" would be clearer.



##
metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java:
##
@@ -723,9 +727,35 @@ public long scheduleAtomicAppend(int epoch, List batch) {
 
 @Override
 public void resign(int epoch) {
-LeaderAndEpoch curLeader = leader;
-LeaderAndEpoch nextLeader = new LeaderAndEpoch(OptionalInt.empty(), curLeader.epoch() + 1);
-shared.tryAppend(nodeId, curLeader.epoch(), new LeaderChangeBatch(nextLeader));
+if (epoch < 0) {
+throw new IllegalArgumentException("Attempt to resign from an invalid negative epoch " + epoch);
+}
+
+LeaderAndEpoch leaderAndEpoch = leaderAndEpoch();
+int currentEpoch = leaderAndEpoch.epoch();
+
+if (epoch > currentEpoch) {
+throw new IllegalArgumentException("Attempt to resign from epoch " + epoch +
+" which is larger than the current epoch " + currentEpoch);

Review Comment:
   The contract in `RaftClient` still says 
   
   ```java
* @param epoch the epoch to resign from. If this does not match the current epoch, this
*  call will be ignored.
   ```
   
   Of course `KafkaRaftClient` is doing the same check, so I guess we need to 
update the `RaftClient` javadoc.
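   
   As a sketch of one possible direction (an assumption, not the actual patch), the javadoc could be aligned with the behaviour implemented above, where stale epochs are ignored and negative or future epochs are rejected:
   
   ```java
// Sketch only: a possible updated contract for RaftClient#resign.
// The exact wording in the real interface may differ.
public interface RaftClient<T> {
    /**
     * Resign leadership for the given epoch.
     *
     * @param epoch the epoch to resign from. If the epoch is smaller than the current
     *              epoch, the call is ignored, since the listener may not yet have
     *              observed a leader change that already took place. If the epoch is
     *              negative or larger than the current epoch, an
     *              {@link IllegalArgumentException} is thrown.
     */
    void resign(int epoch);
}
   ```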



##
metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java:
##
@@ -723,9 +727,35 @@ public long scheduleAtomicAppend(int epoch, List batch) {
 
 @Override
 public void resign(int epoch) {
-LeaderAndEpoch curLeader = leader;
-LeaderAndEpoch nextLeader = new LeaderAndEpoch(OptionalInt.empty(), curLeader.epoch() + 1);
-shared.tryAppend(nodeId, curLeader.epoch(), new LeaderChangeBatch(nextLeader));
+if (epoch < 0) {
+throw new IllegalArgumentException("Attempt to resign from an invalid negative epoch " + epoch);
+}
+
+LeaderAndEpoch leaderAndEpoch = leaderAndEpoch();
+int currentEpoch = leaderAndEpoch.epoch();
+
+if (epoch > currentEpoch) {
+throw new IllegalArgumentException("Attempt to resign from epoch " + epoch +
+" which is larger than the current epoch " + currentEpoch);
+} else if (epoch < currentEpoch) {
+// If the passed epoch is smaller than the current epoch, then it might mean
+// that the listener has not been notified about a leader change that already
+// took place. In this case, we consider the call as already fulfilled and
+// take no further action.
+log.debug("Ignoring call to resign from epoch {} since it is smaller than the " +
+"current epoch {}", epoch, currentEpoch);
+return;
+}
+
+LeaderAndEpoch nextLeader = new LeaderAndEpoch(OptionalInt.empty(), currentEpoch + 1);
+try {
+shared.tryAppend(nodeId, currentEpoch, new LeaderChangeBatch(nextLeader));
+} catch (NotLeaderException exp) {
+// the leader epoch has already advanced. resign is a no op.
+log.debug("Ignoring call to resign from epoch {} since it is smaller than the " +
+"current epoch {}", epoch, currentEpoch);

Review Comment:
   `tryAppend` can throw `NotLeaderException` when `nodeId` is not the current 
leader too, so the log message could be confusing. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

[GitHub] [kafka] showuon merged pull request #12311: MINOR: Remove extra commas in upgrade steps documentation

2022-06-20 Thread GitBox


showuon merged PR #12311:
URL: https://github.com/apache/kafka/pull/12311


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] RivenSun2 commented on pull request #12313: MINOR: Add the change statement of reload4j in Notable changes of 3.1.1

2022-06-20 Thread GitBox


RivenSun2 commented on PR #12313:
URL: https://github.com/apache/kafka/pull/12313#issuecomment-1160149270

   Hi @showuon 
   could you help to review this PR?
   Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] RivenSun2 opened a new pull request, #12313: MINOR: Add the change statement of reload4j in Notable changes of 3.1.1

2022-06-20 Thread GitBox


RivenSun2 opened a new pull request, #12313:
URL: https://github.com/apache/kafka/pull/12313

   Referring to KAFKA-13660, the switch to reload4j is an important change that addresses the log4j security vulnerabilities. It was mentioned in the Notable changes for 3.2.0, but is missing from the Notable changes for 3.1.1.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna opened a new pull request, #12312: KAFKA-10199: Expose tasks in state updater

2022-06-20 Thread GitBox


cadonna opened a new pull request, #12312:
URL: https://github.com/apache/kafka/pull/12312

   This PR exposes the tasks managed by the state updater. The state
   updater manages all tasks that were added to it and that have not
   yet been removed from it by draining one of the output queues.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kobihikri commented on pull request #12300: Update zookeeper version to 3.8.0

2022-06-20 Thread GitBox


kobihikri commented on PR #12300:
URL: https://github.com/apache/kafka/pull/12300#issuecomment-1160124213

   @Kvicii @ijuma 
   
   This PR picks up desirable changes made in ZooKeeper 3.8.0.
   
   In particular, the migration to logback, which together with the changes in kafka-clients 3.2.0 rids us of the previous logging library completely (and the inherent security risk in it).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] RivenSun2 commented on pull request #12311: MINOR: Remove extra commas in upgrade steps documentation

2022-06-20 Thread GitBox


RivenSun2 commented on PR #12311:
URL: https://github.com/apache/kafka/pull/12311#issuecomment-1160120900

   Hi @showuon 
   could you help to review this PR?
   Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] RivenSun2 opened a new pull request, #12311: MINOR: Remove extra commas in upgrade steps documentation

2022-06-20 Thread GitBox


RivenSun2 opened a new pull request, #12311:
URL: https://github.com/apache/kafka/pull/12311

   Starting from version 2.5.0, the upgrade steps documentation contains an extra comma.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14011) Reduce retention.ms from default 7 days to 1 day (Make it configurable) for the dead letter topic created for error handling via sink connector

2022-06-20 Thread Souvik (Jira)
Souvik created KAFKA-14011:
--

 Summary: Reduce retention.ms from default 7 days to 1 day (Make it 
configurable) for the dead letter topic created for error handling via sink 
connector
 Key: KAFKA-14011
 URL: https://issues.apache.org/jira/browse/KAFKA-14011
 Project: Kafka
  Issue Type: Improvement
  Components: config, KafkaConnect
Affects Versions: 3.0.1
Reporter: Souvik


We are creating a sink connector along with the error handling mechanism, so that if there is a bad record it is routed to the error queue. The properties used while creating the sink connector are as follows:

 
 'errors.tolerance'='all',
 'errors.deadletterqueue.topic.name' = 'error_dlq',
 'errors.deadletterqueue.topic.replication.factor'= -1,
 'errors.log.include.messages' = true,
 'errors.log.enable' = true,
 'errors.deadletterqueue.context.headers.enable' = true
*Now, is there any way we can configure retention.ms for the dead letter queue topic, i.e. change it from the default 7 days to 1 day, or change the compaction policy, by providing a property while creating the sink connector?*
*We have tried the following properties in the connector:*
{code:java}
'topic.creation.default.retention.ms'='3000',
'admin.topic.creation.default.retention.ms'='3000',
'admin.retention.ms' = '3000',
'admin.topic.retention.ms' = '3000',
'admin.topic.creation.retention.ms' = '3000',
'error.topic.creation.default.retention.ms'='3000',
'error.deadletterqueue.topic.creation.default.retention.ms'='3000',
'error.deadletterqueue.topic.retention.ms'='3000',
'admin.topic.creation.default.retention.ms' = 3000, {code}
but none of them worked.
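
One workaround, sketched below, is to lower retention.ms on the already-created DLQ topic afterwards with the Admin client, since the topic.creation.* connector properties appear to apply only to topics created by source connectors. The topic name and bootstrap servers below are assumptions for illustration:
{code:java}
// Workaround sketch (not a fix for this issue): shorten retention.ms on the
// already-created DLQ topic using the Admin client. Topic name and bootstrap
// servers are assumptions, adjust for your environment.
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class ShortenDlqRetention {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            ConfigResource dlqTopic = new ConfigResource(ConfigResource.Type.TOPIC, "error_dlq");
            AlterConfigOp setRetention = new AlterConfigOp(
                new ConfigEntry("retention.ms", "86400000"), // 1 day instead of the 7-day default
                AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(
                Collections.singletonMap(dlqTopic, Collections.singletonList(setRetention)))
                .all()
                .get();
        }
    }
}
{code}
The same change can be made with kafka-configs.sh, e.g. bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name error_dlq --add-config retention.ms=86400000.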



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Comment Edited] (KAFKA-8769) Consider computing stream time independently per key

2022-06-20 Thread ATZMON CHEN TOV (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17556070#comment-17556070
 ] 

ATZMON CHEN TOV edited comment on KAFKA-8769 at 6/20/22 7:15 AM:
-

Hi [~mjsax],

For us it would be very useful to have per-key suppression *in addition* to suppression by grace on stream time.  What I mean is that when a key arrives for a window, it causes eviction of the aggregation for the previous window (of the same key).  In case no key arrives for the 'next' window of a certain key, the (existing) grace-on-stream-time will evict it. 

The business benefit is that this would allow us to avoid unnecessarily delaying the entire traffic.  I believe that this will benefit the common case, where order per key *is* (at least effectively) guaranteed. 

In our use case we do hourly aggregation with a grace period of 10 minutes (due to endpoint clock differences and multiple providers).  With per-key eviction we hope to have a large percentage of our aggregations available to users less than 2 minutes after hour end (and *only* a small fraction 10+ minutes after hour end).

In the API this could be added as a boolean 'eager' parameter to suppress.  Alternatively, declaratively in streams props as 'producer-guaranteed-order-per-key'.
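
For reference, a minimal sketch of the current setup described above (hourly windows, 10-minute grace period, final-results suppression); topic names and serdes are illustrative assumptions. With today's partition-level stream time, the suppressed results for all keys are only emitted once stream time passes window end plus grace, which is the delay the per-key suggestion aims to shorten:
{code:java}
// Sketch only: hourly aggregation with a 10-minute grace period and
// "final results" suppression, as described above. Topic names are assumptions.
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

public class HourlyAggregationSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("sensor-readings")
            .groupByKey()
            // One-hour windows with a 10-minute grace period for late records.
            .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofHours(1), Duration.ofMinutes(10)))
            .count()
            // Emit only the final result per window; with partition-level stream time
            // this is the step that delays the whole traffic by the grace period.
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream((windowedKey, count) -> windowedKey.key())
            .to("hourly-counts", Produced.with(Serdes.String(), Serdes.Long()));
        // The topology would then be started with new KafkaStreams(builder.build(), props).
    }
}
{code}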

 


was (Author: JIRAUSER291214):
Hi [~mjsax] , 

For us it would be very useful to have suppress per-key *in addition* to 
suppression by grace on stream time.  What I mean is that when a key arrives 
for a window, it will cause eviction of the aggregation for the previous window 
(of the same key).  In case no key arrives for the 'next' window for a certain 
key, the (existing) grace-on-stream-time will evict it. 

The business benefit is that this will allow to avoid delaying unnecessarily 
the entire traffic.  I believe that this will benefit the common case, where 
order per key *is* (at least effectively) guaranteed. 

In our use case we do hourly aggregation and use grace of 10 minutes (due to 
endpoint clock differences and multiple providers).  With evict per-key we hope 
to have large percent of our aggregations available for users less than 2 
minutes from hour end (and a small fraction, 10+ minutes from hour end.)

 

In the API this can be added as boolean 'eager' parameter to suppress.  
Alternatively, declaratively in streams-props as 
'producer-guaranteed-order-per-key'.

 

> Consider computing stream time independently per key
> 
>
> Key: KAFKA-8769
> URL: https://issues.apache.org/jira/browse/KAFKA-8769
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-discussion, needs-kip
>
> Currently, Streams uses a concept of "stream time", which is computed as the 
> highest timestamp observed by stateful operators, per partition. This concept 
> of time backs grace period, retention time, and suppression.
> For use cases in which data is produced to topics in roughly chronological 
> order (as in db change capture), this reckoning is fine.
> Some use cases have a different pattern, though. For example, in IOT 
> applications, it's common for sensors to save up quite a bit of data and then 
> dump it all at once into the topic. See 
> https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware
>  for a concrete example of the use case.
> I have heard of cases where each sensor dumps 24 hours' worth of data at a 
> time into the topic. This results in a pattern in which, when reading a 
> single partition, the operators observe a lot of consecutive records for one 
> key that increase in timestamp for 24 hours, then a bunch of consecutive 
> records for another key that are also increasing in timestamp over the same 
> 24 hour period. With our current stream-time definition, this means that the 
> partition's stream time increases while reading the first key's data, but 
> then stays paused while reading the second key's data, since the second batch 
> of records all have timestamps in the "past".
> E.g:
> {noformat}
> A@t0 (stream time: 0)
> A@t1 (stream time: 1)
> A@t2 (stream time: 2)
> A@t3 (stream time: 3)
> B@t0 (stream time: 3)
> B@t1 (stream time: 3)
> B@t2 (stream time: 3)
> B@t3 (stream time: 3)
> {noformat}
> This pattern results in an unfortunate compromise in which folks are required 
> to set the grace period to the max expected time skew, for example 24 hours, 
> or Streams will just drop the second key's data (since it is late). But, this 
> means that if they want to use Suppression for "final results", they have to 
> wait 24 hours for the result.
> This tradeoff is not strictly necessary, though, because each key represents 
> a logically independent sequence of 

[jira] [Comment Edited] (KAFKA-8769) Consider computing stream time independently per key

2022-06-20 Thread ATZMON CHEN TOV (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17556070#comment-17556070
 ] 

ATZMON CHEN TOV edited comment on KAFKA-8769 at 6/20/22 7:15 AM:
-

Hi [~mjsax],

For us it would be very useful to have per-key suppression *in addition* to suppression by grace on stream time.  What I mean is that when a key arrives for a window, it causes eviction of the aggregation for the previous window (of the same key).  In case no key arrives for the 'next' window of a certain key, the (existing) grace-on-stream-time will evict it. 

The business benefit is that this would allow us to avoid unnecessarily delaying the entire traffic.  I believe that this will benefit the common case, where order per key *is* (at least effectively) guaranteed. 

In our use case we do hourly aggregation with a grace period of 10 minutes (due to endpoint clock differences and multiple providers).  With per-key eviction we hope to have a large percentage of our aggregations available to users less than 2 minutes after hour end (and a small fraction 10+ minutes after hour end).

In the API this could be added as a boolean 'eager' parameter to suppress.  Alternatively, declaratively in streams props as 'producer-guaranteed-order-per-key'.

 


was (Author: JIRAUSER291214):
Hi [~mjsax] , 

For us it would be very useful to have suppress per-key *in addition* to 
suppression by grace on stream time.  What I mean is that when a key arrives 
for a window, it will cause eviction of the aggregation for the previous window 
(of the same key).  In case no key arrives for the 'next' window for a certain 
key, the (existing) grace-on-stream-time will evict it. 

The business benefit is that this will allow to avoid delaying unnecessarily 
the entire traffic.  I believe that this will benefit the common case, where 
order per key *is* (at least effectively) guaranteed. 

In our use case we do hourly aggregation and use grace of 10 minutes (due to 
endpoint clock differences and multiple providers).  With evict per-key we hope 
to have large percent of our aggregations available for users less than 2 
minutes from hour end.

 

In the API this can be added as boolean 'eager' parameter to suppress.  
Alternatively, declaratively in streams-props as 
'producer-guaranteed-order-per-key'.

 

> Consider computing stream time independently per key
> 
>
> Key: KAFKA-8769
> URL: https://issues.apache.org/jira/browse/KAFKA-8769
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-discussion, needs-kip
>
> Currently, Streams uses a concept of "stream time", which is computed as the 
> highest timestamp observed by stateful operators, per partition. This concept 
> of time backs grace period, retention time, and suppression.
> For use cases in which data is produced to topics in roughly chronological 
> order (as in db change capture), this reckoning is fine.
> Some use cases have a different pattern, though. For example, in IOT 
> applications, it's common for sensors to save up quite a bit of data and then 
> dump it all at once into the topic. See 
> https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware
>  for a concrete example of the use case.
> I have heard of cases where each sensor dumps 24 hours' worth of data at a 
> time into the topic. This results in a pattern in which, when reading a 
> single partition, the operators observe a lot of consecutive records for one 
> key that increase in timestamp for 24 hours, then a bunch of consecutive 
> records for another key that are also increasing in timestamp over the same 
> 24 hour period. With our current stream-time definition, this means that the 
> partition's stream time increases while reading the first key's data, but 
> then stays paused while reading the second key's data, since the second batch 
> of records all have timestamps in the "past".
> E.g:
> {noformat}
> A@t0 (stream time: 0)
> A@t1 (stream time: 1)
> A@t2 (stream time: 2)
> A@t3 (stream time: 3)
> B@t0 (stream time: 3)
> B@t1 (stream time: 3)
> B@t2 (stream time: 3)
> B@t3 (stream time: 3)
> {noformat}
> This pattern results in an unfortunate compromise in which folks are required 
> to set the grace period to the max expected time skew, for example 24 hours, 
> or Streams will just drop the second key's data (since it is late). But, this 
> means that if they want to use Suppression for "final results", they have to 
> wait 24 hours for the result.
> This tradeoff is not strictly necessary, though, because each key represents 
> a logically independent sequence of events. Tracking by partition is simply 
> convenient, bu