This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a5c12c5589b [FLINK-30345][state/changelog] Avoid using sync 
DataOutputViewStreamWrapper to serialize records to changelog
a5c12c5589b is described below

commit a5c12c5589b92d5e8b22a4aa952a0c06543424a3
Author: Hangxiang Yu <master...@gmail.com>
AuthorDate: Fri Dec 9 09:44:33 2022 +0800

    [FLINK-30345][state/changelog] Avoid using sync DataOutputViewStreamWrapper 
to serialize records to changelog
---
 .../state/changelog/AbstractStateChangeLogger.java | 42 +++++++++-------------
 .../flink/state/changelog/ChangelogMapState.java   |  8 ++---
 .../state/changelog/KvStateChangeLoggerImpl.java   | 17 +++++----
 .../PriorityQueueStateChangeLoggerImpl.java        |  7 ++--
 .../flink/state/changelog/StateChangeLogger.java   |  8 ++---
 .../changelog/StateChangeLoggingIterator.java      | 11 +++---
 .../restore/ChangelogMigrationRestoreTarget.java   | 11 +++---
 .../state/changelog/ChangelogPqStateTest.java      | 11 +++---
 .../flink/state/changelog/TestChangeLoggerKv.java  |  8 ++---
 9 files changed, 54 insertions(+), 69 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
index 1f84a912284..fa1af755752 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
@@ -17,7 +17,8 @@
 
 package org.apache.flink.state.changelog;
 
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import 
org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
@@ -27,11 +28,8 @@ import 
org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import 
org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters;
 import org.apache.flink.util.function.ThrowingConsumer;
 
-import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
-
 import javax.annotation.Nullable;
 
-import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
 import java.io.IOException;
 
@@ -54,8 +52,7 @@ abstract class AbstractStateChangeLogger<Key, Value, Ns>
     protected final InternalKeyContext<Key> keyContext;
     protected RegisteredStateMetaInfoBase metaInfo;
     private final StateMetaInfoSnapshot.BackendStateType stateType;
-    private final ByteArrayOutputStream out = new ByteArrayOutputStream();
-    private final DataOutputViewStreamWrapper wrapper = new 
DataOutputViewStreamWrapper(out);
+    private final DataOutputSerializer out = new DataOutputSerializer(128);
     private boolean metaDataWritten = false;
     private final short stateShortId;
 
@@ -95,8 +92,7 @@ abstract class AbstractStateChangeLogger<Key, Value, Ns>
         }
     }
 
-    protected abstract void serializeValue(Value value, 
DataOutputViewStreamWrapper out)
-            throws IOException;
+    protected abstract void serializeValue(Value value, DataOutputView out) 
throws IOException;
 
     @Override
     public void valueAdded(Value addedValue, Ns ns) throws IOException {
@@ -110,21 +106,21 @@ abstract class AbstractStateChangeLogger<Key, Value, Ns>
 
     @Override
     public void valueElementAdded(
-            ThrowingConsumer<DataOutputViewStreamWrapper, IOException> 
dataSerializer, Ns ns)
+            ThrowingConsumer<DataOutputView, IOException> dataSerializer, Ns 
ns)
             throws IOException {
         log(ADD_ELEMENT, dataSerializer, ns);
     }
 
     @Override
     public void valueElementAddedOrUpdated(
-            ThrowingConsumer<DataOutputViewStreamWrapper, IOException> 
dataSerializer, Ns ns)
+            ThrowingConsumer<DataOutputView, IOException> dataSerializer, Ns 
ns)
             throws IOException {
         log(ADD_OR_UPDATE_ELEMENT, dataSerializer, ns);
     }
 
     @Override
     public void valueElementRemoved(
-            ThrowingConsumer<DataOutputViewStreamWrapper, IOException> 
dataSerializer, Ns ns)
+            ThrowingConsumer<DataOutputView, IOException> dataSerializer, Ns 
ns)
             throws IOException {
         log(REMOVE_ELEMENT, dataSerializer, ns);
     }
@@ -147,7 +143,7 @@ abstract class AbstractStateChangeLogger<Key, Value, Ns>
 
     protected void log(
             StateChangeOperation op,
-            @Nullable ThrowingConsumer<DataOutputViewStreamWrapper, 
IOException> dataWriter,
+            @Nullable ThrowingConsumer<DataOutputView, IOException> dataWriter,
             Ns ns)
             throws IOException {
         logMetaIfNeeded();
@@ -172,12 +168,12 @@ abstract class AbstractStateChangeLogger<Key, Value, Ns>
         }
     }
 
