[
https://issues.apache.org/jira/browse/GOBBLIN-2190?focusedWorklogId=956014&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-956014
]
ASF GitHub Bot logged work on GOBBLIN-2190:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 07/Feb/25 08:29
Start Date: 07/Feb/25 08:29
Worklog Time Spent: 10m
Work Description: khandelwal-prateek commented on code in PR #4093:
URL: https://github.com/apache/gobblin/pull/4093#discussion_r1946094701
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityConfigurationStrategy.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity;
+
+import java.time.Duration;
+import java.util.Properties;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.util.PropertiesUtils;
+
+
+/**
+ * Interface for defining activity configuration strategies for different
Temporal activities.
+ * Each strategy provides a method to retrieve configuration details, such as
timeout duration, based on the provided properties.
+ */
+public interface ActivityConfigurationStrategy {
+ /** Default start to close timeout duration for any activity if not
specified. */
+ Duration defaultStartToCloseTimeout = Duration.ofMinutes(180);
+ int DEFAULT_GENERATE_WORKUNITS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 120;
Review Comment:
do we have numbers currently, how long can it take for each activity,
ideally we want this number to be higher than 100th %ile of all executions that
are running and also having scope for what we support(max data copy size -
something that we will have as part of our SLAs for users)
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityConfigurationStrategy.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity;
+
+import java.time.Duration;
+import java.util.Properties;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.util.PropertiesUtils;
+
+
+/**
+ * Interface for defining activity configuration strategies for different
Temporal activities.
+ * Each strategy provides a method to retrieve configuration details, such as
timeout duration, based on the provided properties.
+ */
+public interface ActivityConfigurationStrategy {
+ /** Default start to close timeout duration for any activity if not
specified. */
+ Duration defaultStartToCloseTimeout = Duration.ofMinutes(180);
+ int DEFAULT_GENERATE_WORKUNITS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 120;
+ int DEFAULT_RECOMMEND_SCALING_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 5;
+ int DEFAULT_DELETE_WORK_DIRS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 10;
+ int DEFAULT_PROCESS_WORKUNIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 180;
+ int DEFAULT_COMMIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 180;
Review Comment:
let's update the default numbers to a higher value for now till we have
better visibility and tuning in place. Also, we already see flows which are
running close to these number and timing out those isn't correct eg.
GENERATE_WORKUNITS=4h, PROCESS_WORKUNIT=6h, COMMIT=4h
do we know how much max time max takes/can take and what is the factor that
increases the time, if no, let's also increase DELETE_WORK_DIRS/
RECOMMEND_SCALING time
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity;
+
+
+/**
+ * Enum representing different types of activities in the Temporal workflow.
+ * Each activity type corresponds to a specific operation that can be
performed.
+ */
+public enum ActivityType {
Review Comment:
a separate class is not required, this class can be merged with
ActivityConfigurationStrategy as suggested above by using enum in place of
explicit strategy classes
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/TemporalActivityUtils.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.util;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import io.temporal.activity.ActivityOptions;
+import io.temporal.common.RetryOptions;
+import lombok.experimental.UtilityClass;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.ddm.activity.ActivityConfigurationStrategy;
+import org.apache.gobblin.temporal.ddm.activity.ActivityType;
+
+
+/** Utility class for handling Temporal Activity related operations. */
+@UtilityClass
+@Slf4j
+public class TemporalActivityUtils {
+
+ @VisibleForTesting
+ protected static final RetryOptions DEFAULT_RETRY_OPTIONS =
RetryOptions.newBuilder()
+ .setInitialInterval(Duration.ofSeconds(3))
+ .setMaximumInterval(Duration.ofSeconds(100))
+ .setBackoffCoefficient(2)
+ .setMaximumAttempts(4)
+ .build();
+
+ private static final Map<ActivityType, ActivityConfigurationStrategy>
activityConfigurationStrategies = new HashMap<>();
+
+ static {
+ activityConfigurationStrategies.put(ActivityType.GENERATE_WORKUNITS, new
ActivityConfigurationStrategy.GenerateWorkunitsActivityConfigurationStrategy());
+ activityConfigurationStrategies.put(ActivityType.RECOMMEND_SCALING, new
ActivityConfigurationStrategy.RecommendScalingActivityConfigurationStrategy());
+ activityConfigurationStrategies.put(ActivityType.DELETE_WORK_DIRS, new
ActivityConfigurationStrategy.DeleteWorkDirsActivityConfigurationStrategy());
+ activityConfigurationStrategies.put(ActivityType.PROCESS_WORKUNIT, new
ActivityConfigurationStrategy.ProcessWorkunitActivityConfigurationStrategy());
+ activityConfigurationStrategies.put(ActivityType.COMMIT, new
ActivityConfigurationStrategy.CommitActivityConfigurationStrategy());
+ }
+
+ /**
+ * Builds and returns an {@link ActivityOptions} object configured with the
specified {@link ActivityType} and properties.
+ *
+ * @param activityType the type of the activity for which the options are
being built.
+ * @param props the properties to be used for configuring the activity
options.
+ * @return an {@link ActivityOptions} object configured with the specified
activity type and properties.
+ */
+ public static ActivityOptions buildActivityOptions(ActivityType
activityType, Properties props) {
+ return ActivityOptions.newBuilder()
+ .setStartToCloseTimeout(getStartToCloseTimeout(activityType, props))
+ .setRetryOptions(buildRetryOptions(activityType, props))
+ .build();
+ }
+
+ /**
+ * Retrieves the start to close timeout duration for a given {@link
ActivityType} based on the provided properties.
+ *
+ * @param activityType the type of the activity for which the start to close
timeout is being retrieved.
+ * @param props the properties to be used for configuring the timeout.
+ * @return the start to close timeout duration for the specified activity
type.
+ */
+ private static Duration getStartToCloseTimeout(ActivityType activityType,
Properties props) {
+ ActivityConfigurationStrategy activityConfigurationStrategy =
activityConfigurationStrategies.get(activityType);
+ if (activityConfigurationStrategy == null) {
+ log.warn("No configuration strategy found for activity type {}. Using
default start to close timeout.", activityType);
+ return ActivityConfigurationStrategy.defaultStartToCloseTimeout;
+ }
+ return activityConfigurationStrategy.getStartToCloseTimeout(props);
+ }
+
+ /**
+ * Builds and returns an {@link RetryOptions} object configured with the
specified {@link ActivityType} and properties.
+ *
+ * @param activityType the type of the activity for which the options are
being built.
+ * @param props the properties to be used for configuring the activity
options.
+ * @return an {@link RetryOptions} object configured with the specified
activity type and properties.
+ */
+ private static RetryOptions buildRetryOptions(ActivityType activityType,
Properties props) {
Review Comment:
this logic should be moved to `ActivityType` enum, where it has
`getStartToCloseTimeout` & `getRetryOptions` methods
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/TemporalActivityUtils.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.util;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import io.temporal.activity.ActivityOptions;
+import io.temporal.common.RetryOptions;
+import lombok.experimental.UtilityClass;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.ddm.activity.ActivityConfigurationStrategy;
+import org.apache.gobblin.temporal.ddm.activity.ActivityType;
+
+
+/** Utility class for handling Temporal Activity related operations. */
+@UtilityClass
+@Slf4j
+public class TemporalActivityUtils {
+
+ @VisibleForTesting
+ protected static final RetryOptions DEFAULT_RETRY_OPTIONS =
RetryOptions.newBuilder()
+ .setInitialInterval(Duration.ofSeconds(3))
+ .setMaximumInterval(Duration.ofSeconds(100))
+ .setBackoffCoefficient(2)
+ .setMaximumAttempts(4)
+ .build();
+
+ private static final Map<ActivityType, ActivityConfigurationStrategy>
activityConfigurationStrategies = new HashMap<>();
+
+ static {
+ activityConfigurationStrategies.put(ActivityType.GENERATE_WORKUNITS, new
ActivityConfigurationStrategy.GenerateWorkunitsActivityConfigurationStrategy());
+ activityConfigurationStrategies.put(ActivityType.RECOMMEND_SCALING, new
ActivityConfigurationStrategy.RecommendScalingActivityConfigurationStrategy());
+ activityConfigurationStrategies.put(ActivityType.DELETE_WORK_DIRS, new
ActivityConfigurationStrategy.DeleteWorkDirsActivityConfigurationStrategy());
+ activityConfigurationStrategies.put(ActivityType.PROCESS_WORKUNIT, new
ActivityConfigurationStrategy.ProcessWorkunitActivityConfigurationStrategy());
+ activityConfigurationStrategies.put(ActivityType.COMMIT, new
ActivityConfigurationStrategy.CommitActivityConfigurationStrategy());
+ }
Review Comment:
this map is unnecessary because ActivityType can uniquely identify the
activity and method `getStartToCloseTimeout`/`getRetryOptions` can be directly
called for each activity
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityConfigurationStrategy.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity;
+
+import java.time.Duration;
+import java.util.Properties;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.util.PropertiesUtils;
+
+
+/**
+ * Interface for defining activity configuration strategies for different
Temporal activities.
+ * Each strategy provides a method to retrieve configuration details, such as
timeout duration, based on the provided properties.
+ */
+public interface ActivityConfigurationStrategy {
+ /** Default start to close timeout duration for any activity if not
specified. */
+ Duration defaultStartToCloseTimeout = Duration.ofMinutes(180);
+ int DEFAULT_GENERATE_WORKUNITS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 120;
+ int DEFAULT_RECOMMEND_SCALING_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 5;
+ int DEFAULT_DELETE_WORK_DIRS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 10;
+ int DEFAULT_PROCESS_WORKUNIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 180;
+ int DEFAULT_COMMIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 180;
+
+ /**
+ * Retrieves the start to close timeout duration for an activity based on
the provided properties.
+ *
+ * @param props the properties to be used for configuring the timeout.
+ * @return the timeout duration for the activity.
+ */
+ Duration getStartToCloseTimeout(Properties props);
+
+ /**
+ * Configuration strategy for the Generate Workunits activity.
+ */
+ class GenerateWorkunitsActivityConfigurationStrategy implements
ActivityConfigurationStrategy {
+ @Override
+ public Duration getStartToCloseTimeout(Properties props) {
+ return Duration.ofMinutes(PropertiesUtils.getPropAsInt(
+ props,
+
GobblinTemporalConfigurationKeys.GENERATE_WORKUNITS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES,
+ DEFAULT_GENERATE_WORKUNITS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES
+ ));
+ }
Review Comment:
the same logic is repeated in every strategy class. Instead of creating
multiple classes, please use an enum based approach where each enum has two
fields `key` & `defaultTimeoutMinutes` and it can encapsulate the
`getStartToCloseTimeout` method
```
public enum ActivityType {
...
ActivityType(String timeoutConfigKey, int defaultTimeoutMinutes) {
this.timeoutConfigKey = timeoutConfigKey;
this.defaultTimeoutMinutes = defaultTimeoutMinutes;
}
public Duration getStartToCloseTimeout(Properties props) {
return Duration.ofMinutes(PropertiesUtils.getPropAsInt(props,
timeoutConfigKey, defaultTimeoutMinutes));
}
...
}
```
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityConfigurationStrategy.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity;
+
+import java.time.Duration;
+import java.util.Properties;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.util.PropertiesUtils;
+
+
+/**
+ * Interface for defining activity configuration strategies for different
Temporal activities.
+ * Each strategy provides a method to retrieve configuration details, such as
timeout duration, based on the provided properties.
+ */
+public interface ActivityConfigurationStrategy {
+ /** Default start to close timeout duration for any activity if not
specified. */
+ Duration defaultStartToCloseTimeout = Duration.ofMinutes(180);
Review Comment:
this should be static final
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/TemporalActivityUtils.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.util;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import io.temporal.activity.ActivityOptions;
+import io.temporal.common.RetryOptions;
+import lombok.experimental.UtilityClass;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.ddm.activity.ActivityConfigurationStrategy;
+import org.apache.gobblin.temporal.ddm.activity.ActivityType;
+
+
+/** Utility class for handling Temporal Activity related operations. */
+@UtilityClass
+@Slf4j
+public class TemporalActivityUtils {
+
+ @VisibleForTesting
+ protected static final RetryOptions DEFAULT_RETRY_OPTIONS =
RetryOptions.newBuilder()
+ .setInitialInterval(Duration.ofSeconds(3))
+ .setMaximumInterval(Duration.ofSeconds(100))
+ .setBackoffCoefficient(2)
+ .setMaximumAttempts(4)
+ .build();
+
+ private static final Map<ActivityType, ActivityConfigurationStrategy>
activityConfigurationStrategies = new HashMap<>();
+
+ static {
+ activityConfigurationStrategies.put(ActivityType.GENERATE_WORKUNITS, new
ActivityConfigurationStrategy.GenerateWorkunitsActivityConfigurationStrategy());
+ activityConfigurationStrategies.put(ActivityType.RECOMMEND_SCALING, new
ActivityConfigurationStrategy.RecommendScalingActivityConfigurationStrategy());
+ activityConfigurationStrategies.put(ActivityType.DELETE_WORK_DIRS, new
ActivityConfigurationStrategy.DeleteWorkDirsActivityConfigurationStrategy());
+ activityConfigurationStrategies.put(ActivityType.PROCESS_WORKUNIT, new
ActivityConfigurationStrategy.ProcessWorkunitActivityConfigurationStrategy());
+ activityConfigurationStrategies.put(ActivityType.COMMIT, new
ActivityConfigurationStrategy.CommitActivityConfigurationStrategy());
+ }
+
+ /**
+ * Builds and returns an {@link ActivityOptions} object configured with the
specified {@link ActivityType} and properties.
+ *
+ * @param activityType the type of the activity for which the options are
being built.
+ * @param props the properties to be used for configuring the activity
options.
+ * @return an {@link ActivityOptions} object configured with the specified
activity type and properties.
+ */
+ public static ActivityOptions buildActivityOptions(ActivityType
activityType, Properties props) {
+ return ActivityOptions.newBuilder()
+ .setStartToCloseTimeout(getStartToCloseTimeout(activityType, props))
+ .setRetryOptions(buildRetryOptions(activityType, props))
+ .build();
+ }
+
+ /**
+ * Retrieves the start to close timeout duration for a given {@link
ActivityType} based on the provided properties.
+ *
+ * @param activityType the type of the activity for which the start to close
timeout is being retrieved.
+ * @param props the properties to be used for configuring the timeout.
+ * @return the start to close timeout duration for the specified activity
type.
+ */
+ private static Duration getStartToCloseTimeout(ActivityType activityType,
Properties props) {
+ ActivityConfigurationStrategy activityConfigurationStrategy =
activityConfigurationStrategies.get(activityType);
+ if (activityConfigurationStrategy == null) {
+ log.warn("No configuration strategy found for activity type {}. Using
default start to close timeout.", activityType);
+ return ActivityConfigurationStrategy.defaultStartToCloseTimeout;
+ }
+ return activityConfigurationStrategy.getStartToCloseTimeout(props);
+ }
+
+ /**
+ * Builds and returns an {@link RetryOptions} object configured with the
specified {@link ActivityType} and properties.
+ *
+ * @param activityType the type of the activity for which the options are
being built.
+ * @param props the properties to be used for configuring the activity
options.
+ * @return an {@link RetryOptions} object configured with the specified
activity type and properties.
+ */
+ private static RetryOptions buildRetryOptions(ActivityType activityType,
Properties props) {
+ // Currently returning just the default retry options for each activity
type
+ return DEFAULT_RETRY_OPTIONS;
Review Comment:
retry settings are currently hardcoded which would requires a code change.
Please use PropertiesUtils to fetch each parameter with default values
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/TemporalActivityUtils.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.util;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import io.temporal.activity.ActivityOptions;
+import io.temporal.common.RetryOptions;
+import lombok.experimental.UtilityClass;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.ddm.activity.ActivityConfigurationStrategy;
+import org.apache.gobblin.temporal.ddm.activity.ActivityType;
+
+
+/** Utility class for handling Temporal Activity related operations. */
+@UtilityClass
+@Slf4j
+public class TemporalActivityUtils {
+
+ @VisibleForTesting
+ protected static final RetryOptions DEFAULT_RETRY_OPTIONS =
RetryOptions.newBuilder()
+ .setInitialInterval(Duration.ofSeconds(3))
+ .setMaximumInterval(Duration.ofSeconds(100))
+ .setBackoffCoefficient(2)
+ .setMaximumAttempts(4)
+ .build();
+
+ private static final Map<ActivityType, ActivityConfigurationStrategy>
activityConfigurationStrategies = new HashMap<>();
+
+ static {
+ activityConfigurationStrategies.put(ActivityType.GENERATE_WORKUNITS, new
ActivityConfigurationStrategy.GenerateWorkunitsActivityConfigurationStrategy());
+ activityConfigurationStrategies.put(ActivityType.RECOMMEND_SCALING, new
ActivityConfigurationStrategy.RecommendScalingActivityConfigurationStrategy());
+ activityConfigurationStrategies.put(ActivityType.DELETE_WORK_DIRS, new
ActivityConfigurationStrategy.DeleteWorkDirsActivityConfigurationStrategy());
+ activityConfigurationStrategies.put(ActivityType.PROCESS_WORKUNIT, new
ActivityConfigurationStrategy.ProcessWorkunitActivityConfigurationStrategy());
+ activityConfigurationStrategies.put(ActivityType.COMMIT, new
ActivityConfigurationStrategy.CommitActivityConfigurationStrategy());
+ }
+
+ /**
+ * Builds and returns an {@link ActivityOptions} object configured with the
specified {@link ActivityType} and properties.
+ *
+ * @param activityType the type of the activity for which the options are
being built.
+ * @param props the properties to be used for configuring the activity
options.
+ * @return an {@link ActivityOptions} object configured with the specified
activity type and properties.
+ */
+ public static ActivityOptions buildActivityOptions(ActivityType
activityType, Properties props) {
Review Comment:
this is the only method required in this class and the method should
leverage ActivityType enum to fetch timeout/retry info for the activity
Issue Time Tracking
-------------------
Worklog Id: (was: 956014)
Time Spent: 2h (was: 1h 50m)
> Implement ActivityTimeoutStrategy for all Temporal Activities
> -------------------------------------------------------------
>
> Key: GOBBLIN-2190
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2190
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Vivek Rai
> Priority: Major
> Time Spent: 2h
> Remaining Estimate: 0h
>
> Currently TImeouts of all Temporal Activity are hardcoded and cant change
> during runtime, change those to make them configurable.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)