This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.8 by this push:
     new f8a2140  [FLINK-11915][core] Fix miscalculation of DataInputViewStream.skip
f8a2140 is described below

commit f8a214008089d62e3bd12db857cd059367b54dd1
Author: ifndef-SleePy <mmyy1...@gmail.com>
AuthorDate: Wed Apr 17 15:21:53 2019 +0800

    [FLINK-11915][core] Fix miscalculation of DataInputViewStream.skip
    
    [FLINK-11915][core] Add the missing unit test case
    
    [FLINK-11915][core] Enrich the test case to cover the original skipping function
    
    This closes #8195.
---
 .../typeutils/runtime/DataInputViewStream.java     | 10 +--
 .../typeutils/runtime/DataInputViewStreamTest.java | 86 ++++++++++++++++++++++
 2 files changed, 91 insertions(+), 5 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputViewStream.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputViewStream.java
index aab4689..f6073ed 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputViewStream.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputViewStream.java
@@ -52,17 +52,17 @@ public class DataInputViewStream extends InputStream {
 
        @Override
        public long skip(long n) throws IOException {
-               long counter = n;
-               while(counter > Integer.MAX_VALUE) {
+               long toSkipRemaining = n;
+               while(toSkipRemaining > Integer.MAX_VALUE) {
                        int skippedBytes = 
inputView.skipBytes(Integer.MAX_VALUE);
 
                        if (skippedBytes == 0) {
-                               return n - counter;
+                               return n - toSkipRemaining;
                        }
 
-                       counter -= skippedBytes;
+                       toSkipRemaining -= skippedBytes;
                }
-               return n - counter - inputView.skipBytes((int) counter);
+               return n - (toSkipRemaining - inputView.skipBytes((int) 
toSkipRemaining));
        }
 
        @Override
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/DataInputViewStreamTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/DataInputViewStreamTest.java
new file mode 100644
index 0000000..885fa56
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/DataInputViewStreamTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit test for {@link DataInputViewStream}.
+ */
+public class DataInputViewStreamTest extends TestLogger {
+
+       @Test
+       public void testSkip() throws IOException {
+               final TestInputStream inputStream = new TestInputStream();
+               try (TestDataInputView dataInputView = new 
TestDataInputView(inputStream)) {
+                       try (DataInputViewStream dataInputViewStream = new 
DataInputViewStream(dataInputView)) {
+                               assertEquals(1, dataInputViewStream.skip(1));
+                               assertEquals(1, inputStream.skipped);
+
+                               final long bigNumberToSkip = 1024L + 2L * 
Integer.MAX_VALUE;
+                               assertEquals(bigNumberToSkip, 
dataInputViewStream.skip(bigNumberToSkip));
+                               assertEquals(1 + bigNumberToSkip, 
inputStream.skipped);
+                       }
+               }
+
+       }
+
+       /**
+        * Test implementation of {@link DataInputView}.
+        */
+       private static class TestDataInputView extends DataInputStream 
implements DataInputView {
+
+               TestDataInputView(InputStream in) {
+                       super(in);
+               }
+
+               @Override
+               public void skipBytesToRead(int numBytes) throws IOException {
+                       throw new UnsupportedOperationException("Not properly 
implemented.");
+               }
+       }
+
+       /**
+        * Test implementation of {@link InputStream}.
+        */
+       private static class TestInputStream extends InputStream {
+
+               long skipped = 0;
+
+               @Override
+               public int read() throws IOException {
+                       return 0;
+               }
+
+               @Override
+               public long skip(long n) {
+                       skipped += n;
+                       return n;
+               }
+       }
+}

Reply via email to