[ https://issues.apache.org/jira/browse/BEAM-5850?focusedWorklogId=165143&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165143 ]
ASF GitHub Bot logged work on BEAM-5850: ---------------------------------------- Author: ASF GitHub Bot Created on: 12/Nov/18 21:10 Start Date: 12/Nov/18 21:10 Worklog Time Spent: 10m Work Description: ajamato commented on a change in pull request #6786: [BEAM-5850] Add BeamFnDataReceiver and make process, finish and start run on the same thread to support metrics. URL: https://github.com/apache/beam/pull/6786#discussion_r232812669 ########## File path: sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer.java ########## @@ -123,43 +141,142 @@ public void close() { * it is ready to consume that data. */ private final class InboundObserver implements StreamObserver<BeamFnApi.Elements> { + + private final ConcurrentHashMap<String, SynchronousQueue<Data>> instructionIdToQueue; + private final ConcurrentHashMap<String, HashSet<Consumer<Data>>> instructionIdToConsumers; + + public InboundObserver(boolean enableQueuing) { + this.instructionIdToQueue = + enableQueuing ? new ConcurrentHashMap<String, SynchronousQueue<Data>>() : null; + this.instructionIdToConsumers = new ConcurrentHashMap<String, HashSet<Consumer<Data>>>(); + } + + private void registerConsumer( + String instructionId, Consumer<BeamFnApi.Elements.Data> consumer) { + this.instructionIdToConsumers.putIfAbsent(instructionId, new HashSet<Consumer<Data>>()); + this.instructionIdToConsumers.get(instructionId).add(consumer); + } + + private boolean isFinalizationData(BeamFnApi.Elements.Data data) { + return data.getData().isEmpty(); + } + + // TODO overall error handling needs an audit and possibly we need to be creative about + // testing here. Maybe handleData should just throw all exceptions, and catch them + // and call onError in the calling code? 
+ private void handleData(BeamFnApi.Elements.Data data) { + try { + LogicalEndpoint key = LogicalEndpoint.of(data.getInstructionReference(), data.getTarget()); + CompletableFuture<Consumer<BeamFnApi.Elements.Data>> consumer = receiverFuture(key); + if (!consumer.isDone()) { + LOG.debug( + "Received data for key {} without consumer ready. " + + "Waiting for consumer to be registered.", + key); + } + consumer.get().accept(data); + if (isFinalizationData(data)) { + consumers.remove(key); + } + /* + * TODO: On failure we should fail any bundles that were impacted eagerly + * instead of relying on the Runner harness to do all the failure handling. + */ + } catch (ExecutionException | InterruptedException e) { + LOG.error( + "Client interrupted during handling of data for instruction {} and target {}", + data.getInstructionReference(), + data.getTarget(), + e); + outboundObserver.onError(e); + } catch (RuntimeException e) { + LOG.error( + "Client failed to handle data for instruction {} and target {}", + data.getInstructionReference(), + data.getTarget(), + e); + outboundObserver.onError(e); + } + } + + private boolean IsQueueingEnabled() { + return instructionIdToQueue != null; + } + + + // TODO please help reveiw the concurrency aspect here. Questions: + // - Safe use of ConcurrentHashMap? + // - Unnecessary use of ConcurrentHashMap? + // - Is there some code that can run concurrently now, which was not before? @Override public void onNext(BeamFnApi.Elements value) { for (BeamFnApi.Elements.Data data : value.getDataList()) { try { - LogicalEndpoint key = - LogicalEndpoint.of(data.getInstructionReference(), data.getTarget()); - CompletableFuture<Consumer<BeamFnApi.Elements.Data>> consumer = receiverFuture(key); - if (!consumer.isDone()) { - LOG.debug( - "Received data for key {} without consumer ready. 
" - + "Waiting for consumer to be registered.", - key); - } - consumer.get().accept(data); - if (data.getData().isEmpty()) { - consumers.remove(key); + if (IsQueueingEnabled()) { + // TODO Is there a cleaner way do to this? + // I was trying to avoid always doing multiple lookups, and always creating + // a new instance on the heap + SynchronousQueue<Data> queue = instructionIdToQueue.get(data.getInstructionReference()); + if (queue == null) { + instructionIdToQueue.putIfAbsent( + data.getInstructionReference(), new SynchronousQueue<Data>()); + // Lookup the queue again incase there was a race and another thread added one. + queue = instructionIdToQueue.get(data.getInstructionReference()); + } + + // TODO, how do we make this throw an interrupted exception? And when should we + // do this? + // TODO is there any concurrency issue here? Can multiple threads call onNext? Review comment: ack ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 165143) Time Spent: 4h 10m (was: 4h) > Make process, finish and start run on the same thread to support metrics. > ------------------------------------------------------------------------- > > Key: BEAM-5850 > URL: https://issues.apache.org/jira/browse/BEAM-5850 > Project: Beam > Issue Type: New Feature > Components: java-fn-execution > Reporter: Alex Amato > Assignee: Alex Amato > Priority: Major > Time Spent: 4h 10m > Remaining Estimate: 0h > > Update BeamFnDataReceiver to place elements into a Queue and consume them > and call the element processing receiver in blockTillReadFinishes -- This message was sent by Atlassian JIRA (v7.6.3#76005)