iht commented on code in PR #32060:
URL: https://github.com/apache/beam/pull/32060#discussion_r1755182004
##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java:
##########
@@ -1026,8 +1057,180 @@ abstract static class Builder<T> {
 
     @Override
     public SolaceOutput expand(PCollection<T> input) {
-      // TODO: will be sent in upcoming PR
-      return SolaceOutput.in(input.getPipeline(), null, null);
+      Class<? super T> pcollClass = checkNotNull(input.getTypeDescriptor()).getRawType();
+      boolean usingSolaceRecord =
+          pcollClass
+                  .getTypeName()
+                  .equals("org.apache.beam.sdk.io.solace.data.AutoValue_Solace_Record")
+              || pcollClass.isAssignableFrom(Solace.Record.class);
+
+      validateWriteTransform(usingSolaceRecord);
+
+      boolean usingDynamicDestinations = getDestination() == null;
+      SerializableFunction<Solace.Record, Destination> destinationFn;
+      if (usingDynamicDestinations) {
+        destinationFn = x -> SolaceIO.convertToJcsmpDestination(checkNotNull(x.getDestination()));
+      } else {
+        // Constant destination for all messages (same topic or queue)
+        // This should not be non-null, as nulls would have been flagged by the
+        // validateWriteTransform method
+        destinationFn = x -> checkNotNull(getDestination());
+      }
+
+      @SuppressWarnings("unchecked")
+      PCollection<Solace.Record> records =
+          getFormatFunction() == null
+              ? (PCollection<Solace.Record>) input
+              : input.apply(
+                  "Format records",
+                  MapElements.into(TypeDescriptor.of(Solace.Record.class))
+                      .via(checkNotNull(getFormatFunction())));
+
+      // Store the current window used by the input
+      PCollection<Solace.PublishResult> captureWindow =
+          records.apply(
+              "Capture window", ParDo.of(new UnboundedSolaceWriter.RecordToPublishResultDoFn()));
+
+      @SuppressWarnings("unchecked")
+      WindowingStrategy<Solace.PublishResult, BoundedWindow> windowingStrategy =
+          (WindowingStrategy<Solace.PublishResult, BoundedWindow>)
+              captureWindow.getWindowingStrategy();
+
+      PCollection<Solace.Record> withGlobalWindow =
+          records.apply("Global window", Window.into(new GlobalWindows()));
+
+      PCollection<KV<Integer, Solace.Record>> withShardKeys =
+          withGlobalWindow.apply(
+              "Add shard key",
+              ParDo.of(new UnboundedSolaceWriter.AddShardKeyDoFn(getMaxNumOfUsedWorkers())));
+
+      String label =
+          getWriterType() == WriterType.STREAMING ? "Publish (streaming)" : "Publish (batched)";
+
+      PCollectionTuple solaceOutput = withShardKeys.apply(label, getWriterTransform(destinationFn));
+

Review Comment:
   Yes, I think `GroupIntoBatches` would be a more elegant solution here, and it would not require a custom state-and-timers implementation.
   
   As for session-independent messages, that's actually what I am using :). I believe my producer implementation already does exactly that: there is a concurrent map of producers, and each thread/worker polls the map for a producer to do its work. The client libraries automatically close clients after a period of inactivity, and the producer map makes sure that a producer reconnects when it is needed again.
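A minimal sketch of the `GroupIntoBatches` alternative, assuming the Beam Java SDK and the DirectRunner are on the classpath. String payloads stand in for `Solace.Record`, the created input plays the role of `withShardKeys` from the diff, and `PublishBatchFn`, the batch size of 50 and the 1-second buffering limit are hypothetical values chosen for illustration, not what the PR uses.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class BatchedPublishSketch {

  /** Hypothetical writer DoFn: receives one batch of payloads per shard key. */
  static class PublishBatchFn extends DoFn<KV<Integer, Iterable<String>>, Integer> {
    @ProcessElement
    public void processElement(
        @Element KV<Integer, Iterable<String>> batch, OutputReceiver<Integer> out) {
      int count = 0;
      for (String message : batch.getValue()) {
        // Hypothetical: a real writer would publish `message` with a Solace producer here.
        count++;
      }
      // Emit the batch size so downstream steps can observe how records were grouped.
      out.output(count);
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Stand-in for `withShardKeys`: records already keyed by an integer shard.
    PCollection<KV<Integer, String>> withShardKeys =
        p.apply(Create.of(KV.of(0, "a"), KV.of(0, "b"), KV.of(1, "c")));

    withShardKeys
        // Buffer up to 50 records per key; flush a partial batch after 1 s of buffering.
        .apply(
            "Batch records",
            GroupIntoBatches.<Integer, String>ofSize(50)
                .withMaxBufferingDuration(Duration.standardSeconds(1)))
        .apply("Publish batches", ParDo.of(new PublishBatchFn()));

    p.run().waitUntilFinish();
  }
}
```

Because the diff moves records into `GlobalWindows` before adding shard keys, the `withMaxBufferingDuration` call matters for a streaming pipeline: the global window does not end, so without it a partially filled batch would only be emitted once it reached the target size. State for each shard key is processed serially, so the number of keys (here derived from `getMaxNumOfUsedWorkers()`) roughly bounds write parallelism.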
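The producer map described in the comment can be sketched in plain Java. Everything here is hypothetical: `Producer` is not the JCSMP API, `ProducerPool` and its integer slot keys are illustrative names, and `FakeProducer.close()` only simulates the client library closing an idle connection. `ConcurrentHashMap.compute` is used so that checking for a closed producer and replacing it happen atomically for each slot.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class ProducerPoolSketch {

  /** Hypothetical producer; a real one would wrap a Solace client session. */
  interface Producer {
    boolean isClosed();

    void publish(String message);
  }

  /** Keeps one producer per slot and replaces it once it has been closed. */
  static final class ProducerPool {
    private final ConcurrentMap<Integer, Producer> producers = new ConcurrentHashMap<>();
    private final Supplier<Producer> factory;

    ProducerPool(Supplier<Producer> factory) {
      this.factory = factory;
    }

    Producer get(int slot) {
      // compute() is atomic per key, so two threads cannot both "reconnect" the same slot.
      return producers.compute(
          slot,
          (key, existing) ->
              (existing == null || existing.isClosed()) ? factory.get() : existing);
    }
  }

  /** Test double; close() mimics the client library closing an idle connection. */
  static final class FakeProducer implements Producer {
    static final AtomicInteger created = new AtomicInteger();
    private volatile boolean closed = false;

    FakeProducer() {
      created.incrementAndGet();
    }

    void close() {
      closed = true;
    }

    @Override
    public boolean isClosed() {
      return closed;
    }

    @Override
    public void publish(String message) {
      if (closed) {
        throw new IllegalStateException("producer closed");
      }
    }
  }

  public static void main(String[] args) {
    ProducerPool pool = new ProducerPool(FakeProducer::new);
    FakeProducer first = (FakeProducer) pool.get(0);
    first.publish("m1");
    System.out.println(pool.get(0) == first); // true: open producer is reused
    first.close(); // simulate the idle timeout
    FakeProducer second = (FakeProducer) pool.get(0);
    second.publish("m2"); // succeeds on the replacement producer
    System.out.println(second != first); // true
    System.out.println(FakeProducer.created.get()); // 2
  }
}
```

A real implementation would probably avoid opening network connections inside `compute`, because the remapping function runs while the map holds a lock on that entry and should stay short.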