dant3 commented on code in PR #7701: URL: https://github.com/apache/ignite-3/pull/7701#discussion_r2918542764
########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TablePartitionResourcesFactory.java: ########## @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed; + +import static java.util.Objects.requireNonNull; + +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import org.apache.ignite.internal.catalog.CatalogService; +import org.apache.ignite.internal.failure.FailureProcessor; +import org.apache.ignite.internal.hlc.ClockService; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.lowwatermark.LowWatermark; +import org.apache.ignite.internal.network.TopologyService; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager; +import org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource; +import org.apache.ignite.internal.placementdriver.LeasePlacementDriver; +import org.apache.ignite.internal.raft.ExecutorInclinedRaftCommandRunner; +import org.apache.ignite.internal.raft.service.RaftCommandRunner; +import org.apache.ignite.internal.replicator.ZonePartitionId; +import org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration; +import org.apache.ignite.internal.schema.SchemaManager; +import org.apache.ignite.internal.schema.SchemaSyncService; +import org.apache.ignite.internal.storage.MvPartitionStorage; +import org.apache.ignite.internal.table.TableViewInternal; +import org.apache.ignite.internal.table.distributed.PartitionModificationCounterFactory.SizeSupplier; +import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler; +import org.apache.ignite.internal.table.distributed.gc.MvGc; +import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage; +import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler; +import org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService; +import org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor; +import org.apache.ignite.internal.table.distributed.raft.snapshot.FullStateTransferIndexChooser; +import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionMvStorageAccessImpl; +import org.apache.ignite.internal.table.distributed.raft.snapshot.SnapshotAwarePartitionDataStorage; +import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener; +import org.apache.ignite.internal.tx.LockManager; +import org.apache.ignite.internal.tx.TxManager; +import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry; +import org.apache.ignite.internal.tx.impl.TransactionStateResolver; +import org.apache.ignite.internal.util.Lazy; +import org.apache.ignite.internal.util.PendingComparableValuesTracker; + +/** + * Stateless factory for creating partition-level resources: data storage wrappers, update handlers, and replica listeners. + * + * <p>This factory performs pure construction only — it does not start components, register metrics, + * or own any mutable state. Lifecycle management (start/stop, metric registration/deregistration) + * remains in {@link TableManager}. + * + * <p><b>Lifecycle ordering:</b> the caller must invoke {@link StorageUpdateHandler#start} on the + * {@link PartitionResources#storageUpdateHandler} returned by {@link #createPartitionResources} before + * the constructed objects ({@link TablePartitionProcessor}, {@link PartitionMvStorageAccess}, + * {@link PartitionReplicaListener}) are used at runtime. + */ +class TablePartitionResourcesFactory { + private final TxManager txManager; + private final LockManager lockManager; + private final ExecutorService scanRequestExecutor; + private final ClockService clockService; + private final CatalogService catalogService; + private final PartitionModificationCounterFactory partitionModificationCounterFactory; + private final OutgoingSnapshotsManager outgoingSnapshotsManager; + private final LowWatermark lowWatermark; + private final ValidationSchemasSource validationSchemasSource; + private final SchemaSyncService schemaSyncService; + private final LeasePlacementDriver placementDriver; + private final TopologyService topologyService; + private final RemotelyTriggeredResourceRegistry remotelyTriggeredResourceRegistry; + private final FailureProcessor failureProcessor; + private final SchemaManager schemaManager; + private final ReplicationConfiguration replicationConfiguration; + private final Executor partitionOperationsExecutor; + private final IndexMetaStorage indexMetaStorage; + private final MinimumRequiredTimeCollectorService minTimeCollectorService; + private final MvGc mvGc; + private final FullStateTransferIndexChooser fullStateTransferIndexChooser; + + TablePartitionResourcesFactory( + TxManager txManager, + LockManager lockManager, + ExecutorService scanRequestExecutor, + ClockService clockService, + CatalogService catalogService, + PartitionModificationCounterFactory partitionModificationCounterFactory, + OutgoingSnapshotsManager outgoingSnapshotsManager, + LowWatermark lowWatermark, + ValidationSchemasSource validationSchemasSource, + SchemaSyncService schemaSyncService, + LeasePlacementDriver placementDriver, + TopologyService topologyService, + RemotelyTriggeredResourceRegistry remotelyTriggeredResourceRegistry, + FailureProcessor failureProcessor, + SchemaManager schemaManager, + ReplicationConfiguration replicationConfiguration, + Executor partitionOperationsExecutor, + IndexMetaStorage indexMetaStorage, + MinimumRequiredTimeCollectorService minTimeCollectorService, + MvGc mvGc, + FullStateTransferIndexChooser fullStateTransferIndexChooser + ) { + this.txManager = requireNonNull(txManager, "txManager"); + this.lockManager = requireNonNull(lockManager, "lockManager"); + this.scanRequestExecutor = requireNonNull(scanRequestExecutor, "scanRequestExecutor"); + this.clockService = requireNonNull(clockService, "clockService"); + this.catalogService = requireNonNull(catalogService, "catalogService"); + this.partitionModificationCounterFactory = + requireNonNull(partitionModificationCounterFactory, "partitionModificationCounterFactory"); + this.outgoingSnapshotsManager = requireNonNull(outgoingSnapshotsManager, "outgoingSnapshotsManager"); + this.lowWatermark = requireNonNull(lowWatermark, "lowWatermark"); + this.validationSchemasSource = requireNonNull(validationSchemasSource, "validationSchemasSource"); + this.schemaSyncService = requireNonNull(schemaSyncService, "schemaSyncService"); + this.placementDriver = requireNonNull(placementDriver, "placementDriver"); + this.topologyService = requireNonNull(topologyService, "topologyService"); + this.remotelyTriggeredResourceRegistry = requireNonNull(remotelyTriggeredResourceRegistry, "remotelyTriggeredResourceRegistry"); + this.failureProcessor = requireNonNull(failureProcessor, "failureProcessor"); + this.schemaManager = requireNonNull(schemaManager, "schemaManager"); + this.replicationConfiguration = requireNonNull(replicationConfiguration, "replicationConfiguration"); + this.partitionOperationsExecutor = requireNonNull(partitionOperationsExecutor, "partitionOperationsExecutor"); + this.indexMetaStorage = requireNonNull(indexMetaStorage, "indexMetaStorage"); + this.minTimeCollectorService = requireNonNull(minTimeCollectorService, "minTimeCollectorService"); + this.mvGc = requireNonNull(mvGc, "mvGc"); + this.fullStateTransferIndexChooser = requireNonNull(fullStateTransferIndexChooser, "fullStateTransferIndexChooser"); + } + + /** + * Creates a {@link PartitionDataStorage} for the given partition. + * + * @param partitionKey Partition key. + * @param tableId Table ID. + * @param partitionStorage MV partition storage. + * @return Partition data storage. + */ + PartitionDataStorage createPartitionDataStorage(PartitionKey partitionKey, int tableId, MvPartitionStorage partitionStorage) { + return new SnapshotAwarePartitionDataStorage( + tableId, + partitionStorage, + outgoingSnapshotsManager, + partitionKey + ); + } + + /** + * Creates partition resources (index update handler, GC update handler, storage update handler, modification counter). + * + * <p>The returned resources are not started — the caller must invoke + * {@link StorageUpdateHandler#start} on {@link PartitionResources#storageUpdateHandler} + * before the constructed partition objects are used at runtime. + * + * @param partitionId Partition ID. + * @param partitionDataStorage Partition data storage. + * @param table Table view. + * @param safeTimeTracker Safe time tracker. + * @return Partition resources. + */ + PartitionResources createPartitionResources( + int partitionId, + PartitionDataStorage partitionDataStorage, + TableViewInternal table, + PendingComparableValuesTracker<HybridTimestamp, Void> safeTimeTracker + ) { + TableIndexStoragesSupplier indexes = table.indexStorageAdapters(partitionId); + + IndexUpdateHandler indexUpdateHandler = new IndexUpdateHandler(indexes); + + GcUpdateHandler gcUpdateHandler = new GcUpdateHandler(partitionDataStorage, safeTimeTracker, indexUpdateHandler); + + SizeSupplier partSizeSupplier = () -> partitionDataStorage.getStorage().estimatedSize(); + + PartitionModificationCounter modificationCounter = + partitionModificationCounterFactory.create(partSizeSupplier, table::stalenessConfiguration, table.tableId(), partitionId); + + StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler( + partitionId, + partitionDataStorage, + indexUpdateHandler, + replicationConfiguration, + modificationCounter, + txManager + ); + + return new PartitionResources(storageUpdateHandler, indexUpdateHandler, gcUpdateHandler, modificationCounter); + } + + /** + * Creates a {@link TablePartitionProcessor} for the given partition. + * + * @param zonePartitionId Zone partition ID. + * @param table Table view. + * @param partitionDataStorage Partition data storage. + * @param partitionResources Partition resources. + * @return Table partition processor. + */ + TablePartitionProcessor createTablePartitionProcessor( + ZonePartitionId zonePartitionId, + TableViewInternal table, + PartitionDataStorage partitionDataStorage, + PartitionResources partitionResources + ) { + return new TablePartitionProcessor( + txManager, + partitionDataStorage, + partitionResources.storageUpdateHandler, + catalogService, + table.schemaView(), + indexMetaStorage, + topologyService.localMember().id(), Review Comment: I reverted caching it because scalecube init happens later in the lifecycle. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
