Shiyang-Zhao opened a new pull request, #18691:
URL: https://github.com/apache/druid/pull/18691

   This PR fixes nondeterministic behavior in the following flaky tests:  
   - `org.apache.druid.data.input.kafkainput.KafkaInputFormatTest.testWithSchemaDiscovery`
   - `org.apache.druid.data.input.kafkainput.KafkaInputFormatTest.testWithPartialDeclarationSchemaDiscovery`
   - `org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorTest.testKillBadPartitionAssignment`
   - `org.apache.druid.indexing.overlord.TaskQueueTest.testGetActiveTaskRedactsPassword`
   
   ### **Description**
   
   The `KafkaInputFormatTest.testWithSchemaDiscovery` and `KafkaInputFormatTest.testWithPartialDeclarationSchemaDiscovery` tests failed intermittently due to nondeterministic ordering of discovered schema dimensions.
   
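   These tests check that Kafka input parsing discovers all schema fields. However, the field names were collected from unordered structures, so the discovered dimensions came back in a different order from run to run. Because the assertion compares them as an ordered list against a fixed expected list, it failed intermittently even though the set of dimensions was always the same.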
   
   **Failure messages:**  
   ```
   [ERROR] org.apache.druid.data.input.kafkainput.KafkaInputFormatTest.testWithSchemaDiscovery -- Time elapsed: 0.609 s <<< FAILURE!
   java.lang.AssertionError: expected:<[kafka.newtopic.topic, foo, kafka.newts.timestamp, kafka.newkey.key, root_baz, o, bar, kafka.newheader.kafkapkc, path_omg, jq_omg, jq_omg2, baz, root_baz2, kafka.newheader.encoding, path_omg2]> but was:<[jq_omg2, kafka.newheader.kafkapkc, foo, kafka.newts.timestamp, kafka.newkey.key, kafka.newheader.encoding, path_omg2, path_omg, root_baz, root_baz2, kafka.newtopic.topic, bar, o, jq_omg, baz]>
        at org.junit.Assert.fail(Assert.java:89)
        at org.junit.Assert.failNotEquals(Assert.java:835)
        at org.junit.Assert.assertEquals(Assert.java:120)
        at org.junit.Assert.assertEquals(Assert.java:146)
        at org.apache.druid.data.input.kafkainput.KafkaInputFormatTest.testWithSchemaDiscovery(KafkaInputFormatTest.java:645)
        at java.base/java.lang.reflect.Method.invoke(Method.java:569)
   
   [ERROR] Failures: 
   [ERROR] org.apache.druid.data.input.kafkainput.KafkaInputFormatTest.testWithSchemaDiscovery
   [ERROR]   Run 1: KafkaInputFormatTest.testWithSchemaDiscovery:645 expected:<[kafka.newtopic.topic, foo, kafka.newts.timestamp, kafka.newkey.key, root_baz, o, bar, kafka.newheader.kafkapkc, path_omg, jq_omg, jq_omg2, baz, root_baz2, kafka.newheader.encoding, path_omg2]> but was:<[jq_omg2, kafka.newheader.kafkapkc, foo, kafka.newts.timestamp, kafka.newkey.key, kafka.newheader.encoding, path_omg2, path_omg, root_baz, root_baz2, kafka.newtopic.topic, bar, o, jq_omg, baz]>
   [ERROR]   Run 2: KafkaInputFormatTest.testWithSchemaDiscovery:645 expected:<[kafka.newtopic.topic, foo, kafka.newts.timestamp, kafka.newkey.key, root_baz, o, bar, kafka.newheader.kafkapkc, path_omg, jq_omg, jq_omg2, baz, root_baz2, kafka.newheader.encoding, path_omg2]> but was:<[baz, kafka.newts.timestamp, jq_omg, bar, o, root_baz2, path_omg2, kafka.newtopic.topic, kafka.newheader.kafkapkc, root_baz, path_omg, foo, jq_omg2, kafka.newkey.key, kafka.newheader.encoding]>
   [ERROR]   Run 3: KafkaInputFormatTest.testWithSchemaDiscovery:645 expected:<[kafka.newtopic.topic, foo, kafka.newts.timestamp, kafka.newkey.key, root_baz, o, bar, kafka.newheader.kafkapkc, path_omg, jq_omg, jq_omg2, baz, root_baz2, kafka.newheader.encoding, path_omg2]> but was:<[o, kafka.newheader.encoding, path_omg, bar, jq_omg, root_baz, root_baz2, kafka.newheader.kafkapkc, foo, jq_omg2, kafka.newkey.key, path_omg2, baz, kafka.newts.timestamp, kafka.newtopic.topic]>
   [ERROR]   Run 4: KafkaInputFormatTest.testWithSchemaDiscovery:645 expected:<[kafka.newtopic.topic, foo, kafka.newts.timestamp, kafka.newkey.key, root_baz, o, bar, kafka.newheader.kafkapkc, path_omg, jq_omg, jq_omg2, baz, root_baz2, kafka.newheader.encoding, path_omg2]> but was:<[foo, jq_omg, root_baz, jq_omg2, kafka.newkey.key, kafka.newts.timestamp, bar, baz, kafka.newtopic.topic, path_omg, kafka.newheader.encoding, o, kafka.newheader.kafkapkc, path_omg2, root_baz2]>
   ```
   ```
   [ERROR] org.apache.druid.data.input.kafkainput.KafkaInputFormatTest.testWithPartialDeclarationSchemaDiscovery -- Time elapsed: 0.622 s <<< FAILURE!
   java.lang.AssertionError: expected:<[bar, kafka.newheader.kafkapkc, kafka.newtopic.topic, foo, kafka.newts.timestamp, kafka.newkey.key, root_baz, o, path_omg, jq_omg, jq_omg2, baz, root_baz2, kafka.newheader.encoding, path_omg2]> but was:<[bar, kafka.newheader.kafkapkc, o, kafka.newkey.key, path_omg, kafka.newheader.encoding, kafka.newtopic.topic, baz, path_omg2, jq_omg2, kafka.newts.timestamp, root_baz2, root_baz, jq_omg, foo]>
        at org.junit.Assert.fail(Assert.java:89)
        at org.junit.Assert.failNotEquals(Assert.java:835)
        at org.junit.Assert.assertEquals(Assert.java:120)
        at org.junit.Assert.assertEquals(Assert.java:146)
        at org.apache.druid.data.input.kafkainput.KafkaInputFormatTest.testWithPartialDeclarationSchemaDiscovery(KafkaInputFormatTest.java:908)
        at java.base/java.lang.reflect.Method.invoke(Method.java:569)
   
   [ERROR] Failures: 
   [ERROR] org.apache.druid.data.input.kafkainput.KafkaInputFormatTest.testWithPartialDeclarationSchemaDiscovery
   [ERROR]   Run 1: KafkaInputFormatTest.testWithPartialDeclarationSchemaDiscovery:908 expected:<[bar, kafka.newheader.kafkapkc, kafka.newtopic.topic, foo, kafka.newts.timestamp, kafka.newkey.key, root_baz, o, path_omg, jq_omg, jq_omg2, baz, root_baz2, kafka.newheader.encoding, path_omg2]> but was:<[bar, kafka.newheader.kafkapkc, o, kafka.newkey.key, path_omg, kafka.newheader.encoding, kafka.newtopic.topic, baz, path_omg2, jq_omg2, kafka.newts.timestamp, root_baz2, root_baz, jq_omg, foo]>
   [ERROR]   Run 2: KafkaInputFormatTest.testWithPartialDeclarationSchemaDiscovery:908 expected:<[bar, kafka.newheader.kafkapkc, kafka.newtopic.topic, foo, kafka.newts.timestamp, kafka.newkey.key, root_baz, o, path_omg, jq_omg, jq_omg2, baz, root_baz2, kafka.newheader.encoding, path_omg2]> but was:<[bar, kafka.newheader.kafkapkc, o, foo, root_baz, kafka.newts.timestamp, baz, jq_omg2, kafka.newtopic.topic, path_omg2, path_omg, jq_omg, kafka.newheader.encoding, kafka.newkey.key, root_baz2]>
   [ERROR]   Run 3: KafkaInputFormatTest.testWithPartialDeclarationSchemaDiscovery:908 expected:<[bar, kafka.newheader.kafkapkc, kafka.newtopic.topic, foo, kafka.newts.timestamp, kafka.newkey.key, root_baz, o, path_omg, jq_omg, jq_omg2, baz, root_baz2, kafka.newheader.encoding, path_omg2]> but was:<[bar, kafka.newheader.kafkapkc, kafka.newkey.key, path_omg, kafka.newheader.encoding, root_baz2, jq_omg2, baz, root_baz, kafka.newtopic.topic, path_omg2, o, foo, kafka.newts.timestamp, jq_omg]>
   [ERROR]   Run 4: KafkaInputFormatTest.testWithPartialDeclarationSchemaDiscovery:908 expected:<[bar, kafka.newheader.kafkapkc, kafka.newtopic.topic, foo, kafka.newts.timestamp, kafka.newkey.key, root_baz, o, path_omg, jq_omg, jq_omg2, baz, root_baz2, kafka.newheader.encoding, path_omg2]> but was:<[bar, kafka.newheader.kafkapkc, baz, jq_omg2, kafka.newheader.encoding, path_omg2, jq_omg, root_baz, kafka.newkey.key, foo, kafka.newts.timestamp, path_omg, root_baz2, kafka.newtopic.topic, o]>
   ```
   
   **Proposed Changes:**  
   - Changed the schema-discovery assertions to compare the discovered dimensions by content rather than by order.
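   A minimal sketch of an order-insensitive dimension check, assuming the discovered dimensions are available as a `List<String>`. `OrderInsensitiveAssert`, `sameElements`, and `assertSameElements` are hypothetical names for illustration, not part of Druid or JUnit, and this is not necessarily how the PR implements the fix. Sorting copies of both lists, rather than converting them to sets, ignores order while still catching a missing or duplicated dimension:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical test helper (not part of Druid or JUnit).
class OrderInsensitiveAssert
{
  // True when both lists hold the same elements with the same counts, ignoring order.
  // Sorting copies (instead of building sets) keeps duplicate entries significant.
  static boolean sameElements(List<String> expected, List<String> actual)
  {
    List<String> sortedExpected = new ArrayList<>(expected);
    List<String> sortedActual = new ArrayList<>(actual);
    Collections.sort(sortedExpected);
    Collections.sort(sortedActual);
    return sortedExpected.equals(sortedActual);
  }

  // Throws a JUnit-style message when the contents differ.
  static void assertSameElements(List<String> expected, List<String> actual)
  {
    if (!sameElements(expected, actual)) {
      throw new AssertionError("expected (any order):<" + expected + "> but was:<" + actual + ">");
    }
  }

  public static void main(String[] args)
  {
    // A subset of the dimension names from the failure message, in two different orders.
    List<String> expected = Arrays.asList("kafka.newtopic.topic", "foo", "root_baz", "path_omg");
    List<String> discovered = Arrays.asList("path_omg", "root_baz", "kafka.newtopic.topic", "foo");

    System.out.println(expected.equals(discovered));        // false: List.equals is order-sensitive
    System.out.println(sameElements(expected, discovered)); // true: same contents
    assertSameElements(expected, discovered);               // does not throw
  }
}
```

   If Hamcrest is already on the test classpath, its `containsInAnyOrder` matcher expresses the same multiset comparison.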
   
   ---
   
   The `KinesisSupervisorTest.testKillBadPartitionAssignment` test failed intermittently due to nondeterministic ordering of task entries.
   
   The test verifies that the Kinesis supervisor correctly identifies and terminates invalid task assignments. Because the task metadata was stored in a `HashMap`, its iteration order varied between runs, so the order in which the supervisor issued `TaskQueue.shutdown` calls varied too: the mock, which expected `shutdown("id4", ...)`, intermittently received an unexpected `shutdown("id2", ...)` call instead.
   
   **Failure messages:**  
   ```
   [ERROR] org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorTest.testKillBadPartitionAssignment -- Time elapsed: 0.777 s <<< FAILURE!
   java.lang.AssertionError: 
   
     Unexpected method call TaskQueue.shutdown("id2", "Task[%s] failed to return status, killing task", "id2"):
       TaskQueue.shutdown("id4", "Task[%s] failed to return status, killing task", "id4"): expected: 1, actual: 0
        at org.easymock.internal.MockInvocationHandler.invoke(MockInvocationHandler.java:46)
        at org.easymock.internal.ObjectMethodsFilter.invoke(ObjectMethodsFilter.java:100)
        at org.easymock.internal.ClassProxyFactory$MockMethodInterceptor.interceptSuperCallable(ClassProxyFactory.java:100)
        at org.apache.druid.indexing.overlord.TaskQueue$$$EasyMock$2.shutdown(Unknown Source)
        at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.killTask(SeekableStreamSupervisor.java:2021)
        at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.discoverTasks(SeekableStreamSupervisor.java:2282)
        at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.runInternal(SeekableStreamSupervisor.java:1728)
        at org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorTest.testKillBadPartitionAssignment(KinesisSupervisorTest.java:1107)
   
   [WARNING] Flakes: 
   [WARNING] org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorTest.testKillBadPartitionAssignment
   [ERROR]   Run 1: KinesisSupervisorTest.testKillBadPartitionAssignment:1107 
     Unexpected method call TaskQueue.shutdown("id2", "Task[%s] failed to return status, killing task", "id2"):
       TaskQueue.shutdown("id4", "Task[%s] failed to return status, killing task", "id4"): expected: 1, actual: 0
   [ERROR]   Run 2: KinesisSupervisorTest.testKillBadPartitionAssignment:1107 
     Unexpected method call TaskQueue.shutdown("id2", "Task[%s] failed to return status, killing task", "id2"):
       TaskQueue.shutdown("id4", "Task[%s] failed to return status, killing task", "id4"): expected: 1, actual: 0
   [INFO]   Run 3: PASS
   ```
   
   **Proposed Changes:**   
   - Made task comparison stable and deterministic across runs.
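   The PR states that only test code changes, so the actual fix presumably lives in the test setup. As a hedged illustration of the underlying principle only, the toy model below shows how iterating over a sorted copy of a task map, instead of a `HashMap` with unspecified iteration order, makes the choice of which task to kill identical on every run. The class `DeterministicKillOrder`, its `tasksToKill` method, and the "first task to claim a partition keeps it" rule are all hypothetical; this is not Druid's `SeekableStreamSupervisor` logic:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model (hypothetical, not Druid code): each task claims one partition.
// The first task seen for a partition keeps it; later claimants are marked for killing.
class DeterministicKillOrder
{
  static List<String> tasksToKill(Map<String, String> taskToPartition)
  {
    // Copy into a TreeMap so entries are visited in task-ID order. Iterating the
    // input HashMap directly would make "first seen" depend on unspecified order.
    Map<String, String> ordered = new TreeMap<>(taskToPartition);
    Map<String, String> partitionOwner = new HashMap<>();
    List<String> toKill = new ArrayList<>();
    for (Map.Entry<String, String> entry : ordered.entrySet()) {
      // putIfAbsent returns the existing owner if the partition is already claimed.
      if (partitionOwner.putIfAbsent(entry.getValue(), entry.getKey()) != null) {
        toKill.add(entry.getKey());
      }
    }
    return toKill;
  }

  public static void main(String[] args)
  {
    Map<String, String> tasks = new HashMap<>();
    tasks.put("id1", "shard-0");
    tasks.put("id2", "shard-1");
    tasks.put("id4", "shard-1"); // conflicts with id2 over shard-1
    System.out.println(tasksToKill(tasks)); // [id4] on every run
  }
}
```

   In this toy model, sorting by task ID means the later of two conflicting IDs in sort order (`id4`) is always the one selected. Populating a `LinkedHashMap` in a fixed order would give the same determinism by preserving insertion order instead of sorting.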
   
   ---
   
   The `TaskQueueTest.testGetActiveTaskRedactsPassword` test failed intermittently due to inconsistent JSON key ordering in serialized task payloads.
   
   The test ensures that sensitive information (such as passwords) is redacted when active task metadata is serialized. However, JSON object members have no defined order, and the `context` keys were emitted in different orders across runs, so a direct string comparison failed even when both payloads contained the same data.
   
   **Failure messages:**  
   ```
   [ERROR] org.apache.druid.indexing.overlord.TaskQueueTest.testGetActiveTaskRedactsPassword -- Time elapsed: 4.451 s <<< FAILURE!
   org.junit.ComparisonFailure: expected:<...s":[]}},"context":{"[forceTimeChunkLock":true,"useLineageBasedSegmentAllocation]":true},"dataSource"...> but was:<...s":[]}},"context":{"[useLineageBasedSegmentAllocation":true,"forceTimeChunkLock]":true},"dataSource"...>
        at org.junit.Assert.assertEquals(Assert.java:117)
        at org.junit.Assert.assertEquals(Assert.java:146)
        at org.apache.druid.indexing.overlord.TaskQueueTest.testGetActiveTaskRedactsPassword(TaskQueueTest.java:625)
   
   [WARNING] Flakes: 
   [WARNING] org.apache.druid.indexing.overlord.TaskQueueTest.testGetActiveTaskRedactsPassword
   [ERROR]   Run 1: TaskQueueTest.testGetActiveTaskRedactsPassword:625 expected:<...s":[]}},"context":{"[forceTimeChunkLock":true,"useLineageBasedSegmentAllocation]":true},"dataSource"...> but was:<...s":[]}},"context":{"[useLineageBasedSegmentAllocation":true,"forceTimeChunkLock]":true},"dataSource"...>
   [ERROR]   Run 2: TaskQueueTest.testGetActiveTaskRedactsPassword:625 expected:<...s":[]}},"context":{"[forceTimeChunkLock":true,"useLineageBasedSegmentAllocation]":true},"dataSource"...> but was:<...s":[]}},"context":{"[useLineageBasedSegmentAllocation":true,"forceTimeChunkLock]":true},"dataSource"...>
   [ERROR]   Run 3: TaskQueueTest.testGetActiveTaskRedactsPassword:625 expected:<...s":[]}},"context":{"[forceTimeChunkLock":true,"useLineageBasedSegmentAllocation]":true},"dataSource"...> but was:<...s":[]}},"context":{"[useLineageBasedSegmentAllocation":true,"forceTimeChunkLock]":true},"dataSource"...>
   [INFO]   Run 4: PASS
   ```
   
   **Proposed Changes:**  
   - Modified assertions to perform JSON object-level comparison rather than raw string equality.
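   The toy example below illustrates why object-level comparison is stable when string comparison is not. It uses plain `LinkedHashMap`s as stand-ins for parsed JSON objects and a hypothetical `toJson` helper (not a Druid or Jackson API) that writes keys in iteration order; the two `context` keys are taken from the failure message above:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Toy illustration (hypothetical, not Druid code): same entries, different insertion order.
class JsonOrderDemo
{
  // Minimal serializer for a flat map: emits keys in the map's iteration order,
  // the way a JSON writer follows the order of the map it is given.
  static String toJson(Map<String, Object> map)
  {
    StringJoiner joiner = new StringJoiner(",", "{", "}");
    for (Map.Entry<String, Object> e : map.entrySet()) {
      joiner.add("\"" + e.getKey() + "\":" + e.getValue());
    }
    return joiner.toString();
  }

  public static void main(String[] args)
  {
    Map<String, Object> expected = new LinkedHashMap<>();
    expected.put("forceTimeChunkLock", true);
    expected.put("useLineageBasedSegmentAllocation", true);

    Map<String, Object> actual = new LinkedHashMap<>();
    actual.put("useLineageBasedSegmentAllocation", true);
    actual.put("forceTimeChunkLock", true);

    // String comparison is order-sensitive, so it fails.
    System.out.println(toJson(expected).equals(toJson(actual))); // false
    // Map.equals compares key/value pairs regardless of order, so it passes.
    System.out.println(expected.equals(actual));                 // true
  }
}
```

   For real payloads, assuming Jackson (which Druid uses for JSON) is on the test classpath, parsing both strings with `ObjectMapper.readTree` and comparing the resulting `JsonNode`s with `equals` gives the same order-insensitive comparison for object fields, while array element order still counts.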
   
   ---
   
   This PR has:
   
   - [x] been self-reviewed.  
   - [x] ensured no production logic changes beyond test stabilization. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.