This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-ml.git

commit 1c3bbc0d8052e1088cf3700acebb7e27a7d77d0f
Author: Yun Gao <gaoyunhen...@gmail.com>
AuthorDate: Thu Sep 23 17:52:14 2021 +0800

    [FLINK-24645][iteration] Introduce the iteration record and type info
---
 .../apache/flink/iteration/IterationRecord.java    | 150 ++++++++++++
 .../typeinfo/IterationRecordSerializer.java        | 258 +++++++++++++++++++++
 .../typeinfo/IterationRecordTypeInfo.java          | 103 ++++++++
 .../typeinfo/IterationRecordSerializerTest.java    |  89 +++++++
 4 files changed, 600 insertions(+)

diff --git 
a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/IterationRecord.java
 
b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/IterationRecord.java
new file mode 100644
index 0000000..e093600
--- /dev/null
+++ 
b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/IterationRecord.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration;
+
+import java.util.Objects;
+
+/**
+ * Envelope for every element that flows through an iterative stream.
+ *
+ * <p>A single class models three kinds of elements, distinguished by {@link Type}: user records,
+ * epoch watermarks and barriers. The static factories fill in only the fields that belong to the
+ * requested kind and leave the remaining ones at {@code null} or {@code 0}.
+ *
+ * @param <T> type of the user value carried by {@link Type#RECORD} elements
+ */
+public class IterationRecord<T> {
+
+    /** The type of iteration records. */
+    public enum Type {
+        /** A user record; created with a value and an epoch. */
+        RECORD,
+
+        /** An epoch watermark; created with an epoch and a sender. */
+        EPOCH_WATERMARK,
+
+        /** A barrier; created with a checkpoint id. */
+        BARRIER
+    }
+
+    /** Discriminator telling which of the fields below are meaningful. */
+    private Type type;
+
+    /** Epoch of the element; the barrier factory sets it to {@code 0}. */
+    private int epoch;
+
+    // -------------------------- Fields for normal records -----------------
+
+    /** User payload; {@code null} for elements created as watermarks or barriers. */
+    private T value;
+
+    // -------------------------- Fields for epoch watermark -----------------
+
+    /**
+     * The sender lets the receiver distinguish the source of the event. Currently we can only
+     * know the input that received the event, but there is no additional information about which
+     * channel it came from.
+     */
+    private String sender;
+
+    // -------------------------- Fields for barrier -----------------
+
+    /** Checkpoint id; {@code 0} for elements created as records or watermarks. */
+    private long checkpointId;
+
+    private IterationRecord(Type type, int epoch, T value, String sender, long checkpointId) {
+        this.type = type;
+        this.epoch = epoch;
+        this.value = value;
+        this.sender = sender;
+        this.checkpointId = checkpointId;
+    }
+
+    /** Creates a {@link Type#RECORD} element wrapping {@code value} at the given epoch. */
+    public static <T> IterationRecord<T> newRecord(T value, int epoch) {
+        return new IterationRecord<>(Type.RECORD, epoch, value, null, 0);
+    }
+
+    /** Creates a {@link Type#EPOCH_WATERMARK} element for the given epoch and sender. */
+    public static <T> IterationRecord<T> newEpochWatermark(int epoch, String sender) {
+        return new IterationRecord<>(Type.EPOCH_WATERMARK, epoch, null, sender, 0);
+    }
+
+    /** Creates a {@link Type#BARRIER} element for the given checkpoint; its epoch is 0. */
+    public static <T> IterationRecord<T> newBarrier(long checkpointId) {
+        return new IterationRecord<>(Type.BARRIER, 0, null, null, checkpointId);
+    }
+
+    public Type getType() {
+        return this.type;
+    }
+
+    public void setType(Type type) {
+        this.type = type;
+    }
+
+    public int getEpoch() {
+        return this.epoch;
+    }
+
+    public void setEpoch(int epoch) {
+        this.epoch = epoch;
+    }
+
+    public T getValue() {
+        return this.value;
+    }
+
+    public void setValue(T value) {
+        this.value = value;
+    }
+
+    public String getSender() {
+        return this.sender;
+    }
+
+    public void setSender(String sender) {
+        this.sender = sender;
+    }
+
+    public long getCheckpointId() {
+        return this.checkpointId;
+    }
+
+    public void setCheckpointId(long checkpointId) {
+        this.checkpointId = checkpointId;
+    }
+
+    /** Two records are equal when they have the same exact class and all five fields match. */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null || o.getClass() != getClass()) {
+            return false;
+        }
+
+        IterationRecord<?> other = (IterationRecord<?>) o;
+        if (epoch != other.epoch || type != other.type) {
+            return false;
+        }
+        if (!Objects.equals(value, other.value) || !Objects.equals(sender, other.sender)) {
+            return false;
+        }
+        return checkpointId == other.checkpointId;
+    }
+
+    /** Combines all five fields with the usual 31-based polynomial, in declaration order. */
+    @Override
+    public int hashCode() {
+        int result = 1;
+        result = 31 * result + Objects.hashCode(type);
+        result = 31 * result + Integer.hashCode(epoch);
+        result = 31 * result + Objects.hashCode(value);
+        result = 31 * result + Objects.hashCode(sender);
+        result = 31 * result + Long.hashCode(checkpointId);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("IterationRecord{");
+        text.append("type=").append(type);
+        text.append(", epoch=").append(epoch);
+        text.append(", value=").append(value);
+        text.append(", sender='").append(sender);
+        text.append("', checkpointId=").append(checkpointId);
+        text.append("}");
+        return text.toString();
+    }
+}
diff --git 
a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/typeinfo/IterationRecordSerializer.java
 
