sjvanrossum commented on code in PR #32060:
URL: https://github.com/apache/beam/pull/32060#discussion_r1755803852


##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java:
##########
@@ -1026,8 +1057,180 @@ abstract static class Builder<T> {
 
     @Override
     public SolaceOutput expand(PCollection<T> input) {
-      // TODO: will be sent in upcoming PR
-      return SolaceOutput.in(input.getPipeline(), null, null);
+      Class<? super T> pcollClass = checkNotNull(input.getTypeDescriptor()).getRawType();
+      boolean usingSolaceRecord =
+          pcollClass
+                  .getTypeName()
+                  .equals("org.apache.beam.sdk.io.solace.data.AutoValue_Solace_Record")
+              || pcollClass.isAssignableFrom(Solace.Record.class);
+
+      validateWriteTransform(usingSolaceRecord);
+
+      boolean usingDynamicDestinations = getDestination() == null;
+      SerializableFunction<Solace.Record, Destination> destinationFn;
+      if (usingDynamicDestinations) {
+        destinationFn = x -> SolaceIO.convertToJcsmpDestination(checkNotNull(x.getDestination()));
+      } else {
+        // Constant destination for all messages (same topic or queue)
+        // This should not be non-null, as nulls would have been flagged by the
+        // validateWriteTransform method
+        destinationFn = x -> checkNotNull(getDestination());
+      }
+
+      @SuppressWarnings("unchecked")
+      PCollection<Solace.Record> records =
+          getFormatFunction() == null
+              ? (PCollection<Solace.Record>) input
+              : input.apply(
+                  "Format records",
+                  MapElements.into(TypeDescriptor.of(Solace.Record.class))
+                      .via(checkNotNull(getFormatFunction())));
+
+      // Store the current window used by the input
+      PCollection<Solace.PublishResult> captureWindow =
+          records.apply(
+              "Capture window", ParDo.of(new UnboundedSolaceWriter.RecordToPublishResultDoFn()));
+
+      @SuppressWarnings("unchecked")
+      WindowingStrategy<Solace.PublishResult, BoundedWindow> windowingStrategy =
+          (WindowingStrategy<Solace.PublishResult, BoundedWindow>)
+              captureWindow.getWindowingStrategy();
+
+      PCollection<Solace.Record> withGlobalWindow =
+          records.apply("Global window", Window.into(new GlobalWindows()));
+
+      PCollection<KV<Integer, Solace.Record>> withShardKeys =
+          withGlobalWindow.apply(
+              "Add shard key",
+              ParDo.of(new UnboundedSolaceWriter.AddShardKeyDoFn(getMaxNumOfUsedWorkers())));
+
+      String label =
+          getWriterType() == WriterType.STREAMING ? "Publish (streaming)" : "Publish (batched)";
+
+      PCollectionTuple solaceOutput = withShardKeys.apply(label, getWriterTransform(destinationFn));
+
+      SolaceOutput output;
+      if (getDeliveryMode() == DeliveryMode.PERSISTENT) {
+        PCollection<Solace.PublishResult> failedPublish = solaceOutput.get(FAILED_PUBLISH_TAG);
+        PCollection<Solace.PublishResult> successfulPublish =
+            solaceOutput.get(SUCCESSFUL_PUBLISH_TAG);
+        output =
+            rewindow(
+                SolaceOutput.in(input.getPipeline(), failedPublish, successfulPublish),
+                windowingStrategy);
+      } else {
+        LOG.info(
+            String.format(
+                "Solace.Write: omitting writer output because delivery mode is %s",
+                getDeliveryMode()));
+        output = SolaceOutput.in(input.getPipeline(), null, null);
+      }
+
+      return output;

Review Comment:
   My bad, I thought the record was propagated on the result. The writer should
not poll for any responses other than those to its own requests. If any publish
result can be emitted as output, the output may contain results for elements
that belong to different bundles or windows. Reapplying the input windowing
strategy is effectively useless if input elements and their output elements can
end up in different bundles and windows.
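
A rough sketch of what I mean by "only its own requests", assuming each request carries a correlation key that comes back on the publish result. Every name here (`BundleScopedResults`, `Ack`, `register`, `drainOwn`) is hypothetical and not taken from SolaceIO or the JCSMP API:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;

// Hypothetical sketch: tracks the requests issued by one writer in one bundle
// and keeps only the acknowledgements that answer those requests.
final class BundleScopedResults {

  /** A publish acknowledgement tagged with the correlation key of its request. */
  static final class Ack {
    final String correlationKey;
    final boolean success;

    Ack(String correlationKey, boolean success) {
      this.correlationKey = correlationKey;
      this.success = success;
    }
  }

  // Correlation keys of requests sent by this writer that are still unanswered.
  private final Set<String> inFlight = new HashSet<>();

  /** Registers a request issued in the current bundle and returns its key. */
  String register() {
    String key = UUID.randomUUID().toString();
    inFlight.add(key);
    return key;
  }

  /** Returns the acks that match this writer's own requests; others are dropped. */
  List<Ack> drainOwn(List<Ack> polled) {
    List<Ack> own = new ArrayList<>();
    for (Ack ack : polled) {
      // remove() returns true only for keys this writer registered itself.
      if (inFlight.remove(ack.correlationKey)) {
        own.add(ack);
      }
    }
    return own;
  }

  /** True once every registered request has been acknowledged. */
  boolean allAcknowledged() {
    return inFlight.isEmpty();
  }
}
```

With something along these lines, a bundle would only finish once `allAcknowledged()` holds, so every emitted result belongs to an element of that same bundle.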
   
   That said, I'm not convinced you're receiving responses to requests
initiated on other machines. That would be incredibly unhelpful behavior for a
message broker, and it would make distributed processing impractical. It would
also render this connector's latency metrics meaningless: the result of
`System.nanoTime()` is not related to synchronized system or wall-clock time,
so it can only be used to measure the duration between two readings taken on
the same machine (strictly, in the same JVM instance).
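
To illustrate the `System.nanoTime()` point, here is a minimal sketch of a latency measurement that stays valid because both readings come from the same JVM. `LocalLatency` and its methods are hypothetical names for this example, not part of the connector:

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a latency measurement whose start and end readings are
// both taken with System.nanoTime() inside the same JVM.
final class LocalLatency {

  private final long startNanos;

  private LocalLatency(long startNanos) {
    this.startNanos = startNanos;
  }

  /** Captures the start reading on the JVM that sends the request. */
  static LocalLatency start() {
    return new LocalLatency(System.nanoTime());
  }

  /** Elapsed milliseconds since start(); only meaningful on the JVM that called start(). */
  long elapsedMillis() {
    return TimeUnit.NANOSECONDS.toMillis(elapsedNanos(startNanos, System.nanoTime()));
  }

  /**
   * Difference of two nanoTime readings from the same JVM. Subtracting, rather
   * than comparing the raw values, stays correct if the counter wraps around.
   */
  static long elapsedNanos(long startNanos, long endNanos) {
    return endNanos - startNanos;
  }
}
```

If the start reading were taken on one worker and the end reading on another, the subtraction would still compile and run, but the number would be meaningless because the two counters have unrelated origins.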



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
