This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-ml.git
commit 1c3bbc0d8052e1088cf3700acebb7e27a7d77d0f
Author: Yun Gao <gaoyunhen...@gmail.com>
AuthorDate: Thu Sep 23 17:52:14 2021 +0800
[FLINK-24645][iteration] Introduce the iteration record and type info
---
.../apache/flink/iteration/IterationRecord.java | 150 ++++++++++++
.../typeinfo/IterationRecordSerializer.java | 258 +++++++++++++++++++++
.../typeinfo/IterationRecordTypeInfo.java | 103 ++++++++
.../typeinfo/IterationRecordSerializerTest.java | 89 +++++++
4 files changed, 600 insertions(+)
diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/IterationRecord.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/IterationRecord.java
new file mode 100644
index 0000000..e093600
--- /dev/null
+++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/IterationRecord.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration;
+
+import java.util.Objects;
+
+/** The wrapper for the records flowing in an iterative stream. */
+public class IterationRecord<T> {
+
+    /** The type of iteration records. Serialized by ordinal, so do not reorder. */
+    public enum Type {
+        RECORD,
+
+        EPOCH_WATERMARK,
+
+        BARRIER
+    }
+
+    private Type type;
+
+    private int epoch;
+
+    // -------------------------- Fields for normal records (RECORD) -----------------
+
+    private T value;
+
+    // -------------------------- Fields for epoch watermark (EPOCH_WATERMARK) --------
+
+    /**
+     * The sender lets the receiver distinguish the source of the event. Currently the receiver
+     * only knows the input that received the event; there is no additional information about
+     * which channel it came from.
+     */
+    private String sender;
+
+    // -------------------------- Fields for barrier (BARRIER) -----------------
+    private long checkpointId;
+
+    public static <T> IterationRecord<T> newRecord(T value, int epoch) {
+        return new IterationRecord<>(Type.RECORD, epoch, value, null, 0);
+    }
+
+    public static <T> IterationRecord<T> newEpochWatermark(int epoch, String sender) {
+        return new IterationRecord<>(Type.EPOCH_WATERMARK, epoch, null, sender, 0);
+    }
+
+    public static <T> IterationRecord<T> newBarrier(long checkpointId) {
+        return new IterationRecord<>(Type.BARRIER, 0, null, null, checkpointId);
+    }
+
+    private IterationRecord(Type type, int epoch, T value, String sender, long checkpointId) {
+        this.type = type;
+        this.epoch = epoch;
+        this.value = value;
+        this.sender = sender;
+        this.checkpointId = checkpointId;
+    }
+
+    public Type getType() {
+        return type;
+    }
+
+    public void setType(Type type) {
+        this.type = type;
+    }
+
+    public int getEpoch() {
+        return epoch;
+    }
+
+    public void setEpoch(int epoch) {
+        this.epoch = epoch;
+    }
+
+    public T getValue() {
+        return value;
+    }
+
+    public void setValue(T value) {
+        this.value = value;
+    }
+
+    public String getSender() {
+        return sender;
+    }
+
+    public void setSender(String sender) {
+        this.sender = sender;
+    }
+
+    public long getCheckpointId() {
+        return checkpointId;
+    }
+
+    public void setCheckpointId(long checkpointId) {
+        this.checkpointId = checkpointId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        IterationRecord<?> that = (IterationRecord<?>) o;
+        return epoch == that.epoch
+                && type == that.type
+                && Objects.equals(value, that.value)
+                && Objects.equals(sender, that.sender)
+                && checkpointId == that.checkpointId;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(type, epoch, value, sender, checkpointId);
+    }
+
+    @Override
+    public String toString() {
+        return "IterationRecord{"
+                + "type="
+                + type
+                + ", epoch="
+                + epoch
+                + ", value="
+                + value
+                + ", sender='"
+                + sender
+                + "', checkpointId="
+                + checkpointId
+                + "}";
+    }
+}
diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/typeinfo/IterationRecordSerializer.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/typeinfo/IterationRecordSerializer.java
new file mode 100644
index 0000000..bb57e35
--- /dev/null
+++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/typeinfo/IterationRecordSerializer.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.typeinfo;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.iteration.IterationRecord;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/** The type serializer for {@link IterationRecord}. */
+public class IterationRecordSerializer<T> extends TypeSerializer<IterationRecord<T>> {
+    private final TypeSerializer<T> innerSerializer;
+
+    public IterationRecordSerializer(TypeSerializer<T> innerSerializer) {
+        this.innerSerializer = innerSerializer;
+    }
+
+    public TypeSerializer<T> getInnerSerializer() {
+        return innerSerializer;
+    }
+
+    @Override
+    public boolean isImmutableType() {
+        return false;
+    }
+
+    @Override
+    public TypeSerializer<IterationRecord<T>> duplicate() {
+        return new IterationRecordSerializer<>(innerSerializer.duplicate());
+    }
+
+    @Override
+    public IterationRecord<T> createInstance() {
+        return null;
+    }
+
+    @Override
+    public IterationRecord<T> copy(IterationRecord<T> from) {
+        switch (from.getType()) {
+            case RECORD:
+                return IterationRecord.newRecord(
+                        innerSerializer.copy(from.getValue()), from.getEpoch());
+            case EPOCH_WATERMARK:
+                return IterationRecord.newEpochWatermark(from.getEpoch(), from.getSender());
+            case BARRIER:
+                return IterationRecord.newBarrier(from.getCheckpointId());
+            default:
+                throw new RuntimeException("Unsupported iteration record type " + from.getType());
+        }
+    }
+
+    @Override
+    public IterationRecord<T> copy(IterationRecord<T> from, IterationRecord<T> reuse) {
+        reuse.setType(from.getType());
+        reuse.setEpoch(from.getEpoch());
+
+        switch (from.getType()) {
+            case RECORD:
+                // The reuse variant may return a new object (e.g. immutable types); keep it.
+                if (reuse.getValue() != null) {
+                    reuse.setValue(innerSerializer.copy(from.getValue(), reuse.getValue()));
+                } else {
+                    reuse.setValue(innerSerializer.copy(from.getValue()));
+                }
+                break;
+            case EPOCH_WATERMARK:
+                reuse.setSender(from.getSender());
+                break;
+            case BARRIER:
+                reuse.setCheckpointId(from.getCheckpointId());
+                break;
+            default:
+                throw new RuntimeException("Unsupported iteration record type " + from.getType());
+        }
+
+        return reuse;
+    }
+
+    @Override
+    public int getLength() {
+        return -1;
+    }
+
+    @Override
+    public void serialize(IterationRecord<T> record, DataOutputView target) throws IOException {
+        target.writeByte((byte) record.getType().ordinal());
+        serializerNumber(record.getEpoch(), target);
+        switch (record.getType()) {
+            case RECORD:
+                innerSerializer.serialize(record.getValue(), target);
+                break;
+            case EPOCH_WATERMARK:
+                StringSerializer.INSTANCE.serialize(record.getSender(), target);
+                break;
+            case BARRIER:
+                LongSerializer.INSTANCE.serialize(record.getCheckpointId(), target);
+                break;
+            default:
+                throw new IOException("Unsupported iteration record type " + record.getType());
+        }
+    }
+
+    @Override
+    public IterationRecord<T> deserialize(DataInputView source) throws IOException {
+        int type = source.readByte();
+        int epoch = deserializeNumber(source);
+
+        switch (IterationRecord.Type.values()[type]) {
+            case RECORD:
+                T value = innerSerializer.deserialize(source);
+                return IterationRecord.newRecord(value, epoch);
+            case EPOCH_WATERMARK:
+                String sender = StringSerializer.INSTANCE.deserialize(source);
+                return IterationRecord.newEpochWatermark(epoch, sender);
+            case BARRIER:
+                long checkpointId = LongSerializer.INSTANCE.deserialize(source);
+                return IterationRecord.newBarrier(checkpointId);
+            default:
+                throw new IOException("Unsupported iteration record type " + type);
+        }
+    }
+
+    @Override
+    public IterationRecord<T> deserialize(IterationRecord<T> reuse, DataInputView source)
+            throws IOException {
+        int type = source.readByte();
+        int epoch = deserializeNumber(source);
+
+        reuse.setType(IterationRecord.Type.values()[type]);
+        reuse.setEpoch(epoch);
+
+        switch (reuse.getType()) {
+            case RECORD:
+                if (reuse.getValue() != null) {
+                    reuse.setValue(innerSerializer.deserialize(reuse.getValue(), source));
+                } else {
+                    reuse.setValue(innerSerializer.deserialize(source));
+                }
+                return reuse;
+            case EPOCH_WATERMARK:
+                reuse.setSender(StringSerializer.INSTANCE.deserialize(source));
+                return reuse;
+            case BARRIER:
+                reuse.setCheckpointId(LongSerializer.INSTANCE.deserialize(source));
+                return reuse;
+            default:
+                throw new IOException("Unsupported iteration record type " + type);
+        }
+    }
+
+    /** Base-128 varint encoding of the epoch (negative values take five bytes). */
+    public void serializerNumber(int value, DataOutputView target) throws IOException {
+        while ((value & ~0x7F) != 0) {
+            target.writeByte((value & 0x7F) | 0x80);
+            value >>>= 7;
+        }
+        target.writeByte(value);
+    }
+
+    /** Decodes an epoch written by {@link #serializerNumber}. */
+    public int deserializeNumber(DataInputView source) throws IOException {
+        int offset = 0;
+        int value = 0;
+
+        byte next;
+        while ((next = source.readByte()) < 0) {
+            value |= (next & 0x7F) << offset;
+            offset += 7;
+        }
+        return value | (next << offset);
+    }
+
+    @Override
+    public void copy(DataInputView source, DataOutputView target) throws IOException {
+        serialize(deserialize(source), target);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        IterationRecordSerializer<?> that = (IterationRecordSerializer<?>) o;
+        return Objects.equals(innerSerializer, that.innerSerializer);
+    }
+
+    @Override
+    public int hashCode() {
+        return innerSerializer != null ? innerSerializer.hashCode() : 0;
+    }
+
+    @Override
+    public TypeSerializerSnapshot<IterationRecord<T>> snapshotConfiguration() {
+        return new IterationRecordTypeSerializerSnapshot<>(this);
+    }
+
+    /** Snapshot of {@link IterationRecordSerializer}, composed of its inner serializer. */
+    public static final class IterationRecordTypeSerializerSnapshot<T>
+            extends CompositeTypeSerializerSnapshot<
+                    IterationRecord<T>, IterationRecordSerializer<T>> {
+
+        private static final int CURRENT_VERSION = 1;
+
+        public IterationRecordTypeSerializerSnapshot() {
+            super(IterationRecordSerializer.class);
+        }
+
+        /** Creates a snapshot that captures the inner serializer of {@code serializer}. */
+        public IterationRecordTypeSerializerSnapshot(IterationRecordSerializer<T> serializer) {
+            super(serializer);
+        }
+
+        @Override
+        protected int getCurrentOuterSnapshotVersion() {
+            return CURRENT_VERSION;
+        }
+
+        @Override
+        protected TypeSerializer<?>[] getNestedSerializers(
+                IterationRecordSerializer<T> iterationRecordSerializer) {
+            return new TypeSerializer[] {iterationRecordSerializer.getInnerSerializer()};
+        }
+
+        @Override
+        protected IterationRecordSerializer<T> createOuterSerializerWithNestedSerializers(
+                TypeSerializer<?>[] typeSerializers) {
+            return new IterationRecordSerializer<>((TypeSerializer<T>) typeSerializers[0]);
+        }
+    }
+}
diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/typeinfo/IterationRecordTypeInfo.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/typeinfo/IterationRecordTypeInfo.java
new file mode 100644
index 0000000..f89c12d
--- /dev/null
+++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/typeinfo/IterationRecordTypeInfo.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.typeinfo;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.iteration.IterationRecord;
+
+import java.util.Objects;
+
+/** The type information for {@link IterationRecord}, parameterized by the carried value type. */
+public class IterationRecordTypeInfo<T> extends TypeInformation<IterationRecord<T>> {
+
+    private final TypeInformation<T> innerTypeInfo;
+
+    public IterationRecordTypeInfo(TypeInformation<T> innerTypeInfo) {
+        this.innerTypeInfo = innerTypeInfo;
+    }
+
+    public TypeInformation<T> getInnerTypeInfo() {
+        return innerTypeInfo;
+    }
+
+    @Override
+    public boolean isBasicType() {
+        return false;
+    }
+
+    @Override
+    public boolean isTupleType() {
+        return false;
+    }
+
+    @Override
+    public int getArity() {
+        return 1;
+    }
+
+    @Override
+    public int getTotalFields() {
+        return 1;
+    }
+
+    @Override
+    public Class<IterationRecord<T>> getTypeClass() {
+        return (Class) IterationRecord.class;
+    }
+
+    @Override
+    public boolean isKeyType() {
+        return false;
+    }
+
+    @Override
+    public TypeSerializer<IterationRecord<T>> createSerializer(ExecutionConfig config) {
+        return new IterationRecordSerializer<>(innerTypeInfo.createSerializer(config));
+    }
+
+    @Override
+    public String toString() {
+        return "IterationRecord<" + innerTypeInfo + ">";
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        IterationRecordTypeInfo<?> that = (IterationRecordTypeInfo<?>) o;
+        return Objects.equals(innerTypeInfo, that.innerTypeInfo);
+    }
+
+    @Override
+    public int hashCode() {
+        return innerTypeInfo != null ? innerTypeInfo.hashCode() : 0;
+    }
+
+    @Override
+    public boolean canEqual(Object obj) {
+        return obj instanceof IterationRecordTypeInfo;
+    }
+}
diff --git a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/typeinfo/IterationRecordSerializerTest.java b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/typeinfo/IterationRecordSerializerTest.java
new file mode 100644
index 0000000..26bdaab
--- /dev/null
+++ b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/typeinfo/IterationRecordSerializerTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.typeinfo;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.common.typeutils.base.VoidSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.iteration.IterationRecord;
+
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+/** Round-trip serialization tests for every {@link IterationRecord} type. */
+public class IterationRecordSerializerTest {
+
+    @Test
+    public void testRecordType() throws IOException {
+        testSerializeAndDeserialize(IterationRecord.newRecord(null, 3), StringSerializer.INSTANCE);
+        testSerializeAndDeserialize(IterationRecord.newRecord(5, 3), IntSerializer.INSTANCE);
+        testSerializeAndDeserialize(
+                IterationRecord.newRecord("Best", 3), StringSerializer.INSTANCE);
+    }
+
+    @Test
+    public void testEpochWatermarkType() throws IOException {
+        testSerializeAndDeserialize(
+                IterationRecord.newEpochWatermark(10, "sender1"), VoidSerializer.INSTANCE);
+        testSerializeAndDeserialize(
+                IterationRecord.newEpochWatermark(Integer.MAX_VALUE, "sender1"),
+                VoidSerializer.INSTANCE);
+    }
+
+    @Test
+    public void testBarrierType() throws IOException {
+        testSerializeAndDeserialize(IterationRecord.newBarrier(15), VoidSerializer.INSTANCE);
+    }
+
+    private static <T> void testSerializeAndDeserialize(
+            IterationRecord<T> iterationRecord, TypeSerializer<T> internalSerializer)
+            throws IOException {
+        IterationRecordSerializer<T> iterationRecordSerializer =
+                new IterationRecordSerializer<T>(internalSerializer);
+
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputView dataOutputView = new DataOutputViewStreamWrapper(bos);
+        iterationRecordSerializer.serialize(iterationRecord, dataOutputView);
+
+        byte[] serializedData = bos.toByteArray();
+        bos.close();
+
+        // No object reuse: deserialize into a freshly created record.
+        ByteArrayInputStream bis = new ByteArrayInputStream(serializedData);
+        DataInputView dataInputView = new DataInputViewStreamWrapper(bis);
+        IterationRecord<T> deserialized = iterationRecordSerializer.deserialize(dataInputView);
+        assertEquals(iterationRecord, deserialized);
+
+        IterationRecord<T> reuse = IterationRecord.newRecord(null, 0); // object-reuse case
+        bis = new ByteArrayInputStream(serializedData);
+        dataInputView = new DataInputViewStreamWrapper(bis);
+        reuse = iterationRecordSerializer.deserialize(reuse, dataInputView);
+        assertEquals(iterationRecord, reuse);
+    }
+}