stable version: https://lists.cpunks.org/pipermail/cypherpunks/2022-July/102232.html
i find it cool that i found a solution here by placing asserts on practically every line. i'm thinking about how that would help the stability of things in general, and with some work maybe other logic-heavy projects.
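for example, the pattern is roughly this: mirror every mutation into a plain reference copy, then immediately assert the structure still agrees with it. a minimal sketch (the checked_write name is illustrative; store is the Simple class defined below, and this is the same check the test at the bottom performs after each write):

    # minimal sketch of the assert-practically-every-line idea:
    # mirror each mutation into a flat reference buffer, then
    # immediately assert the structure still agrees with it.
    def checked_write(store, comparison: bytearray, offset: int, data: bytes):
        comparison[offset:offset + len(data)] = data
        store.write(offset, data)
        for pending in store.pending:
            # every pending chunk must match the reference bytes exactly
            assert comparison[pending.start:pending.end] == pending.data

the code below threads the same reference buffer through flushes as the verify parameter and asserts against it at each step.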
# block tree structure
# blocks written to are leaves
# depth is log2 leaves
# when a flush is made, all blocks are written, and also enough nodes such that every leaf can be accessed within depth lookups.
# consider we have an existing tree
# with say m flushes, containing n leaves (or m leaves). we'll likely call it n.
# each flush shows which leaves it has
# additionally, with the final flush, each leaf has an existing depth.
# when we reflush, we need to provide a new index for any leaves that become too deep.
# which leaves are too deep?
# we could basically walk them all to find out. this would be a consistent first approach.

import math

class Simple:
    class Chunk:
        # a half-open byte range [start, end); data is either raw bytes (a write) or a Flush (an index entry)
        def __init__(self, offset, data, end=None):
            self.start = offset
            self.data = data
            self.end = self.start + len(self.data) if end is None else end

    class Flush:
        # flush has a list of new leaves, and a list of indexes to old leaves with ranges
        def __init__(self, *writes, prev_flush=None, verify: bytes = None):
            self.prev_flush = prev_flush
            self.data = writes
            # find extents
            start = min((write.start for write in self.data))
            end = max((write.end for write in self.data))
            if prev_flush is None:
                self.start = start
                self.end = end
                self.index = []
                return
            self.start = min(start, prev_flush.start)
            self.end = max(end, prev_flush.end)
            self.index = [Simple.Chunk(prev_flush.start, prev_flush)]
            # find leaf count and leaf depths
            # => one path that could show optimizations to avoid the O(n) choices
            #    -> start by iterating prev_flush's leaves rather than these
            leaves = [*self.leaves()]
            max_depth = len(leaves).bit_length()  # depth budget: roughly log2 of the leaf count
            self.index = []
            depth = 0
            shared_parents = None
            for leaf_path, leaf_offset, leaf_data in leaves:
                if verify is not None:
                    assert verify[leaf_offset:leaf_offset+len(leaf_data)] == leaf_data
                leaf_end = leaf_offset + len(leaf_data)
                if not leaf_path:
                    print('skipping', leaf_offset, leaf_end)
                    # references data that doesn't need indexing here, skip by it
                    if len(self.index):
                        self.index[-1].end = leaf_end
                    continue
                if len(self.index) and len(leaf_path) > depth and leaf_path[depth].data is self.index[-1].data:
                    # the data happens to be in the preceding index
                    if len(leaf_path) - depth < len(shared_parents) + max_depth:
                        print('extending', self.index[-1].start, leaf_end)
                        # parents are shared enough to not reach the max depth if the preceding index is merged
                        new_depth = max(depth, len(leaf_path) - max_depth)
                        if new_depth != depth:
                            shared_parents = shared_parents[new_depth-depth:]
                            old_index_entry = self.index[-1]
                            new_index_entry = Simple.Chunk(self.index[-1].start, shared_parents[0].data, leaf_end)
                            self.index[-1] = new_index_entry
                            if verify is not None:
                                for verify_path, verify_offset, verify_data in self.leaves(new_index_entry.start, new_index_entry.end):
                                    assert verify[verify_offset:verify_offset+len(verify_data)] == verify_data
                            depth = new_depth
                        else:
                            self.index[-1].end = leaf_end
                            if verify is not None:
                                for verify_path, verify_offset, verify_data in self.leaves(self.index[-1].start, self.index[-1].end):
                                    assert verify[verify_offset:verify_offset+len(verify_data)] == verify_data
                        shared_parents = [shared for shared, parent in zip(shared_parents, leaf_path[depth:]) if shared is parent]
                        assert len(shared_parents)
                        continue
                # otherwise, make a new index entry
                if len(self.index):
                    if verify is not None:
                        for verify_path, verify_offset, verify_data in self.leaves(self.index[-1].start, self.index[-1].end):
                            assert verify[verify_offset:verify_offset+len(verify_data)] == verify_data
                    # first make the last index more shallow based on shared_parents
                    if shared_parents[-1].data is not self.index[-1].data:
                        print('shallowing', self.index[-1].start, self.index[-1].end)
                        old_index_entry = self.index[-1]
                        new_index_entry = Simple.Chunk(self.index[-1].start, shared_parents[-1].data, self.index[-1].end)
                        self.index[-1] = new_index_entry
                        if verify is not None:
                            for verify_path, verify_offset, verify_data in self.leaves(self.index[-1].start, self.index[-1].end):
                                assert verify[verify_offset:verify_offset+len(verify_data)] == verify_data
                print('index', leaf_offset, leaf_end)
                # initialise new index
                depth = max(0, len(leaf_path) - max_depth)
                shared_parents = leaf_path[depth:]
                shallow_index = leaf_path[depth]
                new_index_entry = Simple.Chunk(leaf_offset, shallow_index.data, leaf_end)
                # there may be some duplicate information storage regarding starts and ends, unsure.
                # note iterating leaves gives separate starts and ends.
                if verify is not None:
                    for verify_path, verify_offset, verify_data in new_index_entry.data.leaves(new_index_entry.start, new_index_entry.end):
                        assert verify[verify_offset:verify_offset+len(verify_data)] == verify_data
                self.index.append(new_index_entry)
                if verify is not None:
                    for verify_path, verify_offset, verify_data in self.index[-1].data.leaves(self.index[-1].start, self.index[-1].end):
                        assert verify[verify_offset:verify_offset+len(verify_data)] == verify_data
                continue
            print('indexes', self.index)

        def __len__(self):
            return self.end - self.start

        #def lookup(self, offset

        def leaves(self, start=None, end=None):
            # yields (path, offset, data) triples in offset order;
            # path lists the index chunks traversed to reach each leaf
            if start is None:
                start = self.start
            if end is None:
                end = self.end
            print('leaves', self, start, end)
            offset = start
            data_iter = iter(self.data)
            index_iter = iter(self.index)
            next_write = next(data_iter, None)
            next_index = next(index_iter, None)
            while offset < end:
                if next_write is not None and offset >= next_write.start:
                    if offset < next_write.end:
                        # offset >= next_write
                        # so we look in the write
                        substart = offset - next_write.start
                        subend = min(end, next_write.end) - next_write.start
                        assert subend >= substart
                        print('yielding', offset, subend + next_write.start)
                        yield ([], offset, next_write.data[substart:subend])
                        offset += subend - substart
                        assert offset <= end
                    next_write = next(data_iter, None)
                else:
                    # offset < next_write
                    # so we look in the index
                    assert next_index is not None
                    subend = min(next_write.start, end) if next_write is not None else end
                    while offset >= next_index.end:
                        next_index = next(index_iter)
                    assert offset >= next_index.start and offset < next_index.end
                    subend = min(subend, next_index.end)
                    assert subend <= end
                    #yield from next_index.data.leaves(offset, subend, depth + 1)
                    for path, offset, data in next_index.data.leaves(offset, subend):
                        print('pathing', len(path)+1, offset, offset+len(data))
                        yield [next_index, *path], offset, data
                    offset = subend
                    assert offset <= end
            if end == self.end:
                assert next(index_iter, None) is None

    def __init__(self, latest=None):
        self.tip = latest
        self.pending = []

    def write(self, offset, data):
        print('write', offset, offset+len(data))
        for idx, pending in [*enumerate(self.pending)]:
            if pending.start <= offset + len(data) and pending.end >= offset:
                # merge pending with data
                # then merge pending with neighbors
                if offset > pending.start:
                    merged_data = pending.data[:offset - pending.start] + data
                else:
                    merged_data = data
                if offset + len(data) < pending.end:
                    merged_data = merged_data + pending.data[offset + len(data) - pending.start:]
                # we could maybe merge 'merged' via recursion
                # basically we'd excise pending, and then write merged
                self.pending.pop(idx)
                return self.write(min(offset, pending.start), merged_data)
            elif pending.start > offset + len(data):
                # passed this data without overlap
                return self.pending.insert(idx, self.Chunk(offset, data))
        self.pending.append(self.Chunk(offset, data))

    def flush(self, verify: bytes = None):
        self.tip = self.Flush(*self.pending, prev_flush=self.tip, verify=verify)
        self.pending = []

    def leaves(self, start=None, end=None):
        if self.tip is not None:
            return self.tip.leaves(start, end)

if __name__ == '__main__':
    import random
    SIZE = 4096
    random.seed(1) # found min flush 3 thru 70
    store = Simple()
    comparison = bytearray(SIZE)
    store.write(0, bytes(SIZE))
    for flushes in range(1024):
        for writes in range(1):  #range(random.randint(1,16)):
            start = random.randint(0, SIZE)
            end = random.randint(0, SIZE)
            start, end = (start, end) if start <= end else (end, start)
            data = random.randbytes(end - start)
            comparison[start:end] = data
            store.write(start, data)
            for pending in store.pending:
                assert comparison[pending.start:pending.end] == pending.data
        store.flush(verify=comparison)
        last_offset = 0
        max_depth = 0
        for path, offset, data in store.leaves():
            assert offset >= last_offset
            assert comparison[last_offset:offset] == bytes(offset - last_offset)
            last_offset = offset + len(data)
            assert comparison[offset:last_offset] == data
            max_depth = max(len(path), max_depth)
        assert comparison[last_offset:] == bytes(len(comparison) - last_offset)
        print(flushes, max_depth, 'OK')
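there's no read call yet (the lookup stub is commented out), but leaves() is enough to reconstruct any range. a hypothetical helper, assuming the requested range lies within the flushed extents:

    def read(store, start, end):
        # hypothetical convenience wrapper: flatten the (path, offset, data)
        # triples from leaves() into one buffer; regions no leaf covers
        # simply stay zero.
        buf = bytearray(end - start)
        for path, offset, data in store.leaves(start, end):
            buf[offset - start:offset - start + len(data)] = data
        return bytes(buf)

the path element also gives the lookup depth of each leaf, which is what the test loop uses to confirm the depth bound holds.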