[jira] [Commented] (KAFKA-15736) KRaft support in PlaintextConsumerTest

2023-11-18 Thread Allen Kang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17787464#comment-17787464
 ] 

Allen Kang commented on KAFKA-15736:


Can I pick this up?

It looks like a great fit for a first-time contribution. I'll try other similar
tickets (for KRaft support in test code) too, if that's okay.

> KRaft support in PlaintextConsumerTest
> --
>
> Key: KAFKA-15736
> URL: https://issues.apache.org/jira/browse/KAFKA-15736
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Sameer Tejani
>Priority: Minor
>  Labels: kraft, kraft-test, newbie
>
> The following tests in PlaintextConsumerTest in 
> core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala need to 
> be updated to support KRaft
> 49 : def testHeaders(): Unit = {
> 136 : def testDeprecatedPollBlocksForAssignment(): Unit = {
> 144 : def testHeadersSerializerDeserializer(): Unit = {
> 153 : def testMaxPollRecords(): Unit = {
> 169 : def testMaxPollIntervalMs(): Unit = {
> 194 : def testMaxPollIntervalMsDelayInRevocation(): Unit = {
> 234 : def testMaxPollIntervalMsDelayInAssignment(): Unit = {
> 258 : def testAutoCommitOnClose(): Unit = {
> 281 : def testAutoCommitOnCloseAfterWakeup(): Unit = {
> 308 : def testAutoOffsetReset(): Unit = {
> 319 : def testGroupConsumption(): Unit = {
> 339 : def testPatternSubscription(): Unit = {
> 396 : def testSubsequentPatternSubscription(): Unit = {
> 447 : def testPatternUnsubscription(): Unit = {
> 473 : def testCommitMetadata(): Unit = {
> 494 : def testAsyncCommit(): Unit = {
> 513 : def testExpandingTopicSubscriptions(): Unit = {
> 527 : def testShrinkingTopicSubscriptions(): Unit = {
> 541 : def testPartitionsFor(): Unit = {
> 551 : def testPartitionsForAutoCreate(): Unit = {
> 560 : def testPartitionsForInvalidTopic(): Unit = {
> 566 : def testSeek(): Unit = {
> 621 : def testPositionAndCommit(): Unit = {
> 653 : def testPartitionPauseAndResume(): Unit = {
> 671 : def testFetchInvalidOffset(): Unit = {
> 696 : def testFetchOutOfRangeOffsetResetConfigEarliest(): Unit = {
> 717 : def testFetchOutOfRangeOffsetResetConfigLatest(): Unit = {
> 743 : def testFetchRecordLargerThanFetchMaxBytes(): Unit = {
> 772 : def testFetchHonoursFetchSizeIfLargeRecordNotFirst(): Unit = {
> 804 : def testFetchHonoursMaxPartitionFetchBytesIfLargeRecordNotFirst(): Unit = {
> 811 : def testFetchRecordLargerThanMaxPartitionFetchBytes(): Unit = {
> 819 : def testLowMaxFetchSizeForRequestAndPartition(): Unit = {
> 867 : def testRoundRobinAssignment(): Unit = {
> 903 : def testMultiConsumerRoundRobinAssignor(): Unit = {
> 940 : def testMultiConsumerStickyAssignor(): Unit = {
> 986 : def testMultiConsumerDefaultAssignor(): Unit = {
> 1024 : def testRebalanceAndRejoin(assignmentStrategy: String): Unit = {
> 1109 : def testMultiConsumerDefaultAssignorAndVerifyAssignment(): Unit = {
> 1141 : def testMultiConsumerSessionTimeoutOnStopPolling(): Unit = {
> 1146 : def testMultiConsumerSessionTimeoutOnClose(): Unit = {
> 1151 : def testInterceptors(): Unit = {
> 1210 : def testAutoCommitIntercept(): Unit = {
> 1260 : def testInterceptorsWithWrongKeyValue(): Unit = {
> 1286 : def testConsumeMessagesWithCreateTime(): Unit = {
> 1303 : def testConsumeMessagesWithLogAppendTime(): Unit = {
> 1331 : def testListTopics(): Unit = {
> 1351 : def testUnsubscribeTopic(): Unit = {
> 1367 : def testPauseStateNotPreservedByRebalance(): Unit = {
> 1388 : def testCommitSpecifiedOffsets(): Unit = {
> 1415 : def testAutoCommitOnRebalance(): Unit = {
> 1454 : def testPerPartitionLeadMetricsCleanUpWithSubscribe(): Unit = {
> 1493 : def testPerPartitionLagMetricsCleanUpWithSubscribe(): Unit = {
> 1533 : def testPerPartitionLeadMetricsCleanUpWithAssign(): Unit = {
> 1562 : def testPerPartitionLagMetricsCleanUpWithAssign(): Unit = {
> 1593 : def testPerPartitionLagMetricsWhenReadCommitted(): Unit = {
> 1616 : def testPerPartitionLeadWithMaxPollRecords(): Unit = {
> 1638 : def testPerPartitionLagWithMaxPollRecords(): Unit = {
> 1661 : def testQuotaMetricsNotCreatedIfNoQuotasConfigured(): Unit = {
> 1809 : def testConsumingWithNullGroupId(): Unit = {
> 1874 : def testConsumingWithEmptyGroupId(): Unit = {
> 1923 : def testStaticConsumerDetectsNewPartitionCreatedAfterRestart(): Unit = {
> Scanned 1951 lines. Found 0 KRaft tests out of 61 tests



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Fix unstable sorting in AssignmentsManagerTest [kafka]

2023-11-18 Thread via GitHub


divijvaidya commented on code in PR #14794:
URL: https://github.com/apache/kafka/pull/14794#discussion_r1398097515


##
core/src/test/java/kafka/server/AssignmentsManagerTest.java:
##
@@ -72,6 +75,29 @@ void tearDown() throws InterruptedException {
 manager.close();
 }
 
+AssignReplicasToDirsRequestData normalize(AssignReplicasToDirsRequestData request) {

Review Comment:
   I was wondering if we can find a way to override equals() for
AssignReplicasToDirsRequestData with this deep check. I understand that it is
auto-generated, but I wanted to check what ideas we have already tried in that regard.
   
   I am concerned about this because if we add a new field to this data
structure in the future, it's quite likely that we will forget to add it here in the setters.
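
A minimal sketch of the normalize-before-compare idea, assuming simplified hypothetical record types (`TopicEntry`, `DirectoryEntry`, `RequestNormalizer` are illustrative stand-ins, not the generated `AssignReplicasToDirsRequestData` classes; records need Java 16+). Every nested list is sorted by a stable key so that equality no longer depends on build order:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical, simplified stand-ins for the generated request classes (Java 16+ records).
record TopicEntry(String topicId, List<Integer> partitions) {}

record DirectoryEntry(String directoryId, List<TopicEntry> topics) {}

final class RequestNormalizer {
    private RequestNormalizer() {}

    // Returns a copy in which every nested list is sorted by a stable key, so two
    // requests holding the same entries compare equal whatever order they were built in.
    static List<DirectoryEntry> normalize(List<DirectoryEntry> directories) {
        List<DirectoryEntry> result = new ArrayList<>();
        for (DirectoryEntry dir : directories) {
            List<TopicEntry> topics = new ArrayList<>();
            for (TopicEntry topic : dir.topics()) {
                List<Integer> partitions = new ArrayList<>(topic.partitions());
                partitions.sort(Comparator.naturalOrder());
                // Every component is passed explicitly: adding a record component
                // changes the canonical constructor, so this line stops compiling.
                topics.add(new TopicEntry(topic.topicId(), partitions));
            }
            topics.sort(Comparator.comparing(TopicEntry::topicId));
            result.add(new DirectoryEntry(dir.directoryId(), topics));
        }
        result.sort(Comparator.comparing(DirectoryEntry::directoryId));
        return result;
    }
}
```

With records, equals() is derived from all components, and a forgotten field shows up as a compile error in normalize() rather than as a silently incomplete copy. This touches on the concern above, though the real generated classes are not records.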
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15341) Enabling TS for a topic during rolling restart causes problems

2023-11-18 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17787499#comment-17787499
 ] 

Phuc Hong Tran commented on KAFKA-15341:


[~showuon] [~satish.duggana] I’m trying to add the remote storage enable status as
a new field to BrokerRegistrationRequestData. However, there is a “_version”
variable (I assume it is the API version or the data version) that is used when
reading or writing the registration data from or to a byte array. Since I’m
adding a new field, I assume that I need to increase the version number. Do you
know how that _version number is determined? Thanks
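
For context, a hedged sketch of how version-gated serialization generally works. As far as I know, Kafka's generated message classes take `_version` as the version the message is read or written at (for a request, the API version agreed with the receiver), and a new field is typically declared in the message's JSON schema with a `"versions": "N+"` range after bumping `"validVersions"`. The class below (`RegistrationSketch`, version 2, the byte layout) is entirely hypothetical and only mimics that pattern with a plain `ByteBuffer`; it uses the stdlib `UnsupportedOperationException` as a stand-in for Kafka's own exception type:

```java
import java.nio.ByteBuffer;

// Hypothetical message: brokerId exists in every version, while
// remoteStorageEnabled is assumed to be added in version 2.
final class RegistrationSketch {
    static final short FIRST_VERSION_WITH_FLAG = 2; // illustrative, not Kafka's real number

    final int brokerId;
    final boolean remoteStorageEnabled;

    RegistrationSketch(int brokerId, boolean remoteStorageEnabled) {
        this.brokerId = brokerId;
        this.remoteStorageEnabled = remoteStorageEnabled;
    }

    // Serializes only the fields that exist at `version`.
    ByteBuffer write(short version) {
        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + 1);
        buf.putInt(brokerId);
        if (version >= FIRST_VERSION_WITH_FLAG) {
            buf.put((byte) (remoteStorageEnabled ? 1 : 0));
        } else if (remoteStorageEnabled) {
            // An older receiver cannot see the flag, so refuse instead of silently dropping it.
            throw new UnsupportedOperationException(
                "remoteStorageEnabled requires version >= " + FIRST_VERSION_WITH_FLAG);
        }
        buf.flip();
        return buf;
    }

    // Reads back using the same version the bytes were written with.
    static RegistrationSketch read(ByteBuffer buf, short version) {
        int brokerId = buf.getInt();
        boolean enabled = version >= FIRST_VERSION_WITH_FLAG && buf.get() == 1;
        return new RegistrationSketch(brokerId, enabled);
    }
}
```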

> Enabling TS for a topic during rolling restart causes problems
> --
>
> Key: KAFKA-15341
> URL: https://issues.apache.org/jira/browse/KAFKA-15341
> Project: Kafka
>  Issue Type: Bug
>Reporter: Divij Vaidya
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: KIP-405
> Fix For: 3.7.0
>
>
> When we are in a rolling restart to enable TS at the system level, some brokers
> have TS enabled on them and some don't. We send an alter config call to
> enable TS for a topic; it hits a broker which has TS enabled, this broker
> forwards it to the controller, and the controller sends the config update to
> all brokers. When another broker which doesn't have TS enabled (because it
> hasn't undergone the restart yet) gets this config change, it "should" fail
> to apply it. But failing at that point is too late: alterConfig has already
> succeeded, because controller->broker config propagation is asynchronous.
> With this JIRA, we want the controller to check whether TS is enabled on all
> brokers before applying an alter config that turns on TS for a topic.
> Context: https://github.com/apache/kafka/pull/14176#discussion_r1291265129
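
A minimal sketch of the proposed controller-side check, assuming the controller already knows each broker's tiered-storage status (for example via broker registration, as discussed in the comment above). `remote.storage.enable` is the real topic-level config name; `TieredStorageGuard` and the per-broker status map are hypothetical:

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class TieredStorageGuard {
    // Real topic-level config name; the per-broker status map below is hypothetical.
    static final String REMOTE_STORAGE_ENABLE = "remote.storage.enable";

    private TieredStorageGuard() {}

    // Returns the ids of brokers that would block the change; an empty set means it may proceed.
    static Set<Integer> brokersBlocking(Map<String, String> requestedTopicConfig,
                                        Map<Integer, Boolean> tieredStorageEnabledByBroker) {
        Set<Integer> blocking = new TreeSet<>();
        if (!"true".equalsIgnoreCase(requestedTopicConfig.get(REMOTE_STORAGE_ENABLE))) {
            return blocking; // only turning TS on needs the cluster-wide check
        }
        for (Map.Entry<Integer, Boolean> entry : tieredStorageEnabledByBroker.entrySet()) {
            if (!Boolean.TRUE.equals(entry.getValue())) {
                blocking.add(entry.getKey());
            }
        }
        return blocking;
    }
}
```

Rejecting the request synchronously on the controller avoids the problem described above, where a broker can only fail after alterConfig has already been acknowledged.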





Re: [PR] KAFKA-14509 [WIP] [2/2] Implement server side logic for ConsumerGroupDescribe API [kafka]

2023-11-18 Thread via GitHub


riedelmax commented on code in PR #14544:
URL: https://github.com/apache/kafka/pull/14544#discussion_r1398236262


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -8660,6 +8665,68 @@ public void testListGroups() {
         assertEquals(expectAllGroupMap, actualAllGroupMap);
     }
 
+    @Test
+    public void testConsumerGroupDescribeNoErrors() {
+        String consumerGroupId = "consumerGroupId";
+        int epoch = 10;
+        String memberId = Uuid.randomUuid().toString();
+        String topicName = "topicName";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, epoch))
+            .build();
+
+        ConsumerGroupMember.Builder memberBuilder = new ConsumerGroupMember.Builder(memberId)
+            .setSubscribedTopicNames(Collections.singletonList(topicName));
+        context.replay(RecordHelpers.newMemberSubscriptionRecord(
+            consumerGroupId,
+            memberBuilder.build()
+        ));
+        context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, epoch + 1));
+
+        List<ConsumerGroupDescribeResponseData.DescribedGroup> actual = context.sendConsumerGroupDescribe(Arrays.asList(consumerGroupId));
+        ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new ConsumerGroupDescribeResponseData.DescribedGroup();
+        describedGroup.setGroupEpoch(epoch + 1);
+        describedGroup.setGroupId(consumerGroupId);
+        describedGroup.setMembers(Collections.singletonList(memberBuilder.build().asConsumerGroupDescribeMember()));
+        describedGroup.setAssignorName(null);
+        describedGroup.setGroupState("assigning");
+        List<ConsumerGroupDescribeResponseData.DescribedGroup> expected = Collections.singletonList(
+            describedGroup
+        );
+
+        assertEquals(expected, actual);
+    }

Review Comment:
   I finally managed to implement this. I struggled a little with how to add
the group correctly without committing the offset, but I think I've got it now.






Re: [PR] KAFKA-14509 [WIP] [2/2] Implement server side logic for ConsumerGroupDescribe API [kafka]

2023-11-18 Thread via GitHub


riedelmax commented on code in PR #14544:
URL: https://github.com/apache/kafka/pull/14544#discussion_r1398242438


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java:
##
@@ -545,6 +547,32 @@ public String currentAssignmentSummary() {
             ')';
     }
 
+    public ConsumerGroupDescribeResponseData.Member asConsumerGroupDescribeMember(Assignment targetAssignment) {
+        return new ConsumerGroupDescribeResponseData.Member()
+            .setMemberEpoch(memberEpoch)
+            .setMemberId(Uuid.fromString(memberId))

Review Comment:
   done






Re: [PR] KAFKA-14509 [WIP] [2/2] Implement server side logic for ConsumerGroupDescribe API [kafka]

2023-11-18 Thread via GitHub


riedelmax commented on code in PR #14544:
URL: https://github.com/apache/kafka/pull/14544#discussion_r1398242511


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -445,6 +446,42 @@ public List listGroups(List statesFi
         return groupStream.map(group -> group.asListedGroup(committedOffset)).collect(Collectors.toList());
     }
 
+
+    public List<ConsumerGroupDescribeResponseData.DescribedGroup> consumerGroupDescribe(
+        List<String> groupIds,
+        long committedOffset
+    ) {
+        List<ConsumerGroupDescribeResponseData.DescribedGroup> response = new ArrayList<>();
+
+        for (String groupId: groupIds) {
+            Group group = groups.get(groupId, committedOffset);
+
+            ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new ConsumerGroupDescribeResponseData.DescribedGroup()
+                .setGroupId(groupId);
+
+            if (group == null || !CONSUMER.equals(group.type())) {
+                // We don't support upgrading/downgrading between protocols at the moment so
+                // we set an error if a group exists with the wrong type.
+                describedGroup.setErrorMessage(Errors.INVALID_GROUP_ID.message());

Review Comment:
   Good catch. I already changed that in a previous commit, but I will also
remove the message.






Re: [PR] KAFKA-14509 [WIP] [2/2] Implement server side logic for ConsumerGroupDescribe API [kafka]

2023-11-18 Thread via GitHub


riedelmax commented on code in PR #14544:
URL: https://github.com/apache/kafka/pull/14544#discussion_r1398242511


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -445,6 +446,42 @@ public List listGroups(List statesFi
         return groupStream.map(group -> group.asListedGroup(committedOffset)).collect(Collectors.toList());
     }
 
+
+    public List<ConsumerGroupDescribeResponseData.DescribedGroup> consumerGroupDescribe(
+        List<String> groupIds,
+        long committedOffset
+    ) {
+        List<ConsumerGroupDescribeResponseData.DescribedGroup> response = new ArrayList<>();
+
+        for (String groupId: groupIds) {
+            Group group = groups.get(groupId, committedOffset);
+
+            ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new ConsumerGroupDescribeResponseData.DescribedGroup()
+                .setGroupId(groupId);
+
+            if (group == null || !CONSUMER.equals(group.type())) {
+                // We don't support upgrading/downgrading between protocols at the moment so
+                // we set an error if a group exists with the wrong type.
+                describedGroup.setErrorMessage(Errors.INVALID_GROUP_ID.message());

Review Comment:
   Good catch. I already changed that in a previous commit.
   Why would we remove the message instead of using
Errors.GROUP_ID_NOT_FOUND.message()?
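
A hedged sketch of the per-group error handling being discussed, assuming an error code is set on each described group (so one unknown id does not fail the whole batch) and that both a missing group and a group of another protocol type map to a "not found" code. That mapping is one of the options under discussion, not a settled decision; `GroupType`, `DescribeError`, `DescribedGroupSketch` and `DescribeSketch` are hypothetical simplifications of the coordinator types:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins for the coordinator's types.
enum GroupType { CLASSIC, CONSUMER }

enum DescribeError { NONE, GROUP_ID_NOT_FOUND }

record DescribedGroupSketch(String groupId, DescribeError error) {}

final class DescribeSketch {
    private DescribeSketch() {}

    // Describes each requested group independently; a lookup miss returns null.
    static List<DescribedGroupSketch> describe(List<String> groupIds, Map<String, GroupType> groups) {
        List<DescribedGroupSketch> response = new ArrayList<>();
        for (String groupId : groupIds) {
            GroupType type = groups.get(groupId);
            if (type != GroupType.CONSUMER) {
                // Missing groups (null) and groups of another type get an error code, not a message.
                response.add(new DescribedGroupSketch(groupId, DescribeError.GROUP_ID_NOT_FOUND));
            } else {
                response.add(new DescribedGroupSketch(groupId, DescribeError.NONE));
            }
        }
        return response;
    }
}
```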









Re: [PR] KAFKA-15776: Use the FETCH request timeout as the delay timeout for DelayedRemoteFetch [kafka]

2023-11-18 Thread via GitHub


divijvaidya commented on PR #14778:
URL: https://github.com/apache/kafka/pull/14778#issuecomment-1817593722

   Could you please help me understand how this change works with
`fetch.max.wait.ms` from a user perspective, i.e. what happens when we are
retrieving data from both local and remote storage in a single fetch call?
   
   Also, wouldn't this change affect user clients? I'm asking because prior to this
change, users were expecting a guaranteed response within `fetch.max.wait.ms` = 500ms,
but now they might not receive a response until the 40s `request.timeout.ms`. If
a user has configured their application timeouts according to
`fetch.max.wait.ms`, this change will break their application.





Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2023-11-18 Thread via GitHub


CalvinConfluent commented on PR #14612:
URL: https://github.com/apache/kafka/pull/14612#issuecomment-1817598315

   - Updated the API schema with the Cursor. It is needed in both the request and
the response.
   - Removed the RequestLimitReached error.
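
A minimal sketch of why a cursor is needed on both sides of a paginated describe: the request says where the server should start, and the response says where the client should resume. The cursor shape (topic name plus partition index) and all names here (`Cursor`, `TopicPartitionRef`, `Page`, `CursorPager`) are assumptions for illustration, not the PR's schema:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical cursor: the (topic, partition) at which the next page starts.
record Cursor(String topicName, int partitionIndex) {}

record TopicPartitionRef(String topicName, int partitionIndex) {}

record Page(List<TopicPartitionRef> partitions, Optional<Cursor> nextCursor) {}

final class CursorPager {
    private CursorPager() {}

    // `all` must be sorted by (topicName, partitionIndex); a null cursor means "from the start".
    static Page page(List<TopicPartitionRef> all, Cursor cursor, int limit) {
        int start = 0;
        if (cursor != null) {
            while (start < all.size() && compare(all.get(start), cursor) < 0) {
                start++;
            }
        }
        int end = Math.min(all.size(), start + limit);
        List<TopicPartitionRef> slice = new ArrayList<>(all.subList(start, end));
        // Only hand back a cursor when there is something left to read.
        Optional<Cursor> next = end < all.size()
            ? Optional.of(new Cursor(all.get(end).topicName(), all.get(end).partitionIndex()))
            : Optional.empty();
        return new Page(slice, next);
    }

    private static int compare(TopicPartitionRef p, Cursor c) {
        int byTopic = p.topicName().compareTo(c.topicName());
        return byTopic != 0 ? byTopic : Integer.compare(p.partitionIndex(), c.partitionIndex());
    }
}
```

Because the response signals "more data" through the cursor itself, a separate "limit reached" error is not needed to tell the client to continue.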





Re: [PR] MINOR: Do not check whether updating tasks exist in the waiting loop [kafka]

2023-11-18 Thread via GitHub


lucasbru merged PR #14791:
URL: https://github.com/apache/kafka/pull/14791





Re: [PR] KAFKA-15046: Get rid of unnecessary fsyncs inside UnifiedLog.lock to stabilize performance [kafka]

2023-11-18 Thread via GitHub


ocadaruma commented on code in PR #14242:
URL: https://github.com/apache/kafka/pull/14242#discussion_r1398265953


##
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##
@@ -308,7 +308,14 @@ public void truncateFromEnd(long endOffset) {
             if (endOffset >= 0 && epochEntry.isPresent() && epochEntry.get().startOffset >= endOffset) {
                 List<EpochEntry> removedEntries = removeFromEnd(x -> x.startOffset >= endOffset);
 
-                flush();
+                // We intentionally don't force flushing change to the device here because:
+                // - To avoid fsync latency
+                //   * fsync latency could be huge on a disk glitch, which is not rare in spinning drives
+                //   * This method is called by ReplicaFetcher threads, which could block replica fetching
+                //     then causing ISR shrink or high produce response time degradation in remote scope on high fsync latency.
+                // - Even when stale epochs remained in LeaderEpoch file due to the unclean shutdown, it will be handled by
+                //   another truncateFromEnd call on log loading procedure so it won't be a problem
+                flush(false);

Review Comment:
   I took another look at the code and found that flushing to the file (without
fsync) is necessary.
   
   The point here is whether there's any code path that reloads the leader-epoch
cache from the file. I found that there is, so not flushing could be a problem
in the following scenario:
   - (1) AlterReplicaDir is initiated
   - (2) truncation happens on the futureLog
     * LeaderEpochFileCache.truncateFromEnd is called, but the change isn't flushed to the file
   - (3) the future log catches up and [renameDir](https://github.com/apache/kafka/blob/3.6.0/core/src/main/scala/kafka/log/UnifiedLog.scala#L681) is called
     * This reloads the leader-epoch cache from the file, which is stale
     * Then a wrong leader epoch may be returned (e.g. for a list-offsets request)
   
   So we should still flush to the file, even without fsync.
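
A minimal Java NIO sketch of the distinction drawn above: writing the bytes hands them to the OS, so any later read of the file (such as a cache reload) sees them, while only `FileChannel.force(true)` (fsync) makes them durable across a machine crash. `CheckpointWriter` and its `sync` flag are hypothetical, and the sketch omits the write-to-temp-file-then-rename step a real checkpoint file would normally use:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class CheckpointWriter {
    private CheckpointWriter() {}

    // Rewrites `file` with `content`. Once write() returns, readers of the file see the
    // new bytes even when sync is false; force(true) additionally waits for the device.
    static void write(Path file, String content, boolean sync) throws IOException {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.CREATE,
                StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
            ByteBuffer buf = ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8));
            while (buf.hasRemaining()) {
                ch.write(buf);
            }
            if (sync) {
                ch.force(true); // fsync: the call that can stall on a slow or glitching disk
            }
        }
    }
}
```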






Re: [PR] KAFKA-15836: KafkaConsumer subscribes to multiple topics does not respect max.poll.records [kafka]

2023-11-18 Thread via GitHub


AndrewJSchofield commented on code in PR #14789:
URL: https://github.com/apache/kafka/pull/14789#discussion_r1398266495


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##
@@ -1124,6 +1126,62 @@ public void testFetchMaxPollRecords() {
         assertEquals(5, recordsToTest.get(1).offset());
     }
 
+    /**
+     * KAFKA-15836:
+     * Test that max.poll.records is honoured when consuming from multiple topic-partitions and the
+     * fetched records are not aligned on max.poll.records boundaries.
+     *
+     * tp0 has records 1,2,3; tp1 has records 6,7,8
+     * max.poll.records is 2
+     *
+     * poll 1 should return 1,2
+     * poll 2 should return 3,6
+     * poll 3 should return 7,8
+     *
+     * Or similar :)
+     */
+    @Test
+    public void testFetchMaxPollRecordsUnaligned() {
+        buildFetcher(2);
+
+        Set<TopicPartition> tps = new HashSet<>();
+        tps.add(tp0);
+        tps.add(tp1);
+        assignFromUser(tps);
+        subscriptions.seek(tp0, 1);
+        subscriptions.seek(tp1, 6);
+
+        client.prepareResponse(fetchResponse2(tidp0, records, 100L, tidp1, moreRecords, 100L));
+        client.prepareResponse(fullFetchResponse(tidp0, emptyRecords, Errors.NONE, 100L, 0));
+
+        assertEquals(1, sendFetches());
+        consumerClient.poll(time.timer(0));

Review Comment:
   @jolshan Thanks for the review. I have improved the test comments and used a 
submethod to reduce repetition.
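
A simplified model of the behaviour the test above expects, assuming buffered records are drained partition by partition and a poll continues into the next partition instead of stopping at a partition boundary. `PollDrainer` is hypothetical and is not the actual fix in KAFKA-15836 (the real consumer also handles fetch positions and partition rotation):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class PollDrainer {
    // Buffered offsets per partition, kept in insertion order.
    private final Map<String, Deque<Integer>> buffered = new LinkedHashMap<>();
    private final int maxPollRecords;

    PollDrainer(int maxPollRecords) {
        this.maxPollRecords = maxPollRecords;
    }

    void addFetched(String partition, List<Integer> offsets) {
        buffered.computeIfAbsent(partition, p -> new ArrayDeque<>()).addAll(offsets);
    }

    // Returns at most maxPollRecords records, moving on to the next partition
    // when the current one runs out.
    List<Integer> poll() {
        List<Integer> out = new ArrayList<>();
        for (Deque<Integer> records : buffered.values()) {
            while (out.size() < maxPollRecords && !records.isEmpty()) {
                out.add(records.poll());
            }
            if (out.size() == maxPollRecords) {
                break;
            }
        }
        return out;
    }
}
```

With tp0 holding 1,2,3, tp1 holding 6,7,8 and a limit of 2, successive polls return [1, 2], [3, 6], [7, 8], matching the scenario described in the test's javadoc.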






Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]

2023-11-18 Thread via GitHub


agavra commented on code in PR #14648:
URL: https://github.com/apache/kafka/pull/14648#discussion_r1398269442


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamJoinedStoreFactory.java:
##
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
+import org.apache.kafka.streams.state.DslWindowParams;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+
+public class StreamJoinedStoreFactory extends AbstractConfigurableStoreFactory {
+
+    private final String name;
+    private final JoinWindows windows;
+    private final Serde valueSerde;
+    private final WindowBytesStoreSupplier storeSupplier;
+    private final StreamJoinedInternal joinedInternal;
+
+    private boolean loggingEnabled;
+    private final Map logConfig;
+
+    public enum Type {
+        THIS,
+        OTHER
+    }
+
+    public StreamJoinedStoreFactory(
+        final String name,
+        final JoinWindows windows,
+        final StreamJoinedInternal joinedInternal,
+        final Type type
+    ) {
+        super(joinedInternal.dslStoreSuppliers());
+        this.name = name + "-store";
+        this.joinedInternal = joinedInternal;
+        this.windows = windows;
+        this.loggingEnabled = joinedInternal.loggingEnabled();
+        this.logConfig = new HashMap<>(joinedInternal.logConfig());
+
+        // since this store is configured to retain duplicates, we should
+        // not compact, so we override the configuration to make sure that
+        // it's just delete (window stores are configured to compact,delete)
+        this.logConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);

Review Comment:
   There is a test for it (not added in this PR)! That's how it caught this bug
;) I don't remember which file it's in, though. If I figure it out, I'll link it
here.
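
A minimal sketch of the override described in the code comment above: copy the user-supplied changelog configs and force a delete-only cleanup policy for a store that retains duplicates. The string values match `TopicConfig.CLEANUP_POLICY_CONFIG` ("cleanup.policy") and `TopicConfig.CLEANUP_POLICY_DELETE` ("delete"); `JoinStoreLogConfig` is hypothetical and written without Kafka dependencies:

```java
import java.util.HashMap;
import java.util.Map;

final class JoinStoreLogConfig {
    // Same string values as TopicConfig.CLEANUP_POLICY_CONFIG / CLEANUP_POLICY_DELETE.
    static final String CLEANUP_POLICY = "cleanup.policy";
    static final String DELETE = "delete";

    private JoinStoreLogConfig() {}

    // Copies the user's changelog configs (leaving the input untouched) and replaces
    // any cleanup policy, such as the window-store default "compact,delete", with "delete".
    static Map<String, String> forJoinStore(Map<String, String> userConfig) {
        Map<String, String> config = new HashMap<>(userConfig);
        config.put(CLEANUP_POLICY, DELETE);
        return config;
    }
}
```

Copying first mirrors the `new HashMap<>(joinedInternal.logConfig())` line in the diff, so the override does not leak back into the shared `StreamJoined` settings.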






Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]

2023-11-18 Thread via GitHub


agavra commented on code in PR #14648:
URL: https://github.com/apache/kafka/pull/14648#discussion_r1398269890


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java:
##
@@ -66,6 +66,10 @@ default void configure(final StreamsConfig config) {
 
 boolean isVersionedStore();
 
+// TODO: consider moving all the log configuration code (InternalTopicConfig)

Review Comment:
   > Just wondering, would we also be able to get rid of the #historyRetention
and #isVersionedStore or is that used for something else?
   
   We would, with a little more abstraction/refactoring. They are used
somewhere to check configurations, but I think it makes sense to push that into
this class as well and get rid of those too.
   
   > Btw unless you plan on doing it yourself in an immediate followup PR
   
   I am happy to do that. I checked the code and found quite a few TODOs, so I
assumed it was OK. I'll just remove the TODO and file a JIRA.






Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]

2023-11-18 Thread via GitHub


agavra commented on code in PR #14648:
URL: https://github.com/apache/kafka/pull/14648#discussion_r1398270071


##
streams/src/main/java/org/apache/kafka/streams/state/DslSessionParams.java:
##
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import java.time.Duration;
+import java.util.Objects;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+
+/**
+ * {@code DslSessionParams} is a wrapper class for all parameters that function
+ * as inputs to {@link DslStoreSuppliers#sessionStore(DslSessionParams)}.
+ */
+public class DslSessionParams {
+
+private final String name;
+private final Duration retentionPeriod;
+private final EmitStrategy emitStrategy;
+
+/**
+ * @param name              name of the store (cannot be {@code null})
+ * @param retentionPeriod   length of time to retain data in the store (cannot be negative)
+ *                          (note that the retention period must be at least as long enough to
+ *                          contain the inactivity gap of the session and the entire grace period.)
+ * @param emitStrategy      defines how to emit results
+ */
+public DslSessionParams(
+final String name,
+final Duration retentionPeriod,
+final EmitStrategy emitStrategy
+) {
+Objects.requireNonNull(name);
+this.name = name;
+this.retentionPeriod = retentionPeriod;
+this.emitStrategy = emitStrategy;
+}
+
+public String name() {
+return name;
+}
+
+public Duration retentionPeriod() {
+return retentionPeriod;
+}
+
+public EmitStrategy emitStrategy() {
+return emitStrategy;
+}
+
+@Override
+public boolean equals(final Object o) {
+if (this == o) {
+return true;
+}
+if (o == null || getClass() != o.getClass()) {
+return false;
+}
+final DslSessionParams that = (DslSessionParams) o;
+return Objects.equals(name, that.name)
+&& Objects.equals(retentionPeriod, that.retentionPeriod)
+&& Objects.equals(emitStrategy, that.emitStrategy);
+}
+
+@Override
+public int hashCode() {
+return Objects.hash(name, retentionPeriod, emitStrategy);
+}
+
+@Override
+public String toString() {
+return "DslSessionParams{" +
+"name='" + name + '\'' +

Review Comment:
   It's autogenerated by IntelliJ to indicate that `name` is a String
(basically, it formats it as `name='foo'`). It isn't necessary (in fact, it would
be wrong) for non-string fields.
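
A small sketch of the IntelliJ-style `toString()` pattern being discussed: only the String field is wrapped in single quotes, while other fields rely on their own `toString()`. `ParamsToStringSketch` is a hypothetical two-field stand-in for `DslSessionParams`:

```java
import java.time.Duration;

final class ParamsToStringSketch {
    private final String name;
    private final Duration retentionPeriod;

    ParamsToStringSketch(String name, Duration retentionPeriod) {
        this.name = name;
        this.retentionPeriod = retentionPeriod;
    }

    @Override
    public String toString() {
        // Quotes mark the String value; Duration prints its ISO-8601 form (e.g. PT1M).
        return "DslSessionParams{" +
            "name='" + name + '\'' +
            ", retentionPeriod=" + retentionPeriod +
            '}';
    }
}
```

For example, `new ParamsToStringSketch("foo", Duration.ofMinutes(1))` prints `DslSessionParams{name='foo', retentionPeriod=PT1M}`; quoting the Duration as well would misleadingly suggest it is a String.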






Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]

2023-11-18 Thread via GitHub


agavra commented on code in PR #14648:
URL: https://github.com/apache/kafka/pull/14648#discussion_r1398270264


##
streams/src/main/java/org/apache/kafka/streams/state/DslWindowParams.java:
##
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import java.time.Duration;
+import java.util.Objects;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+
+/**
+ * {@code DslWindowParams} is a wrapper class for all parameters that function
+ * as inputs to {@link DslStoreSuppliers#windowStore(DslWindowParams)}.
+ */
+public class DslWindowParams {
+
+private final String name;
+private final Duration retentionPeriod;
+private final Duration windowSize;
+private final boolean retainDuplicates;
+private final EmitStrategy emitStrategy;
+private final boolean isSlidingWindow;
+
+/**
+ * @param name name of the store (cannot be {@code null})
+ * @param retentionPeriod  length of time to retain data in the store (cannot be negative)
+ *                         (note that the retention period must be at least long enough to contain the
+ *                         windowed data's entire life cycle, from window-start through window-end,
+ *                         and for the entire grace period)

Review Comment:
   This was copied from the Javadoc elsewhere (see `Stores#persistentWindowStore`). 
If we want to change it, we can do that in a separate PR so that it stays consistent 
across the board.
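The Javadoc above requires the retention period to cover a window's whole life cycle, meaning the window itself plus the grace period. Below is a minimal plain-Java sketch of that rule. `RetentionCheck` and `validateRetention` are hypothetical names and are not part of the Kafka Streams API. The error message is also illustrative only.

```java
import java.time.Duration;

// Hypothetical helper illustrating the retention rule described in the
// DslWindowParams Javadoc; this is not a Kafka Streams API.
final class RetentionCheck {

    private RetentionCheck() { }

    // Rejects a retention period that cannot hold a window from
    // window-start through window-end plus the entire grace period.
    static void validateRetention(final String storeName,
                                  final Duration retentionPeriod,
                                  final Duration windowSize,
                                  final Duration gracePeriod) {
        if (retentionPeriod.isNegative()) {
            throw new IllegalArgumentException("retentionPeriod cannot be negative");
        }
        final Duration lifeCycle = windowSize.plus(gracePeriod);
        if (retentionPeriod.compareTo(lifeCycle) < 0) {
            throw new IllegalArgumentException("Retention for store " + storeName
                + " must be at least window size + grace. Got size=["
                + windowSize.toMillis() + "], grace=[" + gracePeriod.toMillis()
                + "], retention=[" + retentionPeriod.toMillis() + "]");
        }
    }
}
```

With a 5-minute window and a 5-minute grace period, a 10-minute retention passes. A 9-minute retention is rejected.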






[jira] [Created] (KAFKA-15852) Move broker code from `core` to `broker` module

2023-11-18 Thread Ismael Juma (Jira)
Ismael Juma created KAFKA-15852:
---

 Summary: Move broker code from `core` to `broker` module
 Key: KAFKA-15852
 URL: https://issues.apache.org/jira/browse/KAFKA-15852
 Project: Kafka
  Issue Type: Bug
Reporter: Ismael Juma


The relevant packages would be `kafka.server`, `kafka.cluster`, etc.

See KAFKA-14524 for more context.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2023-11-18 Thread via GitHub


ijuma commented on PR #14034:
URL: https://github.com/apache/kafka/pull/14034#issuecomment-1817663128

   @satishd Let me know when this is ready for review.





[jira] [Updated] (KAFKA-15852) Move broker code from `core` to `broker` module

2023-11-18 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-15852:

Issue Type: Improvement  (was: Bug)

> Move broker code from `core` to `broker` module
> ---
>
> Key: KAFKA-15852
> URL: https://issues.apache.org/jira/browse/KAFKA-15852
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Priority: Major
>
> The relevant packages would be `kafka.server`, `kafka.cluster`, etc.
> See KAFKA-14524 for more context.





[jira] [Updated] (KAFKA-15852) Move server code from `core` to `server` module

2023-11-18 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-15852:

Summary: Move server code from `core` to `server` module  (was: Move broker 
code from `core` to `server` module)

> Move server code from `core` to `server` module
> ---
>
> Key: KAFKA-15852
> URL: https://issues.apache.org/jira/browse/KAFKA-15852
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Priority: Major
>
> The relevant packages would be `kafka.server`, `kafka.cluster`, etc.
> See KAFKA-14524 for more context.





[jira] [Updated] (KAFKA-15852) Move broker code from `core` to `server` module

2023-11-18 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-15852:

Summary: Move broker code from `core` to `server` module  (was: Move broker 
code from `core` to `broker` module)

> Move broker code from `core` to `server` module
> ---
>
> Key: KAFKA-15852
> URL: https://issues.apache.org/jira/browse/KAFKA-15852
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Priority: Major
>
> The relevant packages would be `kafka.server`, `kafka.cluster`, etc.
> See KAFKA-14524 for more context.





[jira] [Created] (KAFKA-15853) Move KafkaConfig to server module

2023-11-18 Thread Ismael Juma (Jira)
Ismael Juma created KAFKA-15853:
---

 Summary: Move KafkaConfig to server module
 Key: KAFKA-15853
 URL: https://issues.apache.org/jira/browse/KAFKA-15853
 Project: Kafka
  Issue Type: Sub-task
Reporter: Ismael Juma


The server module is a Java-only module, so this also requires converting from 
Scala to Java.





Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]

2023-11-18 Thread via GitHub


agavra commented on code in PR #14648:
URL: https://github.com/apache/kafka/pull/14648#discussion_r139828


##
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##
@@ -1038,13 +1038,15 @@ public void shouldThrowConfigExceptionWhenOptimizationConfigNotValueInRange() {
 assertThrows(ConfigException.class, () -> new StreamsConfig(props));
 }
 
+@SuppressWarnings("deprecation")
 @Test
 public void shouldSpecifyRocksdbWhenNotExplicitlyAddedToConfigs() {
 final String expectedDefaultStoreType = StreamsConfig.ROCKS_DB;
 final String actualDefaultStoreType = streamsConfig.getString(DEFAULT_DSL_STORE_CONFIG);
 assertEquals("default.dsl.store should be \"rocksDB\"", expectedDefaultStoreType, actualDefaultStoreType);
 }
 
+@SuppressWarnings("deprecation")
 @Test
 public void shouldSpecifyInMemoryWhenExplicitlyAddedToConfigs() {

Review Comment:
   done






[jira] [Assigned] (KAFKA-15854) Move Java classes from kafka.server to the server module

2023-11-18 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15854?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma reassigned KAFKA-15854:
---

Assignee: Ismael Juma

> Move Java classes from kafka.server to the server module
> 
>
> Key: KAFKA-15854
> URL: https://issues.apache.org/jira/browse/KAFKA-15854
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
>






[jira] [Created] (KAFKA-15854) Move Java classes from kafka.server to the server module

2023-11-18 Thread Ismael Juma (Jira)
Ismael Juma created KAFKA-15854:
---

 Summary: Move Java classes from kafka.server to the server module
 Key: KAFKA-15854
 URL: https://issues.apache.org/jira/browse/KAFKA-15854
 Project: Kafka
  Issue Type: Sub-task
Reporter: Ismael Juma








[PR] KAFKA-15854: Move Java classes from `kafka.server` to the `server` module [kafka]

2023-11-18 Thread via GitHub


ijuma opened a new pull request, #14796:
URL: https://github.com/apache/kafka/pull/14796

   We only move the classes that have no dependencies on Scala classes.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-15854: Move Java classes from `kafka.server` to the `server` module [kafka]

2023-11-18 Thread via GitHub


ijuma commented on code in PR #14796:
URL: https://github.com/apache/kafka/pull/14796#discussion_r1398291786


##
build.gradle:
##
@@ -1643,19 +1700,6 @@ project(':server-common') {
 }
   }
 
-  sourceSets {
-main {
-  java {
-srcDirs = ["src/main/java"]
-  }
-}
-test {
-  java {
-srcDirs = ["src/test/java"]
-  }
-}
-  }

Review Comment:
   Redundant and unnecessary.



##
build.gradle:
##
@@ -1643,19 +1700,6 @@ project(':server-common') {
 }
   }
 
-  sourceSets {
-main {
-  java {
-srcDirs = ["src/main/java"]
-  }
-}
-test {
-  java {
-srcDirs = ["src/test/java"]
-  }
-}
-  }

Review Comment:
   Redundant.






Re: [PR] KAFKA-15854: Move Java classes from `kafka.server` to the `server` module [kafka]

2023-11-18 Thread via GitHub


ijuma commented on code in PR #14796:
URL: https://github.com/apache/kafka/pull/14796#discussion_r1398292085


##
build.gradle:
##
@@ -1670,6 +1714,10 @@ project(':server-common') {
   checkstyle {
 configProperties = checkstyleConfigProperties("import-control-server-common.xml")
   }
+
+  javadoc {
+enabled = false

Review Comment:
   We don't have public classes in `server-common`, so make it explicit that 
javadoc should not be published.






Re: [PR] KAFKA-15854: Move Java classes from `kafka.server` to the `server` module [kafka]

2023-11-18 Thread via GitHub


ijuma commented on code in PR #14796:
URL: https://github.com/apache/kafka/pull/14796#discussion_r1398292530


##
server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java:
##
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package kafka.metrics;
+package org.apache.kafka.server.metrics;

Review Comment:
   @junrao @apoorvmittal10 Note that I have moved these recently introduced 
classes to this new module.






[jira] [Updated] (KAFKA-14524) Modularize `core` monolith

2023-11-18 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-14524:

Description: 
The `core` module has grown too large and it's time to split it into multiple 
modules.  The `core` module will be deleted once it's empty.

Evidence of `core` growing too large is that it takes 1m10s to compile the main 
code and tests and it takes hours to run all the tests sequentially.

As part of this effort, we should rewrite the Scala code in Java to reduce 
developer friction, reduce compilation time and simplify deployment (i.e. we 
can remove the scala version suffix from the module name). Scala may have a 
number of advantages over Java 8 (minimum version we support now) and Java 11 
(minimum version we will support in Kafka 4.0), but a mixture of Scala and Java 
(as we have now) is more complex than just Java.

Another benefit is that code dependencies will be strictly enforced, which will 
hopefully help ensure better abstractions.

This pattern was started with the `tools` (but not completed), `metadata` and 
`raft` modules and we have (when this ticket was filed) a couple more in 
progress: `group-coordinator` and `storage`.

This is an umbrella ticket and it will link to each ticket related to this goal.

  was:
The `core` module has grown too large and it's time to split it into multiple 
modules. A much slimmer `core` module will remain in the end.

Evidence of `core` growing too large is that it takes 1m10s to compile the main 
code and tests and it takes hours to run all the tests sequentially.

As part of this effort, we should rewrite the Scala code in Java to reduce 
developer friction, reduce compilation time and simplify deployment (i.e. we 
can remove the scala version suffix from the module name). Scala may have a 
number of advantages over Java 8 (minimum version we support now) and Java 11 
(minimum version we will support in Kafka 4.0), but a mixture of Scala and Java 
(as we have now) is more complex than just Java.

Another benefit is that code dependencies will be strictly enforced, which will 
hopefully help ensure better abstractions.

This pattern was started with the `tools` (but not completed), `metadata` and 
`raft` modules and we have (when this ticket was filed) a couple more in 
progress: `group-coordinator` and `storage`.

This is an umbrella ticket and it will link to each ticket related to this goal.


> Modularize `core` monolith
> --
>
> Key: KAFKA-14524
> URL: https://issues.apache.org/jira/browse/KAFKA-14524
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Priority: Major
>
> The `core` module has grown too large and it's time to split it into multiple 
> modules.  The `core` module will be deleted once it's empty.
> Evidence of `core` growing too large is that it takes 1m10s to compile the 
> main code and tests and it takes hours to run all the tests sequentially.
> As part of this effort, we should rewrite the Scala code in Java to reduce 
> developer friction, reduce compilation time and simplify deployment (i.e. we 
> can remove the scala version suffix from the module name). Scala may have a 
> number of advantages over Java 8 (minimum version we support now) and Java 11 
> (minimum version we will support in Kafka 4.0), but a mixture of Scala and 
> Java (as we have now) is more complex than just Java.
> Another benefit is that code dependencies will be strictly enforced, which 
> will hopefully help ensure better abstractions.
> This pattern was started with the `tools` (but not completed), `metadata` and 
> `raft` modules and we have (when this ticket was filed) a couple more in 
> progress: `group-coordinator` and `storage`.
> This is an umbrella ticket and it will link to each ticket related to this 
> goal.





[jira] [Commented] (KAFKA-15326) Decouple Processing Thread from Polling Thread

2023-11-18 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17787544#comment-17787544
 ] 

Guozhang Wang commented on KAFKA-15326:
---

My bad for not linking the first couple of PRs to this ticket. I was planning to 
rename them only after someone reviewed them, but in the end I did not rename them 
until they were merged.

> Decouple Processing Thread from Polling Thread
> --
>
> Key: KAFKA-15326
> URL: https://issues.apache.org/jira/browse/KAFKA-15326
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Lucas Brutschy
>Assignee: Lucas Brutschy
>Priority: Critical
>
> As part of an ongoing effort to implement a better threading architecture in 
> Kafka Streams, we decouple N stream threads into N polling threads and N 
> processing threads. Consolidating the N polling threads into a single 
> thread is a follow-up to this ticket.
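The polling/processing split described above can be illustrated with a generic producer/consumer hand-off. In this sketch, one polling thread fetches records and one processing thread takes them from a bounded queue. It shows the general idea only and is not the Kafka Streams implementation. `PollProcessDemo`, the string "records", the end-of-input marker, and the queue capacity are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical illustration: one polling thread feeds one processing thread
// through a bounded queue, so fetching and processing run independently,
// coupled only by the queue's capacity.
final class PollProcessDemo {

    // End-of-input marker for this demo; a real record with this value
    // would end processing early.
    private static final String END_MARKER = "__end__";

    static List<String> run(final List<String> fetched) {
        final BlockingQueue<String> queue = new ArrayBlockingQueue<>(16);
        final List<String> processed = new ArrayList<>();

        // Polling thread: stands in for a consumer poll loop.
        final Thread poller = new Thread(() -> {
            try {
                for (final String record : fetched) {
                    queue.put(record); // blocks if the processor falls behind
                }
                queue.put(END_MARKER);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "polling-thread");

        // Processing thread: applies trivial "processing" to each record.
        final Thread processor = new Thread(() -> {
            try {
                while (true) {
                    final String record = queue.take();
                    if (END_MARKER.equals(record)) {
                        break;
                    }
                    processed.add(record.toUpperCase());
                }
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "processing-thread");

        poller.start();
        processor.start();
        try {
            poller.join();
            processor.join(); // join() makes the processor's writes visible here
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for threads", e);
        }
        return processed;
    }
}
```

With a single FIFO queue between one poller and one processor, records are processed in the order they were fetched.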





Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2023-11-18 Thread via GitHub


guozhangwang commented on PR #14426:
URL: https://github.com/apache/kafka/pull/14426#issuecomment-1817710512

   Also made a very quick pass, and I think the fix is spot on. It would be 
great to get this merged soon.





Re: [PR] MINOR: Do not check whether updating tasks exist in the waiting loop [kafka]

2023-11-18 Thread via GitHub


guozhangwang commented on PR #14791:
URL: https://github.com/apache/kafka/pull/14791#issuecomment-1817710719

   LGTM!





Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]

2023-11-18 Thread via GitHub


guozhangwang commented on code in PR #14648:
URL: https://github.com/apache/kafka/pull/14648#discussion_r1398306353


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java:
##
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
+
+import java.time.Duration;
+import java.util.Map;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
+import org.apache.kafka.streams.state.DslKeyValueParams;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.internals.InMemoryWindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.internals.LeftOrRightValue;
+import org.apache.kafka.streams.state.internals.LeftOrRightValueSerde;
+import org.apache.kafka.streams.state.internals.ListValueStoreBuilder;
+import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
+import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSideSerde;
+
+public class OuterStreamJoinStoreFactory extends AbstractConfigurableStoreFactory {
+
+private final String name;
+private final StreamJoinedInternal streamJoined;
+private final JoinWindows windows;
+private boolean loggingEnabled;
+
+public enum Type {
+RIGHT,
+LEFT
+}
+
+public OuterStreamJoinStoreFactory(
+final String name,
+final StreamJoinedInternal streamJoined,
+final JoinWindows windows,
+final Type type
+) {
+super(streamJoined.dslStoreSuppliers());
+this.name = buildOuterJoinWindowStoreName(streamJoined, name, type) + "-store";
+this.streamJoined = streamJoined;
+this.windows = windows;
+this.loggingEnabled = streamJoined.loggingEnabled();
+}
+
+@Override
+public StateStore build() {
+final Duration retentionPeriod = Duration.ofMillis(retentionPeriod());
+final Duration windowSize = Duration.ofMillis(windows.size());
+final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
+final long retentionMs = validateMillisecondDuration(retentionPeriod, rpMsgPrefix);
+final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, "windowSize");
+final long windowSizeMs = validateMillisecondDuration(windowSize, wsMsgPrefix);
+
+if (retentionMs < 0L) {
+throw new IllegalArgumentException("retentionPeriod cannot be negative");
+}
+if (windowSizeMs < 0L) {
+throw new IllegalArgumentException("windowSize cannot be negative");
+}
+if (windowSizeMs > retentionMs) {
+throw new IllegalArgumentException("The retention period of the window store "
++ name + " must be no smaller than its window size. Got size=["
++ windowSizeMs + "], retention=[" + retentionMs + "]");
+}
+
+final TimestampedKeyAndJoinSideSerde timestampedKeyAndJoinSideSerde = new TimestampedKeyAndJoinSideSerde<>(streamJoined.keySerde());
+final LeftOrRightValueSerde leftOrRightValueSerde = new LeftOrRightValueSerde<>(streamJoined.valueSerde(), streamJoined.otherValueSerde());
+
+// TODO: we should allow for configuration of this store explicitly instead of assuming that it should
+// share the same type of store as thisStoreSupplier
+final boolean useInMemoryStore = streamJoined.thisStoreSupplier() != null
+&& streamJoined.thisStoreSupplier() instanceof InMemoryWindowBytesStoreSupplier;
+final KeyValueBytesStoreSupplier supplier = useInMe

Re: [PR] KAFKA-14957: Update-Description-String [kafka]

2023-11-18 Thread via GitHub


Owen-CH-Leung commented on PR #13909:
URL: https://github.com/apache/kafka/pull/13909#issuecomment-1817731594

   Hi @mjsax, can I get some feedback from you? Thanks





Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]

2023-11-18 Thread via GitHub


ijuma commented on PR #14369:
URL: https://github.com/apache/kafka/pull/14369#issuecomment-1817732550

   @cmccabe Please check all the Java/Scala versions before merging, this PR 
clearly broke the Java 8/Scala 2.12 build in multiple ways.





Re: [PR] KAFKA-14585: 1st part : Java versions for metadata/broker and updated LogConfig [kafka]

2023-11-18 Thread via GitHub


github-actions[bot] commented on PR #14106:
URL: https://github.com/apache/kafka/pull/14106#issuecomment-1817733757

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





Re: [PR] MINOR: Fix unstable sorting in AssignmentsManagerTest [kafka]

2023-11-18 Thread via GitHub


ijuma commented on code in PR #14794:
URL: https://github.com/apache/kafka/pull/14794#discussion_r1398317507


##
core/src/test/java/kafka/server/AssignmentsManagerTest.java:
##
@@ -72,6 +75,29 @@ void tearDown() throws InterruptedException {
 manager.close();
 }
 
+AssignReplicasToDirsRequestData normalize(AssignReplicasToDirsRequestData request) {

Review Comment:
   @soarez Why are we creating a copy instead of simply sorting in place?
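When a request is built from unordered collections (for example, by iterating over a `HashMap`), two logically equal requests can list their entries in different orders. That makes equality assertions flaky. Below is a minimal stdlib sketch of normalizing by sorting in place under a total ordering. It uses a hypothetical `Assignment` class rather than the real `AssignReplicasToDirsRequestData` types, and `Normalize` is likewise a hypothetical name.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for one partition-to-directory assignment entry.
final class Assignment {
    final String topic;
    final int partition;
    final String directory;

    Assignment(final String topic, final int partition, final String directory) {
        this.topic = topic;
        this.partition = partition;
        this.directory = directory;
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) return true;
        if (!(o instanceof Assignment)) return false;
        final Assignment other = (Assignment) o;
        return partition == other.partition
            && topic.equals(other.topic)
            && directory.equals(other.directory);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition, directory);
    }
}

final class Normalize {

    // Orders on every field, so no two distinct entries compare as equal
    // and the result never depends on the input order.
    static final Comparator<Assignment> ORDER = Comparator
        .comparing((Assignment a) -> a.topic)
        .thenComparingInt(a -> a.partition)
        .thenComparing(a -> a.directory);

    // Sorts the caller's (mutable) list in place; no copy is made.
    static List<Assignment> normalizeInPlace(final List<Assignment> entries) {
        entries.sort(ORDER);
        return entries;
    }
}
```

Sorting in place avoids the copy but mutates the argument. That is usually acceptable for test-local data, though it would not be for a shared or immutable list.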






Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-11-18 Thread via GitHub


ijuma commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1398324375


##
core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -1048,20 +1054,15 @@ class KafkaServer(
*
* @return The brokerId.
*/
-  private def getOrGenerateBrokerId(brokerMetadata: RawMetaProperties): Int = {
-val brokerId = config.brokerId
-
-if (brokerId >= 0 && brokerMetadata.brokerId.exists(_ != brokerId))
-  throw new InconsistentBrokerIdException(

Review Comment:
   @cmccabe There is a test that's meant to catch this exception, but it was 
buggy: 
https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala#L137
   
   What's the expected behavior now and should the code or test be changed?
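The removed Scala lines reject startup when a broker id is configured explicitly (non-negative) but differs from the id already stored in the broker metadata. Below is a minimal Java sketch of just that guard. `BrokerIdCheck` and `checkConsistent` are hypothetical names, and `IllegalStateException` stands in for `kafka.common.InconsistentBrokerIdException`. The id-generation branch is omitted because those lines are not shown in the hunk.

```java
import java.util.Optional;

// Hypothetical sketch of the guard in the removed getOrGenerateBrokerId code;
// the real code throws kafka.common.InconsistentBrokerIdException.
final class BrokerIdCheck {

    private BrokerIdCheck() { }

    static void checkConsistent(final int configuredId, final Optional<Integer> storedId) {
        // Mirrors `brokerId >= 0 && brokerMetadata.brokerId.exists(_ != brokerId)`:
        // only a non-negative configured id is compared with the stored one.
        if (configuredId >= 0 && storedId.isPresent() && storedId.get() != configuredId) {
            throw new IllegalStateException("Configured broker id " + configuredId
                + " does not match stored broker id " + storedId.get());
        }
    }
}
```

A test for this guard should fail when no exception is thrown, for example by checking a flag after the `try` block or by using `assertThrows`. Catching the exception alone is not enough.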






[PR] MINOR: Delete unused classes [kafka]

2023-11-18 Thread via GitHub


ijuma opened a new pull request, #14797:
URL: https://github.com/apache/kafka/pull/14797

   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] MINOR: Delete unused classes [kafka]

2023-11-18 Thread via GitHub


ijuma commented on code in PR #14797:
URL: https://github.com/apache/kafka/pull/14797#discussion_r1398326335


##
core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala:
##
@@ -131,11 +131,7 @@ class ServerGenerateBrokerIdTest extends QuorumTestHarness {
 servers = Seq(server1)
 server1.shutdown()
 server1 = new KafkaServer(config2, threadNamePrefix = Option(this.getClass.getName)) // user specified broker id
-try {
-  server1.startup()
-} catch {
-  case _: kafka.common.InconsistentBrokerIdException => //success

Review Comment:
   Asked question here 
https://github.com/apache/kafka/pull/14628#discussion_r1398324375






[jira] [Commented] (KAFKA-10873) Inaccurate "Ignoring stop request for unowned connector" log messages

2023-11-18 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17787561#comment-17787561
 ] 

Sagar Rao commented on KAFKA-10873:
---

FWIW, we hit a scenario today where the same sequence of logs kept getting 
emitted.

> Inaccurate "Ignoring stop request for unowned connector" log messages
> -
>
> Key: KAFKA-10873
> URL: https://issues.apache.org/jira/browse/KAFKA-10873
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Priority: Major
>
> If a connector fails during startup, it will never be added to the worker's 
> internal list of running connectors (see 
> [https://github.com/apache/kafka/blob/87260a33b01590d0f73577840422e24fa589bed0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L298]),
>  and the same happens for tasks (see 
> [https://github.com/apache/kafka/blob/87260a33b01590d0f73577840422e24fa589bed0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L570]).
>  
> This leads to the following {{WARN}}-level log messages when that 
> connector/task is scheduled to be stopped by the worker:
>  * [Ignoring stop request for unowned connector|https://github.com/apache/kafka/blob/87260a33b01590d0f73577840422e24fa589bed0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L390]
>  * [Ignoring await stop request for non-present connector|https://github.com/apache/kafka/blob/87260a33b01590d0f73577840422e24fa589bed0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L415]
>  * [Ignoring stop request for unowned task|https://github.com/apache/kafka/blob/87260a33b01590d0f73577840422e24fa589bed0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L832]
>  * [Ignoring await stop request for non-present task|https://github.com/apache/kafka/blob/87260a33b01590d0f73577840422e24fa589bed0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L862]
> If the connector/task failed on startup, there should already be log messages 
> detailing the failure and its cause; there is no need to emit warning 
> messages about not stopping that connector when it is scheduled for shutdown. 
> Even worse, emitting these messages may cause users to believe that their 
> cluster is experiencing a rebalancing bug that is somehow causing 
> connectors/tasks that are not assigned to a worker to be revoked from it.
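The behavior described above can be modeled in a few lines. A connector that fails during startup is never recorded as running, so a later stop request finds nothing and produces the misleading warning. Below is a plain-Java sketch of one possible mitigation, which is to remember connectors that failed on startup and skip the warning for them. Every name here (`WorkerSketch`, `startConnector`, `stopConnector`) is hypothetical and not the Connect `Worker` API. Warnings are collected in a list instead of being logged.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of a worker's connector bookkeeping; not the Connect
// Worker API. Warnings go into a list instead of a logger.
final class WorkerSketch {

    private final Set<String> running = new HashSet<>();
    private final Set<String> failedOnStartup = new HashSet<>();
    final List<String> warnings = new ArrayList<>();

    // Simulates startup; a failing connector is never added to `running`.
    void startConnector(final String name, final boolean startupSucceeds) {
        if (startupSucceeds) {
            running.add(name);
        } else {
            failedOnStartup.add(name); // the startup failure itself is reported elsewhere
        }
    }

    void stopConnector(final String name) {
        if (running.remove(name)) {
            return; // normal shutdown path
        }
        if (failedOnStartup.remove(name)) {
            return; // already reported at startup, so no second warning
        }
        warnings.add("Ignoring stop request for unowned connector " + name);
    }
}
```

The warning is still emitted for a connector the worker never knew about, which is the case it is meant to flag.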





Re: [PR] KAFKA-15854: Move Java classes from `kafka.server` to the `server` module [kafka]

2023-11-18 Thread via GitHub


divijvaidya commented on code in PR #14796:
URL: https://github.com/apache/kafka/pull/14796#discussion_r1398332350


##
build.gradle:
##
@@ -842,6 +842,62 @@ tasks.create(name: "jarConnect", dependsOn: connectPkgs.collect { it + ":jar" })
 
 tasks.create(name: "testConnect", dependsOn: connectPkgs.collect { it + ":test" }) {}
 
+project(':server') {
+  archivesBaseName = "kafka-server"
+
+  dependencies {
+implementation project(':clients')
+implementation project(':server-common')
+
+implementation libs.slf4jApi
+
+compileOnly libs.log4j
+
+testImplementation project(':clients').sourceSets.test.output
+
+testImplementation libs.mockitoCore
+testImplementation libs.junitJupiter
+testImplementation libs.slf4jlog4j
+  }
+
+  task createVersionFile() {

Review Comment:
   This is duplicated across all projects. Could we extract it and use the 
same task across all sub-projects?



##
build.gradle:
##
@@ -842,6 +842,62 @@ tasks.create(name: "jarConnect", dependsOn: connectPkgs.collect { it + ":jar" })
 
 tasks.create(name: "testConnect", dependsOn: connectPkgs.collect { it + ":test" }) {}
 
+project(':server') {
+  archivesBaseName = "kafka-server"
+
+  dependencies {
+implementation project(':clients')
+implementation project(':server-common')
+
+implementation libs.slf4jApi
+
+compileOnly libs.log4j
+
+testImplementation project(':clients').sourceSets.test.output
+
+testImplementation libs.mockitoCore
+testImplementation libs.junitJupiter
+testImplementation libs.slf4jlog4j
+  }
+
+  task createVersionFile() {
+def receiptFile = file("$buildDir/kafka/$buildVersionFileName")

Review Comment:
   I am not a Gradle expert, but my IDE tells me that `$buildDir` is deprecated 
and has been replaced by `getLayout().getBuildDirectory()`.
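For reference, a minimal sketch of the replacement the IDE suggests, assuming Gradle 8.3 or later (to our understanding, the release that deprecated `Project.buildDir`). `layout.buildDirectory` is a lazy `DirectoryProperty`; calling `file(...)` on it returns a `Provider<RegularFile>`, and `.get().asFile` resolves that to a `java.io.File`. `buildVersionFileName` is the property already used in the diff; nothing else here is taken from the PR.

```groovy
// Sketch only: assumes Gradle 8.3+, where Project.getBuildDir() ($buildDir) is deprecated.
// Pattern currently used in the diff (eager java.io.File built from the deprecated property):
//   def receiptFile = file("$buildDir/kafka/$buildVersionFileName")

// Replacement via getLayout().getBuildDirectory():
// buildDirectory.file(...) yields a Provider<RegularFile>; get().asFile resolves it to a File.
def receiptFile = layout.buildDirectory.file("kafka/$buildVersionFileName").get().asFile
```

The `.get()` call resolves the path eagerly, which matches the behaviour of the existing `file(...)` call, so the rest of the task body should not need to change.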



##
build.gradle:
##
@@ -1643,19 +1700,6 @@ project(':server-common') {
     }
   }
 
-  sourceSets {
-    main {
-      java {
-        srcDirs = ["src/main/java"]
-      }
-    }
-    test {
-      java {
-        srcDirs = ["src/test/java"]
-      }
-    }
-  }

Review Comment:
   Should we create a ticket to get rid of this from all sub-projects? If you 
like, we can do it as part of this PR itself.



##
checkstyle/import-control-storage.xml:
##
@@ -27,15 +27,11 @@
 
 
 
-
 
 
-
-
 
 
 
-

Review Comment:
   Thank you for cleaning up the import-control files!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15854: Move Java classes from `kafka.server` to the `server` module [kafka]

2023-11-18 Thread via GitHub


ijuma commented on code in PR #14796:
URL: https://github.com/apache/kafka/pull/14796#discussion_r1398336088


##
build.gradle:
##
@@ -842,6 +842,62 @@ tasks.create(name: "jarConnect", dependsOn: connectPkgs.collect { it + ":jar" })
 
 tasks.create(name: "testConnect", dependsOn: connectPkgs.collect { it + ":test" }) {}
 
+project(':server') {
+  archivesBaseName = "kafka-server"
+
+  dependencies {
+    implementation project(':clients')
+    implementation project(':server-common')
+
+    implementation libs.slf4jApi
+
+    compileOnly libs.log4j
+
+    testImplementation project(':clients').sourceSets.test.output
+
+    testImplementation libs.mockitoCore
+    testImplementation libs.junitJupiter
+    testImplementation libs.slf4jlog4j
+  }
+
+  task createVersionFile() {
+    def receiptFile = file("$buildDir/kafka/$buildVersionFileName")

Review Comment:
   I used the same pattern as the other modules. We can replace all of them 
in a separate PR.






Re: [PR] KAFKA-15854: Move Java classes from `kafka.server` to the `server` module [kafka]

2023-11-18 Thread via GitHub


ijuma commented on code in PR #14796:
URL: https://github.com/apache/kafka/pull/14796#discussion_r1398336156


##
build.gradle:
##
@@ -842,6 +842,62 @@ tasks.create(name: "jarConnect", dependsOn: connectPkgs.collect { it + ":jar" })
 
 tasks.create(name: "testConnect", dependsOn: connectPkgs.collect { it + ":test" }) {}
 
+project(':server') {
+  archivesBaseName = "kafka-server"
+
+  dependencies {
+    implementation project(':clients')
+    implementation project(':server-common')
+
+    implementation libs.slf4jApi
+
+    compileOnly libs.log4j
+
+    testImplementation project(':clients').sourceSets.test.output
+
+    testImplementation libs.mockitoCore
+    testImplementation libs.junitJupiter
+    testImplementation libs.slf4jlog4j
+  }
+
+  task createVersionFile() {

Review Comment:
   I'm not sure. Best done via its own PR as well.






Re: [PR] KAFKA-15854: Move Java classes from `kafka.server` to the `server` module [kafka]

2023-11-18 Thread via GitHub


ijuma commented on code in PR #14796:
URL: https://github.com/apache/kafka/pull/14796#discussion_r1398336216


##
build.gradle:
##
@@ -1643,19 +1700,6 @@ project(':server-common') {
     }
   }
 
-  sourceSets {
-    main {
-      java {
-        srcDirs = ["src/main/java"]
-      }
-    }
-    test {
-      java {
-        srcDirs = ["src/test/java"]
-      }
-    }
-  }

Review Comment:
   I can file a ticket. Some of these blocks are actually necessary because 
they add extra source directories.
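A minimal sketch of the distinction, assuming the `java` plugin's default layout: Gradle already uses `src/main/java` and `src/test/java` as the Java source directories of the `main` and `test` source sets, so a block that only restates them (like the one removed from `:server-common`) is redundant, whereas a block that appends a directory is not. The `src/generated/java` path is illustrative only and is not taken from the PR.

```groovy
// Redundant under the java plugin's conventions: these are already the defaults.
sourceSets {
  main { java { srcDirs = ["src/main/java"] } }
  test { java { srcDirs = ["src/test/java"] } }
}

// Still needed: srcDir(...) adds a directory on top of the existing ones.
// The generated-sources path below is a hypothetical example.
sourceSets {
  main { java { srcDir "src/generated/java" } }
}
```

Note that `srcDirs = [...]` replaces the configured directories, while `srcDir` appends one; a cleanup ticket would only need to drop blocks of the first kind that list nothing beyond the defaults.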






[PR] MINOR: Upgrade Zstd-jni to 1.5.5-9 [kafka]

2023-11-18 Thread via GitHub


divijvaidya opened a new pull request, #14798:
URL: https://github.com/apache/kafka/pull/14798

   ## Major diffs between versions
   1. Upgrade minimum JDK from 6 to 8
   2. Dependency (sbt) upgrade to work with JDK 17
   3. Add new platforms such as win/aarch64
   
   None of the changes are especially relevant to Kafka.
   
   Diff from 1.5.5-6 to 1.5.5-9: 
https://github.com/luben/zstd-jni/compare/v1.5.5-6...v1.5.5-9

