Amar3tto commented on code in PR #22584:
URL: https://github.com/apache/beam/pull/22584#discussion_r938509466
##########
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/Plugin.java:
##########
@@ -83,46 +86,50 @@ public Plugin withConfig(PluginConfig pluginConfig) {
* validating connection to the CDAP sink/source and performing initial
tuning.
*/
public void prepareRun() {
- PluginConfig pluginConfig = getPluginConfig();
- checkStateNotNull(pluginConfig, "PluginConfig should be not null!");
- if (cdapPluginObj == null) {
- try {
- Constructor<?> constructor =
- getPluginClass().getDeclaredConstructor(pluginConfig.getClass());
- constructor.setAccessible(true);
- cdapPluginObj = (SubmitterLifecycle)
constructor.newInstance(pluginConfig);
- } catch (Exception e) {
- LOG.error("Can not instantiate CDAP plugin class", e);
- throw new IllegalStateException("Can not call prepareRun");
- }
- }
- try {
- cdapPluginObj.prepareRun(getContext());
- if (getPluginType().equals(PluginConstants.PluginType.SOURCE)) {
- for (Map.Entry<String, String> entry :
-
getContext().getInputFormatProvider().getInputFormatConfiguration().entrySet())
{
- getHadoopConfiguration().set(entry.getKey(), entry.getValue());
+ if (!isUnbounded()) {
+ PluginConfig pluginConfig = getPluginConfig();
+ checkStateNotNull(pluginConfig, "PluginConfig should be not null!");
+ if (cdapPluginObj == null) {
+ try {
+ Constructor<?> constructor =
+ getPluginClass().getDeclaredConstructor(pluginConfig.getClass());
+ constructor.setAccessible(true);
+ cdapPluginObj = (SubmitterLifecycle)
constructor.newInstance(pluginConfig);
+ } catch (Exception e) {
+ LOG.error("Can not instantiate CDAP plugin class", e);
+ throw new IllegalStateException("Can not call prepareRun");
}
- } else {
- for (Map.Entry<String, String> entry :
-
getContext().getOutputFormatProvider().getOutputFormatConfiguration().entrySet())
{
- getHadoopConfiguration().set(entry.getKey(), entry.getValue());
+ }
+ try {
+ cdapPluginObj.prepareRun(getContext());
+ if (getPluginType().equals(PluginConstants.PluginType.SOURCE)) {
+ for (Map.Entry<String, String> entry :
Review Comment:
For `Source` plugins we copy the configuration from
`InputFormatProvider`; for `Sink` plugins we copy it from `OutputFormatProvider`.
Are you suggesting that we move this logic into a separate method?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]