This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new a5c12c5589b [FLINK-30345][state/changelog] Avoid using sync DataOutputViewStreamWrapper to serialize records to changelog a5c12c5589b is described below commit a5c12c5589b92d5e8b22a4aa952a0c06543424a3 Author: Hangxiang Yu <master...@gmail.com> AuthorDate: Fri Dec 9 09:44:33 2022 +0800 [FLINK-30345][state/changelog] Avoid using sync DataOutputViewStreamWrapper to serialize records to changelog --- .../state/changelog/AbstractStateChangeLogger.java | 42 +++++++++------------- .../flink/state/changelog/ChangelogMapState.java | 8 ++--- .../state/changelog/KvStateChangeLoggerImpl.java | 17 +++++---- .../PriorityQueueStateChangeLoggerImpl.java | 7 ++-- .../flink/state/changelog/StateChangeLogger.java | 8 ++--- .../changelog/StateChangeLoggingIterator.java | 11 +++--- .../restore/ChangelogMigrationRestoreTarget.java | 11 +++--- .../state/changelog/ChangelogPqStateTest.java | 11 +++--- .../flink/state/changelog/TestChangeLoggerKv.java | 8 ++--- 9 files changed, 54 insertions(+), 69 deletions(-) diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java index 1f84a912284..fa1af755752 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java @@ -17,7 +17,8 @@ package org.apache.flink.state.changelog; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo; import 
org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo; import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase; @@ -27,11 +28,8 @@ import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters; import org.apache.flink.util.function.ThrowingConsumer; -import org.apache.flink.shaded.guava30.com.google.common.io.Closer; - import javax.annotation.Nullable; -import java.io.ByteArrayOutputStream; import java.io.Closeable; import java.io.IOException; @@ -54,8 +52,7 @@ abstract class AbstractStateChangeLogger<Key, Value, Ns> protected final InternalKeyContext<Key> keyContext; protected RegisteredStateMetaInfoBase metaInfo; private final StateMetaInfoSnapshot.BackendStateType stateType; - private final ByteArrayOutputStream out = new ByteArrayOutputStream(); - private final DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(out); + private final DataOutputSerializer out = new DataOutputSerializer(128); private boolean metaDataWritten = false; private final short stateShortId; @@ -95,8 +92,7 @@ abstract class AbstractStateChangeLogger<Key, Value, Ns> } } - protected abstract void serializeValue(Value value, DataOutputViewStreamWrapper out) - throws IOException; + protected abstract void serializeValue(Value value, DataOutputView out) throws IOException; @Override public void valueAdded(Value addedValue, Ns ns) throws IOException { @@ -110,21 +106,21 @@ abstract class AbstractStateChangeLogger<Key, Value, Ns> @Override public void valueElementAdded( - ThrowingConsumer<DataOutputViewStreamWrapper, IOException> dataSerializer, Ns ns) + ThrowingConsumer<DataOutputView, IOException> dataSerializer, Ns ns) throws IOException { log(ADD_ELEMENT, dataSerializer, ns); } @Override public void valueElementAddedOrUpdated( - ThrowingConsumer<DataOutputViewStreamWrapper, IOException> dataSerializer, Ns ns) + ThrowingConsumer<DataOutputView, 
IOException> dataSerializer, Ns ns) throws IOException { log(ADD_OR_UPDATE_ELEMENT, dataSerializer, ns); } @Override public void valueElementRemoved( - ThrowingConsumer<DataOutputViewStreamWrapper, IOException> dataSerializer, Ns ns) + ThrowingConsumer<DataOutputView, IOException> dataSerializer, Ns ns) throws IOException { log(REMOVE_ELEMENT, dataSerializer, ns); } @@ -147,7 +143,7 @@ abstract class AbstractStateChangeLogger<Key, Value, Ns> protected void log( StateChangeOperation op, - @Nullable ThrowingConsumer<DataOutputViewStreamWrapper, IOException> dataWriter, + @Nullable ThrowingConsumer<DataOutputView, IOException> dataWriter, Ns ns) throws IOException { logMetaIfNeeded(); @@ -172,12 +168,12 @@ abstract class AbstractStateChangeLogger<Key, Value, Ns> } } - protected void writeDefaultValueAndTtl(DataOutputViewStreamWrapper out) throws IOException {} + protected void writeDefaultValueAndTtl(DataOutputView out) throws IOException {} private byte[] serialize( StateChangeOperation op, Ns ns, - @Nullable ThrowingConsumer<DataOutputViewStreamWrapper, IOException> dataWriter) + @Nullable ThrowingConsumer<DataOutputView, IOException> dataWriter) throws IOException { return serializeRaw( wrapper -> { @@ -190,24 +186,18 @@ abstract class AbstractStateChangeLogger<Key, Value, Ns> }); } - protected abstract void serializeScope(Ns ns, DataOutputViewStreamWrapper out) - throws IOException; + protected abstract void serializeScope(Ns ns, DataOutputView out) throws IOException; - private byte[] serializeRaw( - ThrowingConsumer<DataOutputViewStreamWrapper, IOException> dataWriter) + private byte[] serializeRaw(ThrowingConsumer<DataOutputView, IOException> dataWriter) throws IOException { - dataWriter.accept(wrapper); - wrapper.flush(); - byte[] bytes = out.toByteArray(); - out.reset(); + dataWriter.accept(out); + byte[] bytes = out.getCopyOfBuffer(); + out.clear(); return bytes; } @Override public void close() throws IOException { - try (Closer closer = Closer.create()) { - 
closer.register(wrapper); - closer.register(out); - } + // do nothing } } diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java index 219839fbd39..02ce3b73bf2 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java @@ -21,7 +21,7 @@ package org.apache.flink.state.changelog; import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.typeutils.base.MapSerializer; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.runtime.state.changelog.StateChange; import org.apache.flink.runtime.state.heap.InternalKeyContext; import org.apache.flink.runtime.state.internal.InternalKvState; @@ -199,15 +199,15 @@ class ChangelogMapState<K, N, UK, UV> } } - private void serializeValue(UV value, DataOutputViewStreamWrapper out) throws IOException { + private void serializeValue(UV value, DataOutputView out) throws IOException { getMapSerializer().getValueSerializer().serialize(value, out); } - private void serializeKey(UK key, DataOutputViewStreamWrapper out) throws IOException { + private void serializeKey(UK key, DataOutputView out) throws IOException { getMapSerializer().getKeySerializer().serialize(key, out); } - private ThrowingConsumer<DataOutputViewStreamWrapper, IOException> getWriter(UK key, UV value) { + private ThrowingConsumer<DataOutputView, IOException> getWriter(UK key, UV value) { return out -> { serializeKey(key, out); serializeValue(value, out); diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java index 7d35843f81d..cf346295969 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java @@ -19,7 +19,8 @@ package org.apache.flink.state.changelog; import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo; import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase; import org.apache.flink.runtime.state.changelog.StateChangelogWriter; @@ -78,21 +79,25 @@ class KvStateChangeLoggerImpl<Key, Value, Ns> extends AbstractStateChangeLogger< } @Override - protected void serializeValue(Value value, DataOutputViewStreamWrapper out) throws IOException { + protected void serializeValue(Value value, DataOutputView out) throws IOException { valueSerializer.serialize(value, out); } @Override - protected void serializeScope(Ns ns, DataOutputViewStreamWrapper out) throws IOException { + protected void serializeScope(Ns ns, DataOutputView out) throws IOException { keySerializer.serialize(keyContext.getCurrentKey(), out); namespaceSerializer.serialize(ns, out); } - protected void writeDefaultValueAndTtl(DataOutputViewStreamWrapper out) throws IOException { + protected void writeDefaultValueAndTtl(DataOutputView out) throws IOException { out.writeBoolean(ttlConfig.isEnabled()); if 
(ttlConfig.isEnabled()) { - try (ObjectOutputStream o = new ObjectOutputStream(out)) { - o.writeObject(ttlConfig); + try (ByteArrayOutputStreamWithPos outputStreamWithPos = + new ByteArrayOutputStreamWithPos(); + ObjectOutputStream objectOutputStream = + new ObjectOutputStream(outputStreamWithPos)) { + objectOutputStream.writeObject(ttlConfig); + out.write(outputStreamWithPos.toByteArray()); } } out.writeBoolean(defaultValue != null); diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImpl.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImpl.java index 3297a3024be..0f348b7c1d8 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImpl.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImpl.java @@ -18,7 +18,7 @@ package org.apache.flink.state.changelog; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo; import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase; import org.apache.flink.runtime.state.changelog.StateChangelogWriter; @@ -42,13 +42,12 @@ class PriorityQueueStateChangeLoggerImpl<K, T> extends AbstractStateChangeLogger } @Override - protected void serializeValue(T t, DataOutputViewStreamWrapper out) throws IOException { + protected void serializeValue(T t, DataOutputView out) throws IOException { serializer.serialize(t, out); } @Override - protected void serializeScope(Void unused, DataOutputViewStreamWrapper out) - throws IOException {} + protected void serializeScope(Void unused, DataOutputView 
out) throws IOException {} @Override protected PriorityQueueStateChangeLoggerImpl<K, T> setMetaInfo( diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeLogger.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeLogger.java index ed12624cb05..86e6f80ac5f 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeLogger.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeLogger.java @@ -19,7 +19,7 @@ package org.apache.flink.state.changelog; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.state.ListState; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.util.function.ThrowingConsumer; import java.io.Closeable; @@ -60,17 +60,17 @@ public interface StateChangeLogger<Value, Namespace> extends Closeable { /** State element added, such as append of a single element to a list. */ void valueElementAdded( - ThrowingConsumer<DataOutputViewStreamWrapper, IOException> dataSerializer, Namespace ns) + ThrowingConsumer<DataOutputView, IOException> dataSerializer, Namespace ns) throws IOException; /** State element added or updated, such as put into a map. */ void valueElementAddedOrUpdated( - ThrowingConsumer<DataOutputViewStreamWrapper, IOException> dataSerializer, Namespace ns) + ThrowingConsumer<DataOutputView, IOException> dataSerializer, Namespace ns) throws IOException; /** State element removed, such mapping removal from a map. */ void valueElementRemoved( - ThrowingConsumer<DataOutputViewStreamWrapper, IOException> dataSerializer, Namespace ns) + ThrowingConsumer<DataOutputView, IOException> dataSerializer, Namespace ns) throws IOException; /** Enable logging meta data before next writes. 
*/ diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeLoggingIterator.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeLoggingIterator.java index 3484e4af717..39802665e3e 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeLoggingIterator.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeLoggingIterator.java @@ -17,7 +17,7 @@ package org.apache.flink.state.changelog; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.function.BiConsumerWithException; @@ -32,16 +32,14 @@ class StateChangeLoggingIterator<State, StateElement, Namespace> private final CloseableIterator<StateElement> iterator; private final StateChangeLogger<State, Namespace> changeLogger; - private final BiConsumerWithException<StateElement, DataOutputViewStreamWrapper, IOException> - removalWriter; + private final BiConsumerWithException<StateElement, DataOutputView, IOException> removalWriter; private final Namespace ns; @Nullable private StateElement lastReturned; private StateChangeLoggingIterator( CloseableIterator<StateElement> iterator, StateChangeLogger<State, Namespace> changeLogger, - BiConsumerWithException<StateElement, DataOutputViewStreamWrapper, IOException> - removalWriter, + BiConsumerWithException<StateElement, DataOutputView, IOException> removalWriter, Namespace ns) { this.iterator = iterator; this.changeLogger = changeLogger; @@ -73,8 +71,7 @@ class StateChangeLoggingIterator<State, StateElement, Namespace> public static <Namespace, State, StateElement> CloseableIterator<StateElement> create( CloseableIterator<StateElement> 
iterator, StateChangeLogger<State, Namespace> changeLogger, - BiConsumerWithException<StateElement, DataOutputViewStreamWrapper, IOException> - removalWriter, + BiConsumerWithException<StateElement, DataOutputView, IOException> removalWriter, Namespace ns) { return new StateChangeLoggingIterator<>(iterator, changeLogger, removalWriter, ns); } diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogMigrationRestoreTarget.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogMigrationRestoreTarget.java index 545e78b8ef6..4af8c06802b 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogMigrationRestoreTarget.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogMigrationRestoreTarget.java @@ -21,7 +21,7 @@ import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; @@ -170,20 +170,17 @@ public class ChangelogMigrationRestoreTarget<K> implements ChangelogRestoreTarge @Override public void valueElementAdded( - ThrowingConsumer<DataOutputViewStreamWrapper, IOException> dataSerializer, - Namespace ns) + ThrowingConsumer<DataOutputView, IOException> dataSerializer, Namespace ns) throws IOException {} @Override public void valueElementAddedOrUpdated( - ThrowingConsumer<DataOutputViewStreamWrapper, IOException> dataSerializer, - Namespace ns) 
+ ThrowingConsumer<DataOutputView, IOException> dataSerializer, Namespace ns) throws IOException {} @Override public void valueElementRemoved( - ThrowingConsumer<DataOutputViewStreamWrapper, IOException> dataSerializer, - Namespace ns) + ThrowingConsumer<DataOutputView, IOException> dataSerializer, Namespace ns) throws IOException {} @Override diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogPqStateTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogPqStateTest.java index 7b697813e13..96203e18915 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogPqStateTest.java +++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogPqStateTest.java @@ -18,7 +18,7 @@ package org.apache.flink.state.changelog; import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.function.FunctionWithException; @@ -157,22 +157,19 @@ public class ChangelogPqStateTest { @Override public void valueElementAdded( - ThrowingConsumer<DataOutputViewStreamWrapper, IOException> dataSerializer, - Void ns) { + ThrowingConsumer<DataOutputView, IOException> dataSerializer, Void ns) { stateElementAdded = true; } @Override public void valueElementAddedOrUpdated( - ThrowingConsumer<DataOutputViewStreamWrapper, IOException> dataSerializer, - Void ns) { + ThrowingConsumer<DataOutputView, IOException> dataSerializer, Void ns) { stateElementChanged = true; } @Override public void valueElementRemoved( - ThrowingConsumer<DataOutputViewStreamWrapper, IOException> dataSerializer, - Void ns) { 
+ ThrowingConsumer<DataOutputView, IOException> dataSerializer, Void ns) { stateElementRemoved = true; } diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/TestChangeLoggerKv.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/TestChangeLoggerKv.java index 43f54a740ab..338dc5cca1f 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/TestChangeLoggerKv.java +++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/TestChangeLoggerKv.java @@ -17,7 +17,7 @@ package org.apache.flink.state.changelog; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.util.function.ThrowingConsumer; import java.io.IOException; @@ -101,19 +101,19 @@ class TestChangeLoggerKv<State> implements KvStateChangeLogger<State, String> { @Override public void valueElementAdded( - ThrowingConsumer<DataOutputViewStreamWrapper, IOException> dataSerializer, String ns) { + ThrowingConsumer<DataOutputView, IOException> dataSerializer, String ns) { stateElementAdded = true; } @Override public void valueElementAddedOrUpdated( - ThrowingConsumer<DataOutputViewStreamWrapper, IOException> dataSerializer, String ns) { + ThrowingConsumer<DataOutputView, IOException> dataSerializer, String ns) { stateElementChanged = true; } @Override public void valueElementRemoved( - ThrowingConsumer<DataOutputViewStreamWrapper, IOException> dataSerializer, String ns) { + ThrowingConsumer<DataOutputView, IOException> dataSerializer, String ns) { stateElementRemoved = true; }