This is an automated email from the ASF dual-hosted git repository.
apurtell pushed a commit to branch PHOENIX-7562-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature by this
push:
new 5f1c8433ab PHOENIX-7669 Provide more control over log file validation
(#2231)
5f1c8433ab is described below
commit 5f1c8433abb4d3df8fcf763b8bfd2c0ae5f4649f
Author: Andrew Purtell <[email protected]>
AuthorDate: Mon Jul 21 09:42:21 2025 -0700
PHOENIX-7669 Provide more control over log file validation (#2231)
---
.../replication/log/InvalidLogHeaderException.java | 31 ++++++++
.../log/InvalidLogTrailerException.java | 30 ++++++++
.../apache/phoenix/replication/log/LogFile.java | 25 +++++--
.../replication/log/LogFileFormatReader.java | 19 +++--
.../replication/log/LogFileFormatWriter.java | 30 ++------
.../phoenix/replication/log/LogFileHeader.java | 51 ++++---------
.../phoenix/replication/log/LogFileReader.java | 3 +-
.../replication/log/LogFileReaderContext.java | 21 +++++-
.../phoenix/replication/log/LogFileTrailer.java | 57 +++++---------
.../phoenix/replication/log/LogFileFormatTest.java | 87 ++++++++++++++++++++++
.../phoenix/replication/log/LogFileWriterTest.java | 10 +++
.../reader/ReplicationLogProcessorTest.java | 5 +-
12 files changed, 250 insertions(+), 119 deletions(-)
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/InvalidLogHeaderException.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/InvalidLogHeaderException.java
new file mode 100644
index 0000000000..1b2284732d
--- /dev/null
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/InvalidLogHeaderException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.log;
+
+import java.io.IOException;
+
+/** Exception thrown when a log file has an invalid header. */
+public class InvalidLogHeaderException extends IOException {
+ private static final long serialVersionUID = 1L;
+
+ public InvalidLogHeaderException(String message) {
+ super(message);
+ }
+
+}
+
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/InvalidLogTrailerException.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/InvalidLogTrailerException.java
new file mode 100644
index 0000000000..61cfbd3d38
--- /dev/null
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/InvalidLogTrailerException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.log;
+
+import java.io.IOException;
+
+/** Exception thrown when a log file has an invalid trailer. */
+public class InvalidLogTrailerException extends IOException {
+ private static final long serialVersionUID = 1L;
+
+ public InvalidLogTrailerException(String message) {
+ super(message);
+ }
+
+}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java
index 419313bd7e..f84d316234 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFile.java
@@ -23,6 +23,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -417,21 +418,31 @@ public interface LogFile {
/**
* Utility for determining if a file is a valid replication log file.
+ * @param conf The Configuration
* @param fs The FileSystem
* @param path Path to the potential replication log file
+ * @param validateTrailer Whether to validate the trailer
* @return true if the file is a valid replication log file, false
otherwise
* @throws IOException if an I/O problem was encountered
*/
- static boolean isValidLogFile(final FileSystem fs, final Path path) throws
IOException {
+ static boolean isValidLogFile(final Configuration conf, final FileSystem
fs, final Path path,
+ final boolean validateTrailer) throws IOException {
long length = fs.getFileStatus(path).getLen();
try (FSDataInputStream in = fs.open(path)) {
- if (LogFileTrailer.isValidTrailer(in, length)) {
- return true;
- } else {
- // Not a valid trailer, do we need to do something (set a
flag)?
- // Fall back to checking the header.
- return LogFileHeader.isValidHeader(in);
+ // Check if the file is too short to be a valid log file.
+ if (length < LogFileHeader.HEADERSIZE) {
+ return false;
}
+ try (LogFileFormatReader reader = new LogFileFormatReader()) {
+ LogFileReaderContext context = new LogFileReaderContext(conf)
+ .setFilePath(path)
+ .setFileSize(length)
+ .setValidateTrailer(validateTrailer);
+ reader.init(context, (SeekableDataInput) in);
+ } catch (InvalidLogHeaderException | InvalidLogTrailerException e)
{
+ return false;
+ }
+ return true;
}
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatReader.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatReader.java
index 194abe7f87..9d0f17a2f2 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatReader.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatReader.java
@@ -48,7 +48,6 @@ public class LogFileFormatReader implements Closeable {
private ByteBuffer currentBlockBuffer;
private long currentBlockDataBytes;
private long currentBlockConsumedBytes;
- private boolean trailerValidated;
private CRC64 crc = new CRC64();
public LogFileFormatReader() {
@@ -62,12 +61,14 @@ public class LogFileFormatReader implements Closeable {
this.currentBlockConsumedBytes = 0;
try {
readAndValidateTrailer();
- trailerValidated = true;
} catch (IOException e) {
+ // If we are validating the trailer, we cannot proceed without it.
+ if (context.isValidateTrailer()) {
+ throw e;
+ }
// Log warning, trailer might be missing or corrupt, proceed
without it
- LOG.warn("Failed to read or validate Log trailer for path: "
- + (context != null ? context.getFilePath() : "unknown")
- + ". Proceeding without trailer.", e);
+ LOG.warn("Failed to validate Log trailer for " +
context.getFilePath()
+ + ", proceeding", e);
trailer = null; // Ensure trailer is null if reading/validation
failed
}
this.decoder = null;
@@ -78,8 +79,7 @@ public class LogFileFormatReader implements Closeable {
private void readAndValidateTrailer() throws IOException {
if (context.getFileSize() < LogFileTrailer.FIXED_TRAILER_SIZE) {
- throw new IOException("File size " + context.getFileSize()
- + " is smaller than the fixed trailer size " +
LogFileTrailer.FIXED_TRAILER_SIZE);
+ throw new InvalidLogTrailerException("Short file");
}
LogFileTrailer ourTrailer = new LogFileTrailer();
// Fixed trailer fields will be LogTrailer.FIXED_TRAILER_SIZE bytes
back from end of file.
@@ -343,7 +343,7 @@ public class LogFileFormatReader implements Closeable {
// Validates read counts against trailer counts if trailer was
successfully read
private void validateReadCounts() {
- if (!trailerValidated || trailer == null) {
+ if (trailer == null) {
return;
}
if (trailer.getBlockCount() != context.getBlocksRead()) {
@@ -374,8 +374,7 @@ public class LogFileFormatReader implements Closeable {
+ input + ", header=" + header + ", trailer=" + trailer + ",
currentPosition="
+ currentPosition + ", currentBlockBuffer=" + currentBlockBuffer
+ ", currentBlockUncompressedSize=" + currentBlockDataBytes
- + ", currentBlockConsumedBytes=" + currentBlockConsumedBytes
- + ", trailerValidated=" + trailerValidated + "]";
+ + ", currentBlockConsumedBytes=" + currentBlockConsumedBytes + "]";
}
LogFile.Header getHeader() {
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java
index 7e7f3c1931..d2778478ea 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileFormatWriter.java
@@ -41,8 +41,6 @@ public class LogFileFormatWriter implements Closeable {
private SyncableDataOutput output;
private ByteArrayOutputStream currentBlockBytes;
private DataOutputStream blockDataStream;
- private boolean headerWritten = false;
- private boolean trailerWritten = false;
private long recordCount = 0;
private long blockCount = 0;
private long blocksStartOffset = -1;
@@ -60,15 +58,14 @@ public class LogFileFormatWriter implements Closeable {
this.currentBlockBytes = new ByteArrayOutputStream();
this.blockDataStream = new DataOutputStream(currentBlockBytes);
this.encoder = context.getCodec().getEncoder(blockDataStream);
+ // Write header immediately when file is created
+ writeFileHeader();
}
private void writeFileHeader() throws IOException {
- if (!headerWritten) {
- LogFileHeader header = new LogFileHeader();
- header.write(output);
- blocksStartOffset = output.getPos(); // First block starts after
header
- headerWritten = true;
- }
+ LogFileHeader header = new LogFileHeader();
+ header.write(output);
+ blocksStartOffset = output.getPos(); // First block starts after header
}
public long getBlocksStartOffset() {
@@ -76,13 +73,6 @@ public class LogFileFormatWriter implements Closeable {
}
public void append(LogFile.Record record) throws IOException {
- if (!headerWritten) {
- // Lazily write file header
- writeFileHeader();
- }
- if (trailerWritten) {
- throw new IOException("Cannot append record after trailer has been
written");
- }
if (blockDataStream == null) {
startBlock(); // Start the block if needed
}
@@ -188,15 +178,10 @@ public class LogFileFormatWriter implements Closeable {
@Override
public void close() throws IOException {
- // We use the fact we have already written the trailer as the boolean
"closed" condition.
- if (trailerWritten) {
+ if (output == null) {
return;
}
try {
- // We might be closing an empty file, handle this case correctly.
- if (!headerWritten) {
- writeFileHeader();
- }
// Close any outstanding block.
closeBlock();
// After we write the trailer we consider the file closed.
@@ -209,6 +194,7 @@ public class LogFileFormatWriter implements Closeable {
} catch (IOException e) {
LOG.error("Exception while closing LogFormatWriter", e);
}
+ output = null;
}
}
}
@@ -220,7 +206,6 @@ public class LogFileFormatWriter implements Closeable {
.setBlocksStartOffset(blocksStartOffset)
.setTrailerStartOffset(output.getPos());
trailer.write(output);
- trailerWritten = true;
try {
output.sync();
} catch (IOException e) {
@@ -233,7 +218,6 @@ public class LogFileFormatWriter implements Closeable {
public String toString() {
return "LogFileFormatWriter [writerContext=" + context
+ ", currentBlockUncompressedBytes=" + currentBlockBytes
- + ", headerWritten=" + headerWritten + ", trailerWritten=" +
trailerWritten
+ ", recordCount=" + recordCount + ", blockCount=" + blockCount
+ ", blocksStartOffset=" + blocksStartOffset + "]";
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileHeader.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileHeader.java
index bfc07b76d9..4fc85edabf 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileHeader.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileHeader.java
@@ -19,12 +19,10 @@ package org.apache.phoenix.replication.log;
import java.io.DataInput;
import java.io.DataOutput;
+import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.Bytes;
public class LogFileHeader implements LogFile.Header {
@@ -36,7 +34,7 @@ public class LogFileHeader implements LogFile.Header {
/** Current minor version of the replication log format */
static final int VERSION_MINOR = 0;
- static final int HEADERSIZE = MAGIC.length + 3 * Bytes.SIZEOF_BYTE;
+ static final int HEADERSIZE = MAGIC.length + 2 * Bytes.SIZEOF_BYTE;
private int majorVersion = VERSION_MAJOR;
private int minorVersion = VERSION_MINOR;
@@ -70,17 +68,25 @@ public class LogFileHeader implements LogFile.Header {
@Override
public void readFields(DataInput in) throws IOException {
byte[] magic = new byte[MAGIC.length];
- in.readFully(magic);
+ try {
+ in.readFully(magic);
+ } catch (EOFException e) {
+ throw (IOException) new InvalidLogHeaderException("Short
magic").initCause(e);
+ }
if (!Arrays.equals(MAGIC, magic)) {
- throw new IOException("Invalid LogFile magic. Got " +
Bytes.toStringBinary(magic)
+ throw new InvalidLogHeaderException("Bad magic. Got " +
Bytes.toStringBinary(magic)
+ ", expected " + Bytes.toStringBinary(MAGIC));
}
- majorVersion = in.readByte();
- minorVersion = in.readByte();
+ try {
+ majorVersion = in.readByte();
+ minorVersion = in.readByte();
+ } catch (EOFException e) {
+ throw (IOException) new InvalidLogHeaderException("Short
version").initCause(e);
+ }
// Basic version check for now. We assume semver conventions where
only higher major
// versions may be incompatible.
if (majorVersion > VERSION_MAJOR) {
- throw new IOException("Unsupported LogFile version. Got major=" +
majorVersion
+ throw new InvalidLogHeaderException("Unsupported version. Got
major=" + majorVersion
+ " minor=" + minorVersion + ", expected major=" +
VERSION_MAJOR
+ " minor=" + VERSION_MINOR);
}
@@ -98,33 +104,6 @@ public class LogFileHeader implements LogFile.Header {
return HEADERSIZE;
}
- public static boolean isValidHeader(final FileSystem fs, final Path path)
- throws IOException {
- if (fs.getFileStatus(path).getLen() < HEADERSIZE) {
- return false;
- }
- try (FSDataInputStream in = fs.open(path)) {
- return isValidHeader(in);
- }
- }
-
- public static boolean isValidHeader(FSDataInputStream in) throws
IOException {
- in.seek(0);
- byte[] magic = new byte[MAGIC.length];
- in.readFully(magic);
- if (!Arrays.equals(MAGIC, magic)) {
- return false;
- }
- int majorVersion = in.readByte();
- in.readByte(); // minorVersion, for now we don't use it
- // Basic version check for now. We assume semver conventions where
only higher major
- // versions may be incompatible.
- if (majorVersion > VERSION_MAJOR) {
- return false;
- }
- return true;
- }
-
@Override
public String toString() {
return "LogFileHeader [majorVersion=" + majorVersion + ",
minorVersion=" + minorVersion
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReader.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReader.java
index 6b3fa028b2..1961922b7a 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReader.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReader.java
@@ -137,7 +137,8 @@ public class LogFileReader implements LogFile.Reader {
throw e;
} finally {
closed = true;
- LOG.debug("Closed LogFileReader for path {}",
context.getFilePath());
+ LOG.debug("Closed LogFileReader for path {}", context != null ?
context.getFilePath()
+ : "null");
}
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReaderContext.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReaderContext.java
index 52fbdb0cbb..a2c6de3488 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReaderContext.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileReaderContext.java
@@ -36,12 +36,17 @@ public class LogFileReaderContext {
/** Default for skipping corrupt blocks */
public static final boolean DEFAULT_LOGFILE_SKIP_CORRUPT_BLOCKS = true;
+ public static final String LOGFILE_VALIDATE_TRAILER =
+ "phoenix.replication.logfile.validate.trailer";
+ public static final boolean DEFAULT_LOGFILE_VALIDATE_TRAILER = true;
+
private final Configuration conf;
private FileSystem fs;
private Path path;
private LogFileCodec codec;
private long fileSize = -1;
private boolean isSkipCorruptBlocks;
+ private boolean isValidateTrailer;
private long blocksRead;
private long recordsRead;
private long corruptBlocksSkipped;
@@ -50,6 +55,8 @@ public class LogFileReaderContext {
this.conf = conf;
this.isSkipCorruptBlocks = conf.getBoolean(LOGFILE_SKIP_CORRUPT_BLOCKS,
DEFAULT_LOGFILE_SKIP_CORRUPT_BLOCKS);
+ this.isValidateTrailer = conf.getBoolean(LOGFILE_VALIDATE_TRAILER,
+ DEFAULT_LOGFILE_VALIDATE_TRAILER);
// Note: When we have multiple codec types, instantiate the
appropriate type based on
// configuration;
this.codec = new LogFileCodec();
@@ -146,11 +153,21 @@ public class LogFileReaderContext {
return this;
}
+ public boolean isValidateTrailer() {
+ return isValidateTrailer;
+ }
+
+ public LogFileReaderContext setValidateTrailer(boolean validateTrailer) {
+ this.isValidateTrailer = validateTrailer;
+ return this;
+ }
+
@Override
public String toString() {
return "LogFileReaderContext [filePath=" + path + ", fileSize=" +
fileSize
- + ", isSkipCorruptBlocks=" + isSkipCorruptBlocks + ", codec=" +
codec + ", blocksRead="
- + blocksRead + ", recordsRead=" + recordsRead + ",
corruptBlocksSkipped="
+ + ", isSkipCorruptBlocks=" + isSkipCorruptBlocks + ",
isValidateTrailer="
+ + isValidateTrailer + ", codec=" + codec + ", blocksRead=" +
blocksRead
+ + ", recordsRead=" + recordsRead + ", corruptBlocksSkipped="
+ corruptBlocksSkipped + "]";
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileTrailer.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileTrailer.java
index b05da86aae..8e29bf288a 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileTrailer.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileTrailer.java
@@ -19,12 +19,10 @@ package org.apache.phoenix.replication.log;
import java.io.DataInput;
import java.io.DataOutput;
+import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.Bytes;
public class LogFileTrailer implements LogFile.Trailer {
@@ -113,23 +111,31 @@ public class LogFileTrailer implements LogFile.Trailer {
}
public void readFixedFields(DataInput in) throws IOException {
- this.recordCount = in.readLong();
- this.blockCount = in.readLong();
- this.blocksStartOffset = in.readLong();
- this.trailerStartOffset = in.readLong();
- this.majorVersion = in.readByte();
- this.minorVersion = in.readByte();
+ try {
+ this.recordCount = in.readLong();
+ this.blockCount = in.readLong();
+ this.blocksStartOffset = in.readLong();
+ this.trailerStartOffset = in.readLong();
+ this.majorVersion = in.readByte();
+ this.minorVersion = in.readByte();
+ } catch (EOFException e) {
+ throw (IOException) new InvalidLogTrailerException("Short fixed
fields").initCause(e);
+ }
// Basic version check for now. We assume semver conventions where
only higher major
// versions may be incompatible.
if (majorVersion > LogFileHeader.VERSION_MAJOR) {
- throw new IOException("Unsupported LogFile version. Got major=" +
majorVersion
+ throw new InvalidLogTrailerException("Unsupported version. Got
major=" + majorVersion
+ " minor=" + minorVersion + ", expected major=" +
LogFileHeader.VERSION_MAJOR
+ " minor=" + LogFileHeader.VERSION_MINOR);
}
byte[] magic = new byte[LogFileHeader.MAGIC.length];
- in.readFully(magic);
+ try {
+ in.readFully(magic);
+ } catch (EOFException e) {
+ throw (IOException) new InvalidLogTrailerException("Short
magic").initCause(e);
+ }
if (!Arrays.equals(LogFileHeader.MAGIC, magic)) {
- throw new IOException("Invalid LogFile magic. Got " +
Bytes.toStringBinary(magic)
+ throw new InvalidLogTrailerException("Bad magic. Got " +
Bytes.toStringBinary(magic)
+ ", expected " + Bytes.toStringBinary(LogFileHeader.MAGIC));
}
}
@@ -172,33 +178,6 @@ public class LogFileTrailer implements LogFile.Trailer {
+ FIXED_TRAILER_SIZE;
}
- public static boolean isValidTrailer(final FileSystem fs, final Path path)
throws IOException {
- try (FSDataInputStream in = fs.open(path)) {
- return isValidTrailer(in, fs.getFileStatus(path).getLen());
- }
- }
-
- public static boolean isValidTrailer(FSDataInputStream in, long length)
throws IOException {
- long offset = length - VERSION_AND_MAGIC_SIZE;
- if (offset < 0) {
- return false;
- }
- in.seek(offset);
- byte[] magic = new byte[LogFileHeader.MAGIC.length];
- in.readFully(magic);
- if (!Arrays.equals(LogFileHeader.MAGIC, magic)) {
- return false;
- }
- int majorVersion = in.readByte();
- in.readByte(); // minorVersion, for now we don't use it
- // Basic version check for now. We assume semver conventions where
only higher major
- // versions may be incompatible.
- if (majorVersion > LogFileHeader.VERSION_MAJOR) {
- return false;
- }
- return true;
- }
-
@Override
public String toString() {
return "LogFileTrailer [majorVersion=" + majorVersion + ",
minorVersion=" + minorVersion
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileFormatTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileFormatTest.java
index 875cb6ea1b..f623fb9f0a 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileFormatTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileFormatTest.java
@@ -296,6 +296,9 @@ public class LogFileFormatTest {
byte[] data = writerBaos.toByteArray();
byte[] truncatedData = Arrays.copyOf(data, (int) truncationPoint);
+ // We truncated the final block, so the trailer is gone too.
+ readerContext.setValidateTrailer(false);
+
initLogFileReader(truncatedData);
List<LogFile.Record> decoded = new ArrayList<>();
@@ -342,6 +345,7 @@ public class LogFileFormatTest {
LogFileTestUtil.SeekableByteArrayInputStream input =
new LogFileTestUtil.SeekableByteArrayInputStream(truncatedData);
readerContext.setFileSize(truncatedData.length);
+ readerContext.setValidateTrailer(false);
// This init should log a warning but succeed
reader.init(readerContext, input);
@@ -377,6 +381,7 @@ public class LogFileFormatTest {
LogFileTestUtil.SeekableByteArrayInputStream input =
new LogFileTestUtil.SeekableByteArrayInputStream(truncatedData);
readerContext.setFileSize(truncatedData.length);
+ readerContext.setValidateTrailer(false);
// Init should log a warning but succeed by ignoring the trailer
reader.init(readerContext, input);
@@ -406,6 +411,88 @@ public class LogFileFormatTest {
assertEquals("Records read count mismatch", totalRecords,
readerContext.getRecordsRead());
}
+ @Test
+ public void testFailIfMissingHeader() throws IOException {
+ // Zero length file
+ byte[] data = new byte[0];
+ LogFileTestUtil.SeekableByteArrayInputStream input =
+ new LogFileTestUtil.SeekableByteArrayInputStream(data);
+ readerContext.setFileSize(data.length);
+ readerContext.setValidateTrailer(false);
+ try {
+ reader.init(readerContext, input);
+ fail("Expected InvalidLogHeaderException for zero length file");
+ } catch (InvalidLogHeaderException e) {
+ assertTrue("Exception message should contain 'Short magic'",
+ e.getMessage().contains("Short magic"));
+ }
+ }
+
+ @Test
+ public void testFailIfInvalidHeader() throws IOException {
+ initLogFileWriter();
+ writer.close(); // Writes valid trailer
+ byte[] data = writerBaos.toByteArray();
+ LogFileTestUtil.SeekableByteArrayInputStream input =
+ new LogFileTestUtil.SeekableByteArrayInputStream(data);
+ readerContext.setFileSize(data.length);
+ readerContext.setValidateTrailer(true);
+ data[0] = (byte) 'X'; // Corrupt the first magic byte
+ try {
+ reader.init(readerContext, input);
+ fail("Expected InvalidLogHeaderException for file with corrupted
header magic");
+ } catch (InvalidLogHeaderException e) {
+ assertTrue("Exception message should contain 'Bad magic'",
+ e.getMessage().contains("Bad magic"));
+ }
+ }
+
+ @Test
+ public void testFailIfMissingTrailer() throws IOException {
+ initLogFileWriter();
+ writeBlock(writer, "B1", 0, 5);
+ // Don't close the writer, simulate missing trailer
+ byte[] data = writerBaos.toByteArray();
+ // Re-initialize reader with truncated data and trailer validation
enabled
+ LogFileTestUtil.SeekableByteArrayInputStream input =
+ new LogFileTestUtil.SeekableByteArrayInputStream(data);
+ readerContext.setFileSize(data.length);
+ // Enable trailer validation
+ readerContext.setValidateTrailer(true);
+ try {
+ reader.init(readerContext, input);
+ fail("Expected InvalidLogTrailerException when trailer is
missing");
+ } catch (InvalidLogTrailerException e) {
+ assertTrue("Exception message should contain 'Unsupported
version'",
+ e.getMessage().contains("Unsupported version"));
+ }
+ }
+
+ @Test
+ public void testFailIfInvalidTrailer() throws IOException {
+ initLogFileWriter();
+ writeBlock(writer, "B1", 0, 5);
+ writer.close(); // Writes valid trailer
+ byte[] data = writerBaos.toByteArray();
+ // Corrupt the trailer by changing the magic bytes
+ int trailerStartOffset = data.length -
LogFileTrailer.FIXED_TRAILER_SIZE;
+ int magicOffset = trailerStartOffset +
LogFileTrailer.FIXED_TRAILER_SIZE
+ - LogFileHeader.MAGIC.length;
+ data[magicOffset] = (byte) 'X'; // Corrupt the first magic byte
+ // Re-initialize reader with corrupted trailer and trailer validation
enabled
+ LogFileTestUtil.SeekableByteArrayInputStream input =
+ new LogFileTestUtil.SeekableByteArrayInputStream(data);
+ readerContext.setFileSize(data.length);
+ readerContext.setValidateTrailer(true);
+ try {
+ reader.init(readerContext, input);
+ fail("Expected InvalidLogTrailerException when trailer magic is
corrupt");
+ } catch (InvalidLogTrailerException e) {
+ assertTrue("Exception message should contain 'Bad magic'",
+ e.getMessage().contains("Bad magic"));
+ }
+ }
+
@Test
public void testLogFileCorruptionFirstBlockChecksum() throws IOException {
initLogFileWriter();
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterTest.java
index 55c4737223..b4e828a3b6 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/replication/log/LogFileWriterTest.java
@@ -137,6 +137,16 @@ public class LogFileWriterTest {
reader.close();
}
+ @Test
+ public void testHeaderWrittenImmediately() throws IOException {
+ // This should write header immediately
+ initLogFileWriter();
+ // Verify file exists and has content (header should be written)
+ assertTrue("File should exist after init", localFs.exists(filePath));
+ assertEquals("File should have header written",
LogFileHeader.HEADERSIZE, writer.getLength());
+ writer.close();
+ }
+
private void initLogFileReader() throws IOException {
readerContext = new LogFileReaderContext(conf).setFileSystem(localFs)
.setFilePath(filePath);
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java
index e97cd027e9..2bbc18a35d 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java
@@ -500,7 +500,10 @@ public class ReplicationLogProcessorTest extends
ParallelStatsDisabledIT {
writer.append(tableNameString, 1, put);
writer.sync();
- ReplicationLogProcessor spyProcessor = Mockito.spy(new
ReplicationLogProcessor(conf, testHAGroupName));
+ // For processing of an unclosed file to work, we need to disable
trailer validation
+ Configuration testConf = new Configuration(conf);
+ testConf.setBoolean(LogFileReaderContext.LOGFILE_VALIDATE_TRAILER,
false);
+ ReplicationLogProcessor spyProcessor = Mockito.spy(new
ReplicationLogProcessor(testConf, testHAGroupName));
// Create argument captor to capture the actual parameters passed to
processReplicationLogBatch
ArgumentCaptor<Map<TableName, List<Mutation>>> mapCaptor =