[GitHub] [kafka] xiaocairush commented on pull request #13928: KAFKA-15097: prevent server shutdown when source file not exists

2023-06-28 Thread via GitHub


xiaocairush commented on PR #13928:
URL: https://github.com/apache/kafka/pull/13928#issuecomment-1612416149

   Hi @pgjbz , could u kind please share which thread has deleted the log file? 
I'm a newbie for kafka source code and cannot find the root cause.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio merged pull request #13917: MINOR; Failed move should be logged at WARN

2023-06-28 Thread via GitHub


jsancio merged PR #13917:
URL: https://github.com/apache/kafka/pull/13917


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio commented on pull request #13917: MINOR; Failed move should be logged at WARN

2023-06-28 Thread via GitHub


jsancio commented on PR #13917:
URL: https://github.com/apache/kafka/pull/13917#issuecomment-1612214700

   > @jsancio Another difference is that now the `outer` exception's stack 
trace will be shown via `WARN` instead of just the exception's message via 
`DEBUG`. I assume that's intentional, but just wanted to call it out. Thanks!
   
   Yes @kirktrue, that was intentional. I would like to see the thread stack 
when the atomic move fails.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] stevenbooke commented on a diff in pull request #13842: KAFKA-14995: Automate asf.yaml collaborators refresh

2023-06-28 Thread via GitHub


stevenbooke commented on code in PR #13842:
URL: https://github.com/apache/kafka/pull/13842#discussion_r1245890763


##
refresh-collaborators.py:
##
@@ -0,0 +1,44 @@
+import os
+from bs4 import BeautifulSoup
+from github import Github
+import yaml
+
+### GET THE NAMES OF THE KAFKA COMMITTERS FROM THE apache/kafka-site REPO ###
+github_token = os.environ.get('GITHUB_TOKEN')
+g = Github(github_token)
+repo = g.get_repo("apache/kafka-site")
+contents = repo.get_contents("committers.html")
+content = contents.decoded_content
+soup = BeautifulSoup(content, "html.parser")
+committer_logins = [login.text for login in soup.find_all('div', 
class_='github_login')]
+
+### GET THE CONTRIBUTORS OF THE apache/kafka REPO ###
+n = 10
+repo = g.get_repo("apache/kafka")
+contributors = repo.get_contributors()
+collaborators = []
+for contributor in contributors:
+if contributor.login not in committer_logins:
+collaborators += [contributor.login]
+refreshed_collaborators = collaborators[:n]
+
+### UPDATE asf.yaml ###
+file_path = ".asf.yaml"
+file = repo.get_contents(file_path)
+yaml_content = yaml.safe_load(file.decoded_content)
+
+# Update 'github_whitelist' list
+github_whitelist = refreshed_collaborators[:10]  # New users to be added
+yaml_content["jenkins"]["github_whitelist"] = github_whitelist
+
+# Update 'collaborators' list
+collaborators = refreshed_collaborators[:10]  # New collaborators to be added
+yaml_content["github"]["collaborators"] = collaborators
+
+# Convert the updated content back to YAML
+updated_yaml = yaml.safe_dump(yaml_content)
+
+# Commit and push the changes
+commit_message = "Update .asf.yaml file with refreshed github_whitelist, and 
collaborators"

Review Comment:
   Will do.



##
refresh-collaborators.py:
##
@@ -0,0 +1,44 @@
+import os
+from bs4 import BeautifulSoup
+from github import Github
+import yaml
+
+### GET THE NAMES OF THE KAFKA COMMITTERS FROM THE apache/kafka-site REPO ###
+github_token = os.environ.get('GITHUB_TOKEN')
+g = Github(github_token)
+repo = g.get_repo("apache/kafka-site")
+contents = repo.get_contents("committers.html")
+content = contents.decoded_content
+soup = BeautifulSoup(content, "html.parser")
+committer_logins = [login.text for login in soup.find_all('div', 
class_='github_login')]
+
+### GET THE CONTRIBUTORS OF THE apache/kafka REPO ###
+n = 10
+repo = g.get_repo("apache/kafka")
+contributors = repo.get_contributors()
+collaborators = []
+for contributor in contributors:
+if contributor.login not in committer_logins:
+collaborators += [contributor.login]
+refreshed_collaborators = collaborators[:n]
+
+### UPDATE asf.yaml ###
+file_path = ".asf.yaml"
+file = repo.get_contents(file_path)
+yaml_content = yaml.safe_load(file.decoded_content)
+
+# Update 'github_whitelist' list
+github_whitelist = refreshed_collaborators[:10]  # New users to be added

Review Comment:
   Will fix that.



##
refresh-collaborators.py:
##
@@ -0,0 +1,44 @@
+import os

Review Comment:
   Wil do.



##
.github/workflows/refresh-collaborators.yaml:
##
@@ -0,0 +1,24 @@
+name: Refresh asf.yaml collaborators every 3 months

Review Comment:
   Will do.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] junrao commented on a diff in pull request #13898: KAFKA-14966; [2/N] Extract OffsetFetcher reusable logic

2023-06-28 Thread via GitHub


junrao commented on code in PR #13898:
URL: https://github.com/apache/kafka/pull/13898#discussion_r1245887623


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java:
##
@@ -117,11 +104,6 @@ else if (timestamp == ListOffsetsRequest.LATEST_TIMESTAMP)
  * 
