ahmedabu98 commented on code in PR #34525:
URL: https://github.com/apache/beam/pull/34525#discussion_r2054548603


##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java:
##########
@@ -205,25 +202,51 @@ Row getConfigurationRow() {
   // May return an empty row (perhaps the underlying transform doesn't have 
any required
   // parameters)
   @VisibleForTesting
-  static Row getRowConfig(ManagedConfig config, Schema transformSchema) {
+  static Row getRowConfig(ManagedConfig config, Schema transformConfigSchema) {
     Map<String, Object> configMap = config.resolveUnderlyingConfig();
     // Build a config Row that will be used to build the underlying 
SchemaTransform.
     // If a mapping for the SchemaTransform exists, we use it to update 
parameter names to align
     // with the underlying SchemaTransform config schema
-    Map<String, String> mapping = 
MAPPINGS.get(config.getTransformIdentifier());
-    if (mapping != null && configMap != null) {
+    Map<String, String> namingOverride = 
CONFIG_NAME_OVERRIDES.get(config.getTransformIdentifier());
+    if (namingOverride != null && configMap != null) {
       Map<String, Object> remappedConfig = new HashMap<>();
       for (Map.Entry<String, Object> entry : configMap.entrySet()) {
         String paramName = entry.getKey();
-        if (mapping.containsKey(paramName)) {
-          paramName = mapping.get(paramName);
+        if (namingOverride.containsKey(paramName)) {
+          paramName = namingOverride.get(paramName);
         }
         remappedConfig.put(paramName, entry.getValue());
       }
       configMap = remappedConfig;
     }
 
-    return YamlUtils.toBeamRow(configMap, transformSchema, false);
+    validateUserConfig(
+        config.getTransformIdentifier(), new HashSet<>(configMap.keySet()), 
transformConfigSchema);
+
+    return YamlUtils.toBeamRow(configMap, transformConfigSchema, false);
+  }
+
+  static void validateUserConfig(
+      String transformId, Set<String> userParams, Schema 
transformConfigSchema) {
+    List<String> missingRequiredFields = new ArrayList<>();
+    for (Schema.Field field : transformConfigSchema.getFields()) {
+      boolean inUserConfig = userParams.remove(field.getName());
+      if (!field.getType().getNullable() && !inUserConfig) {
+        missingRequiredFields.add(field.getName());
+      }
+    }
+
+    if (!missingRequiredFields.isEmpty() || !userParams.isEmpty()) {
+      String msg = "Invalid config for transform '" + transformId + "':";
+      if (!missingRequiredFields.isEmpty()) {
+        msg += " Missing required fields: " + missingRequiredFields + ".";
+      }
+      if (!userParams.isEmpty()) {
+        msg += " Contains unknown fields: " + userParams + ".";

Review Comment:
   After some offline conversations, we settled on a new API,
`.skipConfigValidation()`, that lets users opt out of this config validation if they need to.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to