-    protected void writeDefaultValueAndTtl(DataOutputViewStreamWrapper out) 
throws IOException {}
+    protected void writeDefaultValueAndTtl(DataOutputView out) throws 
IOException {}
 
     private byte[] serialize(
             StateChangeOperation op,
             Ns ns,
-            @Nullable ThrowingConsumer<DataOutputViewStreamWrapper, 
IOException> dataWriter)
+            @Nullable ThrowingConsumer<DataOutputView, IOException> dataWriter)
             throws IOException {
         return serializeRaw(
                 wrapper -> {
@@ -190,24 +186,18 @@ abstract class AbstractStateChangeLogger<Key, Value, Ns>
                 });
     }
 
-    protected abstract void serializeScope(Ns ns, DataOutputViewStreamWrapper 
out)
-            throws IOException;
+    protected abstract void serializeScope(Ns ns, DataOutputView out) throws 
IOException;
 
-    private byte[] serializeRaw(
-            ThrowingConsumer<DataOutputViewStreamWrapper, IOException> 
dataWriter)
+    private byte[] serializeRaw(ThrowingConsumer<DataOutputView, IOException> 
dataWriter)
             throws IOException {
-        dataWriter.accept(wrapper);
-        wrapper.flush();
-        byte[] bytes = out.toByteArray();
-        out.reset();
+        dataWriter.accept(out);
+        byte[] bytes = out.getCopyOfBuffer();
+        out.clear();
         return bytes;
     }
 
     @Override
     public void close() throws IOException {
-        try (Closer closer = Closer.create()) {
-            closer.register(wrapper);
-            closer.register(out);
-        }
+        // do nothing
     }
 }
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java
index 219839fbd39..02ce3b73bf2 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java
@@ -21,7 +21,7 @@ package org.apache.flink.state.changelog;
 import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.typeutils.base.MapSerializer;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.state.changelog.StateChange;
 import org.apache.flink.runtime.state.heap.InternalKeyContext;
 import org.apache.flink.runtime.state.internal.InternalKvState;
@@ -199,15 +199,15 @@ class ChangelogMapState<K, N, UK, UV>
         }
     }
 
