johnjcasey commented on code in PR #29362:
URL: https://github.com/apache/beam/pull/29362#discussion_r1388416694
##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformUpgrader.java:
##########
@@ -328,4 +332,40 @@ private static int findAvailablePort() throws IOException {
public void close() throws Exception {
clientFactory.close();
}
+
+ /**
+ * A utility to find the registered URN for a given transform.
+ *
+ * <p>This URN can be used to upgrade this transform to a new Beam version without upgrading the
+ * rest of the pipeline. Please see <a
+ * href="https://beam.apache.org/documentation/programming-guide/#transform-service">Beam
+ * Transform Service documentation</a> for more details.
+ *
+ * <p>For this lookup to work, the a {@link TransformPayloadTranslatorRegistrar} for the transform
+ * has to be available in the classpath.
+ *
+ * @param transform transform to lookup.
+ * @return a URN if discovered. Returns {@code null} otherwise.
+ */
+ @SuppressWarnings({
+ "rawtypes",
+ "EqualsIncompatibleType",
+ })
+ public static @Nullable String findUpgradeURN(
+ org.apache.beam.sdk.transforms.PTransform transform) {
Review Comment:
Can we fix this import, so that `PTransform` is not referenced by its fully qualified name here?
##########
sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java:
##########
@@ -0,0 +1,578 @@
+/*
Review Comment:
This class appears to consist largely of boilerplate. Given that we will
eventually want translation and upgrades for all pre-built composites, is there
a way to avoid repeating this boilerplate for each one?
##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformUpgrader.java:
##########
@@ -328,4 +332,40 @@ private static int findAvailablePort() throws IOException {
public void close() throws Exception {
clientFactory.close();
}
+
+ /**
+ * A utility to find the registered URN for a given transform.
+ *
+ * <p>This URN can be used to upgrade this transform to a new Beam version without upgrading the
+ * rest of the pipeline. Please see <a
+ * href="https://beam.apache.org/documentation/programming-guide/#transform-service">Beam
+ * Transform Service documentation</a> for more details.
+ *
+ * <p>For this lookup to work, the a {@link TransformPayloadTranslatorRegistrar} for the transform
+ * has to be available in the classpath.
+ *
+ * @param transform transform to lookup.
+ * @return a URN if discovered. Returns {@code null} otherwise.
+ */
+ @SuppressWarnings({
+ "rawtypes",
+ "EqualsIncompatibleType",
+ })
+ public static @Nullable String findUpgradeURN(
+ org.apache.beam.sdk.transforms.PTransform transform) {
+ for (TransformPayloadTranslatorRegistrar registrar :
+ ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
+
+ for (Entry<
+ ? extends Class<? extends org.apache.beam.sdk.transforms.PTransform>,
+ ? extends TransformPayloadTranslator>
+ entry : registrar.getTransformPayloadTranslators().entrySet()) {
+ if (entry.getKey().equals(transform.getClass())) {
+ return entry.getValue().getUrn();
Review Comment:
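Do we care if there are multiple potential matches? As written, this returns
the URN from the first registrar entry whose key equals the transform's class
and ignores any later ones.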
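To make the multiple-match question concrete, here is a rough sketch of one possible approach: collect every URN registered for the transform class and fail if they disagree. `Translator`, `Registrar`, `registrarFor`, and `findUniqueUrn` are hypothetical stand-ins so the example runs without Beam on the classpath; they are not Beam's API, and this is not necessarily the behavior the PR should adopt.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class UrnLookupSketch {
  /** Hypothetical stand-in for a payload translator; only getUrn() is modeled. */
  interface Translator {
    String getUrn();
  }

  /** Hypothetical stand-in for a registrar that maps transform classes to translators. */
  interface Registrar {
    Map<Class<?>, Translator> getTranslators();
  }

  /** Test helper (an assumption for this sketch): a registrar with exactly one entry. */
  static Registrar registrarFor(Class<?> transformClass, String urn) {
    Map<Class<?>, Translator> translators = new HashMap<>();
    Translator translator = () -> urn;
    translators.put(transformClass, translator);
    return () -> translators;
  }

  /** Collects the distinct URNs that any registrar maps to the given transform class. */
  static Set<String> findAllUrns(Iterable<Registrar> registrars, Class<?> transformClass) {
    Set<String> urns = new LinkedHashSet<>();
    for (Registrar registrar : registrars) {
      for (Map.Entry<Class<?>, Translator> entry : registrar.getTranslators().entrySet()) {
        if (entry.getKey().equals(transformClass)) {
          urns.add(entry.getValue().getUrn());
        }
      }
    }
    return urns;
  }

  /** Returns the single URN, null if none is registered, and throws on conflicting URNs. */
  static String findUniqueUrn(Iterable<Registrar> registrars, Class<?> transformClass) {
    Set<String> urns = findAllUrns(registrars, transformClass);
    if (urns.size() > 1) {
      throw new IllegalStateException(
          "Multiple URNs registered for " + transformClass.getName() + ": " + urns);
    }
    return urns.isEmpty() ? null : urns.iterator().next();
  }

  public static void main(String[] args) {
    List<Registrar> registrars = new ArrayList<>();
    registrars.add(registrarFor(String.class, "urn:example:a"));
    System.out.println(findUniqueUrn(registrars, String.class)); // prints urn:example:a

    // A second registrar claiming the same class with a different URN is a conflict.
    registrars.add(registrarFor(String.class, "urn:example:b"));
    try {
      findUniqueUrn(registrars, String.class);
    } catch (IllegalStateException e) {
      System.out.println("Conflict: " + e.getMessage());
    }
  }
}
```

Duplicate registrations that agree on the URN are treated as harmless here; only differing URNs raise an error. Whether to throw, log a warning, or keep first-match-wins is a design choice for the PR author.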