[SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support

- Move external/java8-tests tests into core, streaming, sql and remove
- Remove MaxPermGen and related options
- Fix some reflection / TODOs around Java 8+ methods
- Update doc references to 1.7/1.8 differences
- Remove Java 7/8 related build profiles
- Update some plugins for better Java 8 compatibility
- Fix a few Java-related warnings
For the future:

- Update Java 8 examples to fully use Java 8
- Update Java tests to use lambdas for simplicity
- Update Java internal implementations to use lambdas

## How was this patch tested?

Existing tests

Author: Sean Owen <so...@cloudera.com>

Closes #16871 from srowen/SPARK-19493.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0e240549
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0e240549
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0e240549

Branch: refs/heads/master
Commit: 0e2405490f2056728d1353abbac6f3ea177ae533
Parents: 3871d94
Author: Sean Owen <so...@cloudera.com>
Authored: Thu Feb 16 12:32:45 2017 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Thu Feb 16 12:32:45 2017 +0000

----------------------------------------------------------------------
 assembly/pom.xml | 1 +
 build/mvn | 8 +-
 build/sbt-launch-lib.bash | 2 +-
 .../spark/network/client/TransportClient.java | 111 +-
 .../network/crypto/AuthClientBootstrap.java | 16 +-
 .../spark/network/crypto/AuthRpcHandler.java | 3 -
 .../network/server/TransportRequestHandler.java | 27 +-
 .../spark/network/crypto/AuthEngineSuite.java | 2 -
 .../shuffle/ExternalShuffleBlockHandler.java | 8 +-
 .../shuffle/ExternalShuffleBlockResolver.java | 7 +-
 .../network/shuffle/ExternalShuffleClient.java | 21 +-
 .../network/shuffle/RetryingBlockFetcher.java | 9 +-
 common/sketch/pom.xml | 2 +
 common/unsafe/pom.xml | 2 +
 .../java/org/apache/spark/unsafe/Platform.java | 9 +-
 .../spark/unsafe/types/CalendarInterval.java | 88 +-
 .../org/apache/spark/api/java/Optional.java | 7 +-
 .../api/java/function/CoGroupFunction.java | 1 +
 .../java/function/DoubleFlatMapFunction.java | 1 +
 .../spark/api/java/function/DoubleFunction.java | 1 +
 .../spark/api/java/function/FilterFunction.java | 1 +
 .../api/java/function/FlatMapFunction.java | 1 +
 .../api/java/function/FlatMapFunction2.java | 1 +
 .../java/function/FlatMapGroupsFunction.java | 1 +
 .../api/java/function/ForeachFunction.java | 1 +
 .../java/function/ForeachPartitionFunction.java | 1 +
 .../spark/api/java/function/Function.java | 1 +
 .../spark/api/java/function/Function0.java | 1 +
 .../spark/api/java/function/Function2.java | 1 +
 .../spark/api/java/function/Function3.java | 1 +
 .../spark/api/java/function/Function4.java | 1 +
 .../spark/api/java/function/MapFunction.java | 1 +
 .../api/java/function/MapGroupsFunction.java | 1 +
 .../java/function/MapPartitionsFunction.java | 1 +
 .../api/java/function/PairFlatMapFunction.java | 1 +
 .../spark/api/java/function/PairFunction.java | 1 +
 .../spark/api/java/function/ReduceFunction.java | 1 +
 .../spark/api/java/function/VoidFunction.java | 1 +
 .../spark/api/java/function/VoidFunction2.java | 1 +
 .../unsafe/sort/UnsafeExternalSorter.java | 9 +-
 .../unsafe/sort/UnsafeSorterSpillMerger.java | 28 +-
 .../scala/org/apache/spark/SparkContext.scala | 3 -
 .../spark/launcher/WorkerCommandBuilder.scala | 1 -
 .../scala/org/apache/spark/util/Utils.scala | 44 +-
 .../java/org/apache/spark/JavaAPISuite.java | 1836 ----------------
 .../test/org/apache/spark/Java8RDDAPISuite.java | 356 ++++
 .../test/org/apache/spark/JavaAPISuite.java | 1842 ++++++++++++++++
 .../org/apache/spark/util/UtilsSuite.scala | 6 +-
 dev/appveyor-install-dependencies.ps1 | 2 +-
 dev/create-release/release-build.sh | 1 -
 dev/make-distribution.sh | 2 +-
 dev/mima | 1 -
 dev/run-tests.py | 3 -
 dev/test-dependencies.sh | 2 +-
 docs/building-spark.md | 32 +-
 docs/index.md | 6 +-
 docs/mllib-linear-methods.md | 2 +-
 docs/mllib-statistics.md | 7 +-
 docs/programming-guide.md | 11 +-
 docs/quick-start.md | 9 +-
 docs/streaming-custom-receivers.md | 10 +-
 docs/streaming-kafka-0-10-integration.md | 62 +-
 docs/streaming-kafka-0-8-integration.md | 41 +-
 docs/streaming-programming-guide.md | 219 +-
 docs/structured-streaming-programming-guide.md | 38 +-
 .../spark/examples/ml/JavaTokenizerExample.java | 4 +-
 .../examples/sql/JavaSQLDataSourceExample.java | 2 +-
 external/java8-tests/README.md | 22 -
 external/java8-tests/pom.xml | 132 --
 .../apache/spark/java8/Java8RDDAPISuite.java | 356 ----
 .../spark/java8/dstream/Java8APISuite.java | 882 --------
 .../java8/sql/Java8DatasetAggregatorSuite.java | 62 -
 .../src/test/resources/log4j.properties | 27 -
 .../org/apache/spark/java8/JDK8ScalaSuite.scala | 30 -
 .../apache/spark/sql/kafka010/KafkaSource.scala | 3 +-
 .../spark/streaming/kafka010/KafkaRDD.scala | 7 +-
 .../spark/launcher/AbstractCommandBuilder.java | 7 +-
 .../spark/launcher/ChildProcAppHandle.java | 10 +-
 .../spark/launcher/CommandBuilderUtils.java | 21 -
 .../apache/spark/launcher/LauncherServer.java | 7 +-
 .../apache/spark/launcher/OutputRedirector.java | 7 +-
 .../apache/spark/launcher/SparkAppHandle.java | 3 -
 .../launcher/SparkClassCommandBuilder.java | 68 +-
 .../launcher/SparkSubmitCommandBuilder.java | 101 +-
 .../launcher/CommandBuilderUtilsSuite.java | 36 -
 .../SparkSubmitCommandBuilderSuite.java | 8 +-
 launcher/src/test/resources/spark-defaults.conf | 2 +-
 pom.xml | 171 +-
 project/SparkBuild.scala | 41 +-
 .../org/apache/spark/deploy/yarn/Client.scala | 1 -
 .../spark/deploy/yarn/ExecutorRunnable.scala | 2 -
 .../launcher/YarnCommandBuilderUtils.scala | 12 -
 .../org/apache/spark/sql/types/Decimal.scala | 8 +-
 .../FlatMapGroupsWithStateFunction.java | 4 +-
 .../function/MapGroupsWithStateFunction.java | 4 +-
 .../scala/org/apache/spark/sql/Dataset.scala | 2 +-
 .../spark/sql/KeyValueGroupedDataset.scala | 2 +-
 .../scala/org/apache/spark/sql/SQLContext.scala | 11 -
 .../org/apache/spark/sql/SparkSession.scala | 11 -
 .../spark/sql/Java8DatasetAggregatorSuite.java | 61 +
 .../org/apache/spark/sql/JavaDatasetSuite.java | 2 +-
 sql/hive/pom.xml | 3 +-
 .../execution/ScriptTransformationExec.scala | 10 +-
 .../apache/spark/streaming/JavaAPISuite.java | 2000 -----------------
 .../apache/spark/streaming/Java8APISuite.java | 887 ++++++++
 .../apache/spark/streaming/JavaAPISuite.java | 2008 ++++++++++++++++++
 106 files changed, 5641 insertions(+), 6314 deletions(-)
----------------------------------------------------------------------
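Most of the Java changes in the diff below repeat one mechanical pattern: a single-abstract-method anonymous inner class (a Netty listener, a Runnable, a Comparator) collapses into a lambda once the build targets Java 8. A minimal standalone sketch of that before/after, using a plain Runnable rather than any class from this patch:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class LambdaMigrationSketch {
      public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();

        // Java 7 style: a one-method anonymous inner class.
        pool.submit(new Runnable() {
          @Override
          public void run() {
            System.out.println("anonymous class");
          }
        });

        // Java 8 style, as applied throughout this commit: the same
        // task expressed as a lambda.
        pool.submit(() -> System.out.println("lambda"));

        pool.shutdown();
      }
    }
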
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/assembly/pom.xml
----------------------------------------------------------------------
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 53f1879..9d8607d 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -187,6 +187,7 @@
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-assembly-plugin</artifactId>
+        <version>3.0.0</version>
         <executions>
           <execution>
             <id>dist</id>

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/build/mvn
----------------------------------------------------------------------
diff --git a/build/mvn b/build/mvn
index 866bad8..1e393c3 100755
--- a/build/mvn
+++ b/build/mvn
@@ -22,7 +22,7 @@ _DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
 # Preserve the calling directory
 _CALLING_DIR="$(pwd)"
 # Options used during compilation
-_COMPILE_JVM_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
+_COMPILE_JVM_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"

 # Installs
any application tarball given a URL, the expected tarball name, # and, optionally, a checkable binary path to determine if the binary has @@ -141,13 +141,9 @@ cd "${_CALLING_DIR}" # Now that zinc is ensured to be installed, check its status and, if its # not running or just installed, start it if [ -n "${ZINC_INSTALL_FLAG}" -o -z "`"${ZINC_BIN}" -status -port ${ZINC_PORT}`" ]; then - ZINC_JAVA_HOME= - if [ -n "$JAVA_7_HOME" ]; then - ZINC_JAVA_HOME="env JAVA_HOME=$JAVA_7_HOME" - fi export ZINC_OPTS=${ZINC_OPTS:-"$_COMPILE_JVM_OPTS"} "${ZINC_BIN}" -shutdown -port ${ZINC_PORT} - $ZINC_JAVA_HOME "${ZINC_BIN}" -start -port ${ZINC_PORT} \ + "${ZINC_BIN}" -start -port ${ZINC_PORT} \ -scala-compiler "${SCALA_COMPILER}" \ -scala-library "${SCALA_LIBRARY}" &>/dev/null fi http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/build/sbt-launch-lib.bash ---------------------------------------------------------------------- diff --git a/build/sbt-launch-lib.bash b/build/sbt-launch-lib.bash index 615f848..4732669 100755 --- a/build/sbt-launch-lib.bash +++ b/build/sbt-launch-lib.bash @@ -117,7 +117,7 @@ get_mem_opts () { (( $perm < 4096 )) || perm=4096 local codecache=$(( $perm / 2 )) - echo "-Xms${mem}m -Xmx${mem}m -XX:MaxPermSize=${perm}m -XX:ReservedCodeCacheSize=${codecache}m" + echo "-Xms${mem}m -Xmx${mem}m -XX:ReservedCodeCacheSize=${codecache}m" } require_arg () { http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java ---------------------------------------------------------------------- diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java index 7e7d78d..a6f527c 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java +++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java @@ -32,8 +32,6 @@ import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.util.concurrent.SettableFuture; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -133,40 +131,36 @@ public class TransportClient implements Closeable { */ public void fetchChunk( long streamId, - final int chunkIndex, - final ChunkReceivedCallback callback) { - final long startTime = System.currentTimeMillis(); + int chunkIndex, + ChunkReceivedCallback callback) { + long startTime = System.currentTimeMillis(); if (logger.isDebugEnabled()) { logger.debug("Sending fetch chunk request {} to {}", chunkIndex, getRemoteAddress(channel)); } - final StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex); + StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex); handler.addFetchRequest(streamChunkId, callback); - channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener( - new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess()) { - long timeTaken = System.currentTimeMillis() - startTime; - if (logger.isTraceEnabled()) { - logger.trace("Sending request {} to {} took {} ms", streamChunkId, - getRemoteAddress(channel), timeTaken); - } - } else { - String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId, - 
getRemoteAddress(channel), future.cause()); - logger.error(errorMsg, future.cause()); - handler.removeFetchRequest(streamChunkId); - channel.close(); - try { - callback.onFailure(chunkIndex, new IOException(errorMsg, future.cause())); - } catch (Exception e) { - logger.error("Uncaught exception in RPC response callback handler!", e); - } - } + channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(future -> { + if (future.isSuccess()) { + long timeTaken = System.currentTimeMillis() - startTime; + if (logger.isTraceEnabled()) { + logger.trace("Sending request {} to {} took {} ms", streamChunkId, + getRemoteAddress(channel), timeTaken); } - }); + } else { + String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId, + getRemoteAddress(channel), future.cause()); + logger.error(errorMsg, future.cause()); + handler.removeFetchRequest(streamChunkId); + channel.close(); + try { + callback.onFailure(chunkIndex, new IOException(errorMsg, future.cause())); + } catch (Exception e) { + logger.error("Uncaught exception in RPC response callback handler!", e); + } + } + }); } /** @@ -175,8 +169,8 @@ public class TransportClient implements Closeable { * @param streamId The stream to fetch. * @param callback Object to call with the stream data. */ - public void stream(final String streamId, final StreamCallback callback) { - final long startTime = System.currentTimeMillis(); + public void stream(String streamId, StreamCallback callback) { + long startTime = System.currentTimeMillis(); if (logger.isDebugEnabled()) { logger.debug("Sending stream request for {} to {}", streamId, getRemoteAddress(channel)); } @@ -186,29 +180,25 @@ public class TransportClient implements Closeable { // when responses arrive. synchronized (this) { handler.addStreamCallback(callback); - channel.writeAndFlush(new StreamRequest(streamId)).addListener( - new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess()) { - long timeTaken = System.currentTimeMillis() - startTime; - if (logger.isTraceEnabled()) { - logger.trace("Sending request for {} to {} took {} ms", streamId, - getRemoteAddress(channel), timeTaken); - } - } else { - String errorMsg = String.format("Failed to send request for %s to %s: %s", streamId, - getRemoteAddress(channel), future.cause()); - logger.error(errorMsg, future.cause()); - channel.close(); - try { - callback.onFailure(streamId, new IOException(errorMsg, future.cause())); - } catch (Exception e) { - logger.error("Uncaught exception in RPC response callback handler!", e); - } - } + channel.writeAndFlush(new StreamRequest(streamId)).addListener(future -> { + if (future.isSuccess()) { + long timeTaken = System.currentTimeMillis() - startTime; + if (logger.isTraceEnabled()) { + logger.trace("Sending request for {} to {} took {} ms", streamId, + getRemoteAddress(channel), timeTaken); } - }); + } else { + String errorMsg = String.format("Failed to send request for %s to %s: %s", streamId, + getRemoteAddress(channel), future.cause()); + logger.error(errorMsg, future.cause()); + channel.close(); + try { + callback.onFailure(streamId, new IOException(errorMsg, future.cause())); + } catch (Exception e) { + logger.error("Uncaught exception in RPC response callback handler!", e); + } + } + }); } } @@ -220,19 +210,17 @@ public class TransportClient implements Closeable { * @param callback Callback to handle the RPC's reply. * @return The RPC's id. 
*/ - public long sendRpc(ByteBuffer message, final RpcResponseCallback callback) { - final long startTime = System.currentTimeMillis(); + public long sendRpc(ByteBuffer message, RpcResponseCallback callback) { + long startTime = System.currentTimeMillis(); if (logger.isTraceEnabled()) { logger.trace("Sending RPC to {}", getRemoteAddress(channel)); } - final long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits()); + long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits()); handler.addRpcRequest(requestId, callback); - channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message))).addListener( - new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { + channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message))) + .addListener(future -> { if (future.isSuccess()) { long timeTaken = System.currentTimeMillis() - startTime; if (logger.isTraceEnabled()) { @@ -251,8 +239,7 @@ public class TransportClient implements Closeable { logger.error("Uncaught exception in RPC response callback handler!", e); } } - } - }); + }); return requestId; } http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java ---------------------------------------------------------------------- diff --git a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java index 980525d..799f454 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java +++ b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java @@ -20,12 +20,7 @@ package org.apache.spark.network.crypto; import java.io.IOException; import java.nio.ByteBuffer; import java.security.GeneralSecurityException; -import java.security.Key; -import javax.crypto.KeyGenerator; -import javax.crypto.Mac; -import static java.nio.charset.StandardCharsets.UTF_8; -import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -37,7 +32,6 @@ import org.apache.spark.network.client.TransportClient; import org.apache.spark.network.client.TransportClientBootstrap; import org.apache.spark.network.sasl.SaslClientBootstrap; import org.apache.spark.network.sasl.SecretKeyHolder; -import org.apache.spark.network.util.JavaUtils; import org.apache.spark.network.util.TransportConf; /** @@ -103,20 +97,18 @@ public class AuthClientBootstrap implements TransportClientBootstrap { private void doSparkAuth(TransportClient client, Channel channel) throws GeneralSecurityException, IOException { - AuthEngine engine = new AuthEngine(authUser, secretKeyHolder.getSecretKey(authUser), conf); - try { + String secretKey = secretKeyHolder.getSecretKey(authUser); + try (AuthEngine engine = new AuthEngine(authUser, secretKey, conf)) { ClientChallenge challenge = engine.challenge(); ByteBuf challengeData = Unpooled.buffer(challenge.encodedLength()); challenge.encode(challengeData); - ByteBuffer responseData = client.sendRpcSync(challengeData.nioBuffer(), - conf.authRTTimeoutMs()); + ByteBuffer responseData = + client.sendRpcSync(challengeData.nioBuffer(), conf.authRTTimeoutMs()); ServerResponse response = ServerResponse.decodeMessage(responseData); engine.validate(response); 
engine.sessionCipher().addToChannel(channel); - } finally { - engine.close(); } } http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java ---------------------------------------------------------------------- diff --git a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java index 991d8ba..0a5c029 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java @@ -17,9 +17,7 @@ package org.apache.spark.network.crypto; -import java.io.IOException; import java.nio.ByteBuffer; -import javax.security.sasl.Sasl; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Throwables; @@ -35,7 +33,6 @@ import org.apache.spark.network.sasl.SecretKeyHolder; import org.apache.spark.network.sasl.SaslRpcHandler; import org.apache.spark.network.server.RpcHandler; import org.apache.spark.network.server.StreamManager; -import org.apache.spark.network.util.JavaUtils; import org.apache.spark.network.util.TransportConf; /** http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java ---------------------------------------------------------------------- diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java index 900e8eb..8193bc1 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java @@ -22,8 +22,6 @@ import java.nio.ByteBuffer; import com.google.common.base.Throwables; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -189,21 +187,16 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> { * Responds to a single message with some Encodable object. If a failure occurs while sending, * it will be logged and the channel closed. 
*/ - private void respond(final Encodable result) { - final SocketAddress remoteAddress = channel.remoteAddress(); - channel.writeAndFlush(result).addListener( - new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess()) { - logger.trace("Sent result {} to client {}", result, remoteAddress); - } else { - logger.error(String.format("Error sending result %s to %s; closing connection", - result, remoteAddress), future.cause()); - channel.close(); - } - } + private void respond(Encodable result) { + SocketAddress remoteAddress = channel.remoteAddress(); + channel.writeAndFlush(result).addListener(future -> { + if (future.isSuccess()) { + logger.trace("Sent result {} to client {}", result, remoteAddress); + } else { + logger.error(String.format("Error sending result %s to %s; closing connection", + result, remoteAddress), future.cause()); + channel.close(); } - ); + }); } } http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthEngineSuite.java ---------------------------------------------------------------------- diff --git a/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthEngineSuite.java b/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthEngineSuite.java index 9a186f2..a3519fe 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthEngineSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthEngineSuite.java @@ -18,10 +18,8 @@ package org.apache.spark.network.crypto; import java.util.Arrays; -import java.util.Map; import static java.nio.charset.StandardCharsets.UTF_8; -import com.google.common.collect.ImmutableMap; import org.junit.BeforeClass; import org.junit.Test; import static org.junit.Assert.*; http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java ---------------------------------------------------------------------- diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java index 6e02430..6daf960 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java @@ -190,12 +190,8 @@ public class ExternalShuffleBlockHandler extends RpcHandler { allMetrics.put("openBlockRequestLatencyMillis", openBlockRequestLatencyMillis); allMetrics.put("registerExecutorRequestLatencyMillis", registerExecutorRequestLatencyMillis); allMetrics.put("blockTransferRateBytes", blockTransferRateBytes); - allMetrics.put("registeredExecutorsSize", new Gauge<Integer>() { - @Override - public Integer getValue() { - return blockManager.getRegisteredExecutorsSize(); - } - }); + allMetrics.put("registeredExecutorsSize", + (Gauge<Integer>) () -> blockManager.getRegisteredExecutorsSize()); } @Override http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java ---------------------------------------------------------------------- diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java index 25e9abd..62d58ab 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java @@ -205,12 +205,7 @@ public class ExternalShuffleBlockResolver { logger.info("Cleaning up executor {}'s {} local dirs", fullId, executor.localDirs.length); // Execute the actual deletion in a different thread, as it may take some time. - directoryCleaner.execute(new Runnable() { - @Override - public void run() { - deleteExecutorDirs(executor.localDirs); - } - }); + directoryCleaner.execute(() -> deleteExecutorDirs(executor.localDirs)); } } } http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java ---------------------------------------------------------------------- diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java index 8c0c400..2c5827b 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java @@ -82,23 +82,19 @@ public class ExternalShuffleClient extends ShuffleClient { @Override public void fetchBlocks( - final String host, - final int port, - final String execId, + String host, + int port, + String execId, String[] blockIds, BlockFetchingListener listener) { checkInit(); logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId); try { RetryingBlockFetcher.BlockFetchStarter blockFetchStarter = - new RetryingBlockFetcher.BlockFetchStarter() { - @Override - public void createAndStart(String[] blockIds, BlockFetchingListener listener) - throws IOException, InterruptedException { + (blockIds1, listener1) -> { TransportClient client = clientFactory.createClient(host, port); - new OneForOneBlockFetcher(client, appId, execId, blockIds, listener).start(); - } - }; + new OneForOneBlockFetcher(client, appId, execId, blockIds1, listener1).start(); + }; int maxRetries = conf.maxIORetries(); if (maxRetries > 0) { @@ -131,12 +127,9 @@ public class ExternalShuffleClient extends ShuffleClient { String execId, ExecutorShuffleInfo executorInfo) throws IOException, InterruptedException { checkInit(); - TransportClient client = clientFactory.createUnmanagedClient(host, port); - try { + try (TransportClient client = clientFactory.createUnmanagedClient(host, port)) { ByteBuffer registerMessage = new RegisterExecutor(appId, execId, executorInfo).toByteBuffer(); client.sendRpcSync(registerMessage, 5000 /* timeoutMs */); - } finally { - client.close(); } } http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java ---------------------------------------------------------------------- diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java index 5be8550..f309dda 100644 --- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java @@ -164,12 +164,9 @@ public class RetryingBlockFetcher { logger.info("Retrying fetch ({}/{}) for {} outstanding blocks after {} ms", retryCount, maxRetries, outstandingBlocksIds.size(), retryWaitTime); - executorService.submit(new Runnable() { - @Override - public void run() { - Uninterruptibles.sleepUninterruptibly(retryWaitTime, TimeUnit.MILLISECONDS); - fetchAllOutstanding(); - } + executorService.submit(() -> { + Uninterruptibles.sleepUninterruptibly(retryWaitTime, TimeUnit.MILLISECONDS); + fetchAllOutstanding(); }); } http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/common/sketch/pom.xml ---------------------------------------------------------------------- diff --git a/common/sketch/pom.xml b/common/sketch/pom.xml index bcd26d4..1356c47 100644 --- a/common/sketch/pom.xml +++ b/common/sketch/pom.xml @@ -61,6 +61,7 @@ <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> + <version>3.2.2</version> <configuration> <javacArgs combine.children="append"> <!-- This option is needed to suppress warnings from sun.misc.Unsafe usage --> @@ -71,6 +72,7 @@ <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> + <version>3.6.1</version> <configuration> <compilerArgs combine.children="append"> <!-- This option is needed to suppress warnings from sun.misc.Unsafe usage --> http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/common/unsafe/pom.xml ---------------------------------------------------------------------- diff --git a/common/unsafe/pom.xml b/common/unsafe/pom.xml index dc19f4a..f03a4da 100644 --- a/common/unsafe/pom.xml +++ b/common/unsafe/pom.xml @@ -98,6 +98,7 @@ <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> + <version>3.2.2</version> <configuration> <javacArgs combine.children="append"> <!-- This option is needed to suppress warnings from sun.misc.Unsafe usage --> @@ -108,6 +109,7 @@ <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> + <version>3.6.1</version> <configuration> <compilerArgs combine.children="append"> <!-- This option is needed to suppress warnings from sun.misc.Unsafe usage --> http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java ---------------------------------------------------------------------- diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java index 671b8c7..f13c24a 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java @@ -162,14 +162,9 @@ public final class Platform { constructor.setAccessible(true); Field cleanerField = cls.getDeclaredField("cleaner"); cleanerField.setAccessible(true); - final long memory = allocateMemory(size); + long memory = allocateMemory(size); ByteBuffer buffer = (ByteBuffer) constructor.newInstance(memory, size); - Cleaner cleaner = Cleaner.create(buffer, new Runnable() { - @Override - public void run() { - freeMemory(memory); - } - }); + Cleaner cleaner = Cleaner.create(buffer, () -> freeMemory(memory)); cleanerField.set(buffer, cleaner); return buffer; } catch (Exception e) { 
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java ---------------------------------------------------------------------- diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java index fd6e95c..621f2c6 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java @@ -178,48 +178,52 @@ public final class CalendarInterval implements Serializable { "Interval string does not match day-time format of 'd h:m:s.n': " + s); } else { try { - if (unit.equals("year")) { - int year = (int) toLongWithRange("year", m.group(1), - Integer.MIN_VALUE / 12, Integer.MAX_VALUE / 12); - result = new CalendarInterval(year * 12, 0L); - - } else if (unit.equals("month")) { - int month = (int) toLongWithRange("month", m.group(1), - Integer.MIN_VALUE, Integer.MAX_VALUE); - result = new CalendarInterval(month, 0L); - - } else if (unit.equals("week")) { - long week = toLongWithRange("week", m.group(1), - Long.MIN_VALUE / MICROS_PER_WEEK, Long.MAX_VALUE / MICROS_PER_WEEK); - result = new CalendarInterval(0, week * MICROS_PER_WEEK); - - } else if (unit.equals("day")) { - long day = toLongWithRange("day", m.group(1), - Long.MIN_VALUE / MICROS_PER_DAY, Long.MAX_VALUE / MICROS_PER_DAY); - result = new CalendarInterval(0, day * MICROS_PER_DAY); - - } else if (unit.equals("hour")) { - long hour = toLongWithRange("hour", m.group(1), - Long.MIN_VALUE / MICROS_PER_HOUR, Long.MAX_VALUE / MICROS_PER_HOUR); - result = new CalendarInterval(0, hour * MICROS_PER_HOUR); - - } else if (unit.equals("minute")) { - long minute = toLongWithRange("minute", m.group(1), - Long.MIN_VALUE / MICROS_PER_MINUTE, Long.MAX_VALUE / MICROS_PER_MINUTE); - result = new CalendarInterval(0, minute * MICROS_PER_MINUTE); - - } else if (unit.equals("second")) { - long micros = parseSecondNano(m.group(1)); - result = new CalendarInterval(0, micros); - - } else if (unit.equals("millisecond")) { - long millisecond = toLongWithRange("millisecond", m.group(1), - Long.MIN_VALUE / MICROS_PER_MILLI, Long.MAX_VALUE / MICROS_PER_MILLI); - result = new CalendarInterval(0, millisecond * MICROS_PER_MILLI); - - } else if (unit.equals("microsecond")) { - long micros = Long.parseLong(m.group(1)); - result = new CalendarInterval(0, micros); + switch (unit) { + case "year": + int year = (int) toLongWithRange("year", m.group(1), + Integer.MIN_VALUE / 12, Integer.MAX_VALUE / 12); + result = new CalendarInterval(year * 12, 0L); + break; + case "month": + int month = (int) toLongWithRange("month", m.group(1), + Integer.MIN_VALUE, Integer.MAX_VALUE); + result = new CalendarInterval(month, 0L); + break; + case "week": + long week = toLongWithRange("week", m.group(1), + Long.MIN_VALUE / MICROS_PER_WEEK, Long.MAX_VALUE / MICROS_PER_WEEK); + result = new CalendarInterval(0, week * MICROS_PER_WEEK); + break; + case "day": + long day = toLongWithRange("day", m.group(1), + Long.MIN_VALUE / MICROS_PER_DAY, Long.MAX_VALUE / MICROS_PER_DAY); + result = new CalendarInterval(0, day * MICROS_PER_DAY); + break; + case "hour": + long hour = toLongWithRange("hour", m.group(1), + Long.MIN_VALUE / MICROS_PER_HOUR, Long.MAX_VALUE / MICROS_PER_HOUR); + result = new CalendarInterval(0, hour * MICROS_PER_HOUR); + break; + case "minute": + long minute = toLongWithRange("minute", 
m.group(1), + Long.MIN_VALUE / MICROS_PER_MINUTE, Long.MAX_VALUE / MICROS_PER_MINUTE); + result = new CalendarInterval(0, minute * MICROS_PER_MINUTE); + break; + case "second": { + long micros = parseSecondNano(m.group(1)); + result = new CalendarInterval(0, micros); + break; + } + case "millisecond": + long millisecond = toLongWithRange("millisecond", m.group(1), + Long.MIN_VALUE / MICROS_PER_MILLI, Long.MAX_VALUE / MICROS_PER_MILLI); + result = new CalendarInterval(0, millisecond * MICROS_PER_MILLI); + break; + case "microsecond": { + long micros = Long.parseLong(m.group(1)); + result = new CalendarInterval(0, micros); + break; + } } } catch (Exception e) { throw new IllegalArgumentException("Error parsing interval string: " + e.getMessage(), e); http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/Optional.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/api/java/Optional.java b/core/src/main/java/org/apache/spark/api/java/Optional.java index ca7babc..fd0f495 100644 --- a/core/src/main/java/org/apache/spark/api/java/Optional.java +++ b/core/src/main/java/org/apache/spark/api/java/Optional.java @@ -18,6 +18,7 @@ package org.apache.spark.api.java; import java.io.Serializable; +import java.util.Objects; import com.google.common.base.Preconditions; @@ -52,8 +53,8 @@ import com.google.common.base.Preconditions; * <li>{@link #isPresent()}</li> * </ul> * - * <p>{@code java.util.Optional} itself is not used at this time because the - * project does not require Java 8. Using {@code com.google.common.base.Optional} + * <p>{@code java.util.Optional} itself was not used because at the time, the + * project did not require Java 8. Using {@code com.google.common.base.Optional} * has in the past caused serious library version conflicts with Guava that can't * be resolved by shading. Hence this work-alike clone.</p> * @@ -171,7 +172,7 @@ public final class Optional<T> implements Serializable { return false; } Optional<?> other = (Optional<?>) obj; - return value == null ? other.value == null : value.equals(other.value); + return Objects.equals(value, other.value); } @Override http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java b/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java index 07aebb7..33bedf7 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java @@ -24,6 +24,7 @@ import java.util.Iterator; * A function that returns zero or more output records from each grouping key and its values from 2 * Datasets. 
*/ +@FunctionalInterface public interface CoGroupFunction<K, V1, V2, R> extends Serializable { Iterator<R> call(K key, Iterator<V1> left, Iterator<V2> right) throws Exception; } http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java index 576087b..2f23da5 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java @@ -23,6 +23,7 @@ import java.util.Iterator; /** * A function that returns zero or more records of type Double from each input record. */ +@FunctionalInterface public interface DoubleFlatMapFunction<T> extends Serializable { Iterator<Double> call(T t) throws Exception; } http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java b/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java index bf16f79..3c0291c 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java @@ -22,6 +22,7 @@ import java.io.Serializable; /** * A function that returns Doubles, and can be used to construct DoubleRDDs. */ +@FunctionalInterface public interface DoubleFunction<T> extends Serializable { double call(T t) throws Exception; } http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java b/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java index 462ca3f..a6f69f7 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java @@ -24,6 +24,7 @@ import java.io.Serializable; * * If the function returns true, the element is included in the returned Dataset. */ +@FunctionalInterface public interface FilterFunction<T> extends Serializable { boolean call(T value) throws Exception; } http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java index 2d8ea6d..91d6129 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java @@ -23,6 +23,7 @@ import java.util.Iterator; /** * A function that returns zero or more output records from each input record. 
*/ +@FunctionalInterface public interface FlatMapFunction<T, R> extends Serializable { Iterator<R> call(T t) throws Exception; } http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java index fc97b63..f9f2580 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java +++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java @@ -23,6 +23,7 @@ import java.util.Iterator; /** * A function that takes two inputs and returns zero or more output records. */ +@FunctionalInterface public interface FlatMapFunction2<T1, T2, R> extends Serializable { Iterator<R> call(T1 t1, T2 t2) throws Exception; } http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java index bae574a..6423c5d 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java @@ -23,6 +23,7 @@ import java.util.Iterator; /** * A function that returns zero or more output records from each grouping key and its values. */ +@FunctionalInterface public interface FlatMapGroupsFunction<K, V, R> extends Serializable { Iterator<R> call(K key, Iterator<V> values) throws Exception; } http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/ForeachFunction.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/api/java/function/ForeachFunction.java b/core/src/main/java/org/apache/spark/api/java/function/ForeachFunction.java index 07e54b2..2e6e908 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/ForeachFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/ForeachFunction.java @@ -24,6 +24,7 @@ import java.io.Serializable; * * Spark will invoke the call function on each element in the input Dataset. */ +@FunctionalInterface public interface ForeachFunction<T> extends Serializable { void call(T t) throws Exception; } http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/ForeachPartitionFunction.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/api/java/function/ForeachPartitionFunction.java b/core/src/main/java/org/apache/spark/api/java/function/ForeachPartitionFunction.java index 4938a51..d8f55d0 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/ForeachPartitionFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/ForeachPartitionFunction.java @@ -23,6 +23,7 @@ import java.util.Iterator; /** * Base interface for a function used in Dataset's foreachPartition function. 
*/ +@FunctionalInterface public interface ForeachPartitionFunction<T> extends Serializable { void call(Iterator<T> t) throws Exception; } http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/Function.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function.java b/core/src/main/java/org/apache/spark/api/java/function/Function.java index b9d9777..8b2bbd5 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/Function.java +++ b/core/src/main/java/org/apache/spark/api/java/function/Function.java @@ -24,6 +24,7 @@ import java.io.Serializable; * DoubleFunction are handled separately, to allow PairRDDs and DoubleRDDs to be constructed * when mapping RDDs of other types. */ +@FunctionalInterface public interface Function<T1, R> extends Serializable { R call(T1 v1) throws Exception; } http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/Function0.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function0.java b/core/src/main/java/org/apache/spark/api/java/function/Function0.java index c86928d..5c649d9 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/Function0.java +++ b/core/src/main/java/org/apache/spark/api/java/function/Function0.java @@ -22,6 +22,7 @@ import java.io.Serializable; /** * A zero-argument function that returns an R. */ +@FunctionalInterface public interface Function0<R> extends Serializable { R call() throws Exception; } http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/Function2.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function2.java b/core/src/main/java/org/apache/spark/api/java/function/Function2.java index a975ce3..a7d9647 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/Function2.java +++ b/core/src/main/java/org/apache/spark/api/java/function/Function2.java @@ -22,6 +22,7 @@ import java.io.Serializable; /** * A two-argument function that takes arguments of type T1 and T2 and returns an R. */ +@FunctionalInterface public interface Function2<T1, T2, R> extends Serializable { R call(T1 v1, T2 v2) throws Exception; } http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/Function3.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function3.java b/core/src/main/java/org/apache/spark/api/java/function/Function3.java index 6eecfb6..77acd21 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/Function3.java +++ b/core/src/main/java/org/apache/spark/api/java/function/Function3.java @@ -22,6 +22,7 @@ import java.io.Serializable; /** * A three-argument function that takes arguments of type T1, T2 and T3 and returns an R. 
*/ +@FunctionalInterface public interface Function3<T1, T2, T3, R> extends Serializable { R call(T1 v1, T2 v2, T3 v3) throws Exception; } http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/Function4.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function4.java b/core/src/main/java/org/apache/spark/api/java/function/Function4.java index 9c35a22..d530ba4 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/Function4.java +++ b/core/src/main/java/org/apache/spark/api/java/function/Function4.java @@ -22,6 +22,7 @@ import java.io.Serializable; /** * A four-argument function that takes arguments of type T1, T2, T3 and T4 and returns an R. */ +@FunctionalInterface public interface Function4<T1, T2, T3, T4, R> extends Serializable { R call(T1 v1, T2 v2, T3 v3, T4 v4) throws Exception; } http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/MapFunction.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/api/java/function/MapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/MapFunction.java index 3ae6ef4..5efff94 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/MapFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/MapFunction.java @@ -22,6 +22,7 @@ import java.io.Serializable; /** * Base interface for a map function used in Dataset's map function. */ +@FunctionalInterface public interface MapFunction<T, U> extends Serializable { U call(T value) throws Exception; } http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/MapGroupsFunction.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/api/java/function/MapGroupsFunction.java b/core/src/main/java/org/apache/spark/api/java/function/MapGroupsFunction.java index faa59ea..2c3d43a 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/MapGroupsFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/MapGroupsFunction.java @@ -23,6 +23,7 @@ import java.util.Iterator; /** * Base interface for a map function used in GroupedDataset's mapGroup function. */ +@FunctionalInterface public interface MapGroupsFunction<K, V, R> extends Serializable { R call(K key, Iterator<V> values) throws Exception; } http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java b/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java index cf9945a..68e8557 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java @@ -23,6 +23,7 @@ import java.util.Iterator; /** * Base interface for function used in Dataset's mapPartitions. 
*/ +@FunctionalInterface public interface MapPartitionsFunction<T, U> extends Serializable { Iterator<U> call(Iterator<T> input) throws Exception; } http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java index 51eed2e..97bd2b3 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java @@ -26,6 +26,7 @@ import scala.Tuple2; * A function that returns zero or more key-value pair records from each input record. The * key-value pairs are represented as scala.Tuple2 objects. */ +@FunctionalInterface public interface PairFlatMapFunction<T, K, V> extends Serializable { Iterator<Tuple2<K, V>> call(T t) throws Exception; } http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java b/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java index 2fdfa71..34a7e44 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java @@ -25,6 +25,7 @@ import scala.Tuple2; * A function that returns key-value pairs (Tuple2<K, V>), and can be used to * construct PairRDDs. */ +@FunctionalInterface public interface PairFunction<T, K, V> extends Serializable { Tuple2<K, V> call(T t) throws Exception; } http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/ReduceFunction.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/api/java/function/ReduceFunction.java b/core/src/main/java/org/apache/spark/api/java/function/ReduceFunction.java index ee092d0..d9029d8 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/ReduceFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/ReduceFunction.java @@ -22,6 +22,7 @@ import java.io.Serializable; /** * Base interface for function used in Dataset's reduce. */ +@FunctionalInterface public interface ReduceFunction<T> extends Serializable { T call(T v1, T v2) throws Exception; } http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java index f30d42e..aff2bc6 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java @@ -22,6 +22,7 @@ import java.io.Serializable; /** * A function with no return value. 
 */
+@FunctionalInterface
 public interface VoidFunction<T> extends Serializable {
   void call(T t) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java
index da9ae1c..ddb6162 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
 /**
  * A two-argument function that takes arguments of type T1 and T2 with no return value.
  */
+@FunctionalInterface
 public interface VoidFunction2<T1, T2> extends Serializable {
   void call(T1 v1, T2 v2) throws Exception;
 }
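The @FunctionalInterface annotations added above change no behavior: lambdas already bind to any single-abstract-method interface. The annotation documents the intent and makes the compiler reject a later, accidental second abstract method. A small sketch of what this enables for callers, using two of the interfaces touched above (assumes spark-core on the classpath; call() declares throws Exception, hence main does too):

    import org.apache.spark.api.java.function.FilterFunction;
    import org.apache.spark.api.java.function.MapFunction;

    public class FunctionalInterfaceSketch {
      public static void main(String[] args) throws Exception {
        // A lambda and a method reference standing in for the anonymous
        // classes that Java 7 callers had to write.
        FilterFunction<String> nonEmpty = value -> !value.isEmpty();
        MapFunction<String, Integer> length = String::length;

        System.out.println(nonEmpty.call("spark"));  // true
        System.out.println(length.call("spark"));    // 5
      }
    }
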
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index dcae4a3..189d607 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -162,14 +162,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
     // Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at
     // the end of the task. This is necessary to avoid memory leaks in when the downstream operator
     // does not fully consume the sorter's output (e.g. sort followed by limit).
-    taskContext.addTaskCompletionListener(
-      new TaskCompletionListener() {
-        @Override
-        public void onTaskCompletion(TaskContext context) {
-          cleanupResources();
-        }
-      }
-    );
+    taskContext.addTaskCompletionListener(context -> { cleanupResources(); });
   }

   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java
index 01aed95..cf4dfde 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java
@@ -27,22 +27,18 @@ final class UnsafeSorterSpillMerger {
   private final PriorityQueue<UnsafeSorterIterator> priorityQueue;

   UnsafeSorterSpillMerger(
-      final RecordComparator recordComparator,
-      final PrefixComparator prefixComparator,
-      final int numSpills) {
-    final Comparator<UnsafeSorterIterator> comparator = new Comparator<UnsafeSorterIterator>() {
-
-      @Override
-      public int compare(UnsafeSorterIterator left, UnsafeSorterIterator right) {
-        final int prefixComparisonResult =
-          prefixComparator.compare(left.getKeyPrefix(), right.getKeyPrefix());
-        if (prefixComparisonResult == 0) {
-          return recordComparator.compare(
-            left.getBaseObject(), left.getBaseOffset(),
-            right.getBaseObject(), right.getBaseOffset());
-        } else {
-          return prefixComparisonResult;
-        }
+      RecordComparator recordComparator,
+      PrefixComparator prefixComparator,
+      int numSpills) {
+    Comparator<UnsafeSorterIterator> comparator = (left, right) -> {
+      int prefixComparisonResult =
+        prefixComparator.compare(left.getKeyPrefix(), right.getKeyPrefix());
+      if (prefixComparisonResult == 0) {
+        return recordComparator.compare(
+          left.getBaseObject(), left.getBaseOffset(),
+          right.getBaseObject(), right.getBaseOffset());
+      } else {
+        return prefixComparisonResult;
       }
     };
     priorityQueue = new PriorityQueue<>(numSpills, comparator);

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index cbab7b8..7e56406 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -350,9 +350,6 @@ class SparkContext(config: SparkConf) extends Logging {
   private def warnDeprecatedVersions(): Unit = {
     val javaVersion = System.getProperty("java.version").split("[+.\\-]+", 3)
-    if (javaVersion.length >= 2 && javaVersion(1).toInt == 7) {
-      logWarning("Support for Java 7 is deprecated as of Spark 2.0.0")
-    }
     if (scala.util.Properties.releaseVersion.exists(_.startsWith("2.10"))) {
       logWarning("Support for Scala 2.10 is deprecated as of Spark 2.1.0")
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala b/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala
index 31b9c5e..3fd812e 100644
--- a/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala +++ b/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala @@ -39,7 +39,6 @@ private[spark] class WorkerCommandBuilder(sparkHome: String, memoryMb: Int, comm val cmd = buildJavaCommand(command.classPathEntries.mkString(File.pathSeparator)) cmd.add(s"-Xmx${memoryMb}M") command.javaOpts.foreach(cmd.add) - CommandBuilderUtils.addPermGenSizeOpt(cmd) addOptionString(cmd, getenv("SPARK_JAVA_OPTS")) cmd } http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/main/scala/org/apache/spark/util/Utils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index fe6fe6a..1e6e9a2 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1882,20 +1882,17 @@ private[spark] object Utils extends Logging { def terminateProcess(process: Process, timeoutMs: Long): Option[Int] = { // Politely destroy first process.destroy() - - if (waitForProcess(process, timeoutMs)) { + if (process.waitFor(timeoutMs, TimeUnit.MILLISECONDS)) { // Successful exit Option(process.exitValue()) } else { - // Java 8 added a new API which will more forcibly kill the process. Use that if available. try { - classOf[Process].getMethod("destroyForcibly").invoke(process) + process.destroyForcibly() } catch { - case _: NoSuchMethodException => return None // Not available; give up case NonFatal(e) => logWarning("Exception when attempting to kill process", e) } // Wait, again, although this really should return almost immediately - if (waitForProcess(process, timeoutMs)) { + if (process.waitFor(timeoutMs, TimeUnit.MILLISECONDS)) { Option(process.exitValue()) } else { logWarning("Timed out waiting to forcibly kill process") @@ -1905,44 +1902,11 @@ private[spark] object Utils extends Logging { } /** - * Wait for a process to terminate for at most the specified duration. - * - * @return whether the process actually terminated before the given timeout. - */ - def waitForProcess(process: Process, timeoutMs: Long): Boolean = { - try { - // Use Java 8 method if available - classOf[Process].getMethod("waitFor", java.lang.Long.TYPE, classOf[TimeUnit]) - .invoke(process, timeoutMs.asInstanceOf[java.lang.Long], TimeUnit.MILLISECONDS) - .asInstanceOf[Boolean] - } catch { - case _: NoSuchMethodException => - // Otherwise implement it manually - var terminated = false - val startTime = System.currentTimeMillis - while (!terminated) { - try { - process.exitValue() - terminated = true - } catch { - case e: IllegalThreadStateException => - // Process not terminated yet - if (System.currentTimeMillis - startTime > timeoutMs) { - return false - } - Thread.sleep(100) - } - } - true - } - } - - /** * Return the stderr of a process after waiting for the process to terminate. * If the process does not terminate within the specified timeout, return None. 
 */
   def getStderr(process: Process, timeoutMs: Long): Option[String] = {
-    val terminated = Utils.waitForProcess(process, timeoutMs)
+    val terminated = process.waitFor(timeoutMs, TimeUnit.MILLISECONDS)
     if (terminated) {
       Some(Source.fromInputStream(process.getErrorStream).getLines().mkString("\n"))
     } else {
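The Utils.scala change above replaces reflection with the Process methods introduced in Java 8, waitFor(long, TimeUnit) and destroyForcibly(). A minimal standalone Java sketch of the same graceful-then-forcible termination sequence (the sleep command is only a placeholder child process):

    import java.util.concurrent.TimeUnit;

    public class TerminateProcessSketch {
      public static void main(String[] args) throws Exception {
        Process process = new ProcessBuilder("sleep", "60").start();

        process.destroy();  // politely ask first
        if (process.waitFor(2, TimeUnit.SECONDS)) {
          System.out.println("exited: " + process.exitValue());
          return;
        }

        // Java 8 API that Spark previously had to reach via reflection.
        process.destroyForcibly();
        if (process.waitFor(2, TimeUnit.SECONDS)) {
          System.out.println("forcibly killed: " + process.exitValue());
        } else {
          System.out.println("timed out waiting to forcibly kill process");
        }
      }
    }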