b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/typeinfo/IterationRecordSerializer.java
new file mode 100644
index 0000000..bb57e35
--- /dev/null
+++ 
b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/typeinfo/IterationRecordSerializer.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.typeinfo;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.iteration.IterationRecord;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * The type serializer for {@link IterationRecord}.
+ *
+ * <p>Wire format: one byte holding the ordinal of the {@link IterationRecord.Type}, then the
+ * epoch as a variable-length integer, then the type-specific payload: the value (written by the
+ * inner serializer) for records, the sender (via {@link StringSerializer}) for epoch watermarks,
+ * or the checkpoint id (via {@link LongSerializer}) for barriers.
+ *
+ * <p>The reusing variants of {@code copy} and {@code deserialize} only overwrite the fields that
+ * belong to the incoming record type; fields of other types keep whatever the reuse object held
+ * before.
+ *
+ * @param <T> type of the value wrapped by the records
+ */
+public class IterationRecordSerializer<T> extends TypeSerializer<IterationRecord<T>> {
+
+    /** Cached once because {@code values()} allocates a new array on every call. */
+    private static final IterationRecord.Type[] TYPES = IterationRecord.Type.values();
+
+    private final TypeSerializer<T> innerSerializer;
+
+    public IterationRecordSerializer(TypeSerializer<T> innerSerializer) {
+        this.innerSerializer = innerSerializer;
+    }
+
+    public TypeSerializer<T> getInnerSerializer() {
+        return innerSerializer;
+    }
+
+    @Override
+    public boolean isImmutableType() {
+        return false;
+    }
+
+    @Override
+    public TypeSerializer<IterationRecord<T>> duplicate() {
+        return new IterationRecordSerializer<>(innerSerializer.duplicate());
+    }
+
+    /** No default instance is provided; this always returns {@code null}. */
+    @Override
+    public IterationRecord<T> createInstance() {
+        return null;
+    }
+
+    /** Returns a new record of the same type as {@code from}, deep-copying the wrapped value. */
+    @Override
+    public IterationRecord<T> copy(IterationRecord<T> from) {
+        switch (from.getType()) {
+            case RECORD:
+                return IterationRecord.newRecord(
+                        innerSerializer.copy(from.getValue()), from.getEpoch());
+            case EPOCH_WATERMARK:
+                return IterationRecord.newEpochWatermark(from.getEpoch(), from.getSender());
+            case BARRIER:
+                return IterationRecord.newBarrier(from.getCheckpointId());
+            default:
+                throw new RuntimeException("Unsupported mini-batch record type " + from.getType());
+        }
+    }
+
+    /**
+     * Copies {@code from} into {@code reuse} and returns {@code reuse}.
+     *
+     * <p>{@code from} is never modified. The value returned by the inner serializer is always
+     * stored back, because a serializer is free to return an object other than the reuse
+     * instance it was handed (serializers of immutable types typically do).
+     */
+    @Override
+    public IterationRecord<T> copy(IterationRecord<T> from, IterationRecord<T> reuse) {
+        reuse.setType(from.getType());
+        reuse.setEpoch(from.getEpoch());
+
+        switch (from.getType()) {
+            case RECORD:
+                if (reuse.getValue() != null) {
+                    reuse.setValue(innerSerializer.copy(from.getValue(), reuse.getValue()));
+                } else {
+                    reuse.setValue(innerSerializer.copy(from.getValue()));
+                }
+                break;
+            case EPOCH_WATERMARK:
+                reuse.setSender(from.getSender());
+                break;
+            case BARRIER:
+                reuse.setCheckpointId(from.getCheckpointId());
+                break;
+            default:
+                throw new RuntimeException("Unsupported mini-batch record type " + from.getType());
+        }
+
+        return reuse;
+    }
+
+    /** Records have no fixed serialized length. */
+    @Override
+    public int getLength() {
+        return -1;
+    }
+
+    /**
+     * Writes the type ordinal, the epoch and the type-specific payload of {@code record}.
+     *
+     * @throws IOException if writing fails or the record has an unsupported type
+     */
+    @Override
+    public void serialize(IterationRecord<T> record, DataOutputView target) throws IOException {
+        target.writeByte((byte) record.getType().ordinal());
+        serializerNumber(record.getEpoch(), target);
+        switch (record.getType()) {
+            case RECORD:
+                innerSerializer.serialize(record.getValue(), target);
+                break;
+            case EPOCH_WATERMARK:
+                StringSerializer.INSTANCE.serialize(record.getSender(), target);
+                break;
+            case BARRIER:
+                LongSerializer.INSTANCE.serialize(record.getCheckpointId(), target);
+                break;
+            default:
+                throw new IOException("Unsupported mini-batch record type " + record.getType());
+        }
+    }
+
+    /**
+     * Reads one record into a newly created instance.
+     *
+     * @throws IOException if reading fails or the type byte does not name a known type
+     */
+    @Override
+    public IterationRecord<T> deserialize(DataInputView source) throws IOException {
+        IterationRecord.Type type = readType(source);
+        int epoch = deserializeNumber(source);
+
+        switch (type) {
+            case RECORD:
+                T value = innerSerializer.deserialize(source);
+                return IterationRecord.newRecord(value, epoch);
+            case EPOCH_WATERMARK:
+                String sender = StringSerializer.INSTANCE.deserialize(source);
+                return IterationRecord.newEpochWatermark(epoch, sender);
+            case BARRIER:
+                long checkpointId = LongSerializer.INSTANCE.deserialize(source);
+                return IterationRecord.newBarrier(checkpointId);
+            default:
+                throw new IOException("Unsupported mini-batch record type " + type);
+        }
+    }
+
+    /**
+     * Reads one record into {@code reuse} and returns {@code reuse}.
+     *
+     * <p>The value returned by the inner serializer is always stored back, because it is not
+     * guaranteed to be the reuse instance that was passed in.
+     *
+     * @throws IOException if reading fails or the type byte does not name a known type
+     */
+    @Override
+    public IterationRecord<T> deserialize(IterationRecord<T> reuse, DataInputView source)
+            throws IOException {
+        IterationRecord.Type type = readType(source);
+        int epoch = deserializeNumber(source);
+
+        reuse.setType(type);
+        reuse.setEpoch(epoch);
+
+        switch (type) {
+            case RECORD:
+                if (reuse.getValue() != null) {
+                    reuse.setValue(innerSerializer.deserialize(reuse.getValue(), source));
+                } else {
+                    reuse.setValue(innerSerializer.deserialize(source));
+                }
+                return reuse;
+            case EPOCH_WATERMARK:
+                reuse.setSender(StringSerializer.INSTANCE.deserialize(source));
+                return reuse;
+            case BARRIER:
+                reuse.setCheckpointId(LongSerializer.INSTANCE.deserialize(source));
+                return reuse;
+            default:
+                throw new IOException("Unsupported mini-batch record type " + type);
+        }
+    }
+
+    /** Reads the type byte and rejects ordinals that do not map to a known type. */
+    private static IterationRecord.Type readType(DataInputView source) throws IOException {
+        int ordinal = source.readByte();
+        if (ordinal < 0 || ordinal >= TYPES.length) {
+            throw new IOException("Unsupported mini-batch record type " + ordinal);
+        }
+        return TYPES[ordinal];
+    }
+
+    /**
+     * Variant encoding for the epoch: seven payload bits per byte, least-significant group first,
+     * with the high bit of a byte set when more bytes follow.
+     *
+     * <p>The value is treated as unsigned 32 bits, so non-negative values take one to five bytes
+     * and negative values always take five bytes.
+     */
+    public void serializerNumber(int value, DataOutputView target) throws IOException {
+        // Testing the bits above the low seven (instead of "value > 0x7F") keeps negative values
+        // in the loop; otherwise they would be written as one byte with the continuation bit set.
+        while ((value & ~0x7F) != 0) {
+            target.writeByte((value & 0x7F) | 0x80);
+            value >>>= 7;
+        }
+        target.writeByte(value);
+    }
+
+    /**
+     * Reads a number written by {@link #serializerNumber(int, DataOutputView)}.
+     *
+     * @throws IOException if reading fails or the encoding is longer than five bytes
+     */
+    public int deserializeNumber(DataInputView source) throws IOException {
+        int shift = 0;
+        int value = 0;
+
+        byte next;
+        // A negative byte has its high bit set, i.e. it is a continuation byte.
+        while ((next = source.readByte()) < 0) {
+            if (shift >= 28) {
+                // Four continuation bytes plus one final byte already cover all 32 bits.
+                throw new IOException("Malformed variable-length number: more than 5 bytes");
+            }
+            value |= (next & 0x7F) << shift;
+            shift += 7;
+        }
+        value |= next << shift;
+
+        return value;
+    }
+
+    /** Copies one serialized record from {@code source} to {@code target} via a full round trip. */
+    @Override
+    public void copy(DataInputView source, DataOutputView target) throws IOException {
+        IterationRecord<T> record = deserialize(source);
+        serialize(record, target);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        IterationRecordSerializer<?> that = (IterationRecordSerializer<?>) o;
+        return Objects.equals(innerSerializer, that.innerSerializer);
+    }
+
+    @Override
+    public int hashCode() {
+        return innerSerializer != null ? innerSerializer.hashCode() : 0;
+    }
+
+    /** Creates a snapshot that captures this serializer together with its inner serializer. */
+    @Override
+    public TypeSerializerSnapshot<IterationRecord<T>> snapshotConfiguration() {
+        // The write-side constructor must be used here so the nested serializer is captured.
+        return new IterationRecordTypeSerializerSnapshot<>(this);
+    }
+
+    /**
+     * Snapshot of an {@link IterationRecordSerializer}; the inner serializer is the only nested
+     * serializer.
+     *
+     * <p>NOTE(review): the class is public because Flink is expected to instantiate snapshot
+     * classes reflectively through the no-arg constructor when restoring; confirm against the
+     * Flink version in use.
+     */
+    public static final class IterationRecordTypeSerializerSnapshot<T>
+            extends CompositeTypeSerializerSnapshot<
+                    IterationRecord<T>, IterationRecordSerializer<T>> {
+
+        private static final int CURRENT_VERSION = 1;
+
+        /** Constructor used when the snapshot is read back. */
+        public IterationRecordTypeSerializerSnapshot() {
+            super(IterationRecordSerializer.class);
+        }
+
+        /** Constructor used when the snapshot is written for {@code serializer}. */
+        public IterationRecordTypeSerializerSnapshot(IterationRecordSerializer<T> serializer) {
+            super(serializer);
+        }
+
+        @Override
+        protected int getCurrentOuterSnapshotVersion() {
+            return CURRENT_VERSION;
+        }
+
+        @Override
+        protected TypeSerializer<?>[] getNestedSerializers(
+                IterationRecordSerializer<T> iterationRecordSerializer) {
+            return new TypeSerializer[] {iterationRecordSerializer.getInnerSerializer()};
+        }
+
+        @Override
+        protected IterationRecordSerializer<T> createOuterSerializerWithNestedSerializers(
+                TypeSerializer<?>[] typeSerializers) {
+            // Index 0 is the inner serializer, matching getNestedSerializers above.
+            @SuppressWarnings("unchecked")
+            TypeSerializer<T> elementSerializer = (TypeSerializer<T>) typeSerializers[0];
+            return new IterationRecordSerializer<>(elementSerializer);
+        }
+    }
+}
diff --git 
a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/typeinfo/IterationRecordTypeInfo.java
 
