[GitHub] [kafka] guozhangwang commented on a diff in pull request #12739: Replace EasyMock and PowerMock with Mockito | TimeOrderedCachingPersistentWindowStoreTest

2022-10-12 Thread GitBox


guozhangwang commented on code in PR #12739:
URL: https://github.com/apache/kafka/pull/12739#discussion_r994141999


##
streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java:
##
@@ -138,51 +144,69 @@ public void setUp() {
 
 @After
 public void closeStore() {
-cachingStore.close();
+try {
+cachingStore.close();
+} catch (final RuntimeException runtimeException) {
+   /*
+   It will reach here for the testcases like
+   shouldCloseCacheAndWrappedStoreAfterErrorDuringCacheFlush(),
+   shouldCloseWrappedStoreAfterErrorDuringCacheClose(),
+   shouldCloseCacheAfterErrorDuringStateStoreClose()
+
+   In these testcases we have set the doThrow for the cache#close
+   cache#flush, underlyingStore#close method.
+*/
+}
+
 }
 
 @SuppressWarnings("deprecation")
 @Test
 public void shouldDelegateDeprecatedInit() {
-final RocksDBTimeOrderedWindowStore inner = 
EasyMock.mock(RocksDBTimeOrderedWindowStore.class);
-EasyMock.expect(inner.hasIndex()).andReturn(hasIndex);
-EasyMock.replay(inner);
-final TimeOrderedCachingWindowStore outer = new 
TimeOrderedCachingWindowStore(inner, WINDOW_SIZE, SEGMENT_INTERVAL);
+final RocksDBTimeOrderedWindowStore inner = 
mock(RocksDBTimeOrderedWindowStore.class);
+when(inner.hasIndex()).thenReturn(hasIndex);
+
+final TimeOrderedCachingWindowStore outer =

Review Comment:
   I think we do not need to spy on the outer, since here we are only checking 
that when `outer.init` is called, its `inner.init` is called too. Ditto for 
other tests



##
streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java:
##
@@ -124,7 +129,8 @@ public static Collection data() {
 public void setUp() {
 baseKeySchema = new TimeFirstWindowKeySchema();
 bytesStore = new RocksDBTimeOrderedWindowSegmentedBytesStore("test", 
"metrics-scope", 100, SEGMENT_INTERVAL, hasIndex);
-underlyingStore = new RocksDBTimeOrderedWindowStore(bytesStore, false, 
WINDOW_SIZE);
+underlyingStore = spy(new RocksDBTimeOrderedWindowStore(bytesStore,

Review Comment:
   Is this intentional?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14133) Remaining EasyMock to Mockito tests

2022-10-12 Thread Shekhar Prasad Rajak (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17616757#comment-17616757
 ] 

Shekhar Prasad Rajak commented on KAFKA-14133:
--

PR #12739 for TimeOrderedCachingPersistentWindowStoreTest

> Remaining EasyMock to Mockito tests
> ---
>
> Key: KAFKA-14133
> URL: https://issues.apache.org/jira/browse/KAFKA-14133
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
>
> {color:#de350b}There are tests which use both PowerMock and EasyMock. I have 
> put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here 
> rely solely on EasyMock.{color}
> Unless stated in brackets the tests are in the streams module.
> A list of tests which still require to be moved from EasyMock to Mockito as 
> of 2nd of August 2022 which do not have a Jira issue and do not have pull 
> requests I am aware of which are opened:
> {color:#ff8b00}In Review{color}
> {color:#00875a}Merged{color}
>  # {color:#00875a}WorkerConnectorTest{color} (connect) (owner: [~yash.mayya] )
>  # {color:#00875a}WorkerCoordinatorTest{color} (connect) (owner: 
> [~yash.mayya] )
>  # {color:#00875a}RootResourceTest{color} (connect) (owner: [~yash.mayya] )
>  # {color:#00875a}ByteArrayProducerRecordEquals{color} (connect) (owner: 
> [~yash.mayya] )
>  # {color:#00875a}KStreamFlatTransformTest{color} (owner: Christo)
>  # {color:#00875a}KStreamFlatTransformValuesTest{color} (owner: Christo)
>  # {color:#00875a}KStreamPrintTest{color} (owner: Christo)
>  # {color:#00875a}KStreamRepartitionTest{color} (owner: Christo)
>  # {color:#00875a}MaterializedInternalTest{color} (owner: Christo)
>  # {color:#00875a}TransformerSupplierAdapterTest{color} (owner: Christo)
>  # {color:#00875a}KTableSuppressProcessorMetricsTest{color} (owner: Christo)
>  # {color:#00875a}ClientUtilsTest{color} (owner: Christo)
>  # {color:#00875a}HighAvailabilityStreamsPartitionAssignorTest{color} (owner: 
> Christo)
>  # {color:#ff8b00}StreamsRebalanceListenerTest{color} (owner: Christo)
>  # {color:#ff8b00}TimestampedKeyValueStoreMaterializerTest{color} (owner: 
> Christo)
>  # {color:#ff8b00}CachingInMemoryKeyValueStoreTest{color} (owner: Christo)
>  # {color:#ff8b00}CachingInMemorySessionStoreTest{color} (owner: Christo)
>  # {color:#ff8b00}CachingPersistentSessionStoreTest{color} (owner: Christo)
>  # {color:#ff8b00}CachingPersistentWindowStoreTest{color} (owner: Christo)
>  # {color:#ff8b00}ChangeLoggingKeyValueBytesStoreTest{color} (owner: Christo)
>  # {color:#ff8b00}ChangeLoggingTimestampedKeyValueBytesStoreTest{color} 
> (owner: Christo)
>  # {color:#ff8b00}CompositeReadOnlyWindowStoreTest{color} (owner: Christo)
>  # {color:#ff8b00}KeyValueStoreBuilderTest{color} (owner: Christo)
>  # {color:#ff8b00}RocksDBStoreTest{color} (owner: Christo)
>  # {color:#ff8b00}StreamThreadStateStoreProviderTest{color} (owner: Christo)
>  # {color:#ff8b00}TopologyTest{color} (owner: Christo)
>  # {color:#ff8b00}KTableSuppressProcessorTest{color} (owner: Christo)
>  # {color:#ff8b00}ChangeLoggingSessionBytesStoreTest{color} (owner: Christo)
>  # {color:#ff8b00}ChangeLoggingTimestampedWindowBytesStoreTest{color} (owner: 
> Christo)
>  # {color:#ff8b00}ChangeLoggingWindowBytesStoreTest{color} (owner: Christo)
>  # {color:#ff8b00}MeteredTimestampedWindowStoreTest{color} (owner: Christo)
>  # {color:#ff8b00}TaskManagerTest{color} (owner: Christo)
>  # InternalTopicManagerTest (owner: Christo)
>  # ProcessorContextImplTest (owner: Christo)
>  # WriteConsistencyVectorTest (owner: Christo)
>  # StreamsAssignmentScaleTest (owner: Christo)
>  # StreamsPartitionAssignorTest (owner: Christo)
>  # AssignmentTestUtils (owner: Christo)
>  # ProcessorStateManagerTest ({*}WIP{*} owner: Matthew)
>  # StandbyTaskTest ({*}WIP{*} owner: Matthew)
>  # StoreChangelogReaderTest ({*}WIP{*} owner: Matthew)
>  # StreamTaskTest ({*}WIP{*} owner: Matthew)
>  # StreamThreadTest ({*}WIP{*} owner: Matthew)
>  # StreamsMetricsImplTest
>  # TimeOrderedCachingPersistentWindowStoreTest
>  # TimeOrderedWindowStoreTest
> *The coverage report for the above tests after the change should be >= to 
> what the coverage is now.*



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] shekhar-rajak opened a new pull request, #12739: Replace EasyMock and PowerMock with Mockito | TimeOrderedCachingPersistentWindowStoreTest

2022-10-12 Thread GitBox


shekhar-rajak opened a new pull request, #12739:
URL: https://github.com/apache/kafka/pull/12739

   Related to KAFKA-14059 and KAFKA-14132
   
   Link: 
   
   * https://issues.apache.org/jira/browse/KAFKA-14133
   * https://issues.apache.org/jira/browse/KAFKA-14132
   
   
   
   ### Committer Checklist (excluded from commit message)
   - [x] Verify design and implementation 
   - [x] Verify test coverage and CI build status
   - [x] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #11211: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…

2022-10-12 Thread GitBox


showuon commented on PR #11211:
URL: https://github.com/apache/kafka/pull/11211#issuecomment-1276917798

   Thanks for the patience, @vamossagar12 !


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon merged pull request #11211: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…

2022-10-12 Thread GitBox


showuon merged PR #11211:
URL: https://github.com/apache/kafka/pull/11211


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #11211: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…

2022-10-12 Thread GitBox


showuon commented on PR #11211:
URL: https://github.com/apache/kafka/pull/11211#issuecomment-1276916375

   Failed tests are unrelated.
   ```
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #12738: minor: Trying to fix some flakiness in KRaftClusterTest::testDescribeQuorumRequestToBrokers()

2022-10-12 Thread GitBox


hachikuji commented on code in PR #12738:
URL: https://github.com/apache/kafka/pull/12738#discussion_r994016187


##
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##
@@ -811,20 +811,29 @@ class KRaftClusterTest {
 
assertTrue(cluster.controllers.asScala.keySet.contains(quorumInfo.leaderId),
   s"Leader ID ${quorumInfo.leaderId} was not a controller ID.")
 
-quorumInfo.voters.forEach { voter =>
-  assertTrue(0 < voter.logEndOffset,
-s"logEndOffset for voter with ID ${voter.replicaId} was 
${voter.logEndOffset}")
-  assertNotEquals(OptionalLong.empty(), voter.lastFetchTimestamp)
-  assertNotEquals(OptionalLong.empty(), voter.lastCaughtUpTimestamp)
+val(voters, voterResponseValid) = TestUtils.computeUntilTrue(
+  admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions)
+.quorumInfo().get().voters()){
+  voters => voters.stream
+.allMatch(voter => (voter.logEndOffset > 0
+  && voter.lastFetchTimestamp() != OptionalLong.empty()
+  && voter.lastCaughtUpTimestamp() != OptionalLong.empty()))
 }
 
-assertEquals(cluster.brokers.asScala.keySet, 
quorumInfo.observers.asScala.map(_.replicaId).toSet)
-quorumInfo.observers.forEach { observer =>
-  assertTrue(0 < observer.logEndOffset,
-s"logEndOffset for observer with ID ${observer.replicaId} was 
${observer.logEndOffset}")
-  assertNotEquals(OptionalLong.empty(), observer.lastFetchTimestamp)
-  assertNotEquals(OptionalLong.empty(), observer.lastCaughtUpTimestamp)
+assertTrue(voterResponseValid, s"Atleast one voter did not return the 
expected state within timeout." +
+  s"The responses gathered for all the voters: ${voters.toString}")
+
+val(observers, observerResponseValid) = TestUtils.computeUntilTrue(
+  admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions)
+.quorumInfo().get().observers()){
+  observers => observers.stream.allMatch(observer => 
(observer.logEndOffset > 0

Review Comment:
   Hmm, do these checks achieve anything? In what case would the leader report 
an observer which doesn't have these fields set? I'm considering alternatively 
if the condition we should be waiting for here is this one:
   ```scala
   assertEquals(cluster.brokers.asScala.keySet, 
observers.asScala.map(_.replicaId).toSet)
   ```
   Once we are sure we have responses from all expected observers, then we can 
assert the additional fields. Does that make sense?



##
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##
@@ -811,20 +811,29 @@ class KRaftClusterTest {
 
assertTrue(cluster.controllers.asScala.keySet.contains(quorumInfo.leaderId),
   s"Leader ID ${quorumInfo.leaderId} was not a controller ID.")
 
-quorumInfo.voters.forEach { voter =>
-  assertTrue(0 < voter.logEndOffset,
-s"logEndOffset for voter with ID ${voter.replicaId} was 
${voter.logEndOffset}")
-  assertNotEquals(OptionalLong.empty(), voter.lastFetchTimestamp)
-  assertNotEquals(OptionalLong.empty(), voter.lastCaughtUpTimestamp)
+val(voters, voterResponseValid) = TestUtils.computeUntilTrue(

Review Comment:
   nit: space after `val`



##
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##
@@ -811,20 +811,29 @@ class KRaftClusterTest {
 
assertTrue(cluster.controllers.asScala.keySet.contains(quorumInfo.leaderId),
   s"Leader ID ${quorumInfo.leaderId} was not a controller ID.")
 
-quorumInfo.voters.forEach { voter =>
-  assertTrue(0 < voter.logEndOffset,
-s"logEndOffset for voter with ID ${voter.replicaId} was 
${voter.logEndOffset}")
-  assertNotEquals(OptionalLong.empty(), voter.lastFetchTimestamp)
-  assertNotEquals(OptionalLong.empty(), voter.lastCaughtUpTimestamp)
+val(voters, voterResponseValid) = TestUtils.computeUntilTrue(
+  admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions)
+.quorumInfo().get().voters()){
+  voters => voters.stream
+.allMatch(voter => (voter.logEndOffset > 0
+  && voter.lastFetchTimestamp() != OptionalLong.empty()
+  && voter.lastCaughtUpTimestamp() != OptionalLong.empty()))
 }
 
-assertEquals(cluster.brokers.asScala.keySet, 
quorumInfo.observers.asScala.map(_.replicaId).toSet)
-quorumInfo.observers.forEach { observer =>
-  assertTrue(0 < observer.logEndOffset,
-s"logEndOffset for observer with ID ${observer.replicaId} was 
${observer.logEndOffset}")
-  assertNotEquals(OptionalLong.empty(), observer.lastFetchTimestamp)
-  assertNotEquals(OptionalLong.empty(), observer.lastCaughtUpTimestamp)
+assertTrue(voterResponseValid, s"Atleast one voter did not r

[GitHub] [kafka] divijvaidya commented on a diff in pull request #12735: Replace EasyMock and PowerMock with Mockito in connect/runtime/ErrorHandlingTaskTest | KAFKA-14059

2022-10-12 Thread GitBox


divijvaidya commented on code in PR #12735:
URL: https://github.com/apache/kafka/pull/12735#discussion_r994015093


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##
@@ -374,30 +344,37 @@ public void testErrorHandlingInSourceTasks() throws 
Exception {
 Struct struct2 = new Struct(valSchema).put("val", 6789);
 SourceRecord record2 = new SourceRecord(emptyMap(), emptyMap(), TOPIC, 
PARTITION1, valSchema, struct2);
 
-EasyMock.expect(workerSourceTask.isStopping()).andReturn(false);
-EasyMock.expect(workerSourceTask.isStopping()).andReturn(false);
-EasyMock.expect(workerSourceTask.isStopping()).andReturn(true);
+when(workerSourceTask.isStopping()).thenReturn(false);
+when(workerSourceTask.isStopping()).thenReturn(false);
+when(workerSourceTask.isStopping()).thenReturn(false);
 
-EasyMock.expect(workerSourceTask.commitOffsets()).andReturn(true);
+doReturn(true).when(workerSourceTask).commitOffsets();
 
-offsetStore.start();
-EasyMock.expectLastCall();
-sourceTask.initialize(EasyMock.anyObject());
-EasyMock.expectLastCall();
-sourceTask.start(EasyMock.anyObject());
-EasyMock.expectLastCall();
+when(sourceTask.poll()).thenReturn(singletonList(record1));
+when(sourceTask.poll()).thenReturn(singletonList(record2));
 
-EasyMock.expect(sourceTask.poll()).andReturn(singletonList(record1));
-EasyMock.expect(sourceTask.poll()).andReturn(singletonList(record2));
 expectTopicCreation(TOPIC);
-EasyMock.expect(producer.send(EasyMock.anyObject(), 
EasyMock.anyObject())).andReturn(null).times(2);

Review Comment:
   ```
   when(producer.send(any(), any()))
   .thenReturn(null)
   .thenReturn(null);
   ```
   
   is missing here. 



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##
@@ -374,30 +344,37 @@ public void testErrorHandlingInSourceTasks() throws 
Exception {
 Struct struct2 = new Struct(valSchema).put("val", 6789);
 SourceRecord record2 = new SourceRecord(emptyMap(), emptyMap(), TOPIC, 
PARTITION1, valSchema, struct2);
 
-EasyMock.expect(workerSourceTask.isStopping()).andReturn(false);
-EasyMock.expect(workerSourceTask.isStopping()).andReturn(false);
-EasyMock.expect(workerSourceTask.isStopping()).andReturn(true);
+when(workerSourceTask.isStopping()).thenReturn(false);
+when(workerSourceTask.isStopping()).thenReturn(false);
+when(workerSourceTask.isStopping()).thenReturn(false);
 
-EasyMock.expect(workerSourceTask.commitOffsets()).andReturn(true);
+doReturn(true).when(workerSourceTask).commitOffsets();
 
-offsetStore.start();
-EasyMock.expectLastCall();
-sourceTask.initialize(EasyMock.anyObject());
-EasyMock.expectLastCall();
-sourceTask.start(EasyMock.anyObject());
-EasyMock.expectLastCall();
+when(sourceTask.poll()).thenReturn(singletonList(record1));

Review Comment:
   Please modify to 
`when(sourceTask.poll()).thenReturn(singletonList(record1)).thenReturn(singletonList(record2));`



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##
@@ -571,13 +511,15 @@ private void createSinkTask(TargetState initialState, 
RetryWithToleranceOperator
 oo.put("schemas.enable", "false");
 converter.configure(oo);
 
-TransformationChain sinkTransforms = new 
TransformationChain<>(singletonList(new FaultyPassthrough()), 
retryWithToleranceOperator);
+TransformationChain sinkTransforms =
+new TransformationChain<>(singletonList(new 
FaultyPassthrough()), retryWithToleranceOperator);
 
-workerSinkTask = new WorkerSinkTask(
+workerSinkTask = spy(new WorkerSinkTask(

Review Comment:
   Please help me understand the reason on why are we spying this class. If not 
required, perhaps we can remove this? (Note that Spy adds a lot of latency to 
test execution and hence might be the reason for slow execution as you pointed 
out)



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##
@@ -438,30 +414,28 @@ public void 
testErrorHandlingInSourceTasksWthBadConverter() throws Exception {
 Struct struct2 = new Struct(valSchema).put("val", 6789);
 SourceRecord record2 = new SourceRecord(emptyMap(), emptyMap(), TOPIC, 
PARTITION1, valSchema, struct2);
 
-EasyMock.expect(workerSourceTask.isStopping()).andReturn(false);
-EasyMock.expect(workerSourceTask.isStopping()).andReturn(false);
-EasyMock.expect(workerSourceTask.isStopping()).andReturn(true);
+when(workerSourceTask.isStopping())
+.thenReturn(false)
+.thenReturn(false)

[GitHub] [kafka] alexec closed pull request #12699: KIP-873: Add PipeDeserializer/PipeSerialize

2022-10-12 Thread GitBox


alexec closed pull request #12699: KIP-873: Add PipeDeserializer/PipeSerialize
URL: https://github.com/apache/kafka/pull/12699


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] alexec closed pull request #12698: KIP-873: Add ExceptionHandlingDeserializer

2022-10-12 Thread GitBox


alexec closed pull request #12698: KIP-873: Add ExceptionHandlingDeserializer
URL: https://github.com/apache/kafka/pull/12698


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #12736: KAFKA-14292 controlledShutDownOffset should only be updated if set to -1

2022-10-12 Thread GitBox


hachikuji commented on code in PR #12736:
URL: https://github.com/apache/kafka/pull/12736#discussion_r993999005


##
metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java:
##
@@ -400,12 +412,13 @@ long lowestActiveOffset() {
 }
 
 /**
- * Mark a broker as being in the controlled shutdown state.
+ * Mark a broker as being in the controlled shutdown state. We only update 
the
+ * controlledShutdownOffset if the broker was previously not in controlled 
shutdown state.
  *
  * @param brokerId  The broker id.
  * @param controlledShutDownOffset  The offset at which controlled 
shutdown will be complete.
  */
-void updateControlledShutdownOffset(int brokerId, long 
controlledShutDownOffset) {
+void maybeControlledShutdownOffset(int brokerId, long 
controlledShutDownOffset) {

Review Comment:
   nit: maybeUpdateControlledShutdownOffset



##
metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java:
##
@@ -203,17 +204,36 @@ public void testUsableBrokerIterator() {
 expected.add(new UsableBroker(3, Optional.of("rack2"), false));
 expected.add(new UsableBroker(4, Optional.of("rack1"), true));
 assertEquals(expected, usableBrokersToSet(manager));
-manager.updateControlledShutdownOffset(2, 0);
+manager.maybeControlledShutdownOffset(2, 0);
 assertEquals(100L, manager.lowestActiveOffset());
 assertThrows(RuntimeException.class,
-() -> manager.updateControlledShutdownOffset(4, 0));
+() -> manager.maybeControlledShutdownOffset(4, 0));
 manager.touch(4, false, 100);
-manager.updateControlledShutdownOffset(4, 0);
+manager.maybeControlledShutdownOffset(4, 0);
 expected.remove(new UsableBroker(2, Optional.of("rack1"), false));
 expected.remove(new UsableBroker(4, Optional.of("rack1"), true));
 assertEquals(expected, usableBrokersToSet(manager));
 }
 
+@Test
+public void testControlledShutdownOffsetIsOnlyUpdatedOnce() {
+BrokerHeartbeatManager manager = newBrokerHeartbeatManager();
+assertEquals(Collections.emptySet(), usableBrokersToSet(manager));
+manager.touch(0, false, 100);
+manager.touch(1, false, 100);
+manager.touch(2, false, 98);
+manager.touch(3, false, 100);
+manager.touch(4, true, 100);
+manager.maybeControlledShutdownOffset(2, 98);

Review Comment:
   Just before this, can we assert the shutdown offset is empty?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang merged pull request #12737: Kafka Streams Threading P1: Add Interface for new TaskManager and TaskExecutor

2022-10-12 Thread GitBox


guozhangwang merged PR #12737:
URL: https://github.com/apache/kafka/pull/12737


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #12737: Kafka Streams Threading P1: Add Interface for new TaskManager and TaskExecutor

2022-10-12 Thread GitBox


guozhangwang commented on PR #12737:
URL: https://github.com/apache/kafka/pull/12737#issuecomment-1276844346

   Thanks for the reviews @vvcephei @ableegoldman , I will try to improve the 
javadocs in the next PR while merging this one as-is.
   
   Also none of the interface functions here are officially finalized, since as 
we continue on this project we may find some of the interfaces need 
modification as well. I just want to lay some basic foundations on the thread 
interactions.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #12736: KAFKA-14292 controlledShutDownOffset should only be updated if set to -1

2022-10-12 Thread GitBox


hachikuji commented on code in PR #12736:
URL: https://github.com/apache/kafka/pull/12736#discussion_r993980280


##
metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java:
##
@@ -276,7 +277,7 @@ Collection brokers() {
 }
 
 // VisibleForTesting
-long getControlledShutDownOffset(int brokerId) {
+long controlledShutDownOffset(int brokerId) {

Review Comment:
   Is your concern that `Optional.empty` would be used to handle two 
situations? Perhaps we could raise an exception if the `brokerId` is not 
present in `brokers`? Then `Optional.empty` would be reserved only for the case 
when the broker is not in controlled shutdown.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #12737: Kafka Streams Threading P1: Add Interface for new TaskManager and TaskExecutor

2022-10-12 Thread GitBox


guozhangwang commented on code in PR #12737:
URL: https://github.com/apache/kafka/pull/12737#discussion_r993980012


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskManager.java:
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.tasks;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.ReadOnlyTask;
+import org.apache.kafka.streams.processor.internals.StreamTask;
+import java.util.Set;
+
+public interface TaskManager {
+
+/**
+ * Get the next processible active task for the requested executor. Once 
the task is assigned to
+ * the requested task executor, it should not be assigned to any other 
executors until it was
+ * returned to the task manager.
+ *
+ * @param executor the requesting {@link TaskExecutor}
+ */
+StreamTask assignNextTask(final TaskExecutor executor);
+
+/**
+ * Unassign the stream task so that it can be assigned to other executors 
later
+ * or be removed from the task manager. The requested executor must have 
locked
+ * the task already, otherwise an exception would be thrown.
+ *
+ * @param executor the requesting {@link TaskExecutor}
+ */
+void unassignTask(final StreamTask task, final TaskExecutor executor);
+
+/**
+ * Lock a set of active tasks from the task manager so that they will not 
be assigned to
+ * any {@link TaskExecutor}s anymore until they are unlocked. At the time 
this function
+ * is called, the requested tasks may already be locked by some {@link 
TaskExecutor}s,
+ * and in that case the task manager need to first unassign these tasks 
from the
+ * executors.
+ *
+ * This function is needed when we need to 1) commit these tasks, 2) 
remove these tasks.
+ *
+ * This method does not block, instead a future is returned.
+ */
+KafkaFuture lockTasks(final Set taskIds);
+
+/**
+ * Lock all of the managed active tasks from the task manager. Similar to 
{@link #lockTasks(Set)}.
+ *
+ * This method does not block, instead a future is returned.
+ */
+KafkaFuture lockAllTasks();
+
+/**
+ * Unlock the tasks so that they can be assigned to executors
+ */
+void unlockTasks(final Set taskIds);
+
+/**
+ * Unlock all of the managed active tasks from the task manager. Similar 
to {@link #unlockTasks(Set)}.
+ *
+ * This method does not block, instead a future is returned.
+ */
+void unlockAllTasks();
+
+/**
+ * Add a new active task to the task manager.
+ *
+ * @param tasks task to add
+ */
+void add(final Set tasks);
+
+/**
+ * Remove an active task from the task manager.
+ *
+ * The task to remove must be locked.
+ *
+ * @param taskId ID of the task to remove
+ */
+void remove(final TaskId taskId);

Review Comment:
   Yeah it's just related to how I implemented the `onAssignment` logic today, 
and it's open for changes in the future :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #12737: Kafka Streams Threading P1: Add Interface for new TaskManager and TaskExecutor

2022-10-12 Thread GitBox


guozhangwang commented on code in PR #12737:
URL: https://github.com/apache/kafka/pull/12737#discussion_r993979720


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskManager.java:
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.tasks;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.ReadOnlyTask;
+import org.apache.kafka.streams.processor.internals.StreamTask;
+import java.util.Set;
+
+public interface TaskManager {
+
+/**
+ * Get the next processible active task for the requested executor. Once 
the task is assigned to
+ * the requested task executor, it should not be assigned to any other 
executors until it was
+ * returned to the task manager.
+ *
+ * @param executor the requesting {@link TaskExecutor}
+ */
+StreamTask assignNextTask(final TaskExecutor executor);

Review Comment:
   The tasks set manipulations i.e. adding/removing, and also committing is 
done by the polling thread. The task processing itself, including grabbing the 
task from the task manager is done by the processing thread.
   
   Good question about when all tasks are locked. In this case we would not 
fail but just return null -- I will clarify that in the doc.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #12737: Kafka Streams Threading P1: Add Interface for new TaskManager and TaskExecutor

2022-10-12 Thread GitBox


guozhangwang commented on code in PR #12737:
URL: https://github.com/apache/kafka/pull/12737#discussion_r993978872


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskExecutor.java:
##
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.tasks;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.streams.processor.internals.ReadOnlyTask;
+import org.apache.kafka.streams.processor.internals.StreamTask;
+
+import java.time.Duration;
+
+public interface TaskExecutor {
+
+/**
+ * Starts the task processor.
+ */
+void start();
+
+/**
+ * Shuts down the task processor updater.
+ *
+ * @param timeout duration how long to wait until the state updater is 
shut down
+ *
+ * @throws
+ * org.apache.kafka.streams.errors.StreamsException if the state 
updater thread cannot shutdown within the timeout
+ */
+void shutdown(final Duration timeout);
+
+/**
+ * Get the current assigned processing task. The task returned is 
read-only and cannot be modified.
+ *
+ * @return the current processing task
+ */
+ReadOnlyTask currentTask();
+
+/**
+ * Unassign the current processing task from the task processor and give 
it back to the state manager.
+ *
+ * The paused task must be flushed since it may be committed or closed by 
the task manager next.
+ *
+ * This method does not block, instead a future is returned.
+ */
+KafkaFuture unassign();

Review Comment:
   The asymmetry is actually intentional indeed -- I will improve the javadocs 
a bit more in the next PR when I gave the default impl class.
   
   The reason is, grabbing a task from task manager to the task executor is 
always initiated by the task executor itself, while returning a task could be 
triggered either way: the task executor could return it proactively when it 
finds the current task not processable any more, and the task manager can also 
force asking the task executor to return it when it needs to commit / close it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #12737: Kafka Streams Threading P1: Add Interface for new TaskManager and TaskExecutor

2022-10-12 Thread GitBox


guozhangwang commented on code in PR #12737:
URL: https://github.com/apache/kafka/pull/12737#discussion_r993975293


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskExecutor.java:
##
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.tasks;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.streams.processor.internals.ReadOnlyTask;
+import org.apache.kafka.streams.processor.internals.StreamTask;
+
+import java.time.Duration;
+
+public interface TaskExecutor {
+
+/**
+ * Starts the task processor.
+ */
+void start();
+
+/**
+ * Shuts down the task processor updater.
+ *
+ * @param timeout duration how long to wait until the state updater is 
shut down
+ *
+ * @throws
+ * org.apache.kafka.streams.errors.StreamsException if the state 
updater thread cannot shutdown within the timeout
+ */
+void shutdown(final Duration timeout);
+
+/**
+ * Get the current assigned processing task. The task returned is 
read-only and cannot be modified.
+ *
+ * @return the current processing task
+ */
+ReadOnlyTask currentTask();
+
+/**
+ * Unassign the current processing task from the task processor and give 
it back to the state manager.
+ *
+ * The paused task must be flushed since it may be committed or closed by 
the task manager next.

Review Comment:
   It's both: since the task executor's default impl use a background process 
thread, it cannot immediately return the task to the caller, so here we have to 
return a future which will complete when the thread eventually gets notified 
and then pause processing the task.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ahuang98 commented on a diff in pull request #12736: KAFKA-14292 controlledShutDownOffset should only be updated if set to -1

2022-10-12 Thread GitBox


ahuang98 commented on code in PR #12736:
URL: https://github.com/apache/kafka/pull/12736#discussion_r993968289


##
metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java:
##
@@ -276,7 +277,7 @@ Collection brokers() {
 }
 
 // VisibleForTesting
-long getControlledShutDownOffset(int brokerId) {
+long controlledShutDownOffset(int brokerId) {

Review Comment:
   hm, I think changing `controlledShutDownOffset()` to return 
`OptionalLong.empty()` instead of `-1` could be slightly unintuitive as `-1` 
was specifically allocated to mean the broker isn't currently in controlled 
shutdown. I think it would make more sense if the underlying field was also 
type `OptionalLong` and used `OptionalLong.empty()` instead of `-1` or was type 
`Long` and used `null` instead of `-1`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ahuang98 commented on a diff in pull request #12736: KAFKA-14292 controlledShutDownOffset should only be updated if set to -1

2022-10-12 Thread GitBox


ahuang98 commented on code in PR #12736:
URL: https://github.com/apache/kafka/pull/12736#discussion_r993968289


##
metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java:
##
@@ -276,7 +277,7 @@ Collection brokers() {
 }
 
 // VisibleForTesting
-long getControlledShutDownOffset(int brokerId) {
+long controlledShutDownOffset(int brokerId) {

Review Comment:
   hm, I think changing `controlledShutDownOffset()` to return 
`OptionalLong.empty()` instead of `-1` could be slightly unintuitive as `-1` 
was specifically allocated to mean the broker isn't currently in controlled 
shutdown. I think it would make more sense if the underlying field was also 
type `OptionalLong` and used `OptionalLong.empty()` instead of `-1` or was type 
`Long` and could be null



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ahuang98 commented on a diff in pull request #12736: KAFKA-14292 controlledShutDownOffset should only be updated if set to -1

2022-10-12 Thread GitBox


ahuang98 commented on code in PR #12736:
URL: https://github.com/apache/kafka/pull/12736#discussion_r993968289


##
metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java:
##
@@ -276,7 +277,7 @@ Collection brokers() {
 }
 
 // VisibleForTesting
-long getControlledShutDownOffset(int brokerId) {
+long controlledShutDownOffset(int brokerId) {

Review Comment:
   hm, I think changing `controlledShutDownOffset()` to return 
`OptionalLong.empty()` instead of `-1` could be slightly unintuitive as `-1` 
was specifically allocated to mean the broker isn't currently in controlled 
shutdown. I think it would make more sense if the underlying field was also 
type `OptionalLong` and used `OptionalLong.empty()` instead of `-1`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ableegoldman commented on pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2022-10-12 Thread GitBox


ableegoldman commented on PR #11433:
URL: https://github.com/apache/kafka/pull/11433#issuecomment-1276822852

   Hey @vamossagar12 , sorry for the seriously long silence on this -- it was 
important and I shouldn't have let it slip. Anyways I know you pointed me to a 
specific question about checkpointing but I wanted to discuss this quickly at a 
high-level first:
   
   It's been a while so forgive me for forgetting any context, but I tried to 
refresh my memory on this ticket and want to draw attention back to this part 
of an earlier comment of mine:
   
   > We want to commit only if there are new tasks which will need restoring. 
Unfortunately due to the task lifecycle, specifically that all tasks pass 
through the RESTORING phase before going into RUNNING, it's actually nontrivial 
to figure out if we're going to need to actually spend any time restoring new 
tasks. As a first pass, for now (so we can get some kind of fix into 3.1), we 
can just set commitAssignedActiveTasks = true if there are any newly added 
active tasks at all.
   
   Specifically, I think we want to avoid just blindly committing any/all 
active tasks after a rebalance, because being able to skip that was a major 
reason for the improvements of cooperative rebalancing & being able to keep 
rebalances as lightweight as possible is still important. So rather than doing 
that, let's try and minimize the impact by limiting when we consider committing 
to only when (a) EOS is used (since this is a txn-based issue to begin with) 
and (b) if we have newly-added active tasks (as discussed previously)
   
   Does that make sense? I guess this boils down to, can we first tighten up 
[this condition 
here](https://github.com/apache/kafka/pull/11433/files/230a8ed9abd0b4766985274efadb6236547b38de#diff-8baa5d7209fc00074bf3fe24d709c2dcf2a44c1623d7ced8c0e29c1d832a3bcbR384)?
 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a diff in pull request #12737: Kafka Streams Threading P1: Add Interface for new TaskManager and TaskExecutor

2022-10-12 Thread GitBox


ableegoldman commented on code in PR #12737:
URL: https://github.com/apache/kafka/pull/12737#discussion_r993948622


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskExecutor.java:
##
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.tasks;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.streams.processor.internals.ReadOnlyTask;
+import org.apache.kafka.streams.processor.internals.StreamTask;
+
+import java.time.Duration;
+
+public interface TaskExecutor {
+
+/**
+ * Starts the task processor.
+ */
+void start();
+
+/**
+ * Shuts down the task processor updater.
+ *
+ * @param timeout duration how long to wait until the state updater is 
shut down
+ *
+ * @throws
+ * org.apache.kafka.streams.errors.StreamsException if the state 
updater thread cannot shutdown within the timeout
+ */
+void shutdown(final Duration timeout);
+
+/**
+ * Get the current assigned processing task. The task returned is 
read-only and cannot be modified.
+ *
+ * @return the current processing task
+ */
+ReadOnlyTask currentTask();
+
+/**
+ * Unassign the current processing task from the task processor and give 
it back to the state manager.
+ *
+ * The paused task must be flushed since it may be committed or closed by 
the task manager next.
+ *
+ * This method does not block, instead a future is returned.
+ */
+KafkaFuture unassign();

Review Comment:
   Maybe clarify the behavior a bit further in the docs here (unless it's still 
undecided) -- eg do we expect that this interrupts the current processing of 
this task ASAP? Does the TaskExecutor do anything (like flushing) or just 
return that task when it can? etc



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/TaskManager.java:
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.tasks;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.ReadOnlyTask;
+import org.apache.kafka.streams.processor.internals.StreamTask;
+import java.util.Set;
+
+public interface TaskManager {
+
+/**
+ * Get the next processible active task for the requested executor. Once 
the task is assigned to
+ * the requested task executor, it should not be assigned to any other 
executors until it was
+ * returned to the task manager.
+ *
+ * @param executor the requesting {@link TaskExecutor}
+ */
+StreamTask assignNextTask(final TaskExecutor executor);
+
+/**
+ * Unassign the stream task so that it can be assigned to other executors 
later
+ * or be removed from the task manager. The requested executor must have 
locked
+ * the task already, otherwise an exception would be thrown.
+ *
+ * @param executor the requesting {@link TaskExecutor}
+ */
+void unassignTask(final StreamTask task, final TaskExecutor executor);
+
+/**
+ * Lock a set of active tasks from the task manager so that they will not 
be assigned to
+ * any {@link TaskExecutor}s anymore until they are unlocked. At the time 
this function
+ * is called, the requested tasks may already be locked by some {@l

[GitHub] [kafka] hachikuji commented on a diff in pull request #12736: KAFKA-14292 controlledShutDownOffset should only be updated if set to -1

2022-10-12 Thread GitBox


hachikuji commented on code in PR #12736:
URL: https://github.com/apache/kafka/pull/12736#discussion_r993931747


##
metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java:
##
@@ -276,7 +277,7 @@ Collection brokers() {
 }
 
 // VisibleForTesting
-long getControlledShutDownOffset(int brokerId) {
+long controlledShutDownOffset(int brokerId) {

Review Comment:
   nit: could we call this `controlledShutdownOffset`. I think it is a bit more 
conventional to see "shutdown" as one word, as in 
`updateControlledShutdownOffset` below. (Similarly, perhaps we can rename the 
field.)
   
   Also, how about returning `OptionalLong` from this method to make it a bit 
safer? We can return `OptionalLong.empty()` if either the brokerId is not 
present in `brokers` or if `controlledShutdownOffset` is -1.



##
metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java:
##
@@ -414,9 +421,11 @@ void updateControlledShutdownOffset(int brokerId, long 
controlledShutDownOffset)
 throw new RuntimeException("Fenced brokers cannot enter controlled 
shutdown.");
 }
 active.remove(broker);
-broker.controlledShutDownOffset = controlledShutDownOffset;
-log.debug("Updated the controlled shutdown offset for broker {} to 
{}.",
-brokerId, controlledShutDownOffset);
+if (broker.controlledShutDownOffset < 0) {

Review Comment:
   Perhaps we should call this `maybeUpdateControlledShutdownOffset` since the 
update may or may not happen. We should also update the javadoc to describe the 
change in behavior.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] niket-goel commented on pull request #12733: KAFKA-14275; KRaft Controllers should crash after failing to apply any metadata record (#12709)

2022-10-12 Thread GitBox


niket-goel commented on PR #12733:
URL: https://github.com/apache/kafka/pull/12733#issuecomment-1276769574

   PS submitted https://github.com/apache/kafka/pull/12733 to try to address 
one of the Flaky tests above.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #12715: MINOR: Refactor KRaft image generation

2022-10-12 Thread GitBox


cmccabe commented on code in PR #12715:
URL: https://github.com/apache/kafka/pull/12715#discussion_r991501134


##
metadata/src/main/java/org/apache/kafka/image/MetadataImage.java:
##
@@ -120,17 +116,17 @@ public AclsImage acls() {
 return acls;
 }
 
-public void write(Consumer> out) {
-MetadataVersion metadataVersion = features.metadataVersion();
+public void write(ImageWriter writer, ImageWriterOptions options) {
 // Features should be written out first so we can include the 
metadata.version at the beginning of the
 // snapshot
-features.write(out);
-cluster.write(out, metadataVersion);
-topics.write(out);
-configs.write(out);
-clientQuotas.write(out);
-producerIds.write(out);
-acls.write(out);
+features.write(writer, options);
+cluster.write(writer, options);
+topics.write(writer, options);
+configs.write(writer, options);
+clientQuotas.write(writer, options);
+producerIds.write(writer, options);
+acls.write(writer, options);
+writer.freeze();

Review Comment:
   I have revised this code so that there is just a close function which takes 
a boolean. We call it here.



##
metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java:
##
@@ -65,24 +64,43 @@ private Optional finalizedVersion(String feature) {
 return Optional.ofNullable(finalizedVersions.get(feature));
 }
 
-public void write(Consumer> out) {
-List batch = new ArrayList<>();
-if 
(!metadataVersion.isLessThan(MetadataVersion.MINIMUM_BOOTSTRAP_VERSION)) {
-// Write out the metadata.version record first, and then the rest 
of the finalized features
-batch.add(new ApiMessageAndVersion(new FeatureLevelRecord().
-setName(MetadataVersion.FEATURE_NAME).
-setFeatureLevel(metadataVersion.featureLevel()), 
FEATURE_LEVEL_RECORD.lowestSupportedVersion()));
+public void write(ImageWriter writer, ImageWriterOptions options) {
+if 
(options.metadataVersion().isLessThan(MetadataVersion.MINIMUM_BOOTSTRAP_VERSION))
 {
+writePreProductionVersion(writer, options);
+} else {
+writeProductionVersion(writer, options);
 }
+}
+
+private void writePreProductionVersion(ImageWriter writer, 
ImageWriterOptions options) {

Review Comment:
   ok



##
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##
@@ -390,14 +404,18 @@ class BrokerMetadataListener(
   }
 
   class GetImageRecordsEvent(future: 
CompletableFuture[util.List[ApiMessageAndVersion]])
-  extends EventQueue.FailureLoggingEvent(log) with 
Consumer[util.List[ApiMessageAndVersion]] {
-val records = new util.ArrayList[ApiMessageAndVersion]()
-override def accept(batch: util.List[ApiMessageAndVersion]): Unit = {
-  records.addAll(batch)
-}
-
+  extends EventQueue.FailureLoggingEvent(log) {
 override def run(): Unit = {
-  _image.write(this)
+  val records = new util.ArrayList[ApiMessageAndVersion]
+  val writer = new RecordListWriter(records)
+  val options = new ImageWriterOptions.Builder().
+setMetadataVersion(_image.features().metadataVersion()).

Review Comment:
   let's defer this to later. there could be some complexity in doing the 
fallback.



##
metadata/src/main/java/org/apache/kafka/image/ImageWriterOptions.java:
##
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.server.common.MetadataVersion;
+
+import java.util.function.Consumer;
+
+
+/**
+ * The options to use when writing an image.
+ */
+public final class ImageWriterOptions {
+public static class Builder {
+private MetadataVersion metadataVersion = MetadataVersion.latest();
+private Consumer lossHandler = e -> {
+throw e;
+};
+
+public Builder setMetadataVersion(MetadataVersion metadataVersion) {
+if 
(metadataVersion.isLessThan(MetadataVersion.MINIMUM_BOOTSTRAP_VERSION)) {
+// When w

[jira] [Created] (KAFKA-14295) FetchMessageConversionsPerSec meter not recorded

2022-10-12 Thread David Mao (Jira)
David Mao created KAFKA-14295:
-

 Summary: FetchMessageConversionsPerSec meter not recorded
 Key: KAFKA-14295
 URL: https://issues.apache.org/jira/browse/KAFKA-14295
 Project: Kafka
  Issue Type: Bug
Reporter: David Mao


The broker topic metric FetchMessageConversionsPerSec doesn't get recorded on a 
fetch message conversion.

The bug is that we pass in a callback that expects a MultiRecordsSend in 
KafkaApis:
{code:java}
def updateConversionStats(send: Send): Unit = {
  send match {
case send: MultiRecordsSend if send.recordConversionStats != null =>
  send.recordConversionStats.asScala.toMap.foreach {
case (tp, stats) => updateRecordConversionStats(request, tp, stats)
  }
case _ =>
  }
} {code}
But we call this callback with a NetworkSend in the SocketServer:
{code:java}
selector.completedSends.forEach { send =>
  try {
val response = inflightResponses.remove(send.destinationId).getOrElse {
  throw new IllegalStateException(s"Send for ${send.destinationId} 
completed, but not in `inflightResponses`")
}
updateRequestMetrics(response)

// Invoke send completion callback
response.onComplete.foreach(onComplete => onComplete(send))
...{code}
Note that Selector.completedSends returns a collection of NetworkSend



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jsancio commented on a diff in pull request #12715: MINOR: Refactor KRaft image generation

2022-10-12 Thread GitBox


jsancio commented on code in PR #12715:
URL: https://github.com/apache/kafka/pull/12715#discussion_r993773605


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##
@@ -179,8 +184,13 @@ class BrokerMetadataListener(
 snapshotter.foreach { snapshotter =>
   if (metadataFaultOccurred.get()) {
 trace("Not starting metadata snapshot since we previously had an 
error")
-  } else if (snapshotter.maybeStartSnapshot(_highestTimestamp, 
_delta.apply(), reason)) {
-_bytesSinceLastSnapshot = 0L
+  } else {
+val newProvenance = new ImageProvenance("generated snapshot",
+  _highestOffset, _highestEpoch, _highestTimestamp)
+val newImage = _delta.apply(newProvenance)

Review Comment:
   Minor but how about `snapshotImage` instead of `newImage`.



##
metadata/src/main/java/org/apache/kafka/image/writer/ImageReWriter.java:
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.writer;
+
+import org.apache.kafka.image.ImageProvenance;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+
+/**
+ * ImageReWriter writes a metadata image out to another metadata image.
+ *
+ * There are a few reasons why you might want to do this. One is to obtain a 
MetadataDelta
+ * object which contains everything in the image. Another is to translate an 
image from
+ * one metadata version to another.
+ */
+public class ImageReWriter implements ImageWriter {
+private final String newSource;
+private final MetadataDelta delta;
+private boolean closed = false;
+private MetadataImage image = null;
+
+public ImageReWriter(
+String newSource,
+MetadataDelta delta
+) {

Review Comment:
   Hmm. Maybe I missed it but doesn't look like this class is used anywhere.
   
   I was interested in its usage because it wasn't clear to me when it would be 
useful to pass in the delta. Instead of starting with a empty delta from an 
empty or non-empty image.



##
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##
@@ -390,14 +404,18 @@ class BrokerMetadataListener(
   }
 
   class GetImageRecordsEvent(future: 
CompletableFuture[util.List[ApiMessageAndVersion]])
-  extends EventQueue.FailureLoggingEvent(log) with 
Consumer[util.List[ApiMessageAndVersion]] {
-val records = new util.ArrayList[ApiMessageAndVersion]()
-override def accept(batch: util.List[ApiMessageAndVersion]): Unit = {
-  records.addAll(batch)
-}
-
+  extends EventQueue.FailureLoggingEvent(log) {
 override def run(): Unit = {
-  _image.write(this)
+  val records = new util.ArrayList[ApiMessageAndVersion]
+  val writer = new RecordListWriter(records)
+  val options = new ImageWriterOptions.Builder().
+setMetadataVersion(_image.features().metadataVersion()).

Review Comment:
   This comment also applies to it used in the snapshotter.
   
   Outside of tests it looks like this option is always set to metadata version 
in the metadata image. What do you think about changing the API so that setting 
the metadata version is optional. If the metadata version is not set then 
`MetadataImage.write` will set the metadata version to the version contained in 
the image?



##
metadata/src/main/java/org/apache/kafka/image/AclsImage.java:
##
@@ -53,13 +49,15 @@ public Map acls() {
 return acls;
 }
 
-public void write(Consumer> out) {
-List batch = new ArrayList<>();
+public void write(ImageWriter writer, ImageWriterOptions options) {
+// Technically, AccessControlEntryRecord appeared in 3.2-IV0, so we 
should not write it if
+// the output version is less than that. However, there is a problem: 
pre-production KRaft
+// images didn't support FeatureLevelRecord, so we can't distinguish 
3.2-IV0 from 3.0-IV1.
+// The least bad way to resolve this is just to pretend that ACLs were 
in 3.0-IV1.

Review Comment:
   I see. Kafka would only write ACLs records to the snapshot if they existed 
in the 

[GitHub] [kafka] hachikuji commented on a diff in pull request #12736: KAFKA-14292 controlledShutDownOffset should only be updated if set to -1

2022-10-12 Thread GitBox


hachikuji commented on code in PR #12736:
URL: https://github.com/apache/kafka/pull/12736#discussion_r993756766


##
metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java:
##
@@ -414,7 +420,9 @@ void updateControlledShutdownOffset(int brokerId, long 
controlledShutDownOffset)
 throw new RuntimeException("Fenced brokers cannot enter controlled 
shutdown.");
 }
 active.remove(broker);
-broker.controlledShutDownOffset = controlledShutDownOffset;
+if (broker.controlledShutDownOffset < 0) {
+broker.controlledShutDownOffset = controlledShutDownOffset;
+}
 log.debug("Updated the controlled shutdown offset for broker {} to 
{}.",

Review Comment:
   This log message is misleading if we don't update the offset.



##
metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java:
##
@@ -275,6 +275,12 @@ Collection brokers() {
 return brokers.values();
 }
 
+// VisibleForTesting
+long getControlledShutDownOffset(int brokerId) {
+return brokers.get(brokerId).controlledShutDownOffset;
+}
+
+

Review Comment:
   In the javadoc for this class, we mention that controlled shutdown is "soft 
state." This is no longer strictly true following KIP-841. Perhaps we could 
update the doc?



##
metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java:
##
@@ -275,6 +275,12 @@ Collection brokers() {
 return brokers.values();
 }
 
+// VisibleForTesting
+long getControlledShutDownOffset(int brokerId) {

Review Comment:
   nit: usually we drop the `get` prefix, so just `controlledShutdownOffset`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji merged pull request #12732: MINOR: Fix incorrect example in feature command help

2022-10-12 Thread GitBox


hachikuji merged PR #12732:
URL: https://github.com/apache/kafka/pull/12732


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] soarez commented on a diff in pull request #12729: KAFKA-14285: Delete quota node in zookeeper when configs are empty

2022-10-12 Thread GitBox


soarez commented on code in PR #12729:
URL: https://github.com/apache/kafka/pull/12729#discussion_r993735973


##
core/src/main/scala/kafka/zk/AdminZkClient.scala:
##
@@ -365,6 +365,29 @@ class AdminZkClient(zkClient: KafkaZkClient) extends 
Logging {
   case ConfigType.Ip => changeIpConfig(entityName, configs)
   case _ => throw new IllegalArgumentException(s"$entityType is not a 
known entityType. Should be one of ${ConfigType.all}")
 }
+
+if ((ConfigType.Client.equals(entityType) || 
ConfigType.User.equals(entityType) || ConfigType.Ip.equals(entityType)) && 
configs.isEmpty) {
+  val currPath = ConfigEntityZNode.path(entityType, entityName)
+  if (zkClient.getChildren(currPath).isEmpty) {
+var pathToDelete = currPath
+if (isUserClientId) {
+  val user = entityName.substring(0, entityName.indexOf("/"))
+  val clientId = entityName.substring(entityName.lastIndexOf("/") + 1)
+  val clientsPath = ConfigEntityZNode.path(ConfigType.User, user + "/" 
+ ConfigType.Client)
+  val clientsChildren = zkClient.getChildren(clientsPath)
+  if (clientsChildren.size == 1 && 
clientsChildren.head.equals(clientId)) {

Review Comment:
   ```suggestion
 if (clientsChildren == Seq(clientId)) {
   ```



##
core/src/main/scala/kafka/zk/AdminZkClient.scala:
##
@@ -365,6 +365,29 @@ class AdminZkClient(zkClient: KafkaZkClient) extends 
Logging {
   case ConfigType.Ip => changeIpConfig(entityName, configs)
   case _ => throw new IllegalArgumentException(s"$entityType is not a 
known entityType. Should be one of ${ConfigType.all}")
 }
+
+if ((ConfigType.Client.equals(entityType) || 
ConfigType.User.equals(entityType) || ConfigType.Ip.equals(entityType)) && 
configs.isEmpty) {
+  val currPath = ConfigEntityZNode.path(entityType, entityName)
+  if (zkClient.getChildren(currPath).isEmpty) {
+var pathToDelete = currPath
+if (isUserClientId) {
+  val user = entityName.substring(0, entityName.indexOf("/"))
+  val clientId = entityName.substring(entityName.lastIndexOf("/") + 1)
+  val clientsPath = ConfigEntityZNode.path(ConfigType.User, user + "/" 
+ ConfigType.Client)
+  val clientsChildren = zkClient.getChildren(clientsPath)
+  if (clientsChildren.size == 1 && 
clientsChildren.head.equals(clientId)) {
+pathToDelete = clientsPath
+val userData = fetchEntityConfig(ConfigType.User, user)
+val userPath = ConfigEntityZNode.path(ConfigType.User, user)
+val userChildren = zkClient.getChildren(userPath)
+if (userData.isEmpty && userChildren.size == 1 && 
userChildren.head.equals(ConfigType.Client)) {
+  pathToDelete = userPath
+}
+  }
+}
+zkClient.deletePath(pathToDelete)

Review Comment:
   Looking at this block of code, there are apparently three different 
scenarios for the value of `pathToDelete`, from lines:
   
   1. 372 `var pathToDelete = currPath`
   2. 379 `pathToDelete = clientsPath`
   3. 384 `pathToDelete = userPath`
   
But following through the test cases we only seem to hit two of those. 
`testChangeConfigsWithUserAndClientId` hits 3. and all the other current tests 
stick to 1. Should we have an extra test case to cover scenario 2. or, if that 
scenario should never happen, should this logic be re-structured?



##
core/src/main/scala/kafka/zk/AdminZkClient.scala:
##
@@ -365,6 +365,29 @@ class AdminZkClient(zkClient: KafkaZkClient) extends 
Logging {
   case ConfigType.Ip => changeIpConfig(entityName, configs)
   case _ => throw new IllegalArgumentException(s"$entityType is not a 
known entityType. Should be one of ${ConfigType.all}")
 }
+
+if ((ConfigType.Client.equals(entityType) || 
ConfigType.User.equals(entityType) || ConfigType.Ip.equals(entityType)) && 
configs.isEmpty) {
+  val currPath = ConfigEntityZNode.path(entityType, entityName)
+  if (zkClient.getChildren(currPath).isEmpty) {
+var pathToDelete = currPath
+if (isUserClientId) {
+  val user = entityName.substring(0, entityName.indexOf("/"))
+  val clientId = entityName.substring(entityName.lastIndexOf("/") + 1)
+  val clientsPath = ConfigEntityZNode.path(ConfigType.User, user + "/" 
+ ConfigType.Client)
+  val clientsChildren = zkClient.getChildren(clientsPath)
+  if (clientsChildren.size == 1 && 
clientsChildren.head.equals(clientId)) {
+pathToDelete = clientsPath
+val userData = fetchEntityConfig(ConfigType.User, user)
+val userPath = ConfigEntityZNode.path(ConfigType.User, user)
+val userChildren = zkClient.getChildren(userPath)
+if (userData.isEmpty && userChildren.size == 1 && 
userChildren.head.equals(ConfigType.Client)) {

Review Comment:
   ```suggestion
   if (userData.isEmpty 

[GitHub] [kafka] shekhar-rajak commented on pull request #12735: Replace EasyMock and PowerMock with Mockito in connect/runtime/ErrorHandlingTaskTest | KAFKA-14059

2022-10-12 Thread GitBox


shekhar-rajak commented on PR #12735:
URL: https://github.com/apache/kafka/pull/12735#issuecomment-1276537604

   Thanks @divijvaidya for the comments. I have done some changes. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] niket-goel commented on pull request #12733: KAFKA-14275; KRaft Controllers should crash after failing to apply any metadata record (#12709)

2022-10-12 Thread GitBox


niket-goel commented on PR #12733:
URL: https://github.com/apache/kafka/pull/12733#issuecomment-1276537439

   All failing tests seem to have failed due to general flakiness.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang opened a new pull request, #12737: Kafka Streams Threading P1: Add Interface for new TaskManager and TaskExecutor

2022-10-12 Thread GitBox


guozhangwang opened a new pull request, #12737:
URL: https://github.com/apache/kafka/pull/12737

   1. TaskExecutor is the interface for a processor thread. It takes at most 
one task to process at a given time from the task manager. When being asked 
from the task manager to un-assign the current processing task, it will stop 
processing and give the task back to task manager.
   2. TaskManager schedules all the active tasks to assign to TaskExecutors. 
Specifically: 1) when a task executor ask it for an unassigned task to process 
(assignNextTask), it will return the available task based on its scheduling 
algorithm. 2) when the task manager decides to commit (all) tasks, or when a 
rebalance event requires it to modify the maintained active tasks (via 
onAssignment), it will lock all the tasks that are going to be closed / 
committed, asking the TaskExecutor to give them back if they were being 
processed at the moment.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on pull request #12672: KAFKA-14247: Consumer background thread base implementation

2022-10-12 Thread GitBox


philipnee commented on PR #12672:
URL: https://github.com/apache/kafka/pull/12672#issuecomment-1276490455

   Hey @guozhangwang , much thanks for the detail reviews, I tried to address 
some of the comments, please review them.  In particular:
   1. poll timeout logic
   2. InterruptException handling (A side question here, when the BT got 
interrupted, shouldn't we terminate instead of swallowing the exception?)
   3. Added a couple of tests to test the network client calls in a single poll.
   4. I also removed the interface, originally I thought it would be helpful to 
write stubbed tests.
   
   I left the NOOP event there, but I agree we should move it once we've got an 
actual event implemented, which should happen soon.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #11211: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…

2022-10-12 Thread GitBox


vvcephei commented on PR #11211:
URL: https://github.com/apache/kafka/pull/11211#issuecomment-1276484490

   Hey @showuon , I see you already approved this PR, and it doesn't look like 
any of the following questions were blockers. Do you want to go ahead and merge 
it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ahuang98 opened a new pull request, #12736: KAFKA-14292 controlledShutDownOffset should only be updated if set to -1

2022-10-12 Thread GitBox


ahuang98 opened a new pull request, #12736:
URL: https://github.com/apache/kafka/pull/12736

   The `controlledShutDownOffset` is defined as the "offset at which the broker 
should complete its controlled shutdown, or -1 if the broker is not performing 
a controlled shutdown". The controller sets this offset to a non-negative 
integer on receiving a heartbeat from a broker that's in controlled shutdown 
state. Currently, this offset is being updated and bumped every single time a 
broker in controlled shutdown mode send a heartbeat, delaying when controlled 
shutdown can actually complete for the broker. We should only update the offset 
when it was previously set to -1 to allow controlled shutdown to complete.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] niket-goel commented on pull request #12733: KAFKA-14275; KRaft Controllers should crash after failing to apply any metadata record (#12709)

2022-10-12 Thread GitBox


niket-goel commented on PR #12733:
URL: https://github.com/apache/kafka/pull/12733#issuecomment-1276460657

   Interesting test failures on this. All are in related code, but not if sure 
if there are flakes or actual failures. Will run some local tests to verify.
   ```
   Build / JDK 8 and Scala 2.12 / testDescribeQuorumRequestToBrokers() – 
kafka.server.KRaftClusterTest 
   (need to understand - failing assertion around return value)
   
   Build / JDK 17 and Scala 2.13 / testIncrementalAlterConfigs() – 
kafka.server.KRaftClusterTest 
   (Could be infrastructural - failed to startup due to heap space)
   
   Build / JDK 17 and Scala 2.13 / testConfigurationOperations() – 
org.apache.kafka.controller.QuorumControllerTest
   (Could be a flake - renouncing from a higher epoch than expected)
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14294) Kafka Streams should commit transaction even no records are processed

2022-10-12 Thread Vicky Papavasileiou (Jira)
Vicky Papavasileiou created KAFKA-14294:
---

 Summary: Kafka Streams should commit transaction even no records 
are processed
 Key: KAFKA-14294
 URL: https://issues.apache.org/jira/browse/KAFKA-14294
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Vicky Papavasileiou


Currently, if there are no records to process in the input topic, a transaction 
does not commit. If a custom punctuator code is writing to a state store (which 
is common practice) the producer gets fenced when trying to write to the 
changelog topic. This throws a TaskMigratedException and causes a rebalance. 

A better approach would be to commit a transaction even when there are no 
records processed as to allow the punctuator to make progress. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14294) Kafka Streams should commit transaction when no records are processed

2022-10-12 Thread Vicky Papavasileiou (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vicky Papavasileiou updated KAFKA-14294:

Summary: Kafka Streams should commit transaction when no records are 
processed  (was: Kafka Streams should commit transaction even no records are 
processed)

> Kafka Streams should commit transaction when no records are processed
> -
>
> Key: KAFKA-14294
> URL: https://issues.apache.org/jira/browse/KAFKA-14294
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Vicky Papavasileiou
>Priority: Major
>
> Currently, if there are no records to process in the input topic, a 
> transaction does not commit. If a custom punctuator code is writing to a 
> state store (which is common practice) the producer gets fenced when trying 
> to write to the changelog topic. This throws a TaskMigratedException and 
> causes a rebalance. 
> A better approach would be to commit a transaction even when there are no 
> records processed as to allow the punctuator to make progress. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14132) Remaining PowerMock to Mockito tests

2022-10-12 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-14132:
-
Description: 
{color:#de350b}Some of the tests below use EasyMock as well. For those migrate 
both PowerMock and EasyMock to Mockito.{color}

Unless stated in brackets the tests are in the connect module.

A list of tests which still require to be moved from PowerMock to Mockito as of 
2nd of August 2022 which do not have a Jira issue and do not have pull requests 
I am aware of which are opened:

{color:#ff8b00}InReview{color}
{color:#00875a}Merged{color}
 # ErrorHandlingTaskTest (owner: shekhar) 
[https://github.com/apache/kafka/pull/12735] 
 # SourceTaskOffsetCommiterTest 
 # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij)
 # WorkerSinkTaskTest (owner: Divij) *WIP* 
 # WorkerSinkTaskThreadedTest (owner: Divij) *WIP*
 # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya])
 # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya])
 # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya])
 # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya])
 # ConnectorsResourceTest (owner: [~mdedetrich-aiven] ) 
([https://github.com/apache/kafka/pull/12725])
 # StandaloneHerderTest (owner: [~mdedetrich-aiven] )
 # KafkaConfigBackingStoreTest (owner: [~mdedetrich-aiven] )
 # KafkaOffsetBackingStoreTest (owner: Christo) 
([https://github.com/apache/kafka/pull/12418])
 # KafkaBasedLogTest (owner: [~mdedetrich-aiven] )
 # RetryUtilTest (owner: [~mdedetrich-aiven] )
 # RepartitionTopicTest (streams) (owner: Christo)
 # StateManagerUtilTest (streams) (owner: Christo)

*The coverage report for the above tests after the change should be >= to what 
the coverage is now.*

  was:
{color:#de350b}Some of the tests below use EasyMock as well. For those migrate 
both PowerMock and EasyMock to Mockito.{color}

Unless stated in brackets the tests are in the connect module.

A list of tests which still require to be moved from PowerMock to Mockito as of 
2nd of August 2022 which do not have a Jira issue and do not have pull requests 
I am aware of which are opened:

{color:#ff8b00}InReview{color}
{color:#00875a}Merged{color}
 # ErrorHandlingTaskTest (owner: Divij)
 # SourceTaskOffsetCommiterTest (owner: Divij)
 # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij)
 # WorkerSinkTaskTest (owner: Divij)
 # WorkerSinkTaskThreadedTest (owner: Divij)
 # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya])
 # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya])
 # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya])
 # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya])
 # ConnectorsResourceTest (owner: [~mdedetrich-aiven] ) 
([https://github.com/apache/kafka/pull/12725])
 # StandaloneHerderTest (owner: [~mdedetrich-aiven] )
 # KafkaConfigBackingStoreTest (owner: [~mdedetrich-aiven] )
 # KafkaOffsetBackingStoreTest (owner: Christo) 
([https://github.com/apache/kafka/pull/12418])
 # KafkaBasedLogTest (owner: [~mdedetrich-aiven] )
 # RetryUtilTest (owner: [~mdedetrich-aiven] )
 # RepartitionTopicTest (streams) (owner: Christo)
 # StateManagerUtilTest (streams) (owner: Christo)

*The coverage report for the above tests after the change should be >= to what 
the coverage is now.*


> Remaining PowerMock to Mockito tests
> 
>
> Key: KAFKA-14132
> URL: https://issues.apache.org/jira/browse/KAFKA-14132
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
>
> {color:#de350b}Some of the tests below use EasyMock as well. For those 
> migrate both PowerMock and EasyMock to Mockito.{color}
> Unless stated in brackets the tests are in the connect module.
> A list of tests which still require to be moved from PowerMock to Mockito as 
> of 2nd of August 2022 which do not have a Jira issue and do not have pull 
> requests I am aware of which are opened:
> {color:#ff8b00}InReview{color}
> {color:#00875a}Merged{color}
>  # ErrorHandlingTaskTest (owner: shekhar) 
> [https://github.com/apache/kafka/pull/12735] 
>  # SourceTaskOffsetCommiterTest 
>  # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij)
>  # WorkerSinkTaskTest (owner: Divij) *WIP* 
>  # WorkerSinkTaskThreadedTest (owner: Divij) *WIP*
>  # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya])
>  # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya])
>  # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya])
>  # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya])
>  # ConnectorsResourceTest (owner: [~mdedetrich-aiven] ) 
> ([https://github.com/apache/kafka/pull/12725])
>  # StandaloneHerderTest (owner: [~mdedetrich-a

[GitHub] [kafka] divijvaidya commented on a diff in pull request #12735: Replace EasyMock and PowerMock with Mockito in connect/runtime/ErrorHandlingTaskTest | KAFKA-14059

2022-10-12 Thread GitBox


divijvaidya commented on code in PR #12735:
URL: https://github.com/apache/kafka/pull/12735#discussion_r993621273


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##
@@ -224,85 +231,68 @@ public void tearDown() {
 if (metrics != null) {
 metrics.stop();
 }
+mockitoSession.finishMocking();
+
 }
 
 @Test
 public void testSinkTasksCloseErrorReporters() throws Exception {
-ErrorReporter reporter = EasyMock.mock(ErrorReporter.class);
+ErrorReporter reporter = mock(ErrorReporter.class);
 
 RetryWithToleranceOperator retryWithToleranceOperator = operator();
 retryWithToleranceOperator.reporters(singletonList(reporter));
 
 createSinkTask(initialState, retryWithToleranceOperator);
-
-expectInitializeTask();
-reporter.close();
-EasyMock.expectLastCall();
-sinkTask.stop();
-EasyMock.expectLastCall();
-
-consumer.close();
-EasyMock.expectLastCall();
-
-headerConverter.close();
-EasyMock.expectLastCall();
-
-PowerMock.replayAll();
-
+doNothing().when(consumer).subscribe(
+eq(singletonList(TOPIC)),
+rebalanceListener.capture());
+doNothing().when(sinkTask).initialize(sinkTaskContext.capture());

Review Comment:
   Where are we using the argument that was captured here? Usually, when we 
capture an argument, we have a place in the code after capturing where we do 
`sinkTaskContext.getValue()` to use the captured value. 



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##
@@ -139,7 +139,9 @@ public class ErrorHandlingTaskTest {
 @SuppressWarnings("unused")
 @Mock
 private SourceTask sourceTask;
-private Capture sinkTaskContext = 
EasyMock.newCapture();
+
+private ArgumentCaptor sinkTaskContext =

Review Comment:
   You can use @Captor here since you are already using annotations such as 
`@mock`



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##
@@ -165,7 +167,8 @@ public class ErrorHandlingTaskTest {
 @Mock
 private ConnectorOffsetBackingStore offsetStore;
 
-private Capture rebalanceListener = 
EasyMock.newCapture();
+private ArgumentCaptor rebalanceListener =

Review Comment:
   same comment as earlier about `@captor`



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##
@@ -494,72 +502,29 @@ private void assertErrorHandlingMetricValue(String name, 
double expected) {
 assertEquals(expected, measured, 0.001d);
 }
 
-private void expectInitializeTask() {
-consumer.subscribe(EasyMock.eq(singletonList(TOPIC)), 
EasyMock.capture(rebalanceListener));
-PowerMock.expectLastCall();
-
-sinkTask.initialize(EasyMock.capture(sinkTaskContext));
-PowerMock.expectLastCall();
-sinkTask.start(TASK_PROPS);
-PowerMock.expectLastCall();
-}
-
-private void expectTaskGetTopic(boolean anyTimes) {
-final Capture connectorCapture = EasyMock.newCapture();
-final Capture topicCapture = EasyMock.newCapture();
-IExpectationSetters expect = 
EasyMock.expect(statusBackingStore.getTopic(
-EasyMock.capture(connectorCapture),
-EasyMock.capture(topicCapture)));
-if (anyTimes) {
-expect.andStubAnswer(() -> new TopicStatus(
-topicCapture.getValue(),
-new ConnectorTaskId(connectorCapture.getValue(), 0),
-Time.SYSTEM.milliseconds()));
-} else {
-expect.andAnswer(() -> new TopicStatus(
-topicCapture.getValue(),
-new ConnectorTaskId(connectorCapture.getValue(), 0),
-Time.SYSTEM.milliseconds()));
-}
-if (connectorCapture.hasCaptured() && topicCapture.hasCaptured()) {
-assertEquals("job", connectorCapture.getValue());
-assertEquals(TOPIC, topicCapture.getValue());
-}
-}
-
-private void expectClose() {
-producer.close(EasyMock.anyObject(Duration.class));
-EasyMock.expectLastCall();
-
-admin.close(EasyMock.anyObject(Duration.class));
-EasyMock.expectLastCall();
+private void expectClose() throws IOException {
 
-offsetReader.close();
-EasyMock.expectLastCall();
-
-offsetStore.stop();
-EasyMock.expectLastCall();
-
-try {
-headerConverter.close();
-} catch (IOException e) {
-throw new RuntimeException(e);
-}
-EasyMock.expectLastCall();
+verify(producer).close(any(Duration.class));
+verify(admin).close(any(Duration.class));
+verify(offsetReader).close();
+verify(offsetStore).s

[jira] [Commented] (KAFKA-14266) MirrorSourceTask will stop mirroring when get corrupt record

2022-10-12 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17616499#comment-17616499
 ] 

Chris Egerton commented on KAFKA-14266:
---

That's good to hear, thanks [~LucentWong]!

> MirrorSourceTask will stop mirroring when get corrupt record
> 
>
> Key: KAFKA-14266
> URL: https://issues.apache.org/jira/browse/KAFKA-14266
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.5.1, 3.2.3
>Reporter: Yu Wang
>Assignee: Yu Wang
>Priority: Critical
>
> The mirror task will keeping throwing this error when got a corrupt record
> {code:java}
> [2022-09-28 22:27:07,125] WARN Failure during poll. 
> (org.apache.kafka.connect.mirror.MirrorSourceTask)
> org.apache.kafka.common.KafkaException: Received exception when fetching the 
> next record from TOPIC-261. If needed, please seek past the record to 
> continue consumption.
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1536)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1289)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
>         at 
> org.apache.kafka.connect.mirror.MirrorSourceTask.poll(MirrorSourceTask.java:137)
>         at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:272)
>         at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:239)
>         at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
>         at 
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.KafkaException: Record batch for partition 
> TOPIC-261 at offset 18665849419 is invalid, cause: Record is corrupt (stored 
> crc = 4289549294, computed crc = 3792599753)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.maybeEnsureValid(Fetcher.java:1449)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.nextFetchedRecord(Fetcher.java:1493)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1550)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1314)
>         ... 12 more {code}
>  
> In the poll function of {*}MirrorSourceTask{*}, when the task got 
> {*}KafkaException{*}, it only print a warn level log and return null.
> {code:java}
> @Override
> public List poll() {
> if (!consumerAccess.tryAcquire()) {
> return null;
> }
> if (stopping) {
> return null;
> }
> try {
> ConsumerRecords records = consumer.poll(pollTimeout);
> List sourceRecords = new ArrayList<>(records.count());
> ...
> if (sourceRecords.isEmpty()) {
> // WorkerSourceTasks expects non-zero batch size
> return null;
> } else {
> log.trace("Polled {} records from {}.", sourceRecords.size(), 
> records.partitions());
> return sourceRecords;
> }
> } catch (WakeupException e) {
> return null;
> } catch (KafkaException e) {
> log.warn("Failure during poll.", e);
> return null;
> } catch (Throwable e)  {
> log.error("Failure during poll.", e);
> // allow Connect to deal with the exception
> throw e;
> } finally {
> consumerAccess.release();
> }
> } {code}
> In the next poll round, the consumer will keep throwing exception because it 
> has received a c

[GitHub] [kafka] calmera commented on a diff in pull request #12188: KAFKA-10892: Shared Readonly State Stores

2022-10-12 Thread GitBox


calmera commented on code in PR #12188:
URL: https://github.com/apache/kafka/pull/12188#discussion_r993559392


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java:
##
@@ -59,7 +59,7 @@ public void testKStreamBranch() {
 
 assertEquals(3, branches.length);
 
-final MockProcessorSupplier supplier = new 
MockProcessorSupplier<>();
+final MockProcessorSupplier supplier = 
new MockProcessorSupplier<>();

Review Comment:
   Yeah, problem is that I need them to decently test the topology.
   
   Let me see what I can do



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14293) Basic Auth filter should set the SecurityContext after a successful login

2022-10-12 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-14293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Patrik Márton updated KAFKA-14293:
--
Component/s: KafkaConnect

> Basic Auth filter should set the SecurityContext after a successful login
> -
>
> Key: KAFKA-14293
> URL: https://issues.apache.org/jira/browse/KAFKA-14293
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Patrik Márton
>Assignee: Patrik Márton
>Priority: Major
>
> Currently, the JaasBasicAuthFilter does not set the security context of the 
> request after a successful login. However, this information of an 
> authenticated user might be required for further processing, for example to 
> perform authorization checks after the authentication.
> > The filter should be extended to add the Security Context after a 
> > successful login.
> Another improvement would be to assign the right Priority to the filter. The 
> current implementation uses the default priority, which is Priorities.USER = 
> 5000. This is a lower priority than for example AUTHORIZATION, which means 
> that the basic auth filter would run after authorization filters.
> > Assing the correct Priorities.AUTHENTICATION = 1000 priority to the filter 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14293) Basic Auth filter should set the SecurityContext after a successful login

2022-10-12 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-14293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Patrik Márton reassigned KAFKA-14293:
-

Assignee: Patrik Márton

> Basic Auth filter should set the SecurityContext after a successful login
> -
>
> Key: KAFKA-14293
> URL: https://issues.apache.org/jira/browse/KAFKA-14293
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Patrik Márton
>Assignee: Patrik Márton
>Priority: Major
>
> Currently, the JaasBasicAuthFilter does not set the security context of the 
> request after a successful login. However, this information of an 
> authenticated user might be required for further processing, for example to 
> perform authorization checks after the authentication.
> > The filter should be extended to add the Security Context after a 
> > successful login.
> Another improvement would be to assign the right Priority to the filter. The 
> current implementation uses the default priority, which is Priorities.USER = 
> 5000. This is a lower priority than for example AUTHORIZATION, which means 
> that the basic auth filter would run after authorization filters.
> > Assing the correct Priorities.AUTHENTICATION = 1000 priority to the filter 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] calmera commented on a diff in pull request #12188: KAFKA-10892: Shared Readonly State Stores

2022-10-12 Thread GitBox


calmera commented on code in PR #12188:
URL: https://github.com/apache/kafka/pull/12188#discussion_r993550391


##
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##
@@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final 
StoreBuilder storeBuilder,
 return this;
 }
 
+/**
+ * Adds a Read Only {@link StateStore} to the topology.
+ *
+ * A Read Only StateStore can use any compacted topic as a changelog.
+ * 
+ * A {@link SourceNode} will be added to consume the data arriving from 
the partitions of the input topic.
+ * 
+ * The provided {@link ProcessorSupplier} will be used to create an {@link 
ProcessorNode} that will receive all
+ * records forwarded from the {@link SourceNode}.
+ * This {@link ProcessorNode} should be used to keep the {@link 
StateStore} up-to-date.
+ *
+ * @param storeBuilder  user defined key value store builder

Review Comment:
   There is no real restriction on the type of store, so will adapt the 
description.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-14099) No REST API request logs in Kafka connect

2022-10-12 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-14099.
---
Fix Version/s: 3.4.0
 Reviewer: Chris Egerton
   Resolution: Fixed

> No REST API request logs in Kafka connect
> -
>
> Key: KAFKA-14099
> URL: https://issues.apache.org/jira/browse/KAFKA-14099
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.2.0
>Reporter: Alexandre Garnier
>Assignee: Alexandre Garnier
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 3.4.0
>
>
> Prior to 2.2.1, when an REST API request was performed, there was a request 
> log in the log file:
> {code:java}
> [2022-07-23 07:18:16,128] INFO 172.18.0.1 - - [23/Jul/2022:07:18:16 +] 
> "GET /connectors HTTP/1.1" 200 2 "-" "curl/7.81.0" 66 
> (org.apache.kafka.connect.runtime.rest.RestServer:62)
> {code}
> Since 2.2.1, no more request logs.
>  
> With a bisect, I found the problem comes from [PR 
> 6651|https://github.com/apache/kafka/pull/6651] to fix KAFKA-8304
> From what I understand of the problem, the ContextHandlerCollection is added 
> in the Server 
> ([https://github.com/dongjinleekr/kafka/blob/63a6130af30536d67fca5802005695a84c875b5e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java#L195])
>  before handlers are really added in the ContextHandlerCollection 
> ([https://github.com/dongjinleekr/kafka/blob/63a6130af30536d67fca5802005695a84c875b5e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java#L296]).
> I don't know the impact on other handlers, but clearly it doesn't work for 
> the RequestLogHandler.
>  
> A solution I found for the logging issue is to set the RequestLog directly in 
> the server without using an handlers:
> {code:java}
> diff --git 
> i/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
>  
> w/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
> index ab18419efc..4d09cc0e6c 100644
> --- 
> i/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
> +++ 
> w/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
> @@ -187,6 +187,11 @@ public class RestServer {
>  public void initializeServer() {
>  log.info("Initializing REST server");
>  
> +Slf4jRequestLogWriter slf4jRequestLogWriter = new 
> Slf4jRequestLogWriter();
> +
> slf4jRequestLogWriter.setLoggerName(RestServer.class.getCanonicalName());
> +CustomRequestLog requestLog = new 
> CustomRequestLog(slf4jRequestLogWriter, CustomRequestLog.EXTENDED_NCSA_FORMAT 
> + " %{ms}T");
> +jettyServer.setRequestLog(requestLog);
> +
>  /* Needed for graceful shutdown as per `setStopTimeout` 
> documentation */
>  StatisticsHandler statsHandler = new StatisticsHandler();
>  statsHandler.setHandler(handlers);
> @@ -275,14 +280,7 @@ public class RestServer {
>  configureHttpResponsHeaderFilter(context);
>  }
>  
> -RequestLogHandler requestLogHandler = new RequestLogHandler();
> -Slf4jRequestLogWriter slf4jRequestLogWriter = new 
> Slf4jRequestLogWriter();
> -
> slf4jRequestLogWriter.setLoggerName(RestServer.class.getCanonicalName());
> -CustomRequestLog requestLog = new 
> CustomRequestLog(slf4jRequestLogWriter, CustomRequestLog.EXTENDED_NCSA_FORMAT 
> + " %{ms}T");
> -requestLogHandler.setRequestLog(requestLog);
> -
>  contextHandlers.add(new DefaultHandler());
> -contextHandlers.add(requestLogHandler);
>  
>  handlers.setHandlers(contextHandlers.toArray(new Handler[0]));
>  try {
> {code}
> Same issue raised on StackOverflow: 
> [https://stackoverflow.com/questions/67699702/no-rest-api-logs-in-kafka-connect]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante merged pull request #12434: KAFKA-14099 - Fix request logs in connect

2022-10-12 Thread GitBox


C0urante merged PR #12434:
URL: https://github.com/apache/kafka/pull/12434


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] zigarn commented on pull request #12434: KAFKA-14099 - Fix request logs in connect

2022-10-12 Thread GitBox


zigarn commented on PR #12434:
URL: https://github.com/apache/kafka/pull/12434#issuecomment-1276269841

   @C0urante No problem. Your scenario is fine by me.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #12434: KAFKA-14099 - Fix request logs in connect

2022-10-12 Thread GitBox


C0urante commented on PR #12434:
URL: https://github.com/apache/kafka/pull/12434#issuecomment-1276267039

   Apologies for the delay @zigarn, and thank you for sticking with this.
   
   I'm still not sure we should be relying on the Scala `LogCaptureAppender`, 
but I don't want to ask you to put more work into this PR (you've already done 
plenty!), and I don't want to block this PR on someone else re-reviewing #10528.
   
   What I'd like to do is merge this PR, then on #10528:
   - Rebase onto the latest trunk
   - Revert the changes to the Scala `LogCaptureAppender` introduced in this PR
   - Update the `RestServerTest` to use the Java `LogCaptureAppender`
   - Merge #10528 (it's already been approved and, although I was hoping we 
could get another look at it, it's technically acceptable to merge it now as-is)
   
   Is that alright with you? And again, thank you for sticking with this PR, it 
really is a valuable improvement and I'd like to see it merged.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14133) Remaining EasyMock to Mockito tests

2022-10-12 Thread Shekhar Prasad Rajak (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17616459#comment-17616459
 ] 

Shekhar Prasad Rajak commented on KAFKA-14133:
--

I am working on  StreamsMetricsImplTest file update as well. 

> Remaining EasyMock to Mockito tests
> ---
>
> Key: KAFKA-14133
> URL: https://issues.apache.org/jira/browse/KAFKA-14133
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
>
> {color:#de350b}There are tests which use both PowerMock and EasyMock. I have 
> put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here 
> rely solely on EasyMock.{color}
> Unless stated in brackets the tests are in the streams module.
> A list of tests which still require to be moved from EasyMock to Mockito as 
> of 2nd of August 2022 which do not have a Jira issue and do not have pull 
> requests I am aware of which are opened:
> {color:#ff8b00}In Review{color}
> {color:#00875a}Merged{color}
>  # {color:#00875a}WorkerConnectorTest{color} (connect) (owner: [~yash.mayya] )
>  # {color:#00875a}WorkerCoordinatorTest{color} (connect) (owner: 
> [~yash.mayya] )
>  # {color:#00875a}RootResourceTest{color} (connect) (owner: [~yash.mayya] )
>  # {color:#00875a}ByteArrayProducerRecordEquals{color} (connect) (owner: 
> [~yash.mayya] )
>  # {color:#00875a}KStreamFlatTransformTest{color} (owner: Christo)
>  # {color:#00875a}KStreamFlatTransformValuesTest{color} (owner: Christo)
>  # {color:#00875a}KStreamPrintTest{color} (owner: Christo)
>  # {color:#00875a}KStreamRepartitionTest{color} (owner: Christo)
>  # {color:#00875a}MaterializedInternalTest{color} (owner: Christo)
>  # {color:#00875a}TransformerSupplierAdapterTest{color} (owner: Christo)
>  # {color:#00875a}KTableSuppressProcessorMetricsTest{color} (owner: Christo)
>  # {color:#00875a}ClientUtilsTest{color} (owner: Christo)
>  # {color:#00875a}HighAvailabilityStreamsPartitionAssignorTest{color} (owner: 
> Christo)
>  # {color:#ff8b00}StreamsRebalanceListenerTest{color} (owner: Christo)
>  # {color:#ff8b00}TimestampedKeyValueStoreMaterializerTest{color} (owner: 
> Christo)
>  # {color:#ff8b00}CachingInMemoryKeyValueStoreTest{color} (owner: Christo)
>  # {color:#ff8b00}CachingInMemorySessionStoreTest{color} (owner: Christo)
>  # {color:#ff8b00}CachingPersistentSessionStoreTest{color} (owner: Christo)
>  # {color:#ff8b00}CachingPersistentWindowStoreTest{color} (owner: Christo)
>  # {color:#ff8b00}ChangeLoggingKeyValueBytesStoreTest{color} (owner: Christo)
>  # {color:#ff8b00}ChangeLoggingTimestampedKeyValueBytesStoreTest{color} 
> (owner: Christo)
>  # {color:#ff8b00}CompositeReadOnlyWindowStoreTest{color} (owner: Christo)
>  # {color:#ff8b00}KeyValueStoreBuilderTest{color} (owner: Christo)
>  # {color:#ff8b00}RocksDBStoreTest{color} (owner: Christo)
>  # {color:#ff8b00}StreamThreadStateStoreProviderTest{color} (owner: Christo)
>  # {color:#ff8b00}TopologyTest{color} (owner: Christo)
>  # {color:#ff8b00}KTableSuppressProcessorTest{color} (owner: Christo)
>  # {color:#ff8b00}ChangeLoggingSessionBytesStoreTest{color} (owner: Christo)
>  # {color:#ff8b00}ChangeLoggingTimestampedWindowBytesStoreTest{color} (owner: 
> Christo)
>  # {color:#ff8b00}ChangeLoggingWindowBytesStoreTest{color} (owner: Christo)
>  # {color:#ff8b00}MeteredTimestampedWindowStoreTest{color} (owner: Christo)
>  # {color:#ff8b00}TaskManagerTest{color} (owner: Christo)
>  # InternalTopicManagerTest (owner: Christo)
>  # ProcessorContextImplTest (owner: Christo)
>  # WriteConsistencyVectorTest (owner: Christo)
>  # StreamsAssignmentScaleTest (owner: Christo)
>  # StreamsPartitionAssignorTest (owner: Christo)
>  # AssignmentTestUtils (owner: Christo)
>  # ProcessorStateManagerTest ({*}WIP{*} owner: Matthew)
>  # StandbyTaskTest ({*}WIP{*} owner: Matthew)
>  # StoreChangelogReaderTest ({*}WIP{*} owner: Matthew)
>  # StreamTaskTest ({*}WIP{*} owner: Matthew)
>  # StreamThreadTest ({*}WIP{*} owner: Matthew)
>  # StreamsMetricsImplTest
>  # TimeOrderedCachingPersistentWindowStoreTest
>  # TimeOrderedWindowStoreTest
> *The coverage report for the above tests after the change should be >= to 
> what the coverage is now.*



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac commented on pull request #12674: KAFKA-14255: Fetching from follower should be disallowed if fetch from follower is disabled

2022-10-12 Thread GitBox


dajac commented on PR #12674:
URL: https://github.com/apache/kafka/pull/12674#issuecomment-1276256629

   Jason and I discussed offline. Returning `OFFSET_NOT_AVAILABLE` is not 
really appropriate here because clients may not expect it on the fetch path. 
The issue is that we have never implemented it on the fetch path so it is not 
clear what the impact of returning it on non-java clients would be. We 
discussed another alternative that I prototypes here: 
https://github.com/apache/kafka/pull/12734.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14133) Remaining EasyMock to Mockito tests

2022-10-12 Thread Shekhar Prasad Rajak (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17616454#comment-17616454
 ] 

Shekhar Prasad Rajak commented on KAFKA-14133:
--

I am working on PR [https://github.com/apache/kafka/pull/12735] to replace 
EasyMock and PowerMock with Mockito in connect/runtime/ErrorHandlingTaskTest

> Remaining EasyMock to Mockito tests
> ---
>
> Key: KAFKA-14133
> URL: https://issues.apache.org/jira/browse/KAFKA-14133
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
>
> {color:#de350b}There are tests which use both PowerMock and EasyMock. I have 
> put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here 
> rely solely on EasyMock.{color}
> Unless stated in brackets the tests are in the streams module.
> A list of tests which still require to be moved from EasyMock to Mockito as 
> of 2nd of August 2022 which do not have a Jira issue and do not have pull 
> requests I am aware of which are opened:
> {color:#ff8b00}In Review{color}
> {color:#00875a}Merged{color}
>  # {color:#00875a}WorkerConnectorTest{color} (connect) (owner: [~yash.mayya] )
>  # {color:#00875a}WorkerCoordinatorTest{color} (connect) (owner: 
> [~yash.mayya] )
>  # {color:#00875a}RootResourceTest{color} (connect) (owner: [~yash.mayya] )
>  # {color:#00875a}ByteArrayProducerRecordEquals{color} (connect) (owner: 
> [~yash.mayya] )
>  # {color:#00875a}KStreamFlatTransformTest{color} (owner: Christo)
>  # {color:#00875a}KStreamFlatTransformValuesTest{color} (owner: Christo)
>  # {color:#00875a}KStreamPrintTest{color} (owner: Christo)
>  # {color:#00875a}KStreamRepartitionTest{color} (owner: Christo)
>  # {color:#00875a}MaterializedInternalTest{color} (owner: Christo)
>  # {color:#00875a}TransformerSupplierAdapterTest{color} (owner: Christo)
>  # {color:#00875a}KTableSuppressProcessorMetricsTest{color} (owner: Christo)
>  # {color:#00875a}ClientUtilsTest{color} (owner: Christo)
>  # {color:#00875a}HighAvailabilityStreamsPartitionAssignorTest{color} (owner: 
> Christo)
>  # {color:#ff8b00}StreamsRebalanceListenerTest{color} (owner: Christo)
>  # {color:#ff8b00}TimestampedKeyValueStoreMaterializerTest{color} (owner: 
> Christo)
>  # {color:#ff8b00}CachingInMemoryKeyValueStoreTest{color} (owner: Christo)
>  # {color:#ff8b00}CachingInMemorySessionStoreTest{color} (owner: Christo)
>  # {color:#ff8b00}CachingPersistentSessionStoreTest{color} (owner: Christo)
>  # {color:#ff8b00}CachingPersistentWindowStoreTest{color} (owner: Christo)
>  # {color:#ff8b00}ChangeLoggingKeyValueBytesStoreTest{color} (owner: Christo)
>  # {color:#ff8b00}ChangeLoggingTimestampedKeyValueBytesStoreTest{color} 
> (owner: Christo)
>  # {color:#ff8b00}CompositeReadOnlyWindowStoreTest{color} (owner: Christo)
>  # {color:#ff8b00}KeyValueStoreBuilderTest{color} (owner: Christo)
>  # {color:#ff8b00}RocksDBStoreTest{color} (owner: Christo)
>  # {color:#ff8b00}StreamThreadStateStoreProviderTest{color} (owner: Christo)
>  # {color:#ff8b00}TopologyTest{color} (owner: Christo)
>  # {color:#ff8b00}KTableSuppressProcessorTest{color} (owner: Christo)
>  # {color:#ff8b00}ChangeLoggingSessionBytesStoreTest{color} (owner: Christo)
>  # {color:#ff8b00}ChangeLoggingTimestampedWindowBytesStoreTest{color} (owner: 
> Christo)
>  # {color:#ff8b00}ChangeLoggingWindowBytesStoreTest{color} (owner: Christo)
>  # {color:#ff8b00}MeteredTimestampedWindowStoreTest{color} (owner: Christo)
>  # {color:#ff8b00}TaskManagerTest{color} (owner: Christo)
>  # InternalTopicManagerTest (owner: Christo)
>  # ProcessorContextImplTest (owner: Christo)
>  # WriteConsistencyVectorTest (owner: Christo)
>  # StreamsAssignmentScaleTest (owner: Christo)
>  # StreamsPartitionAssignorTest (owner: Christo)
>  # AssignmentTestUtils (owner: Christo)
>  # ProcessorStateManagerTest ({*}WIP{*} owner: Matthew)
>  # StandbyTaskTest ({*}WIP{*} owner: Matthew)
>  # StoreChangelogReaderTest ({*}WIP{*} owner: Matthew)
>  # StreamTaskTest ({*}WIP{*} owner: Matthew)
>  # StreamThreadTest ({*}WIP{*} owner: Matthew)
>  # StreamsMetricsImplTest
>  # TimeOrderedCachingPersistentWindowStoreTest
>  # TimeOrderedWindowStoreTest
> *The coverage report for the above tests after the change should be >= to 
> what the coverage is now.*



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14132) Remaining PowerMock to Mockito tests

2022-10-12 Thread Shekhar Prasad Rajak (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17616450#comment-17616450
 ] 

Shekhar Prasad Rajak commented on KAFKA-14132:
--

https://github.com/apache/kafka/pull/12735

> Remaining PowerMock to Mockito tests
> 
>
> Key: KAFKA-14132
> URL: https://issues.apache.org/jira/browse/KAFKA-14132
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
>
> {color:#de350b}Some of the tests below use EasyMock as well. For those 
> migrate both PowerMock and EasyMock to Mockito.{color}
> Unless stated in brackets the tests are in the connect module.
> A list of tests which still require to be moved from PowerMock to Mockito as 
> of 2nd of August 2022 which do not have a Jira issue and do not have pull 
> requests I am aware of which are opened:
> {color:#ff8b00}InReview{color}
> {color:#00875a}Merged{color}
>  # ErrorHandlingTaskTest (owner: Divij)
>  # SourceTaskOffsetCommiterTest (owner: Divij)
>  # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij)
>  # WorkerSinkTaskTest (owner: Divij)
>  # WorkerSinkTaskThreadedTest (owner: Divij)
>  # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya])
>  # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya])
>  # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya])
>  # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya])
>  # ConnectorsResourceTest (owner: [~mdedetrich-aiven] ) 
> ([https://github.com/apache/kafka/pull/12725])
>  # StandaloneHerderTest (owner: [~mdedetrich-aiven] )
>  # KafkaConfigBackingStoreTest (owner: [~mdedetrich-aiven] )
>  # KafkaOffsetBackingStoreTest (owner: Christo) 
> ([https://github.com/apache/kafka/pull/12418])
>  # KafkaBasedLogTest (owner: [~mdedetrich-aiven] )
>  # RetryUtilTest (owner: [~mdedetrich-aiven] )
>  # RepartitionTopicTest (streams) (owner: Christo)
>  # StateManagerUtilTest (streams) (owner: Christo)
> *The coverage report for the above tests after the change should be >= to 
> what the coverage is now.*



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] shekhar-rajak commented on a diff in pull request #12735: Replace EasyMock and PowerMock with Mockito in connect/runtime/ErrorHandlingTaskTest | KAFKA-14059

2022-10-12 Thread GitBox


shekhar-rajak commented on code in PR #12735:
URL: https://github.com/apache/kafka/pull/12735#discussion_r993520740


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##
@@ -494,72 +502,29 @@ private void assertErrorHandlingMetricValue(String name, 
double expected) {
 assertEquals(expected, measured, 0.001d);
 }
 
-private void expectInitializeTask() {
-consumer.subscribe(EasyMock.eq(singletonList(TOPIC)), 
EasyMock.capture(rebalanceListener));
-PowerMock.expectLastCall();
-
-sinkTask.initialize(EasyMock.capture(sinkTaskContext));
-PowerMock.expectLastCall();
-sinkTask.start(TASK_PROPS);
-PowerMock.expectLastCall();
-}
-
-private void expectTaskGetTopic(boolean anyTimes) {
-final Capture connectorCapture = EasyMock.newCapture();
-final Capture topicCapture = EasyMock.newCapture();
-IExpectationSetters expect = 
EasyMock.expect(statusBackingStore.getTopic(
-EasyMock.capture(connectorCapture),
-EasyMock.capture(topicCapture)));
-if (anyTimes) {
-expect.andStubAnswer(() -> new TopicStatus(
-topicCapture.getValue(),
-new ConnectorTaskId(connectorCapture.getValue(), 0),
-Time.SYSTEM.milliseconds()));
-} else {
-expect.andAnswer(() -> new TopicStatus(
-topicCapture.getValue(),
-new ConnectorTaskId(connectorCapture.getValue(), 0),
-Time.SYSTEM.milliseconds()));
-}
-if (connectorCapture.hasCaptured() && topicCapture.hasCaptured()) {
-assertEquals("job", connectorCapture.getValue());
-assertEquals(TOPIC, topicCapture.getValue());
-}
-}
-
-private void expectClose() {
-producer.close(EasyMock.anyObject(Duration.class));
-EasyMock.expectLastCall();
-
-admin.close(EasyMock.anyObject(Duration.class));
-EasyMock.expectLastCall();
+private void expectClose() throws IOException {
 
-offsetReader.close();
-EasyMock.expectLastCall();
-
-offsetStore.stop();
-EasyMock.expectLastCall();
-
-try {
-headerConverter.close();
-} catch (IOException e) {
-throw new RuntimeException(e);
-}
-EasyMock.expectLastCall();
+verify(producer).close(any(Duration.class));
+verify(admin).close(any(Duration.class));
+verify(offsetReader).close();
+verify(offsetStore).stop();
+// headerConverter.close() can throw IOException
+verify(headerConverter).close();
 }
 
 private void expectTopicCreation(String topic) {
 if (workerConfig.topicCreationEnable()) {
-
EasyMock.expect(admin.describeTopics(topic)).andReturn(Collections.emptyMap());
-Capture newTopicCapture = EasyMock.newCapture();
+
when(admin.describeTopics(topic)).thenReturn(Collections.emptyMap());
+ArgumentCaptor newTopicCapture =
+ArgumentCaptor.forClass(NewTopic.class);
 
 if (enableTopicCreation) {
 Set created = Collections.singleton(topic);
 Set existing = Collections.emptySet();
 TopicAdmin.TopicCreationResponse response = new 
TopicAdmin.TopicCreationResponse(created, existing);
-
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(response);
+
when(admin.createOrFindTopics(newTopicCapture.capture())).thenReturn(response);

Review Comment:
   Note: these capture not checked. Not sure why we have these.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] shekhar-rajak commented on a diff in pull request #12735: Replace EasyMock and PowerMock with Mockito in connect/runtime/ErrorHandlingTaskTest | KAFKA-14059

2022-10-12 Thread GitBox


shekhar-rajak commented on code in PR #12735:
URL: https://github.com/apache/kafka/pull/12735#discussion_r993519986


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##
@@ -494,72 +502,29 @@ private void assertErrorHandlingMetricValue(String name, 
double expected) {
 assertEquals(expected, measured, 0.001d);
 }
 
-private void expectInitializeTask() {
-consumer.subscribe(EasyMock.eq(singletonList(TOPIC)), 
EasyMock.capture(rebalanceListener));
-PowerMock.expectLastCall();
-
-sinkTask.initialize(EasyMock.capture(sinkTaskContext));
-PowerMock.expectLastCall();
-sinkTask.start(TASK_PROPS);
-PowerMock.expectLastCall();
-}
-
-private void expectTaskGetTopic(boolean anyTimes) {

Review Comment:
   These has been handled in methods, where this was called .



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] shekhar-rajak commented on a diff in pull request #12735: Replace EasyMock and PowerMock with Mockito in connect/runtime/ErrorHandlingTaskTest | KAFKA-14059

2022-10-12 Thread GitBox


shekhar-rajak commented on code in PR #12735:
URL: https://github.com/apache/kafka/pull/12735#discussion_r993519420


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##
@@ -494,72 +502,29 @@ private void assertErrorHandlingMetricValue(String name, 
double expected) {
 assertEquals(expected, measured, 0.001d);
 }
 
-private void expectInitializeTask() {

Review Comment:
   These powermock#expect not needed for mockito.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] shekhar-rajak commented on a diff in pull request #12735: Replace EasyMock and PowerMock with Mockito in connect/runtime/ErrorHandlingTaskTest | KAFKA-14059

2022-10-12 Thread GitBox


shekhar-rajak commented on code in PR #12735:
URL: https://github.com/apache/kafka/pull/12735#discussion_r993518887


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##
@@ -349,13 +340,15 @@ public void testErrorHandlingInSinkTasks() throws 
Exception {
 // one record completely failed (converter issues), and thus was 
skipped
 assertErrorHandlingMetricValue("total-records-skipped", 1.0);
 
-PowerMock.verifyAll();
 }
 
 private RetryWithToleranceOperator operator() {
-return new RetryWithToleranceOperator(OPERATOR_RETRY_TIMEOUT_MILLIS, 
OPERATOR_RETRY_MAX_DELAY_MILLIS, OPERATOR_TOLERANCE_TYPE, SYSTEM, 
errorHandlingMetrics);
+return new RetryWithToleranceOperator(OPERATOR_RETRY_TIMEOUT_MILLIS
+, OPERATOR_RETRY_MAX_DELAY_MILLIS, OPERATOR_TOLERANCE_TYPE,
+SYSTEM, errorHandlingMetrics);
 }
 
+//@Ignore

Review Comment:
   When I try to run all the test case then this test case is taking forever to 
run. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] shekhar-rajak commented on a diff in pull request #12735: Replace EasyMock and PowerMock with Mockito in connect/runtime/ErrorHandlingTaskTest | KAFKA-14059

2022-10-12 Thread GitBox


shekhar-rajak commented on code in PR #12735:
URL: https://github.com/apache/kafka/pull/12735#discussion_r993517764


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##
@@ -177,16 +180,14 @@ public class ErrorHandlingTaskTest {
 
 private ErrorHandlingMetrics errorHandlingMetrics;
 
-private boolean enableTopicCreation;
+private boolean enableTopicCreation = false;
 
+private MockitoSession mockitoSession;
 @ParameterizedTest.Parameters
 public static Collection parameters() {
 return Arrays.asList(false, true);
 }
 
-public ErrorHandlingTaskTest(boolean enableTopicCreation) {

Review Comment:
   I had to remove this, since test class should have zero argument constructor 
(I got this error message while trying to run the test case class). 
   
   I have set the enableTopicCreation value as false for now. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] shekhar-rajak opened a new pull request, #12735: Replace EasyMock and PowerMock with Mockito in connect/runtime/ErrorHandlingTaskTest | KAFKA-14059

2022-10-12 Thread GitBox


shekhar-rajak opened a new pull request, #12735:
URL: https://github.com/apache/kafka/pull/12735

   Related to KAFKA-14059 and KAFKA-14132
   
   Link: 
   
   * https://issues.apache.org/jira/browse/KAFKA-14133
   * https://issues.apache.org/jira/browse/KAFKA-14132
   
   
   
   ### Committer Checklist (excluded from commit message)
   - [x] Verify design and implementation 
   - [x] Verify test coverage and CI build status
   - [x] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante merged pull request #10910: KAFKA-12965 - Graceful clean up of task error metrics

2022-10-12 Thread GitBox


C0urante merged PR #10910:
URL: https://github.com/apache/kafka/pull/10910


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #12355: KAFKA-14017: Implement new KIP-618 APIs in FileStreamSourceConnector

2022-10-12 Thread GitBox


C0urante commented on PR #12355:
URL: https://github.com/apache/kafka/pull/12355#issuecomment-1276195174

   @showuon Would you mind giving this a pass? Should be a quick one!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #12355: KAFKA-14017: Implement new KIP-618 APIs in FileStreamSourceConnector

2022-10-12 Thread GitBox


C0urante commented on code in PR #12355:
URL: https://github.com/apache/kafka/pull/12355#discussion_r993477767


##
connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java:
##
@@ -95,4 +96,18 @@ public void stop() {
 public ConfigDef config() {
 return CONFIG_DEF;
 }
+
+@Override
+public ExactlyOnceSupport exactlyOnceSupport(Map props) {

Review Comment:
   👍 Thanks Yash.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #12734: KAFKA-14255; Return an empty record instead of an OffsetOutOfRangeException when fetching from a follower without a leader epoch

2022-10-12 Thread GitBox


dajac commented on PR #12734:
URL: https://github.com/apache/kafka/pull/12734#issuecomment-1276174598

   This is an potential alternative to 
https://github.com/apache/kafka/pull/12674.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac opened a new pull request, #12734: KAFKA-14255; Return an empty record instead of an OffsetOutOfRangeException when fetching from a follower without a leader epoch

2022-10-12 Thread GitBox


dajac opened a new pull request, #12734:
URL: https://github.com/apache/kafka/pull/12734

   Fetching from a follower is only allowed from version 11 of the fetch 
request. Our intent was to allow it assuming that those would also implement 
KIP-320 (leader epoch). It turns out that some clients use version 11 without 
KIP-320 and the broker allows this. The issue is that we don't know whether the 
client fetches from the follower based on the order of the leader or by mistake 
e.g. based on stale metadata. The latter means that a client could end up on 
the follower with an offset that the follower does not have yet. Instead of 
returning OffsetOutOfRangeException, we return an empty batch to the client 
with the expectation that the client will retry and eventually refresh its 
metadata. Note that we only do this if the client does not provide a leader 
epoch and use version 11. If the client uses version 11 and provided a leader 
epoch, it knows that it has to consult the leader on an 
OffsetOutOfRangeException error.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14293) Basic Auth filter should set the SecurityContext after a successful login

2022-10-12 Thread Jira
Patrik Márton created KAFKA-14293:
-

 Summary: Basic Auth filter should set the SecurityContext after a 
successful login
 Key: KAFKA-14293
 URL: https://issues.apache.org/jira/browse/KAFKA-14293
 Project: Kafka
  Issue Type: Improvement
Reporter: Patrik Márton


Currently, the JaasBasicAuthFilter does not set the security context of the 
request after a successful login. However, this information of an authenticated 
user might be required for further processing, for example to perform 
authorization checks after the authentication.

> The filter should be extended to add the Security Context after a successful 
> login.

Another improvement would be to assign the right Priority to the filter. The 
current implementation uses the default priority, which is Priorities.USER = 
5000. This is a lower priority than for example AUTHORIZATION, which means that 
the basic auth filter would run after authorization filters.

> Assing the correct Priorities.AUTHENTICATION = 1000 priority to the filter 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-7739) Kafka Tiered Storage

2022-10-12 Thread Evan Williams (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17616235#comment-17616235
 ] 

Evan Williams commented on KAFKA-7739:
--

[~satish.duggana] How is this KIP moving along? 3.3.1 is released. Do you see 
any progress on it in the near future ?

> Kafka Tiered Storage
> 
>
> Key: KAFKA-7739
> URL: https://issues.apache.org/jira/browse/KAFKA-7739
> Project: Kafka
>  Issue Type: New Feature
>  Components: core
>Reporter: Harsha
>Assignee: Satish Duggana
>Priority: Major
>  Labels: needs-kip
>
> KIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)