-    private void serializeValue(UV value, DataOutputViewStreamWrapper out) 
throws IOException {
+    private void serializeValue(UV value, DataOutputView out) throws 
IOException {
         getMapSerializer().getValueSerializer().serialize(value, out);
     }
 
-    private void serializeKey(UK key, DataOutputViewStreamWrapper out) throws 
IOException {
+    private void serializeKey(UK key, DataOutputView out) throws IOException {
         getMapSerializer().getKeySerializer().serialize(key, out);
     }
 
-    private ThrowingConsumer<DataOutputViewStreamWrapper, IOException> 
getWriter(UK key, UV value) {
+    private ThrowingConsumer<DataOutputView, IOException> getWriter(UK key, UV 
value) {
         return out -> {
             serializeKey(key, out);
             serializeValue(value, out);
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java
index 7d35843f81d..cf346295969 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java
@@ -19,7 +19,8 @@ package org.apache.flink.state.changelog;
 
 import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
 import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
@@ -78,21 +79,25 @@ class KvStateChangeLoggerImpl<Key, Value, Ns> extends 
AbstractStateChangeLogger<
     }
 
     @Override
-    protected void serializeValue(Value value, DataOutputViewStreamWrapper 
out) throws IOException {
+    protected void serializeValue(Value value, DataOutputView out) throws 
IOException {
         valueSerializer.serialize(value, out);
     }
 
     @Override
-    protected void serializeScope(Ns ns, DataOutputViewStreamWrapper out) 
throws IOException {
+    protected void serializeScope(Ns ns, DataOutputView out) throws 
IOException {
         keySerializer.serialize(keyContext.getCurrentKey(), out);
         namespaceSerializer.serialize(ns, out);
     }
 
-    protected void writeDefaultValueAndTtl(DataOutputViewStreamWrapper out) 
throws IOException {
+    protected void writeDefaultValueAndTtl(DataOutputView out) throws 
IOException {
         out.writeBoolean(ttlConfig.isEnabled());
         if (ttlConfig.isEnabled()) {
-            try (ObjectOutputStream o = new ObjectOutputStream(out)) {
-                o.writeObject(ttlConfig);
+            try (ByteArrayOutputStreamWithPos outputStreamWithPos =
+                            new ByteArrayOutputStreamWithPos();
+                    ObjectOutputStream objectOutputStream =
+                            new ObjectOutputStream(outputStreamWithPos)) {
+                objectOutputStream.writeObject(ttlConfig);
+                out.write(outputStreamWithPos.toByteArray());
             }
         }
         out.writeBoolean(defaultValue != null);
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImpl.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImpl.java
index 3297a3024be..0f348b7c1d8 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImpl.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImpl.java
@@ -18,7 +18,7 @@
 package org.apache.flink.state.changelog;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
 import 
org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
 import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
@@ -42,13 +42,12 @@ class PriorityQueueStateChangeLoggerImpl<K, T> extends 
AbstractStateChangeLogger
     }
 
     @Override
-    protected void serializeValue(T t, DataOutputViewStreamWrapper out) throws 
IOException {
+    protected void serializeValue(T t, DataOutputView out) throws IOException {
         serializer.serialize(t, out);
     }
 
     @Override
-    protected void serializeScope(Void unused, DataOutputViewStreamWrapper out)
-            throws IOException {}
+    protected void serializeScope(Void unused, DataOutputView out) throws 
IOException {}
 
     @Override
     protected PriorityQueueStateChangeLoggerImpl<K, T> setMetaInfo(
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeLogger.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeLogger.java
index ed12624cb05..86e6f80ac5f 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeLogger.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeLogger.java
@@ -19,7 +19,7 @@ package org.apache.flink.state.changelog;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.function.ThrowingConsumer;
 
 import java.io.Closeable;
@@ -60,17 +60,17 @@ public interface StateChangeLogger<Value, Namespace> 
extends Closeable {
 
     /** State element added, such as append of a single element to a list. */
     void valueElementAdded(
-            ThrowingConsumer<DataOutputViewStreamWrapper, IOException> 
dataSerializer, Namespace ns)
+            ThrowingConsumer<DataOutputView, IOException> dataSerializer, 
Namespace ns)
             throws IOException;
 
     /** State element added or updated, such as put into a map. */
     void valueElementAddedOrUpdated(
-            ThrowingConsumer<DataOutputViewStreamWrapper, IOException> 
dataSerializer, Namespace ns)
+            ThrowingConsumer<DataOutputView, IOException> dataSerializer, 
Namespace ns)
             throws IOException;
 
     /** State element removed, such mapping removal from a map. */
     void valueElementRemoved(
-            ThrowingConsumer<DataOutputViewStreamWrapper, IOException> 
dataSerializer, Namespace ns)
+            ThrowingConsumer<DataOutputView, IOException> dataSerializer, 
Namespace ns)
             throws IOException;
 
     /** Enable logging meta data before next writes. */
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeLoggingIterator.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeLoggingIterator.java
index 3484e4af717..39802665e3e 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeLoggingIterator.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeLoggingIterator.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.state.changelog;
 
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.function.BiConsumerWithException;
@@ -32,16 +32,14 @@ class StateChangeLoggingIterator<State, StateElement, 
Namespace>
 
     private final CloseableIterator<StateElement> iterator;
     private final StateChangeLogger<State, Namespace> changeLogger;
-    private final BiConsumerWithException<StateElement, 
DataOutputViewStreamWrapper, IOException>
-            removalWriter;
+    private final BiConsumerWithException<StateElement, DataOutputView, 
IOException> removalWriter;
     private final Namespace ns;
     @Nullable private StateElement lastReturned;
 
     private StateChangeLoggingIterator(
             CloseableIterator<StateElement> iterator,
             StateChangeLogger<State, Namespace> changeLogger,
-            BiConsumerWithException<StateElement, DataOutputViewStreamWrapper, 
IOException>
-                    removalWriter,
+            BiConsumerWithException<StateElement, DataOutputView, IOException> 
removalWriter,
             Namespace ns) {
         this.iterator = iterator;
         this.changeLogger = changeLogger;
@@ -73,8 +71,7 @@ class StateChangeLoggingIterator<State, StateElement, 
Namespace>
     public static <Namespace, State, StateElement> 
CloseableIterator<StateElement> create(
             CloseableIterator<StateElement> iterator,
             StateChangeLogger<State, Namespace> changeLogger,
-            BiConsumerWithException<StateElement, DataOutputViewStreamWrapper, 
IOException>
-                    removalWriter,
+            BiConsumerWithException<StateElement, DataOutputView, IOException> 
removalWriter,
             Namespace ns) {
         return new StateChangeLoggingIterator<>(iterator, changeLogger, 
removalWriter, ns);
     }
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogMigrationRestoreTarget.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogMigrationRestoreTarget.java
index 545e78b8ef6..4af8c06802b 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogMigrationRestoreTarget.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogMigrationRestoreTarget.java
@@ -21,7 +21,7 @@ import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
@@ -170,20 +170,17 @@ public class ChangelogMigrationRestoreTarget<K> 
implements ChangelogRestoreTarge
 
         @Override
         public void valueElementAdded(
-                ThrowingConsumer<DataOutputViewStreamWrapper, IOException> 
dataSerializer,
-                Namespace ns)
+                ThrowingConsumer<DataOutputView, IOException> dataSerializer, 
Namespace ns)
                 throws IOException {}
 
         @Override
         public void valueElementAddedOrUpdated(
-                ThrowingConsumer<DataOutputViewStreamWrapper, IOException> 
dataSerializer,
-                Namespace ns)
+                ThrowingConsumer<DataOutputView, IOException> dataSerializer, 
Namespace ns)
                 throws IOException {}
 
         @Override
         public void valueElementRemoved(
-                ThrowingConsumer<DataOutputViewStreamWrapper, IOException> 
dataSerializer,
-                Namespace ns)
+                ThrowingConsumer<DataOutputView, IOException> dataSerializer, 
Namespace ns)
                 throws IOException {}
 
         @Override
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogPqStateTest.java
 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogPqStateTest.java
index 7b697813e13..96203e18915 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogPqStateTest.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogPqStateTest.java
@@ -18,7 +18,7 @@
 package org.apache.flink.state.changelog;
 
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.function.FunctionWithException;
@@ -157,22 +157,19 @@ public class ChangelogPqStateTest {
 
         @Override
         public void valueElementAdded(
-                ThrowingConsumer<DataOutputViewStreamWrapper, IOException> 
dataSerializer,
-                Void ns) {
+                ThrowingConsumer<DataOutputView, IOException> dataSerializer, 
Void ns) {
             stateElementAdded = true;
         }
 
         @Override
         public void valueElementAddedOrUpdated(
-                ThrowingConsumer<DataOutputViewStreamWrapper, IOException> 
dataSerializer,
-                Void ns) {
+                ThrowingConsumer<DataOutputView, IOException> dataSerializer, 
Void ns) {
             stateElementChanged = true;
         }
 
         @Override
         public void valueElementRemoved(
-                ThrowingConsumer<DataOutputViewStreamWrapper, IOException> 
dataSerializer,
-                Void ns) {
+                ThrowingConsumer<DataOutputView, IOException> dataSerializer, 
Void ns) {
             stateElementRemoved = true;
         }
 
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/TestChangeLoggerKv.java
 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/TestChangeLoggerKv.java
index 43f54a740ab..338dc5cca1f 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/TestChangeLoggerKv.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/TestChangeLoggerKv.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.state.changelog;
 
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.function.ThrowingConsumer;
 
 import java.io.IOException;
@@ -101,19 +101,19 @@ class TestChangeLoggerKv<State> implements 
KvStateChangeLogger<State, String> {
 
     @Override
     public void valueElementAdded(
-            ThrowingConsumer<DataOutputViewStreamWrapper, IOException> 
dataSerializer, String ns) {
+            ThrowingConsumer<DataOutputView, IOException> dataSerializer, 
String ns) {
         stateElementAdded = true;
     }
 
     @Override
     public void valueElementAddedOrUpdated(
-            ThrowingConsumer<DataOutputViewStreamWrapper, IOException> 
dataSerializer, String ns) {
+            ThrowingConsumer<DataOutputView, IOException> dataSerializer, 
String ns) {
         stateElementChanged = true;
     }
 
     @Override
     public void valueElementRemoved(
-            ThrowingConsumer<DataOutputViewStreamWrapper, IOException> 
dataSerializer, String ns) {
+            ThrowingConsumer<DataOutputView, IOException> dataSerializer, 
String ns) {
         stateElementRemoved = true;
     }
 

Reply via email to