b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/typeinfo/IterationRecordTypeInfo.java
new file mode 100644
index 0000000..f89c12d
--- /dev/null
+++ 
b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/typeinfo/IterationRecordTypeInfo.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.typeinfo;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.iteration.IterationRecord;
+
+import java.util.Objects;
+
+/** The type information for {@link IterationRecord}. */
+public class IterationRecordTypeInfo<T> extends 
TypeInformation<IterationRecord<T>> {
+
+    private final TypeInformation<T> innerTypeInfo;
+
+    public IterationRecordTypeInfo(TypeInformation<T> innerTypeInfo) {
+        this.innerTypeInfo = innerTypeInfo;
+    }
+
+    public TypeInformation<T> getInnerTypeInfo() {
+        return innerTypeInfo;
+    }
+
+    public boolean isBasicType() {
+        return false;
+    }
+
+    @Override
+    public boolean isTupleType() {
+        return false;
+    }
+
+    @Override
+    public int getArity() {
+        return 1;
+    }
+
+    @Override
+    public int getTotalFields() {
+        return 1;
+    }
+
+    @Override
+    public Class<IterationRecord<T>> getTypeClass() {
+        return (Class) IterationRecord.class;
+    }
+
+    @Override
+    public boolean isKeyType() {
+        return false;
+    }
+
+    @Override
+    public TypeSerializer<IterationRecord<T>> createSerializer(ExecutionConfig 
config) {
+        return new 
IterationRecordSerializer<>(innerTypeInfo.createSerializer(config));
+    }
+
+    @Override
+    public String toString() {
+        return "IterationRecord Type";
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        IterationRecordTypeInfo<T> that = (IterationRecordTypeInfo<T>) o;
+        return Objects.equals(innerTypeInfo, that.innerTypeInfo);
+    }
+
+    @Override
+    public int hashCode() {
+        return innerTypeInfo != null ? innerTypeInfo.hashCode() : 0;
+    }
+
+    @Override
+    public boolean canEqual(Object obj) {
+        return obj instanceof IterationRecordTypeInfo;
+    }
+}
diff --git 
a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/typeinfo/IterationRecordSerializerTest.java
 
