[FLINK-2723] [core] CopyableValue method to copy into new instance

This closes #1169


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e727355e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e727355e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e727355e

Branch: refs/heads/master
Commit: e727355e42bd0ad7d403aee703aaf33a68a839d2
Parents: 40cbf7e
Author: Greg Hogan <[email protected]>
Authored: Mon Sep 21 15:14:09 2015 -0400
Committer: Stephan Ewen <[email protected]>
Committed: Tue Sep 29 12:18:07 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/types/BooleanValue.java    |   6 +-
 .../java/org/apache/flink/types/ByteValue.java  |   7 +-
 .../java/org/apache/flink/types/CharValue.java  |   7 +-
 .../org/apache/flink/types/CopyableValue.java   |  30 ++++-
 .../org/apache/flink/types/DoubleValue.java     |   7 +-
 .../java/org/apache/flink/types/FloatValue.java |   7 +-
 .../java/org/apache/flink/types/IntValue.java   |   9 +-
 .../java/org/apache/flink/types/LongValue.java  |   7 +-
 .../java/org/apache/flink/types/NullValue.java  |   7 +-
 .../java/org/apache/flink/types/Record.java     |   5 +
 .../java/org/apache/flink/types/ShortValue.java |   7 +-
 .../org/apache/flink/types/StringValue.java     |   7 +-
 .../apache/flink/types/CopyableValueTest.java   | 109 +++++++++++++++++++
 13 files changed, 203 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/BooleanValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/BooleanValue.java b/flink-core/src/main/java/org/apache/flink/types/BooleanValue.java
index 071e650..e034648 100644
--- a/flink-core/src/main/java/org/apache/flink/types/BooleanValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/BooleanValue.java
@@ -48,7 +48,6 @@ public class BooleanValue implements NormalizableKey<BooleanValue>, ResettableVa
                this.value = value;
        }
 
-       
        public boolean get() {
                return value;
        }
@@ -118,6 +117,11 @@ public class BooleanValue implements NormalizableKey<BooleanValue>, ResettableVa
        }
 
        @Override
