iht commented on code in PR #32060:
URL: https://github.com/apache/beam/pull/32060#discussion_r1755252806


##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java:
##########
@@ -1026,8 +1057,180 @@ abstract static class Builder<T> {
 
     @Override
     public SolaceOutput expand(PCollection<T> input) {
-      // TODO: will be sent in upcoming PR
-      return SolaceOutput.in(input.getPipeline(), null, null);
+      Class<? super T> pcollClass = 
checkNotNull(input.getTypeDescriptor()).getRawType();
+      boolean usingSolaceRecord =
+          pcollClass
+                  .getTypeName()
+                  
.equals("org.apache.beam.sdk.io.solace.data.AutoValue_Solace_Record")
+              || pcollClass.isAssignableFrom(Solace.Record.class);
+
+      validateWriteTransform(usingSolaceRecord);
+
+      boolean usingDynamicDestinations = getDestination() == null;
+      SerializableFunction<Solace.Record, Destination> destinationFn;
+      if (usingDynamicDestinations) {
+        destinationFn = x -> 
SolaceIO.convertToJcsmpDestination(checkNotNull(x.getDestination()));
+      } else {
+        // Constant destination for all messages (same topic or queue)
+        // This should not be non-null, as nulls would have been flagged by the
+        // validateWriteTransform method
+        destinationFn = x -> checkNotNull(getDestination());
+      }
+
+      @SuppressWarnings("unchecked")
+      PCollection<Solace.Record> records =
+          getFormatFunction() == null
+              ? (PCollection<Solace.Record>) input
+              : input.apply(
+                  "Format records",
+                  MapElements.into(TypeDescriptor.of(Solace.Record.class))
+                      .via(checkNotNull(getFormatFunction())));
+
+      // Store the current window used by the input
+      PCollection<Solace.PublishResult> captureWindow =
+          records.apply(
+              "Capture window", ParDo.of(new 
UnboundedSolaceWriter.RecordToPublishResultDoFn()));
+
+      @SuppressWarnings("unchecked")
+      WindowingStrategy<Solace.PublishResult, BoundedWindow> windowingStrategy 
=
+          (WindowingStrategy<Solace.PublishResult, BoundedWindow>)
+              captureWindow.getWindowingStrategy();
+
+      PCollection<Solace.Record> withGlobalWindow =
+          records.apply("Global window", Window.into(new GlobalWindows()));
+
+      PCollection<KV<Integer, Solace.Record>> withShardKeys =
+          withGlobalWindow.apply(
+              "Add shard key",
+              ParDo.of(new 
UnboundedSolaceWriter.AddShardKeyDoFn(getMaxNumOfUsedWorkers())));
+
+      String label =
+          getWriterType() == WriterType.STREAMING ? "Publish (streaming)" : 
"Publish (batched)";
+
+      PCollectionTuple solaceOutput = withShardKeys.apply(label, 
getWriterTransform(destinationFn));
+
+      SolaceOutput output;
+      if (getDeliveryMode() == DeliveryMode.PERSISTENT) {
+        PCollection<Solace.PublishResult> failedPublish = 
solaceOutput.get(FAILED_PUBLISH_TAG);
+        PCollection<Solace.PublishResult> successfulPublish =
+            solaceOutput.get(SUCCESSFUL_PUBLISH_TAG);
+        output =
+            rewindow(
+                SolaceOutput.in(input.getPipeline(), failedPublish, 
successfulPublish),
+                windowingStrategy);
+      } else {
+        LOG.info(
+            String.format(
+                "Solace.Write: omitting writer output because delivery mode is 
%s",
+                getDeliveryMode()));
+        output = SolaceOutput.in(input.getPipeline(), null, null);
+      }
+
+      return output;
+    }
+
+    private ParDo.MultiOutput<KV<Integer, Solace.Record>, 
Solace.PublishResult> getWriterTransform(
+        SerializableFunction<Solace.Record, Destination> destinationFn) {
+
+      ParDo.SingleOutput<KV<Integer, Solace.Record>, Solace.PublishResult> 
writer =
+          ParDo.of(
+              getWriterType() == WriterType.STREAMING
+                  ? new UnboundedStreamingSolaceWriter.WriterDoFn(
+                      destinationFn,
+                      checkNotNull(getSessionServiceFactory()),
+                      getDeliveryMode(),
+                      getDispatchMode(),
+                      getNumberOfClientsPerWorker(),
+                      getPublishLatencyMetrics())
+                  : new UnboundedBatchedSolaceWriter.WriterDoFn(
+                      destinationFn,
+                      checkNotNull(getSessionServiceFactory()),
+                      getDeliveryMode(),
+                      getDispatchMode(),
+                      getNumberOfClientsPerWorker(),
+                      getPublishLatencyMetrics()));
+
+      return writer.withOutputTags(FAILED_PUBLISH_TAG, 
TupleTagList.of(SUCCESSFUL_PUBLISH_TAG));
+    }
+
+    private SolaceOutput rewindow(
+        SolaceOutput solacePublishResult,
+        WindowingStrategy<Solace.PublishResult, BoundedWindow> strategy) {
+      PCollection<Solace.PublishResult> correct = 
solacePublishResult.getSuccessfulPublish();
+      PCollection<Solace.PublishResult> failed = 
solacePublishResult.getFailedPublish();
+
+      PCollection<Solace.PublishResult> correctWithWindow = null;
+      PCollection<Solace.PublishResult> failedWithWindow = null;
+
+      if (correct != null) {
+        correctWithWindow = applyOriginalWindow(correct, strategy, "Rewindow 
correct");
+      }
+
+      if (failed != null) {
+        failedWithWindow = applyOriginalWindow(failed, strategy, "Rewindow 
failed");
+      }
+
+      return SolaceOutput.in(
+          solacePublishResult.getPipeline(), failedWithWindow, 
correctWithWindow);
+    }
+
+    private static PCollection<Solace.PublishResult> applyOriginalWindow(
+        PCollection<Solace.PublishResult> pcoll,
+        WindowingStrategy<Solace.PublishResult, BoundedWindow> strategy,
+        String label) {
+      Window<Solace.PublishResult> originalWindow = 
captureWindowDetails(strategy);
+
+      if (strategy.getMode() == 
WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES) {
+        originalWindow = originalWindow.accumulatingFiredPanes();
+      } else {
+        originalWindow = originalWindow.discardingFiredPanes();
+      }
+
+      return pcoll.apply(label, originalWindow);
+    }
+
+    private static Window<Solace.PublishResult> captureWindowDetails(
+        WindowingStrategy<Solace.PublishResult, BoundedWindow> strategy) {
+      return Window.<Solace.PublishResult>into(strategy.getWindowFn())
+          .withAllowedLateness(strategy.getAllowedLateness())
+          .withOnTimeBehavior(strategy.getOnTimeBehavior())
+          .withTimestampCombiner(strategy.getTimestampCombiner())
+          .triggering(strategy.getTrigger());
+    }
+
+    /**
+     * Called before running the Pipeline to verify this transform is fully 
and correctly specified.
+     */
+    private void validateWriteTransform(boolean usingSolaceRecords) {
+      if (!usingSolaceRecords) {
+        Preconditions.checkArgument(
+            getFormatFunction() != null,
+            "SolaceIO.Write: If you are not using Solace.Record as the input 
type, you"
+                + " must set a format function using withFormatFunction().");
+      }
+
+      Preconditions.checkArgument(
+          getMaxNumOfUsedWorkers() > 0,
+          "SolaceIO.Write: The number of used workers must be positive.");
+      Preconditions.checkArgument(
+          getNumberOfClientsPerWorker() > 0,
+          "SolaceIO.Write: The number of clients per worker must be 
positive.");
+      Preconditions.checkArgument(
+          getDeliveryMode() == DeliveryMode.DIRECT || getDeliveryMode() == 
DeliveryMode.PERSISTENT,
+          String.format(
+              "SolaceIO.Write: Delivery mode must be either DIRECT or 
PERSISTENT. %s"
+                  + " not supported",
+              getDeliveryMode()));
+      if (getPublishLatencyMetrics()) {
+        Preconditions.checkArgument(
+            getDeliveryMode() == DeliveryMode.PERSISTENT,
+            "SolaceIO.Write: Publish latency metrics can only be enabled for 
PERSISTENT"
+                + " delivery mode.");
+      }
+      Preconditions.checkArgument(
+          getSessionServiceFactory() != null,
+          "SolaceIO: You need to pass a session service factory. For basic"
+              + " authentication, you can use 
BasicAuthJcsmpSessionServiceFactory.");

Review Comment:
   Changed in upcoming commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to