[
https://issues.apache.org/jira/browse/FLINK-4856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15874575#comment-15874575
]
ASF GitHub Bot commented on FLINK-4856:
---------------------------------------
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/3336#discussion_r101992577
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
---
@@ -484,6 +487,71 @@ public static Throwable deserializeServerFailure(ByteBuf buf) throws IOException
return null;
}
}
+
+ /**
+ * Serializes all values of the Iterable with the given serializer.
+ *
+ * @param entries Key-value pairs to serialize
+ * @param keySerializer Serializer for UK
+ * @param valueSerializer Serializer for UV
+ * @param <UK> Type of the keys
+ * @param <UV> Type of the values
+ * @return Serialized values or <code>null</code> if values <code>null</code> or empty
+ * @throws IOException On failure during serialization
+ */
+ public static <UK, UV> byte[] serializeMap(Iterable<Map.Entry<UK, UV>> entries, TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) throws IOException {
+ if (entries != null) {
+ Iterator<Map.Entry<UK, UV>> it = entries.iterator();
+
+ if (it.hasNext()) {
+ // Serialize
+ DataOutputSerializer dos = new DataOutputSerializer(32);
+
+ while (it.hasNext()) {
+ Map.Entry<UK, UV> entry = it.next();
+
+ keySerializer.serialize(entry.getKey(), dos);
+ valueSerializer.serialize(entry.getValue(), dos);
+ }
+
+ return dos.getCopyOfBuffer();
+ } else {
+ return null;
--- End diff --
I wonder whether null and an empty map should be considered the same. Judging
from the other code in the class, I think we should return an empty byte[] here.
Then you could also simply use a for-each loop over the iterable to make the
code shorter.
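A minimal sketch of the suggestion above, assuming that a null iterable still
maps to null while an empty one yields an empty byte[]. To keep it
self-contained, it uses java.io.DataOutputStream and a hypothetical Writer
interface as stand-ins for Flink's DataOutputSerializer and TypeSerializer; the
class and interface names are illustrative and not part of the pull request:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Map;

final class MapSerializationSketch {

    /** Hypothetical stand-in for a TypeSerializer's serialize method. */
    interface Writer<T> {
        void write(T value, DataOutputStream out) throws IOException;
    }

    /**
     * Serializes all entries as alternating key/value records.
     *
     * Assumption: null input returns null, an empty iterable returns an
     * empty byte[] (zero length), so the two cases stay distinguishable.
     */
    static <UK, UV> byte[] serializeMap(
            Iterable<Map.Entry<UK, UV>> entries,
            Writer<UK> keyWriter,
            Writer<UV> valueWriter) throws IOException {

        if (entries == null) {
            return null;
        }

        ByteArrayOutputStream bytes = new ByteArrayOutputStream(32);
        DataOutputStream out = new DataOutputStream(bytes);

        // A for-each loop replaces the explicit Iterator and hasNext() check;
        // for an empty iterable the loop body simply never runs.
        for (Map.Entry<UK, UV> entry : entries) {
            keyWriter.write(entry.getKey(), out);
            valueWriter.write(entry.getValue(), out);
        }

        out.flush();
        return bytes.toByteArray();
    }
}
```

With this shape, callers can tell "no map" (null) apart from "empty map"
(zero-length array), and the empty case needs no separate branch.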
> Add MapState for keyed streams
> ------------------------------
>
> Key: FLINK-4856
> URL: https://issues.apache.org/jira/browse/FLINK-4856
> Project: Flink
> Issue Type: New Feature
> Components: DataStream API, State Backends, Checkpointing
> Reporter: Xiaogang Shi
> Assignee: Xiaogang Shi
>
> Many states in keyed streams are organized as key-value pairs. Currently,
> these states are implemented by storing the entire map into a ValueState or a
> ListState. The implementation however is very costly because all entries have
> to be serialized/deserialized when updating a single entry. To improve the
> efficiency of these states, MapStates are urgently needed.