This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 13c9c41c1f [opt](hudi) reduce the memory usage of avro reader (#23745)
13c9c41c1f is described below
commit 13c9c41c1f74711292bbcd44877efd166b4379b0
Author: Ashin Gau <[email protected]>
AuthorDate: Tue Sep 5 23:59:23 2023 +0800
[opt](hudi) reduce the memory usage of avro reader (#23745)
1. Reduce the number of threads reading avro logs and keep the readers in a fixed thread pool.
2. Regularly clean the cached resolvers in the thread-local map via reflection.
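
For context, solution 1 reduces to a simple pattern: submit each blocking read to a shared
fixed-size pool and have the caller wait on the Future, so only a bounded set of worker
threads ever populates Avro's per-thread caches. A minimal standalone sketch (the class and
method names are illustrative, not part of the patch):

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    // Hypothetical helper showing the shape of solution 1.
    public class BoundedAvroReadPool {
        // Cap the number of threads that will ever populate Avro's per-thread resolver caches.
        private static final ExecutorService READ_POOL = Executors.newFixedThreadPool(
                Math.max(Runtime.getRuntime().availableProcessors() * 2 + 1, 4));

        // Run a blocking read task on the shared pool and make the caller wait on the Future,
        // so thread-local state accumulates only inside the pool's worker threads.
        public static <T> T read(Callable<T> readTask) throws Exception {
            Future<T> future = READ_POOL.submit(readTask);
            return future.get();
        }
    }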
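
Solution 2, sketched minimally below: reflect the private static RESOLVER_CACHE thread-local
out of Avro's GenericDatumReader (the field name comes from the patch itself), remember each
worker thread's map, and clear the maps from a periodic task. The actual patch additionally
guards readers with a read-write lock and only clears after a 60s idle window; this sketch
(hypothetical class name) omits that:

    import java.lang.reflect.Field;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.util.WeakIdentityHashMap;

    // Hypothetical helper showing the shape of solution 2.
    public class ResolverCacheCleaner {
        // Thread-local resolver cache reflected out of Avro's GenericDatumReader.
        private static ThreadLocal<WeakIdentityHashMap<?, ?>> resolverCache;
        // One entry per worker thread that has populated its cache.
        private static final Map<Long, WeakIdentityHashMap<?, ?>> perThreadCaches =
                new ConcurrentHashMap<>();

        static {
            try {
                Field field = GenericDatumReader.class.getDeclaredField("RESOLVER_CACHE");
                field.setAccessible(true);
                @SuppressWarnings("unchecked")
                ThreadLocal<WeakIdentityHashMap<?, ?>> cache =
                        (ThreadLocal<WeakIdentityHashMap<?, ?>>) field.get(null);
                resolverCache = cache;
            } catch (Exception e) {
                resolverCache = null; // field not found: fall back to doing nothing
            }
            // Periodically clear every registered per-thread cache.
            Executors.newScheduledThreadPool(1).scheduleAtFixedRate(() -> {
                for (WeakIdentityHashMap<?, ?> cache : perThreadCaches.values()) {
                    cache.clear();
                }
            }, 60, 60, TimeUnit.SECONDS);
        }

        // Each worker thread calls this after reading, so its map can be cleared later.
        public static void registerCurrentThread() {
            if (resolverCache != null && resolverCache.get() != null) {
                perThreadCaches.putIfAbsent(Thread.currentThread().getId(), resolverCache.get());
            }
        }
    }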
---
.../java/org/apache/doris/hudi/HudiJniScanner.java | 141 ++++++++++++++++-----
.../org/apache/doris/hudi/BaseSplitReader.scala | 34 +++--
2 files changed, 125 insertions(+), 50 deletions(-)
diff --git a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
index 539ab8f7a8..417b338115 100644
--- a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
+++ b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
@@ -22,6 +22,9 @@ import org.apache.doris.common.jni.JniScanner;
import org.apache.doris.common.jni.vec.ColumnType;
import org.apache.doris.common.jni.vec.ScanPredicate;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.util.WeakIdentityHashMap;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Logger;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -30,14 +33,21 @@ import scala.collection.Iterator;
import java.io.Closeable;
import java.io.IOException;
+import java.lang.reflect.Field;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
/**
@@ -56,6 +66,56 @@ public class HudiJniScanner extends JniScanner {
private long getRecordReaderTimeNs = 0;
private Iterator<InternalRow> recordIterator;
+    /**
+     * Avro's `GenericDatumReader` keeps a thread-local map of `WeakIdentityHashMap`, which caches the Avro
+     * resolving decoders. A cached resolver can only be cleaned up once its Avro schema is recycled and
+     * becomes a weak reference; however, the behavior of the weak reference queue of `WeakIdentityHashMap`
+     * is unpredictable. Moreover, the decoder is very memory intensive, so the number of threads calling
+     * the thread-local map cannot be too large.
+     * Two solutions:
+     * 1. Reduce the number of threads reading avro logs and keep the readers in a fixed thread pool.
+     * 2. Regularly clean the cached resolvers in the thread-local map via reflection.
+     */
+    private static final AtomicLong lastUpdateTime = new AtomicLong(System.currentTimeMillis());
+ private static final long RESOLVER_TIME_OUT = 60000;
+ private static final ExecutorService avroReadPool;
+ private static ThreadLocal<WeakIdentityHashMap<?, ?>> AVRO_RESOLVER_CACHE;
+    private static final Map<Long, WeakIdentityHashMap<?, ?>> cachedResolvers = new ConcurrentHashMap<>();
+    private static final ReadWriteLock cleanResolverLock = new ReentrantReadWriteLock();
+    private static final ScheduledExecutorService cleanResolverService = Executors.newScheduledThreadPool(1);
+
+ static {
+        int numThreads = Math.max(Runtime.getRuntime().availableProcessors() * 2 + 1, 4);
+ avroReadPool = Executors.newFixedThreadPool(numThreads,
+                new ThreadFactoryBuilder().setNameFormat("avro-log-reader-%d").build());
+ LOG.info("Create " + numThreads + " daemon threads to load avro logs");
+
+ Class<?> avroReader = GenericDatumReader.class;
+ try {
+ Field field = avroReader.getDeclaredField("RESOLVER_CACHE");
+ field.setAccessible(true);
+            AVRO_RESOLVER_CACHE = (ThreadLocal<WeakIdentityHashMap<?, ?>>) field.get(null);
+            LOG.info("Got the resolver cache for avro reader");
+ } catch (Exception e) {
+ AVRO_RESOLVER_CACHE = null;
+            LOG.warn("Failed to get the resolver cache for avro reader");
+ }
+
+ cleanResolverService.scheduleAtFixedRate(() -> {
+ cleanResolverLock.writeLock().lock();
+ try {
+                if (System.currentTimeMillis() - lastUpdateTime.get() > RESOLVER_TIME_OUT) {
+                    for (WeakIdentityHashMap<?, ?> solver : cachedResolvers.values()) {
+ solver.clear();
+ }
+ lastUpdateTime.set(System.currentTimeMillis());
+ }
+ } finally {
+ cleanResolverLock.writeLock().unlock();
+ }
+ }, RESOLVER_TIME_OUT, RESOLVER_TIME_OUT, TimeUnit.MILLISECONDS);
+ }
+
public HudiJniScanner(int fetchSize, Map<String, String> params) {
         debugString = params.entrySet().stream().map(kv -> kv.getKey() + "=" + kv.getValue())
                 .collect(Collectors.joining("\n"));
@@ -84,46 +144,62 @@ public class HudiJniScanner extends JniScanner {
@Override
public void open() throws IOException {
- Thread.currentThread().setContextClassLoader(classLoader);
-        initTableInfo(split.requiredTypes(), split.requiredFields(), predicates, fetchSize);
- long startTime = System.nanoTime();
-        // RecordReader will use ProcessBuilder to start a hotspot process, which may be stuck,
- // so use another process to kill this stuck process.
- // TODO(gaoxin): better way to solve the stuck process?
- AtomicBoolean isKilled = new AtomicBoolean(false);
-        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
- executorService.scheduleAtFixedRate(() -> {
- if (!isKilled.get()) {
- synchronized (HudiJniScanner.class) {
- List<Long> pids = Utils.getChildProcessIds(
- Utils.getCurrentProcId());
- for (long pid : pids) {
- String cmd = Utils.getCommandLine(pid);
-                    if (cmd != null && cmd.contains("org.openjdk.jol.vm.sa.AttachMain")) {
- Utils.killProcess(pid);
- isKilled.set(true);
- LOG.info("Kill hotspot debugger process " + pid);
+ Future<?> avroFuture = avroReadPool.submit(() -> {
+ Thread.currentThread().setContextClassLoader(classLoader);
+            initTableInfo(split.requiredTypes(), split.requiredFields(), predicates, fetchSize);
+ long startTime = System.nanoTime();
+            // RecordReader will use ProcessBuilder to start a hotspot process, which may be stuck,
+ // so use another process to kill this stuck process.
+ // TODO(gaoxin): better way to solve the stuck process?
+ AtomicBoolean isKilled = new AtomicBoolean(false);
+            ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
+ executorService.scheduleAtFixedRate(() -> {
+ if (!isKilled.get()) {
+ synchronized (HudiJniScanner.class) {
+ List<Long> pids = Utils.getChildProcessIds(
+ Utils.getCurrentProcId());
+ for (long pid : pids) {
+ String cmd = Utils.getCommandLine(pid);
+                            if (cmd != null && cmd.contains("org.openjdk.jol.vm.sa.AttachMain")) {
+ Utils.killProcess(pid);
+ isKilled.set(true);
+                                LOG.info("Kill hotspot debugger process " + pid);
+ }
}
}
}
+ }, 100, 1000, TimeUnit.MILLISECONDS);
+
+ cleanResolverLock.readLock().lock();
+ try {
+ lastUpdateTime.set(System.currentTimeMillis());
+ if (ugi != null) {
+ recordIterator = ugi.doAs(
+                            (PrivilegedExceptionAction<Iterator<InternalRow>>) () -> new MORSnapshotSplitReader(
+                                    split).buildScanIterator(split.requiredFields(), new Filter[0]));
+ } else {
+ recordIterator = new MORSnapshotSplitReader(split)
+                            .buildScanIterator(split.requiredFields(), new Filter[0]);
+ }
+ } catch (Exception e) {
+                LOG.error("Failed to open hudi scanner, split params:\n" + debugString, e);
+ throw new RuntimeException(e.getMessage(), e);
+ } finally {
+ cleanResolverLock.readLock().unlock();
}
- }, 100, 1000, TimeUnit.MILLISECONDS);
- try {
- if (ugi != null) {
- recordIterator = ugi.doAs(
-                        (PrivilegedExceptionAction<Iterator<InternalRow>>) () -> new MORSnapshotSplitReader(
-                                split).buildScanIterator(split.requiredFields(), new Filter[0]));
- } else {
- recordIterator = new MORSnapshotSplitReader(split)
-                        .buildScanIterator(split.requiredFields(), new Filter[0]);
+ isKilled.set(true);
+ executorService.shutdownNow();
+            if (AVRO_RESOLVER_CACHE != null && AVRO_RESOLVER_CACHE.get() != null) {
+                cachedResolvers.computeIfAbsent(Thread.currentThread().getId(),
+                        threadId -> AVRO_RESOLVER_CACHE.get());
}
+ getRecordReaderTimeNs += System.nanoTime() - startTime;
+ });
+ try {
+ avroFuture.get();
} catch (Exception e) {
-            LOG.error("Failed to open hudi scanner, split params:\n" + debugString, e);
throw new IOException(e.getMessage(), e);
}
- isKilled.set(true);
- executorService.shutdownNow();
- getRecordReaderTimeNs += System.nanoTime() - startTime;
}
@Override
@@ -131,6 +207,7 @@ public class HudiJniScanner extends JniScanner {
if (recordIterator instanceof Closeable) {
((Closeable) recordIterator).close();
}
+ recordIterator = null;
}
@Override
diff --git a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala
index cdae395534..5ba16a5e16 100644
--- a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala
+++ b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala
@@ -153,7 +153,6 @@ case class HoodieTableInformation(sparkSession: SparkSession,
metaClient: HoodieTableMetaClient,
timeline: HoodieTimeline,
tableConfig: HoodieTableConfig,
- tableAvroSchema: Schema,
internalSchemaOpt: Option[InternalSchema])
/**
@@ -215,7 +214,22 @@ abstract class BaseSplitReader(val split: HoodieSplit) {
* required to fetch table's Avro and Internal schemas
*/
   protected lazy val (tableAvroSchema: Schema, internalSchemaOpt: Option[InternalSchema]) = {
- (tableInformation.tableAvroSchema, tableInformation.internalSchemaOpt)
+ val schemaResolver = new TableSchemaResolver(tableInformation.metaClient)
+    val (name, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableName)
+ val avroSchema: Schema = tableInformation.internalSchemaOpt.map { is =>
+ AvroInternalSchemaConverter.convert(is, namespace + "." + name)
+ } orElse {
+ specifiedQueryTimestamp.map(schemaResolver.getTableAvroSchema)
+ } orElse {
+ split.schemaSpec.map(s => convertToAvroSchema(s, tableName))
+ } getOrElse {
+ Try(schemaResolver.getTableAvroSchema) match {
+ case Success(schema) => schema
+ case Failure(e) =>
+          throw new HoodieSchemaException("Failed to fetch schema from the table", e)
+ }
+ }
+ (avroSchema, tableInformation.internalSchemaOpt)
}
   protected lazy val tableStructSchema: StructType = convertAvroSchemaToStructType(tableAvroSchema)
@@ -649,27 +663,11 @@ object BaseSplitReader {
None
}
}
- val tableName = metaClient.getTableConfig.getTableName
-    val (name, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableName)
- val avroSchema: Schema = internalSchemaOpt.map { is =>
- AvroInternalSchemaConverter.convert(is, namespace + "." + name)
- } orElse {
- specifiedQueryTimestamp.map(schemaResolver.getTableAvroSchema)
- } orElse {
- split.schemaSpec.map(s => convertToAvroSchema(s, tableName))
- } getOrElse {
- Try(schemaResolver.getTableAvroSchema) match {
- case Success(schema) => schema
- case Failure(e) =>
-          throw new HoodieSchemaException("Failed to fetch schema from the table", e)
- }
- }
HoodieTableInformation(sparkSession,
metaClient,
timeline,
metaClient.getTableConfig,
- avroSchema,
internalSchemaOpt)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]