Repository: spark
Updated Branches:
  refs/heads/master 1471ee7af -> 833eab2c9
[SPARK-21369][CORE] Don't use Scala Tuple2 in common/network-* ## What changes were proposed in this pull request? Remove all usages of Scala Tuple2 from common/network-* projects. Otherwise, Yarn users cannot use `spark.reducer.maxReqSizeShuffleToMem`. ## How was this patch tested? Jenkins. Author: Shixiong Zhu <shixi...@databricks.com> Closes #18593 from zsxwing/SPARK-21369. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/833eab2c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/833eab2c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/833eab2c Branch: refs/heads/master Commit: 833eab2c9bd273ee9577fbf9e480d3e3a4b7d203 Parents: 1471ee7 Author: Shixiong Zhu <shixi...@databricks.com> Authored: Tue Jul 11 11:26:17 2017 +0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Tue Jul 11 11:26:17 2017 +0800 ---------------------------------------------------------------------- common/network-common/pom.xml | 3 ++- .../client/TransportResponseHandler.java | 20 ++++++++++---------- .../network/server/OneForOneStreamManager.java | 17 +++++------------ common/network-shuffle/pom.xml | 1 + common/network-yarn/pom.xml | 1 + 5 files changed, 19 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/833eab2c/common/network-common/pom.xml ---------------------------------------------------------------------- diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml index 066970f..0254d0c 100644 --- a/common/network-common/pom.xml +++ b/common/network-common/pom.xml @@ -90,7 +90,8 @@ <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-tags_${scala.binary.version}</artifactId> - </dependency> + <scope>test</scope> + </dependency> <!-- This spark-tags test-dep is needed even though it isn't used in this module, otherwise testing-cmds that exclude 
http://git-wip-us.apache.org/repos/asf/spark/blob/833eab2c/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java ---------------------------------------------------------------------- diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java index 340b8b9..7a3d96c 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java @@ -24,10 +24,10 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicLong; -import scala.Tuple2; - import com.google.common.annotations.VisibleForTesting; import io.netty.channel.Channel; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,7 +58,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> { private final Map<Long, RpcResponseCallback> outstandingRpcs; - private final Queue<Tuple2<String, StreamCallback>> streamCallbacks; + private final Queue<Pair<String, StreamCallback>> streamCallbacks; private volatile boolean streamActive; /** Records the time (in system nanoseconds) that the last fetch or RPC request was sent. 
*/ @@ -92,7 +92,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> { public void addStreamCallback(String streamId, StreamCallback callback) { timeOfLastRequestNs.set(System.nanoTime()); - streamCallbacks.offer(new Tuple2<>(streamId, callback)); + streamCallbacks.offer(ImmutablePair.of(streamId, callback)); } @VisibleForTesting @@ -119,9 +119,9 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> { logger.warn("RpcResponseCallback.onFailure throws exception", e); } } - for (Tuple2<String, StreamCallback> entry : streamCallbacks) { + for (Pair<String, StreamCallback> entry : streamCallbacks) { try { - entry._2().onFailure(entry._1(), cause); + entry.getValue().onFailure(entry.getKey(), cause); } catch (Exception e) { logger.warn("StreamCallback.onFailure throws exception", e); } @@ -208,9 +208,9 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> { } } else if (message instanceof StreamResponse) { StreamResponse resp = (StreamResponse) message; - Tuple2<String, StreamCallback> entry = streamCallbacks.poll(); + Pair<String, StreamCallback> entry = streamCallbacks.poll(); if (entry != null) { - StreamCallback callback = entry._2(); + StreamCallback callback = entry.getValue(); if (resp.byteCount > 0) { StreamInterceptor interceptor = new StreamInterceptor(this, resp.streamId, resp.byteCount, callback); @@ -235,9 +235,9 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> { } } else if (message instanceof StreamFailure) { StreamFailure resp = (StreamFailure) message; - Tuple2<String, StreamCallback> entry = streamCallbacks.poll(); + Pair<String, StreamCallback> entry = streamCallbacks.poll(); if (entry != null) { - StreamCallback callback = entry._2(); + StreamCallback callback = entry.getValue(); try { callback.onFailure(resp.streamId, new RuntimeException(resp.error)); } catch (IOException ioe) { 
http://git-wip-us.apache.org/repos/asf/spark/blob/833eab2c/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java ---------------------------------------------------------------------- diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java index ad8e8b4..85ca2f1 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java @@ -23,8 +23,6 @@ import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; -import scala.Tuple2; - import com.google.common.base.Preconditions; import io.netty.channel.Channel; import org.slf4j.Logger; @@ -98,21 +96,16 @@ public class OneForOneStreamManager extends StreamManager { @Override public ManagedBuffer openStream(String streamChunkId) { - Tuple2<Long, Integer> streamIdAndChunkId = parseStreamChunkId(streamChunkId); - return getChunk(streamIdAndChunkId._1, streamIdAndChunkId._2); - } - - public static String genStreamChunkId(long streamId, int chunkId) { - return String.format("%d_%d", streamId, chunkId); - } - - public static Tuple2<Long, Integer> parseStreamChunkId(String streamChunkId) { String[] array = streamChunkId.split("_"); assert array.length == 2: "Stream id and chunk index should be specified when open stream for fetching block."; long streamId = Long.valueOf(array[0]); int chunkIndex = Integer.valueOf(array[1]); - return new Tuple2<>(streamId, chunkIndex); + return getChunk(streamId, chunkIndex); + } + + public static String genStreamChunkId(long streamId, int chunkId) { + return String.format("%d_%d", streamId, chunkId); } @Override http://git-wip-us.apache.org/repos/asf/spark/blob/833eab2c/common/network-shuffle/pom.xml 
---------------------------------------------------------------------- diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml index 2de882a..9968480 100644 --- a/common/network-shuffle/pom.xml +++ b/common/network-shuffle/pom.xml @@ -69,6 +69,7 @@ <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-tags_${scala.binary.version}</artifactId> + <scope>test</scope> </dependency> <!-- http://git-wip-us.apache.org/repos/asf/spark/blob/833eab2c/common/network-yarn/pom.xml ---------------------------------------------------------------------- diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml index 7e24315..ec2db6e 100644 --- a/common/network-yarn/pom.xml +++ b/common/network-yarn/pom.xml @@ -48,6 +48,7 @@ <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-tags_${scala.binary.version}</artifactId> + <scope>test</scope> </dependency> <!-- --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org