arunpandianp commented on code in PR #35786:
URL: https://github.com/apache/beam/pull/35786#discussion_r2259631842


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -394,6 +370,163 @@ private StreamingDataflowWorker(
     LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport());
   }
 
+  private FanOutStreamingEngineWorkerHarness 
createFanOutStreamingEngineWorkerHarness(
+      long clientId,
+      DataflowWorkerHarnessOptions options,
+      GrpcWindmillStreamFactory windmillStreamFactory,
+      StreamingWorkScheduler streamingWorkScheduler,
+      ThrottlingGetDataMetricTracker getDataMetricTracker,
+      MemoryMonitor memoryMonitor,
+      ChannelCache channelCache,
+      GrpcDispatcherClient dispatcherClient) {
+
+    WeightedSemaphore<Commit> maxCommitByteSemaphore = 
Commits.maxCommitByteSemaphore();
+    channelCache = channelCache == null ? createChannelCache(options, 
configFetcher) : channelCache;
+
+    return FanOutStreamingEngineWorkerHarness.create(
+        createJobHeader(options, clientId),
+        GetWorkBudget.builder()
+            .setItems(chooseMaxBundlesOutstanding(options))
+            .setBytes(MAX_GET_WORK_FETCH_BYTES)
+            .build(),
+        windmillStreamFactory,
+        (workItem, serializedWorkItemSize, watermarks, processingContext, 
getWorkStreamLatencies) ->
+            computationStateCache
+                .get(processingContext.computationId())
+                .ifPresent(
+                    computationState -> {
+                      memoryMonitor.waitForResources("GetWork");
+                      streamingWorkScheduler.scheduleWork(
+                          computationState,
+                          workItem,
+                          serializedWorkItemSize,
+                          watermarks,
+                          processingContext,
+                          getWorkStreamLatencies);
+                    }),
+        ChannelCachingRemoteStubFactory.create(options.getGcpCredential(), 
channelCache),
+        GetWorkBudgetDistributors.distributeEvenly(),
+        Preconditions.checkNotNull(dispatcherClient),
+        commitWorkStream ->
+            StreamingEngineWorkCommitter.builder()
+                // Share the commitByteSemaphore across all created 
workCommitters.
+                .setCommitByteSemaphore(maxCommitByteSemaphore)
+                .setBackendWorkerToken(commitWorkStream.backendWorkerToken())
+                .setOnCommitComplete(this::onCompleteCommit)
+                
.setNumCommitSenders(Math.max(options.getWindmillServiceCommitThreads(), 1))
+                .setCommitWorkStreamFactory(
+                    () -> CloseableStream.create(commitWorkStream, () -> {}))
+                .build(),
+        getDataMetricTracker);
+  }
+
+  private StreamingWorkerHarness createSingleSourceWorkerHarness(
+      long clientId,
+      DataflowWorkerHarnessOptions options,
+      WindmillServerStub windmillServer,
+      StreamingWorkScheduler streamingWorkScheduler,
+      ThrottlingGetDataMetricTracker getDataMetricTracker,
+      MemoryMonitor memoryMonitor) {
+    Windmill.GetWorkRequest request =
+        Windmill.GetWorkRequest.newBuilder()
+            .setClientId(clientId)
+            .setMaxItems(chooseMaxBundlesOutstanding(options))
+            .setMaxBytes(MAX_GET_WORK_FETCH_BYTES)
+            .build();
+    WindmillStreamPool<GetDataStream> getDataStreamPool =
+        WindmillStreamPool.create(
+            Math.max(1, options.getWindmillGetDataStreamCount()),
+            GET_DATA_STREAM_TIMEOUT,
+            windmillServer::getDataStream);
+    GetDataClient getDataClient =
+        new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool);
+    HeartbeatSender heartbeatSender =
+        createStreamingEngineHeartbeatSender(
+            options, windmillServer, getDataStreamPool, 
configFetcher.getGlobalConfigHandle());
+    WorkCommitter workCommitter =
+        StreamingEngineWorkCommitter.builder()
+            .setCommitWorkStreamFactory(
+                WindmillStreamPool.create(
+                        numCommitThreads, COMMIT_STREAM_TIMEOUT, 
windmillServer::commitWorkStream)
+                    ::getCloseableStream)
+            .setCommitByteSemaphore(Commits.maxCommitByteSemaphore())
+            .setNumCommitSenders(numCommitThreads)
+            .setOnCommitComplete(this::onCompleteCommit)
+            .build();
+    GetWorkSender getWorkSender =
+        GetWorkSender.forStreamingEngine(
+            receiver -> windmillServer.getWorkStream(request, receiver));
+
+    this.getDataStatusProvider = getDataClient::printHtml;
+    this.currentActiveCommitBytesProvider = 
workCommitter::currentActiveCommitBytes;
+    this.channelzServlet =
+        createChannelzServlet(options, 
windmillServer::getWindmillServiceEndpoints);
+
+    return SingleSourceWorkerHarness.builder()
+        .setStreamingWorkScheduler(streamingWorkScheduler)
+        .setWorkCommitter(workCommitter)
+        .setGetDataClient(getDataClient)
+        .setComputationStateFetcher(this.computationStateCache::get)
+        .setWaitForResources(() -> memoryMonitor.waitForResources("GetWork"))
+        .setHeartbeatSender(heartbeatSender)
+        .setGetWorkSender(getWorkSender)
+        .build();
+  }
+
+  private void switchStreamingWorkerHarness(
+      ConnectivityType connectivityType,
+      long clientId,
+      DataflowWorkerHarnessOptions options,
+      GrpcWindmillStreamFactory windmillStreamFactory,
+      StreamingWorkScheduler streamingWorkScheduler,
+      ThrottlingGetDataMetricTracker getDataMetricTracker,
+      MemoryMonitor memoryMonitor,
+      ChannelCache channelCache,
+      GrpcDispatcherClient dispatcherClient,
+      WindmillServerStub windmillServer) {
+    if (connectivityType == ConnectivityType.CONNECTIVITY_TYPE_DIRECTPATH) {
+      if (!(this.streamingWorkerHarness.get() instanceof 
FanOutStreamingEngineWorkerHarness)) {
+        LOG.debug("Shutting down to SingleSourceWorkerHarness");
+        this.streamingWorkerHarness.get().shutdown();
+        FanOutStreamingEngineWorkerHarness fanoutStreamingWorkerHarness =
+            createFanOutStreamingEngineWorkerHarness(
+                clientId,
+                options,
+                windmillStreamFactory,
+                streamingWorkScheduler,
+                getDataMetricTracker,
+                memoryMonitor,
+                channelCache,
+                dispatcherClient);
+        this.streamingWorkerHarness.set(fanoutStreamingWorkerHarness);
+        this.getDataStatusProvider = getDataMetricTracker::printHtml;
+        this.currentActiveCommitBytesProvider =
+            fanoutStreamingWorkerHarness::currentActiveCommitBytes;
+        this.channelzServlet =
+            createChannelzServlet(options, 
fanoutStreamingWorkerHarness::currentWindmillEndpoints);
+        streamingWorkerHarness.get().start();

Review Comment:
   Will this block? Blocking here would block the config callback thread and 
delay all future config callbacks.
   
   The `shutdown()` call looks like it can block. Can we move this callback onto a 
separate single-threaded executor?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -394,6 +370,163 @@ private StreamingDataflowWorker(
     LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport());
   }
 
+  private FanOutStreamingEngineWorkerHarness 
createFanOutStreamingEngineWorkerHarness(
+      long clientId,
+      DataflowWorkerHarnessOptions options,
+      GrpcWindmillStreamFactory windmillStreamFactory,
+      StreamingWorkScheduler streamingWorkScheduler,
+      ThrottlingGetDataMetricTracker getDataMetricTracker,
+      MemoryMonitor memoryMonitor,
+      ChannelCache channelCache,
+      GrpcDispatcherClient dispatcherClient) {
+
+    WeightedSemaphore<Commit> maxCommitByteSemaphore = 
Commits.maxCommitByteSemaphore();
+    channelCache = channelCache == null ? createChannelCache(options, 
configFetcher) : channelCache;
+
+    return FanOutStreamingEngineWorkerHarness.create(
+        createJobHeader(options, clientId),
+        GetWorkBudget.builder()
+            .setItems(chooseMaxBundlesOutstanding(options))
+            .setBytes(MAX_GET_WORK_FETCH_BYTES)
+            .build(),
+        windmillStreamFactory,
+        (workItem, serializedWorkItemSize, watermarks, processingContext, 
getWorkStreamLatencies) ->
+            computationStateCache
+                .get(processingContext.computationId())
+                .ifPresent(
+                    computationState -> {
+                      memoryMonitor.waitForResources("GetWork");
+                      streamingWorkScheduler.scheduleWork(
+                          computationState,
+                          workItem,
+                          serializedWorkItemSize,
+                          watermarks,
+                          processingContext,
+                          getWorkStreamLatencies);
+                    }),
+        ChannelCachingRemoteStubFactory.create(options.getGcpCredential(), 
channelCache),
+        GetWorkBudgetDistributors.distributeEvenly(),
+        Preconditions.checkNotNull(dispatcherClient),
+        commitWorkStream ->
+            StreamingEngineWorkCommitter.builder()
+                // Share the commitByteSemaphore across all created 
workCommitters.
+                .setCommitByteSemaphore(maxCommitByteSemaphore)
+                .setBackendWorkerToken(commitWorkStream.backendWorkerToken())
+                .setOnCommitComplete(this::onCompleteCommit)
+                
.setNumCommitSenders(Math.max(options.getWindmillServiceCommitThreads(), 1))
+                .setCommitWorkStreamFactory(
+                    () -> CloseableStream.create(commitWorkStream, () -> {}))
+                .build(),
+        getDataMetricTracker);
+  }
+
+  private StreamingWorkerHarness createSingleSourceWorkerHarness(
+      long clientId,
+      DataflowWorkerHarnessOptions options,
+      WindmillServerStub windmillServer,
+      StreamingWorkScheduler streamingWorkScheduler,
+      ThrottlingGetDataMetricTracker getDataMetricTracker,
+      MemoryMonitor memoryMonitor) {
+    Windmill.GetWorkRequest request =
+        Windmill.GetWorkRequest.newBuilder()
+            .setClientId(clientId)
+            .setMaxItems(chooseMaxBundlesOutstanding(options))
+            .setMaxBytes(MAX_GET_WORK_FETCH_BYTES)
+            .build();
+    WindmillStreamPool<GetDataStream> getDataStreamPool =
+        WindmillStreamPool.create(
+            Math.max(1, options.getWindmillGetDataStreamCount()),
+            GET_DATA_STREAM_TIMEOUT,
+            windmillServer::getDataStream);
+    GetDataClient getDataClient =
+        new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool);
+    HeartbeatSender heartbeatSender =
+        createStreamingEngineHeartbeatSender(
+            options, windmillServer, getDataStreamPool, 
configFetcher.getGlobalConfigHandle());
+    WorkCommitter workCommitter =
+        StreamingEngineWorkCommitter.builder()
+            .setCommitWorkStreamFactory(
+                WindmillStreamPool.create(
+                        numCommitThreads, COMMIT_STREAM_TIMEOUT, 
windmillServer::commitWorkStream)
+                    ::getCloseableStream)
+            .setCommitByteSemaphore(Commits.maxCommitByteSemaphore())
+            .setNumCommitSenders(numCommitThreads)
+            .setOnCommitComplete(this::onCompleteCommit)
+            .build();
+    GetWorkSender getWorkSender =
+        GetWorkSender.forStreamingEngine(
+            receiver -> windmillServer.getWorkStream(request, receiver));
+
+    this.getDataStatusProvider = getDataClient::printHtml;
+    this.currentActiveCommitBytesProvider = 
workCommitter::currentActiveCommitBytes;
+    this.channelzServlet =
+        createChannelzServlet(options, 
windmillServer::getWindmillServiceEndpoints);
+
+    return SingleSourceWorkerHarness.builder()
+        .setStreamingWorkScheduler(streamingWorkScheduler)
+        .setWorkCommitter(workCommitter)
+        .setGetDataClient(getDataClient)
+        .setComputationStateFetcher(this.computationStateCache::get)
+        .setWaitForResources(() -> memoryMonitor.waitForResources("GetWork"))
+        .setHeartbeatSender(heartbeatSender)
+        .setGetWorkSender(getWorkSender)
+        .build();
+  }
+
+  private void switchStreamingWorkerHarness(
+      ConnectivityType connectivityType,
+      long clientId,
+      DataflowWorkerHarnessOptions options,
+      GrpcWindmillStreamFactory windmillStreamFactory,
+      StreamingWorkScheduler streamingWorkScheduler,
+      ThrottlingGetDataMetricTracker getDataMetricTracker,
+      MemoryMonitor memoryMonitor,
+      ChannelCache channelCache,
+      GrpcDispatcherClient dispatcherClient,
+      WindmillServerStub windmillServer) {
+    if (connectivityType == ConnectivityType.CONNECTIVITY_TYPE_DIRECTPATH) {
+      if (!(this.streamingWorkerHarness.get() instanceof 
FanOutStreamingEngineWorkerHarness)) {
+        LOG.debug("Shutting down to SingleSourceWorkerHarness");
+        this.streamingWorkerHarness.get().shutdown();
+        FanOutStreamingEngineWorkerHarness fanoutStreamingWorkerHarness =
+            createFanOutStreamingEngineWorkerHarness(
+                clientId,
+                options,
+                windmillStreamFactory,
+                streamingWorkScheduler,
+                getDataMetricTracker,
+                memoryMonitor,
+                channelCache,
+                dispatcherClient);
+        this.streamingWorkerHarness.set(fanoutStreamingWorkerHarness);
+        this.getDataStatusProvider = getDataMetricTracker::printHtml;

Review Comment:
   Resetting these fields here won't update the references already held inside `statusPages`. Can 
we update the references in `statusPages` too?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -394,6 +370,163 @@ private StreamingDataflowWorker(
     LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport());
   }
 
+  private FanOutStreamingEngineWorkerHarness 
createFanOutStreamingEngineWorkerHarness(
+      long clientId,
+      DataflowWorkerHarnessOptions options,
+      GrpcWindmillStreamFactory windmillStreamFactory,
+      StreamingWorkScheduler streamingWorkScheduler,
+      ThrottlingGetDataMetricTracker getDataMetricTracker,
+      MemoryMonitor memoryMonitor,
+      ChannelCache channelCache,
+      GrpcDispatcherClient dispatcherClient) {
+
+    WeightedSemaphore<Commit> maxCommitByteSemaphore = 
Commits.maxCommitByteSemaphore();
+    channelCache = channelCache == null ? createChannelCache(options, 
configFetcher) : channelCache;
+
+    return FanOutStreamingEngineWorkerHarness.create(
+        createJobHeader(options, clientId),
+        GetWorkBudget.builder()
+            .setItems(chooseMaxBundlesOutstanding(options))
+            .setBytes(MAX_GET_WORK_FETCH_BYTES)
+            .build(),
+        windmillStreamFactory,
+        (workItem, serializedWorkItemSize, watermarks, processingContext, 
getWorkStreamLatencies) ->
+            computationStateCache
+                .get(processingContext.computationId())
+                .ifPresent(
+                    computationState -> {
+                      memoryMonitor.waitForResources("GetWork");
+                      streamingWorkScheduler.scheduleWork(
+                          computationState,
+                          workItem,
+                          serializedWorkItemSize,
+                          watermarks,
+                          processingContext,
+                          getWorkStreamLatencies);
+                    }),
+        ChannelCachingRemoteStubFactory.create(options.getGcpCredential(), 
channelCache),
+        GetWorkBudgetDistributors.distributeEvenly(),
+        Preconditions.checkNotNull(dispatcherClient),
+        commitWorkStream ->
+            StreamingEngineWorkCommitter.builder()
+                // Share the commitByteSemaphore across all created 
workCommitters.
+                .setCommitByteSemaphore(maxCommitByteSemaphore)
+                .setBackendWorkerToken(commitWorkStream.backendWorkerToken())
+                .setOnCommitComplete(this::onCompleteCommit)
+                
.setNumCommitSenders(Math.max(options.getWindmillServiceCommitThreads(), 1))
+                .setCommitWorkStreamFactory(
+                    () -> CloseableStream.create(commitWorkStream, () -> {}))
+                .build(),
+        getDataMetricTracker);
+  }
+
+  private StreamingWorkerHarness createSingleSourceWorkerHarness(
+      long clientId,
+      DataflowWorkerHarnessOptions options,
+      WindmillServerStub windmillServer,
+      StreamingWorkScheduler streamingWorkScheduler,
+      ThrottlingGetDataMetricTracker getDataMetricTracker,
+      MemoryMonitor memoryMonitor) {
+    Windmill.GetWorkRequest request =
+        Windmill.GetWorkRequest.newBuilder()
+            .setClientId(clientId)
+            .setMaxItems(chooseMaxBundlesOutstanding(options))
+            .setMaxBytes(MAX_GET_WORK_FETCH_BYTES)
+            .build();
+    WindmillStreamPool<GetDataStream> getDataStreamPool =
+        WindmillStreamPool.create(
+            Math.max(1, options.getWindmillGetDataStreamCount()),
+            GET_DATA_STREAM_TIMEOUT,
+            windmillServer::getDataStream);
+    GetDataClient getDataClient =
+        new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool);
+    HeartbeatSender heartbeatSender =
+        createStreamingEngineHeartbeatSender(
+            options, windmillServer, getDataStreamPool, 
configFetcher.getGlobalConfigHandle());
+    WorkCommitter workCommitter =
+        StreamingEngineWorkCommitter.builder()
+            .setCommitWorkStreamFactory(
+                WindmillStreamPool.create(
+                        numCommitThreads, COMMIT_STREAM_TIMEOUT, 
windmillServer::commitWorkStream)
+                    ::getCloseableStream)
+            .setCommitByteSemaphore(Commits.maxCommitByteSemaphore())
+            .setNumCommitSenders(numCommitThreads)
+            .setOnCommitComplete(this::onCompleteCommit)
+            .build();
+    GetWorkSender getWorkSender =
+        GetWorkSender.forStreamingEngine(
+            receiver -> windmillServer.getWorkStream(request, receiver));
+
+    this.getDataStatusProvider = getDataClient::printHtml;
+    this.currentActiveCommitBytesProvider = 
workCommitter::currentActiveCommitBytes;
+    this.channelzServlet =
+        createChannelzServlet(options, 
windmillServer::getWindmillServiceEndpoints);
+
+    return SingleSourceWorkerHarness.builder()
+        .setStreamingWorkScheduler(streamingWorkScheduler)
+        .setWorkCommitter(workCommitter)
+        .setGetDataClient(getDataClient)
+        .setComputationStateFetcher(this.computationStateCache::get)
+        .setWaitForResources(() -> memoryMonitor.waitForResources("GetWork"))
+        .setHeartbeatSender(heartbeatSender)
+        .setGetWorkSender(getWorkSender)
+        .build();
+  }
+
+  private void switchStreamingWorkerHarness(
+      ConnectivityType connectivityType,
+      long clientId,
+      DataflowWorkerHarnessOptions options,
+      GrpcWindmillStreamFactory windmillStreamFactory,
+      StreamingWorkScheduler streamingWorkScheduler,
+      ThrottlingGetDataMetricTracker getDataMetricTracker,
+      MemoryMonitor memoryMonitor,
+      ChannelCache channelCache,
+      GrpcDispatcherClient dispatcherClient,
+      WindmillServerStub windmillServer) {
+    if (connectivityType == ConnectivityType.CONNECTIVITY_TYPE_DIRECTPATH) {
+      if (!(this.streamingWorkerHarness.get() instanceof 
FanOutStreamingEngineWorkerHarness)) {
+        LOG.debug("Shutting down to SingleSourceWorkerHarness");

Review Comment:
   Add an info log saying that we are switching from connectivity type X to Y.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java:
##########
@@ -124,6 +124,10 @@ public static StreamingWorkerStatusPages.Builder builder() 
{
                 new 
ThreadFactoryBuilder().setNameFormat(DUMP_STATUS_PAGES_EXECUTOR).build()));
   }
 
+  public void updateChannelCache(@Nullable ChannelCache channelCache) {

Review Comment:
   Is this method unused?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to