+       public BooleanValue copy() {
+               return new BooleanValue(this.value);
+       }
+
+       @Override
        public void copy(DataInputView source, DataOutputView target) throws IOException {
                target.write(source, 1);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/ByteValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/ByteValue.java b/flink-core/src/main/java/org/apache/flink/types/ByteValue.java
index c2f1f10..40ed1ad 100644
--- a/flink-core/src/main/java/org/apache/flink/types/ByteValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/ByteValue.java
@@ -147,13 +147,18 @@ public class ByteValue implements NormalizableKey<ByteValue>, ResettableValue<By
        public int getBinaryLength() {
                return 1;
        }
-       
+
        @Override
        public void copyTo(ByteValue target) {
                target.value = this.value;
        }
 
        @Override
+       public ByteValue copy() {
+               return new ByteValue(this.value);
+       }
+
+       @Override
        public void copy(DataInputView source, DataOutputView target) throws IOException {
                target.write(source, 1);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/CharValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/CharValue.java b/flink-core/src/main/java/org/apache/flink/types/CharValue.java
index 3fd9f29..06b67c7 100644
--- a/flink-core/src/main/java/org/apache/flink/types/CharValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/CharValue.java
@@ -151,13 +151,18 @@ public class CharValue implements NormalizableKey<CharValue>, ResettableValue<Ch
        public int getBinaryLength() {
                return 2;
        }
-       
+
        @Override
        public void copyTo(CharValue target) {
                target.value = this.value;
        }
 
        @Override
+       public CharValue copy() {
+               return new CharValue(this.value);
+       }
+
+       @Override
        public void copy(DataInputView source, DataOutputView target) throws IOException {
                target.write(source, 2);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/CopyableValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/CopyableValue.java b/flink-core/src/main/java/org/apache/flink/types/CopyableValue.java
index 57e0c46..3974cb2 100644
--- a/flink-core/src/main/java/org/apache/flink/types/CopyableValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/CopyableValue.java
@@ -35,7 +35,35 @@ public interface CopyableValue<T> extends Value {
         */
        int getBinaryLength();
        
+       /**
+        * Performs a deep copy of this object into the {@code target} instance.
+        *
+        * @param target Object to copy into.
+        */
        void copyTo(T target);
-       
+
+       /**
+        * Performs a deep copy of this object into a new instance.
+         *
+         * This method is useful for generic user-defined functions to clone a
+         * {@link CopyableValue} when storing multiple objects. With object reuse
+         * a deep copy must be created and type erasure prevents calling new.
+        *
+        * @return New object with copied fields.
+        */
+       T copy();
+
+       /**
+        * Copies the next serialized instance from {@code source} to {@code target}.
+        *
+        * This method is equivalent to calling {@code IOReadableWritable.read(DataInputView)}
+        * followed by {@code IOReadableWritable.write(DataOutputView)} but does not require
+        * intermediate deserialization.
+        *
+        * @param source Data source for serialized instance.
+        * @param target Data target for serialized instance.
+        *
+        * @see org.apache.flink.core.io.IOReadableWritable
+        */
        void copy(DataInputView source, DataOutputView target) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/DoubleValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/DoubleValue.java b/flink-core/src/main/java/org/apache/flink/types/DoubleValue.java
index abcbcf5..3158e40 100644
--- a/flink-core/src/main/java/org/apache/flink/types/DoubleValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/DoubleValue.java
@@ -122,13 +122,18 @@ public class DoubleValue implements Key<DoubleValue>, ResettableValue<DoubleValu
        public int getBinaryLength() {
                return 8;
        }
-       
+
        @Override
        public void copyTo(DoubleValue target) {
                target.value = this.value;
        }
 
        @Override
+       public DoubleValue copy() {
+               return new DoubleValue(this.value);
+       }
+
+       @Override
        public void copy(DataInputView source, DataOutputView target) throws IOException {
                target.write(source, 8);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/FloatValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/FloatValue.java b/flink-core/src/main/java/org/apache/flink/types/FloatValue.java
index d57c74b..5364203 100644
--- a/flink-core/src/main/java/org/apache/flink/types/FloatValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/FloatValue.java
@@ -121,13 +121,18 @@ public class FloatValue implements Key<FloatValue>, ResettableValue<FloatValue>,
        public int getBinaryLength() {
                return 4;
        }
-       
+
        @Override
        public void copyTo(FloatValue target) {
                target.value = this.value;
        }
 
        @Override
+       public FloatValue copy() {
+               return new FloatValue(this.value);
+       }
+
+       @Override
        public void copy(DataInputView source, DataOutputView target) throws IOException {
                target.write(source, 4);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/IntValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/IntValue.java b/flink-core/src/main/java/org/apache/flink/types/IntValue.java
index 423c8c1..1b893f0 100644
--- a/flink-core/src/main/java/org/apache/flink/types/IntValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/IntValue.java
@@ -139,7 +139,7 @@ public class IntValue implements NormalizableKey<IntValue>, ResettableValue<IntV
                        }
                }
                else {
-                       target.putIntBigEndian(offset, value  - Integer.MIN_VALUE);
+                       target.putIntBigEndian(offset, value - Integer.MIN_VALUE);
                        for (int i = 4; i < len; i++) {
                                target.put(offset + i, (byte) 0);
                        }
@@ -152,13 +152,18 @@ public class IntValue implements NormalizableKey<IntValue>, ResettableValue<IntV
        public int getBinaryLength() {
                return 4;
        }
-       
+
        @Override
        public void copyTo(IntValue target) {
                target.value = this.value;
        }
 
        @Override
+       public IntValue copy() {
+               return new IntValue(this.value);
+       }
+
+       @Override
        public void copy(DataInputView source, DataOutputView target) throws IOException {
                target.write(source, 4);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/LongValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/LongValue.java b/flink-core/src/main/java/org/apache/flink/types/LongValue.java
index e8fcd53..2b6cb1f 100644
--- a/flink-core/src/main/java/org/apache/flink/types/LongValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/LongValue.java
@@ -160,13 +160,18 @@ public class LongValue implements NormalizableKey<LongValue>, ResettableValue<Lo
        public int getBinaryLength() {
                return 8;
        }
-       
+
        @Override
        public void copyTo(LongValue target) {
                target.value = this.value;
        }
 
        @Override
+       public LongValue copy() {
+               return new LongValue(this.value);
+       }
+
+       @Override
        public void copy(DataInputView source, DataOutputView target) throws IOException {
                target.write(source, 8);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/NullValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/NullValue.java b/flink-core/src/main/java/org/apache/flink/types/NullValue.java
index 5391b7b..aa56536 100644
--- a/flink-core/src/main/java/org/apache/flink/types/NullValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/NullValue.java
@@ -110,12 +110,17 @@ public final class NullValue implements NormalizableKey<NullValue>, CopyableValu
        public int getBinaryLength() {
                return 1;
        }
-       
+
        @Override
        public void copyTo(NullValue target) {
        }
 
        @Override
+       public NullValue copy() {
+               return NullValue.getInstance();
+       }
+
+       @Override
        public void copy(DataInputView source, DataOutputView target) throws IOException {
                source.readBoolean();
                target.writeBoolean(false);

http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/Record.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/Record.java b/flink-core/src/main/java/org/apache/flink/types/Record.java
index 218d6ce..24ff979 100644
--- a/flink-core/src/main/java/org/apache/flink/types/Record.java
+++ b/flink-core/src/main/java/org/apache/flink/types/Record.java
@@ -763,6 +763,11 @@ public final class Record implements Value, CopyableValue<Record> {
        }
 
        @Override
+       public Record copy() {
+               return createCopy();
+       }
+
+       @Override
        public void copy(DataInputView source, DataOutputView target) throws IOException {
                int val = source.readUnsignedByte();
                target.writeByte(val);

http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/ShortValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/ShortValue.java b/flink-core/src/main/java/org/apache/flink/types/ShortValue.java
index ec03497..f18ce7f 100644
--- a/flink-core/src/main/java/org/apache/flink/types/ShortValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/ShortValue.java
@@ -154,13 +154,18 @@ public class ShortValue implements NormalizableKey<ShortValue>, ResettableValue<
        public int getBinaryLength() {
                return 2;
        }
-       
+
        @Override
        public void copyTo(ShortValue target) {
                target.value = this.value;
        }
 
        @Override
+       public ShortValue copy() {
+               return new ShortValue(this.value);
+       }
+
+       @Override
        public void copy(DataInputView source, DataOutputView target) throws IOException {
                target.write(source, 2);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/main/java/org/apache/flink/types/StringValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/StringValue.java b/flink-core/src/main/java/org/apache/flink/types/StringValue.java
index db0f184..2249019 100644
--- a/flink-core/src/main/java/org/apache/flink/types/StringValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/StringValue.java
@@ -682,7 +682,12 @@ public class StringValue implements NormalizableKey<StringValue>, CharSequence,
                target.ensureSize(this.len);
                System.arraycopy(this.value, 0, target.value, 0, this.len);
        }
-       
+
+       @Override
+       public StringValue copy() {
+               return new StringValue(this);
+       }
+
        @Override
        public void copy(DataInputView in, DataOutputView target) throws IOException {
                int len = in.readUnsignedByte();

http://git-wip-us.apache.org/repos/asf/flink/blob/e727355e/flink-core/src/test/java/org/apache/flink/types/CopyableValueTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/CopyableValueTest.java b/flink-core/src/test/java/org/apache/flink/types/CopyableValueTest.java
new file mode 100644
index 0000000..76bdece
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/types/CopyableValueTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+public class CopyableValueTest {
+
+       @Test
+       public void testCopy() {
+               CopyableValue<?>[] value_types = new CopyableValue[] {
+                       new BooleanValue(true),
+                       new ByteValue((byte) 42),
+                       new CharValue('q'),
+                       new DoubleValue(3.1415926535897932),
+                       new FloatValue((float) 3.14159265),
+                       new IntValue(42),
+                       new LongValue(42l),
+                       new NullValue(),
+                       new ShortValue((short) 42),
+                       new StringValue("QED")
+               };
+
+               for (CopyableValue<?> type : value_types) {
+                       assertEquals(type, type.copy());
+               }
+       }
+
+       @Test
+       public void testCopyTo() {
+               BooleanValue boolean_from = new BooleanValue(true);
+               BooleanValue boolean_to = new BooleanValue(false);
+
+               boolean_from.copyTo(boolean_to);
+               assertEquals(boolean_from, boolean_to);
+
+               ByteValue byte_from = new ByteValue((byte) 3);
+               ByteValue byte_to = new ByteValue((byte) 7);
+
+               byte_from.copyTo(byte_to);
+               assertEquals(byte_from, byte_to);
+
+               CharValue char_from = new CharValue('α');
+               CharValue char_to = new CharValue('ω');
+
+               char_from.copyTo(char_to);
+               assertEquals(char_from, char_to);
+
+               DoubleValue double_from = new DoubleValue(2.7182818284590451);
+               DoubleValue double_to = new DoubleValue(0);
+
+               double_from.copyTo(double_to);
+               assertEquals(double_from, double_to);
+
+               FloatValue float_from = new FloatValue((float) 2.71828182);
+               FloatValue float_to = new FloatValue((float) 1.41421356);
+
+               float_from.copyTo(float_to);
+               assertEquals(float_from, float_to);
+
+               IntValue int_from = new IntValue(8191);
+               IntValue int_to = new IntValue(131071);
+
+               int_from.copyTo(int_to);
+               assertEquals(int_from, int_to);
+
+               LongValue long_from = new LongValue(524287);
+               LongValue long_to = new LongValue(2147483647);
+
+               long_from.copyTo(long_to);
+               assertEquals(long_from, long_to);
+
+               NullValue null_from = new NullValue();
+               NullValue null_to = new NullValue();
+
+               null_from.copyTo(null_to);
+               assertEquals(null_from, null_to);
+
+               ShortValue short_from = new ShortValue((short) 31);
+               ShortValue short_to = new ShortValue((short) 127);
+
+               short_from.copyTo(short_to);
+               assertEquals(short_from, short_to);
+
+               StringValue string_from = new StringValue("2305843009213693951");
+               StringValue string_to = new StringValue("618970019642690137449562111");
+
+               string_from.copyTo(string_to);
+               assertEquals(string_from, string_to);
+       }
+}

Reply via email to