This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 30c9c8dd9fe [SPARK-45919][CORE][SQL] Use Java 16 `record` to simplify Java class definition 30c9c8dd9fe is described below commit 30c9c8dd9fe03eaa85ecf192c977e7645987c653 Author: yangjie01 <yangji...@baidu.com> AuthorDate: Wed Nov 15 23:59:17 2023 -0800 [SPARK-45919][CORE][SQL] Use Java 16 `record` to simplify Java class definition ### What changes were proposed in this pull request? This pr uses the `record` keyword introduced by [JEP 395](https://openjdk.org/jeps/395) to simplify Java class definition. ### Why are the changes needed? Using the new feature introduced in Java 16 to simplify Java class definition. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass GitHub Actions ### Was this patch authored or co-authored using generative AI tooling? No Closes #43796 from LuciferYang/class-2-record. Lead-authored-by: yangjie01 <yangji...@baidu.com> Co-authored-by: YangJie <yangji...@baidu.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../network/client/TransportResponseHandler.java | 6 +++--- .../org/apache/spark/network/crypto/AuthEngine.java | 14 +++++++------- .../org/apache/spark/network/crypto/AuthMessage.java | 12 +----------- .../apache/spark/network/crypto/AuthRpcHandler.java | 10 +++++----- .../apache/spark/network/protocol/StreamChunkId.java | 9 +-------- .../network/server/ChunkFetchRequestHandler.java | 8 ++++---- .../apache/spark/network/RpcIntegrationSuite.java | 13 ++----------- .../apache/spark/network/crypto/AuthEngineSuite.java | 12 ++++++------ .../spark/network/crypto/AuthMessagesSuite.java | 6 +++--- .../shuffle/ExternalShuffleBlockResolver.java | 4 ++-- .../network/shuffle/RemoteBlockPushResolver.java | 2 +- .../spark/network/shuffle/ShuffleIndexRecord.java | 18 +----------------- .../network/shuffle/ShuffleTransportContext.java | 10 +--------- .../shuffle/ShuffleIndexInformationSuite.java | 8 ++++---- .../shuffle/ShuffleTransportContextSuite.java | 2 +- .../network/yarn/YarnShuffleServiceMetrics.java | 20 +------------------- .../connector/expressions/aggregate/Aggregation.java | 15 +++------------ .../datasources/parquet/ParquetReadState.java | 9 +-------- 18 files changed, 47 insertions(+), 131 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java index a19767ae201..cf9af2e00c8 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java @@ -108,7 +108,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> { private void failOutstandingRequests(Throwable cause) { for (Map.Entry<StreamChunkId, ChunkReceivedCallback> entry : outstandingFetches.entrySet()) { try { - entry.getValue().onFailure(entry.getKey().chunkIndex, cause); + entry.getValue().onFailure(entry.getKey().chunkIndex(), cause); } catch (Exception e) { logger.warn("ChunkReceivedCallback.onFailure throws exception", e); } @@ -169,7 +169,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> { resp.body().release(); } else { outstandingFetches.remove(resp.streamChunkId); - listener.onSuccess(resp.streamChunkId.chunkIndex, resp.body()); + listener.onSuccess(resp.streamChunkId.chunkIndex(), resp.body()); resp.body().release(); } } else if (message instanceof ChunkFetchFailure) { @@ -180,7 +180,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> { resp.streamChunkId, getRemoteAddress(channel), resp.errorString); } else { outstandingFetches.remove(resp.streamChunkId); - listener.onFailure(resp.streamChunkId.chunkIndex, new ChunkFetchFailureException( + listener.onFailure(resp.streamChunkId.chunkIndex(), new ChunkFetchFailureException( "Failure while fetching " + resp.streamChunkId + ": " + resp.errorString)); } } else if (message instanceof RpcResponse) { diff --git a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthEngine.java b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthEngine.java index 078d9ceb317..7ca4bc40a86 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthEngine.java +++ b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthEngine.java @@ -118,20 +118,20 @@ class AuthEngine implements Closeable { private byte[] decryptEphemeralPublicKey( AuthMessage encryptedPublicKey, byte[] transcript) throws GeneralSecurityException { - Preconditions.checkArgument(appId.equals(encryptedPublicKey.appId)); + Preconditions.checkArgument(appId.equals(encryptedPublicKey.appId())); // Mix in the app ID, salt, and transcript into HKDF and use it as AES-GCM AAD - byte[] aadState = Bytes.concat(appId.getBytes(UTF_8), encryptedPublicKey.salt, transcript); + byte[] aadState = Bytes.concat(appId.getBytes(UTF_8), encryptedPublicKey.salt(), transcript); // Use HKDF to derive an AES_GCM key from the pre-shared key, non-secret salt, and AAD state byte[] derivedKeyEncryptingKey = Hkdf.computeHkdf( MAC_ALGORITHM, preSharedSecret, - encryptedPublicKey.salt, + encryptedPublicKey.salt(), aadState, AES_GCM_KEY_SIZE_BYTES); // If the AES-GCM payload is modified at all or if the AAD state does not match, decryption // will throw a GeneralSecurityException. return new AesGcmJce(derivedKeyEncryptingKey) - .decrypt(encryptedPublicKey.ciphertext, aadState); + .decrypt(encryptedPublicKey.ciphertext(), aadState); } /** @@ -154,7 +154,7 @@ class AuthEngine implements Closeable { * @return An encrypted server ephemeral public key to be sent to the client. */ AuthMessage response(AuthMessage encryptedClientPublicKey) throws GeneralSecurityException { - Preconditions.checkArgument(appId.equals(encryptedClientPublicKey.appId)); + Preconditions.checkArgument(appId.equals(encryptedClientPublicKey.appId())); // Compute a shared secret given the client public key and the server private key byte[] clientPublicKey = decryptEphemeralPublicKey(encryptedClientPublicKey, EMPTY_TRANSCRIPT); @@ -182,8 +182,8 @@ class AuthEngine implements Closeable { */ void deriveSessionCipher(AuthMessage encryptedClientPublicKey, AuthMessage encryptedServerPublicKey) throws GeneralSecurityException { - Preconditions.checkArgument(appId.equals(encryptedClientPublicKey.appId)); - Preconditions.checkArgument(appId.equals(encryptedServerPublicKey.appId)); + Preconditions.checkArgument(appId.equals(encryptedClientPublicKey.appId())); + Preconditions.checkArgument(appId.equals(encryptedServerPublicKey.appId())); // Compute a shared secret given the server public key and the client private key, // mixing in the protocol transcript. byte[] serverPublicKey = decryptEphemeralPublicKey( diff --git a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthMessage.java b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthMessage.java index 76690cbc4c2..7309e05c904 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthMessage.java +++ b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthMessage.java @@ -30,20 +30,10 @@ import org.apache.spark.network.protocol.Encoders; * * Please see crypto/README.md for more details of implementation. */ -class AuthMessage implements Encodable { +record AuthMessage(String appId, byte[] salt, byte[] ciphertext) implements Encodable { /** Serialization tag used to catch incorrect payloads. */ private static final byte TAG_BYTE = (byte) 0xFB; - public final String appId; - public final byte[] salt; - public final byte[] ciphertext; - - AuthMessage(String appId, byte[] salt, byte[] ciphertext) { - this.appId = appId; - this.salt = salt; - this.ciphertext = ciphertext; - } - @Override public int encodedLength() { return 1 + diff --git a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java index 134fd3d842a..9a7ce8b7b31 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java @@ -108,17 +108,17 @@ class AuthRpcHandler extends AbstractAuthRpcHandler { // Here we have the client challenge, so perform the new auth protocol and set up the channel. AuthEngine engine = null; try { - String secret = secretKeyHolder.getSecretKey(challenge.appId); + String secret = secretKeyHolder.getSecretKey(challenge.appId()); Preconditions.checkState(secret != null, - "Trying to authenticate non-registered app %s.", challenge.appId); - LOG.debug("Authenticating challenge for app {}.", challenge.appId); - engine = new AuthEngine(challenge.appId, secret, conf); + "Trying to authenticate non-registered app %s.", challenge.appId()); + LOG.debug("Authenticating challenge for app {}.", challenge.appId()); + engine = new AuthEngine(challenge.appId(), secret, conf); AuthMessage response = engine.response(challenge); ByteBuf responseData = Unpooled.buffer(response.encodedLength()); response.encode(responseData); callback.onSuccess(responseData.nioBuffer()); engine.sessionCipher().addToChannel(channel); - client.setClientId(challenge.appId); + client.setClientId(challenge.appId()); } catch (Exception e) { // This is a fatal error: authentication has failed. Close the channel explicitly. LOG.debug("Authentication failed for client {}, closing channel.", channel.remoteAddress()); diff --git a/common/network-common/src/main/java/org/apache/spark/network/protocol/StreamChunkId.java b/common/network-common/src/main/java/org/apache/spark/network/protocol/StreamChunkId.java index ae795ca4d14..c3b715009df 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/protocol/StreamChunkId.java +++ b/common/network-common/src/main/java/org/apache/spark/network/protocol/StreamChunkId.java @@ -26,14 +26,7 @@ import org.apache.commons.lang3.builder.ToStringStyle; /** * Encapsulates a request for a particular chunk of a stream. */ -public final class StreamChunkId implements Encodable { - public final long streamId; - public final int chunkIndex; - - public StreamChunkId(long streamId, int chunkIndex) { - this.streamId = streamId; - this.chunkIndex = chunkIndex; - } +public record StreamChunkId(long streamId, int chunkIndex) implements Encodable { @Override public int encodedLength() { diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java index 9a71cf593e2..e49141c7b96 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java @@ -99,8 +99,8 @@ public class ChunkFetchRequestHandler extends SimpleChannelInboundHandler<ChunkF } ManagedBuffer buf; try { - streamManager.checkAuthorization(client, msg.streamChunkId.streamId); - buf = streamManager.getChunk(msg.streamChunkId.streamId, msg.streamChunkId.chunkIndex); + streamManager.checkAuthorization(client, msg.streamChunkId.streamId()); + buf = streamManager.getChunk(msg.streamChunkId.streamId(), msg.streamChunkId.chunkIndex()); if (buf == null) { throw new IllegalStateException("Chunk was not found"); } @@ -112,9 +112,9 @@ public class ChunkFetchRequestHandler extends SimpleChannelInboundHandler<ChunkF return; } - streamManager.chunkBeingSent(msg.streamChunkId.streamId); + streamManager.chunkBeingSent(msg.streamChunkId.streamId()); respond(channel, new ChunkFetchSuccess(msg.streamChunkId, buf)).addListener( - (ChannelFutureListener) future -> streamManager.chunkSent(msg.streamChunkId.streamId)); + (ChannelFutureListener) future -> streamManager.chunkSent(msg.streamChunkId.streamId())); } /** diff --git a/common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java b/common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java index a7a61588ef1..55a0cc73f8b 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java @@ -234,17 +234,8 @@ public class RpcIntegrationSuite { return res; } - private static class RpcStreamCallback implements RpcResponseCallback { - final String streamId; - final RpcResult res; - final Semaphore sem; - - RpcStreamCallback(String streamId, RpcResult res, Semaphore sem) { - this.streamId = streamId; - this.res = res; - this.sem = sem; - } - + private record RpcStreamCallback( + String streamId, RpcResult res, Semaphore sem) implements RpcResponseCallback { @Override public void onSuccess(ByteBuffer message) { res.successMessages.add(streamId); diff --git a/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthEngineSuite.java b/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthEngineSuite.java index d297e96aac8..26b65e79ff6 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthEngineSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthEngineSuite.java @@ -84,7 +84,7 @@ public class AuthEngineSuite { AuthEngine server = new AuthEngine("appId", "secret", conf)) { AuthMessage clientChallenge = client.challenge(); AuthMessage corruptChallenge = - new AuthMessage("junk", clientChallenge.salt, clientChallenge.ciphertext); + new AuthMessage("junk", clientChallenge.salt(), clientChallenge.ciphertext()); assertThrows(IllegalArgumentException.class, () -> server.response(corruptChallenge)); } } @@ -95,7 +95,7 @@ public class AuthEngineSuite { try (AuthEngine client = new AuthEngine("appId", "secret", conf); AuthEngine server = new AuthEngine("appId", "secret", conf)) { AuthMessage clientChallenge = client.challenge(); - clientChallenge.salt[0] ^= 1; + clientChallenge.salt()[0] ^= 1; assertThrows(GeneralSecurityException.class, () -> server.response(clientChallenge)); } } @@ -106,7 +106,7 @@ public class AuthEngineSuite { try (AuthEngine client = new AuthEngine("appId", "secret", conf); AuthEngine server = new AuthEngine("appId", "secret", conf)) { AuthMessage clientChallenge = client.challenge(); - clientChallenge.ciphertext[0] ^= 1; + clientChallenge.ciphertext()[0] ^= 1; assertThrows(GeneralSecurityException.class, () -> server.response(clientChallenge)); } } @@ -119,7 +119,7 @@ public class AuthEngineSuite { AuthMessage clientChallenge = client.challenge(); AuthMessage serverResponse = server.response(clientChallenge); AuthMessage corruptResponse = - new AuthMessage("junk", serverResponse.salt, serverResponse.ciphertext); + new AuthMessage("junk", serverResponse.salt(), serverResponse.ciphertext()); assertThrows(IllegalArgumentException.class, () -> client.deriveSessionCipher(clientChallenge, corruptResponse)); } @@ -132,7 +132,7 @@ public class AuthEngineSuite { AuthEngine server = new AuthEngine("appId", "secret", conf)) { AuthMessage clientChallenge = client.challenge(); AuthMessage serverResponse = server.response(clientChallenge); - serverResponse.salt[0] ^= 1; + serverResponse.salt()[0] ^= 1; assertThrows(GeneralSecurityException.class, () -> client.deriveSessionCipher(clientChallenge, serverResponse)); } @@ -145,7 +145,7 @@ public class AuthEngineSuite { AuthEngine server = new AuthEngine("appId", "secret", conf)) { AuthMessage clientChallenge = client.challenge(); AuthMessage serverResponse = server.response(clientChallenge); - serverResponse.ciphertext[0] ^= 1; + serverResponse.ciphertext()[0] ^= 1; assertThrows(GeneralSecurityException.class, () -> client.deriveSessionCipher(clientChallenge, serverResponse)); } diff --git a/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthMessagesSuite.java b/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthMessagesSuite.java index 8a8ec5bc4c6..15064fb19ba 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthMessagesSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthMessagesSuite.java @@ -46,8 +46,8 @@ public class AuthMessagesSuite { msg.encode(buf); AuthMessage decoded = AuthMessage.decodeMessage(buf.nioBuffer()); - assertEquals(msg.appId, decoded.appId); - assertArrayEquals(msg.salt, decoded.salt); - assertArrayEquals(msg.ciphertext, decoded.ciphertext); + assertEquals(msg.appId(), decoded.appId()); + assertArrayEquals(msg.salt(), decoded.salt()); + assertArrayEquals(msg.ciphertext(), decoded.ciphertext()); } } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java index ea84e8eb59d..429e5f03b9e 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java @@ -325,8 +325,8 @@ public class ExternalShuffleBlockResolver { executor.localDirs, executor.subDirsPerLocalDir, "shuffle_" + shuffleId + "_" + mapId + "_0.data")), - shuffleIndexRecord.getOffset(), - shuffleIndexRecord.getLength()); + shuffleIndexRecord.offset(), + shuffleIndexRecord.length()); } catch (ExecutionException e) { throw new RuntimeException("Failed to open file: " + indexFilePath, e); } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index 14fefebe089..2f8b5b99746 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -372,7 +372,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { ShuffleIndexInformation shuffleIndexInformation = indexCache.get(indexFilePath); ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(chunkId); return new FileSegmentManagedBuffer( - conf, dataFile, shuffleIndexRecord.getOffset(), shuffleIndexRecord.getLength()); + conf, dataFile, shuffleIndexRecord.offset(), shuffleIndexRecord.length()); } catch (ExecutionException e) { throw new RuntimeException(String.format( "Failed to open merged shuffle index file %s", indexFilePath), e); diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexRecord.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexRecord.java index 6a4fac150a6..066a1de8316 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexRecord.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexRecord.java @@ -20,21 +20,5 @@ package org.apache.spark.network.shuffle; /** * Contains offset and length of the shuffle block data. */ -public class ShuffleIndexRecord { - private final long offset; - private final long length; - - public ShuffleIndexRecord(long offset, long length) { - this.offset = offset; - this.length = length; - } - - public long getOffset() { - return offset; - } - - public long getLength() { - return length; - } +public record ShuffleIndexRecord(long offset, long length) { } - diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleTransportContext.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleTransportContext.java index feaaa570b73..a0794113a08 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleTransportContext.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleTransportContext.java @@ -154,15 +154,7 @@ public class ShuffleTransportContext extends TransportContext { * accepted by {@link FinalizedHandler} instead, which is configured to execute in a separate * EventLoopGroup. */ - static class RpcRequestInternal { - public final BlockTransferMessage.Type messageType; - public final RpcRequest rpcRequest; - - RpcRequestInternal(BlockTransferMessage.Type messageType, - RpcRequest rpcRequest) { - this.messageType = messageType; - this.rpcRequest = rpcRequest; - } + record RpcRequestInternal(BlockTransferMessage.Type messageType, RpcRequest rpcRequest) { } static class FinalizedHandler extends SimpleChannelInboundHandler<RpcRequestInternal> { diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ShuffleIndexInformationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ShuffleIndexInformationSuite.java index 30835f19753..aff6e88b90a 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ShuffleIndexInformationSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ShuffleIndexInformationSuite.java @@ -59,11 +59,11 @@ public class ShuffleIndexInformationSuite { ShuffleIndexInformation s = new ShuffleIndexInformation(path); // the index file contains 3 offsets: // 0, sortBlock0.length, sortBlock0.length + sortBlock1.length - assertEquals(0L, s.getIndex(0).getOffset()); - assertEquals(sortBlock0.length(), s.getIndex(0).getLength()); + assertEquals(0L, s.getIndex(0).offset()); + assertEquals(sortBlock0.length(), s.getIndex(0).length()); - assertEquals(sortBlock0.length(), s.getIndex(1).getOffset()); - assertEquals(sortBlock1.length(), s.getIndex(1).getLength()); + assertEquals(sortBlock0.length(), s.getIndex(1).offset()); + assertEquals(sortBlock1.length(), s.getIndex(1).length()); assertEquals((3 * 8) + ShuffleIndexInformation.INSTANCE_MEMORY_FOOTPRINT, s.getRetainedMemorySize()); diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ShuffleTransportContextSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ShuffleTransportContextSuite.java index de164474766..6b93044f71b 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ShuffleTransportContextSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ShuffleTransportContextSuite.java @@ -123,7 +123,7 @@ public class ShuffleTransportContextSuite { Assertions.assertEquals(1, out.size()); Assertions.assertTrue(out.get(0) instanceof ShuffleTransportContext.RpcRequestInternal); Assertions.assertEquals(BlockTransferMessage.Type.FINALIZE_SHUFFLE_MERGE, - ((ShuffleTransportContext.RpcRequestInternal) out.get(0)).messageType); + ((ShuffleTransportContext.RpcRequestInternal) out.get(0)).messageType()); } @Test diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java index 0eb0c10df4c..21591d9f2f6 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java @@ -153,24 +153,6 @@ class YarnShuffleServiceMetrics implements MetricsSource { valueName + " value of " + baseName); } - private static class ShuffleServiceMetricsInfo implements MetricsInfo { - - private final String name; - private final String description; - - ShuffleServiceMetricsInfo(String name, String description) { - this.name = name; - this.description = description; - } - - @Override - public String name() { - return name; - } - - @Override - public String description() { - return description; - } + private record ShuffleServiceMetricsInfo(String name, String description) implements MetricsInfo { } } diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/aggregate/Aggregation.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/aggregate/Aggregation.java index 11d9e475ca1..3a1c99e0cb3 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/aggregate/Aggregation.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/aggregate/Aggregation.java @@ -28,16 +28,7 @@ import org.apache.spark.sql.connector.expressions.Expression; * @since 3.2.0 */ @Evolving -public final class Aggregation implements Serializable { - private final AggregateFunc[] aggregateExpressions; - private final Expression[] groupByExpressions; - - public Aggregation(AggregateFunc[] aggregateExpressions, Expression[] groupByExpressions) { - this.aggregateExpressions = aggregateExpressions; - this.groupByExpressions = groupByExpressions; - } - - public AggregateFunc[] aggregateExpressions() { return aggregateExpressions; } - - public Expression[] groupByExpressions() { return groupByExpressions; } +public record Aggregation( + AggregateFunc[] aggregateExpressions, + Expression[] groupByExpressions) implements Serializable { } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java index bde69402241..7a47d350af6 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java @@ -177,13 +177,6 @@ final class ParquetReadState { /** * Helper struct to represent a range of row indexes `[start, end]`. */ - private static class RowRange { - final long start; - final long end; - - RowRange(long start, long end) { - this.start = start; - this.end = end; - } + private record RowRange(long start, long end) { } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org