b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/typeinfo/IterationRecordSerializerTest.java
new file mode 100644
index 0000000..26bdaab
--- /dev/null
+++ 
b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/typeinfo/IterationRecordSerializerTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.typeinfo;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.common.typeutils.base.VoidSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.iteration.IterationRecord;
+
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Round-trip tests for {@link IterationRecordSerializer}: every {@link IterationRecord} type is
+ * serialized once and read back both without and with object reuse.
+ */
+public class IterationRecordSerializerTest {
+
+    @Test
+    public void testRecordType() throws IOException {
+        testSerializeAndDeserialize(IterationRecord.newRecord(null, 3), StringSerializer.INSTANCE);
+        testSerializeAndDeserialize(IterationRecord.newRecord(5, 3), IntSerializer.INSTANCE);
+        testSerializeAndDeserialize(
+                IterationRecord.newRecord("Best", 3), StringSerializer.INSTANCE);
+    }
+
+    @Test
+    public void testEpochWatermarkType() throws IOException {
+        testSerializeAndDeserialize(
+                IterationRecord.newEpochWatermark(10, "sender1"), VoidSerializer.INSTANCE);
+        testSerializeAndDeserialize(
+                IterationRecord.newEpochWatermark(Integer.MAX_VALUE, "sender1"),
+                VoidSerializer.INSTANCE);
+    }
+
+    @Test
+    public void testBarrierType() throws IOException {
+        testSerializeAndDeserialize(IterationRecord.newBarrier(15), VoidSerializer.INSTANCE);
+    }
+
+    /**
+     * Serializes {@code iterationRecord} and asserts that both deserialization variants yield a
+     * record equal to it.
+     */
+    private static <T> void testSerializeAndDeserialize(
+            IterationRecord<T> iterationRecord, TypeSerializer<T> internalSerializer)
+            throws IOException {
+        IterationRecordSerializer<T> serializer =
+                new IterationRecordSerializer<>(internalSerializer);
+        byte[] bytes = toBytes(serializer, iterationRecord);
+
+        // Test the case of no object-reuse
+        IterationRecord<T> fresh = serializer.deserialize(viewOf(bytes));
+        assertEquals(iterationRecord, fresh);
+
+        // Test the case of object-reuse, starting from an empty record
+        IterationRecord<T> recycled = IterationRecord.newRecord(null, 0);
+        recycled = serializer.deserialize(recycled, viewOf(bytes));
+        assertEquals(iterationRecord, recycled);
+    }
+
+    /** Serializes {@code record} into a byte array. */
+    private static <T> byte[] toBytes(
+            IterationRecordSerializer<T> serializer, IterationRecord<T> record)
+            throws IOException {
+        try (ByteArrayOutputStream buffer = new ByteArrayOutputStream()) {
+            DataOutputView out = new DataOutputViewStreamWrapper(buffer);
+            serializer.serialize(record, out);
+            return buffer.toByteArray();
+        }
+    }
+
+    /** Wraps {@code bytes} in a new input view positioned at the first byte. */
+    private static DataInputView viewOf(byte[] bytes) {
+        return new DataInputViewStreamWrapper(new ByteArrayInputStream(bytes));
+    }
+}

Reply via email to