This is an automated email from the ASF dual-hosted git repository.
ashingau pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4c4f08f805 [fix](hudi) the required fields are empty if only reading partition columns (#22187)
4c4f08f805 is described below
commit 4c4f08f8059b3c854669322f8777f294f5a89e93
Author: Ashin Gau <[email protected]>
AuthorDate: Wed Jul 26 10:59:45 2023 +0800
[fix](hudi) the required fields are empty if only reading partition columns (#22187)
1. If only the partition columns are read, the `JniConnector` produces empty
required fields, so `HudiJniScanner` should read at least the "_hoodie_record_key"
field to know how many rows the current hoodie split contains. Even if the
`JniConnector` never reads this field, the call to `releaseTable` in
`JniConnector` will still reclaim the resource. A hedged sketch of this
fallback is shown after this list.
2. To prevent the BE from failing and exiting, `JniConnector` should only call
the release methods after `HudiJniScanner` has been initialized. Note that
`VectorTable` is created lazily in `JniScanner`, so there is no resource to
reclaim when `HudiJniScanner` fails to initialize.
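As an illustration of item 1, here is a minimal, hedged Java sketch of the
fallback. The real change lives in `BaseSplitReader.scala`; the class and
method names below are invented for the example.
```java
import java.util.Arrays;

// Hypothetical helper, not part of the Doris code base: it only illustrates
// the fallback that BaseSplitReader.scala now applies.
public class RequiredFieldsFallback {
    // Hudi's record-key metadata column; every Hudi table carries it, so
    // reading it is enough to learn how many rows a split contains.
    private static final String RECORD_KEY_FIELD = "_hoodie_record_key";

    static String[] resolveRequiredFields(String requiredFieldsParam) {
        // An all-partition-column scan arrives here as an empty field list.
        String[] fields = Arrays.stream(requiredFieldsParam.split(","))
                .filter(f -> !f.isEmpty())
                .toArray(String[]::new);
        return fields.length == 0 ? new String[] {RECORD_KEY_FIELD} : fields;
    }
}
```
With this fallback, `resolveRequiredFields("")` yields `["_hoodie_record_key"]`,
so the reader still returns the correct row count for a partition-column-only
scan.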
## Remaining works
Other JNI readers such as `paimon` and `maxcompute` may run into the same
problem. Each JNI reader has to handle this abnormal situation on its own;
for now, this fix only guarantees that the BE will not exit. A hypothetical
sketch of such a defensive check follows.
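For those readers, a defensive shape like the following hypothetical Java
sketch would be needed. None of these names exist in the code base; they only
illustrate the two guards this fix relies on.
```java
// Hypothetical scanner wrapper, for illustration only.
public class GuardedJniScanner {
    private boolean initialized = false;

    public void open(String[] requiredFields) throws Exception {
        if (requiredFields.length == 0) {
            // Each reader has to decide how to handle an empty projection,
            // e.g. by falling back to a key/metadata column as the Hudi
            // reader now does.
            throw new IllegalArgumentException("required fields must not be empty");
        }
        // ... open the underlying reader here ...
        initialized = true;
    }

    public void close() {
        // Releasing a scanner that never finished initialization is what
        // crashed the BE, so the release path must be guarded.
        if (!initialized) {
            return;
        }
        // ... releaseTable() and close the underlying reader here ...
    }
}
```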
---
be/src/vec/exec/jni_connector.cpp | 61 ++++++++++++----------
be/src/vec/exec/jni_connector.h | 1 +
.../java/org/apache/doris/hudi/HudiJniScanner.java | 54 ++++++++++---------
.../org/apache/doris/hudi/BaseSplitReader.scala | 12 ++++-
4 files changed, 74 insertions(+), 54 deletions(-)
diff --git a/be/src/vec/exec/jni_connector.cpp b/be/src/vec/exec/jni_connector.cpp
index edb195479a..ea7e9e82c4 100644
--- a/be/src/vec/exec/jni_connector.cpp
+++ b/be/src/vec/exec/jni_connector.cpp
@@ -156,37 +156,39 @@ Status JniConnector::close() {
     if (!_closed) {
         JNIEnv* env = nullptr;
         RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
-        // update scanner metrics
-        for (const auto& metric : get_statistics(env)) {
-            std::vector<std::string> type_and_name = split(metric.first, ":");
-            if (type_and_name.size() != 2) {
-                LOG(WARNING) << "Name of JNI Scanner metric should be pattern like "
-                             << "'metricType:metricName'";
-                continue;
+        if (_scanner_initialized) {
+            // update scanner metrics
+            for (const auto& metric : get_statistics(env)) {
+                std::vector<std::string> type_and_name = split(metric.first, ":");
+                if (type_and_name.size() != 2) {
+                    LOG(WARNING) << "Name of JNI Scanner metric should be pattern like "
+                                 << "'metricType:metricName'";
+                    continue;
+                }
+                long metric_value = std::stol(metric.second);
+                RuntimeProfile::Counter* scanner_counter;
+                if (type_and_name[0] == "timer") {
+                    scanner_counter =
+                            ADD_CHILD_TIMER(_profile, type_and_name[1], _connector_name.c_str());
+                } else if (type_and_name[0] == "counter") {
+                    scanner_counter = ADD_CHILD_COUNTER(_profile, type_and_name[1], TUnit::UNIT,
+                                                        _connector_name.c_str());
+                } else if (type_and_name[0] == "bytes") {
+                    scanner_counter = ADD_CHILD_COUNTER(_profile, type_and_name[1], TUnit::BYTES,
+                                                        _connector_name.c_str());
+                } else {
+                    LOG(WARNING) << "Type of JNI Scanner metric should be timer, counter or bytes";
+                    continue;
+                }
+                COUNTER_UPDATE(scanner_counter, metric_value);
             }
-            long metric_value = std::stol(metric.second);
-            RuntimeProfile::Counter* scanner_counter;
-            if (type_and_name[0] == "timer") {
-                scanner_counter =
-                        ADD_CHILD_TIMER(_profile, type_and_name[1], _connector_name.c_str());
-            } else if (type_and_name[0] == "counter") {
-                scanner_counter = ADD_CHILD_COUNTER(_profile, type_and_name[1], TUnit::UNIT,
-                                                    _connector_name.c_str());
-            } else if (type_and_name[0] == "bytes") {
-                scanner_counter = ADD_CHILD_COUNTER(_profile, type_and_name[1], TUnit::BYTES,
-                                                    _connector_name.c_str());
-            } else {
-                LOG(WARNING) << "Type of JNI Scanner metric should be timer, counter or bytes";
-                continue;
-            }
-            COUNTER_UPDATE(scanner_counter, metric_value);
-        }
-        // _fill_block may be failed and returned, we should release table in close.
-        // org.apache.doris.common.jni.JniScanner#releaseTable is idempotent
-        env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_release_table);
-        env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_close);
-        env->DeleteGlobalRef(_jni_scanner_obj);
+            // _fill_block may be failed and returned, we should release table in close.
+            // org.apache.doris.common.jni.JniScanner#releaseTable is idempotent
+            env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_release_table);
+            env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_close);
+            env->DeleteGlobalRef(_jni_scanner_obj);
+        }
         env->DeleteGlobalRef(_jni_scanner_cls);
         _closed = true;
         jthrowable exc = (env)->ExceptionOccurred();
@@ -222,6 +224,7 @@ Status JniConnector::_init_jni_scanner(JNIEnv* env, int batch_size) {
     _jni_scanner_get_statistics =
             env->GetMethodID(_jni_scanner_cls, "getStatistics", "()Ljava/util/Map;");
     RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, jni_scanner_obj, &_jni_scanner_obj));
+    _scanner_initialized = true;
     env->DeleteLocalRef(jni_scanner_obj);
    RETURN_ERROR_IF_EXC(env);
     return Status::OK();
diff --git a/be/src/vec/exec/jni_connector.h b/be/src/vec/exec/jni_connector.h
index 0f08fbe0f8..1cadc37a1b 100644
--- a/be/src/vec/exec/jni_connector.h
+++ b/be/src/vec/exec/jni_connector.h
@@ -257,6 +257,7 @@ private:
     size_t _has_read = 0;
     bool _closed = false;
+    bool _scanner_initialized = false;
     jclass _jni_scanner_cls;
     jobject _jni_scanner_obj;
     jmethodID _jni_scanner_open;
diff --git a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
index d067493a79..539ab8f7a8 100644
--- a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
+++ b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
@@ -47,6 +47,7 @@ public class HudiJniScanner extends JniScanner {
     private static final Logger LOG = Logger.getLogger(HudiJniScanner.class);

     private final int fetchSize;
+    private final String debugString;
     private final HoodieSplit split;
     private final ScanPredicate[] predicates;
     private final ClassLoader classLoader;
@@ -56,26 +57,29 @@ public class HudiJniScanner extends JniScanner {
     private Iterator<InternalRow> recordIterator;

     public HudiJniScanner(int fetchSize, Map<String, String> params) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Hudi JNI params:\n" + params.entrySet().stream().map(kv -> kv.getKey() + "=" + kv.getValue())
-                    .collect(Collectors.joining("\n")));
-        }
-        this.classLoader = this.getClass().getClassLoader();
-        String predicatesAddressString = params.remove("push_down_predicates");
-        this.fetchSize = fetchSize;
-        this.split = new HoodieSplit(params);
-        if (predicatesAddressString == null) {
-            predicates = new ScanPredicate[0];
-        } else {
-            long predicatesAddress = Long.parseLong(predicatesAddressString);
-            if (predicatesAddress != 0) {
-                predicates = ScanPredicate.parseScanPredicates(predicatesAddress, split.requiredTypes());
-                LOG.info("HudiJniScanner gets pushed-down predicates: " + ScanPredicate.dump(predicates));
-            } else {
+        debugString = params.entrySet().stream().map(kv -> kv.getKey() + "=" + kv.getValue())
+                .collect(Collectors.joining("\n"));
+        try {
+            this.classLoader = this.getClass().getClassLoader();
+            String predicatesAddressString = params.remove("push_down_predicates");
+            this.fetchSize = fetchSize;
+            this.split = new HoodieSplit(params);
+            if (predicatesAddressString == null) {
                 predicates = new ScanPredicate[0];
+            } else {
+                long predicatesAddress = Long.parseLong(predicatesAddressString);
+                if (predicatesAddress != 0) {
+                    predicates = ScanPredicate.parseScanPredicates(predicatesAddress, split.requiredTypes());
+                    LOG.info("HudiJniScanner gets pushed-down predicates: " + ScanPredicate.dump(predicates));
+                } else {
+                    predicates = new ScanPredicate[0];
+                }
             }
+            ugi = Utils.getUserGroupInformation(split.hadoopConf());
+        } catch (Exception e) {
+            LOG.error("Failed to initialize hudi scanner, split params:\n" + debugString, e);
+            throw e;
         }
-        ugi = Utils.getUserGroupInformation(split.hadoopConf());
     }

     @Override
@@ -104,17 +108,18 @@ public class HudiJniScanner extends JniScanner {
                 }
             }
         }, 100, 1000, TimeUnit.MILLISECONDS);
-        if (ugi != null) {
-            try {
+        try {
+            if (ugi != null) {
                 recordIterator = ugi.doAs(
                         (PrivilegedExceptionAction<Iterator<InternalRow>>) () -> new MORSnapshotSplitReader(
                                 split).buildScanIterator(split.requiredFields(), new Filter[0]));
-            } catch (InterruptedException e) {
-                throw new IOException(e);
+            } else {
+                recordIterator = new MORSnapshotSplitReader(split)
+                        .buildScanIterator(split.requiredFields(), new Filter[0]);
             }
-        } else {
-            recordIterator = new MORSnapshotSplitReader(split)
-                    .buildScanIterator(split.requiredFields(), new Filter[0]);
+        } catch (Exception e) {
+            LOG.error("Failed to open hudi scanner, split params:\n" + debugString, e);
+            throw new IOException(e.getMessage(), e);
         }
         isKilled.set(true);
         executorService.shutdownNow();
@@ -146,6 +151,7 @@ public class HudiJniScanner extends JniScanner {
             return readRowNumbers;
         } catch (Exception e) {
             close();
+            LOG.error("Failed to get the next batch of hudi, split params:\n" + debugString, e);
             throw new IOException("Failed to get the next batch of hudi.", e);
         }
     }
diff --git a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala
index 5eceb1c9b3..cdae395534 100644
--- a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala
+++ b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala
@@ -84,7 +84,17 @@ class HoodieSplit(private val params: jutil.Map[String, String]) {
   val hudiColumnTypes: Map[String, String] = hudiColumnNames.zip(
     params.remove("hudi_column_types").split("#")).toMap

-  val requiredFields: Array[String] = params.remove("required_fields").split(",")
+  val requiredFields: Array[String] = {
+    val readFields = params.remove("required_fields").split(",").filter(_.nonEmpty)
+    if (readFields.isEmpty) {
+      // If only read the partition columns, the JniConnector will produce empty required fields.
+      // Read the "_hoodie_record_key" field at least to know how many rows in current hoodie split
+      // Even if the JniConnector doesn't read this field, the call of releaseTable will reclaim the resource
+      Array(HoodieRecord.RECORD_KEY_METADATA_FIELD)
+    } else {
+      readFields
+    }
+  }

   val requiredTypes: Array[ColumnType] = requiredFields.map(
     field => ColumnType.parseType(field, hudiColumnTypes(field)))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]