This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 647e13eae07 [enhancement](image): load image supports reading in
batches (#50183)
647e13eae07 is described below
commit 647e13eae07bde9d42cb1d4f744351640485b1c8
Author: htyoung <[email protected]>
AuthorDate: Sat Apr 26 14:55:36 2025 +0800
[enhancement](image): load image supports reading in batches (#50183)
Text.readString now supports decoding in batches. If no checkpoint is
taken for a long period, the delete info stored in the image file can
grow so large that the buffer size requested from CharBuffer.allocate
exceeds Integer.MAX_VALUE and overflows; reading in batches avoids
this situation.
---
.../main/java/org/apache/doris/common/Config.java | 13 +++
.../main/java/org/apache/doris/common/io/Text.java | 109 ++++++++++++---------
.../java/org/apache/doris/common/io/TextTest.java | 65 ++++++++++++
3 files changed, 143 insertions(+), 44 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 082bbf95921..15d31a50935 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2883,6 +2883,19 @@ public class Config extends ConfigBase {
})
public static int sync_image_timeout_second = 300;
+    @ConfField(mutable = true, description = {
+            "FE启动时加载image文件某个模块的二进制内容到字节数组,并将字节数组反序列化为utf8编码字符串时"
+                    + "单批次的大小(单位:byte)。等于-1的值表示一次性读取完整的字节数组后反序列化为utf8编码字符串;"
+                    + "不等于-1的值(至少16MB,小于16MB的值按16MB处理)表示分批读取该大小的字节数组并逐批反序列化为utf8编码字符串,最后合并成完整的字符串。默认值为-1",
+            "The size of a single batch (in bytes) when loading the binary content of a module of the image "
+                    + "file into a byte array and deserializing the byte array into a utf8 encoded string when FE "
+                    + "starts. A value equal to -1 means reading the entire byte array at once and then deserializing "
+                    + "it into a utf8 encoded string; a value not equal to -1 means reading the byte array in batches "
+                    + "of that size (at least 16MB; smaller values are treated as 16MB), deserializing each batch into "
+                    + "a utf8 encoded string and finally merging them into a complete string. The default value is -1"
+    })
+    public static int metadata_text_read_max_batch_bytes = -1;
+
@ConfField(mutable = true, masterOnly = true)
public static int publish_topic_info_interval_ms = 30000; // 30s
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/io/Text.java
b/fe/fe-common/src/main/java/org/apache/doris/common/io/Text.java
index 76256f30feb..50efafc55ab 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/io/Text.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/io/Text.java
@@ -17,6 +17,8 @@
package org.apache.doris.common.io;
+import org.apache.doris.common.Config;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -26,11 +28,11 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
-import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.MalformedInputException;
+import java.nio.charset.StandardCharsets;
import java.text.CharacterIterator;
import java.text.StringCharacterIterator;
@@ -48,17 +50,17 @@ import java.text.StringCharacterIterator;
public class Text implements Writable {
private static final Logger LOG = LoggerFactory.getLogger(Text.class);
- private static ThreadLocal<CharsetEncoder> ENCODER_FACTORY = new
ThreadLocal<CharsetEncoder>() {
+ // Per-thread UTF-8 encoder built with CodingErrorAction.REPORT for malformed and unmappable
+ // input, so invalid text is reported as an error rather than silently replaced. Presumably
+ // kept in a ThreadLocal because CharsetEncoder instances are stateful and not thread-safe.
+ private static final ThreadLocal<CharsetEncoder> ENCODER_FACTORY = new
ThreadLocal<CharsetEncoder>() {
protected CharsetEncoder initialValue() {
- return Charset.forName("UTF-8").newEncoder()
+ return StandardCharsets.UTF_8.newEncoder()
.onMalformedInput(CodingErrorAction.REPORT)
.onUnmappableCharacter(CodingErrorAction.REPORT);
}
};
- private static ThreadLocal<CharsetDecoder> DECODER_FACTORY = new
ThreadLocal<CharsetDecoder>() {
+ private static final ThreadLocal<CharsetDecoder> DECODER_FACTORY = new
ThreadLocal<CharsetDecoder>() {
protected CharsetDecoder initialValue() {
- return Charset.forName("UTF-8").newDecoder()
+ return StandardCharsets.UTF_8.newDecoder()
.onMalformedInput(CodingErrorAction.REPORT)
.onUnmappableCharacter(CodingErrorAction.REPORT);
}
@@ -85,7 +87,6 @@ public class Text implements Writable {
set(utf8);
}
-
// Returns the raw bytes; however, only data up to getLength() is valid.
public byte[] getBytes() {
return bytes;
@@ -205,12 +206,9 @@ public class Text implements Writable {
/**
* Set the Text to range of bytes
*
- * @param utf8
- * the data to copy from
- * @param start
- * the first position of the new string
- * @param len
- * the number of bytes of the new string
+ * @param utf8 the data to copy from
+ * @param start the first position of the new string
+ * @param len the number of bytes of the new string
*/
public void set(byte[] utf8, int start, int len) {
setCapacity(len, false);
@@ -221,12 +219,9 @@ public class Text implements Writable {
/**
* Append a range of bytes to the end of the given text
*
- * @param utf8
- * the data to copy from
- * @param start
- * the first position to append from utf8
- * @param len
- * the number of bytes to append
+ * @param utf8 the data to copy from
+ * @param start the first position to append from utf8
+ * @param len the number of bytes to append
*/
public void append(byte[] utf8, int start, int len) {
setCapacity(length + len, true);
@@ -238,12 +233,9 @@ public class Text implements Writable {
* Append a range of bytes to the end of the given text, and adjust
* underlying buffer to reduce mem copy times
*
- * @param utf8
- * the data to copy from
- * @param start
- * the first position to append from utf8
- * @param len
- * the number of bytes to append
+ * @param utf8 the data to copy from
+ * @param start the first position to append from utf8
+ * @param len the number of bytes to append
*/
public void appendAdjust(byte[] utf8, int start, int len) {
int newLen = length + len;
@@ -413,8 +405,44 @@ public class Text implements Writable {
int length = in.readInt();
byte[] bytes = new byte[length];
in.readFully(bytes, 0, length);
- String res = decode(bytes);
- return res;
+ if (Config.metadata_text_read_max_batch_bytes == -1) {
+ return decode(bytes);
+ } else {
+ // if Config.metadata_text_read_max_batch_bytes != -1, read the bytes array and
+ // deserialize it into a utf8 encoded string in batches
+ int batchSize =
Math.max(Config.metadata_text_read_max_batch_bytes, 16 * 1024 * 1024);
+ int offset = 0;
+ StringBuilder sb = new StringBuilder();
+ while (offset < length) {
+ int chunkSize = Math.min(batchSize, length - offset);
+ // the last chunkSize should not adjust the safe cut position
+ if (offset + chunkSize < length) {
+ // find the safe cut position in utf8 encoded bytes
+ chunkSize = findSafeCutPosition(bytes, offset, chunkSize);
+ }
+ sb.append(decode(bytes, offset, chunkSize));
+ offset += chunkSize;
+ }
+ return sb.toString();
+ }
+ }
+
+    /**
+     * Returns how many bytes, counted from {@code start}, can be taken from
+     * {@code bytes[start, start + length)} without ending inside a UTF-8 character.
+     *
+     * <p>A UTF-8 character is at most four bytes long, so only the last four bytes of the range are
+     * inspected to locate the lead byte of the trailing character. If that character is complete,
+     * the whole range is safe; if it is truncated, the range is cut just before its lead byte.
+     *
+     * <p>For {@code length > 0} the result is always in {@code [1, length]} (never 0 or negative,
+     * which would stall a caller that advances its offset by the result). If no positive cut exists
+     * (malformed input, or a range too short to hold the truncated character) the full
+     * {@code length} is returned and the decoder is left to report the error.
+     *
+     * @param bytes  UTF-8 encoded data
+     * @param start  offset of the first byte of the range
+     * @param length number of bytes in the range
+     * @return the number of leading bytes of the range that end on a character boundary
+     */
+    private static int findSafeCutPosition(byte[] bytes, int start, int length) {
+        if (length <= 0) {
+            return length;
+        }
+        int end = start + length;
+        // Walk back over continuation bytes (10xxxxxx), but no further than a lead byte can be.
+        int lowest = Math.max(start, end - 4);
+        int lead = end - 1;
+        while (lead > lowest && (bytes[lead] & 0xC0) == 0x80) {
+            lead--;
+        }
+        int expected = utf8SequenceLength(bytes[lead]);
+        if (expected < 0 || end - lead >= expected) {
+            // The trailing character is complete, or the tail is not valid UTF-8 at all; cutting
+            // cannot help in either case, so keep the whole range and let the decoder validate it.
+            return length;
+        }
+        // The trailing character is truncated: cut just before its lead byte.
+        int cut = lead - start;
+        return cut > 0 ? cut : length;
+    }
+
+    // Returns the total byte length of the UTF-8 sequence that starts with lead byte b, or -1 if b
+    // is a continuation byte or not a valid lead byte.
+    private static int utf8SequenceLength(byte b) {
+        int v = b & 0xFF;
+        if (v < 0x80) {
+            return 1;
+        } else if ((v & 0xE0) == 0xC0) {
+            return 2;
+        } else if ((v & 0xF0) == 0xE0) {
+            return 3;
+        } else if ((v & 0xF8) == 0xF0) {
+            return 4;
+        }
+        return -1;
}
/**
@@ -439,10 +467,8 @@ public class Text implements Writable {
/**
* Check if a byte array contains valid utf-8
*
- * @param utf8
- * byte array
- * @throws MalformedInputException
- * if the byte array contains invalid utf-8
+ * @param utf8 byte array
+ * @throws MalformedInputException if the byte array contains invalid utf-8
*/
public static void validateUTF8(byte[] utf8) throws
MalformedInputException {
validateUTF8(utf8, 0, utf8.length);
@@ -451,14 +477,10 @@ public class Text implements Writable {
/**
* Check to see if a byte array is valid utf-8
*
- * @param utf8
- * the array of bytes
- * @param start
- * the offset of the first byte in the array
- * @param len
- * the length of the byte sequence
- * @throws MalformedInputException
- * if the byte array contains invalid bytes
+ * @param utf8 the array of bytes
+ * @param start the offset of the first byte in the array
+ * @param len the length of the byte sequence
+ * @throws MalformedInputException if the byte array contains invalid bytes
*/
public static void validateUTF8(byte[] utf8, int start, int len)
throws MalformedInputException {
@@ -540,7 +562,7 @@ public class Text implements Writable {
* values 4 and 5 are presented in this table, even though valid UTF-8
* cannot include the five and six byte sequences.
*/
- static final int[] bytesFromUTF8 = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ static final int[] bytesFromUTF8 = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
@@ -559,7 +581,7 @@ public class Text implements Writable {
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 1, 1, 1, 1, 1,
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3,
- 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5 };
+ 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5};
/**
* Returns the next code point at the current position in the buffer. The
@@ -606,15 +628,14 @@ public class Text implements Writable {
return ch;
}
- static final int[] offsetsFromUTF8 = { 0x00000000, 0x00003080, 0x000E2080,
- 0x03C82080, 0xFA082080, 0x82082080 };
+ // Six per-sequence-length correction constants for UTF-8 decoding; presumably indexed by the
+ // number of trailing bytes (0-5) and subtracted from the accumulated bytes to obtain the code
+ // point - TODO confirm against the decoding loop that reads this table.
+ static final int[] offsetsFromUTF8 = {0x00000000, 0x00003080, 0x000E2080,
+ 0x03C82080, 0xFA082080, 0x82082080};
/**
* For the given string, returns the number of UTF-8 bytes required to
* encode the string.
*
- * @param string
- * text to encode
+ * @param string text to encode
* @return number of UTF-8 bytes required to encode
*/
public static int utf8Length(String string) {
diff --git
a/fe/fe-common/src/test/java/org/apache/doris/common/io/TextTest.java
b/fe/fe-common/src/test/java/org/apache/doris/common/io/TextTest.java
new file mode 100644
index 00000000000..9a3adaab945
--- /dev/null
+++ b/fe/fe-common/src/test/java/org/apache/doris/common/io/TextTest.java
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.io;
+
+import org.apache.doris.common.Config;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.stream.Stream;
+
+/**
+ * Round-trip tests for {@link Text#readString}.
+ *
+ * <p>Every case is decoded twice: once with {@code Config.metadata_text_read_max_batch_bytes == -1}
+ * (whole-array decode) and once with the smallest batch size, so that the batched path and its
+ * UTF-8 boundary handling are actually exercised. The large inputs are sized so that a batch
+ * boundary falls on a complete character, inside a 3-byte character and inside a 4-byte character.
+ */
+public class TextTest {
+    // Text.readString raises any batch setting below this value to this value.
+    private static final int MIN_BATCH_BYTES = 16 * 1024 * 1024;
+
+    // Config values every case is run under: -1 = decode in one go, otherwise batched.
+    private static final int[] BATCH_SETTINGS = {-1, MIN_BATCH_BYTES};
+
+    static Stream<Arguments> provideTestData() {
+        byte[] ascii = "a".getBytes(StandardCharsets.UTF_8);
+        byte[] cjk = "中".getBytes(StandardCharsets.UTF_8);                 // 3-byte sequence
+        byte[] mixed = "ab\uD83D\uDE00".getBytes(StandardCharsets.UTF_8); // 1 + 1 + 4 bytes
+        return Stream.of(
+                Arguments.arguments("ascii", "Hello".getBytes(StandardCharsets.UTF_8)),
+                Arguments.arguments("latin-1 supplement", "helloé".getBytes(StandardCharsets.UTF_8)),
+                Arguments.arguments("control characters", "特殊\n\r\t字符".getBytes(StandardCharsets.UTF_8)),
+                // the first batch ends on a complete 1-byte character
+                Arguments.arguments("ascii x (16Mi + 1)", createBytes(ascii, MIN_BATCH_BYTES + 1)),
+                // 16Mi is not a multiple of 3, so batch boundaries split a 3-byte character
+                Arguments.arguments("cjk x 16Mi", createBytes(cjk, MIN_BATCH_BYTES)),
+                Arguments.arguments("cjk x (16Mi + 1)", createBytes(cjk, MIN_BATCH_BYTES + 1)),
+                Arguments.arguments("cjk x 32Mi", createBytes(cjk, MIN_BATCH_BYTES * 2)),
+                Arguments.arguments("cjk x 43214321", createBytes(cjk, 43214321)),
+                // 16Mi % 6 == 4, so the first batch boundary falls inside a 4-byte character
+                Arguments.arguments("ab+emoji x (16Mi / 6 + 1)", createBytes(mixed, MIN_BATCH_BYTES / 6 + 1))
+        );
+    }
+
+    // {0} (the label) instead of {arguments}: formatting a multi-MB byte[] as a name is very costly.
+    @ParameterizedTest(name = "[{index}] {0}")
+    @MethodSource("provideTestData")
+    void testReadString(String description, byte[] inputBytes) throws IOException {
+        byte[] serialized = serialize(inputBytes);
+        String expected = new String(inputBytes, StandardCharsets.UTF_8);
+        int savedBatchBytes = Config.metadata_text_read_max_batch_bytes;
+        try {
+            for (int batchBytes : BATCH_SETTINGS) {
+                Config.metadata_text_read_max_batch_bytes = batchBytes;
+                String result = Text.readString(new DataInputStream(new ByteArrayInputStream(serialized)));
+                Assertions.assertEquals(expected, result, description + ", batch bytes " + batchBytes);
+            }
+        } finally {
+            // Config fields are global static state; restore the value so other tests are unaffected.
+            Config.metadata_text_read_max_batch_bytes = savedBatchBytes;
+        }
+    }
+
+    // Serializes the payload the way Text.readString reads it: an int length prefix, then the bytes.
+    private static byte[] serialize(byte[] payload) throws IOException {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream(payload.length + Integer.BYTES);
+        DataOutput dataOutput = new DataOutputStream(bos);
+        dataOutput.writeInt(payload.length);
+        dataOutput.write(payload);
+        return bos.toByteArray();
+    }
+
+    // Returns a new array holding {@code unit} repeated {@code repeat} times.
+    private static byte[] createBytes(byte[] unit, int repeat) {
+        byte[] bytes = new byte[unit.length * repeat];
+        for (int i = 0; i < repeat; i++) {
+            System.arraycopy(unit, 0, bytes, i * unit.length, unit.length);
+        }
+        return bytes;
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]