Repository: flink Updated Branches: refs/heads/master 6ddfcb402 -> 52d9806ba
[FLINK-1498] [tests] Better error messages in external sort tests Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/52d9806b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/52d9806b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/52d9806b Branch: refs/heads/master Commit: 52d9806baaff1689f21962febb7dc73d68572289 Parents: 30a52a0 Author: Stephan Ewen <[email protected]> Authored: Mon Feb 9 17:44:42 2015 +0100 Committer: Stephan Ewen <[email protected]> Committed: Mon Feb 9 17:45:48 2015 +0100 ---------------------------------------------------------------------- .../operators/sort/ExternalSortITCase.java | 453 ++++++++++--------- .../sort/ExternalSortLargeRecordsITCase.java | 8 +- 2 files changed, 252 insertions(+), 209 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/52d9806b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java index 6e35ad0..6a1eba7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java @@ -74,6 +74,8 @@ public class ExternalSortITCase { private TypeSerializerFactory<Record> pactRecordSerializer; private TypeComparator<Record> pactRecordComparator; + + private boolean failed; // -------------------------------------------------------------------------------------------- @@ -96,7 +98,7 @@ public class ExternalSortITCase { if (this.memoryManager != null) { Assert.assertTrue("Memory leak: not all segments have been returned to the memory manager.", - this.memoryManager.verifyEmpty()); + failed || this.memoryManager.verifyEmpty()); this.memoryManager.shutdown(); this.memoryManager = null; } @@ -105,246 +107,281 @@ public class ExternalSortITCase { // -------------------------------------------------------------------------------------------- @Test - public void testInMemorySort() throws Exception { - // comparator - final Comparator<TestData.Key> keyComparator = new TestData.KeyComparator(); - - final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.CONSTANT, VAL); - final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator, NUM_PAIRS); - - // merge iterator - LOG.debug("Initializing sortmerger..."); - - Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager, this.ioManager, - source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator, - (double)64/78, 2, 0.9f); - - // emit data - LOG.debug("Reading and sorting data..."); - - // check order - MutableObjectIterator<Record> iterator = merger.getIterator(); - - LOG.debug("Checking results..."); - int pairsEmitted = 1; - - Record rec1 = new Record(); - Record rec2 = new Record(); - - Assert.assertTrue((rec1 = iterator.next(rec1)) != null); - while ((rec2 = iterator.next(rec2)) != null) { - final Key k1 = rec1.getField(0, TestData.Key.class); - final Key k2 = rec2.getField(0, TestData.Key.class); - pairsEmitted++; + public void testInMemorySort() { + try { + // comparator + final Comparator<TestData.Key> keyComparator = new TestData.KeyComparator(); - Assert.assertTrue(keyComparator.compare(k1, k2) <= 0); + final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.CONSTANT, VAL); + final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator, NUM_PAIRS); + + // merge iterator + LOG.debug("Initializing sortmerger..."); - Record tmp = rec1; - rec1 = rec2; - k1.setKey(k2.getKey()); + Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager, this.ioManager, + source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator, + (double)64/78, 2, 0.9f); + + // emit data + LOG.debug("Reading and sorting data..."); + + // check order + MutableObjectIterator<Record> iterator = merger.getIterator(); + + LOG.debug("Checking results..."); + int pairsEmitted = 1; + + Record rec1 = new Record(); + Record rec2 = new Record(); - rec2 = tmp; + Assert.assertTrue((rec1 = iterator.next(rec1)) != null); + while ((rec2 = iterator.next(rec2)) != null) { + final Key k1 = rec1.getField(0, TestData.Key.class); + final Key k2 = rec2.getField(0, TestData.Key.class); + pairsEmitted++; + + Assert.assertTrue(keyComparator.compare(k1, k2) <= 0); + + Record tmp = rec1; + rec1 = rec2; + k1.setKey(k2.getKey()); + + rec2 = tmp; + } + Assert.assertTrue(NUM_PAIRS == pairsEmitted); + + merger.close(); + } + catch (Exception e) { + failed = true; + e.printStackTrace(); + Assert.fail(e.getMessage()); } - Assert.assertTrue(NUM_PAIRS == pairsEmitted); - - merger.close(); } @Test - public void testInMemorySortUsing10Buffers() throws Exception { - // comparator - final Comparator<TestData.Key> keyComparator = new TestData.KeyComparator(); - - final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.CONSTANT, VAL); - final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator, NUM_PAIRS); - - // merge iterator - LOG.debug("Initializing sortmerger..."); - - Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager, this.ioManager, - source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator, - (double)64/78, 10, 2, 0.9f); - - // emit data - LOG.debug("Reading and sorting data..."); - - // check order - MutableObjectIterator<Record> iterator = merger.getIterator(); - - LOG.debug("Checking results..."); - int pairsEmitted = 1; - - Record rec1 = new Record(); - Record rec2 = new Record(); - - Assert.assertTrue((rec1 = iterator.next(rec1)) != null); - while ((rec2 = iterator.next(rec2)) != null) { - final Key k1 = rec1.getField(0, TestData.Key.class); - final Key k2 = rec2.getField(0, TestData.Key.class); - pairsEmitted++; + public void testInMemorySortUsing10Buffers() { + try { + // comparator + final Comparator<TestData.Key> keyComparator = new TestData.KeyComparator(); + + final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.CONSTANT, VAL); + final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator, NUM_PAIRS); + + // merge iterator + LOG.debug("Initializing sortmerger..."); + + Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager, this.ioManager, + source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator, + (double)64/78, 10, 2, 0.9f); + + // emit data + LOG.debug("Reading and sorting data..."); + + // check order + MutableObjectIterator<Record> iterator = merger.getIterator(); - Assert.assertTrue(keyComparator.compare(k1, k2) <= 0); + LOG.debug("Checking results..."); + int pairsEmitted = 1; + + Record rec1 = new Record(); + Record rec2 = new Record(); - Record tmp = rec1; - rec1 = rec2; - k1.setKey(k2.getKey()); + Assert.assertTrue((rec1 = iterator.next(rec1)) != null); + while ((rec2 = iterator.next(rec2)) != null) { + final Key k1 = rec1.getField(0, TestData.Key.class); + final Key k2 = rec2.getField(0, TestData.Key.class); + pairsEmitted++; + + Assert.assertTrue(keyComparator.compare(k1, k2) <= 0); + + Record tmp = rec1; + rec1 = rec2; + k1.setKey(k2.getKey()); + + rec2 = tmp; + } + Assert.assertTrue(NUM_PAIRS == pairsEmitted); - rec2 = tmp; + merger.close(); + } + catch (Exception e) { + failed = true; + e.printStackTrace(); + Assert.fail(e.getMessage()); } - Assert.assertTrue(NUM_PAIRS == pairsEmitted); - - merger.close(); } @Test - public void testSpillingSort() throws Exception { - // comparator - final Comparator<TestData.Key> keyComparator = new TestData.KeyComparator(); - - final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.CONSTANT, VAL); - final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator, NUM_PAIRS); - - // merge iterator - LOG.debug("Initializing sortmerger..."); - - Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager, this.ioManager, - source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator, - (double)16/78, 64, 0.7f); - - // emit data - LOG.debug("Reading and sorting data..."); - - // check order - MutableObjectIterator<Record> iterator = merger.getIterator(); - - LOG.debug("Checking results..."); - int pairsEmitted = 1; - - Record rec1 = new Record(); - Record rec2 = new Record(); - - Assert.assertTrue((rec1 = iterator.next(rec1)) != null); - while ((rec2 = iterator.next(rec2)) != null) { - final Key k1 = rec1.getField(0, TestData.Key.class); - final Key k2 = rec2.getField(0, TestData.Key.class); - pairsEmitted++; + public void testSpillingSort() { + try { + // comparator + final Comparator<TestData.Key> keyComparator = new TestData.KeyComparator(); - Assert.assertTrue(keyComparator.compare(k1, k2) <= 0); + final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.CONSTANT, VAL); + final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator, NUM_PAIRS); + + // merge iterator + LOG.debug("Initializing sortmerger..."); + + Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager, this.ioManager, + source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator, + (double)16/78, 64, 0.7f); + + // emit data + LOG.debug("Reading and sorting data..."); + + // check order + MutableObjectIterator<Record> iterator = merger.getIterator(); + + LOG.debug("Checking results..."); + int pairsEmitted = 1; + + Record rec1 = new Record(); + Record rec2 = new Record(); - Record tmp = rec1; - rec1 = rec2; - k1.setKey(k2.getKey()); + Assert.assertTrue((rec1 = iterator.next(rec1)) != null); + while ((rec2 = iterator.next(rec2)) != null) { + final Key k1 = rec1.getField(0, TestData.Key.class); + final Key k2 = rec2.getField(0, TestData.Key.class); + pairsEmitted++; + + Assert.assertTrue(keyComparator.compare(k1, k2) <= 0); + + Record tmp = rec1; + rec1 = rec2; + k1.setKey(k2.getKey()); + + rec2 = tmp; + } + Assert.assertTrue(NUM_PAIRS == pairsEmitted); - rec2 = tmp; + merger.close(); + } + catch (Exception e) { + failed = true; + e.printStackTrace(); + Assert.fail(e.getMessage()); } - Assert.assertTrue(NUM_PAIRS == pairsEmitted); - - merger.close(); } @Test - public void testSpillingSortWithIntermediateMerge() throws Exception { - // amount of pairs - final int PAIRS = 10000000; - - // comparator - final Comparator<TestData.Key> keyComparator = new TestData.KeyComparator(); - - final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.FIX_LENGTH); - final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator, PAIRS); - - // merge iterator - LOG.debug("Initializing sortmerger..."); - - Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager, this.ioManager, - source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator, - (double)64/78, 16, 0.7f); - - // emit data - LOG.debug("Emitting data..."); - - // check order - MutableObjectIterator<Record> iterator = merger.getIterator(); - - LOG.debug("Checking results..."); - int pairsRead = 1; - int nextStep = PAIRS / 20; - - Record rec1 = new Record(); - Record rec2 = new Record(); - - Assert.assertTrue((rec1 = iterator.next(rec1)) != null); - while ((rec2 = iterator.next(rec2)) != null) { - final Key k1 = rec1.getField(0, TestData.Key.class); - final Key k2 = rec2.getField(0, TestData.Key.class); - pairsRead++; + public void testSpillingSortWithIntermediateMerge() { + try { + // amount of pairs + final int PAIRS = 10000000; + + // comparator + final Comparator<TestData.Key> keyComparator = new TestData.KeyComparator(); + + final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.FIX_LENGTH); + final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator, PAIRS); - Assert.assertTrue(keyComparator.compare(k1, k2) <= 0); + // merge iterator + LOG.debug("Initializing sortmerger..."); - Record tmp = rec1; - rec1 = rec2; - k1.setKey(k2.getKey()); - rec2 = tmp; + Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager, this.ioManager, + source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator, + (double)64/78, 16, 0.7f); - // log - if (pairsRead == nextStep) { - nextStep += PAIRS / 20; - } + // emit data + LOG.debug("Emitting data..."); + + // check order + MutableObjectIterator<Record> iterator = merger.getIterator(); + LOG.debug("Checking results..."); + int pairsRead = 1; + int nextStep = PAIRS / 20; + + Record rec1 = new Record(); + Record rec2 = new Record(); + + Assert.assertTrue((rec1 = iterator.next(rec1)) != null); + while ((rec2 = iterator.next(rec2)) != null) { + final Key k1 = rec1.getField(0, TestData.Key.class); + final Key k2 = rec2.getField(0, TestData.Key.class); + pairsRead++; + + Assert.assertTrue(keyComparator.compare(k1, k2) <= 0); + + Record tmp = rec1; + rec1 = rec2; + k1.setKey(k2.getKey()); + rec2 = tmp; + + // log + if (pairsRead == nextStep) { + nextStep += PAIRS / 20; + } + + } + Assert.assertEquals("Not all pairs were read back in.", PAIRS, pairsRead); + merger.close(); + } + catch (Exception e) { + failed = true; + e.printStackTrace(); + Assert.fail(e.getMessage()); } - Assert.assertEquals("Not all pairs were read back in.", PAIRS, pairsRead); - merger.close(); } @Test - public void testSpillingSortWithIntermediateMergeIntPair() throws Exception { - // amount of pairs - final int PAIRS = 50000000; - - // comparator - final RandomIntPairGenerator generator = new RandomIntPairGenerator(12345678, PAIRS); - - final TypeSerializerFactory<IntPair> serializerFactory = new IntPairSerializer.IntPairSerializerFactory(); - final TypeComparator<IntPair> comparator = new IntPairComparator(); - - // merge iterator - LOG.debug("Initializing sortmerger..."); - - Sorter<IntPair> merger = new UnilateralSortMerger<IntPair>(this.memoryManager, this.ioManager, - generator, this.parentTask, serializerFactory, comparator, (double)64/78, 4, 0.7f); - - // emit data - LOG.debug("Emitting data..."); - - // check order - MutableObjectIterator<IntPair> iterator = merger.getIterator(); - - LOG.debug("Checking results..."); - int pairsRead = 1; - int nextStep = PAIRS / 20; - - IntPair rec1 = new IntPair(); - IntPair rec2 = new IntPair(); - - Assert.assertTrue((rec1 = iterator.next(rec1)) != null); - - while ((rec2 = iterator.next(rec2)) != null) { - final int k1 = rec1.getKey(); - final int k2 = rec2.getKey(); - pairsRead++; + public void testSpillingSortWithIntermediateMergeIntPair() { + try { + // amount of pairs + final int PAIRS = 50000000; + + // comparator + final RandomIntPairGenerator generator = new RandomIntPairGenerator(12345678, PAIRS); + + final TypeSerializerFactory<IntPair> serializerFactory = new IntPairSerializer.IntPairSerializerFactory(); + final TypeComparator<IntPair> comparator = new IntPairComparator(); - Assert.assertTrue(k1 - k2 <= 0); + // merge iterator + LOG.debug("Initializing sortmerger..."); - IntPair tmp = rec1; - rec1 = rec2; - rec2 = tmp; + Sorter<IntPair> merger = new UnilateralSortMerger<IntPair>(this.memoryManager, this.ioManager, + generator, this.parentTask, serializerFactory, comparator, (double)64/78, 4, 0.7f); + + // emit data + LOG.debug("Emitting data..."); - // log - if (pairsRead == nextStep) { - nextStep += PAIRS / 20; + // check order + MutableObjectIterator<IntPair> iterator = merger.getIterator(); + + LOG.debug("Checking results..."); + int pairsRead = 1; + int nextStep = PAIRS / 20; + + IntPair rec1 = new IntPair(); + IntPair rec2 = new IntPair(); + + Assert.assertTrue((rec1 = iterator.next(rec1)) != null); + + while ((rec2 = iterator.next(rec2)) != null) { + final int k1 = rec1.getKey(); + final int k2 = rec2.getKey(); + pairsRead++; + + Assert.assertTrue(k1 - k2 <= 0); + + IntPair tmp = rec1; + rec1 = rec2; + rec2 = tmp; + + // log + if (pairsRead == nextStep) { + nextStep += PAIRS / 20; + } } + Assert.assertEquals("Not all pairs were read back in.", PAIRS, pairsRead); + merger.close(); + } + catch (Exception e) { + failed = true; + e.printStackTrace(); + Assert.fail(e.getMessage()); } - Assert.assertEquals("Not all pairs were read back in.", PAIRS, pairsRead); - merger.close(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/52d9806b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java index 7403ab0..af7b008 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java @@ -58,6 +58,8 @@ public class ExternalSortLargeRecordsITCase { private IOManager ioManager; private MemoryManager memoryManager; + + private boolean errored; // -------------------------------------------------------------------------------------------- @@ -76,7 +78,7 @@ public class ExternalSortLargeRecordsITCase { if (this.memoryManager != null) { Assert.assertTrue("Memory leak: not all segments have been returned to the memory manager.", - this.memoryManager.verifyEmpty()); + errored || this.memoryManager.verifyEmpty()); this.memoryManager.shutdown(); this.memoryManager = null; } @@ -147,6 +149,7 @@ public class ExternalSortLargeRecordsITCase { sorter.close(); } catch (Exception e) { + errored = true; e.printStackTrace(); fail(e.getMessage()); } @@ -216,6 +219,7 @@ public class ExternalSortLargeRecordsITCase { sorter.close(); } catch (Exception e) { + errored = true; e.printStackTrace(); fail(e.getMessage()); } @@ -300,6 +304,7 @@ public class ExternalSortLargeRecordsITCase { sorter.close(); } catch (Exception e) { + errored = true; e.printStackTrace(); fail(e.getMessage()); } @@ -370,6 +375,7 @@ public class ExternalSortLargeRecordsITCase { sorter.close(); } catch (Exception e) { + errored = true; e.printStackTrace(); fail(e.getMessage()); }
