This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 97eb2b9172 [Fix](multi-catalog) Fix broker load reader and hdfs reader
issue. (#23529)
97eb2b9172 is described below
commit 97eb2b91726ca24622c20c20fb1ab5b9e1c85559
Author: Qi Chen <[email protected]>
AuthorDate: Tue Aug 29 13:45:48 2023 +0800
[Fix](multi-catalog) Fix broker load reader and hdfs reader issue. (#23529)
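Broker load through a broker sometimes throws 'Invalid orc post script
length'.
HDFS queries sometimes throw 'Invalid orc post script length'.
In both cases the reader could return fewer bytes than requested: the HDFS
reader issued a single hdfsPread and reported the full requested length
regardless of how many bytes were actually read, and the broker capped each
read at the read buffer size. Both readers now loop until the requested
length has been read.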
---
be/src/io/fs/hdfs_file_reader.cpp | 20 +++++---
.../doris/broker/hdfs/FileSystemManager.java | 54 +++++++++-------------
2 files changed, 37 insertions(+), 37 deletions(-)
diff --git a/be/src/io/fs/hdfs_file_reader.cpp
b/be/src/io/fs/hdfs_file_reader.cpp
index 383958802f..6c4f456e37 100644
--- a/be/src/io/fs/hdfs_file_reader.cpp
+++ b/be/src/io/fs/hdfs_file_reader.cpp
@@ -133,13 +133,21 @@ Status HdfsFileReader::read_at_impl(size_t offset, Slice
result, size_t* bytes_r
return Status::OK();
}
- tSize r = hdfsPread(_handle->fs(), _handle->file(), offset, to, bytes_req);
- if (r == -1) {
- return Status::InternalError(
- "Read hdfs file failed. (BE: {}) namenode:{}, path:{}, err:
{}",
- BackendOptions::get_localhost(), _name_node, _path.string(),
hdfs_error());
+ size_t has_read = 0;
+ while (has_read < bytes_req) {
+ tSize loop_read = hdfsPread(_handle->fs(), _handle->file(), offset +
has_read,
+ to + has_read, bytes_req - has_read);
+ if (loop_read < 0) {
+ return Status::InternalError(
+ "Read hdfs file failed. (BE: {}) namenode:{}, path:{},
err: {}",
+ BackendOptions::get_localhost(), _name_node,
_path.string(), hdfs_error());
+ }
+ if (loop_read == 0) {
+ break;
+ }
+ has_read += loop_read;
}
- *bytes_read = bytes_req;
+ *bytes_read = has_read;
return Status::OK();
}
diff --git
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
index d86df86fe0..d25947e33b 100644
---
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
+++
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
@@ -1204,27 +1204,32 @@ public class FileSystemManager {
// Avoid using the ByteBuffer based read for Hadoop because some
FSDataInputStream
// implementations are not ByteBufferReadable,
// See https://issues.apache.org/jira/browse/HADOOP-14603
- byte[] buf;
- if (length > readBufferSize) {
- buf = new byte[readBufferSize];
- } else {
- buf = new byte[(int) length];
- }
- try {
- int readLength = readBytesFully(fsDataInputStream, buf);
- if (readLength < 0) {
- throw new
BrokerException(TBrokerOperationStatusCode.END_OF_FILE,
- "end of file reached");
- }
- if (logger.isDebugEnabled()) {
- logger.debug("read buffer from input stream, buffer size:"
+ buf.length + ", read length:" + readLength);
+ int hasRead = 0;
+ byte[] buf = new byte[(int)length];
+ while (hasRead < length) {
+ int bufSize = Math.min((int) length - hasRead, readBufferSize);
+ try {
+ int readLength = fsDataInputStream.read(buf, hasRead,
bufSize);
+ if (readLength < 0) {
+ throw new
BrokerException(TBrokerOperationStatusCode.END_OF_FILE,
+ "end of file reached");
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("read buffer from input stream, buffer
size:" + buf.length + ", read length:"
+ + readLength);
+ }
+ hasRead += readLength;
+ } catch (IOException e) {
+ logger.error("errors while read data from stream", e);
+ throw new
BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
+ e, "errors while read data from stream");
}
- return ByteBuffer.wrap(buf, 0, readLength);
- } catch (IOException e) {
- logger.error("errors while read data from stream", e);
+ }
+ if (hasRead != length) {
throw new
BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
- e, "errors while write data to output stream");
+ String.format("errors while read data from stream:
hasRead(%d) != length(%d)", hasRead, length));
}
+ return ByteBuffer.wrap(buf, 0, hasRead);
}
}
@@ -1325,19 +1330,6 @@ public class FileSystemManager {
return new TBrokerFD(uuid.getMostSignificantBits(),
uuid.getLeastSignificantBits());
}
- private int readBytesFully(FSDataInputStream is, byte[] dest) throws
IOException {
- int readLength = 0;
- while (readLength < dest.length) {
- int availableReadLength = dest.length - readLength;
- int n = is.read(dest, readLength, availableReadLength);
- if (n <= 0) {
- break;
- }
- readLength += n;
- }
- return readLength;
- }
-
/**
* In view of the different expiration mechanisms of different
authentication modes,
* there are two ways to determine whether BrokerFileSystem has expired:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]