sashapolo commented on code in PR #1887:
URL: https://github.com/apache/ignite-3/pull/1887#discussion_r1159451311
##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java:
##########
@@ -77,159 +67,89 @@ public void onRead(Iterator<CommandClosure<ReadCommand>>
iter) {
ReadCommand command = clo.command();
- if (command instanceof GetCommand) {
- GetCommand getCmd = (GetCommand) command;
-
- Entry e;
-
- if (getCmd.revision() != 0) {
- e = storage.get(getCmd.key(), getCmd.revision());
- } else {
- e = storage.get(getCmd.key());
- }
-
- clo.result(e);
- } else if (command instanceof GetAllCommand) {
- GetAllCommand getAllCmd = (GetAllCommand) command;
-
- Collection<Entry> entries;
-
- if (getAllCmd.revision() != 0) {
- entries = storage.getAll(getAllCmd.keys(),
getAllCmd.revision());
- } else {
- entries = storage.getAll(getAllCmd.keys());
- }
-
- clo.result((Serializable) entries);
- } else {
- assert false : "Command was not found [cmd=" + command + ']';
- }
- }
- }
-
- @Override
- public void onWrite(Iterator<CommandClosure<WriteCommand>> iter) {
- while (iter.hasNext()) {
- CommandClosure<WriteCommand> clo = iter.next();
-
- if (writeHandler.handleWriteCommand(clo)) {
- continue;
- }
-
- WriteCommand command = clo.command();
-
- if (command instanceof CreateRangeCursorCommand) {
- var rangeCmd = (CreateRangeCursorCommand) command;
-
- IgniteUuid cursorId = rangeCmd.cursorId();
-
- Cursor<Entry> cursor = rangeCmd.revUpperBound() != -1
- ? storage.range(rangeCmd.keyFrom(), rangeCmd.keyTo(),
rangeCmd.revUpperBound(), rangeCmd.includeTombstones())
- : storage.range(rangeCmd.keyFrom(), rangeCmd.keyTo(),
rangeCmd.includeTombstones());
-
- var cursorMeta = new CursorMeta(cursor,
rangeCmd.requesterNodeId());
-
- cursors.put(cursorId, cursorMeta);
-
- clo.result(cursorId);
- } else if (command instanceof CreatePrefixCursorCommand) {
- var prefixCmd = (CreatePrefixCursorCommand) command;
-
- IgniteUuid cursorId = prefixCmd.cursorId();
-
- Cursor<Entry> cursor = prefixCmd.revUpperBound() == -1
- ? storage.prefix(prefixCmd.prefix(),
prefixCmd.includeTombstones())
- : storage.prefix(prefixCmd.prefix(),
prefixCmd.revUpperBound(), prefixCmd.includeTombstones());
-
- var cursorMeta = new CursorMeta(cursor,
prefixCmd.requesterNodeId());
-
- cursors.put(cursorId, cursorMeta);
+ try {
+ if (command instanceof GetCommand) {
+ GetCommand getCmd = (GetCommand) command;
- clo.result(cursorId);
- } else if (command instanceof NextBatchCommand) {
- var nextBatchCommand = (NextBatchCommand) command;
+ Entry e;
- CursorMeta cursorMeta =
cursors.get(nextBatchCommand.cursorId());
+ if (getCmd.revision() != 0) {
+ e = storage.get(getCmd.key(), getCmd.revision());
+ } else {
+ e = storage.get(getCmd.key());
+ }
- if (cursorMeta == null) {
- clo.result(new NoSuchElementException("Corresponding
cursor on the server side is not found."));
+ clo.result(e);
+ } else if (command instanceof GetAllCommand) {
+ GetAllCommand getAllCmd = (GetAllCommand) command;
- return;
- }
+ Collection<Entry> entries;
- try {
- var resp = new
ArrayList<Entry>(nextBatchCommand.batchSize());
+ if (getAllCmd.revision() != 0) {
+ entries = storage.getAll(getAllCmd.keys(),
getAllCmd.revision());
+ } else {
+ entries = storage.getAll(getAllCmd.keys());
+ }
- Cursor<Entry> cursor = cursorMeta.cursor();
+ clo.result((Serializable) entries);
+ } else if (command instanceof GetRangeCommand) {
+ var rangeCmd = (GetRangeCommand) command;
- for (int i = 0; i < nextBatchCommand.batchSize() &&
cursor.hasNext(); i++) {
- resp.add(cursor.next());
- }
+ byte[] keyFrom = rangeCmd.previousKey() == null
+ ? rangeCmd.keyFrom()
+ :
requireNonNull(storage.nextKey(rangeCmd.previousKey()));
- if (!cursor.hasNext()) {
- closeCursor(nextBatchCommand.cursorId());
- }
+ clo.result(handlePaginationCommand(keyFrom,
rangeCmd.keyTo(), rangeCmd));
+ } else if (command instanceof GetPrefixCommand) {
+ var prefixCmd = (GetPrefixCommand) command;
- clo.result(new BatchResponse(resp, cursor.hasNext()));
- } catch (Exception e) {
- clo.result(e);
- }
- } else if (command instanceof CloseCursorCommand) {
- var closeCursorCommand = (CloseCursorCommand) command;
+ byte[] keyFrom = prefixCmd.previousKey() == null
+ ? prefixCmd.prefix()
+ :
requireNonNull(storage.nextKey(prefixCmd.previousKey()));
- try {
- closeCursor(closeCursorCommand.cursorId());
+ byte[] keyTo = storage.nextKey(prefixCmd.prefix());
- clo.result(null);
- } catch (Exception e) {
- clo.result(new MetaStorageException(CURSOR_CLOSING_ERR,
e));
+ clo.result(handlePaginationCommand(keyFrom, keyTo,
prefixCmd));
+ } else {
+ assert false : "Command was not found [cmd=" + command +
']';
}
- } else if (command instanceof CloseAllCursorsCommand) {
- var cursorsCloseCmd = (CloseAllCursorsCommand) command;
-
- Iterator<CursorMeta> cursorsIter = cursors.values().iterator();
+ } catch (Exception e) {
+ clo.result(e);
+ }
+ }
+ }
- Exception ocurredException = null;
+ private BatchResponse handlePaginationCommand(byte[] keyFrom, byte
@Nullable [] keyTo, PaginationCommand command) {
+ Cursor<Entry> cursor = command.revUpperBound() == -1
+ ? storage.range(keyFrom, keyTo)
+ : storage.range(keyFrom, keyTo, command.revUpperBound());
- while (cursorsIter.hasNext()) {
- CursorMeta cursorDesc = cursorsIter.next();
+ try (cursor) {
+ var entries = new ArrayList<Entry>();
- if
(cursorDesc.requesterNodeId().equals(cursorsCloseCmd.nodeId())) {
- try {
- cursorDesc.cursor().close();
- } catch (Exception e) {
- if (ocurredException == null) {
- ocurredException = e;
- } else {
- ocurredException.addSuppressed(e);
- }
- }
+ for (Entry entry : cursor) {
+ if (command.includeTombstones() || !entry.tombstone()) {
+ entries.add(entry);
- cursorsIter.remove();
+ if (entries.size() == command.batchSize()) {
Review Comment:
Wow, nice catch. I think that is what is actually happening. I'll add a test
for this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]