This is an automated email from the ASF dual-hosted git repository.
korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 29afe19116b IGNITE-27479 System View. Introduce late materialization
for row from system views (#7333)
29afe19116b is described below
commit 29afe19116b1965442e997899a2b7b75da9837f7
Author: korlov42 <[email protected]>
AuthorDate: Wed Dec 31 09:46:25 2025 +0200
IGNITE-27479 System View. Introduce late materialization for row from
system views (#7333)
---
.../systemviews/TablesSystemViewProvider.java | 23 +-
.../ignite/internal/util/FlatteningIterator.java | 86 +++++++
.../internal/util/FlatteningIteratorTest.java | 225 ++++++++++++++++++
.../internal/benchmark/SystemViewsBenchmark.java | 156 ++++++++++++
.../internal/systemview/SystemViewManagerImpl.java | 261 +++++++++++++++++++--
5 files changed, 723 insertions(+), 28 deletions(-)
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/systemviews/TablesSystemViewProvider.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/systemviews/TablesSystemViewProvider.java
index 228023600dd..25088dc1f98 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/systemviews/TablesSystemViewProvider.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/systemviews/TablesSystemViewProvider.java
@@ -31,7 +31,9 @@ import
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescript
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.systemview.api.SystemView;
import org.apache.ignite.internal.systemview.api.SystemViews;
+import org.apache.ignite.internal.util.FlatteningIterator;
import org.apache.ignite.internal.util.SubscriptionUtils;
+import org.apache.ignite.internal.util.TransformingIterator;
import org.jetbrains.annotations.Nullable;
/**
@@ -99,16 +101,19 @@ public class TablesSystemViewProvider implements
CatalogSystemViewProvider {
Iterable<ColumnMetadata> viewData = () -> {
Catalog catalog = catalogSupplier.get();
- return catalog.tables().stream()
- .flatMap(table -> table.columns().stream()
- .map(columnDescriptor -> new ColumnMetadata(
-
catalog.schema(table.schemaId()).name(),
- table,
- columnDescriptor
- )
- )
+ return new FlatteningIterator<>(
+ new TransformingIterator<>(
+ catalog.tables().iterator(),
+ table -> {
+ String schemaName =
catalog.schema(table.schemaId()).name();
+
+ return TransformingIterator.newIterable(
+ table.columns(),
+ column -> new
ColumnMetadata(schemaName, table, column)
+ );
+ }
)
- .iterator();
+ );
};
Publisher<ColumnMetadata> viewDataPublisher =
SubscriptionUtils.fromIterable(viewData);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/FlatteningIterator.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/FlatteningIterator.java
new file mode 100644
index 00000000000..3a5cadae765
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/FlatteningIterator.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * An iterator which implements FLAT MAP operation on the input.
+ */
+public class FlatteningIterator<T> implements Iterator<T>, AutoCloseable {
+ private final Iterator<? extends Iterable<T>> source;
+
+ private @Nullable Iterator<T> current;
+
+ /** Constructs the object. */
+ public FlatteningIterator(Iterator<? extends Iterable<T>> source) {
+ this.source = source;
+ }
+
+ @Override
+ public boolean hasNext() {
+ do {
+ if (current != null) {
+ if (current.hasNext()) {
+ return true;
+ }
+
+ // current is completely drained, reset.
+ current = null;
+ }
+
+ if (!source.hasNext()) {
+ return false;
+ }
+
+ current = source.next().iterator();
+ } while (true);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public T next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ assert current != null;
+
+ return current.next();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void remove() {
+ if (current == null) {
+ throw new IllegalStateException();
+ }
+
+ current.remove();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void close() throws Exception {
+ if (source instanceof AutoCloseable) {
+ ((AutoCloseable) source).close();
+ }
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/util/FlatteningIteratorTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/FlatteningIteratorTest.java
new file mode 100644
index 00000000000..db6f5902da1
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/util/FlatteningIteratorTest.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.junit.jupiter.api.Test;
+
+/** Set of tests to verify {@link FlatteningIterator}. */
+@SuppressWarnings("resource")
+class FlatteningIteratorTest {
+ @Test
+ void testBasicFlattening() {
+ List<List<Integer>> input = List.of(
+ List.of(1, 2, 3),
+ List.of(4, 5),
+ List.of(6, 7, 8, 9)
+ );
+
+ FlatteningIterator<Integer> iterator = new
FlatteningIterator<>(input.iterator());
+
+ List<Integer> result = new ArrayList<>();
+ while (iterator.hasNext()) {
+ result.add(iterator.next());
+ }
+
+ assertEquals(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9), result);
+ }
+
+ @Test
+ void testEmptySource() {
+ List<List<Integer>> input = List.of();
+
+ @SuppressWarnings("RedundantOperationOnEmptyContainer")
+ FlatteningIterator<Integer> iterator = new
FlatteningIterator<>(input.iterator());
+
+ assertFalse(iterator.hasNext());
+ assertThrows(NoSuchElementException.class, iterator::next);
+ }
+
+ @Test
+ void testAllEmptyIterables() {
+ List<List<Integer>> input = List.of(
+ List.of(),
+ List.of(),
+ List.of()
+ );
+
+ FlatteningIterator<Integer> iterator = new
FlatteningIterator<>(input.iterator());
+
+ assertFalse(iterator.hasNext());
+ assertThrows(NoSuchElementException.class, iterator::next);
+ }
+
+ @Test
+ void testMixedEmptyAndNonEmpty() {
+ List<List<Integer>> input = List.of(
+ List.of(),
+ List.of(1, 2),
+ List.of(),
+ List.of(3),
+ List.of()
+ );
+
+ FlatteningIterator<Integer> iterator = new
FlatteningIterator<>(input.iterator());
+
+ List<Integer> result = new ArrayList<>();
+ while (iterator.hasNext()) {
+ result.add(iterator.next());
+ }
+
+ assertEquals(List.of(1, 2, 3), result);
+ }
+
+ @Test
+ void testSingleElementSingleIterable() {
+ List<List<String>> input = List.of(
+ List.of("hello")
+ );
+
+ FlatteningIterator<String> iterator = new
FlatteningIterator<>(input.iterator());
+
+ assertTrue(iterator.hasNext());
+ assertEquals("hello", iterator.next());
+ assertFalse(iterator.hasNext());
+ }
+
+ @Test
+ void testExhaustedIterator() {
+ List<List<Integer>> input = List.of(List.of(1, 2));
+
+ FlatteningIterator<Integer> iterator = new
FlatteningIterator<>(input.iterator());
+
+ iterator.next(); // 1
+ iterator.next(); // 2
+
+ assertFalse(iterator.hasNext());
+ assertThrows(NoSuchElementException.class, iterator::next);
+ }
+
+ @Test
+ void testMultipleHasNextCalls() {
+ List<List<Integer>> input = List.of(List.of(1, 2));
+
+ FlatteningIterator<Integer> iterator = new
FlatteningIterator<>(input.iterator());
+
+ assertTrue(iterator.hasNext());
+ assertTrue(iterator.hasNext());
+ assertTrue(iterator.hasNext());
+
+ assertEquals(1, iterator.next());
+
+ assertTrue(iterator.hasNext());
+ assertTrue(iterator.hasNext());
+
+ assertEquals(2, iterator.next());
+ }
+
+ @Test
+ void testWithStrings() {
+ List<List<String>> input = List.of(
+ List.of("a", "b"),
+ List.of("c", "d", "e")
+ );
+
+ FlatteningIterator<String> iterator = new
FlatteningIterator<>(input.iterator());
+
+ List<String> result = new ArrayList<>();
+ while (iterator.hasNext()) {
+ result.add(iterator.next());
+ }
+
+ assertEquals(List.of("a", "b", "c", "d", "e"), result);
+ }
+
+ @Test
+ void testAutoCloseable() {
+ class ClosableIterator<T> implements Iterator<T>, AutoCloseable {
+ private final Iterator<T> delegate;
+ private boolean closeCalled;
+
+ private ClosableIterator(Iterator<T> delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void close() {
+ closeCalled = true;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return delegate.hasNext();
+ }
+
+ @Override
+ public T next() {
+ return delegate.next();
+ }
+ }
+
+ ClosableIterator<List<Integer>> input = new ClosableIterator<>(List.of(
+ List.of(1, 2, 3)
+ ).iterator());
+
+ assertDoesNotThrow(() -> {
+ try (FlatteningIterator<Integer> iterator = new
FlatteningIterator<>(input)) {
+ iterator.next();
+ }
+ });
+ assertTrue(input.closeCalled);
+ }
+
+ @Test
+ void testEmptyIterableAtBeginning() {
+ List<List<Integer>> input = List.of(
+ List.of(),
+ List.of(1, 2, 3)
+ );
+
+ FlatteningIterator<Integer> iterator = new
FlatteningIterator<>(input.iterator());
+
+ assertTrue(iterator.hasNext());
+ assertEquals(1, iterator.next());
+ }
+
+ @Test
+ void testEmptyIterableAtEnd() {
+ List<List<Integer>> input = List.of(
+ List.of(1, 2, 3),
+ List.of()
+ );
+
+ FlatteningIterator<Integer> iterator = new
FlatteningIterator<>(input.iterator());
+
+ iterator.next(); // 1
+ iterator.next(); // 2
+ iterator.next(); // 3
+
+ assertFalse(iterator.hasNext());
+ }
+}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SystemViewsBenchmark.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SystemViewsBenchmark.java
new file mode 100644
index 00000000000..d2ea9ef91a2
--- /dev/null
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/SystemViewsBenchmark.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmark;
+
+import java.io.IOException;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.sql.IgniteSql;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/**
+ * Benchmark that runs sql queries over system views to measure its
performance.
+ */
+@State(Scope.Benchmark)
+@Fork(1)
+@Threads(1)
+@Warmup(iterations = 10, time = 2)
+@Measurement(iterations = 20, time = 2)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@SuppressWarnings({"WeakerAccess", "unused"})
+public class SystemViewsBenchmark extends AbstractMultiNodeBenchmark {
+ private IgniteSql sql;
+
+ @Param("3")
+ private int clusterSize;
+
+ /** Fills the table with data. */
+ @Setup
+ public void setUp() throws IOException {
+ sql = publicIgnite.sql();
+ }
+
+ /** Benchmark to measure performance of queries over TABLE-COLUMNS system
view. */
+ @Benchmark
+ @OutputTimeUnit(TimeUnit.MICROSECONDS)
+ public void getPrimaryKeys(FiveHundredTablesState state, Blackhole bh) {
+ try (var rs = sql.execute(null, "SELECT column_name\n"
+ + "FROM SYSTEM.table_columns\n"
+ + "WHERE table_name = ?\n"
+ + "AND pk_column_ordinal IS NOT NULL;", state.tableName())) {
+ while (rs.hasNext()) {
+ bh.consume(rs.next());
+ }
+ }
+ }
+
+ /** Benchmark to measure performance of queries over TABLE-COLUMNS system
view. */
+ @Benchmark
+ @OutputTimeUnit(TimeUnit.MICROSECONDS)
+ public void getColumnsType(FiveHundredTablesState state, Blackhole bh) {
+ try (var rs = sql.execute(null, "SELECT column_name,column_type\n"
+ + "FROM SYSTEM.table_columns\n"
+ + "WHERE table_name = ?;", state.tableName())) {
+ while (rs.hasNext()) {
+ bh.consume(rs.next());
+ }
+ }
+ }
+
+ /** State that creates 500 tables with 10 columns each and 2 columns
primary key. */
+ @State(Scope.Benchmark)
+ public static class FiveHundredTablesState {
+ private static final int TABLES_COUNT = 500;
+
+ /** Creates necessary tables. */
+ @Setup
+ public void setUp() throws IOException {
+ IgniteSql sql = publicIgnite.sql();
+
+ int columnsCount = 10;
+
+ StringBuilder scriptBuilder = new StringBuilder(
+ "CREATE ZONE my_zone (PARTITIONS 1, REPLICAS 1) STORAGE
PROFILES ['default'];"
+ ).append("ALTER ZONE my_zone SET DEFAULT;");
+
+ for (int t = 0; t < TABLES_COUNT; t++) {
+ scriptBuilder.append("CREATE TABLE
my_table_").append(t).append("(");
+
+ for (int c = 0; c < columnsCount; c++) {
+ if (c > 0) {
+ scriptBuilder.append(", ");
+ }
+
+ scriptBuilder.append("c_").append(c).append(" INT");
+ }
+
+ scriptBuilder.append(", PRIMARY KEY (c_1, c_2));");
+ }
+
+ sql.executeScript(scriptBuilder.toString());
+ }
+
+ @SuppressWarnings("MethodMayBeStatic")
+ String tableName() {
+ return "MY_TABLE_" +
ThreadLocalRandom.current().nextInt(TABLES_COUNT);
+ }
+ }
+
+ /**
+ * Benchmark's entry point.
+ */
+ public static void main(String[] args) throws RunnerException {
+ Options opt = new OptionsBuilder()
+ .include(".*" + SystemViewsBenchmark.class.getSimpleName() +
".*")
+ .build();
+
+ new Runner(opt).run();
+ }
+
+ @Override
+ protected void createDistributionZoneOnStartup() {
+ // NO-OP
+ }
+
+ @Override
+ protected void createTablesOnStartup() {
+ // NO-OP
+ }
+
+ @Override
+ protected int nodes() {
+ return clusterSize;
+ }
+}
diff --git
a/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManagerImpl.java
b/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManagerImpl.java
index cbf25438ce2..a873afaba6a 100644
---
a/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManagerImpl.java
+++
b/modules/system-view/src/main/java/org/apache/ignite/internal/systemview/SystemViewManagerImpl.java
@@ -23,12 +23,22 @@ import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.Period;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -46,17 +56,16 @@ import
org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.InternalTuple;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.manager.ComponentContext;
-import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTupleSchema;
import org.apache.ignite.internal.systemview.api.NodeSystemView;
import org.apache.ignite.internal.systemview.api.SystemView;
-import org.apache.ignite.internal.systemview.api.SystemViewColumn;
import org.apache.ignite.internal.systemview.api.SystemViewManager;
import org.apache.ignite.internal.systemview.api.SystemViewProvider;
import org.apache.ignite.internal.systemview.utils.SystemViewUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.subscription.TransformingPublisher;
import org.apache.ignite.lang.ErrorGroups.Common;
+import org.jetbrains.annotations.Nullable;
/**
* SQL system views manager implementation.
@@ -246,8 +255,14 @@ public class SystemViewManagerImpl implements
SystemViewManager, NodeAttributesP
private static Map<String, ScannableView<?>> toScannableViews(String
localNodeName, Map<String, SystemView<?>> views) {
Map<String, ScannableView<?>> scannableViews = new HashMap<>();
+ ViewRowFactory nodeViewRowFactory = new
NodeViewRowFactory(localNodeName);
+
for (SystemView<?> view : views.values()) {
- scannableViews.put(view.name(), new ScannableView<>(localNodeName,
(SystemView<Object>) view));
+ ViewRowFactory rowFactory = view instanceof NodeSystemView
+ ? nodeViewRowFactory
+ : ClusterViewRowFactory.INSTANCE;
+
+ scannableViews.put(view.name(), new ScannableView<>(rowFactory,
(SystemView<Object>) view));
}
return Map.copyOf(scannableViews);
@@ -256,30 +271,238 @@ public class SystemViewManagerImpl implements
SystemViewManager, NodeAttributesP
private static class ScannableView<T> {
private final Publisher<InternalTuple> publisher;
- private ScannableView(String localNodeName, SystemView<T> view) {
+ private ScannableView(ViewRowFactory rowFactory, SystemView<T> view) {
BinaryTupleSchema schema = tupleSchemaForView(view);
+ this.publisher = new TransformingPublisher<>(view.dataProvider(),
object -> rowFactory.create(schema, view, object));
+ }
- this.publisher = new TransformingPublisher<>(view.dataProvider(),
object -> {
- BinaryTupleBuilder builder = new
BinaryTupleBuilder(schema.elementCount());
+ Publisher<InternalTuple> scan() {
+ return publisher;
+ }
+ }
- int offset = 0;
- if (view instanceof NodeSystemView) {
- builder.appendString(localNodeName);
- offset++;
- }
+ private abstract static class ViewRowFactory {
+ abstract <ViewSourceT> InternalTuple create(BinaryTupleSchema schema,
SystemView<ViewSourceT> view, ViewSourceT source);
+ }
- for (int i = 0; i < view.columns().size(); i++) {
- SystemViewColumn<T, ?> column = view.columns().get(i);
+ private static class NodeViewRowFactory extends ViewRowFactory {
+ private final String nodeName;
- schema.appendValue(builder, i + offset,
column.value().apply(object));
- }
+ private NodeViewRowFactory(String nodeName) {
+ this.nodeName = nodeName;
+ }
- return new BinaryTuple(schema.elementCount(), builder.build());
- });
+ @Override
+ <ViewSourceT> InternalTuple create(BinaryTupleSchema schema,
SystemView<ViewSourceT> view, ViewSourceT source) {
+ return new NodeViewRow<>(schema, nodeName, view, source);
}
+ }
- Publisher<InternalTuple> scan() {
- return publisher;
+ private static class ClusterViewRowFactory extends ViewRowFactory {
+ private static final ViewRowFactory INSTANCE = new
ClusterViewRowFactory();
+
+ @Override
+ <ViewSourceT> InternalTuple create(BinaryTupleSchema schema,
SystemView<ViewSourceT> view, ViewSourceT source) {
+ return new ClusterViewRow<>(schema, view, source);
+ }
+ }
+
+ private static class NodeViewRow<T> extends AbstractViewRow {
+ private final String nodeName;
+ private final SystemView<T> view;
+ private final T source;
+
+ private NodeViewRow(BinaryTupleSchema schema, String nodeName,
SystemView<T> view, T source) {
+ super(schema);
+
+ this.nodeName = nodeName;
+ this.view = view;
+ this.source = source;
+ }
+
+ @Override
+ public int elementCount() {
+ return view.columns().size() + 1;
+ }
+
+ @Override
+ <ReturnT> ReturnT value(int columnIndex) {
+ return columnIndex == 0
+ ? (ReturnT) nodeName
+ : (ReturnT) view.columns().get(columnIndex -
1).value().apply(source);
+ }
+ }
+
+ private static class ClusterViewRow<T> extends AbstractViewRow {
+ private final SystemView<T> view;
+ private final T source;
+
+ private ClusterViewRow(BinaryTupleSchema schema, SystemView<T> view, T
source) {
+ super(schema);
+ this.view = view;
+ this.source = source;
+ }
+
+ @Override
+ public int elementCount() {
+ return view.columns().size();
+ }
+
+ @Override
+ <ReturnT> ReturnT value(int columnIndex) {
+ return (ReturnT)
view.columns().get(columnIndex).value().apply(source);
+ }
+ }
+
+ private abstract static class AbstractViewRow implements InternalTuple {
+ private final BinaryTupleSchema schema;
+
+ private AbstractViewRow(BinaryTupleSchema schema) {
+ this.schema = schema;
+ }
+
+ abstract <T> T value(int columnIndex);
+
+ @Override
+ public boolean hasNullValue(int columnIndex) {
+ return value(columnIndex) == null;
+ }
+
+ @Override
+ public boolean booleanValue(int columnIndex) {
+ return value(columnIndex);
+ }
+
+ @Override
+ public Boolean booleanValueBoxed(int columnIndex) {
+ return value(columnIndex);
+ }
+
+ @Override
+ public byte byteValue(int columnIndex) {
+ return value(columnIndex);
+ }
+
+ @Override
+ public Byte byteValueBoxed(int columnIndex) {
+ return value(columnIndex);
+ }
+
+ @Override
+ public short shortValue(int columnIndex) {
+ return value(columnIndex);
+ }
+
+ @Override
+ public Short shortValueBoxed(int columnIndex) {
+ return value(columnIndex);
+ }
+
+ @Override
+ public int intValue(int columnIndex) {
+ return value(columnIndex);
+ }
+
+ @Override
+ public Integer intValueBoxed(int columnIndex) {
+ return value(columnIndex);
+ }
+
+ @Override
+ public long longValue(int columnIndex) {
+ return value(columnIndex);
+ }
+
+ @Override
+ public Long longValueBoxed(int columnIndex) {
+ return value(columnIndex);
+ }
+
+ @Override
+ public float floatValue(int columnIndex) {
+ return value(columnIndex);
+ }
+
+ @Override
+ public Float floatValueBoxed(int columnIndex) {
+ return value(columnIndex);
+ }
+
+ @Override
+ public double doubleValue(int columnIndex) {
+ return value(columnIndex);
+ }
+
+ @Override
+ public Double doubleValueBoxed(int columnIndex) {
+ return value(columnIndex);
+ }
+
+ @Override
+ public @Nullable BigDecimal decimalValue(int columnIndex, int scale) {
+ BigDecimal value = value(columnIndex);
+
+ if (value == null) {
+ return null;
+ }
+
+ return value.setScale(scale, RoundingMode.UNNECESSARY);
+ }
+
+ @Override
+ public String stringValue(int columnIndex) {
+ return value(columnIndex);
+ }
+
+ @Override
+ public byte[] bytesValue(int columnIndex) {
+ return value(columnIndex);
+ }
+
+ @Override
+ public UUID uuidValue(int columnIndex) {
+ return value(columnIndex);
+ }
+
+ @Override
+ public LocalDate dateValue(int columnIndex) {
+ return value(columnIndex);
+ }
+
+ @Override
+ public LocalTime timeValue(int columnIndex) {
+ return value(columnIndex);
+ }
+
+ @Override
+ public LocalDateTime dateTimeValue(int columnIndex) {
+ return value(columnIndex);
+ }
+
+ @Override
+ public Instant timestampValue(int columnIndex) {
+ return value(columnIndex);
+ }
+
+ @Override
+ public Period periodValue(int columnIndex) {
+ return value(columnIndex);
+ }
+
+ @Override
+ public Duration durationValue(int columnIndex) {
+ return value(columnIndex);
+ }
+
+ @Override
+ public ByteBuffer byteBuffer() {
+ BinaryTupleBuilder builder = new
BinaryTupleBuilder(schema.elementCount());
+
+ for (int i = 0; i < schema.elementCount(); i++) {
+ schema.appendValue(builder, i, value(i));
+ }
+
+ return builder.build();
}
}
}