ibessonov commented on code in PR #7019: URL: https://github.com/apache/ignite-3/pull/7019#discussion_r2580205776
########## modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/StoragePartitionMetaIoV2.java: ########## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.storage.pagememory; + +import static org.apache.ignite.internal.pagememory.util.PageUtils.getLong; +import static org.apache.ignite.internal.pagememory.util.PageUtils.putLong; + +import org.apache.ignite.internal.lang.IgniteStringBuilder; +import org.apache.ignite.internal.pagememory.util.PageIdUtils; +import org.apache.ignite.internal.pagememory.util.PartitionlessLinks; + +/** + * Storage Io for partition metadata pages (version 2). + */ +public class StoragePartitionMetaIoV2 extends StoragePartitionMetaIo { + private static final int WI_HEAD_OFF = ESTIMATED_SIZE_OFF + PartitionlessLinks.PARTITIONLESS_LINK_SIZE_BYTES; + + /** + * Constructor. 
+ */ + protected StoragePartitionMetaIoV2() { + super(2); + } + + /** {@inheritDoc} */ Review Comment: ```suggestion ``` ########## modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteLinkingWiInvokeClosure.java: ########## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.storage.pagememory.mv; + +import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK; + +import java.util.UUID; +import org.apache.ignite.internal.lang.IgniteInternalCheckedException; +import org.apache.ignite.internal.pagememory.freelist.FreeList; +import org.apache.ignite.internal.pagememory.tree.BplusTree; +import org.apache.ignite.internal.pagememory.tree.IgniteTree.InvokeClosure; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.storage.RowId; +import org.apache.ignite.internal.storage.StorageException; +import org.jetbrains.annotations.Nullable; + +/** + * Implementation of {@link InvokeClosure} for {@link AbstractPageMemoryMvPartitionStorage#addWrite(RowId, BinaryRow, UUID, int, int)} + * which additionally maintains links between write intents. 
+ * + * <p>See {@link AbstractPageMemoryMvPartitionStorage} about synchronization. + * + * <p>Operation may throw {@link StorageException} which will cause form {@link BplusTree#invoke(Object, Object, InvokeClosure)}. + */ +class AddWriteLinkingWiInvokeClosure extends AddWriteInvokeClosure { + private final PersistentPageMemoryMvPartitionStorage persistentStorage; + + private final FreeList freeList; + + private long wiListHeadLink; + private long newWiListHeadLink; + + AddWriteLinkingWiInvokeClosure( + RowId rowId, + @Nullable BinaryRow row, + UUID txId, + int commitZoneId, + int commitPartitionId, + PersistentPageMemoryMvPartitionStorage storage + ) { + super(rowId, row, txId, commitZoneId, commitPartitionId, storage); + + persistentStorage = storage; + + freeList = storage.renewableState.freeList(); + } + + @Override + public void call(@Nullable VersionChain oldRow) throws IgniteInternalCheckedException { + wiListHeadLink = persistentStorage.lockWriteIntentListHead(); + newWiListHeadLink = wiListHeadLink; + + try { + super.call(oldRow); + } finally { + persistentStorage.updateWriteIntentListHeadAndUnlock(newWiListHeadLink); + } + } + + @Override + protected RowVersion insertFirstRowVersion() { + assert persistentStorage.writeIntentHeadIsLockedByCurrentThread(); + assert persistentStorage.writeIntentListHead() == wiListHeadLink + : "Expected WI list head link " + wiListHeadLink + " but was " + persistentStorage.writeIntentListHead(); + + WiLinkableRowVersion newVersion = insertRowVersion(NULL_LINK, wiListHeadLink, NULL_LINK); + + newWiListHeadLink = newVersion.link(); + + updateWiListLinks(newVersion); + + return newVersion; + } + + @Override + protected RowVersion insertAnotherRowVersion(VersionChain oldRow, @Nullable RowVersion existingWriteIntent) { + assert persistentStorage.writeIntentHeadIsLockedByCurrentThread(); + assert persistentStorage.writeIntentListHead() == wiListHeadLink + : "Expected WI list head link " + wiListHeadLink + " but was " + 
persistentStorage.writeIntentListHead(); + + boolean replacingExistingWriteIntent = oldRow.isUncommitted(); + assert replacingExistingWriteIntent == (existingWriteIntent != null); + + long newNextWiLink; + long newPrevWiLink; + if (replacingExistingWriteIntent) { + newNextWiLink = existingWriteIntent.operations().nextWriteIntentLink(wiListHeadLink); + newPrevWiLink = existingWriteIntent.operations().prevWriteIntentLink(); + } else { + newNextWiLink = wiListHeadLink; + newPrevWiLink = NULL_LINK; + } + + WiLinkableRowVersion newVersion = insertRowVersion(oldRow.newestCommittedLink(), newNextWiLink, newPrevWiLink); + + if (!replacingExistingWriteIntent) { + // Add our new version to the head of the WI list. + newWiListHeadLink = newVersion.link(); + } + + updateWiListLinks(newVersion); + + return newVersion; + } + + private WiLinkableRowVersion insertRowVersion(long nextLink, long nextWiLink, long prevWiLink) { + var rowVersion = new WiLinkableRowVersion(rowId, storage.partitionId, nextLink, nextWiLink, prevWiLink, row); + + storage.insertRowVersion(rowVersion); + + return rowVersion; + } + + private void updateWiListLinks(WiLinkableRowVersion newRowVersion) { + if (newRowVersion != null) { + if (newRowVersion.prevWriteIntentLink() != NULL_LINK) { + try { + freeList.updateDataRow(newRowVersion.prevWriteIntentLink(), UpdateNextWiLinkHandler.INSTANCE, newRowVersion.link()); Review Comment: Let's have a TODO that would ask us to move the `updateDataRow` from `freeList` to somewhere else. It's just a data page update, it doesn't have to go through a free-list. Are you OK with such a proposal? ########## modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/WiLinkableRowVersion.java: ########## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.storage.pagememory.mv; + +import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK; +import static org.apache.ignite.internal.pagememory.util.PageUtils.putLong; +import static org.apache.ignite.internal.pagememory.util.PartitionlessLinks.PARTITIONLESS_LINK_SIZE_BYTES; +import static org.apache.ignite.internal.pagememory.util.PartitionlessLinks.writePartitionless; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.pagememory.util.PartitionlessLinks; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.storage.RowId; +import org.apache.ignite.internal.tostring.S; +import org.jetbrains.annotations.Nullable; + +/** + * {@link RowVersion} extension which allows the represented write intent to be included in the write intents list. + */ +public final class WiLinkableRowVersion extends RowVersion { + public static final byte DATA_TYPE = 2; + + private static final int WRITE_INTENT_LINKS_SIZE_BYTES = 2 * Long.BYTES + 2 * PARTITIONLESS_LINK_SIZE_BYTES; Review Comment: ```suggestion /** A single UUID (16 bytes) and two partitionless links (6 bytes each). 
*/ private static final int WRITE_INTENT_LINKS_SIZE_BYTES = 2 * Long.BYTES + 2 * PARTITIONLESS_LINK_SIZE_BYTES; ``` ########## modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PlainRowVersionOperations.java: ########## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.storage.pagememory.mv; + +import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK; +import static org.apache.ignite.internal.util.GridUnsafe.pageSize; + +import java.util.function.Supplier; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.pagememory.io.DataPageIo; +import org.apache.ignite.internal.pagememory.io.PageIo; +import org.apache.ignite.internal.pagememory.util.PageHandler; + +class PlainRowVersionOperations implements RowVersionOperations { + static final PlainRowVersionOperations INSTANCE = new PlainRowVersionOperations(); + + private PlainRowVersionOperations() { + // No-op. 
+ } + + @Override + public void removeFromWriteIntentsList( + AbstractPageMemoryMvPartitionStorage storage, + Supplier<String> operationInfoSupplier + ) { + // No-op as plain row versions are not included in the write intents list. + } + + @Override + public long nextWriteIntentLink(long defaultLink) { Review Comment: I find the `defaultLink` naming confusing in this context, maybe we should add some small comment ########## modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteLinkingWiInvokeClosure.java: ########## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.storage.pagememory.mv; + +import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK; + +import java.util.UUID; +import org.apache.ignite.internal.lang.IgniteInternalCheckedException; +import org.apache.ignite.internal.pagememory.freelist.FreeList; +import org.apache.ignite.internal.pagememory.tree.BplusTree; +import org.apache.ignite.internal.pagememory.tree.IgniteTree.InvokeClosure; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.storage.RowId; +import org.apache.ignite.internal.storage.StorageException; +import org.jetbrains.annotations.Nullable; + +/** + * Implementation of {@link InvokeClosure} for {@link AbstractPageMemoryMvPartitionStorage#addWrite(RowId, BinaryRow, UUID, int, int)} + * which additionally maintains links between write intents. + * + * <p>See {@link AbstractPageMemoryMvPartitionStorage} about synchronization. + * + * <p>Operation may throw {@link StorageException} which will cause form {@link BplusTree#invoke(Object, Object, InvokeClosure)}. 
+ */ +class AddWriteLinkingWiInvokeClosure extends AddWriteInvokeClosure { + private final PersistentPageMemoryMvPartitionStorage persistentStorage; + + private final FreeList freeList; + + private long wiListHeadLink; + private long newWiListHeadLink; + + AddWriteLinkingWiInvokeClosure( + RowId rowId, + @Nullable BinaryRow row, + UUID txId, + int commitZoneId, + int commitPartitionId, + PersistentPageMemoryMvPartitionStorage storage + ) { + super(rowId, row, txId, commitZoneId, commitPartitionId, storage); + + persistentStorage = storage; + + freeList = storage.renewableState.freeList(); + } + + @Override + public void call(@Nullable VersionChain oldRow) throws IgniteInternalCheckedException { + wiListHeadLink = persistentStorage.lockWriteIntentListHead(); + newWiListHeadLink = wiListHeadLink; + + try { + super.call(oldRow); + } finally { + persistentStorage.updateWriteIntentListHeadAndUnlock(newWiListHeadLink); + } + } + + @Override + protected RowVersion insertFirstRowVersion() { + assert persistentStorage.writeIntentHeadIsLockedByCurrentThread(); + assert persistentStorage.writeIntentListHead() == wiListHeadLink + : "Expected WI list head link " + wiListHeadLink + " but was " + persistentStorage.writeIntentListHead(); + + WiLinkableRowVersion newVersion = insertRowVersion(NULL_LINK, wiListHeadLink, NULL_LINK); + + newWiListHeadLink = newVersion.link(); + + updateWiListLinks(newVersion); + + return newVersion; + } + + @Override + protected RowVersion insertAnotherRowVersion(VersionChain oldRow, @Nullable RowVersion existingWriteIntent) { + assert persistentStorage.writeIntentHeadIsLockedByCurrentThread(); + assert persistentStorage.writeIntentListHead() == wiListHeadLink + : "Expected WI list head link " + wiListHeadLink + " but was " + persistentStorage.writeIntentListHead(); + + boolean replacingExistingWriteIntent = oldRow.isUncommitted(); + assert replacingExistingWriteIntent == (existingWriteIntent != null); + + long newNextWiLink; + long newPrevWiLink; + 
if (replacingExistingWriteIntent) { + newNextWiLink = existingWriteIntent.operations().nextWriteIntentLink(wiListHeadLink); + newPrevWiLink = existingWriteIntent.operations().prevWriteIntentLink(); + } else { + newNextWiLink = wiListHeadLink; + newPrevWiLink = NULL_LINK; + } + + WiLinkableRowVersion newVersion = insertRowVersion(oldRow.newestCommittedLink(), newNextWiLink, newPrevWiLink); Review Comment: What does this method do? If it actually inserts an entire row's payload into data pages then I would not recommend to do it while holding an exclusive lock. This will lead to potential contention. Operations under lock should be as trivial as possible. What if we insert the row with `NULL/NULL` first, and only then acquire lock and update its links? How many changes would that introduce to the code? ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java: ########## @@ -100,6 +104,35 @@ public int partitionId() { return partitionId; } + /** + * Starts the handler doing necessary recovery. + */ + public void start() { + recoverPendingRows(); + } + + private void recoverPendingRows() { + LOG.info("Recovering pending rows [tableId={}, partitionIndex={}]", storage.tableId(), storage.partitionId()); + + int count = 0; + try (Cursor<RowId> writeIntentRowIds = storage.getStorage().scanWriteIntents()) { + for (RowId rowId : writeIntentRowIds) { + ReadResult result = storage.getStorage().read(rowId, HybridTimestamp.MAX_VALUE); + + if (!result.isEmpty() && result.isWriteIntent()) { Review Comment: Are we skipping tombstones? Why? Please remove the `isEmpty` check, thank you! 
########## modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java: ########## @@ -586,4 +673,43 @@ public boolean shouldRelease() { return checkpointTimeoutLock.shouldReleaseReadLock(); } } + + private class WriteIntentsCursor implements Cursor<RowId> { + private final long headLink; + private long nextLink; + + private WriteIntentsCursor(long headLink) { + this.headLink = headLink; + nextLink = headLink; + } + + @Override + public boolean hasNext() { + return busy(() -> nextLink != NULL_LINK); + } + + @SuppressWarnings("IteratorNextCanNotThrowNoSuchElementException") + @Override + public RowId next() { + return busy(() -> { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + RowVersion rowVersion = runConsistently(locker -> readRowVersion(nextLink, DONT_LOAD_VALUE)); Review Comment: Reading does not require `runConsistently`, as far as I remember ########## modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/WiLinkableRowVersion.java: ########## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.storage.pagememory.mv; + +import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK; +import static org.apache.ignite.internal.pagememory.util.PageUtils.putLong; +import static org.apache.ignite.internal.pagememory.util.PartitionlessLinks.PARTITIONLESS_LINK_SIZE_BYTES; +import static org.apache.ignite.internal.pagememory.util.PartitionlessLinks.writePartitionless; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.pagememory.util.PartitionlessLinks; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.storage.RowId; +import org.apache.ignite.internal.tostring.S; +import org.jetbrains.annotations.Nullable; + +/** + * {@link RowVersion} extension which allows the represented write intent to be included in the write intents list. + */ +public final class WiLinkableRowVersion extends RowVersion { + public static final byte DATA_TYPE = 2; + + private static final int WRITE_INTENT_LINKS_SIZE_BYTES = 2 * Long.BYTES + 2 * PARTITIONLESS_LINK_SIZE_BYTES; + + // Write intents list. + public static final int ROW_ID_MSB_OFFSET = SCHEMA_VERSION_OFFSET + SCHEMA_VERSION_SIZE_BYTES; + public static final int ROW_ID_LSB_OFFSET = ROW_ID_MSB_OFFSET + Long.BYTES; + public static final int NEXT_WRITE_INTENT_LINK_OFFSET = ROW_ID_LSB_OFFSET + Long.BYTES; + public static final int PREV_WRITE_INTENT_LINK_OFFSET = NEXT_WRITE_INTENT_LINK_OFFSET + PARTITIONLESS_LINK_SIZE_BYTES; + + public static final int VALUE_OFFSET = PREV_WRITE_INTENT_LINK_OFFSET + PARTITIONLESS_LINK_SIZE_BYTES; + + private final RowId rowId; + + private final long nextWriteIntentLink; + private final long prevWriteIntentLink; + + /** + * Constructor. 
+ */ + public WiLinkableRowVersion( + RowId rowId, + int partitionId, + long nextLink, + long nextWriteIntentLink, + long prevWriteIntentLink, + @Nullable BinaryRow value + ) { + this( + rowId, + partitionId, + NULL_LINK, + null, + nextLink, + nextWriteIntentLink, + prevWriteIntentLink, + value == null ? 0 : value.tupleSliceLength(), + value + ); + } + + /** + * Constructor. + */ + public WiLinkableRowVersion( + RowId rowId, + int partitionId, + long link, + @Nullable HybridTimestamp commitTimestamp, + long nextLink, + long nextWriteIntentLink, + long prevWriteIntentLink, + int valueSize + ) { + this(rowId, partitionId, link, commitTimestamp, nextLink, nextWriteIntentLink, prevWriteIntentLink, valueSize, null); + } + + /** + * Constructor. + */ + public WiLinkableRowVersion( + RowId rowId, + int partitionId, + long link, + @Nullable HybridTimestamp timestamp, + long nextLink, + long nextWriteIntentLink, + long prevWriteIntentLink, + int valueSize, + @Nullable BinaryRow value + ) { + super(partitionId, link, timestamp, nextLink, valueSize, value); + + this.rowId = rowId; + this.nextWriteIntentLink = nextWriteIntentLink; + this.prevWriteIntentLink = prevWriteIntentLink; + } + + public RowId rowId() { + return rowId; + } + + public long nextWriteIntentLink() { + return nextWriteIntentLink; + } + + public long prevWriteIntentLink() { + return prevWriteIntentLink; + } + + @Override + public int headerSize() { + return super.headerSize() + WRITE_INTENT_LINKS_SIZE_BYTES; + } + + @Override + protected byte dataType() { + return DATA_TYPE; + } + + @Override + protected int valueOffset() { + return VALUE_OFFSET; + } + + @Override + protected void writeHeader(long pageAddr, int dataOff) { + super.writeHeader(pageAddr, dataOff); + + putLong(pageAddr, dataOff + ROW_ID_MSB_OFFSET, rowId.mostSignificantBits()); + putLong(pageAddr, dataOff + ROW_ID_LSB_OFFSET, rowId.leastSignificantBits()); + writePartitionless(pageAddr + dataOff + NEXT_WRITE_INTENT_LINK_OFFSET, 
nextWriteIntentLink); + writePartitionless(pageAddr + dataOff + PREV_WRITE_INTENT_LINK_OFFSET, prevWriteIntentLink); + } + + @Override + protected void writeHeader(ByteBuffer pageBuf) { + super.writeHeader(pageBuf); + + pageBuf.putLong(rowId.mostSignificantBits()); + pageBuf.putLong(rowId.leastSignificantBits()); + PartitionlessLinks.writeToBuffer(pageBuf, nextWriteIntentLink); + PartitionlessLinks.writeToBuffer(pageBuf, prevWriteIntentLink); + } + + @Override + RowVersionOperations operations() { + return new WiLinkableRowVersionOperations(this); + } + + @Override + public String toString() { + return S.toString(WiLinkableRowVersion.class, this); Review Comment: There's a chance that it won't print the content of a `super` instance. Please be aware of it and check the real result ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java: ########## @@ -100,6 +104,35 @@ public int partitionId() { return partitionId; } + /** + * Starts the handler doing necessary recovery. + */ + public void start() { + recoverPendingRows(); + } + + private void recoverPendingRows() { + LOG.info("Recovering pending rows [tableId={}, partitionIndex={}]", storage.tableId(), storage.partitionId()); + + int count = 0; + try (Cursor<RowId> writeIntentRowIds = storage.getStorage().scanWriteIntents()) { + for (RowId rowId : writeIntentRowIds) { + ReadResult result = storage.getStorage().read(rowId, HybridTimestamp.MAX_VALUE); + + if (!result.isEmpty() && result.isWriteIntent()) { + UUID txId = result.transactionId(); + assert txId != null : "Transaction ID is null for a write intent [rowId=" + rowId + "]"; + + pendingRows.addPendingRowId(txId, rowId); + } + + count++; + } + } + + LOG.info("Recovered pending rows [tableId={}, partitionIndex={}, count={}]", storage.tableId(), storage.partitionId(), count); Review Comment: Let's also log the time it took, I would choose a millisecond precision (but still use a nanotime for tracking). 
########## modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbortWriteInvokeClosure.java: ########## @@ -110,6 +110,13 @@ public OperationType operationType() { return operationType; } + @Override + public void onUpdate() { + if (toRemove != null) { + toRemove.operations().removeFromWriteIntentsList(storage, this::abortWriteInfo); Review Comment: If `AbortWrite` looks so clean, then why does `AddWrite` have so much code in it? Are we using different approaches in these two closures? ########## modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/WriteIntentListSupport.java: ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.storage.pagememory.mv; + +import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK; + +import java.util.function.Supplier; +import org.apache.ignite.internal.lang.IgniteInternalCheckedException; +import org.apache.ignite.internal.pagememory.freelist.FreeListImpl; +import org.apache.ignite.internal.storage.StorageException; + +class WriteIntentListSupport { + static void removeNodeFromWriteIntentsList( + WiLinkableRowVersion rowVersionToRemove, + PersistentPageMemoryMvPartitionStorage storage, + Supplier<String> operationInfoSupplier + ) { + FreeListImpl freeList = storage.renewableState.freeList(); + + long wiListHeadLink = storage.lockWriteIntentListHead(); + + try { + long nextWiNodeLink; + if (rowVersionToRemove.nextWriteIntentLink() == NULL_LINK) { + nextWiNodeLink = NULL_LINK; + } else { + freeList.updateDataRow( + rowVersionToRemove.nextWriteIntentLink(), + UpdatePrevWiLinkHandler.INSTANCE, + rowVersionToRemove.prevWriteIntentLink() + ); + + nextWiNodeLink = rowVersionToRemove.nextWriteIntentLink(); + } + + if (rowVersionToRemove.prevWriteIntentLink() != NULL_LINK) { + freeList.updateDataRow( + rowVersionToRemove.prevWriteIntentLink(), + UpdateNextWiLinkHandler.INSTANCE, + rowVersionToRemove.nextWriteIntentLink() + ); + } + + if (rowVersionToRemove.prevWriteIntentLink() == NULL_LINK) { Review Comment: ```suggestion } else { ``` ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java: ########## @@ -100,6 +104,35 @@ public int partitionId() { return partitionId; } + /** + * Starts the handler doing necessary recovery. 
+ */ + public void start() { + recoverPendingRows(); + } + + private void recoverPendingRows() { + LOG.info("Recovering pending rows [tableId={}, partitionIndex={}]", storage.tableId(), storage.partitionId()); + + int count = 0; + try (Cursor<RowId> writeIntentRowIds = storage.getStorage().scanWriteIntents()) { + for (RowId rowId : writeIntentRowIds) { + ReadResult result = storage.getStorage().read(rowId, HybridTimestamp.MAX_VALUE); Review Comment: Let's add a TODO for finally introducing `ReadOptions` into the API. Why? Because here we don't actually need a value, all we need is a header. Otherwise this code will become very inefficient, values can span multiple data pages. I'm obviously not asking you to do this optimization right now, but in the future - we have to do it. ########## modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/WriteIntentListSupport.java: ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.storage.pagememory.mv; + +import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK; + +import java.util.function.Supplier; +import org.apache.ignite.internal.lang.IgniteInternalCheckedException; +import org.apache.ignite.internal.pagememory.freelist.FreeListImpl; +import org.apache.ignite.internal.storage.StorageException; + +class WriteIntentListSupport { + static void removeNodeFromWriteIntentsList( Review Comment: First of all, some "pictures" would definitely help for these algorithms. Second, I see that you update previous page and next page, but you don't update "current" page, which makes this whole operation inconsistent. Could you please explain what I'm missing? ########## modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/WiLinkableRowVersionOperations.java: ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.storage.pagememory.mv; + +import static org.apache.ignite.internal.pagememory.util.PageIdUtils.NULL_LINK; +import static org.apache.ignite.internal.pagememory.util.PartitionlessLinks.writePartitionless; +import static org.apache.ignite.internal.storage.pagememory.mv.WriteIntentListSupport.removeNodeFromWriteIntentsList; +import static org.apache.ignite.internal.util.GridUnsafe.pageSize; + +import java.util.function.Supplier; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.pagememory.io.DataPageIo; +import org.apache.ignite.internal.pagememory.io.PageIo; +import org.apache.ignite.internal.pagememory.util.PageHandler; + +class WiLinkableRowVersionOperations implements RowVersionOperations { + private final WiLinkableRowVersion rowVersion; + + WiLinkableRowVersionOperations(WiLinkableRowVersion rowVersion) { + this.rowVersion = rowVersion; + } + + @Override + public void removeFromWriteIntentsList( + AbstractPageMemoryMvPartitionStorage storage, + Supplier<String> operationInfoSupplier + ) { + assert storage instanceof PersistentPageMemoryMvPartitionStorage; + + removeNodeFromWriteIntentsList(rowVersion, (PersistentPageMemoryMvPartitionStorage) storage, operationInfoSupplier); + } + + @Override + public long nextWriteIntentLink(long defaultLink) { + return rowVersion.nextWriteIntentLink(); + } + + @Override + public long prevWriteIntentLink() { + return rowVersion.prevWriteIntentLink(); + } + + @Override + public PageHandler<HybridTimestamp, Object> converterToCommittedVersion() { + return UpdateTimestampAndZeroWiLinksHandler.INSTANCE; + } + + private static class UpdateTimestampAndZeroWiLinksHandler implements PageHandler<HybridTimestamp, Object> { + private static final UpdateTimestampAndZeroWiLinksHandler INSTANCE = new UpdateTimestampAndZeroWiLinksHandler(); + + @Override + public Object run( + int groupId, + long pageId, + long page, + long pageAddr, + PageIo io, + 
HybridTimestamp timestamp, + int itemId + ) { + DataPageIo dataIo = (DataPageIo) io; + + int payloadOffset = dataIo.getPayloadOffset(pageAddr, itemId, pageSize(), 0); + + HybridTimestamps.writeTimestampToMemory(pageAddr, payloadOffset + RowVersion.TIMESTAMP_OFFSET, timestamp); Review Comment: Do we nullify a `RowId` part? A wild thought: `RowId` and `Timestamp` may actually use the same physical space, it'll prevent us from wasting 8 bytes. Is this too radical? Or is this already the case? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
