This is an automated email from the ASF dual-hosted git repository. xkrogen pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 103b0546548 HADOOP-19270 Use stable sort in commandQueue (#7038) 103b0546548 is described below commit 103b0546548dc831621221f94d56a60da72531cc Author: geatrigger <53654547+geatrig...@users.noreply.github.com> AuthorDate: Wed Mar 5 07:45:20 2025 +0900 HADOOP-19270 Use stable sort in commandQueue (#7038) --- .../audit/AuditCommandParser.java | 5 +-- .../audit/AuditLogDirectParser.java | 5 +-- .../audit/AuditLogHiveTableParser.java | 4 +-- .../audit/AuditReplayCommand.java | 30 +++++++++++------ .../workloadgenerator/audit/AuditReplayMapper.java | 4 ++- .../audit/TestAuditLogDirectParser.java | 39 +++++++++++----------- 6 files changed, 51 insertions(+), 36 deletions(-) diff --git a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditCommandParser.java b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditCommandParser.java index c076a2061ef..ee8af3e0fdc 100644 --- a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditCommandParser.java +++ b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditCommandParser.java @@ -32,7 +32,7 @@ public interface AuditCommandParser { /** * Initialize this parser with the given configuration. Guaranteed to be - * called prior to any calls to {@link #parse(Text, Function)}. + * called prior to any calls to {@link #parse(Long, Text, Function)}. * * @param conf The Configuration to be used to set up this parser. * @throws IOException if error on initializing a parser. @@ -46,6 +46,7 @@ public interface AuditCommandParser { * between the start of the audit log and this command) into absolute * timestamps. * + * @param sequence Sequence order of input line. * @param inputLine Single input line to convert. * @param relativeToAbsolute Function converting relative timestamps * (in milliseconds) to absolute timestamps @@ -53,7 +54,7 @@ public interface AuditCommandParser { * @return A command representing the input line. * @throws IOException if error on parsing. */ - AuditReplayCommand parse(Text inputLine, + AuditReplayCommand parse(Long sequence, Text inputLine, Function<Long, Long> relativeToAbsolute) throws IOException; } diff --git a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditLogDirectParser.java b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditLogDirectParser.java index e649b69be73..3eb6d6766fe 100644 --- a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditLogDirectParser.java +++ b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditLogDirectParser.java @@ -112,7 +112,7 @@ public void initialize(Configuration conf) throws IOException { } @Override - public AuditReplayCommand parse(Text inputLine, + public AuditReplayCommand parse(Long sequence, Text inputLine, Function<Long, Long> relativeToAbsolute) throws IOException { Matcher m = logLineParseRegex.matcher(inputLine.toString()); if (!m.find()) { @@ -147,7 +147,8 @@ public AuditReplayCommand parse(Text inputLine, } } - return new AuditReplayCommand(relativeToAbsolute.apply(relativeTimestamp), + return new AuditReplayCommand(sequence, + relativeToAbsolute.apply(relativeTimestamp), // Split the UGI on space to remove the auth and proxy portions of it SPACE_SPLITTER.split(parameterMap.get("ugi")).iterator().next(), parameterMap.get("cmd").replace("(options:", "(options="), diff --git a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditLogHiveTableParser.java b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditLogHiveTableParser.java index abc10f4e33f..23224b6414c 100644 --- a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditLogHiveTableParser.java +++ b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditLogHiveTableParser.java @@ -60,12 +60,12 @@ public void initialize(Configuration conf) throws IOException { } @Override - public AuditReplayCommand parse(Text inputLine, + public AuditReplayCommand parse(Long sequence, Text inputLine, Function<Long, Long> relativeToAbsolute) throws IOException { String[] fields = inputLine.toString().split(FIELD_SEPARATOR); long absoluteTimestamp = relativeToAbsolute .apply(Long.parseLong(fields[0])); - return new AuditReplayCommand(absoluteTimestamp, fields[1], fields[2], + return new AuditReplayCommand(sequence, absoluteTimestamp, fields[1], fields[2], fields[3], fields[4], fields[5]); } diff --git a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayCommand.java b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayCommand.java index 038312d2e9f..9576245396a 100644 --- a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayCommand.java +++ b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayCommand.java @@ -43,6 +43,7 @@ class AuditReplayCommand implements Delayed { private static final Pattern SIMPLE_UGI_PATTERN = Pattern .compile("([^/@ ]*).*?"); + private Long sequence; private long absoluteTimestamp; private String ugi; private String command; @@ -50,8 +51,9 @@ class AuditReplayCommand implements Delayed { private String dest; private String sourceIP; - AuditReplayCommand(long absoluteTimestamp, String ugi, String command, + AuditReplayCommand(Long sequence, long absoluteTimestamp, String ugi, String command, String src, String dest, String sourceIP) { + this.sequence = sequence; this.absoluteTimestamp = absoluteTimestamp; this.ugi = ugi; this.command = command; @@ -60,6 +62,9 @@ class AuditReplayCommand implements Delayed { this.sourceIP = sourceIP; } + Long getSequence() { + return sequence; + } long getAbsoluteTimestamp() { return absoluteTimestamp; } @@ -103,8 +108,12 @@ public long getDelay(TimeUnit unit) { @Override public int compareTo(Delayed o) { - return Long.compare(absoluteTimestamp, - ((AuditReplayCommand) o).absoluteTimestamp); + int result = Long.compare(absoluteTimestamp, + ((AuditReplayCommand) o).absoluteTimestamp); + if (result != 0) { + return result; + } + return Long.compare(sequence, ((AuditReplayCommand) o).sequence); } /** @@ -122,9 +131,10 @@ boolean isPoison() { * information besides a timestamp; other getter methods wil return null. */ private static final class PoisonPillCommand extends AuditReplayCommand { + private static final Long DEFAULT_SEQUENCE = -1L; private PoisonPillCommand(long absoluteTimestamp) { - super(absoluteTimestamp, null, null, null, null, null); + super(DEFAULT_SEQUENCE, absoluteTimestamp, null, null, null, null, null); } @Override @@ -144,9 +154,9 @@ public boolean equals(Object other) { return false; } AuditReplayCommand o = (AuditReplayCommand) other; - return absoluteTimestamp == o.absoluteTimestamp && ugi.equals(o.ugi) - && command.equals(o.command) && src.equals(o.src) && dest.equals(o.dest) - && sourceIP.equals(o.sourceIP); + return sequence.equals(o.sequence) && absoluteTimestamp == o.absoluteTimestamp + && ugi.equals(o.ugi) && command.equals(o.command) && src.equals(o.src) + && dest.equals(o.dest) && sourceIP.equals(o.sourceIP); } @Override @@ -156,8 +166,8 @@ public int hashCode() { @Override public String toString() { - return String.format("AuditReplayCommand(absoluteTimestamp=%d, ugi=%s, " - + "command=%s, src=%s, dest=%s, sourceIP=%s", - absoluteTimestamp, ugi, command, src, dest, sourceIP); + return String.format("AuditReplayCommand(sequence=%d, absoluteTimestamp=%d, " + + "ugi=%s, command=%s, src=%s, dest=%s, sourceIP=%s", + sequence, absoluteTimestamp, ugi, command, src, dest, sourceIP); } } diff --git a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayMapper.java b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayMapper.java index fdd8f20d85b..4524e7a2bb0 100644 --- a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayMapper.java +++ b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/main/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/AuditReplayMapper.java @@ -169,6 +169,7 @@ public enum CommandType { private int numThreads; private double rateFactor; private long highestTimestamp; + private Long highestSequence; private List<AuditReplayThread> threads; private DelayQueue<AuditReplayCommand> commandQueue; private Function<Long, Long> relativeToAbsoluteTimestamp; @@ -246,7 +247,7 @@ public void setup(final Mapper.Context context) throws IOException { @Override public void map(LongWritable lineNum, Text inputLine, Mapper.Context context) throws IOException, InterruptedException { - AuditReplayCommand cmd = commandParser.parse(inputLine, + AuditReplayCommand cmd = commandParser.parse(lineNum.get(), inputLine, relativeToAbsoluteTimestamp); long delay = cmd.getDelay(TimeUnit.MILLISECONDS); // Prevent from loading too many elements into memory all at once @@ -255,6 +256,7 @@ public void map(LongWritable lineNum, Text inputLine, Mapper.Context context) } commandQueue.put(cmd); highestTimestamp = cmd.getAbsoluteTimestamp(); + highestSequence = cmd.getSequence(); } @Override diff --git a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/test/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/TestAuditLogDirectParser.java b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/test/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/TestAuditLogDirectParser.java index a98b5c47164..007445e10fb 100644 --- a/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/test/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/TestAuditLogDirectParser.java +++ b/hadoop-tools/hadoop-dynamometer/hadoop-dynamometer-workload/src/test/java/org/apache/hadoop/tools/dynamometer/workloadgenerator/audit/TestAuditLogDirectParser.java @@ -30,6 +30,7 @@ public class TestAuditLogDirectParser { private static final long START_TIMESTAMP = 10000; private AuditLogDirectParser parser; + private Long sequence = 1L; @Before public void setup() throws Exception { @@ -53,55 +54,55 @@ private Text getAuditString(String timestamp, String ugi, String cmd, public void testSimpleInput() throws Exception { Text in = getAuditString("1970-01-01 00:00:11,000", "fakeUser", "listStatus", "sourcePath", "null"); - AuditReplayCommand expected = new AuditReplayCommand(1000, "fakeUser", + AuditReplayCommand expected = new AuditReplayCommand(sequence, 1000, "fakeUser", "listStatus", "sourcePath", "null", "0.0.0.0"); - assertEquals(expected, parser.parse(in, Function.identity())); + assertEquals(expected, parser.parse(sequence, in, Function.identity())); } @Test public void testInputWithEquals() throws Exception { Text in = getAuditString("1970-01-01 00:00:11,000", "fakeUser", "listStatus", "day=1970", "null"); - AuditReplayCommand expected = new AuditReplayCommand(1000, "fakeUser", + AuditReplayCommand expected = new AuditReplayCommand(sequence, 1000, "fakeUser", "listStatus", "day=1970", "null", "0.0.0.0"); - assertEquals(expected, parser.parse(in, Function.identity())); + assertEquals(expected, parser.parse(sequence, in, Function.identity())); } @Test public void testInputWithRenameOptions() throws Exception { Text in = getAuditString("1970-01-01 00:00:11,000", "fakeUser", "rename (options=[TO_TRASH])", "sourcePath", "destPath"); - AuditReplayCommand expected = new AuditReplayCommand(1000, "fakeUser", + AuditReplayCommand expected = new AuditReplayCommand(sequence, 1000, "fakeUser", "rename (options=[TO_TRASH])", "sourcePath", "destPath", "0.0.0.0"); - assertEquals(expected, parser.parse(in, Function.identity())); + assertEquals(expected, parser.parse(sequence, in, Function.identity())); } @Test public void testInputWithTokenAuth() throws Exception { Text in = getAuditString("1970-01-01 00:00:11,000", "fakeUser (auth:TOKEN)", "create", "sourcePath", "null"); - AuditReplayCommand expected = new AuditReplayCommand(1000, "fakeUser", + AuditReplayCommand expected = new AuditReplayCommand(sequence, 1000, "fakeUser", "create", "sourcePath", "null", "0.0.0.0"); - assertEquals(expected, parser.parse(in, Function.identity())); + assertEquals(expected, parser.parse(sequence, in, Function.identity())); } @Test public void testInputWithProxyUser() throws Exception { Text in = getAuditString("1970-01-01 00:00:11,000", "proxyUser (auth:TOKEN) via fakeUser", "create", "sourcePath", "null"); - AuditReplayCommand expected = new AuditReplayCommand(1000, "proxyUser", - "create", "sourcePath", "null", "0.0.0.0"); - assertEquals(expected, parser.parse(in, Function.identity())); + AuditReplayCommand expected = new AuditReplayCommand(sequence, 1000, + "proxyUser", "create", "sourcePath", "null", "0.0.0.0"); + assertEquals(expected, parser.parse(sequence, in, Function.identity())); } @Test public void testParseDefaultDateFormat() throws Exception { Text in = getAuditString("1970-01-01 13:00:00,000", "ignored", "ignored", "ignored", "ignored"); - AuditReplayCommand expected = new AuditReplayCommand( + AuditReplayCommand expected = new AuditReplayCommand(sequence, 13 * 60 * 60 * 1000 - START_TIMESTAMP, "ignored", "ignored", "ignored", "ignored", "0.0.0.0"); - assertEquals(expected, parser.parse(in, Function.identity())); + assertEquals(expected, parser.parse(sequence, in, Function.identity())); } @Test @@ -114,9 +115,9 @@ public void testParseCustomDateFormat() throws Exception { parser.initialize(conf); Text in = getAuditString("1970-01-01 01:00:00,000 PM", "ignored", "ignored", "ignored", "ignored"); - AuditReplayCommand expected = new AuditReplayCommand(13 * 60 * 60 * 1000, + AuditReplayCommand expected = new AuditReplayCommand(sequence, 13 * 60 * 60 * 1000, "ignored", "ignored", "ignored", "ignored", "0.0.0.0"); - assertEquals(expected, parser.parse(in, Function.identity())); + assertEquals(expected, parser.parse(sequence, in, Function.identity())); } @Test @@ -128,9 +129,9 @@ public void testParseCustomTimeZone() throws Exception { parser.initialize(conf); Text in = getAuditString("1970-01-01 01:00:00,000", "ignored", "ignored", "ignored", "ignored"); - AuditReplayCommand expected = new AuditReplayCommand(0, + AuditReplayCommand expected = new AuditReplayCommand(sequence, 0, "ignored", "ignored", "ignored", "ignored", "0.0.0.0"); - assertEquals(expected, parser.parse(in, Function.identity())); + assertEquals(expected, parser.parse(sequence, in, Function.identity())); } @Test @@ -144,9 +145,9 @@ public void testParseCustomAuditLineFormat() throws Exception { conf.set(AuditLogDirectParser.AUDIT_LOG_PARSE_REGEX_KEY, "CUSTOM FORMAT \\((?<timestamp>.+?)\\) (?<message>.+)"); parser.initialize(conf); - AuditReplayCommand expected = new AuditReplayCommand(0, + AuditReplayCommand expected = new AuditReplayCommand(sequence, 0, "fakeUser", "fakeCommand", "src", "null", "0.0.0.0"); - assertEquals(expected, parser.parse(auditLine, Function.identity())); + assertEquals(expected, parser.parse(sequence, auditLine, Function.identity())); } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org