This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 2446cee5cf Core: Close the MetricsReporter when Catalog is closed (#9353)
2446cee5cf is described below
commit 2446cee5cf0ad93a2be9a68f0b2f7f6fa6edb865
Author: big face cat <[email protected]>
AuthorDate: Thu Jan 18 15:31:40 2024 +0800
Core: Close the MetricsReporter when Catalog is closed (#9353)
---
.../org/apache/iceberg/metrics/MetricsReporter.java | 6 +++++-
.../apache/iceberg/aws/dynamodb/DynamoDbCatalog.java | 4 ++--
.../java/org/apache/iceberg/aws/glue/GlueCatalog.java | 4 ++--
.../java/org/apache/iceberg/BaseMetastoreCatalog.java | 13 +++++++++++--
.../java/org/apache/iceberg/hadoop/HadoopCatalog.java | 4 ++--
.../org/apache/iceberg/inmemory/InMemoryCatalog.java | 6 ++++++
.../java/org/apache/iceberg/jdbc/JdbcCatalog.java | 19 ++++++++++++++++---
.../java/org/apache/iceberg/dell/ecs/EcsCatalog.java | 4 ++--
.../java/org/apache/iceberg/nessie/NessieCatalog.java | 3 ++-
.../apache/iceberg/snowflake/SnowflakeCatalog.java | 4 ++--
10 files changed, 50 insertions(+), 17 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java b/api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java
index 365f7f99d6..9958b75ca3 100644
--- a/api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java
+++ b/api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java
@@ -18,11 +18,12 @@
*/
package org.apache.iceberg.metrics;
+import java.io.Closeable;
import java.util.Map;
/** This interface defines the basic API for reporting metrics for operations
to a Table. */
@FunctionalInterface
-public interface MetricsReporter {
+public interface MetricsReporter extends Closeable {
/**
* A custom MetricsReporter implementation must have a no-arg constructor,
which will be called
@@ -40,4 +41,7 @@ public interface MetricsReporter {
* @param report The {@link MetricsReport} to report.
*/
void report(MetricsReport report);
+
+ @Override
+ default void close() {}
}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java b/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java
index fc1479c3a0..0c991af750 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java
@@ -18,7 +18,6 @@
*/
package org.apache.iceberg.aws.dynamodb;
-import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -87,7 +86,7 @@ import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
/** DynamoDB implementation of Iceberg catalog */
public class DynamoDbCatalog extends BaseMetastoreCatalog
- implements Closeable, SupportsNamespaces, Configurable {
+ implements SupportsNamespaces, Configurable {
private static final Logger LOG =
LoggerFactory.getLogger(DynamoDbCatalog.class);
private static final int CATALOG_TABLE_CREATION_WAIT_ATTEMPTS_MAX = 5;
@@ -143,6 +142,7 @@ public class DynamoDbCatalog extends BaseMetastoreCatalog
this.closeableGroup = new CloseableGroup();
closeableGroup.addCloseable(dynamo);
closeableGroup.addCloseable(fileIO);
+ closeableGroup.addCloseable(metricsReporter());
closeableGroup.setSuppressCloseFailure(true);
ensureCatalogTableExistsOrCreate();
diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java
index 6e95379c1d..bdc2452731 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java
@@ -21,7 +21,6 @@ package org.apache.iceberg.aws.glue;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalListener;
-import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -85,7 +84,7 @@ import software.amazon.awssdk.services.glue.model.TableInput;
import software.amazon.awssdk.services.glue.model.UpdateDatabaseRequest;
public class GlueCatalog extends BaseMetastoreCatalog
- implements Closeable, SupportsNamespaces, Configurable<Configuration> {
+ implements SupportsNamespaces, Configurable<Configuration> {
private static final Logger LOG = LoggerFactory.getLogger(GlueCatalog.class);
@@ -197,6 +196,7 @@ public class GlueCatalog extends BaseMetastoreCatalog
this.closeableGroup = new CloseableGroup();
closeableGroup.addCloseable(glue);
closeableGroup.addCloseable(lockManager);
+ closeableGroup.addCloseable(metricsReporter());
closeableGroup.setSuppressCloseFailure(true);
this.fileIOCloser = newFileIOCloser();
}
diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
index a683533473..bb7d5a0ffd 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
@@ -18,6 +18,8 @@
*/
package org.apache.iceberg;
+import java.io.Closeable;
+import java.io.IOException;
import java.util.Map;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -34,7 +36,7 @@ import org.apache.iceberg.util.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public abstract class BaseMetastoreCatalog implements Catalog {
+public abstract class BaseMetastoreCatalog implements Catalog, Closeable {
private static final Logger LOG =
LoggerFactory.getLogger(BaseMetastoreCatalog.class);
private MetricsReporter metricsReporter;
@@ -305,11 +307,18 @@ public abstract class BaseMetastoreCatalog implements Catalog {
return sb.toString();
}
- private MetricsReporter metricsReporter() {
+ protected MetricsReporter metricsReporter() {
if (metricsReporter == null) {
metricsReporter = CatalogUtil.loadMetricsReporter(properties());
}
return metricsReporter;
}
+
+ @Override
+ public void close() throws IOException {
+ if (metricsReporter != null) {
+ metricsReporter.close();
+ }
+ }
}
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java
index e9ed4dcd28..92ba25af0f 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java
@@ -18,7 +18,6 @@
*/
package org.apache.iceberg.hadoop;
-import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
@@ -78,7 +77,7 @@ import org.slf4j.LoggerFactory;
* <p>Note: The HadoopCatalog requires that the underlying file system
supports atomic rename.
*/
public class HadoopCatalog extends BaseMetastoreCatalog
- implements Closeable, SupportsNamespaces, Configurable {
+ implements SupportsNamespaces, Configurable {
private static final Logger LOG =
LoggerFactory.getLogger(HadoopCatalog.class);
@@ -122,6 +121,7 @@ public class HadoopCatalog extends BaseMetastoreCatalog
this.closeableGroup = new CloseableGroup();
closeableGroup.addCloseable(lockManager);
+ closeableGroup.addCloseable(metricsReporter());
closeableGroup.setSuppressCloseFailure(true);
this.suppressPermissionError =
diff --git a/core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java b/core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java
index 51d242f934..a880f94f43 100644
--- a/core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java
@@ -42,6 +42,7 @@ import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.NoSuchViewException;
+import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Objects;
@@ -68,6 +69,7 @@ public class InMemoryCatalog extends BaseMetastoreViewCatalog
private FileIO io;
private String catalogName;
private String warehouseLocation;
+ private CloseableGroup closeableGroup;
public InMemoryCatalog() {
this.namespaces = Maps.newConcurrentMap();
@@ -87,6 +89,9 @@ public class InMemoryCatalog extends BaseMetastoreViewCatalog
String warehouse =
properties.getOrDefault(CatalogProperties.WAREHOUSE_LOCATION, "");
this.warehouseLocation = warehouse.replaceAll("/*$", "");
this.io = new InMemoryFileIO();
+ this.closeableGroup = new CloseableGroup();
+ closeableGroup.addCloseable(metricsReporter());
+ closeableGroup.setSuppressCloseFailure(true);
}
@Override
@@ -302,6 +307,7 @@ public class InMemoryCatalog extends BaseMetastoreViewCatalog
@Override
public void close() throws IOException {
+ closeableGroup.close();
namespaces.clear();
tables.clear();
views.clear();
diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
index 314595dd02..0bab6ade4c 100644
--- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
@@ -18,7 +18,8 @@
*/
package org.apache.iceberg.jdbc;
-import java.io.Closeable;
+import java.io.IOException;
+import java.io.UncheckedIOException;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -50,6 +51,7 @@ import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.hadoop.Configurable;
+import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -62,7 +64,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JdbcCatalog extends BaseMetastoreCatalog
- implements Configurable<Object>, SupportsNamespaces, Closeable {
+ implements Configurable<Object>, SupportsNamespaces {
public static final String PROPERTY_PREFIX = "jdbc.";
private static final String NAMESPACE_EXISTS_PROPERTY = "exists";
@@ -78,6 +80,7 @@ public class JdbcCatalog extends BaseMetastoreCatalog
private final Function<Map<String, String>, FileIO> ioBuilder;
private final Function<Map<String, String>, JdbcClientPool>
clientPoolBuilder;
private final boolean initializeCatalogTables;
+ private CloseableGroup closeableGroup;
public JdbcCatalog() {
this(null, null, true);
@@ -140,6 +143,10 @@ public class JdbcCatalog extends BaseMetastoreCatalog
Thread.currentThread().interrupt();
throw new UncheckedInterruptedException(e, "Interrupted in call to
initialize");
}
+ this.closeableGroup = new CloseableGroup();
+ closeableGroup.addCloseable(metricsReporter());
+ closeableGroup.addCloseable(connections);
+ closeableGroup.setSuppressCloseFailure(true);
}
private void initializeCatalogTables() throws InterruptedException,
SQLException {
@@ -482,7 +489,13 @@ public class JdbcCatalog extends BaseMetastoreCatalog
@Override
public void close() {
- connections.close();
+ if (closeableGroup != null) {
+ try {
+ closeableGroup.close();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
}
@Override
diff --git a/dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java b/dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java
index f951c8c937..07ad683658 100644
--- a/dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java
+++ b/dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java
@@ -26,7 +26,6 @@ import com.emc.object.s3.bean.ListObjectsResult;
import com.emc.object.s3.bean.S3Object;
import com.emc.object.s3.request.ListObjectsRequest;
import com.emc.object.s3.request.PutObjectRequest;
-import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
@@ -64,7 +63,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class EcsCatalog extends BaseMetastoreCatalog
- implements Closeable, SupportsNamespaces, Configurable<Object> {
+ implements SupportsNamespaces, Configurable<Object> {
/** Suffix of table metadata object */
private static final String TABLE_OBJECT_SUFFIX = ".table";
@@ -111,6 +110,7 @@ public class EcsCatalog extends BaseMetastoreCatalog
this.closeableGroup = new CloseableGroup();
closeableGroup.addCloseable(client::destroy);
closeableGroup.addCloseable(fileIO);
+ closeableGroup.addCloseable(metricsReporter());
closeableGroup.setSuppressCloseFailure(true);
}
diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java
index 6a877893ef..cce6fcf144 100644
--- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java
+++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java
@@ -60,7 +60,7 @@ import org.slf4j.LoggerFactory;
/** Nessie implementation of Iceberg Catalog. */
public class NessieCatalog extends BaseMetastoreViewCatalog
- implements AutoCloseable, SupportsNamespaces, Configurable<Object> {
+ implements SupportsNamespaces, Configurable<Object> {
private static final Logger LOG =
LoggerFactory.getLogger(NessieCatalog.class);
private static final Joiner SLASH = Joiner.on("/");
@@ -176,6 +176,7 @@ public class NessieCatalog extends BaseMetastoreViewCatalog
this.closeableGroup = new CloseableGroup();
closeableGroup.addCloseable(client);
closeableGroup.addCloseable(fileIO);
+ closeableGroup.addCloseable(metricsReporter());
closeableGroup.setSuppressCloseFailure(true);
}
diff --git a/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeCatalog.java b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeCatalog.java
index dd20c8ded9..06dacad185 100644
--- a/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeCatalog.java
+++ b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeCatalog.java
@@ -18,7 +18,6 @@
*/
package org.apache.iceberg.snowflake;
-import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -45,7 +44,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SnowflakeCatalog extends BaseMetastoreCatalog
- implements Closeable, SupportsNamespaces, Configurable<Object> {
+ implements SupportsNamespaces, Configurable<Object> {
private static final String DEFAULT_CATALOG_NAME = "snowflake_catalog";
private static final String DEFAULT_FILE_IO_IMPL =
"org.apache.iceberg.io.ResolvingFileIO";
// Specifies the name of a Snowflake's partner application to connect
through JDBC.
@@ -157,6 +156,7 @@ public class SnowflakeCatalog extends BaseMetastoreCatalog
this.catalogProperties = properties;
this.closeableGroup = new CloseableGroup();
closeableGroup.addCloseable(snowflakeClient);
+ closeableGroup.addCloseable(metricsReporter());
closeableGroup.setSuppressCloseFailure(true);
}