This is what i'm thinking of using as a base so as to reuse work when
changing the structure to use index regions between writes. It makes large
and shallow indexes atm.
import bisect

class Chunk:
    def __init__(self, start, end, data, height=0, leaf_count=1, age=0):
        self.start = start
        self.end = end
        self.data = data
        self.height = height
        self.leaf_count = leaf_count
        self.age = age
    def __len__(self):
        return self.end - self.start
    def is_leaf(self):
        return self.height == 0
    def is_full(self):
        return self.leaf_count == (1 << self.height)
    def is_power_of_2(self):
        v = self.leaf_count
        return v & (v - 1) == 0

class Flush(Chunk):
    class Entry(Chunk):
        def __init__(self, start, end, chunk, path = [], height=0, leaf_count=1):
            super().__init__(start, end, chunk, height=height, leaf_count=leaf_count, age=chunk.age)
            self.path = list(path)
            self.path.append(self.data)
            self.single_child_descendents = []
            self.child_count = 0
            if not chunk.is_leaf() and (leaf_count==1 or height==0):
                assert leaf_count==1 and height==0
                # calculate leaf_count and height
                self.leaf_count = 0
                self.height = 1
                for entry in self.flush_entries():
                    self.leaf_count += entry.leaf_count
                    self.height = max(self.height, entry.height + 1)
                    self.child_count += 1
                    if entry.single_child_descendents:
                        self.single_child_descendents.extend(entry.single_child_descendents)
                    elif entry.child_count == 1:
                        self.single_child_descendents.append(entry)


        def flush_entries(self):
            assert not self.data.is_leaf()
            return (
                Flush.Entry(max(entry.start, self.start), min(entry.end, self.end), entry.data, self.path)
                for entry in self.data.data
                if entry.start < self.end and entry.end > self.start
            )
        def chunk_data(self):
            assert self.is_leaf()
            return self.data.data[self.start - self.data.start : self.end - self.data.start]
    def __init__(self, prev_flush = None):
        if prev_flush is not None:
            super().__init__(prev_flush.start, prev_flush.end, [], height=1, leaf_count=0, age=prev_flush.age+1)
            self.max_height = prev_flush.leaf_count.bit_length()
            prev_entry = Flush.Entry(self.start, self.end, prev_flush)
            self.add(prev_entry)
        else:
            super().__init__(None, None, [], height=1, leaf_count=0)
            self.max_height = 1
    def add(self, *adjacents):
        adjacents = list(adjacents)
        if self.start is None:
            self.start = adjacents[0].start
            self.end = adjacents[-1].end
        else:
            self.start = min(self.start, adjacents[0].start)
            self.end = max(self.end, adjacents[-1].end)


        # expand adjacents that are too deep [should go after start_idx and end_idx are adjust, to find correct max_height easier]
        idx = 0
                # could update this algorithm to: accumulate things that aren't leaves
                # after a run, calculate their shared max_height
                # then shallow them all to that minus one, and we're the new root
                # seems like the big update of interest
                    # change: now bringing up descendents without siblings
        while idx < len(adjacents):
            entry = adjacents[idx]
            if entry.single_child_descendents:
                print(f'merging {len(entry.single_child_descendents)} single children')
                subadjacents = []
                shallow_start = entry.start
                for descendent in entry.single_child_descendents:
                    if shallow_start != descendent.start:
                        subadjacents.append(Flush.Entry(shallow_start, descendent.start, entry.data))
                    subadjacents.append(descendent)
                    shallow_start = descendent.end
                if shallow_start != entry.end:
                    subadjacents.append(Flush.Entry(shallow_start, entry.end, entry.data))
                adjacents[idx:idx+1] = subadjacents
            #elif entry.height + 1 > self.max_height:
            #    print(f'expanding a branch with depth {entry.height+1}')
            #    subadjacents = []
            #    shallow_start = entry.start
            #    shallow_end = shallow_start
            #    for subentry in entry.flush_entries():
            #        if subentry.height + 2 > self.max_height:
            #            if shallow_end != shallow_start:
            #                subadjacents.append(Flush.Entry(shallow_start, shallow_end, entry.data))
            #            subadjacents.append(subentry)
            #            shallow_start = subentry.end
            #        shallow_end = subentry.end
            #    if shallow_end != shallow_start:
            #        subadjacents.append(Flush.Entry(shallow_start, shallow_end, entry.data))
            #    adjacents[idx:idx+1] = subadjacents
            else:
                idx += 1

        # first idx with end >= start
        start_idx = bisect.bisect_left([entry.end for entry in self.data], adjacents[0].start)
        # first idx with start > end
        end_idx = bisect.bisect_right([entry.start for entry in self.data], adjacents[-1].end, start_idx)
        replaced = self.data[start_idx:end_idx]
        if len(replaced):
            if replaced[0].start < adjacents[0].start:
                adjacents.insert(
                    0,
                    Flush.Entry(
                        replaced[0].start, adjacents[0].start, replaced[0].data
                    )
                )
                if start_idx > 0 and replaced[0].end > adjacents[1].start:
                    # the trimmed entry may have fewer leaves and itself merge with its neighbor
                    start_idx -= 1
                    replaced.insert(0, self.data[start_idx])
                    adjacents.insert(0, self.data[start_idx])
            if replaced[-1].end > adjacents[-1].end:
                adjacents.append(
                    Flush.Entry(
                        adjacents[-1].end, replaced[-1].end, replaced[-1].data
                    )
                )
                if end_idx < len(self.data) and replaced[-1].start < adjacents[-2].end:
                    # the trimmed entry may have fewer leaves and itself merge with its neighbor
                    replaced.append(self.data[end_idx])
                    adjacents.append(self.data[end_idx])
                    end_idx += 1

        for idx, entry in reversed(list(enumerate(adjacents))):
            if entry.leaf_count == 0:
                # no leaves left in this branch, remove
                adjacents.pop(idx)
                continue
            count = 0
            subentry = entry
            while count <= 1 and not subentry.data.is_leaf():
                # make branches shallower by splicing out roots with only one child
                parent_entry = subentry
                count = 0
                for subentry in parent_entry.flush_entries():
                    count += 1
                    if count > 1:
                        subentry = parent_entry
                        break
            if subentry is not entry:
                # some internodes were removed
                adjacents[idx] = subentry

        idx = len(adjacents) - 1
        while idx > 0:
            idx -= 1
            left_adjacent = adjacents[idx]
            right_adjacent = adjacents[idx+1]

            # merge writes
            if (
                left_adjacent.age == self.age and
                right_adjacent.age == self.age and
                left_adjacent.end == right_adjacent.start
            ):
                left_adjacent.data = Chunk(
                    left_adjacent.start,
                    right_adjacent.end,
                    left_adjacent.chunk_data() + right_adjacent.chunk_data(),
                    age = self.age
                )
                left_adjacent.end = right_adjacent.end
                adjacents.pop(idx+1)
                continue

            # merge branches with shared parents
            shared_path = [
                left_parent for left_parent, right_parent
                in zip(left_adjacent.path, right_adjacent.path)
                if left_parent is right_parent
            ]
            if len(shared_path) > 0 and left_adjacent.height + len(left_adjacent.path) - len(shared_path) < self.max_height and right_adjacent.height + len(right_adjacent.path) - len(shared_path) < self.max_height:
                if left_adjacent.end != right_adjacent.start:
                    assert left_adjacent.end < right_adjacent.start
                    between_entry = Flush.Entry(
                        left_adjacent.end,
                        right_adjacent.start,
                        shared_path[-1]
                    )
                    if between_entry.leaf_count > 0:
                        # the shared root contains leaves in between that have been removed
                        continue
                print(f'Merging {len(left_adjacent.path)}:{left_adjacent.height}, {len(right_adjacent.path)}:{right_adjacent.height} -> {len(shared_path)}:{left_adjacent.height + len(left_adjacent.path) - len(shared_path)}')
                merged = Flush.Entry(
                    left_adjacent.start,
                    right_adjacent.end,
                    chunk = shared_path[-1],
                    path = shared_path,
                    # letting Entry recalculate these is a quick way to handle overlap
                    #leaf_count = left_adjacent.leaf_count + right_adjacent.leaf_count,
                    #height = left_adjacent.height + len(left_adjacent.path) - len(shared_path)
                )
                #assert merged.leaf_count == merged.data.check_leaf_count(merged.start, merged.end)
                adjacents[idx:idx+2] = [merged]

        # ensure all leaves are balanced trees with appropriate height
        idx = 0
        while idx < len(adjacents):
            entry = adjacents[idx]
            if entry.height >= self.max_height or not entry.is_full():
                adjacents[idx:idx+1] = entry.flush_entries()
            #elif not entry.is_full():
            #    subadjacents = []
            #    last_idx = idx
            #    last_end = entry.start
            #    count = 0
            #    for next_idx, subentry in enumerate(entry.flush_entries()):
            #        subadjacents.append(subentry)
            #        next_count = count + subentry.leaf_count
            #        if is_2_power(next_count):
            #            subadjacents.append(
