This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 5c703695d2 Add close method to upsert interfaces (#9212)
5c703695d2 is described below
commit 5c703695d245278e4a47ab066c6d6477dfcf487c
Author: Kartik Khare <[email protected]>
AuthorDate: Wed Aug 17 15:43:38 2022 +0530
Add close method to upsert interfaces (#9212)
* Add close method to upsert interfaces
* Extend with Closeable
Co-authored-by: Kartik Khare <[email protected]>
---
.../core/data/manager/realtime/RealtimeTableDataManager.java | 7 +++++++
.../local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java | 6 ++++++
.../local/upsert/ConcurrentMapTableUpsertMetadataManager.java | 8 ++++++++
.../segment/local/upsert/PartitionUpsertMetadataManager.java | 3 ++-
.../pinot/segment/local/upsert/TableUpsertMetadataManager.java | 3 ++-
5 files changed, 25 insertions(+), 2 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index af7f366ebd..f41f7edba5 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -199,6 +199,13 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
@Override
protected void doShutdown() {
_segmentAsyncExecutorService.shutdown();
+ if (_tableUpsertMetadataManager != null) {
+ try {
+ _tableUpsertMetadataManager.close();
+ } catch (IOException e) {
+ _logger.warn("Cannot close upsert metadata manager properly for table:
{}", _tableNameWithType, e);
+ }
+ }
for (SegmentDataManager segmentDataManager :
_segmentDataManagerMap.values()) {
segmentDataManager.destroy();
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index e6890c8a6c..1654eaca0d 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -428,6 +428,12 @@ public class ConcurrentMapPartitionUpsertMetadataManager
implements PartitionUps
}
}
+ @Override
+ public void close() {
+ _logger.info("Closing metadata manager for table {} and partition {},
current primary key count: {}",
+ _tableNameWithType, _partitionId,
_primaryKeyToRecordLocationMap.size());
+ }
+
@VisibleForTesting
static class RecordLocation {
private final IndexSegment _segment;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
index 67474e145d..5c5c357079 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
@@ -37,4 +37,12 @@ public class ConcurrentMapTableUpsertMetadataManager extends
BaseTableUpsertMeta
k -> new
ConcurrentMapPartitionUpsertMetadataManager(_tableNameWithType, k,
_primaryKeyColumns,
_comparisonColumn, _hashFunction, _partialUpsertHandler,
_serverMetrics));
}
+
+ @Override
+ public void close() {
+ for (ConcurrentMapPartitionUpsertMetadataManager
partitionUpsertMetadataManager
+ : _partitionMetadataManagerMap.values()) {
+ partitionUpsertMetadataManager.close();
+ }
+ }
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
index 2c5f68df45..ef5ec7c414 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.segment.local.upsert;
+import java.io.Closeable;
import java.util.List;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.segment.spi.ImmutableSegment;
@@ -51,7 +52,7 @@ import org.apache.pinot.spi.data.readers.GenericRow;
* </ul>
*/
@ThreadSafe
-public interface PartitionUpsertMetadataManager {
+public interface PartitionUpsertMetadataManager extends Closeable {
/**
* Returns the primary key columns.
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
index ffafb999ee..5d0c450166 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.segment.local.upsert;
+import java.io.Closeable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
@@ -30,7 +31,7 @@ import org.apache.pinot.spi.data.Schema;
* The manager of the upsert metadata of a table.
*/
@ThreadSafe
-public interface TableUpsertMetadataManager {
+public interface TableUpsertMetadataManager extends Closeable {
void init(TableConfig tableConfig, Schema schema, TableDataManager
tableDataManager, ServerMetrics serverMetrics);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]