This is an automated email from the ASF dual-hosted git repository.

udo pushed a commit to branch feature/GEODE-3604 in repository https://gitbox.apache.org/repos/asf/geode.git
commit f75db762f187de0044da3b8ca326ec0fa3bbc8f4 Author: kohlmu-pivotal <[email protected]> AuthorDate: Mon Sep 11 10:00:20 2017 -0700 Change stream processing to avoid intermediary collections and process in-line --- .../operations/GetAllRequestOperationHandler.java | 19 ++++++++--------- .../GetAvailableServersOperationHandler.java | 24 +++++++++++----------- .../operations/PutAllRequestOperationHandler.java | 7 +++---- 3 files changed, 24 insertions(+), 26 deletions(-) diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandler.java index 446dbcb..41ad65f 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandler.java +++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandler.java @@ -53,18 +53,17 @@ public class GetAllRequestOperationHandler .makeErrorResponse(ProtocolErrorCode.REGION_NOT_FOUND.codeValue, "Region not found")); } - Map<Boolean, List<Object>> resultsCollection = request.getKeyList().stream() - .map((key) -> processOneMessage(serializationService, region, key)) - .collect(Collectors.partitioningBy(x -> x instanceof BasicTypes.Entry)); RegionAPI.GetAllResponse.Builder responseBuilder = RegionAPI.GetAllResponse.newBuilder(); - for (Object entry : resultsCollection.get(true)) { - responseBuilder.addEntries((BasicTypes.Entry) entry); - } - - for (Object entry : resultsCollection.get(false)) { - responseBuilder.addFailures((BasicTypes.KeyedError) entry); - } + request.getKeyList().stream() + .map((key) -> processOneMessage(serializationService, region, key)) + .forEach(entry -> { + if (entry instanceof BasicTypes.Entry) { + responseBuilder.addEntries((BasicTypes.Entry) entry); + }else if (entry instanceof BasicTypes.KeyedError) { + 
responseBuilder.addFailures((BasicTypes.KeyedError) entry); + } + }); return Success.of(responseBuilder.build()); } diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAvailableServersOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAvailableServersOperationHandler.java index a242492..6d63b95 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAvailableServersOperationHandler.java +++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAvailableServersOperationHandler.java @@ -14,19 +14,18 @@ */ package org.apache.geode.protocol.protobuf.operations; -import java.util.ArrayList; -import java.util.Collection; -import java.util.stream.Collectors; +import java.util.Collections; +import java.util.List; import org.apache.geode.annotations.Experimental; import org.apache.geode.distributed.internal.InternalLocator; import org.apache.geode.distributed.internal.ServerLocation; import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext; import org.apache.geode.internal.exception.InvalidExecutionContextException; -import org.apache.geode.protocol.operations.OperationHandler; import org.apache.geode.internal.protocol.protobuf.BasicTypes; -import org.apache.geode.protocol.protobuf.Result; import org.apache.geode.internal.protocol.protobuf.ServerAPI; +import org.apache.geode.protocol.operations.OperationHandler; +import org.apache.geode.protocol.protobuf.Result; import org.apache.geode.protocol.protobuf.Success; import org.apache.geode.serialization.SerializationService; @@ -40,18 +39,19 @@ public class GetAvailableServersOperationHandler implements MessageExecutionContext executionContext) throws InvalidExecutionContextException { InternalLocator internalLocator = (InternalLocator) executionContext.getLocator(); - ArrayList serversFromSnapshot = + List serversFromSnapshot = 
internalLocator.getServerLocatorAdvisee().getLoadSnapshot().getServers(null); if (serversFromSnapshot == null) { - serversFromSnapshot = new ArrayList(); + serversFromSnapshot = Collections.EMPTY_LIST; } - Collection<BasicTypes.Server> servers = (Collection<BasicTypes.Server>) serversFromSnapshot + ServerAPI.GetAvailableServersResponse.Builder serverResponseBuilder = + ServerAPI.GetAvailableServersResponse.newBuilder(); + + serversFromSnapshot .stream().map(serverLocation -> getServerProtobufMessage((ServerLocation) serverLocation)) - .collect(Collectors.toList()); - ServerAPI.GetAvailableServersResponse.Builder builder = - ServerAPI.GetAvailableServersResponse.newBuilder().addAllServers(servers); - return Success.of(builder.build()); + .forEach( serverMessage -> serverResponseBuilder.addServers((BasicTypes.Server) serverMessage)); + return Success.of(serverResponseBuilder.build()); } private BasicTypes.Server getServerProtobufMessage(ServerLocation serverLocation) { diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandler.java index 8f2d9ef..b3ca344 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandler.java +++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandler.java @@ -54,10 +54,9 @@ public class PutAllRequestOperationHandler "Region passed by client did not exist: " + putAllRequest.getRegionName(), logger, null)); } - RegionAPI.PutAllResponse.Builder builder = RegionAPI.PutAllResponse.newBuilder() - .addAllFailedKeys(putAllRequest.getEntryList().stream() - .map((entry) -> singlePut(serializationService, region, entry)).filter(Objects::nonNull) - .collect(Collectors.toList())); + RegionAPI.PutAllResponse.Builder builder = RegionAPI.PutAllResponse.newBuilder(); + 
putAllRequest.getEntryList().stream() + .map((entry) -> singlePut(serializationService, region, entry)).filter(Objects::nonNull).forEach(failedKey -> builder.addFailedKeys(failedKey)); return Success.of(builder.build()); } -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