#
#                # - each entry is made of full entries
#                # - so we want as many entries together as are still full
#                    # why is it not doing that already
#                    # once it is a power of 2 large, it should be treated as full
            else:
                idx += 1


        self.data[start_idx:end_idx] = adjacents
        # using Entry to recalculate leaf_count is a quick-to-implement way to handle not double-counting chunks that span trimmed groups
        proxy_entry = Flush.Entry(self.data[0].start, self.data[-1].end, self)
        self.leaf_count = proxy_entry.leaf_count
        self.height = proxy_entry.height
        #self.height = max((entry.height for entry in self.data)) + 1
        #self.check_leaf_count(self.start, self.end)

        assert self.leaf_count > 0

        self.max_height = self.leaf_count.bit_length()

        #assert self.max_height >= self.height # oops this isn't met yet due to dependency order above, calculations used last value

    def write(self, offset, data):
        chunk = Chunk(offset, offset + len(data), data, age=self.age)
        entry = Flush.Entry(offset, offset + len(data), chunk)
        return self.add(entry)
    def read(self, start, max_end = float('inf')):
        # first idx with end > start
        idx = bisect.bisect_right([entry.end for entry in self.data], start)
        if idx == len(self.data):
            return bytes(4096)
        entry = self.data[idx]
        if entry.start > start:
            end = min(max_end, entry.start)
            return bytes(end - start)
        end = min(max_end, entry.end)
        if entry.data.is_leaf():
            datastart = start - entry.data.start
            dataend = end - entry.data.start
            return entry.data.data[datastart : dataend]
        else:
            return entry.data.read(start, end)
    def check_leaf_count(self, start, end):
        leaf_count = 0
        height = 1
        wrapper = Flush.Entry(start, end, self)
        for entry in wrapper.flush_entries():
            if type(entry.data) is Flush:
                entry_leaf_count = entry.data.check_leaf_count(entry.start, entry.end)
                assert entry_leaf_count == entry.leaf_count
                leaf_count += entry_leaf_count
            else:
                leaf_count += entry.data.leaf_count
            height = max(height, entry.height + 1)
        assert leaf_count == wrapper.leaf_count
        assert height == wrapper.height
        if start == self.start and end == self.end:
            assert leaf_count == self.leaf_count
            assert height == self.height # oops not met yet
        return leaf_count



