TAJO-1103: Insert clause of partitioned table loses some FetchImpls. (jinho)
Closes #186

Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/de28c829
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/de28c829
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/de28c829

Branch: refs/heads/block_iteration
Commit: de28c82942ae0fd2d06b45bcf629d660c30a9888
Parents: 2eba8aa
Author: jhkim <[email protected]>
Authored: Wed Oct 8 11:04:24 2014 +0900
Committer: jhkim <[email protected]>
Committed: Wed Oct 8 11:04:24 2014 +0900

----------------------------------------------------------------------
 CHANGES | 5 +-
 .../java/org/apache/tajo/worker/FetchImpl.java | 16 ++--
 .../apache/tajo/master/TestRepartitioner.java | 85 ++++++++++++++++++++
 .../org/apache/tajo/worker/TestHistory.java | 9 ++-
 4 files changed, 103 insertions(+), 12 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/de28c829/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index de21976..bff9583 100644
--- a/CHANGES
+++ b/CHANGES
@@ -161,7 +161,10 @@ Release 0.9.0 - unreleased
 BUG FIXES
- TAJO-1101: Broadcast join with a zero-length file table returns wrong result data.(Hyoungjun Kim)
+ TAJO-1103: Insert clause of partitioned table loses some FetchImpls. (jinho)
+
+ TAJO-1101: Broadcast join with a zero-length file table returns wrong result data.
+ (Hyoungjun Kim)
 TAJO-1067: INSERT OVERWRITE INTO should not remove all partitions.
