This is an automated email from the ASF dual-hosted git repository.

xkrogen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 103b0546548 HADOOP-19270 Use stable sort in commandQueue (#7038)
103b0546548 is described below

commit 103b0546548dc831621221f94d56a60da72531cc
Author: geatrigger <53654547+geatrig...@users.noreply.github.com>
AuthorDate: Wed Mar 5 07:45:20 2025 +0900

    HADOOP-19270 Use stable sort in commandQueue (#7038)
---
 .../audit/AuditCommandParser.java                  |  5 +--
 .../audit/AuditLogDirectParser.java                |  5 +--
 .../audit/AuditLogHiveTableParser.java             |  4 +--
 .../audit/AuditReplayCommand.java                  | 30 +++++++++++------
 .../workloadgenerator/audit/AuditReplayMapper.java |  4 ++-
 .../audit/TestAuditLogDirectParser.java            | 39 +++++++++++-----------
 6 files changed, 51 insertions(+), 36 deletions(-)

diff --git a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditCommandParser.java b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditCommandParser.java
index c076a2061ef..ee8af3e0fdc 100644
--- a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditCommandParser.java
+++ b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditCommandParser.java
@@ -32,7 +32,7 @@ public interface AuditCommandParser {
 
   /**
    * Initialize this parser with the given configuration. Guaranteed to be
-   * called prior to any calls to {@link #parse(Text, Function)}.
+   * called prior to any calls to {@link #parse(Long, Text, Function)}.
    *
    * @param conf The Configuration to be used to set up this parser.
    * @throws IOException if error on initializing a parser.
@@ -46,6 +46,7 @@ public interface AuditCommandParser {
    * between the start of the audit log and this command) into absolute
    * timestamps.
    *
+   * @param sequence Sequence order of input line.
    * @param inputLine Single input line to convert.
    * @param relativeToAbsolute Function converting relative timestamps
    *                           (in milliseconds) to absolute timestamps
@@ -53,7 +54,7 @@ public interface AuditCommandParser {
    * @return A command representing the input line.
    * @throws IOException if error on parsing.
    */
-  AuditReplayCommand parse(Text inputLine,
+  AuditReplayCommand parse(Long sequence, Text inputLine,
       Function<Long, Long> relativeToAbsolute) throws IOException;
 
 }
diff --git a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditLogDirectParser.java b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditLogDirectParser.java
index e649b69be73..3eb6d6766fe 100644
--- a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditLogDirectParser.java
+++ b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditLogDirectParser.java
@@ -112,7 +112,7 @@ public void initialize(Configuration conf) throws IOException {
   }
 
   @Override
-  public AuditReplayCommand parse(Text inputLine,
+  public AuditReplayCommand parse(Long sequence, Text inputLine,
       Function<Long, Long> relativeToAbsolute) throws IOException {
     Matcher m = logLineParseRegex.matcher(inputLine.toString());
     if (!m.find()) {
@@ -147,7 +147,8 @@ public AuditReplayCommand parse(Text inputLine,
       }
     }
 
-    return new AuditReplayCommand(relativeToAbsolute.apply(relativeTimestamp),
+    return new AuditReplayCommand(sequence,
+        relativeToAbsolute.apply(relativeTimestamp),
         // Split the UGI on space to remove the auth and proxy portions of it
         SPACE_SPLITTER.split(parameterMap.get("ugi")).iterator().next(),
         parameterMap.get("cmd").replace("(options:", "(options="),
diff --git a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditLogHiveTableParser.java b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditLogHiveTableParser.java
index abc10f4e33f..23224b6414c 100644
--- a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditLogHiveTableParser.java
+++ b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditLogHiveTableParser.java
@@ -60,12 +60,12 @@ public void initialize(Configuration conf) throws IOException {
   }
 
   @Override
-  public AuditReplayCommand parse(Text inputLine,
+  public AuditReplayCommand parse(Long sequence, Text inputLine,
       Function<Long, Long> relativeToAbsolute) throws IOException {
     String[] fields = inputLine.toString().split(FIELD_SEPARATOR);
     long absoluteTimestamp = relativeToAbsolute
         .apply(Long.parseLong(fields[0]));
-    return new AuditReplayCommand(absoluteTimestamp, fields[1], fields[2],
+    return new AuditReplayCommand(sequence, absoluteTimestamp, fields[1], 
fields[2],
         fields[3], fields[4], fields[5]);
   }
 
diff --git a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayCommand.java b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayCommand.java
index 038312d2e9f..9576245396a 100644
--- a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayCommand.java
+++ b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayCommand.java
@@ -43,6 +43,7 @@ class AuditReplayCommand implements Delayed {
   private static final Pattern SIMPLE_UGI_PATTERN = Pattern
       .compile("([^/@ ]*).*?");
 
+  private Long sequence;
   private long absoluteTimestamp;
   private String ugi;
   private String command;
@@ -50,8 +51,9 @@ class AuditReplayCommand implements Delayed {
   private String dest;
   private String sourceIP;
 
-  AuditReplayCommand(long absoluteTimestamp, String ugi, String command,
+  AuditReplayCommand(Long sequence, long absoluteTimestamp, String ugi, String 
command,
       String src, String dest, String sourceIP) {
+    this.sequence = sequence;
     this.absoluteTimestamp = absoluteTimestamp;
     this.ugi = ugi;
     this.command = command;
@@ -60,6 +62,9 @@ class AuditReplayCommand implements Delayed {
     this.sourceIP = sourceIP;
   }
 
+  Long getSequence() {
+    return sequence;
+  }
   long getAbsoluteTimestamp() {
     return absoluteTimestamp;
   }
@@ -103,8 +108,12 @@ public long getDelay(TimeUnit unit) {
 
   @Override
   public int compareTo(Delayed o) {
-    return Long.compare(absoluteTimestamp,
-        ((AuditReplayCommand) o).absoluteTimestamp);
+    int result = Long.compare(absoluteTimestamp,
+            ((AuditReplayCommand) o).absoluteTimestamp);
+    if (result != 0) {
+      return result;
+    }
+    return Long.compare(sequence, ((AuditReplayCommand) o).sequence);
   }
 
   /**
@@ -122,9 +131,10 @@ boolean isPoison() {
    * information besides a timestamp; other getter methods wil return null.
    */
   private static final class PoisonPillCommand extends AuditReplayCommand {
+    private static final Long DEFAULT_SEQUENCE = -1L;
 
     private PoisonPillCommand(long absoluteTimestamp) {
-      super(absoluteTimestamp, null, null, null, null, null);
+      super(DEFAULT_SEQUENCE, absoluteTimestamp, null, null, null, null, null);
     }
 
     @Override
@@ -144,9 +154,9 @@ public boolean equals(Object other) {
       return false;
     }
     AuditReplayCommand o = (AuditReplayCommand) other;
-    return absoluteTimestamp == o.absoluteTimestamp && ugi.equals(o.ugi)
-        && command.equals(o.command) && src.equals(o.src) && 
dest.equals(o.dest)
-        && sourceIP.equals(o.sourceIP);
+    return sequence.equals(o.sequence) && absoluteTimestamp == 
o.absoluteTimestamp
+        && ugi.equals(o.ugi) && command.equals(o.command) && src.equals(o.src)
+        && dest.equals(o.dest) && sourceIP.equals(o.sourceIP);
   }
 
   @Override
@@ -156,8 +166,8 @@ public int hashCode() {
 
   @Override
   public String toString() {
-    return String.format("AuditReplayCommand(absoluteTimestamp=%d, ugi=%s, "
-            + "command=%s, src=%s, dest=%s, sourceIP=%s",
-        absoluteTimestamp, ugi, command, src, dest, sourceIP);
+    return String.format("AuditReplayCommand(sequence=%d, 
absoluteTimestamp=%d, "
+        + "ugi=%s, command=%s, src=%s, dest=%s, sourceIP=%s",
+        sequence, absoluteTimestamp, ugi, command, src, dest, sourceIP);
   }
 }
diff --git a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayMapper.java b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayMapper.java
index fdd8f20d85b..4524e7a2bb0 100644
--- a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayMapper.java
+++ b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayMapper.java
@@ -169,6 +169,7 @@ public enum CommandType {
   private int numThreads;
   private double rateFactor;
   private long highestTimestamp;
+  private Long highestSequence;
   private List<AuditReplayThread> threads;
   private DelayQueue<AuditReplayCommand> commandQueue;
   private Function<Long, Long> relativeToAbsoluteTimestamp;
@@ -246,7 +247,7 @@ public void setup(final Mapper.Context context) throws IOException {
   @Override
   public void map(LongWritable lineNum, Text inputLine, Mapper.Context context)
       throws IOException, InterruptedException {
-    AuditReplayCommand cmd = commandParser.parse(inputLine,
+    AuditReplayCommand cmd = commandParser.parse(lineNum.get(), inputLine,
         relativeToAbsoluteTimestamp);
     long delay = cmd.getDelay(TimeUnit.MILLISECONDS);
     // Prevent from loading too many elements into memory all at once
@@ -255,6 +256,7 @@ public void map(LongWritable lineNum, Text inputLine, Mapper.Context context)
     }
     commandQueue.put(cmd);
     highestTimestamp = cmd.getAbsoluteTimestamp();
+    highestSequence = cmd.getSequence();
   }
 
   @Override
diff --git a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/test/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/TestAuditLogDirectParser.java b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/test/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/TestAuditLogDirectParser.java
index a98b5c47164..007445e10fb 100644
--- a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/test/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/TestAuditLogDirectParser.java
+++ b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/test/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/TestAuditLogDirectParser.java
@@ -30,6 +30,7 @@ public class TestAuditLogDirectParser {
 
   private static final long START_TIMESTAMP = 10000;
   private AuditLogDirectParser parser;
+  private Long sequence = 1L;
 
   @Before
   public void setup() throws Exception {
@@ -53,55 +54,55 @@ private Text getAuditString(String timestamp, String ugi, String cmd,
   public void testSimpleInput() throws Exception {
     Text in = getAuditString("1970-01-01 00:00:11,000", "fakeUser",
         "listStatus", "sourcePath", "null");
-    AuditReplayCommand expected = new AuditReplayCommand(1000, "fakeUser",
+    AuditReplayCommand expected = new AuditReplayCommand(sequence, 1000, 
"fakeUser",
         "listStatus", "sourcePath", "null", "0.0.0.0");
-    assertEquals(expected, parser.parse(in, Function.identity()));
+    assertEquals(expected, parser.parse(sequence, in, Function.identity()));
   }
 
   @Test
   public void testInputWithEquals() throws Exception {
     Text in = getAuditString("1970-01-01 00:00:11,000", "fakeUser",
             "listStatus", "day=1970", "null");
-    AuditReplayCommand expected = new AuditReplayCommand(1000, "fakeUser",
+    AuditReplayCommand expected = new AuditReplayCommand(sequence, 1000, 
"fakeUser",
             "listStatus", "day=1970", "null", "0.0.0.0");
-    assertEquals(expected, parser.parse(in, Function.identity()));
+    assertEquals(expected, parser.parse(sequence, in, Function.identity()));
   }
 
   @Test
   public void testInputWithRenameOptions() throws Exception {
     Text in = getAuditString("1970-01-01 00:00:11,000", "fakeUser",
         "rename (options=[TO_TRASH])", "sourcePath", "destPath");
-    AuditReplayCommand expected = new AuditReplayCommand(1000, "fakeUser",
+    AuditReplayCommand expected = new AuditReplayCommand(sequence, 1000, 
"fakeUser",
         "rename (options=[TO_TRASH])", "sourcePath", "destPath", "0.0.0.0");
-    assertEquals(expected, parser.parse(in, Function.identity()));
+    assertEquals(expected, parser.parse(sequence, in, Function.identity()));
   }
 
   @Test
   public void testInputWithTokenAuth() throws Exception {
     Text in = getAuditString("1970-01-01 00:00:11,000", "fakeUser 
(auth:TOKEN)",
         "create", "sourcePath", "null");
-    AuditReplayCommand expected = new AuditReplayCommand(1000, "fakeUser",
+    AuditReplayCommand expected = new AuditReplayCommand(sequence, 1000, 
"fakeUser",
         "create", "sourcePath", "null", "0.0.0.0");
-    assertEquals(expected, parser.parse(in, Function.identity()));
+    assertEquals(expected, parser.parse(sequence, in, Function.identity()));
   }
 
   @Test
   public void testInputWithProxyUser() throws Exception {
     Text in = getAuditString("1970-01-01 00:00:11,000",
         "proxyUser (auth:TOKEN) via fakeUser", "create", "sourcePath", "null");
-    AuditReplayCommand expected = new AuditReplayCommand(1000, "proxyUser",
-        "create", "sourcePath", "null", "0.0.0.0");
-    assertEquals(expected, parser.parse(in, Function.identity()));
+    AuditReplayCommand expected = new AuditReplayCommand(sequence, 1000,
+            "proxyUser", "create", "sourcePath", "null", "0.0.0.0");
+    assertEquals(expected, parser.parse(sequence, in, Function.identity()));
   }
 
   @Test
   public void testParseDefaultDateFormat() throws Exception {
     Text in = getAuditString("1970-01-01 13:00:00,000",
         "ignored", "ignored", "ignored", "ignored");
-    AuditReplayCommand expected = new AuditReplayCommand(
+    AuditReplayCommand expected = new AuditReplayCommand(sequence,
         13 * 60 * 60 * 1000 - START_TIMESTAMP,
         "ignored", "ignored", "ignored", "ignored", "0.0.0.0");
-    assertEquals(expected, parser.parse(in, Function.identity()));
+    assertEquals(expected, parser.parse(sequence, in, Function.identity()));
   }
 
   @Test
@@ -114,9 +115,9 @@ public void testParseCustomDateFormat() throws Exception {
     parser.initialize(conf);
     Text in = getAuditString("1970-01-01 01:00:00,000 PM",
         "ignored", "ignored", "ignored", "ignored");
-    AuditReplayCommand expected = new AuditReplayCommand(13 * 60 * 60 * 1000,
+    AuditReplayCommand expected = new AuditReplayCommand(sequence, 13 * 60 * 
60 * 1000,
         "ignored", "ignored", "ignored", "ignored", "0.0.0.0");
-    assertEquals(expected, parser.parse(in, Function.identity()));
+    assertEquals(expected, parser.parse(sequence, in, Function.identity()));
   }
 
   @Test
@@ -128,9 +129,9 @@ public void testParseCustomTimeZone() throws Exception {
     parser.initialize(conf);
     Text in = getAuditString("1970-01-01 01:00:00,000",
         "ignored", "ignored", "ignored", "ignored");
-    AuditReplayCommand expected = new AuditReplayCommand(0,
+    AuditReplayCommand expected = new AuditReplayCommand(sequence, 0,
         "ignored", "ignored", "ignored", "ignored", "0.0.0.0");
-    assertEquals(expected, parser.parse(in, Function.identity()));
+    assertEquals(expected, parser.parse(sequence, in, Function.identity()));
   }
 
   @Test
@@ -144,9 +145,9 @@ public void testParseCustomAuditLineFormat() throws Exception {
     conf.set(AuditLogDirectParser.AUDIT_LOG_PARSE_REGEX_KEY,
         "CUSTOM FORMAT \\((?<timestamp>.+?)\\) (?<message>.+)");
     parser.initialize(conf);
-    AuditReplayCommand expected = new AuditReplayCommand(0,
+    AuditReplayCommand expected = new AuditReplayCommand(sequence, 0,
         "fakeUser", "fakeCommand", "src", "null", "0.0.0.0");
-    assertEquals(expected, parser.parse(auditLine, Function.identity()));
+    assertEquals(expected, parser.parse(sequence, auditLine, 
Function.identity()));
   }
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to