This is an automated email from the ASF dual-hosted git repository.
rdsr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 4d1fc91 Remove thread local objects and use new visitors to fix
executor memory leaks (#1169)
4d1fc91 is described below
commit 4d1fc91d6528db88548c506a0b110b50121d67ff
Author: jun-he <[email protected]>
AuthorDate: Mon Jul 6 10:49:24 2020 -0700
Remove thread local objects and use new visitors to fix executor memory
leaks (#1169)
---
.../org/apache/iceberg/expressions/Evaluator.java | 10 +---------
.../expressions/InclusiveMetricsEvaluator.java | 10 +---------
.../iceberg/expressions/ManifestEvaluator.java | 10 +---------
.../iceberg/expressions/ResidualEvaluator.java | 10 +---------
.../expressions/StrictMetricsEvaluator.java | 10 +---------
.../parquet/ParquetDictionaryRowGroupFilter.java | 23 ++--------------------
.../parquet/ParquetMetricsRowGroupFilter.java | 10 +---------
7 files changed, 8 insertions(+), 75 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/expressions/Evaluator.java b/api/src/main/java/org/apache/iceberg/expressions/Evaluator.java
index 0d3e1ef..b80a13b 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/Evaluator.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/Evaluator.java
@@ -35,14 +35,6 @@ import org.apache.iceberg.types.Types.StructType;
*/
public class Evaluator implements Serializable {
private final Expression expr;
- private transient ThreadLocal<EvalVisitor> visitors = null;
-
- private EvalVisitor visitor() {
- if (visitors == null) {
- this.visitors = ThreadLocal.withInitial(EvalVisitor::new);
- }
- return visitors.get();
- }
public Evaluator(StructType struct, Expression unbound) {
this.expr = Binder.bind(struct, unbound, true);
@@ -53,7 +45,7 @@ public class Evaluator implements Serializable {
}
public boolean eval(StructLike data) {
- return visitor().eval(data);
+ return new EvalVisitor().eval(data);
}
private class EvalVisitor extends BoundVisitor<Boolean> {
diff --git a/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java b/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java
index 148ac4f..7bad98c 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java
@@ -47,14 +47,6 @@ import static org.apache.iceberg.expressions.Expressions.rewriteNot;
*/
public class InclusiveMetricsEvaluator {
private final Expression expr;
- private transient ThreadLocal<MetricsEvalVisitor> visitors = null;
-
- private MetricsEvalVisitor visitor() {
- if (visitors == null) {
- this.visitors = ThreadLocal.withInitial(MetricsEvalVisitor::new);
- }
- return visitors.get();
- }
public InclusiveMetricsEvaluator(Schema schema, Expression unbound) {
this(schema, unbound, true);
@@ -73,7 +65,7 @@ public class InclusiveMetricsEvaluator {
*/
public boolean eval(ContentFile<?> file) {
// TODO: detect the case where a column is missing from the file using file's max field id.
- return visitor().eval(file);
+ return new MetricsEvalVisitor().eval(file);
}
private static final boolean ROWS_MIGHT_MATCH = true;
diff --git a/api/src/main/java/org/apache/iceberg/expressions/ManifestEvaluator.java b/api/src/main/java/org/apache/iceberg/expressions/ManifestEvaluator.java
index 4023644..5e8be15 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/ManifestEvaluator.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/ManifestEvaluator.java
@@ -50,14 +50,6 @@ import static org.apache.iceberg.expressions.Expressions.rewriteNot;
public class ManifestEvaluator {
private final StructType struct;
private final Expression expr;
- private transient ThreadLocal<ManifestEvalVisitor> visitors = null;
-
- private ManifestEvalVisitor visitor() {
- if (visitors == null) {
- this.visitors = ThreadLocal.withInitial(ManifestEvalVisitor::new);
- }
- return visitors.get();
- }
public static ManifestEvaluator forRowFilter(Expression rowFilter, PartitionSpec spec, boolean caseSensitive) {
return new ManifestEvaluator(spec, Projections.inclusive(spec, caseSensitive).project(rowFilter), caseSensitive);
@@ -80,7 +72,7 @@ public class ManifestEvaluator {
* @return false if the file cannot contain rows that match the expression, true otherwise.
*/
public boolean eval(ManifestFile manifest) {
- return visitor().eval(manifest);
+ return new ManifestEvalVisitor().eval(manifest);
}
private static final boolean ROWS_MIGHT_MATCH = true;
diff --git a/api/src/main/java/org/apache/iceberg/expressions/ResidualEvaluator.java b/api/src/main/java/org/apache/iceberg/expressions/ResidualEvaluator.java
index 7dd51b5..3ea3d7b 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/ResidualEvaluator.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/ResidualEvaluator.java
@@ -92,14 +92,6 @@ public class ResidualEvaluator implements Serializable {
private final PartitionSpec spec;
private final Expression expr;
private final boolean caseSensitive;
- private transient ThreadLocal<ResidualVisitor> visitors = null;
-
- private ResidualVisitor visitor() {
- if (visitors == null) {
- this.visitors = ThreadLocal.withInitial(ResidualVisitor::new);
- }
- return visitors.get();
- }
private ResidualEvaluator(PartitionSpec spec, Expression expr, boolean caseSensitive) {
this.spec = spec;
@@ -114,7 +106,7 @@ public class ResidualEvaluator implements Serializable {
* @return the residual of this evaluator's expression from the partition values
*/
public Expression residualFor(StructLike partitionData) {
- return visitor().eval(partitionData);
+ return new ResidualVisitor().eval(partitionData);
}
private class ResidualVisitor extends BoundExpressionVisitor<Expression> {
diff --git a/api/src/main/java/org/apache/iceberg/expressions/StrictMetricsEvaluator.java b/api/src/main/java/org/apache/iceberg/expressions/StrictMetricsEvaluator.java
index 638eb25..8fd0b60 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/StrictMetricsEvaluator.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/StrictMetricsEvaluator.java
@@ -49,14 +49,6 @@ public class StrictMetricsEvaluator {
private final Schema schema;
private final StructType struct;
private final Expression expr;
- private transient ThreadLocal<MetricsEvalVisitor> visitors = null;
-
- private MetricsEvalVisitor visitor() {
- if (visitors == null) {
- this.visitors = ThreadLocal.withInitial(MetricsEvalVisitor::new);
- }
- return visitors.get();
- }
public StrictMetricsEvaluator(Schema schema, Expression unbound) {
this.schema = schema;
@@ -72,7 +64,7 @@ public class StrictMetricsEvaluator {
*/
public boolean eval(ContentFile<?> file) {
// TODO: detect the case where a column is missing from the file using file's max field id.
- return visitor().eval(file);
+ return new MetricsEvalVisitor().eval(file);
}
private static final boolean ROWS_MUST_MATCH = true;
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
index 0a3ade8..c139521 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
@@ -48,14 +48,6 @@ import org.apache.parquet.schema.PrimitiveType;
public class ParquetDictionaryRowGroupFilter {
private final Expression expr;
- private transient ThreadLocal<EvalVisitor> visitors = null;
-
- private EvalVisitor visitor() {
- if (visitors == null) {
- this.visitors = ThreadLocal.withInitial(EvalVisitor::new);
- }
- return visitors.get();
- }
public ParquetDictionaryRowGroupFilter(Schema schema, Expression unbound) {
this(schema, unbound, true);
@@ -75,7 +67,7 @@ public class ParquetDictionaryRowGroupFilter {
*/
public boolean shouldRead(MessageType fileSchema, BlockMetaData rowGroup,
DictionaryPageReadStore dictionaries) {
- return visitor().eval(fileSchema, rowGroup, dictionaries);
+ return new EvalVisitor().eval(fileSchema, rowGroup, dictionaries);
}
private static final boolean ROWS_MIGHT_MATCH = true;
@@ -116,18 +108,7 @@ public class ParquetDictionaryRowGroupFilter {
}
}
- try {
- return ExpressionVisitors.visitEvaluator(expr, this);
-
- } finally {
- // allow temporary state to be collected because this is in a thread-local
- this.dictionaries = null;
- this.dictCache = null;
- this.isFallback = null;
- this.mayContainNulls = null;
- this.cols = null;
- this.conversions = null;
- }
+ return ExpressionVisitors.visitEvaluator(expr, this);
}
@Override
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
index 3fdf905..56292f2 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
@@ -49,14 +49,6 @@ import org.apache.parquet.schema.PrimitiveType;
public class ParquetMetricsRowGroupFilter {
private final Schema schema;
private final Expression expr;
- private transient ThreadLocal<MetricsEvalVisitor> visitors = null;
-
- private MetricsEvalVisitor visitor() {
- if (visitors == null) {
- this.visitors = ThreadLocal.withInitial(MetricsEvalVisitor::new);
- }
- return visitors.get();
- }
public ParquetMetricsRowGroupFilter(Schema schema, Expression unbound) {
this(schema, unbound, true);
@@ -76,7 +68,7 @@ public class ParquetMetricsRowGroupFilter {
* @return false if the file cannot contain rows that match the expression, true otherwise.
*/
public boolean shouldRead(MessageType fileSchema, BlockMetaData rowGroup) {
- return visitor().eval(fileSchema, rowGroup);
+ return new MetricsEvalVisitor().eval(fileSchema, rowGroup);
}
private static final boolean ROWS_MIGHT_MATCH = true;