def main():
    import random
    random.seed(0)
    SIZE=4096
    comparison = bytearray(SIZE)
    #import mmap
    #comparison = mmap.mmap(-1, SIZE)
    store = Flush()
    def compare(store, comparison):
        offset = 0
        while offset < len(comparison):
            data = store.read(offset)[:len(comparison) - offset]
            assert data == comparison[offset:offset+len(data)]
            offset += len(data)
        store.check_leaf_count(store.start, store.end)
        return True
    for flushes in range(1024):
        for writes in range(random.randint(1,16)):
            start = random.randint(0, SIZE-1)
            size = min(SIZE-start, random.randint(1, 1024))
            #size = min(SIZE-start, random.randint(1, 128))
            #start = len(comparison)
            #size = random.randint(1,128)
            end = start + size
            data = random.getrandbits(size*8).to_bytes(size, 'little')
            store.write(start, data)
            comparison[start:end] = data
            #compare(store, comparison)
            #print('OK', len(store.data), 'x', store.height, '/', store.max_height, 'count =', store.leaf_count, 'flushes =', flushes, 'writes =', writes)#, offset)
        compare(store, comparison)
        print('OK', len(store.data), 'x', store.height, '/', store.max_height, '; count =', store.leaf_count, '; flushes =', flushes, '; full =', store.is_full())#, writes)#, offset)
        store = Flush(prev_flush = store)
        compare(store, comparison)

if __name__ == '__main__':
    main()
    #import cProfile
    #cProfile.run('main()')

Reply via email to