Repository: flink
Updated Branches:
  refs/heads/master 831349a61 -> 3126bf522


http://git-wip-us.apache.org/repos/asf/flink/blob/3126bf52/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBarrierBufferTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBarrierBufferTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBarrierBufferTest.java
new file mode 100644
index 0000000..0126d6b
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBarrierBufferTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the behavior of the {@link BarrierBuffer} with {@link 
BufferSpiller}.
+ */
+public class SpillingBarrierBufferTest extends BarrierBufferTestBase {
+
+       private static IOManager ioManager;
+
+       @BeforeClass
+       public static void setup() {
+               ioManager = new IOManagerAsync();
+       }
+
+       @AfterClass
+       public static void shutdownIOManager() {
+               ioManager.shutdown();
+       }
+
+       @After
+       public void checkNoTempFilesRemain() {
+               // validate that all temp files have been removed
+               for (File dir : ioManager.getSpillingDirectories()) {
+                       for (String file : dir.list()) {
+                               if (file != null && !(file.equals(".") || 
file.equals(".."))) {
+                                       fail("barrier buffer did not clean up 
temp files. remaining file: " + file);
+                               }
+                       }
+               }
+       }
+
+       @Override
+       public BarrierBuffer createBarrierHandler(InputGate gate) throws 
IOException{
+               return new BarrierBuffer(gate, new BufferSpiller(ioManager, 
PAGE_SIZE));
+       }
+
+       @Override
+       public void validateAlignmentBuffered(long actualBytesBuffered, 
BufferOrEvent... sequence) {
+               long expectedBuffered = 0;
+               for (BufferOrEvent boe : sequence) {
+                       if (boe.isBuffer()) {
+                               expectedBuffered += BufferSpiller.HEADER_SIZE + 
boe.getBuffer().getSize();
+                       }
+               }
+
+               assertEquals("Wrong alignment buffered bytes", 
actualBytesBuffered, expectedBuffered);
+       }
+}

Reply via email to