sv2000 commented on a change in pull request #2956: GOBBLIN-1116: Avoid
registering schema with schema registry during Me…
URL: https://github.com/apache/incubator-gobblin/pull/2956#discussion_r410378851
##########
File path:
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
##########
@@ -919,6 +935,87 @@ private void
sendEmailOnShutdown(Optional<ApplicationReport> applicationReport)
}
}
+ private static Config addDynamicConfig(Config config) throws IOException {
+ if (isKafkaReportingEnabled(config) &&
isKafkaAvroSchemaRegistryEnabled(config)) {
+ KafkaAvroSchemaRegistry registry = new
KafkaAvroSchemaRegistry(ConfigUtils.configToProperties(config));
+ return addMetricReportingDynamicConfig(config, registry);
+ } else {
+ return config;
+ }
+ }
+
+ /**
+ * Write the config to the file specified with the config key {@value
GOBBLIN_YARN_CONFIG_OUTPUT_PATH} if it
+ * is configured.
+ * @param config the config to output
+ * @throws IOException
+ */
+ @VisibleForTesting
+ static void outputConfigToFile(Config config)
+ throws IOException {
+ // If a file path is specified then write the Azkaban config to that path
in HOCON format.
+ // This can be used to generate an application.conf file to pass to the
yarn app master and containers.
+ if (config.hasPath(GOBBLIN_YARN_CONFIG_OUTPUT_PATH)) {
+ File configFile = new
File(config.getString(GOBBLIN_YARN_CONFIG_OUTPUT_PATH));
+ File parentDir = configFile.getParentFile();
+
+ if (parentDir != null && !parentDir.exists()) {
+ if (!parentDir.mkdirs()) {
+ throw new IOException("Error creating directories for " + parentDir);
+ }
+ }
+
+ ConfigRenderOptions configRenderOptions = ConfigRenderOptions.defaults();
+ configRenderOptions = configRenderOptions.setComments(false);
+ configRenderOptions = configRenderOptions.setOriginComments(false);
+ configRenderOptions = configRenderOptions.setFormatted(true);
+ configRenderOptions = configRenderOptions.setJson(false);
+
+ String renderedConfig = config.root().render(configRenderOptions);
+
+ FileUtils.writeStringToFile(configFile, renderedConfig, Charsets.UTF_8);
+ }
+ }
+
+ /**
+ * A method that adds dynamic config related to Kafka-based metric
reporting. In particular, if Kafka based metric
+ * reporting is enabled and {@link KafkaAvroSchemaRegistry} is configured,
this method registers the metric reporting
+ * related schemas and adds the returned schema ids to the config to be used
by metric reporters in {@link org.apache.gobblin.yarn.GobblinApplicationMaster}
+ * and the {@link org.apache.gobblin.cluster.GobblinTaskRunner}s. The
advantage of doing this is that the TaskRunners
+ * do not have to initiate a connection with the schema registry server and
reduces the chances of metric reporter
+ * instantiation failures in the {@link
org.apache.gobblin.cluster.GobblinTaskRunner}s.
+ * @param config
+ */
+ @VisibleForTesting
+ static Config addMetricReportingDynamicConfig(Config config,
KafkaAvroSchemaRegistry registry) throws IOException {
+ Properties properties = ConfigUtils.configToProperties(config);
+ if (KafkaReporterUtils.isEventsEnabled(properties)) {
+ Schema schema = new Schema.Parser()
Review comment:
Refactored the code to move the schema reading to the utils class.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services