chamikaramj commented on code in PR #29362:
URL: https://github.com/apache/beam/pull/29362#discussion_r1389735392


##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformUpgrader.java:
##########
@@ -328,4 +332,40 @@ private static int findAvailablePort() throws IOException {
   public void close() throws Exception {
     clientFactory.close();
   }
+
+  /**
+   * A utility to find the registered URN for a given transform.
+   *
+   * <p>This URN can be used to upgrade this transform to a new Beam version without upgrading the
+   * rest of the pipeline. Please see <a
+   * href="https://beam.apache.org/documentation/programming-guide/#transform-service">Beam
+   * Transform Service documentation</a> for more details.
+   *
+   * <p>For this lookup to work, the a {@link TransformPayloadTranslatorRegistrar} for the transform
+   * has to be available in the classpath.
+   *
+   * @param transform transform to lookup.
+   * @return a URN if discovered. Returns {@code null} otherwise.
+   */
+  @SuppressWarnings({
+    "rawtypes",
+    "EqualsIncompatibleType",
+  })
+  public static @Nullable String findUpgradeURN(
+      org.apache.beam.sdk.transforms.PTransform transform) {
+    for (TransformPayloadTranslatorRegistrar registrar :
+        ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
+
+      for (Entry<
+              ? extends Class<? extends org.apache.beam.sdk.transforms.PTransform>,
+              ? extends TransformPayloadTranslator>
+          entry : registrar.getTransformPayloadTranslators().entrySet()) {
+        if (entry.getKey().equals(transform.getClass())) {
+          return entry.getValue().getUrn();

Review Comment:
   The contract is to upgrade all transforms that match the URN (and 
TransformUpgrader already handles this).
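   The lookup-then-match behaviour described above can be sketched in plain
   Java. This is only an illustration under stated assumptions: the registry
   is a simple map rather than the ServiceLoader-based registrars in the diff,
   and `UrnLookupSketch`, `findUrn`, `namesMatchingUrn`, and the URN strings
   used with them are hypothetical names, not part of Beam.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: not Beam's API. A plain map stands in for the registrars.
final class UrnLookupSketch {
  private UrnLookupSketch() {}

  // Returns the URN registered for the exact class of the given object, or null if none.
  static String findUrn(Map<Class<?>, String> registry, Object transform) {
    for (Map.Entry<Class<?>, String> entry : registry.entrySet()) {
      if (entry.getKey().equals(transform.getClass())) {
        return entry.getValue();
      }
    }
    return null;
  }

  // Returns the names of all transforms whose URN equals the given URN,
  // mirroring the "upgrade everything that matches the URN" contract.
  static List<String> namesMatchingUrn(Map<String, String> urnByTransformName, String urn) {
    List<String> matches = new ArrayList<>();
    for (Map.Entry<String, String> entry : urnByTransformName.entrySet()) {
      if (entry.getValue().equals(urn)) {
        matches.add(entry.getKey());
      }
    }
    return matches;
  }
}
```

   In this sketch a single lookup yields one URN, and every transform in the
   pipeline carrying that URN is selected, not only the instance that was
   passed to the lookup.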



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformUpgrader.java:
##########
@@ -328,4 +332,40 @@ private static int findAvailablePort() throws IOException {
   public void close() throws Exception {
     clientFactory.close();
   }
+
+  /**
+   * A utility to find the registered URN for a given transform.
+   *
+   * <p>This URN can be used to upgrade this transform to a new Beam version without upgrading the
+   * rest of the pipeline. Please see <a
+   * href="https://beam.apache.org/documentation/programming-guide/#transform-service">Beam
+   * Transform Service documentation</a> for more details.
+   *
+   * <p>For this lookup to work, the a {@link TransformPayloadTranslatorRegistrar} for the transform
+   * has to be available in the classpath.
+   *
+   * @param transform transform to lookup.
+   * @return a URN if discovered. Returns {@code null} otherwise.
+   */
+  @SuppressWarnings({
+    "rawtypes",
+    "EqualsIncompatibleType",
+  })
+  public static @Nullable String findUpgradeURN(
+      org.apache.beam.sdk.transforms.PTransform transform) {

Review Comment:
   Updated.



##########
sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java:
##########
@@ -0,0 +1,578 @@
+/*

Review Comment:
   All we do here is build a Row object from the transform object, and vice
   versa, using utilities available in the Beam Schema library. The rest of
   the boilerplate is mostly error/null checks, which cannot easily be
   refactored away. We do have fromByteArray/toByteArray methods here to
   convert objects that do not have a Schema defined (for example,
   user-specified functions), but these are pretty small, and different
   classes can choose to do the serialization in different ways.

   Do you have a specific refactoring in mind to simplify this code?
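
   For context, helpers of the fromByteArray/toByteArray kind can be as small
   as the sketch below. It assumes standard Java serialization and a
   hypothetical class name `ByteArraySketch`; the methods in
   KafkaIOTranslation.java may be implemented differently.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical sketch: assumes plain Java serialization, not necessarily what the PR uses.
final class ByteArraySketch {
  private ByteArraySketch() {}

  // Serializes an object that has no schema (for example, a user function) to bytes.
  static byte[] toByteArray(Serializable object) {
    try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(object);
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new IllegalStateException("Could not serialize " + object, e);
    }
  }

  // Restores an object previously written by toByteArray.
  static Object fromByteArray(byte[] bytes) {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      return in.readObject();
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("Could not deserialize object", e);
    }
  }
}
```

   The resulting byte array can then be stored in a bytes-typed field of the
   Row, alongside the fields that do have a natural schema representation.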
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.