(jaehwa) http://git-wip-us.apache.org/repos/asf/tajo/blob/de28c829/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java index 964da5d..f411793 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java @@ -36,8 +36,6 @@ import java.util.List; * <code>FetchImpl</code> information to indicate the locations of intermediate data. */ public class FetchImpl implements ProtoObject<TajoWorkerProtocol.FetchProto>, Cloneable { - private TajoWorkerProtocol.FetchProto.Builder builder = null; - private QueryUnit.PullHost host; // The pull server host information private TajoWorkerProtocol.ShuffleType type; // hash or range partition method. private ExecutionBlockId executionBlockId; // The executionBlock id @@ -53,7 +51,6 @@ public class FetchImpl implements ProtoObject<TajoWorkerProtocol.FetchProto>, Cl private long length = -1; public FetchImpl() { - builder = TajoWorkerProtocol.FetchProto.newBuilder(); taskIds = new ArrayList<Integer>(); attemptIds = new ArrayList<Integer>(); } @@ -108,14 +105,14 @@ public class FetchImpl implements ProtoObject<TajoWorkerProtocol.FetchProto>, Cl @Override public int hashCode() { - return Objects.hashCode(host, type, executionBlockId, partitionId, name, rangeParams, hasNext, taskIds, attemptIds); + return Objects.hashCode(host, type, executionBlockId, partitionId, name, rangeParams, + hasNext, taskIds, attemptIds, offset, length); } @Override public TajoWorkerProtocol.FetchProto getProto() { - if (builder == null) { - builder = TajoWorkerProtocol.FetchProto.newBuilder(); - } + TajoWorkerProtocol.FetchProto.Builder builder = TajoWorkerProtocol.FetchProto.newBuilder(); + builder.setHost(host.getHost()); builder.setPort(host.getPort()); 
builder.setType(type); @@ -235,7 +232,6 @@ public class FetchImpl implements ProtoObject<TajoWorkerProtocol.FetchProto>, Cl public FetchImpl clone() throws CloneNotSupportedException { FetchImpl newFetchImpl = (FetchImpl) super.clone(); - newFetchImpl.builder = TajoWorkerProtocol.FetchProto.newBuilder(); newFetchImpl.host = host.clone(); newFetchImpl.type = type; newFetchImpl.executionBlockId = executionBlockId; @@ -273,6 +269,8 @@ public class FetchImpl implements ProtoObject<TajoWorkerProtocol.FetchProto>, Cl TUtil.checkEquals(name, fetch.name) && TUtil.checkEquals(rangeParams, fetch.rangeParams) && TUtil.checkEquals(taskIds, fetch.taskIds) && - TUtil.checkEquals(type, fetch.type); + TUtil.checkEquals(type, fetch.type) && + TUtil.checkEquals(offset, fetch.offset) && + TUtil.checkEquals(length, fetch.length); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/de28c829/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java index f969a08..afa330e 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java @@ -29,6 +29,7 @@ import org.apache.tajo.master.querymaster.QueryUnit; import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry; import org.apache.tajo.master.querymaster.Repartitioner; import org.apache.tajo.util.Pair; +import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.FetchImpl; import org.jboss.netty.handler.codec.http.QueryStringDecoder; import org.junit.Test; @@ -39,6 +40,7 @@ import java.util.*; import static junit.framework.Assert.assertEquals; import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType; import static org.apache.tajo.master.querymaster.Repartitioner.FetchGroupMeta; +import 
static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; public class TestRepartitioner { @@ -403,6 +405,89 @@ public class TestRepartitioner { } } + @Test + public void testSplitIntermediatesWithUniqueHost() { + List<IntermediateEntry> intermediateEntries = new ArrayList<IntermediateEntry>(); + + int[] pageLengths = new int[20]; //195MB + for (int i = 0 ; i < pageLengths.length; i++) { + if (i < pageLengths.length - 1) { + pageLengths[i] = 10 * 1024 * 1024; + } else { + pageLengths[i] = 5 * 1024 * 1024; + } + } + + long expectedTotalLength = 0; + QueryUnit.PullHost pullHost = new QueryUnit.PullHost("host", 0); + + for (int i = 0; i < 20; i++) { + List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>(); + long offset = 0; + for (int j = 0; j < pageLengths.length; j++) { + pages.add(new Pair(offset, pageLengths[j])); + offset += pageLengths[j]; + expectedTotalLength += pageLengths[j]; + } + IntermediateEntry interm = new IntermediateEntry(i, -1, 0, pullHost); + interm.setPages(pages); + interm.setVolume(offset); + intermediateEntries.add(interm); + } + + long splitVolume = 128 * 1024 * 1024; + List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, intermediateEntries, + splitVolume, 10 * 1024 * 1024); + assertEquals(32, fetches.size()); + + int expectedSize = 0; + Set<FetchImpl> fetchSet = TUtil.newHashSet(); + for(List<FetchImpl> list : fetches){ + expectedSize += list.size(); + fetchSet.addAll(list); + } + assertEquals(expectedSize, fetchSet.size()); + + + int index = 0; + int numZeroPosFetcher = 0; + long totalLength = 0; + Set<String> uniqPullHost = new HashSet<String>(); + + for (List<FetchImpl> eachFetchList: fetches) { + long length = 0; + for (FetchImpl eachFetch: eachFetchList) { + if (eachFetch.getOffset() == 0) { + numZeroPosFetcher++; + } + totalLength += eachFetch.getLength(); + length += eachFetch.getLength(); + uniqPullHost.add(eachFetch.getPullHost().toString()); + } + assertTrue(length 
+ " should be smaller than splitVolume", length < splitVolume); + if (index < fetches.size() - 1) { + assertTrue(length + " should be great than 100MB" + fetches.size() + "," + index, length >= 100 * 1024 * 1024); + } + index++; + } + assertEquals(20, numZeroPosFetcher); + assertEquals(1, uniqPullHost.size()); + assertEquals(expectedTotalLength, totalLength); + } + + @Test + public void testFetchImpl() { + ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0); + QueryUnit.PullHost pullHost = new QueryUnit.PullHost("localhost", 0); + + FetchImpl expected = new FetchImpl(pullHost, ShuffleType.SCATTERED_HASH_SHUFFLE, ebId, 1); + FetchImpl fetch2 = new FetchImpl(pullHost, ShuffleType.SCATTERED_HASH_SHUFFLE, ebId, 1); + assertEquals(expected, fetch2); + fetch2.setOffset(5); + fetch2.setLength(10); + assertNotEquals(expected, fetch2); + } + private static void assertFetchImpl(FetchImpl [] expected, Map<String, List<FetchImpl>>[] result) { Set<FetchImpl> expectedURLs = Sets.newHashSet(); http://git-wip-us.apache.org/repos/asf/tajo/blob/de28c829/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java index 15ead84..3a85c14 100644 --- a/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java +++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java @@ -76,8 +76,13 @@ public class TestHistory { TaskRunnerHistory history = histories.iterator().next(); assertEquals(Service.STATE.STOPPED, history.getState()); - - assertEquals(history, new TaskRunnerHistory(history.getProto())); + TaskRunnerHistory fromProto = new TaskRunnerHistory(history.getProto()); + assertEquals(history.getExecutionBlockId(), fromProto.getExecutionBlockId()); + assertEquals(history.getFinishTime(), fromProto.getFinishTime()); + 
assertEquals(history.getStartTime(), fromProto.getStartTime()); + assertEquals(history.getState(), fromProto.getState()); + assertEquals(history.getContainerId(), fromProto.getContainerId()); + assertEquals(history.getProto().getTaskHistoriesCount(), fromProto.getProto().getTaskHistoriesCount()); } @Test
