arunpandianp commented on code in PR #35786: URL: https://github.com/apache/beam/pull/35786#discussion_r2259631842
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -394,6 +370,163 @@ private StreamingDataflowWorker(
     LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport());
   }
 
+  private FanOutStreamingEngineWorkerHarness createFanOutStreamingEngineWorkerHarness(
+      long clientId,
+      DataflowWorkerHarnessOptions options,
+      GrpcWindmillStreamFactory windmillStreamFactory,
+      StreamingWorkScheduler streamingWorkScheduler,
+      ThrottlingGetDataMetricTracker getDataMetricTracker,
+      MemoryMonitor memoryMonitor,
+      ChannelCache channelCache,
+      GrpcDispatcherClient dispatcherClient) {
+
+    WeightedSemaphore<Commit> maxCommitByteSemaphore = Commits.maxCommitByteSemaphore();
+    channelCache = channelCache == null ? createChannelCache(options, configFetcher) : channelCache;
+
+    return FanOutStreamingEngineWorkerHarness.create(
+        createJobHeader(options, clientId),
+        GetWorkBudget.builder()
+            .setItems(chooseMaxBundlesOutstanding(options))
+            .setBytes(MAX_GET_WORK_FETCH_BYTES)
+            .build(),
+        windmillStreamFactory,
+        (workItem, serializedWorkItemSize, watermarks, processingContext, getWorkStreamLatencies) ->
+            computationStateCache
+                .get(processingContext.computationId())
+                .ifPresent(
+                    computationState -> {
+                      memoryMonitor.waitForResources("GetWork");
+                      streamingWorkScheduler.scheduleWork(
+                          computationState,
+                          workItem,
+                          serializedWorkItemSize,
+                          watermarks,
+                          processingContext,
+                          getWorkStreamLatencies);
+                    }),
+        ChannelCachingRemoteStubFactory.create(options.getGcpCredential(), channelCache),
+        GetWorkBudgetDistributors.distributeEvenly(),
+        Preconditions.checkNotNull(dispatcherClient),
+        commitWorkStream ->
+            StreamingEngineWorkCommitter.builder()
+                // Share the commitByteSemaphore across all created workCommitters.
+                .setCommitByteSemaphore(maxCommitByteSemaphore)
+                .setBackendWorkerToken(commitWorkStream.backendWorkerToken())
+                .setOnCommitComplete(this::onCompleteCommit)
+                .setNumCommitSenders(Math.max(options.getWindmillServiceCommitThreads(), 1))
+                .setCommitWorkStreamFactory(
+                    () -> CloseableStream.create(commitWorkStream, () -> {}))
+                .build(),
+        getDataMetricTracker);
+  }
+
+  private StreamingWorkerHarness createSingleSourceWorkerHarness(
+      long clientId,
+      DataflowWorkerHarnessOptions options,
+      WindmillServerStub windmillServer,
+      StreamingWorkScheduler streamingWorkScheduler,
+      ThrottlingGetDataMetricTracker getDataMetricTracker,
+      MemoryMonitor memoryMonitor) {
+    Windmill.GetWorkRequest request =
+        Windmill.GetWorkRequest.newBuilder()
+            .setClientId(clientId)
+            .setMaxItems(chooseMaxBundlesOutstanding(options))
+            .setMaxBytes(MAX_GET_WORK_FETCH_BYTES)
+            .build();
+    WindmillStreamPool<GetDataStream> getDataStreamPool =
+        WindmillStreamPool.create(
+            Math.max(1, options.getWindmillGetDataStreamCount()),
+            GET_DATA_STREAM_TIMEOUT,
+            windmillServer::getDataStream);
+    GetDataClient getDataClient =
+        new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool);
+    HeartbeatSender heartbeatSender =
+        createStreamingEngineHeartbeatSender(
+            options, windmillServer, getDataStreamPool, configFetcher.getGlobalConfigHandle());
+    WorkCommitter workCommitter =
+        StreamingEngineWorkCommitter.builder()
+            .setCommitWorkStreamFactory(
+                WindmillStreamPool.create(
+                        numCommitThreads, COMMIT_STREAM_TIMEOUT, windmillServer::commitWorkStream)
+                    ::getCloseableStream)
+            .setCommitByteSemaphore(Commits.maxCommitByteSemaphore())
+            .setNumCommitSenders(numCommitThreads)
+            .setOnCommitComplete(this::onCompleteCommit)
+            .build();
+    GetWorkSender getWorkSender =
+        GetWorkSender.forStreamingEngine(
+            receiver -> windmillServer.getWorkStream(request, receiver));
+
+    this.getDataStatusProvider = getDataClient::printHtml;
+    this.currentActiveCommitBytesProvider = workCommitter::currentActiveCommitBytes;
+    this.channelzServlet =
+        createChannelzServlet(options, windmillServer::getWindmillServiceEndpoints);
+
+    return SingleSourceWorkerHarness.builder()
+        .setStreamingWorkScheduler(streamingWorkScheduler)
+        .setWorkCommitter(workCommitter)
+        .setGetDataClient(getDataClient)
+        .setComputationStateFetcher(this.computationStateCache::get)
+        .setWaitForResources(() -> memoryMonitor.waitForResources("GetWork"))
+        .setHeartbeatSender(heartbeatSender)
+        .setGetWorkSender(getWorkSender)
+        .build();
+  }
+
+  private void switchStreamingWorkerHarness(
+      ConnectivityType connectivityType,
+      long clientId,
+      DataflowWorkerHarnessOptions options,
+      GrpcWindmillStreamFactory windmillStreamFactory,
+      StreamingWorkScheduler streamingWorkScheduler,
+      ThrottlingGetDataMetricTracker getDataMetricTracker,
+      MemoryMonitor memoryMonitor,
+      ChannelCache channelCache,
+      GrpcDispatcherClient dispatcherClient,
+      WindmillServerStub windmillServer) {
+    if (connectivityType == ConnectivityType.CONNECTIVITY_TYPE_DIRECTPATH) {
+      if (!(this.streamingWorkerHarness.get() instanceof FanOutStreamingEngineWorkerHarness)) {
+        LOG.debug("Shutting down to SingleSourceWorkerHarness");
+        this.streamingWorkerHarness.get().shutdown();
+        FanOutStreamingEngineWorkerHarness fanoutStreamingWorkerHarness =
+            createFanOutStreamingEngineWorkerHarness(
+                clientId,
+                options,
+                windmillStreamFactory,
+                streamingWorkScheduler,
+                getDataMetricTracker,
+                memoryMonitor,
+                channelCache,
+                dispatcherClient);
+        this.streamingWorkerHarness.set(fanoutStreamingWorkerHarness);
+        this.getDataStatusProvider = getDataMetricTracker::printHtml;
+        this.currentActiveCommitBytesProvider =
+            fanoutStreamingWorkerHarness::currentActiveCommitBytes;
+        this.channelzServlet =
+            createChannelzServlet(options, fanoutStreamingWorkerHarness::currentWindmillEndpoints);
+        streamingWorkerHarness.get().start();

Review Comment:
   Will this block? Blocking here will block the config callback thread and future config callbacks. The shutdown seems like it will block; can we move this callback to a separate single-threaded executor?
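One way to keep the blocking switch off the config callback thread is to hand it to a dedicated single-threaded executor that owns shutdown and start. The following is a minimal sketch only, using a simplified `Harness` stand-in for `StreamingWorkerHarness`; the class, interface, and thread names are illustrative and not the PR's actual types:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/**
 * Sketch only: run the (potentially blocking) harness switch on a dedicated
 * single-threaded executor so the config callback returns immediately and later
 * config callbacks are not delayed.
 */
public final class HarnessSwitcher {
  /** Stand-in for StreamingWorkerHarness: start() and shutdown() may block. */
  interface Harness {
    void start();

    void shutdown();
  }

  private final AtomicReference<Harness> activeHarness;
  // One thread serializes switches and keeps blocking work off the callback thread.
  private final ExecutorService switchExecutor =
      Executors.newSingleThreadExecutor(r -> new Thread(r, "HarnessSwitchThread"));

  HarnessSwitcher(Harness initial) {
    this.activeHarness = new AtomicReference<>(initial);
  }

  /** Called from the config callback; only enqueues the switch, never blocks. */
  void requestSwitch(Supplier<Harness> newHarnessFactory) {
    switchExecutor.execute(
        () -> {
          activeHarness.get().shutdown(); // blocking is acceptable on this thread
          Harness replacement = newHarnessFactory.get();
          activeHarness.set(replacement);
          replacement.start();
        });
  }
}
```

Because the executor has a single thread, overlapping config callbacks cannot interleave two switches; they are applied in order.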
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -394,6 +370,163 @@ private StreamingDataflowWorker(
+        this.streamingWorkerHarness.set(fanoutStreamingWorkerHarness);
+        this.getDataStatusProvider = getDataMetricTracker::printHtml;

Review Comment:
   resetting these here won't update the references inside `statusPages`. Can we update the references in statusPages too?
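One way to keep `statusPages` consistent after a switch is to register an indirection rather than the provider instances themselves, so the pages dereference whatever the current provider is at render time. A sketch under that assumption; the types and method names below are stand-ins, not the actual status-page API:

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
 * Sketch only: instead of handing statusPages the provider instances directly
 * (which go stale after a harness switch), hand it indirections that read the
 * latest provider on every call.
 */
final class SwappableStatusProviders {
  private final AtomicReference<Consumer<StringBuilder>> getDataStatusProvider =
      new AtomicReference<>(sb -> {});
  private final AtomicReference<Supplier<Long>> activeCommitBytesProvider =
      new AtomicReference<>(() -> 0L);

  /** Register these with statusPages once; they always see the latest providers. */
  Consumer<StringBuilder> getDataStatusForStatusPages() {
    return sb -> getDataStatusProvider.get().accept(sb);
  }

  Supplier<Long> activeCommitBytesForStatusPages() {
    return () -> activeCommitBytesProvider.get().get();
  }

  /** Called during a harness switch; statusPages picks up the change automatically. */
  void swap(Consumer<StringBuilder> newGetDataStatus, Supplier<Long> newActiveCommitBytes) {
    getDataStatusProvider.set(newGetDataStatus);
    activeCommitBytesProvider.set(newActiveCommitBytes);
  }
}
```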
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -394,6 +370,163 @@ private StreamingDataflowWorker(
+    if (connectivityType == ConnectivityType.CONNECTIVITY_TYPE_DIRECTPATH) {
+      if (!(this.streamingWorkerHarness.get() instanceof FanOutStreamingEngineWorkerHarness)) {
+        LOG.debug("Shutting down to SingleSourceWorkerHarness");

Review Comment:
   Add an info log saying switching from connectivityType X to Y.
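A possible shape for that log line, assuming the previous connectivity type is tracked somewhere; the class, method, and parameter names below are illustrative:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Sketch of the suggested transition log; message and parameters are illustrative. */
final class HarnessSwitchLogging {
  private static final Logger LOG = LoggerFactory.getLogger(HarnessSwitchLogging.class);

  static void logSwitch(Object fromConnectivityType, Object toConnectivityType) {
    // Emitted just before shutting down the old harness.
    LOG.info(
        "Switching streaming worker harness: connectivity type {} -> {}",
        fromConnectivityType,
        toConnectivityType);
  }
}
```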
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java:
##########
@@ -124,6 +124,10 @@ public static StreamingWorkerStatusPages.Builder builder() {
             new ThreadFactoryBuilder().setNameFormat(DUMP_STATUS_PAGES_EXECUTOR).build()));
   }
 
+  public void updateChannelCache(@Nullable ChannelCache channelCache) {

Review Comment:
   unused?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org