and one or more partitions aren't awaiting a seekToBeginning() or seekToEnd().
  */
 public void resetPositionsIfNeeded() {
-// Raise exception from previous offset fetch if there is one
-RuntimeException exception = 
cachedListOffsetsException.getAndSet(null);

Review Comment:
   his seems a change in existing logic and not just a refactoring. Is this 
change expected?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java:
##
@@ -198,6 +208,11 @@ void validatePositionsOnMetadataChange() {
 }
 
 Map getOffsetResetTimestamp() {
+// Raise exception from previous offset fetch if there is one
+RuntimeException exception = 
cachedListOffsetsException.getAndSet(null);

Review Comment:
   This seems a change in existing logic and not just a refactoring. Is this 
change expected?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java:
##
@@ -261,6 +276,73 @@ void updateSubscriptionState(Map

[GitHub] [kafka] stevenbooke commented on a diff in pull request #13842: KAFKA-14995: Automate asf.yaml collaborators refresh

2023-06-28 Thread via GitHub


stevenbooke commented on code in PR #13842:
URL: https://github.com/apache/kafka/pull/13842#discussion_r1245887810


##
refresh-collaborators.py:
##
@@ -0,0 +1,44 @@
+import os
+from bs4 import BeautifulSoup
+from github import Github
+import yaml
+
+### GET THE NAMES OF THE KAFKA COMMITTERS FROM THE apache/kafka-site REPO ###
+github_token = os.environ.get('GITHUB_TOKEN')
+g = Github(github_token)
+repo = g.get_repo("apache/kafka-site")
+contents = repo.get_contents("committers.html")
+content = contents.decoded_content
+soup = BeautifulSoup(content, "html.parser")
+committer_logins = [login.text for login in soup.find_all('div', 
class_='github_login')]
+
+### GET THE CONTRIBUTORS OF THE apache/kafka REPO ###
+n = 10
+repo = g.get_repo("apache/kafka")
+contributors = repo.get_contributors()

Review Comment:
   Hello @vvcephei, I understand your concern. I will make changes to account 
for this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13446: KAFKA-14837, KAFKA-14842: Ignore groups that do not have offsets for filtered topics in MirrorCheckpointConnector

2023-06-28 Thread via GitHub


C0urante commented on code in PR #13446:
URL: https://github.com/apache/kafka/pull/13446#discussion_r1245885579


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##
@@ -150,10 +156,31 @@ private void loadInitialConsumerGroups()
 
 List findConsumerGroups()
 throws InterruptedException, ExecutionException {
-return listConsumerGroups().stream()
+List filteredGroups = listConsumerGroups().stream()
 .map(ConsumerGroupListing::groupId)
-.filter(this::shouldReplicate)
+.filter(this::shouldReplicateByGroupFilter)
 .collect(Collectors.toList());
+
+List checkpointGroups = new LinkedList<>();
+List irrelevantGroups = new LinkedList<>();
+
+for (String group : filteredGroups) {
+Set consumedTopics = 
listConsumerGroupOffsets(group).keySet().stream()
+.map(TopicPartition::topic)
+.filter(this::shouldReplicateByTopicFilter)

Review Comment:
   @blacktooth I'll mark this conversation resolved so that users don't get the 
wrong idea about MM2; feel free to unresolve if you believe there's still an 
issue.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13658: KAFKA-14669: Use the generated docs for MirrorMaker configs in the doc

2023-06-28 Thread via GitHub


C0urante commented on code in PR #13658:
URL: https://github.com/apache/kafka/pull/13658#discussion_r1245804801


##
docs/configuration.html:
##
@@ -267,23 +267,31 @@ 
 
-  3.8 System 
Properties
+
+  3.8 MirrorMaker 
Configs
+  Below is the configuration of MirrorMaker.

Review Comment:
   Nit:
   ```suggestion
 Below is the configuration of the connectors that make up MirrorMaker 2.
   ```



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConfig.java:
##
@@ -57,7 +59,7 @@ short heartbeatsTopicReplicationFactor() {
 return getShort(HEARTBEATS_TOPIC_REPLICATION_FACTOR);
 }
 
-protected static final ConfigDef CONNECTOR_CONFIG_DEF = new 
ConfigDef(BASE_CONNECTOR_CONFIG_DEF)
+protected static final ConfigDef HEARTBEAT_CONFIG_DEF = new ConfigDef()

Review Comment:
   I think we can get away without introducing a utility method to merge 
`ConfigDefs` by replacing this constant (and others like it) with a method that 
adds the connector-specific configuration properties to an existing `ConfigDef`:
   
   ```java
   private static ConfigDef defineHeartbeatConfig(ConfigDef baseConfig) {
   return baseConfig
   .define(
   EMIT_HEARTBEATS_ENABLED,
   ConfigDef.Type.BOOLEAN,
   EMIT_HEARTBEATS_ENABLED_DEFAULT,
   ConfigDef.Importance.LOW,
   EMIT_HEARTBEATS_ENABLED_DOC)
   // ...
   ```



##
docs/configuration.html:
##
@@ -267,23 +267,31 @@ 
 
-  3.8 System 
Properties
+
+  3.8 MirrorMaker 
Configs
+  Below is the configuration of MirrorMaker.
+  
+  

Review Comment:
   We should note the name of the connector here. It'd also be nice if we gave 
each section (common, source, checkpoint, heartbeat) a subheading, like we do 
for 
[source](https://github.com/apache/kafka/blob/3a246b1abab0cfa8050546f54c987af2ec6cdd4e/docs/configuration.html#L254C121-L254C145)
 and 
[sink](https://github.com/apache/kafka/blob/3a246b1abab0cfa8050546f54c987af2ec6cdd4e/docs/configuration.html#L258)
 connectors, and perhaps a brief (one sentence is fine) description of what 
each connector does and/or a link to other parts of our docs that already 
provide that info.



##
docs/configuration.html:
##
@@ -267,23 +267,31 @@ 
 
-  3.8 System 
Properties
+

Review Comment:
   Nit: unnecessary extra line
   ```suggestion
   ```



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConfig.java:
##
@@ -77,7 +79,9 @@ short heartbeatsTopicReplicationFactor() {
 ConfigDef.Importance.LOW,
 HEARTBEATS_TOPIC_REPLICATION_FACTOR_DOC);
 
+protected final static ConfigDef CONNECTOR_CONFIG_DEF = new 
ConfigDef(mergeConnectorConfigDef(HEARTBEAT_CONFIG_DEF));

Review Comment:
   With the above suggestion, this can now become:
   ```java
   protected static final ConfigDef CONNECTOR_CONFIG_DEF = 
defineHeartbeatConfig(new ConfigDef(BASE_CONNECTOR_CONFIG_DEF));
   ```



##
docs/configuration.html:
##
@@ -267,23 +267,31 @@ 
 
-  3.8 System 
Properties
+
+  3.8 MirrorMaker 
Configs
+  Below is the configuration of MirrorMaker.
+  

Review Comment:
   We should note that these are common properties that apply to all three 
connectors, right?



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConfig.java:
##
@@ -77,7 +79,9 @@ short heartbeatsTopicReplicationFactor() {
 ConfigDef.Importance.LOW,
 HEARTBEATS_TOPIC_REPLICATION_FACTOR_DOC);
 
+protected final static ConfigDef CONNECTOR_CONFIG_DEF = new 
ConfigDef(mergeConnectorConfigDef(HEARTBEAT_CONFIG_DEF));
+
 public static void main(String[] args) {
-System.out.println(CONNECTOR_CONFIG_DEF.toHtml(4, config -> 
"mirror_heartbeat_" + config));
+System.out.println(HEARTBEAT_CONFIG_DEF.toHtml(4, config -> 
"mirror_heartbeat_" + config));

Review Comment:
   With the above suggestion, this can now become:
   ```java
   System.out.println(defineHeartbeatConfig(new ConfigDef()).toHtml(4, 
config -> "mirror_heartbeat_" + config));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] pgjbz opened a new pull request, #13928: KAFKA-15097: prevent server shutdown when source file not exists

2023-06-28 Thread via GitHub


pgjbz opened a new pull request, #13928:
URL: https://github.com/apache/kafka/pull/13928

   catch NoSuchFileException on atomicMoveWithFallback this catch block will 
prevent throws NoSuchFileException and stop kafka server in Kraft Mode
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15078) When fetching offset 0 the KRaft leader should response with SnapshotId

2023-06-28 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-15078?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio resolved KAFKA-15078.

Fix Version/s: 3.6.0
   Resolution: Fixed

> When fetching offset 0 the KRaft leader should response with SnapshotId
> ---
>
> Key: KAFKA-15078
> URL: https://issues.apache.org/jira/browse/KAFKA-15078
> Project: Kafka
>  Issue Type: Improvement
>Reporter: José Armando García Sancio
>Assignee: José Armando García Sancio
>Priority: Major
> Fix For: 3.6.0
>
>
> With the current implementation if the follower fetches offset 0 and the 
> KRaft leader has a record batch at offset 0, it will always send a FETCH 
> response with records.
> If the KRaft log has generated a snapshot it is always more efficient of the 
> follower fetch the snapshot instead.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jsancio merged pull request #13845: KAFKA-15078; KRaft leader replys with snapshot for offset 0

2023-06-28 Thread via GitHub


jsancio merged PR #13845:
URL: https://github.com/apache/kafka/pull/13845


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-28 Thread via GitHub


jeffkbkim commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1245682672


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -728,6 +794,81 @@ public void replay(
 }
 consumerGroup.removeMember(memberId);
 }
+
+updateGroupsByTopics(groupId, oldSubscribedTopicNames, 
consumerGroup.subscribedTopicNames());

Review Comment:
   how does `groupsByTopics` (and `groups`) know that the changes made here are 
already committed (and won't be reverted)?
   
   i think i'm confused because in api handling (i.e. consumer group heartbeat) 
once we modify the timeline data structures we generate records to commit the 
offset in the timeline but here we do it in reverse



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -423,6 +456,47 @@ public Map 
computeSubscriptionMetadata(
 return Collections.unmodifiableMap(newSubscriptionMetadata);
 }
 
+/**
+ * Updates the metadata refresh deadline.
+ *
+ * @param deadlineMs The next time in milliseconds.
+ * @param groupEpoch The associated group epoch.
+ */
+public void setMetadataRefreshDeadline(
+long deadlineMs,
+int groupEpoch
+) {
+this.metadataRefreshDeadline = new DeadlineAndEpoch(deadlineMs, 
groupEpoch);
+}
+
+/**
+ * Requests a metadata refresh.
+ */
+public void requestMetadataRefresh() {
+this.metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
+}
+
+/**
+ * Checks if a metadata refresh is required. A refresh is required in two 
cases:
+ * 1) The deadline is smaller or equals to the current time;

Review Comment:
   nit: "or equal to"



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -423,6 +456,47 @@ public Map 
computeSubscriptionMetadata(
 return Collections.unmodifiableMap(newSubscriptionMetadata);
 }
 
+/**
+ * Updates the next metadata refresh time.
+ *
+ * @param nextTimeMs The next time in milliseconds.
+ * @param groupEpoch The associated group epoch.
+ */
+public void setNextMetadataRefreshTime(
+long nextTimeMs,
+int groupEpoch
+) {
+this.nextMetadataRefreshTime = new TimeAndEpoch(nextTimeMs, 
groupEpoch);
+}
+
+/**
+ * Resets the next metadata refresh.
+ */
+public void resetNextMetadataRefreshTime() {
+this.nextMetadataRefreshTime = TimeAndEpoch.EMPTY;
+}
+
+/**
+ * Checks if a metadata refresh is required. A refresh is required in two 
cases:
+ * 1) The next update time is smaller or equals to the current time;
+ * 2) The group epoch associated with the next update time is smaller than

Review Comment:
   shouldn't it be "associated with the next update time is larger than"?
   
   the "current group epoch" is `groupEpoch()` right



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -728,6 +794,81 @@ public void replay(
 }
 consumerGroup.removeMember(memberId);
 }
+
+updateGroupsByTopics(groupId, oldSubscribedTopicNames, 
consumerGroup.subscribedTopicNames());
+}
+
+/**
+ * @return The set of groups subscribed to the topic.
+ */
+public Set groupsSubscribedToTopic(String topicName) {
+Set groups = groupsByTopics.get(topicName);

Review Comment:
   any reason we don't use `getOrDefault()`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-28 Thread via GitHub


jeffkbkim commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1245712559


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -141,7 +141,7 @@ public static class TimeAndEpoch {
  * after having refreshed the metadata but the write operation failed. In 
this case, the
  * time is not automatically rolled back.
  */
-private TimeAndEpoch nextMetadataRefreshTime = TimeAndEpoch.EMPTY;

Review Comment:
   i noticed this across some places.
   
   can we rework the comments in `hasMetadataExpired()` and in the test 
`testNextMetadataRefreshTime()` along with the name to use deadline?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15028) AddPartitionsToTxnManager metrics

2023-06-28 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15028?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan resolved KAFKA-15028.

Resolution: Fixed

> AddPartitionsToTxnManager metrics
> -
>
> Key: KAFKA-15028
> URL: https://issues.apache.org/jira/browse/KAFKA-15028
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
> Attachments: latency-cpu.html
>
>
> KIP-890 added metrics for the AddPartitionsToTxnManager
> VerificationTimeMs – number of milliseconds from adding partition info to the 
> manager to the time the response is sent. This will include the round trip to 
> the transaction coordinator if it is called. This will also account for 
> verifications that fail before the coordinator is called.
> VerificationFailureRate – rate of verifications that returned in failure 
> either from the AddPartitionsToTxn response or through errors in the manager.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] lucasbru commented on a diff in pull request #13925: KAFKA-10199: Consider tasks in state updater when computing offset sums

2023-06-28 Thread via GitHub


lucasbru commented on code in PR #13925:
URL: https://github.com/apache/kafka/pull/13925#discussion_r1245664165


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -1138,28 +1138,33 @@ public void signalResume() {
 public Map getTaskOffsetSums() {
 final Map taskOffsetSums = new HashMap<>();
 
-// Not all tasks will create directories, and there may be directories 
for tasks we don't currently own,
-// so we consider all tasks that are either owned or on disk. This 
includes stateless tasks, which should
-// just have an empty changelogOffsets map.
-for (final TaskId id : union(HashSet::new, lockedTaskDirectories, 
tasks.allTaskIds())) {
-final Task task = tasks.contains(id) ? tasks.task(id) : null;
-// Closed and uninitialized tasks don't have any offsets so we 
should read directly from the checkpoint
-if (task != null && task.state() != State.CREATED && task.state() 
!= State.CLOSED) {
+final Map tasks = allTasks();
+final Set 
lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks =

Review Comment:
   Well, if there is no task directory, there is no checkpoint to process. So 
it's safe to not do anything in this case.
   
   All you'd do by adding more tasks is to later skip on the check 
`checkPointFile.exists()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lucasbru commented on a diff in pull request #13925: KAFKA-10199: Consider tasks in state updater when computing offset sums

2023-06-28 Thread via GitHub


lucasbru commented on code in PR #13925:
URL: https://github.com/apache/kafka/pull/13925#discussion_r1245664165


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -1138,28 +1138,33 @@ public void signalResume() {
 public Map getTaskOffsetSums() {
 final Map taskOffsetSums = new HashMap<>();
 
-// Not all tasks will create directories, and there may be directories 
for tasks we don't currently own,
-// so we consider all tasks that are either owned or on disk. This 
includes stateless tasks, which should
-// just have an empty changelogOffsets map.
-for (final TaskId id : union(HashSet::new, lockedTaskDirectories, 
tasks.allTaskIds())) {
-final Task task = tasks.contains(id) ? tasks.task(id) : null;
-// Closed and uninitialized tasks don't have any offsets so we 
should read directly from the checkpoint
-if (task != null && task.state() != State.CREATED && task.state() 
!= State.CLOSED) {
+final Map tasks = allTasks();
+final Set 
lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks =

Review Comment:
   Well, if there is no task directory, there is no checkpoint to process. So 
it's safe to not do anything in this case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-28 Thread via GitHub


jolshan commented on code in PR #13798:
URL: https://github.com/apache/kafka/pull/13798#discussion_r1245629399


##
core/src/main/scala/kafka/network/RequestChannel.scala:
##
@@ -240,17 +240,18 @@ object RequestChannel extends Logging {
   val responseSendTimeMs = nanosToMs(endTimeNanos - 
responseDequeueTimeNanos)
   val messageConversionsTimeMs = nanosToMs(messageConversionsTimeNanos)
   val totalTimeMs = nanosToMs(endTimeNanos - startTimeNanos)
-  val fetchMetricNames =
+  val overrideMetricNames =
 if (header.apiKey == ApiKeys.FETCH) {
-  val isFromFollower = body[FetchRequest].isFromFollower
-  Seq(
-if (isFromFollower) RequestMetrics.followFetchMetricName
+  val specifiedMetricName =
+if (body[FetchRequest].isFromFollower) 
RequestMetrics.followFetchMetricName
 else RequestMetrics.consumerFetchMetricName
-  )
+  Seq(specifiedMetricName, header.apiKey.name)
+} else if (header.apiKey == ApiKeys.ADD_PARTITIONS_TO_TXN && 
body[AddPartitionsToTxnRequest].allVerifyOnlyRequest) {
+Seq(RequestMetrics.verifyPartitionsInTxnMetricName)

Review Comment:
   It doesn't make sense to have the common metric here, since we want the two 
to be distinct. The fetch metrics are different since we have a unique metric 
for both fetch types.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-28 Thread via GitHub


jolshan commented on code in PR #13798:
URL: https://github.com/apache/kafka/pull/13798#discussion_r1245628587


##
core/src/main/scala/kafka/network/RequestChannel.scala:
##
@@ -240,17 +240,18 @@ object RequestChannel extends Logging {
   val responseSendTimeMs = nanosToMs(endTimeNanos - 
responseDequeueTimeNanos)
   val messageConversionsTimeMs = nanosToMs(messageConversionsTimeNanos)
   val totalTimeMs = nanosToMs(endTimeNanos - startTimeNanos)
-  val fetchMetricNames =
+  val overrideMetricNames =
 if (header.apiKey == ApiKeys.FETCH) {
-  val isFromFollower = body[FetchRequest].isFromFollower
-  Seq(
-if (isFromFollower) RequestMetrics.followFetchMetricName
+  val specifiedMetricName =
+if (body[FetchRequest].isFromFollower) 
RequestMetrics.followFetchMetricName
 else RequestMetrics.consumerFetchMetricName
-  )
+  Seq(specifiedMetricName, header.apiKey.name)
+} else if (header.apiKey == ApiKeys.ADD_PARTITIONS_TO_TXN && 
body[AddPartitionsToTxnRequest].allVerifyOnlyRequest) {
+Seq(RequestMetrics.verifyPartitionsInTxnMetricName)

Review Comment:
   We are not. It is still seen in the else statement.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] artemlivshits commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-28 Thread via GitHub


artemlivshits commented on code in PR #13798:
URL: https://github.com/apache/kafka/pull/13798#discussion_r1245541275


##
core/src/main/scala/kafka/network/RequestChannel.scala:
##
@@ -240,17 +240,18 @@ object RequestChannel extends Logging {
   val responseSendTimeMs = nanosToMs(endTimeNanos - 
responseDequeueTimeNanos)
   val messageConversionsTimeMs = nanosToMs(messageConversionsTimeNanos)
   val totalTimeMs = nanosToMs(endTimeNanos - startTimeNanos)
-  val fetchMetricNames =
+  val overrideMetricNames =
 if (header.apiKey == ApiKeys.FETCH) {
-  val isFromFollower = body[FetchRequest].isFromFollower
-  Seq(
-if (isFromFollower) RequestMetrics.followFetchMetricName
+  val specifiedMetricName =
+if (body[FetchRequest].isFromFollower) 
RequestMetrics.followFetchMetricName
 else RequestMetrics.consumerFetchMetricName
-  )
+  Seq(specifiedMetricName, header.apiKey.name)
+} else if (header.apiKey == ApiKeys.ADD_PARTITIONS_TO_TXN && 
body[AddPartitionsToTxnRequest].allVerifyOnlyRequest) {
+Seq(RequestMetrics.verifyPartitionsInTxnMetricName)

Review Comment:
   It looks like we previously had a metric for ADD_PARTITIONS_TO_TXN and now 
we don't.  The FETCH metric has a metric that is combined consumer and 
follower, but we also have the common metric.



##
core/src/main/scala/kafka/network/RequestChannel.scala:
##
@@ -240,16 +240,17 @@ object RequestChannel extends Logging {
   val responseSendTimeMs = nanosToMs(endTimeNanos - 
responseDequeueTimeNanos)
   val messageConversionsTimeMs = nanosToMs(messageConversionsTimeNanos)
   val totalTimeMs = nanosToMs(endTimeNanos - startTimeNanos)
-  val fetchMetricNames =
+  val metricNames =
 if (header.apiKey == ApiKeys.FETCH) {
-  val isFromFollower = body[FetchRequest].isFromFollower
-  Seq(
-if (isFromFollower) RequestMetrics.followFetchMetricName
+  val specifiedMetricName =
+if (body[FetchRequest].isFromFollower) 
RequestMetrics.followFetchMetricName
 else RequestMetrics.consumerFetchMetricName
-  )
+  Seq(specifiedMetricName, header.apiKey.name)
+} else if (header.apiKey == ApiKeys.ADD_PARTITIONS_TO_TXN && 
body[AddPartitionsToTxnRequest].verifyOnlyRequest()) {
+Seq(RequestMetrics.verifyPartitionsInTxnMetricName)
+} else {
+  Seq(header.apiKey.name)
 }
-else Seq.empty
-  val metricNames = fetchMetricNames :+ header.apiKey.name

Review Comment:
   Looks like we're removing the ADD_PARTITIONS_TO_TXN metric, which would be a 
break if someone used it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation

2023-06-28 Thread via GitHub


jolshan commented on code in PR #13880:
URL: https://github.com/apache/kafka/pull/13880#discussion_r1245609479


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerde.java:
##
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.MessageUtil;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
+import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer/Deserializer for {{@link Record}}.
+ */
+public class RecordSerde implements PartitionWriter.Serializer, 
CoordinatorLoader.Deserializer {
+@Override
+public byte[] serializeKey(Record record) {

Review Comment:
   Let's file a JIRA if we want to do this in a follow up



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)

2023-06-28 Thread via GitHub


vcrfxia commented on PR #13855:
URL: https://github.com/apache/kafka/pull/13855#issuecomment-1611889559

   Hey @wcarlson5 did you mean to push a new commit to this PR since @cadonna 
and I last reviewed? Some of your replies mention having made changes but I 
don't see any. 
   
   Nothing blocking from my side, though I do think that the latest round of 
suggestions (including test coverage improvements and @cadonna 's suggestion 
for avoiding an unnecessary range query) would be good to incorporate either in 
this PR or the next one.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict

2023-06-28 Thread via GitHub


rreddy-22 commented on PR #13920:
URL: https://github.com/apache/kafka/pull/13920#issuecomment-1611886952

   > @flashmouse Thanks for the PR. It seems that this condition has been 
around for a while now. Did you observe a similar issue with version prior to 
3.5? I wonder what if changes introduced in 3.5 for the rack aware assignment 
made this worse than it used to be. I agree that we need to get to the bottom 
of this.
   
   The rack awareness changes introduce a whole iteration through all the 
potential consumers to check if any consumer has a matching rack and in the 
worst case scenario where no consumer with matching rack is found, the whole 
iteration happens all over again. Previously we would only iterate through the 
list once, so potentially the time could've doubled now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13973) block-cache-capacity metrics worth twice as much as normal

2023-06-28 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17738231#comment-17738231
 ] 

Matthias J. Sax commented on KAFKA-13973:
-

The GitHub issue for RocksDB was "declined". I did file a follow up ticker for 
Speedb ([https://github.com/speedb-io/speedb/issues/583)] – maybe we get help 
there.

> block-cache-capacity metrics worth twice as much as normal
> --
>
> Key: KAFKA-13973
> URL: https://issues.apache.org/jira/browse/KAFKA-13973
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.2.0
>Reporter: Sylvain Le Gouellec
>Priority: Minor
> Attachments: Screenshot 2022-06-09 at 08.55.36.png, Screenshot 
> 2022-06-09 at 09.33.50.png
>
>
> I have created a very simple kafka-streams application with 1 state store. 
> I'm very surprised that the block-cache-capacity metrics show a {{100MB}} 
> block cache capacity instead of the default one in kafka streams is 
> {{{}50MB{}}}.
>  
> My topology :
> StreamsBuilder sb = new StreamsBuilder();
> sb.stream("input")
> .groupByKey()
> .count()
> .toStream()
> .to("output");
>  
> I checkout the {{kafka-streams}} code and I saw a strange thing. When the 
> {{{}RocksDBTimestampedStore{}}}store is created, we try to create two column 
> families for backward compatibility with a potentiel old key/value store.
> In this method, {{setDbAccessor(col1, col2)}} if the first column is not 
> valid, well you close this one 
> ([L102|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java#L102]).
>  But regarding the rocksdb instance, it's seems that the column families is 
> not deleted completely and the metrics exposed by [Rocksdb continue to 
> aggregate 
> (L373)|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java#L373]
>  {{block-cache-capacity }}for both column families (default and 
> keyValueWithTimestamp).
> Maybe you have to drop explicitly the column family, in the 
> {{setDbAccessor(col1, col2)}} if the first column is not valid (like 
> {{{}db.dropColumnFamily(noTimestampColumnFamily);{}}})
>  
> I tried to drop the {{noTimestampColumnFamily in setDbAccessor if the first 
> column is not valid like : }}
>  
> {code:java}
> private void setDbAccessor(final ColumnFamilyHandle noTimestampColumnFamily,
>final ColumnFamilyHandle 
> withTimestampColumnFamily) throws RocksDBException {
> final RocksIterator noTimestampsIter = 
> db.newIterator(noTimestampColumnFamily);
> noTimestampsIter.seekToFirst();
> if (noTimestampsIter.isValid()) {
> log.info("Opening store {} in upgrade mode", name);
> dbAccessor = new DualColumnFamilyAccessor(noTimestampColumnFamily, 
> withTimestampColumnFamily);
> } else {
> log.info("Opening store {} in regular mode", name);
> dbAccessor = new 
> SingleColumnFamilyAccessor(withTimestampColumnFamily);
> noTimestampColumnFamily.close();
> db.dropColumnFamily(noTimestampColumnFamily); // try fix it
> }
> noTimestampsIter.close();
> }{code}
>  
>  
>  
> {{But it's seems that you can't drop the default column family in RocksDb 
> (see screenshot).}}
> {{*So how can we have the real block-cache-capacity metrics value in Kafka 
> Streams monitoring ?* }}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] ijuma commented on a diff in pull request #12948: MINOR: Add JDK 20 CI build and remove some branch builds

2023-06-28 Thread via GitHub


ijuma commented on code in PR #12948:
URL: https://github.com/apache/kafka/pull/12948#discussion_r124885


##
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##
@@ -290,25 +290,27 @@ public long skip(long toSkip) throws IOException {
 
 // Skip bytes stored in intermediate buffer first
 int avail = count - pos;
-long bytesSkipped = (avail < remaining) ? avail : remaining;
+int bytesSkipped = Math.min(avail, (int) remaining);

Review Comment:
   Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-06-28 Thread via GitHub


jolshan commented on PR #13591:
URL: https://github.com/apache/kafka/pull/13591#issuecomment-1611816013

   Some new test failures on latest build. I will rebuild and investigate to 
see if this is new to trunk.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] flashmouse commented on pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict

2023-06-28 Thread via GitHub


flashmouse commented on PR #13920:
URL: https://github.com/apache/kafka/pull/13920#issuecomment-1611815006

   @dajac 
   we haven't seen such many rebalance times before, we before use 
CooperativeSticky with kafka-client 3.0.0, rebalance times increase very slight.
   
   but we are not use kafka-client 3.5 directly because 
CooperativeStickyAssignor  in 3.5 need upgrad server to 3.5 too if need 
rack-aware assign logic and we couldn't upgrade server right now so we move 
AbstractStickyAssignor and CooperativeStickyAssignor logic to our application 
and move the rackInfo to userData. although we have passd all unit test in 
kafka-client after do this change, we still doubt whether this change caused 
rebalance time increase so we're keeping check.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guizmaii commented on a diff in pull request #13914: KAFKA-14972: Support async runtimes in consumer

2023-06-28 Thread via GitHub


guizmaii commented on code in PR #13914:
URL: https://github.com/apache/kafka/pull/13914#discussion_r1245542620


##
clients/src/main/java/org/apache/kafka/clients/consumer/ThreadAccessKey.java:
##
@@ -0,0 +1,8 @@
+package org.apache.kafka.clients.consumer;
+
+/**
+ * A key that can be used to pass access to the Kafka consumer to another 
thread.
+ * Can be obtained via {@link KafkaConsumer#getThreadAccessKey()}.
+ */
+public class ThreadAccessKey {

Review Comment:
   `final`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13915: KAFKA-14930: Document the new PATCH and DELETE offsets REST APIs for Connect

2023-06-28 Thread via GitHub


C0urante commented on code in PR #13915:
URL: https://github.com/apache/kafka/pull/13915#discussion_r1245532972


##
docs/connect.html:
##
@@ -301,7 +301,7 @@ REST 
API
 GET /connectors/{name}/tasks - get a list of tasks 
currently running for a connector
 GET /connectors/{name}/tasks/{taskid}/status - get 
current status of the task, including if it is running, failed, paused, etc., 
which worker it is assigned to, and error information if it has failed
 PUT /connectors/{name}/pause - pause the connector 
and its tasks, which stops message processing until the connector is resumed. 
Any resources claimed by its tasks are left allocated, which allows the 
connector to begin processing data quickly once it is resumed.
-PUT /connectors/{name}/stop - stop the connector and 
shut down its tasks, deallocating any resources claimed by its tasks. This is 
more efficient from a resource usage standpoint than pausing the connector, but 
can cause it to take longer to begin processing data once resumed.
+PUT /connectors/{name}/stop - stop the connector and 
shut down its tasks, deallocating any resources claimed by its tasks. This is 
more efficient from a resource usage standpoint than pausing the connector, but 
can cause it to take longer to begin processing data once resumed. Note that 
the offsets for a connector can be only modified via the offsets management 
endpoints if it is in the stopped state

Review Comment:
   ```suggestion
   PUT 
/connectors/{name}/stop - stop the connector and shut down its tasks, 
deallocating any resources claimed by its tasks. This is more efficient from a 
resource usage standpoint than pausing the connector, but can cause it to take 
longer to begin processing data once resumed. Note that the offsets for a 
connector can be only modified via the offsets management endpoints if it is in 
the stopped state
   ```



##
docs/connect.html:
##
@@ -313,7 +313,22 @@ REST 
API
 DELETE /connectors/{name} - delete a connector, 
halting all tasks and deleting its configuration
 GET /connectors/{name}/topics - get the set of topics 
that a specific connector is using since the connector was created or since a 
request to reset its set of active topics was issued
 PUT /connectors/{name}/topics/reset - send a request 
to empty the set of active topics of a connector
-GET /connectors/{name}/offsets - get the current 
offsets for a connector (see https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect;>KIP-875
 for more details)
+Offsets management endpoints (see https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect;>KIP-875
 for more details):
+
+GET /connectors/{name}/offsets - get the 
current offsets for a connector
+DELETE /connectors/{name}/offsets - reset the 
offsets for a connector. The connector must exist and must be in the stopped 
state
+PATCH /connectors/{name}/offsets - alter the 
offsets for a connector. The connector must exist and must be in the stopped 
state. The request body should be a JSON object containing a JSON array 
offsets field, similar to the response body of the GET 
/connectors/{name}/offsets endpoint

Review Comment:
   ```suggestion
   DELETE /connectors/{name}/offsets - reset 
the offsets for a connector. The connector must exist and must be in the 
stopped state (see PUT 
/connectors/{name}/stop)
   PATCH /connectors/{name}/offsets - alter 
the offsets for a connector. The connector must exist and must be in the 
stopped state (see PUT 
/connectors/{name}/stop). The request body should be a JSON object 
containing a JSON array offsets field, similar to the response 
body of the GET /connectors/{name}/offsets endpoint
   ```



##
docs/connect.html:
##
@@ -313,7 +313,13 @@ REST 
API
 DELETE /connectors/{name} - delete a connector, 
halting all tasks and deleting its configuration
 GET /connectors/{name}/topics - get the set of topics 
that a specific connector is using since the connector was created or since a 
request to reset its set of active topics was issued
 PUT /connectors/{name}/topics/reset - send a request 
to empty the set of active topics of a connector
-GET /connectors/{name}/offsets - get the current 
offsets for a connector (see https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect;>KIP-875
 for more details)
+Offsets management REST APIs (see https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect;>KIP-875
 for more details):
+
+GET /connectors/{name}/offsets - get the 
current offsets for a connector
+DELETE 

[GitHub] [kafka] C0urante commented on a diff in pull request #13915: KAFKA-14930: Document the new PATCH and DELETE offsets REST APIs for Connect

2023-06-28 Thread via GitHub


C0urante commented on code in PR #13915:
URL: https://github.com/apache/kafka/pull/13915#discussion_r1245532682


##
docs/connect.html:
##
@@ -313,7 +313,13 @@ REST 
API
 DELETE /connectors/{name} - delete a connector, 
halting all tasks and deleting its configuration
 GET /connectors/{name}/topics - get the set of topics 
that a specific connector is using since the connector was created or since a 
request to reset its set of active topics was issued
 PUT /connectors/{name}/topics/reset - send a request 
to empty the set of active topics of a connector
-GET /connectors/{name}/offsets - get the current 
offsets for a connector (see https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect;>KIP-875
 for more details)
+Offsets management REST APIs (see https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect;>KIP-875
 for more details):
+
+GET /connectors/{name}/offsets - get the 
current offsets for a connector
+DELETE /connectors/{name}/offsets - reset the 
offsets for a connector. The connector must exist and must be in the stopped 
state.

Review Comment:
   It's possible; I'll add some GH suggestions demonstrating how.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict

2023-06-28 Thread via GitHub


dajac commented on PR #13920:
URL: https://github.com/apache/kafka/pull/13920#issuecomment-1611790598

   @flashmouse Thanks for the PR. It seems that this condition has been around 
for a while now. Did you observe a similar issue with version prior to 3.5? I 
wonder what if changes introduced in 3.5 for the rack aware assignment made 
this worse than it used to be. I agree that we need to get to the bottom of 
this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation

2023-06-28 Thread via GitHub


jeffkbkim commented on code in PR #13880:
URL: https://github.com/apache/kafka/pull/13880#discussion_r1245523659


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerde.java:
##
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.MessageUtil;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
+import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer/Deserializer for {{@link Record}}.
+ */
+public class RecordSerde implements PartitionWriter.Serializer, 
CoordinatorLoader.Deserializer {
+@Override
+public byte[] serializeKey(Record record) {

Review Comment:
   thanks, that would be cleaner.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15083) Passing "remote.log.metadata.*" configs into RLMM

2023-06-28 Thread Satish Duggana (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17738216#comment-17738216
 ] 

Satish Duggana commented on KAFKA-15083:


[~showuon] Can you give details on what you tried?

> Passing "remote.log.metadata.*" configs into RLMM
> -
>
> Key: KAFKA-15083
> URL: https://issues.apache.org/jira/browse/KAFKA-15083
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> Based on the 
> [KIP-405|https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Configs]:
> |_{color:#00}remote.log.metadata.*{color}_|{color:#00}Default RLMM 
> implementation creates producer and consumer instances. Common client 
> propoerties can be configured with `remote.log.metadata.common.client.` 
> prefix.  User can also pass properties specific to 
> {color}{color:#00}producer/consumer with `remote.log.metadata.producer.` 
> and `remote.log.metadata.consumer.` prefixes. These will override properties 
> with `remote.log.metadata.common.client.` prefix.{color}
> {color:#00}Any other properties should be prefixed with 
> "remote.log.metadata." and these will be passed to 
> RemoteLogMetadataManager#configure(Map props).{color}
> {color:#00}For ex: Security configuration to connect to the local broker 
> for the listener name configured are passed with props.{color}|
>  
> This is missed from current implementation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation

2023-06-28 Thread via GitHub


dajac commented on code in PR #13880:
URL: https://github.com/apache/kafka/pull/13880#discussion_r1245517961


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerde.java:
##
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.MessageUtil;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
+import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer/Deserializer for {{@link Record}}.
+ */
+public class RecordSerde implements PartitionWriter.Serializer, 
CoordinatorLoader.Deserializer {
+@Override
+public byte[] serializeKey(Record record) {

Review Comment:
   I see this as an internal change. I was actually thinking about doing 
something like the following.
   
   ```
   {
 "apiKey": "3",
 "type": "key",
 "name": "ConsumerGroupMetadataKey",
 "validVersions": "0",
 "flexibleVersions": "none",
 "fields": [
   { "name": "GroupId", "type": "string", "versions": "0",
 "about": "The group id." }
 ]
   }
   ```
   
   ```
   {
 "apiKey": "3",
 "type": "value",
 "name": "ConsumerGroupMetadataValue",
 "validVersions": "0",
 "flexibleVersions": "0+",
 "fields": [
   { "name": "Epoch", "versions": "0+", "type": "int32",
 "about": "The group epoch." }
 ]
   }
   ```
   
   This would allow us to add rules in the generator. For instance, an api key 
must have a key and a value. This would also allow us to generate something 
like `ApiMessageType` for the types. More importantly, that would remove that 
weird usage of the version as the type in the key.
   
   We could also consider having both the key and the value defined in one 
file. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] flashmouse commented on pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict

2023-06-28 Thread via GitHub


flashmouse commented on PR #13920:
URL: https://github.com/apache/kafka/pull/13920#issuecomment-1611775967

   @rreddy-22  that's what I mean, thank you, could any one merge this if 
possible or help review to verify this pr is useful?
   
   we are now using CooperativeSticky strategy in kafka-client3.5 but meet 
serious problem that it may trigger rebalance with colossal times(complete one 
rebalance about 3sec, then keep trigger new balance within more than 10minutes, 
that's say, at least rebalance 200 times). for now we cannot give much more 
useful information and we want to verify whether that is caused by this problem 
after this pr is checked.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation

2023-06-28 Thread via GitHub


jeffkbkim commented on code in PR #13880:
URL: https://github.com/apache/kafka/pull/13880#discussion_r1245495804


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerde.java:
##
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.MessageUtil;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
+import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer/Deserializer for {{@link Record}}.
+ */
+public class RecordSerde implements PartitionWriter.Serializer, 
CoordinatorLoader.Deserializer {
+@Override
+public byte[] serializeKey(Record record) {

Review Comment:
   would that require a version bump or would we not have to since it's not 
actually used anywhere?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15133) RequestMetrics MessageConversionsTimeMs count is ticked even when no conversion occurs

2023-06-28 Thread Edoardo Comar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17738205#comment-17738205
 ] 

Edoardo Comar commented on KAFKA-15133:
---

[~rsivaram] - reading KIP 188

I am not sure if the intent of the MessageConversionsTimeMs Histogram was also 
to record the number of time there was no conversion, so that if only say 1% of 
messages required conversions, we'd see that in the percentile distribution

I found that I was comparing the count with the one in BrokerTopicMetrics and 
the mismatch was obvious

> RequestMetrics MessageConversionsTimeMs count is ticked even when no 
> conversion occurs
> --
>
> Key: KAFKA-15133
> URL: https://issues.apache.org/jira/browse/KAFKA-15133
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.5.0, 3.4.1
>Reporter: Edoardo Comar
>Assignee: Edoardo Comar
>Priority: Minor
>
> The Histogram 
> {}{color:#00}RequestChannel{color}.{}}}messageConversionsTimeHist}}
> is ticked even when a Produce/Fetch request incurred no conversion,
> because a new entry is added to the historgram distribution, with a 0ms value.
>  
> It's confusing comparing the Histogram
> kafka.network RequestMetrics MessageConversionsTimeMs
> with the Meter
> kafka.server BrokerTopicMetrics ProduceMessageConversionsPerSec
> because for the latter, the metric is ticked only if a conversion actually 
> occurred



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation

2023-06-28 Thread via GitHub


jeffkbkim commented on code in PR #13880:
URL: https://github.com/apache/kafka/pull/13880#discussion_r1245489701


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordSerdeTest.java:
##
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.MessageUtil;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RecordSerdeTest {
+@Test
+public void testSerializeKey() {
+RecordSerde serializer = new RecordSerde();
+Record record = new Record(
+new ApiMessageAndVersion(
+new ConsumerGroupMetadataKey().setGroupId("group"),
+(short) 1

Review Comment:
   nvm, just saw the comment below



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna opened a new pull request, #13927: KAFKA-10199: Enable state updater by default

2023-06-28 Thread via GitHub


cadonna opened a new pull request, #13927:
URL: https://github.com/apache/kafka/pull/13927

   Now that the implementation for the state updater is done, we can enable it 
by default.
   
   This PR enables the state updater by default and fixes code that made 
assumptions that are not true when the state updater is enabled (mainly tests).
   
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation

2023-06-28 Thread via GitHub


jeffkbkim commented on code in PR #13880:
URL: https://github.com/apache/kafka/pull/13880#discussion_r1245486249


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordSerdeTest.java:
##
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.MessageUtil;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RecordSerdeTest {
+@Test
+public void testSerializeKey() {
+RecordSerde serializer = new RecordSerde();
+Record record = new Record(
+new ApiMessageAndVersion(
+new ConsumerGroupMetadataKey().setGroupId("group"),
+(short) 1

Review Comment:
   in general, it looks like we allow different key/value pairs, i.e. 
OffsetCommitKey + GroupMetadataValue.
   
   i guess we would hit an error on runtime when we replay a record with an 
invalid pair. i feel like we should enforce this when serializing. wdyt?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on a diff in pull request #13925: KAFKA-10199: Consider tasks in state updater when computing offset sums

2023-06-28 Thread via GitHub


cadonna commented on code in PR #13925:
URL: https://github.com/apache/kafka/pull/13925#discussion_r1245477150


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -1138,28 +1138,33 @@ public void signalResume() {
 public Map getTaskOffsetSums() {
 final Map taskOffsetSums = new HashMap<>();
 
-// Not all tasks will create directories, and there may be directories 
for tasks we don't currently own,
-// so we consider all tasks that are either owned or on disk. This 
includes stateless tasks, which should
-// just have an empty changelogOffsets map.
-for (final TaskId id : union(HashSet::new, lockedTaskDirectories, 
tasks.allTaskIds())) {
-final Task task = tasks.contains(id) ? tasks.task(id) : null;
-// Closed and uninitialized tasks don't have any offsets so we 
should read directly from the checkpoint
-if (task != null && task.state() != State.CREATED && task.state() 
!= State.CLOSED) {
+final Map tasks = allTasks();
+final Set 
lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks =

Review Comment:
   I do not think there is guarantee that `lockedTaskDirectories` contains any 
tasks the client owns. `lockedTaskDirectories` are just the non-empty task 
directories in the state directory when a rebalance starts. However, a task 
directory is created when a task is created, i.e., it is in state `CREATE`. A 
task directory is not deleted when a task is closed, i.e., in state `CLOSED`. 
This might be a correlation and not a thought-out invariant. At least, the 
original code did not rely on this since it used `union(HashSet::new, 
lockedTaskDirectories, tasks.allTaskIds())`.
   I am also somehow reluctant to rely on such -- IMO -- brittle invariant. 
   As an example, in future we could decide to move the creation of the task 
directory to other parts of the code -- like when the task is initialized -- 
which would mean that there is a interval in which the task is in state 
`CREATED` but does not have a task directory.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15133) RequestMetrics MessageConversionsTimeMs count is ticked even when no conversion occurs

2023-06-28 Thread Edoardo Comar (Jira)
Edoardo Comar created KAFKA-15133:
-

 Summary: RequestMetrics MessageConversionsTimeMs count is ticked 
even when no conversion occurs
 Key: KAFKA-15133
 URL: https://issues.apache.org/jira/browse/KAFKA-15133
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 3.4.1, 3.5.0
Reporter: Edoardo Comar
Assignee: Edoardo Comar


The Histogram 
{}{color:#00}RequestChannel{color}.{}}}messageConversionsTimeHist}}
is ticked even when a Produce/Fetch request incurred no conversion,
because a new entry is added to the historgram distribution, with a 0ms value.
 
It's confusing comparing the Histogram
kafka.network RequestMetrics MessageConversionsTimeMs
with the Meter
kafka.server BrokerTopicMetrics ProduceMessageConversionsPerSec
because for the latter, the metric is ticked only if a conversion actually 
occurred



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] cadonna commented on a diff in pull request #13925: KAFKA-10199: Consider tasks in state updater when computing offset sums

2023-06-28 Thread via GitHub


cadonna commented on code in PR #13925:
URL: https://github.com/apache/kafka/pull/13925#discussion_r1245477150


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -1138,28 +1138,33 @@ public void signalResume() {
 public Map getTaskOffsetSums() {
 final Map taskOffsetSums = new HashMap<>();
 
-// Not all tasks will create directories, and there may be directories 
for tasks we don't currently own,
-// so we consider all tasks that are either owned or on disk. This 
includes stateless tasks, which should
-// just have an empty changelogOffsets map.
-for (final TaskId id : union(HashSet::new, lockedTaskDirectories, 
tasks.allTaskIds())) {
-final Task task = tasks.contains(id) ? tasks.task(id) : null;
-// Closed and uninitialized tasks don't have any offsets so we 
should read directly from the checkpoint
-if (task != null && task.state() != State.CREATED && task.state() 
!= State.CLOSED) {
+final Map tasks = allTasks();
+final Set 
lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks =

Review Comment:
   I do not think there is guarantee that `lockedTaskDirectories` contains any 
tasks the client owns. `lockedTaskDirectories` are just the non-empty task 
directories in the state directory when a rebalance starts. However, a task 
directory is created when a task is created, i.e., it is in state `CREATE`. A 
task directory is not deleted when a task is closed, i.e., in state `CLOSED`. 
This might be a correlation and not a thought-out invariant. At least, the 
original code did not rely on this since it used `union(HashSet::new, 
lockedTaskDirectories, tasks.allTaskIds())`.
   I am also somehow reluctant to rely on such -- IMO -- brittle invariant. 
   The creation of the task directory can probably be moved to other parts of 
the code like when the task is initialized which would mean that there is a 
interval in which the task is in state `CREATED` but does not have a task 
directory.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hudeqi opened a new pull request, #13926: KAFKA-15129;[2/N] Remove metrics in GroupMetadataManager when shutdown

2023-06-28 Thread via GitHub


hudeqi opened a new pull request, #13926:
URL: https://github.com/apache/kafka/pull/13926

   This pr is used to remove the metrics in GroupMetadataManager when shutdown.
   This pr has passed the corresponding unit test, and it is part of 
[KAFKA-15129](https://issues.apache.org/jira/browse/KAFKA-15129).
   
   A special advantage is that since the metric registered through metricsGroup 
is removed during shutdown, the original "recreateGauge" method (mentioned in 
[KAFKA-5565](https://github.com/apache/kafka/pull/3506)) is no longer needed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan merged pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-28 Thread via GitHub


jolshan merged PR #13798:
URL: https://github.com/apache/kafka/pull/13798


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15132) Implement disable & re-enablement for Tiered Storage

2023-06-28 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17738189#comment-17738189
 ] 

Divij Vaidya commented on KAFKA-15132:
--

I and [~mehbey] are currently working on drafting a KIP for this. KIP publish 
ETA - 10th July.

> Implement disable & re-enablement for Tiered Storage
> 
>
> Key: KAFKA-15132
> URL: https://issues.apache.org/jira/browse/KAFKA-15132
> Project: Kafka
>  Issue Type: New Feature
>  Components: core
>Reporter: Divij Vaidya
>Assignee: Divij Vaidya
>Priority: Major
>  Labels: kip
>
> KIP-405 [1] introduces the Tiered Storage feature in Apache Kafka. One of the 
> limitations mentioned in the KIP is inability to re-enable TS on a topic 
> after it has been disabled.
> {quote}Once tier storage is enabled for a topic, it can not be disabled. We 
> will add this feature in future versions. One possible workaround is to 
> create a new topic and copy the data from the desired offset and delete the 
> old topic. 
> {quote}
> This task will propose a new KIP which extends on KIP-405 to describe the 
> behaviour on on disablement and re-enablement of tiering storage for a topic. 
> The solution will apply for both Zk and KRaft variants.
> [1] KIP-405 - 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15132) Implement disable & re-enablement for Tiered Storage

2023-06-28 Thread Divij Vaidya (Jira)
Divij Vaidya created KAFKA-15132:


 Summary: Implement disable & re-enablement for Tiered Storage
 Key: KAFKA-15132
 URL: https://issues.apache.org/jira/browse/KAFKA-15132
 Project: Kafka
  Issue Type: New Feature
  Components: core
Reporter: Divij Vaidya
Assignee: Divij Vaidya


KIP-405 [1] introduces the Tiered Storage feature in Apache Kafka. One of the 
limitations mentioned in the KIP is inability to re-enable TS on a topic after 
it has been disabled.


{quote}Once tier storage is enabled for a topic, it can not be disabled. We 
will add this feature in future versions. One possible workaround is to create 
a new topic and copy the data from the desired offset and delete the old topic. 
{quote}

This task will propose a new KIP which extends on KIP-405 to describe the 
behaviour on on disablement and re-enablement of tiering storage for a topic. 
The solution will apply for both Zk and KRaft variants.


[1] KIP-405 - 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage]
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan commented on pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-28 Thread via GitHub


jolshan commented on PR #13798:
URL: https://github.com/apache/kafka/pull/13798#issuecomment-1611687116

   Thanks for the review  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #13902: MINOR: fix flaky ZkMigrationIntegrationTest.testNewAndChangedTopicsIn…

2023-06-28 Thread via GitHub


divijvaidya commented on PR #13902:
URL: https://github.com/apache/kafka/pull/13902#issuecomment-1611682088

   @mumrah Should we backport this for 3.5 branch as well since the code that 
introduced this flakiness exists over there as well?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya merged pull request #13922: [MINOR] remove the currentStream.close() statement causing exit code issue

2023-06-28 Thread via GitHub


divijvaidya merged PR #13922:
URL: https://github.com/apache/kafka/pull/13922


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lucasbru commented on a diff in pull request #13925: KAFKA-10199: Consider tasks in state updater when computing offset sums

2023-06-28 Thread via GitHub


lucasbru commented on code in PR #13925:
URL: https://github.com/apache/kafka/pull/13925#discussion_r1245427580


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -1138,28 +1138,33 @@ public void signalResume() {
 public Map getTaskOffsetSums() {
 final Map taskOffsetSums = new HashMap<>();
 
-// Not all tasks will create directories, and there may be directories 
for tasks we don't currently own,
-// so we consider all tasks that are either owned or on disk. This 
includes stateless tasks, which should
-// just have an empty changelogOffsets map.
-for (final TaskId id : union(HashSet::new, lockedTaskDirectories, 
tasks.allTaskIds())) {
-final Task task = tasks.contains(id) ? tasks.task(id) : null;
-// Closed and uninitialized tasks don't have any offsets so we 
should read directly from the checkpoint
-if (task != null && task.state() != State.CREATED && task.state() 
!= State.CLOSED) {
+final Map tasks = allTasks();
+final Set 
lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks =

Review Comment:
   Ah, I recommended this change thinking that `lockedTaskDirectories` always 
includes all `ClosedAndCreatedTasks` -- I think it does right? So it should be 
enough to assign this to `lockedTaskDirectories`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lucasbru commented on a diff in pull request #13925: KAFKA-10199: Consider tasks in state updater when computing offset sums

2023-06-28 Thread via GitHub


lucasbru commented on code in PR #13925:
URL: https://github.com/apache/kafka/pull/13925#discussion_r1245383102


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -1138,28 +1138,35 @@ public void signalResume() {
 public Map getTaskOffsetSums() {
 final Map taskOffsetSums = new HashMap<>();
 
-// Not all tasks will create directories, and there may be directories 
for tasks we don't currently own,
-// so we consider all tasks that are either owned or on disk. This 
includes stateless tasks, which should
-// just have an empty changelogOffsets map.
-for (final TaskId id : union(HashSet::new, lockedTaskDirectories, 
tasks.allTaskIds())) {
-final Task task = tasks.contains(id) ? tasks.task(id) : null;
-// Closed and uninitialized tasks don't have any offsets so we 
should read directly from the checkpoint
-if (task != null && task.state() != State.CREATED && task.state() 
!= State.CLOSED) {
+final Map tasks = allTasks();
+final Set createdAndClosedTasks = new HashSet<>();
+for (final Task task : tasks.values()) {
+if (task.state() != State.CREATED && task.state() != State.CLOSED) 
{
 final Map changelogOffsets = 
task.changelogOffsets();
 if (changelogOffsets.isEmpty()) {
-log.debug("Skipping to encode apparently stateless (or 
non-logged) offset sum for task {}", id);
+log.debug("Skipping to encode apparently stateless (or 
non-logged) offset sum for task {}",
+task.id());
 } else {
-taskOffsetSums.put(id, sumOfChangelogOffsets(id, 
changelogOffsets));
+taskOffsetSums.put(task.id(), 
sumOfChangelogOffsets(task.id(), changelogOffsets));
 }
 } else {
-final File checkpointFile = 
stateDirectory.checkpointFileFor(id);
-try {
-if (checkpointFile.exists()) {
-taskOffsetSums.put(id, sumOfChangelogOffsets(id, new 
OffsetCheckpoint(checkpointFile).read()));
-}
-} catch (final IOException e) {
-log.warn(String.format("Exception caught while trying to 
read checkpoint for task %s:", id), e);
+createdAndClosedTasks.add(task.id());

Review Comment:
   nit: if you want to do it with fewer collections, you could inititialize 
`lockedTaskDirectoriesOfNonOwnedTasks` earlier, and just remove directly from 
that set in the `if` branch, instead of adding to `createdAndClosedTasks` in 
the `else` branch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-28 Thread via GitHub


divijvaidya commented on PR #13798:
URL: https://github.com/apache/kafka/pull/13798#issuecomment-1611605733

   > so only that one verification per transaction
   
   ah, I had missed this part that it will not be recorded on "every" message 
append. Only for verified cases. I think we should be good to merge this in 
without worrying about the latency impact. I don't suppose a single histogram 
should add much and we also didn't see an impact in the producer perf test that 
you did here.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeqo commented on pull request #13923: KAFKA-15131: Improve RemoteStorageManager exception handling documentation

2023-06-28 Thread via GitHub


jeqo commented on PR #13923:
URL: https://github.com/apache/kafka/pull/13923#issuecomment-1611590682

   Sure, make sense


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #13923: KAFKA-15131: Improve RemoteStorageManager exception handling documentation

2023-06-28 Thread via GitHub


divijvaidya commented on PR #13923:
URL: https://github.com/apache/kafka/pull/13923#issuecomment-1611589151

   Thanks @jeqo. Looks good to me but we will wait for Luke and/or Satish to 
look into this before we merge this one in.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya merged pull request #13884: MINOR: fix typos for client

2023-06-28 Thread via GitHub


divijvaidya merged PR #13884:
URL: https://github.com/apache/kafka/pull/13884


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] chia7712 merged pull request #13902: MINOR: fix flaky ZkMigrationIntegrationTest.testNewAndChangedTopicsIn…

2023-06-28 Thread via GitHub


chia7712 merged PR #13902:
URL: https://github.com/apache/kafka/pull/13902


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna opened a new pull request, #13925: KAFKA-10199: Consider tasks in state updater when computing offset sums

2023-06-28 Thread via GitHub


cadonna opened a new pull request, #13925:
URL: https://github.com/apache/kafka/pull/13925

   With the state updater, the task manager needs also to look into the tasks 
owned by the state updater when computing the sum of offsets of the state. This 
sum of offsets is used by the high availability assignor to assign warm-up 
replicas.
   If the task manager does not take into account tasks in the state updater, a 
warm-up replica will never report back that the state for the corresponding 
task has caught up. Consequently, the warm-up replica will never be dismissed 
and probing rebalances will never end..
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeqo commented on pull request #13923: KAFKA-15131: Improve RemoteStorageManager exception handling documentation

2023-06-28 Thread via GitHub


jeqo commented on PR #13923:
URL: https://github.com/apache/kafka/pull/13923#issuecomment-1611480623

   @divijvaidya adding the following changes to the KIP: 
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=97554472=363=362


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] joobisb commented on a diff in pull request #13862: KAFKA-15050: format the prompts in the quickstart

2023-06-28 Thread via GitHub


joobisb commented on code in PR #13862:
URL: https://github.com/apache/kafka/pull/13862#discussion_r1245268151


##
docs/quickstart.html:
##
@@ -154,9 +154,9 @@ 
 By default, each line you enter will result in a separate event 
being written to the topic.
 
 
-$ 
bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server 
localhost:9092
-This is my first event
-This is my second event
+$ 
bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server 
localhost:9092
+$ This is my first event
+$ This is my second event

Review Comment:
   resolved 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lucasbru commented on pull request #13876: KAFKA-10733: Clean up producer exceptions

2023-06-28 Thread via GitHub


lucasbru commented on PR #13876:
URL: https://github.com/apache/kafka/pull/13876#issuecomment-1611471103

   @jolshan Yes, the main reason for wrapping was consistency. However, I'm now 
considering a different kind of consistency - never wrap fatal errors - which 
is what was originally suggested in the KIP.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeqo commented on a diff in pull request #13923: KAFKA-15131: Improve RemoteStorageManager exception handling documentation

2023-06-28 Thread via GitHub


jeqo commented on code in PR #13923:
URL: https://github.com/apache/kafka/pull/13923#discussion_r1245252224


##
storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java:
##
@@ -107,20 +110,22 @@ InputStream fetchLogSegment(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata,
  * @param endPosition  end position of log segment to be read, 
inclusive.
  * @return input stream of the requested log segment data.
  * @throws RemoteStorageException  if there are any errors while 
fetching the desired segment.
- * @throws RemoteResourceNotFoundException when there are no resources 
associated with the given remoteLogSegmentMetadata.
+ * @throws RemoteResourceNotFoundException the requested log segment is 
not found in the remote storage.
  */
 InputStream fetchLogSegment(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata,
 int startPosition,
 int endPosition) throws RemoteStorageException;
 
 /**
  * Returns the index for the respective log segment of {@link 
RemoteLogSegmentMetadata}.
+ * 
+ * If the index is not present (e.g. Transaction index may not exist), 
throws {@link RemoteResourceNotFoundException}

Review Comment:
   thanks @divijvaidya, applying this suggestion



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-28 Thread via GitHub


dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1245226991


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -141,7 +141,7 @@ public static class TimeAndEpoch {
  * after having refreshed the metadata but the write operation failed. In 
this case, the
  * time is not automatically rolled back.
  */
-private TimeAndEpoch nextMetadataRefreshTime = TimeAndEpoch.EMPTY;

Review Comment:
   I renamed this one. It seems that `deadline` is more appropriate than 
`nextTime`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-28 Thread via GitHub


dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1245214287


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -423,6 +456,47 @@ public Map 
computeSubscriptionMetadata(
 return Collections.unmodifiableMap(newSubscriptionMetadata);
 }
 
+/**
+ * Updates the next metadata refresh time.
+ *
+ * @param nextTimeMs The next time in milliseconds.
+ * @param groupEpoch The associated group epoch.
+ */
+public void setNextMetadataRefreshTime(
+long nextTimeMs,
+int groupEpoch
+) {
+this.nextMetadataRefreshTime = new TimeAndEpoch(nextTimeMs, 
groupEpoch);
+}
+
+/**
+ * Resets the next metadata refresh.
+ */
+public void resetNextMetadataRefreshTime() {
+this.nextMetadataRefreshTime = TimeAndEpoch.EMPTY;
+}
+
+/**
+ * Checks if a metadata refresh is required. A refresh is required in two 
cases:
+ * 1) The next update time is smaller or equals to the current time;
+ * 2) The group epoch associated with the next update time is smaller than

Review Comment:
   Right. In this case, the time is set to zero so it is always smaller than 
the current time.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-28 Thread via GitHub


dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1245213037


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -423,6 +456,47 @@ public Map 
computeSubscriptionMetadata(
 return Collections.unmodifiableMap(newSubscriptionMetadata);
 }
 
+/**
+ * Updates the next metadata refresh time.
+ *
+ * @param nextTimeMs The next time in milliseconds.
+ * @param groupEpoch The associated group epoch.
+ */
+public void setNextMetadataRefreshTime(
+long nextTimeMs,
+int groupEpoch
+) {
+this.nextMetadataRefreshTime = new TimeAndEpoch(nextTimeMs, 
groupEpoch);
+}
+
+/**
+ * Resets the next metadata refresh.

Review Comment:
   Right. Update immediately on the next heartbeat.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-28 Thread via GitHub


dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1245212300


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -119,6 +131,18 @@ public String toString() {
  */
 private final TimelineHashMap> 
currentPartitionEpoch;
 
+/**
+ * The next metadata refresh time. It consists of a timestamp in 
milliseconds together with
+ * the group epoch at the time of setting it. The metadata refresh time is 
considered as a
+ * soft state (read that it is not stored in a timeline data structure). 
It is like this
+ * because it is not persisted to the log. The group epoch is here to 
ensure that the
+ * next metadata refresh time is invalidated if the group epoch does not 
correspond to
+ * the current group epoch. This can happen if the next metadata refresh 
time is updated
+ * after having refreshed the metadata but the write operation failed. In 
this case, the
+ * time is not automatically rollback.

Review Comment:
   It is actually the other way around. The refresh time is updated immediately 
but it is not rolled back if the write failed. This is the reason why I have 
included the group epoch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-28 Thread via GitHub


dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1245209953


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -874,4 +1021,34 @@ public void replay(
 consumerGroup.updateMember(newMember);
 }
 }
+
+/**
+ * A new metadata image is available.
+ *
+ * @param newImage  The new metadata image.
+ * @param delta The delta image.
+ */
+public void onNewMetadataImage(MetadataImage newImage, MetadataDelta 
delta) {
+metadataImage = newImage;
+
+// Notify all the groups subscribed to the created, updated or

Review Comment:
   > Is created topics a different method in topicsDelta? Shouldn't we have 
createdTopicIds and we add them? Or is changedTopics accounting for that? Are 
there other changes besides topic creation we can have?
   
   My understanding is that created topics are included in the changed topics 
as well.
   
   > I guess my question then is what is the flow for updating the groups with 
the image? This will just happen on the next heartbeat since we set 
metadataImage to new image?
   
   Right. The idea is to flag all the groups subscribed topics and to let them 
update themselves on the next heartbeat. I did it this way because we can also 
rely on this mechanism to refresh regex based subs every X minutes later on. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hudeqi opened a new pull request, #13924: KAFKA-15129;[1/N] Remove metrics in LogCleanerManager when LogCleaner shutdown

2023-06-28 Thread via GitHub


hudeqi opened a new pull request, #13924:
URL: https://github.com/apache/kafka/pull/13924

   This pr is used to remove the metrics in LogCleanerManager when logCleaner 
is closed.
   This pr has passed the corresponding unit test, and it is part of 
KAFKA-15129.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-28 Thread via GitHub


dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1245157302


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -874,4 +1021,34 @@ public void replay(
 consumerGroup.updateMember(newMember);
 }
 }
+
+/**
+ * A new metadata image is available.
+ *
+ * @param newImage  The new metadata image.
+ * @param delta The delta image.
+ */
+public void onNewMetadataImage(MetadataImage newImage, MetadataDelta 
delta) {
+metadataImage = newImage;
+
+// Notify all the groups subscribed to the created, updated or
+// deleted topics.
+Set allGroupIds = new HashSet<>();
+delta.topicsDelta().changedTopics().forEach((topicId, topicDelta) -> {
+String topicName = topicDelta.name();
+Set groupIds = groupsByTopics.get(topicName);
+if (groupIds != null) allGroupIds.addAll(groupIds);
+});
+delta.topicsDelta().deletedTopicIds().forEach(topicId -> {
+TopicImage topicImage = delta.image().topics().getTopic(topicId);
+Set groupIds = groupsByTopics.get(topicImage.name());
+if (groupIds != null) allGroupIds.addAll(groupIds);

Review Comment:
   Correct. I also simplified this code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-28 Thread via GitHub


dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1245156156


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -709,14 +780,16 @@ public void replay(
 String groupId = key.groupId();
 String memberId = key.memberId();
 
+ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId, 
value != null);
+Set oldSubscribedTopicNames = new 
HashSet<>(consumerGroup.subscribedTopicNames());
+
 if (value != null) {
-ConsumerGroup consumerGroup = 
getOrMaybeCreateConsumerGroup(groupId, true);
 ConsumerGroupMember oldMember = 
consumerGroup.getOrMaybeCreateMember(memberId, true);
 consumerGroup.updateMember(new 
ConsumerGroupMember.Builder(oldMember)
 .updateWith(value)
 .build());
+updateGroupsByTopics(groupId, oldSubscribedTopicNames, 
consumerGroup.subscribedTopicNames());

Review Comment:
   Good call. Moved it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-28 Thread via GitHub


dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1245154086


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -506,32 +555,54 @@ private 
CoordinatorResult consumerGr
 .setClientHost(clientHost)
 .build();
 
+boolean updatedMemberSubscriptions = false;
 if (!updatedMember.equals(member)) {
 records.add(newMemberSubscriptionRecord(groupId, updatedMember));
 
 if 
(!updatedMember.subscribedTopicNames().equals(member.subscribedTopicNames())) {
 log.info("[GroupId " + groupId + "] Member " + memberId + " 
updated its subscribed topics to: " +
 updatedMember.subscribedTopicNames());
+updatedMemberSubscriptions = true;
+}
 
-subscriptionMetadata = group.computeSubscriptionMetadata(
-member,
-updatedMember,
-topicsImage
-);
-
-if 
(!subscriptionMetadata.equals(group.subscriptionMetadata())) {
-log.info("[GroupId " + groupId + "] Computed new 
subscription metadata: "
-+ subscriptionMetadata + ".");
-records.add(newGroupSubscriptionMetadataRecord(groupId, 
subscriptionMetadata));
-}
+if 
(!updatedMember.subscribedTopicRegex().equals(member.subscribedTopicRegex())) {
+log.info("[GroupId " + groupId + "] Member " + memberId + " 
updated its subscribed regex to: " +
+updatedMember.subscribedTopicRegex());
+updatedMemberSubscriptions = true;
+}
+}
 
-groupEpoch += 1;
-records.add(newGroupEpochRecord(groupId, groupEpoch));
+long currentTimeMs = time.milliseconds();
+boolean maybeUpdateMetadata = updatedMemberSubscriptions || 
group.refreshMetadataNeeded(currentTimeMs);
+boolean updatedSubscriptionMetadata = false;
+if (maybeUpdateMetadata) {
+subscriptionMetadata = group.computeSubscriptionMetadata(
+member,
+updatedMember,
+metadataImage.topics()
+);
 
-log.info("[GroupId " + groupId + "] Bumped group epoch to " + 
groupEpoch + ".");
+if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+log.info("[GroupId " + groupId + "] Computed new subscription 
metadata: "
++ subscriptionMetadata + ".");
+records.add(newGroupSubscriptionMetadataRecord(groupId, 
subscriptionMetadata));
+updatedSubscriptionMetadata = true;
 }
 }
 
+if (updatedMemberSubscriptions || updatedSubscriptionMetadata) {
+groupEpoch += 1;
+records.add(newGroupEpochRecord(groupId, groupEpoch));
+log.info("[GroupId " + groupId + "] Bumped group epoch to " + 
groupEpoch + ".");
+}
+
+if (maybeUpdateMetadata) {

Review Comment:
   Reworked this part. Let me know if it looks better.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

2023-06-28 Thread via GitHub


dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1245148023


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -88,10 +93,12 @@ public class GroupMetadataManager {
 public static class Builder {
 private LogContext logContext = null;
 private SnapshotRegistry snapshotRegistry = null;
+private Time time = null;
 private List assignors = null;
-private TopicsImage topicsImage = null;

Review Comment:
   We actually need other features from the MetadataImage (e.g. metadata 
version).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation

2023-06-28 Thread via GitHub


dajac commented on PR #13880:
URL: https://github.com/apache/kafka/pull/13880#issuecomment-1611323500

   @jolshan Thanks. I have addressed the remaining comment.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #13923: KAFKA-15131: Improve RemoteStorageManager exception handling documentation

2023-06-28 Thread via GitHub


showuon commented on PR #13923:
URL: https://github.com/apache/kafka/pull/13923#issuecomment-1611315037

   I'll take a look this week. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13923: KAFKA-15131: Improve RemoteStorageManager exception handling documentation

2023-06-28 Thread via GitHub


divijvaidya commented on code in PR #13923:
URL: https://github.com/apache/kafka/pull/13923#discussion_r1245118305


##
storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java:
##
@@ -107,20 +110,22 @@ InputStream fetchLogSegment(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata,
  * @param endPosition  end position of log segment to be read, 
inclusive.
  * @return input stream of the requested log segment data.
  * @throws RemoteStorageException  if there are any errors while 
fetching the desired segment.
- * @throws RemoteResourceNotFoundException when there are no resources 
associated with the given remoteLogSegmentMetadata.
+ * @throws RemoteResourceNotFoundException the requested log segment is 
not found in the remote storage.
  */
 InputStream fetchLogSegment(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata,
 int startPosition,
 int endPosition) throws RemoteStorageException;
 
 /**
  * Returns the index for the respective log segment of {@link 
RemoteLogSegmentMetadata}.
+ * 
+ * If the index is not present (e.g. Transaction index may not exist), 
throws {@link RemoteResourceNotFoundException}

Review Comment:
   Perhaps, we can be more descriptive here. My suggestion is to add the 
following (free free to change the wording):
   
   `e.g. Transaction index may not exist because segments create prior to 
version 2.8.0 will not have transaction index associated with them.`
   
   `@throws RemoteResourceNotFoundException the requested index is not found in 
the remote storage. The caller of this function are encouraged to re-create the 
indexes from the segment as the suggested way of handling this error.`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-15105) Flaky test FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable

2023-06-28 Thread Josep Prat (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17738081#comment-17738081
 ] 

Josep Prat edited comment on KAFKA-15105 at 6/28/23 12:03 PM:
--

Hi [~riedelmax] ,

Only maintainers + a subgroup of collaborators can rerun builds in CI, but even 
for them, they can just run them as they are (no more detailed output). And 
sorry, I just realized I copy pasted the wrong ci build link. This is the right 
one: 
[https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13865/2/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_11_and_Scala_2_13___testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable___2/]

I'm updating the description of the Jira issue.+


was (Author: jlprat):
Hi [~riedelmax] ,

Only maintainers + a subgroup of collaborators can rerun builds in CI, but even 
for them, they can just run them as they are (no more detailed output). And 
sorry, I just realized I copy pasted the wrong output. This is the right one: 
[https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13865/2/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_11_and_Scala_2_13___testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable___2/]

I'm updating the description of the Jira issue.+

> Flaky test 
> FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable
> -
>
> Key: KAFKA-15105
> URL: https://issues.apache.org/jira/browse/KAFKA-15105
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.5.0
>Reporter: Josep Prat
>Assignee: Max Riedel
>Priority: Major
>  Labels: flaky-test
>
> Test  
> integration.kafka.server.FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable()
>  became flaky. An example can be found here: 
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13865/2/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_11_and_Scala_2_13___testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable___2/
> The error might be caused because of a previous kafka cluster used for 
> another test wasn't cleaned up properly before this one run.
>  
> h3. Error Message
> {code:java}
> org.apache.kafka.common.errors.TopicExistsException: Topic 
> '__consumer_offsets' already exists.{code}
> h3. Stacktrace
> {code:java}
> org.apache.kafka.common.errors.TopicExistsException: Topic 
> '__consumer_offsets' already exists. {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15105) Flaky test FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable

2023-06-28 Thread Josep Prat (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josep Prat updated KAFKA-15105:
---
Description: 
Test  
integration.kafka.server.FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable()
 became flaky. An example can be found here: 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13865/2/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_11_and_Scala_2_13___testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable___2/

The error might be caused because of a previous kafka cluster used for another 
test wasn't cleaned up properly before this one run.

 
h3. Error Message
{code:java}
org.apache.kafka.common.errors.TopicExistsException: Topic '__consumer_offsets' 
already exists.{code}
h3. Stacktrace
{code:java}
org.apache.kafka.common.errors.TopicExistsException: Topic '__consumer_offsets' 
already exists. {code}

  was:
Test  
integration.kafka.server.FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable()
 became flaky. An example can be found here: 
[https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13865/2/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_11_and_Scala_2_13___testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable__/]

The error might be caused because of a previous kafka cluster used for another 
test wasn't cleaned up properly before this one run.

 
h3. Error Message
{code:java}
org.apache.kafka.common.errors.TopicExistsException: Topic '__consumer_offsets' 
already exists.{code}
h3. Stacktrace
{code:java}
org.apache.kafka.common.errors.TopicExistsException: Topic '__consumer_offsets' 
already exists. {code}


> Flaky test 
> FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable
> -
>
> Key: KAFKA-15105
> URL: https://issues.apache.org/jira/browse/KAFKA-15105
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.5.0
>Reporter: Josep Prat
>Assignee: Max Riedel
>Priority: Major
>  Labels: flaky-test
>
> Test  
> integration.kafka.server.FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable()
>  became flaky. An example can be found here: 
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13865/2/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_11_and_Scala_2_13___testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable___2/
> The error might be caused because of a previous kafka cluster used for 
> another test wasn't cleaned up properly before this one run.
>  
> h3. Error Message
> {code:java}
> org.apache.kafka.common.errors.TopicExistsException: Topic 
> '__consumer_offsets' already exists.{code}
> h3. Stacktrace
> {code:java}
> org.apache.kafka.common.errors.TopicExistsException: Topic 
> '__consumer_offsets' already exists. {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15105) Flaky test FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable

2023-06-28 Thread Josep Prat (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17738081#comment-17738081
 ] 

Josep Prat commented on KAFKA-15105:


Hi [~riedelmax] ,

Only maintainers + a subgroup of collaborators can rerun builds in CI, but even 
for them, they can just run them as they are (no more detailed output). And 
sorry, I just realized I copy pasted the wrong output. This is the right one: 
[https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13865/2/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_11_and_Scala_2_13___testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable___2/]

I'm updating the description of the Jira issue.+

> Flaky test 
> FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable
> -
>
> Key: KAFKA-15105
> URL: https://issues.apache.org/jira/browse/KAFKA-15105
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.5.0
>Reporter: Josep Prat
>Assignee: Max Riedel
>Priority: Major
>  Labels: flaky-test
>
> Test  
> integration.kafka.server.FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable()
>  became flaky. An example can be found here: 
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13865/2/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_11_and_Scala_2_13___testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable__/]
> The error might be caused because of a previous kafka cluster used for 
> another test wasn't cleaned up properly before this one run.
>  
> h3. Error Message
> {code:java}
> org.apache.kafka.common.errors.TopicExistsException: Topic 
> '__consumer_offsets' already exists.{code}
> h3. Stacktrace
> {code:java}
> org.apache.kafka.common.errors.TopicExistsException: Topic 
> '__consumer_offsets' already exists. {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15105) Flaky test FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable

2023-06-28 Thread Max Riedel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17738077#comment-17738077
 ] 

Max Riedel commented on KAFKA-15105:


Hi [~josep.prat] 
Thanks for giving me the necessary Jira rights. I was able to assign the ticket 
to me now.

So far, all test runs I did on my local environment passed. But I will try the 
option to run until failure and see what I can learn from that.

My question was about the CI. Is it correct that we are not able to rerun 
builds there or get more detailed output? I couldn't find the error message 
"Topic '__consumer_offsets' already exists." in the standard output of the test 
run

> Flaky test 
> FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable
> -
>
> Key: KAFKA-15105
> URL: https://issues.apache.org/jira/browse/KAFKA-15105
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.5.0
>Reporter: Josep Prat
>Assignee: Max Riedel
>Priority: Major
>  Labels: flaky-test
>
> Test  
> integration.kafka.server.FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable()
>  became flaky. An example can be found here: 
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13865/2/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_11_and_Scala_2_13___testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable__/]
> The error might be caused because of a previous kafka cluster used for 
> another test wasn't cleaned up properly before this one run.
>  
> h3. Error Message
> {code:java}
> org.apache.kafka.common.errors.TopicExistsException: Topic 
> '__consumer_offsets' already exists.{code}
> h3. Stacktrace
> {code:java}
> org.apache.kafka.common.errors.TopicExistsException: Topic 
> '__consumer_offsets' already exists. {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15105) Flaky test FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable

2023-06-28 Thread Max Riedel (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Max Riedel reassigned KAFKA-15105:
--

Assignee: Max Riedel

> Flaky test 
> FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable
> -
>
> Key: KAFKA-15105
> URL: https://issues.apache.org/jira/browse/KAFKA-15105
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.5.0
>Reporter: Josep Prat
>Assignee: Max Riedel
>Priority: Major
>  Labels: flaky-test
>
> Test  
> integration.kafka.server.FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable()
>  became flaky. An example can be found here: 
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13865/2/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_11_and_Scala_2_13___testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable__/]
> The error might be caused because of a previous kafka cluster used for 
> another test wasn't cleaned up properly before this one run.
>  
> h3. Error Message
> {code:java}
> org.apache.kafka.common.errors.TopicExistsException: Topic 
> '__consumer_offsets' already exists.{code}
> h3. Stacktrace
> {code:java}
> org.apache.kafka.common.errors.TopicExistsException: Topic 
> '__consumer_offsets' already exists. {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15116) Kafka Streams processing blocked during rebalance

2023-06-28 Thread David Gammon (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17738060#comment-17738060
 ] 

David Gammon commented on KAFKA-15116:
--

Hi [~mjsax], please see my responses below:
 # Message A uses an internal store to store information about the entity.  The 
store knows that there is a pending event that is yet to be committed so it 
blocks until it is committed. The problem happens when Message B (which has a 
processor that uses the store) tries to get information on it's entity. It will 
block and timeout because Message A hasn't been committed.
 # I think our scenario is specifically *during* a rebalance. I've seen code 
that says if the taskManager is rebalancing then do not commit.
 # This is more to do with our store and how long it takes before it times out. 
The timeout then can impact the transaction timeout and producers get fenced 
etc.
 # The fix is to add a check for rebalancing in the while loop in runOnce. This 
checks if a rebalancing is in progress and sets the numIterations to 0 to stop 
processing of messages. When it has rebalanced it sets numIterations back to 1.
 # Again I think we are talking about *during* a rebalance rather than before.

Thanks,

David

 

> Kafka Streams processing blocked during rebalance
> -
>
> Key: KAFKA-15116
> URL: https://issues.apache.org/jira/browse/KAFKA-15116
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.0
>Reporter: David Gammon
>Priority: Major
>
> We have a Kafka Streams application that simply takes a messages, processes 
> it and then produces an event out the other side. The complexity is that 
> there is a requirement that all events with the same partition key must be 
> committed before the next message  is processed.
> This works most of the time flawlessly but we have started to see problems 
> during deployments where the first message blocks the second message during a 
> rebalance because the first message isn’t committed before the second message 
> is processed. This ultimately results in transactions timing out and more 
> rebalancing.
> We’ve tried lots of configuration to get the behaviour we require with no 
> luck. We’ve now put in a temporary fix so that Kafka Streams works with our 
> framework but it feels like this might be a missing feature or potentially a 
> bug.
> +Example+
> Given:
>  * We have two messages (InA and InB).
>  * Both messages have the same partition key.
>  * A rebalance is in progress so streams is no longer able to commit.
> When:
>  # Message InA -> processor -> OutA (not committed)
>  # Message InB -> processor -> blocked because #1 has not been committed



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] joobisb commented on a diff in pull request #13862: KAFKA-15050: format the prompts in the quickstart

2023-06-28 Thread via GitHub


joobisb commented on code in PR #13862:
URL: https://github.com/apache/kafka/pull/13862#discussion_r1245058126


##
docs/quickstart.html:
##
@@ -154,9 +154,9 @@ 
 By default, each line you enter will result in a separate event 
being written to the topic.
 
 
-$ 
bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server 
localhost:9092
-This is my first event
-This is my second event
+$ 
bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server 
localhost:9092
+$ This is my first event
+$ This is my second event

Review Comment:
   will update these



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] joobisb commented on a diff in pull request #13862: KAFKA-15050: format the prompts in the quickstart

2023-06-28 Thread via GitHub


joobisb commented on code in PR #13862:
URL: https://github.com/apache/kafka/pull/13862#discussion_r1245057900


##
docs/quickstart.html:
##
@@ -32,7 +32,7 @@ 
 the latest Kafka release and extract it:
 
 
-$ tar -xzf 
kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz
+$ tar -xzf 
kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz

Review Comment:
   https://github.com/apache/kafka/assets/13068144/e4285029-5a1c-4002-8ed6-f8a95c97e2a0;>
   
   If I remove the `language-bash` class from `` it will lose the 
associated properties as shown in the first image above



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15131) Improve RemoteStorageManager exception handling documentation

2023-06-28 Thread Jorge Esteban Quilcate Otoya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jorge Esteban Quilcate Otoya updated KAFKA-15131:
-
Summary: Improve RemoteStorageManager exception handling documentation  
(was: Improve RemoteStorageManager exception handling)

> Improve RemoteStorageManager exception handling documentation
> -
>
> Key: KAFKA-15131
> URL: https://issues.apache.org/jira/browse/KAFKA-15131
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
>  Labels: tiered-storage
>
> As discussed here[1], RemoteStorageManager javadocs requires clarification 
> regarding error handling:
>  * Remove ambiguity on `RemoteResourceNotFoundException` description
>  * Describe when `RemoteResourceNotFoundException` can/should be thrown
>  * Describe expectations for idempotent operations when copying/deleting
>  
> [1] 
> https://issues.apache.org/jira/browse/KAFKA-7739?focusedCommentId=17720936=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17720936



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jeqo commented on pull request #13828: KAFKA-15066: add "remote.log.metadata.manager.listener.name" config to rlmm

2023-06-28 Thread via GitHub


jeqo commented on PR #13828:
URL: https://github.com/apache/kafka/pull/13828#issuecomment-1611155345

   @showuon I'm trying to test this, but TBRLMM is still complaining about 
missing bootstrap.servers, even when listener name is provided:
   ```
   kafka-ts  | [2023-06-28 10:19:04,131] INFO Initializing the resources. 
(org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager)
   kafka-ts  | [2023-06-28 10:19:04,141] ERROR Uncaught exception in thread 
'RLMMInitializationThread': (org.apache.kafka.common.utils.KafkaThread)
   kafka-ts  | org.apache.kafka.common.config.ConfigException: Missing required 
configuration "bootstrap.servers" which has no default value.
   kafka-ts  | at 
org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:496)
   kafka-ts  | at 
org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:486)
   kafka-ts  | at 
org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:112)
   kafka-ts  | at 
org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:145)
   kafka-ts  | at 
org.apache.kafka.clients.admin.AdminClientConfig.(AdminClientConfig.java:244)
   kafka-ts  | at 
org.apache.kafka.clients.admin.Admin.create(Admin.java:144)
   kafka-ts  | at 
org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:49)
   kafka-ts  | at 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.initializeResources(TopicBasedRemoteLogMetadataManager.java:366)
   kafka-ts  | at 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.lambda$configure$1(TopicBasedRemoteLogMetadataManager.java:352)
   kafka-ts  | at java.base/java.lang.Thread.run(Thread.java:829)
   
   ```
   
   Looking at the code, I can see listener name being passed,
   
   ```
   kafka-ts  | remote.log.metadata.manager.class.name = 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager
   kafka-ts  | remote.log.metadata.manager.class.path = null
   kafka-ts  | remote.log.metadata.manager.impl.prefix = rlmm.config.
   kafka-ts  | remote.log.metadata.manager.listener.name = BROKER
   
   ```
   
   but when initializing the resources, properties without the right prefix are 
ignored: 
https://github.com/apache/kafka/blob/f32ebeab17ce574660669873402a7f40927d0492/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java#L136-L159
   
   Let me know if I'm reading this properly to create an issue, otherwise I may 
be missing something. Many thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14993) Improve TransactionIndex instance handling while copying to and fetching from RSM.

2023-06-28 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17738035#comment-17738035
 ] 

Kamal Chandraprakash commented on KAFKA-14993:
--

[~jeqo] I haven't started to work on this but will raise the patch soon. Will 
add you as a reviewer for the patch. Thanks!

> Improve TransactionIndex instance handling while copying to and fetching from 
> RSM.
> --
>
> Key: KAFKA-14993
> URL: https://issues.apache.org/jira/browse/KAFKA-14993
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Satish Duggana
>Assignee: Kamal Chandraprakash
>Priority: Major
>
> RSM should throw a ResourceNotFoundException if it does not have 
> TransactionIndex. Currently, it expects an empty InputStream and creates an 
> unnecessary file in the cache. This can be avoided by catching 
> ResourceNotFoundException and not creating an instance. There are minor 
> cleanups needed in RemoteIndexCache and other TransactionIndex usages.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14133) Remaining EasyMock to Mockito tests

2023-06-28 Thread Christo Lolov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christo Lolov updated KAFKA-14133:
--
Description: 
{color:#de350b}There are tests which use both PowerMock and EasyMock. I have 
put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here rely 
solely on EasyMock.{color}

Unless stated in brackets the tests are in the streams module.

A list of tests which still require to be moved from EasyMock to Mockito as of 
2nd of August 2022 which do not have a Jira issue and do not have pull requests 
I am aware of which are opened:

{color:#ff8b00}In Review{color}
{color:#00875a}Merged{color}
 # {color:#00875a}WorkerConnectorTest{color} (connect) (owner: [~yash.mayya] )
 # {color:#00875a}WorkerCoordinatorTest{color} (connect) (owner: [~yash.mayya] )
 # {color:#00875a}RootResourceTest{color} (connect) (owner: [~yash.mayya] )
 # {color:#00875a}ByteArrayProducerRecordEquals{color} (connect) (owner: 
[~yash.mayya] )
 # {color:#00875a}KStreamFlatTransformTest{color} (owner: Christo)
 # {color:#00875a}KStreamFlatTransformValuesTest{color} (owner: Christo)
 # {color:#00875a}KStreamPrintTest{color} (owner: Christo)
 # {color:#00875a}KStreamRepartitionTest{color} (owner: Christo)
 # {color:#00875a}MaterializedInternalTest{color} (owner: Christo)
 # {color:#00875a}TransformerSupplierAdapterTest{color} (owner: Christo)
 # {color:#00875a}KTableSuppressProcessorMetricsTest{color} (owner: Christo)
 # {color:#00875a}ClientUtilsTest{color} (owner: Christo)
 # {color:#00875a}HighAvailabilityStreamsPartitionAssignorTest{color} (owner: 
Christo)
 # {color:#00875a}TopologyTest{color} (owner: Christo)
 # {color:#00875a}KTableSuppressProcessorTest{color} (owner: Christo)
 # {color:#00875a}ChangeLoggingSessionBytesStoreTest{color} (owner: Christo)
 # {color:#00875a}ChangeLoggingTimestampedWindowBytesStoreTest{color} (owner: 
Christo)
 # {color:#00875a}ChangeLoggingWindowBytesStoreTest{color} (owner: Christo)
 # {color:#00875a}MeteredTimestampedWindowStoreTest{color} (owner: Christo)
 # {color:#00875a}StreamsRebalanceListenerTest{color} (owner: Christo)
 # {color:#00875a}TimestampedKeyValueStoreMaterializerTest{color} (owner: 
Christo)
 # {color:#00875a}CachingInMemoryKeyValueStoreTest{color} (owner: Christo)
 # {color:#00875a}CachingInMemorySessionStoreTest{color} (owner: Christo)
 # {color:#00875a}CachingPersistentSessionStoreTest{color} (owner: Christo)
 # {color:#00875a}CachingPersistentWindowStoreTest{color} (owner: Christo)
 # {color:#00875a}ChangeLoggingKeyValueBytesStoreTest{color} (owner: Christo)
 # {color:#00875a}ChangeLoggingTimestampedKeyValueBytesStoreTest{color} (owner: 
Christo)
 # {color:#00875a}CompositeReadOnlyWindowStoreTest{color} (owner: Christo)
 # {color:#00875a}KeyValueStoreBuilderTest{color} (owner: Christo)
 # {color:#00875a}RocksDBStoreTest{color} (owner: Christo)
 # {color:#00875a}StreamThreadStateStoreProviderTest{color} (owner: Christo)
 # {color:#ff8b00}TaskManagerTest{color} (owner: Christo)
 # {color:#00875a}InternalTopicManagerTest{color} (owner: Christo)
 # {color:#00875a}ProcessorContextImplTest{color} (owner: Christo)
 # {color:#00875a}WriteConsistencyVectorTest{color} (owner: Christo)
 # {color:#00875a}StreamsAssignmentScaleTest{color} (owner: Christo)
 # {color:#00875a}StreamsPartitionAssignorTest{color} (owner: Christo)
 # {color:#00875a}AssignmentTestUtils{color} (owner: Christo)
 # {color:#ff8b00}ProcessorStateManagerTest{color} (owner: Matthew) (takeover: 
Christo)
 # {color:#ff8b00}StandbyTaskTest{color} (owner: Matthew)
 # {color:#ff8b00}StoreChangelogReaderTest{color} (owner: Matthew)
 # {color:#ff8b00}StreamTaskTest{color} (owner: Matthew)
 # {color:#ff8b00}StreamThreadTest{color} (owner: Matthew)
 # {color:#ff8b00}StreamsMetricsImplTest{color} (owner: Dalibor) (Captured in 
https://issues.apache.org/jira/browse/KAFKA-12947)
 # {color:#00875a}TimeOrderedCachingPersistentWindowStoreTest{color} (owner: 
[~shekharrajak])
 # {color:#00875a}TimeOrderedWindowStoreTest{color} (owners: [~shekharrajak]) 
[https://github.com/apache/kafka/pull/12777] 

*The coverage report for the above tests after the change should be >= to what 
the coverage is now.*

  was:
{color:#de350b}There are tests which use both PowerMock and EasyMock. I have 
put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here rely 
solely on EasyMock.{color}

Unless stated in brackets the tests are in the streams module.

A list of tests which still require to be moved from EasyMock to Mockito as of 
2nd of August 2022 which do not have a Jira issue and do not have pull requests 
I am aware of which are opened:

{color:#ff8b00}In Review{color}
{color:#00875a}Merged{color}
 # {color:#00875a}WorkerConnectorTest{color} (connect) (owner: [~yash.mayya] )
 # {color:#00875a}WorkerCoordinatorTest{color} (connect) (owner: [~yash.mayya] )
 # {color:#00875a}RootResourceTest{color} (connect) 

[jira] [Commented] (KAFKA-14993) Improve TransactionIndex instance handling while copying to and fetching from RSM.

2023-06-28 Thread Jorge Esteban Quilcate Otoya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17738033#comment-17738033
 ] 

Jorge Esteban Quilcate Otoya commented on KAFKA-14993:
--

[~ckamal] just checking if you have done any progress on this one already. If 
still open, I'd like to help contributing this fix. 
Let me now, cheers!

> Improve TransactionIndex instance handling while copying to and fetching from 
> RSM.
> --
>
> Key: KAFKA-14993
> URL: https://issues.apache.org/jira/browse/KAFKA-14993
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Satish Duggana
>Assignee: Kamal Chandraprakash
>Priority: Major
>
> RSM should throw a ResourceNotFoundException if it does not have 
> TransactionIndex. Currently, it expects an empty InputStream and creates an 
> unnecessary file in the cache. This can be avoided by catching 
> ResourceNotFoundException and not creating an instance. There are minor 
> cleanups needed in RemoteIndexCache and other TransactionIndex usages.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jeqo opened a new pull request, #13923: KAFKA-15131: Improve RemoteStorageManager exception handling

2023-06-28 Thread via GitHub


jeqo opened a new pull request, #13923:
URL: https://github.com/apache/kafka/pull/13923

   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15131) Improve RemoteStorageManager exception handling

2023-06-28 Thread Jorge Esteban Quilcate Otoya (Jira)
Jorge Esteban Quilcate Otoya created KAFKA-15131:


 Summary: Improve RemoteStorageManager exception handling
 Key: KAFKA-15131
 URL: https://issues.apache.org/jira/browse/KAFKA-15131
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Jorge Esteban Quilcate Otoya
Assignee: Jorge Esteban Quilcate Otoya


As discussed here[1], RemoteStorageManager javadocs requires clarification 
regarding error handling:
 * Remove ambiguity on `RemoteResourceNotFoundException` description
 * Describe when `RemoteResourceNotFoundException` can/should be thrown
 * Describe expectations for idempotent operations when copying/deleting

 

[1] 
https://issues.apache.org/jira/browse/KAFKA-7739?focusedCommentId=17720936=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17720936



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15130) Delete remote segments when delete a topic

2023-06-28 Thread Lan Ding (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15130?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lan Ding updated KAFKA-15130:
-
Description: When tiered storage is enabled and 
{{delete.topic.enable=true}} , deleting a topic should also delete the 
corresponding segments of that topic on the remote system, and cancel the 
RLMTask for that topic.  (was: When tired storage is enabled and 
{{delete.topic.enable=true}} , deleting a topic should also delete the 
corresponding segments of that topic on the remote system, and cancel the 
RLMTask for that topic.)

> Delete remote segments when delete a topic
> --
>
> Key: KAFKA-15130
> URL: https://issues.apache.org/jira/browse/KAFKA-15130
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 3.4.0, 3.5.0
>Reporter: Lan Ding
>Assignee: Lan Ding
>Priority: Major
>
> When tiered storage is enabled and {{delete.topic.enable=true}} , deleting a 
> topic should also delete the corresponding segments of that topic on the 
> remote system, and cancel the RLMTask for that topic.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14972) Make KafkaConsumer usable in async runtimes

2023-06-28 Thread Erik van Oosten (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Erik van Oosten updated KAFKA-14972:

Description: 
KafkaConsumer contains a check that rejects nested invocations from different 
threads (method {{{}acquire{}}}). For users that use an async runtime, this is 
an almost impossible requirement. Examples of async runtimes that are affected 
are Kotlin co-routines (see KAFKA-7143) and Zio.

It should be possible for a thread to pass on its capability to access the 
consumer to another thread. See 
[KIP-944|https://cwiki.apache.org/confluence/x/chw0Dw] for a proposal and 
[https://github.com/apache/kafka/pull/13914] for an implementation.

  was:
KafkaConsumer contains a check that rejects nested invocations from different 
threads (method {{{}acquire{}}}). For users that use an async runtime, this is 
an almost impossible requirement. Examples of async runtimes that are affected 
are Kotlin co-routines (see KAFKA-7143) and Zio.

It should be possible for a thread to pass on its capability to access the 
consumer to another thread. See KIP-944 for a proposal and


> Make KafkaConsumer usable in async runtimes
> ---
>
> Key: KAFKA-14972
> URL: https://issues.apache.org/jira/browse/KAFKA-14972
> Project: Kafka
>  Issue Type: Wish
>  Components: consumer
>Reporter: Erik van Oosten
>Assignee: Erik van Oosten
>Priority: Major
>  Labels: needs-kip
>
> KafkaConsumer contains a check that rejects nested invocations from different 
> threads (method {{{}acquire{}}}). For users that use an async runtime, this 
> is an almost impossible requirement. Examples of async runtimes that are 
> affected are Kotlin co-routines (see KAFKA-7143) and Zio.
> It should be possible for a thread to pass on its capability to access the 
> consumer to another thread. See 
> [KIP-944|https://cwiki.apache.org/confluence/x/chw0Dw] for a proposal and 
> [https://github.com/apache/kafka/pull/13914] for an implementation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14972) Make KafkaConsumer usable in async runtimes

2023-06-28 Thread Erik van Oosten (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Erik van Oosten updated KAFKA-14972:

Description: 
KafkaConsumer contains a check that rejects nested invocations from different 
threads (method {{{}acquire{}}}). For users that use an async runtime, this is 
an almost impossible requirement. Examples of async runtimes that are affected 
are Kotlin co-routines (see KAFKA-7143) and Zio.

It should be possible for a thread to pass on its capability to access the 
consumer to another thread. See KIP-944 for a proposal and

  was:
KafkaConsumer contains a check that rejects nested invocations from different 
threads (method {{{}acquire{}}}). For users that use an async runtime, this is 
an almost impossible requirement. Examples of async runtimes that are affected 
are Kotlin co-routines (see KAFKA-7143) and Zio.

We propose to replace the thread-id check with an access-id that is stored on a 
thread-local variable. Existing programs will not be affected. Developers that 
work in an async runtime can pick up the access-id and set it on the 
thread-local variable in a thread of their choosing.

Every time a callback is invoked a new access-id is generated. When the 
callback completes, the previous access-id is restored.

This proposal does not make it impossible to use the client incorrectly. 
However, we think it strikes a good balance between making correct usage from 
an async runtime possible while making incorrect usage difficult.

Alternatives considered:
 # Configuration that switches off the check completely.


> Make KafkaConsumer usable in async runtimes
> ---
>
> Key: KAFKA-14972
> URL: https://issues.apache.org/jira/browse/KAFKA-14972
> Project: Kafka
>  Issue Type: Wish
>  Components: consumer
>Reporter: Erik van Oosten
>Assignee: Erik van Oosten
>Priority: Major
>  Labels: needs-kip
>
> KafkaConsumer contains a check that rejects nested invocations from different 
> threads (method {{{}acquire{}}}). For users that use an async runtime, this 
> is an almost impossible requirement. Examples of async runtimes that are 
> affected are Kotlin co-routines (see KAFKA-7143) and Zio.
> It should be possible for a thread to pass on its capability to access the 
> consumer to another thread. See KIP-944 for a proposal and



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14972) Make KafkaConsumer usable in async runtimes

2023-06-28 Thread Erik van Oosten (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17738014#comment-17738014
 ] 

Erik van Oosten commented on KAFKA-14972:
-

KIP-944 https://cwiki.apache.org/confluence/x/chw0Dw

> Make KafkaConsumer usable in async runtimes
> ---
>
> Key: KAFKA-14972
> URL: https://issues.apache.org/jira/browse/KAFKA-14972
> Project: Kafka
>  Issue Type: Wish
>  Components: consumer
>Reporter: Erik van Oosten
>Assignee: Erik van Oosten
>Priority: Major
>  Labels: needs-kip
>
> KafkaConsumer contains a check that rejects nested invocations from different 
> threads (method {{{}acquire{}}}). For users that use an async runtime, this 
> is an almost impossible requirement. Examples of async runtimes that are 
> affected are Kotlin co-routines (see KAFKA-7143) and Zio.
> We propose to replace the thread-id check with an access-id that is stored on 
> a thread-local variable. Existing programs will not be affected. Developers 
> that work in an async runtime can pick up the access-id and set it on the 
> thread-local variable in a thread of their choosing.
> Every time a callback is invoked a new access-id is generated. When the 
> callback completes, the previous access-id is restored.
> This proposal does not make it impossible to use the client incorrectly. 
> However, we think it strikes a good balance between making correct usage from 
> an async runtime possible while making incorrect usage difficult.
> Alternatives considered:
>  # Configuration that switches off the check completely.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] erikvanoosten commented on pull request #13914: KAFKA-14972: Support async runtimes in consumer

2023-06-28 Thread via GitHub


erikvanoosten commented on PR #13914:
URL: https://github.com/apache/kafka/pull/13914#issuecomment-1610983180

   KIP-944 has been created: https://cwiki.apache.org/confluence/x/chw0Dw


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15130) Delete remote segments when delete a topic

2023-06-28 Thread Lan Ding (Jira)
Lan Ding created KAFKA-15130:


 Summary: Delete remote segments when delete a topic
 Key: KAFKA-15130
 URL: https://issues.apache.org/jira/browse/KAFKA-15130
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 3.5.0, 3.4.0
Reporter: Lan Ding
Assignee: Lan Ding


When tired storage is enabled and {{delete.topic.enable=true}} , deleting a 
topic should also delete the corresponding segments of that topic on the remote 
system, and cancel the RLMTask for that topic.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] tinaselenge commented on pull request #13760: KAFKA-8982: Add retry of fetching metadata to Admin.deleteRecords

2023-06-28 Thread via GitHub


tinaselenge commented on PR #13760:
URL: https://github.com/apache/kafka/pull/13760#issuecomment-1610960042

   Thank you @showuon  for the feedback. I think I have addressed the comments. 
Please let me know there is anything I missed or doesn't look right. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15129) Clean up all metrics that were forgotten to be closed

2023-06-28 Thread hudeqi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hudeqi updated KAFKA-15129:
---
Description: 
In the current kafka code, there are still many module metrics that are 
forgotten to be closed when they stop, although some of them have been fixed, 
such as kafka-14866 and kafka-14868. et.
Here I will find all the metrics that are forgotten and closed in the current 
version, and submit them according to the subtasks in order to fix them.

> Clean up all metrics that were forgotten to be closed
> -
>
> Key: KAFKA-15129
> URL: https://issues.apache.org/jira/browse/KAFKA-15129
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller, core, log
>Affects Versions: 3.5.0
>Reporter: hudeqi
>Assignee: hudeqi
>Priority: Major
>
> In the current kafka code, there are still many module metrics that are 
> forgotten to be closed when they stop, although some of them have been fixed, 
> such as kafka-14866 and kafka-14868. et.
> Here I will find all the metrics that are forgotten and closed in the current 
> version, and submit them according to the subtasks in order to fix them.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15129) Clean up all metrics that were forgotten to be closed

2023-06-28 Thread hudeqi (Jira)
hudeqi created KAFKA-15129:
--

 Summary: Clean up all metrics that were forgotten to be closed
 Key: KAFKA-15129
 URL: https://issues.apache.org/jira/browse/KAFKA-15129
 Project: Kafka
  Issue Type: Improvement
  Components: controller, core, log
Affects Versions: 3.5.0
Reporter: hudeqi
Assignee: hudeqi






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)

2023-06-28 Thread via GitHub


wcarlson5 commented on code in PR #13855:
URL: https://github.com/apache/kafka/pull/13855#discussion_r1244739834


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##
@@ -56,10 +77,59 @@ public void init(final ProcessorContext context) {
 final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
 droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
 valueGetter.init(context);
+internalProcessorContext = 
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
 context);
+if (useBuffer) {
+if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
+throw new IllegalArgumentException("KTable must be versioned 
to use a grace period in a stream table join.");
+}
+
+buffer.get().setSerdesIfNull(new SerdeGetter(context));
+
buffer.get().init((org.apache.kafka.streams.processor.StateStoreContext) 
context(), null);
+}
 }
 
 @Override
 public void process(final Record record) {
+internalProcessorContext = 
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
 context());
+updateObservedStreamTime(record.timestamp());
+if (maybeDropRecord(record)) {
+return;
+}
+
+if (!useBuffer) {
+doJoin(record);
+} else {
+if (!buffer.get().put(observedStreamTime, record, 
internalProcessorContext.recordContext())) {
+doJoin(record);
+}
+buffer.get().evictWhile(() -> true, this::emit);

Review Comment:
   Sure that is just fine with me. I'll add a test adding records out of the 
grace period to



##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java:
##
@@ -112,6 +125,74 @@ private void pushNullValueToTable() {
 }
 }
 
+
+private void makeJoin(final Duration grace) {
+final KStream stream;
+final KTable table;
+final MockApiProcessorSupplier supplier = 
new MockApiProcessorSupplier<>();
+builder = new StreamsBuilder();
+
+final Consumed consumed = 
Consumed.with(Serdes.Integer(), Serdes.String());
+stream = builder.stream(streamTopic, consumed);
+table = builder.table("tableTopic2", consumed, Materialized.as(
+Stores.persistentVersionedKeyValueStore("V-grace", 
Duration.ofMinutes(5;
+stream.join(table,
+MockValueJoiner.TOSTRING_JOINER,
+Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String(), 
"Grace", grace)
+).process(supplier);
+final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
+driver = new TopologyTestDriver(builder.build(), props);
+inputStreamTopic = driver.createInputTopic(streamTopic, new 
IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), 
Duration.ZERO);
+inputTableTopic = driver.createInputTopic("tableTopic2", new 
IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), 
Duration.ZERO);
+
+processor = supplier.theCapturedProcessor();
+}
+
+@Test
+public void shouldFailIfTableIsNotVersioned() {
+final StreamsBuilder builder = new StreamsBuilder();
+final Properties props = new Properties();
+props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.NO_OPTIMIZATION);
+final KStream streamA = builder.stream("topic", 
Consumed.with(Serdes.String(), Serdes.String()));
+final KTable tableB = builder.table("topic2", 
Consumed.with(Serdes.String(), Serdes.String()));
+
+final IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class,
+() -> streamA.join(tableB, (value1, value2) -> value1 + value2, 
Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", 
Duration.ofMillis(6))).to("out-one"));
+assertThat(
+exception.getMessage(),
+is("KTable must be versioned to use a grace period in a stream 
table join.")
+);
+}
+
+@Test
+public void shouldDelayJoinByGracePeriod() {
+makeJoin(Duration.ofMillis(2));
+
+// push four items to the table. this should not produce any item.
+pushToTableNonRandom(4, "Y");
+processor.checkAndClearProcessResult(EMPTY);
+
+// push all four items to the primary stream. this should produce two 
items.
+pushToStream(4, "X");
+processor.checkAndClearProcessResult(
+new KeyValueTimestamp<>(0, "X0+Y0", 0),
+new KeyValueTimestamp<>(1, "X1+Y1", 1));
+
+// push all items to the table. this should not produce any item
+pushToTableNonRandom(4, "YY");
+

[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)

2023-06-28 Thread via GitHub


wcarlson5 commented on code in PR #13855:
URL: https://github.com/apache/kafka/pull/13855#discussion_r1244730962


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##
@@ -56,10 +77,59 @@ public void init(final ProcessorContext context) {
 final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
 droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
 valueGetter.init(context);
+internalProcessorContext = 
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
 context);
+if (useBuffer) {
+if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
+throw new IllegalArgumentException("KTable must be versioned 
to use a grace period in a stream table join.");
+}
+
+buffer.get().setSerdesIfNull(new SerdeGetter(context));
+
buffer.get().init((org.apache.kafka.streams.processor.StateStoreContext) 
context(), null);
+}
 }
 
 @Override
 public void process(final Record record) {
+internalProcessorContext = 
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
 context());
+updateObservedStreamTime(record.timestamp());
+if (maybeDropRecord(record)) {
+return;
+}
+
+if (!useBuffer) {
+doJoin(record);
+} else {
+if (!buffer.get().put(observedStreamTime, record, 
internalProcessorContext.recordContext())) {
+doJoin(record);
+}
+buffer.get().evictWhile(() -> true, this::emit);
+}
+}
+
+private void emit(final TimeOrderedKeyValueBuffer.Eviction toEmit) 
{
+final Record record = new Record<>(toEmit.key(), 
toEmit.value(), toEmit.recordContext().timestamp())
+.withHeaders(toEmit.recordContext().headers());
+internalProcessorContext.setRecordContext(toEmit.recordContext());

Review Comment:
   certainly



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



  1   2   >