Abacn commented on code in PR #34525:
URL: https://github.com/apache/beam/pull/34525#discussion_r2028939922
##########
sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java:
##########
@@ -205,25 +202,51 @@ Row getConfigurationRow() {
// May return an empty row (perhaps the underlying transform doesn't have any required
// parameters)
@VisibleForTesting
- static Row getRowConfig(ManagedConfig config, Schema transformSchema) {
+ static Row getRowConfig(ManagedConfig config, Schema transformConfigSchema) {
Map<String, Object> configMap = config.resolveUnderlyingConfig();
// Build a config Row that will be used to build the underlying SchemaTransform.
// If a mapping for the SchemaTransform exists, we use it to update parameter names to align
// with the underlying SchemaTransform config schema
- Map<String, String> mapping = MAPPINGS.get(config.getTransformIdentifier());
- if (mapping != null && configMap != null) {
+ Map<String, String> namingOverride = CONFIG_NAME_OVERRIDES.get(config.getTransformIdentifier());
+ if (namingOverride != null && configMap != null) {
Map<String, Object> remappedConfig = new HashMap<>();
for (Map.Entry<String, Object> entry : configMap.entrySet()) {
String paramName = entry.getKey();
- if (mapping.containsKey(paramName)) {
- paramName = mapping.get(paramName);
+ if (namingOverride.containsKey(paramName)) {
+ paramName = namingOverride.get(paramName);
}
remappedConfig.put(paramName, entry.getValue());
}
configMap = remappedConfig;
}
- return YamlUtils.toBeamRow(configMap, transformSchema, false);
+ validateUserConfig(
+ config.getTransformIdentifier(), new HashSet<>(configMap.keySet()), transformConfigSchema);
+
+ return YamlUtils.toBeamRow(configMap, transformConfigSchema, false);
+ }
+
+ static void validateUserConfig(
+ String transformId, Set<String> userParams, Schema transformConfigSchema) {
+ List<String> missingRequiredFields = new ArrayList<>();
+ for (Schema.Field field : transformConfigSchema.getFields()) {
+ boolean inUserConfig = userParams.remove(field.getName());
+ if (!field.getType().getNullable() && !inUserConfig) {
+ missingRequiredFields.add(field.getName());
+ }
+ }
+
+ if (!missingRequiredFields.isEmpty() || !userParams.isEmpty()) {
+ String msg = "Invalid config for transform '" + transformId + "':";
+ if (!missingRequiredFields.isEmpty()) {
+ msg += " Missing required fields: " + missingRequiredFields + ".";
+ }
+ if (!userParams.isEmpty()) {
+ msg += " Contains unknown fields: " + userParams + ".";
Review Comment:
I remember that when I worked with SchemaIO, I ran into a similar scenario caused by a Beam version mismatch between the Python SDK and the expansion service. This can be a valid case during development, e.g. the Python code has already been submitted but the Java expansion service has not yet been regenerated.
Could this also happen with Managed transforms? Say a user is on a newer version of Beam that adds configuration parameters, while the Managed backend has not been rolled out yet. If so, should we issue a warning here?
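A minimal sketch of the split suggested above, assuming missing required fields should still fail while unknown fields only produce a warning. `ConfigValidationSketch` and `FieldSpec` (a stand-in for Beam's `Schema.Field`, reduced to a name plus nullability) are hypothetical names for illustration, not Beam API, and `java.util.logging` is used only to keep the example self-contained; Beam's Java SDK generally logs through SLF4J.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.logging.Logger;

/** Hypothetical sketch: missing required fields are an error, unknown fields only warn. */
public class ConfigValidationSketch {
  private static final Logger LOG = Logger.getLogger(ConfigValidationSketch.class.getName());

  /** Hypothetical stand-in for a schema field: its name and whether it is nullable (optional). */
  public static final class FieldSpec {
    final String name;
    final boolean nullable;

    public FieldSpec(String name, boolean nullable) {
      this.name = name;
      this.nullable = nullable;
    }
  }

  /**
   * Throws IllegalArgumentException if a non-nullable field is absent from the user config.
   * Logs a warning for fields the schema does not know and returns them, so a caller could
   * decide to drop them before building the config Row.
   */
  public static Set<String> validateUserConfig(
      String transformId, Set<String> userParams, List<FieldSpec> schemaFields) {
    // Work on a copy so the caller's set (possibly immutable) is left untouched.
    Set<String> unknown = new LinkedHashSet<>(userParams);
    List<String> missingRequired = new ArrayList<>();
    for (FieldSpec field : schemaFields) {
      boolean provided = unknown.remove(field.name);
      if (!field.nullable && !provided) {
        missingRequired.add(field.name);
      }
    }
    if (!missingRequired.isEmpty()) {
      throw new IllegalArgumentException(
          "Invalid config for transform '" + transformId + "': Missing required fields: "
              + missingRequired + ".");
    }
    if (!unknown.isEmpty()) {
      // Possibly a newer client sending parameters this backend version does not know yet.
      LOG.warning(
          "Config for transform '" + transformId + "' contains unknown fields: " + unknown + ".");
    }
    return unknown;
  }

  public static void main(String[] args) {
    List<FieldSpec> schema =
        List.of(new FieldSpec("table", false), new FieldSpec("catalog_name", true));
    Set<String> unknown =
        validateUserConfig("example:transform:v1", Set.of("table", "new_option"), schema);
    System.out.println("Unknown: " + unknown); // prints "Unknown: [new_option]"
  }
}
```

Keeping missing required fields as a hard error still catches genuine misconfiguration, while a version-skewed client that sends a newly added optional parameter only gets a warning. The trade-off is that a misspelled optional parameter would also only warn.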
--
This is an automated message from the Apache Git Service.