[
https://issues.apache.org/jira/browse/GOBBLIN-2190?focusedWorklogId=958615&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-958615
]
ASF GitHub Bot logged work on GOBBLIN-2190:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 25/Feb/25 05:51
Start Date: 25/Feb/25 05:51
Worklog Time Spent: 10m
Work Description: khandelwal-prateek commented on code in PR #4093:
URL: https://github.com/apache/gobblin/pull/4093#discussion_r1968820936
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java:
##########
@@ -84,4 +84,34 @@ public interface GobblinTemporalConfigurationKeys {
String TEMPORAL_METRICS_REPORT_INTERVAL_SECS =
TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT + ".report.interval.seconds";
int DEFAULT_TEMPORAL_METRICS_REPORT_INTERVAL_SECS = 10;
String TEMPORAL_METRICS_OTLP_DIMENSIONS_KEY =
TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT + ".dimensions";
+
+ /**
+ * Activities timeout configs
+ */
+ String TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES = PREFIX +
"activity.heartbeat.timeout.minutes";
+ int DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES = 5;
+ String TEMPORAL_ACTIVITY_HEARTBEAT_INTERVAL_MINUTES = PREFIX +
"activity.heartbeat.interval.minutes";
+ int DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_INTERVAL_MINUTES = 1;
+ String ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
"activity.starttoclose.timeout.minutes";
+ int DEFAULT_TEMPORAL_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 360;
+ String TEMPORAL_GENERATE_WORKUNITS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
+ PREFIX + "generate.workunits." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
+ String TEMPORAL_RECOMMEND_SCALING_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
+ PREFIX + "recommend.scaling." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
+ String TEMPORAL_DELETE_WORK_DIRS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
+ PREFIX + "delete.work.dirs." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
+ String TEMPORAL_PROCESS_WORKUNIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
+ PREFIX + "process.workunit." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
+ String TEMPORAL_COMMIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
+ PREFIX + "commit." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
+ String TEMPORAL_ACTIVITY_RETRY_OPTIONS = PREFIX + "activity.retry.options";
+ String TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS =
TEMPORAL_ACTIVITY_RETRY_OPTIONS + "initial.interval.seconds";
+ int DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS = 3;
+ String TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS =
TEMPORAL_ACTIVITY_RETRY_OPTIONS + "maximum.interval.seconds";
+ int DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS = 100;
+ String TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT =
TEMPORAL_ACTIVITY_RETRY_OPTIONS + "backoff.coefficient";
+ int DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT = 2;
Review Comment:
the `backoffCoefficient` is typically defined as a double. The `RetryOptions`
class also takes it as a double.
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity;
+
+import java.time.Duration;
+import java.util.Properties;
+
+import lombok.Getter;
+
+import io.temporal.activity.ActivityOptions;
+import io.temporal.common.RetryOptions;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.util.PropertiesUtils;
+
+
+/**
+ * Enum representing different types of activities in the Temporal workflow.
+ * Each activity type corresponds to a specific operation that can be
performed.
+ */
+public enum ActivityType {
+ /** Activity type for generating work units. */
+
GENERATE_WORKUNITS(GobblinTemporalConfigurationKeys.TEMPORAL_GENERATE_WORKUNITS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
+ /** Activity type for recommending scaling operations. */
+
RECOMMEND_SCALING(GobblinTemporalConfigurationKeys.TEMPORAL_RECOMMEND_SCALING_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
+ /** Activity type for deleting work directories. */
+
DELETE_WORK_DIRS(GobblinTemporalConfigurationKeys.TEMPORAL_DELETE_WORK_DIRS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
+ /** Activity type for processing a work unit. */
+
PROCESS_WORKUNIT(GobblinTemporalConfigurationKeys.TEMPORAL_PROCESS_WORKUNIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
+ /** Activity type for committing step. */
+
COMMIT(GobblinTemporalConfigurationKeys.TEMPORAL_COMMIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
+ /** Default placeholder activity type. */
+
DEFAULT_ACTIVITY(GobblinTemporalConfigurationKeys.ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES);
+
+ @Getter private final String startToCloseTimeoutConfigKey;
+
+ ActivityType(String startToCloseTimeoutConfigKey) {
+ this.startToCloseTimeoutConfigKey = startToCloseTimeoutConfigKey;
+ }
+
+ public ActivityOptions buildActivityOptions(Properties props) {
+ return ActivityOptions.newBuilder()
+ .setStartToCloseTimeout(getStartToCloseTimeout(props))
+ .setHeartbeatTimeout(getHeartbeatTimeout(props))
+ .setRetryOptions(buildRetryOptions(props))
+ .build();
+ }
+
+ private Duration getStartToCloseTimeout(Properties props) {
+ return Duration.ofMinutes(PropertiesUtils.getPropAsInt(props,
this.startToCloseTimeoutConfigKey,
+
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES));
+ }
+
+ private Duration getHeartbeatTimeout(Properties props) {
+ return Duration.ofMinutes(PropertiesUtils.getPropAsInt(props,
+
GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES,
+
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES));
+ }
+
+ private RetryOptions buildRetryOptions(Properties props) {
+ return RetryOptions.newBuilder()
+
.setInitialInterval(Duration.ofSeconds(PropertiesUtils.getPropAsInt(props,
+
GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS,
+
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS)))
+
.setMaximumInterval(Duration.ofSeconds(PropertiesUtils.getPropAsInt(props,
+
GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS,
+
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS)))
+ .setBackoffCoefficient(PropertiesUtils.getPropAsInt(props,
Review Comment:
use `getPropAsDouble` instead of `getPropAsInt` here, in line with the change
suggested above to make the backoff coefficient a double
##########
gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/ActivityTypeTest.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import io.temporal.activity.ActivityOptions;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+
+
+/** Tests for {@link ActivityType} */
+public class ActivityTypeTest {
+
+ private Properties props;
+ private final List<ActivityType> activityTypes =
Arrays.asList(ActivityType.values());
+
+ @BeforeMethod
+ public void setUp() {
+ props = new Properties();
+ }
+
+ @Test
+ public void testDefaultValuesForTimeouts() {
+ activityTypes.stream().map(activityType ->
activityType.buildActivityOptions(props)).forEach(activityOptions -> {
+ Assert.assertEquals(activityOptions.getStartToCloseTimeout(),
+
Duration.ofMinutes(GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES));
+ Assert.assertEquals(activityOptions.getHeartbeatTimeout(),
+
Duration.ofMinutes(GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES));
+
Assert.assertEquals(activityOptions.getRetryOptions().getInitialInterval(),
+
Duration.ofSeconds(GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS));
+
Assert.assertEquals(activityOptions.getRetryOptions().getMaximumInterval(),
+
Duration.ofSeconds(GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS));
+
Assert.assertEquals(activityOptions.getRetryOptions().getBackoffCoefficient(),
+ (double)
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT);
+
Assert.assertEquals(activityOptions.getRetryOptions().getMaximumAttempts(),
+
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS);
+ });
+ }
+
+ @DataProvider(name = "activityTypesWithStartToCloseTimeout")
+ public Object[][] activityTypesWithStartToCloseTimeout() {
+ return new Object[][] {
+ {ActivityType.GENERATE_WORKUNITS, 333},
+ {ActivityType.RECOMMEND_SCALING, 111},
+ {ActivityType.DELETE_WORK_DIRS, 222},
+ {ActivityType.PROCESS_WORKUNIT, 555},
+ {ActivityType.COMMIT, 444},
+ {ActivityType.DEFAULT_ACTIVITY, 1}
+ };
+ }
+
+ @Test(dataProvider = "activityTypesWithStartToCloseTimeout")
+ public void testStartToCloseTimeout(ActivityType activityType, int
expectedTimeout) {
+ props.setProperty(activityType.getStartToCloseTimeoutConfigKey(),
Integer.toString(expectedTimeout));
+
Assert.assertEquals(activityType.buildActivityOptions(props).getStartToCloseTimeout(),
Duration.ofMinutes(expectedTimeout));
+ }
+
+ @Test
+ public void testHeartBeatTimeout() {
+
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES,
"14");
+ activityTypes.stream().map(activityType ->
activityType.buildActivityOptions(props)).forEach(activityOptions -> {
+ Assert.assertEquals(activityOptions.getHeartbeatTimeout(),
Duration.ofMinutes(14));
+ });
+ }
+
+ @Test
+ public void testRetryOptions() {
+
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS,
"115");
+
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS,
"5550");
+
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT,
"7");
+
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS,
"21");
+
+ activityTypes.stream().map(activityType ->
activityType.buildActivityOptions(props)).forEach(activityOptions -> {
+
Assert.assertEquals(activityOptions.getRetryOptions().getInitialInterval(),
Duration.ofSeconds(115));
+
Assert.assertEquals(activityOptions.getRetryOptions().getMaximumInterval(),
Duration.ofSeconds(5550));
+
Assert.assertEquals(activityOptions.getRetryOptions().getBackoffCoefficient(),
7.0);
+
Assert.assertEquals(activityOptions.getRetryOptions().getMaximumAttempts(), 21);
+ });
+ }
+
+ @Test(dataProvider = "activityTypesWithStartToCloseTimeout")
+ public void testBuildActivityOptions(ActivityType activityType, int
expectedTimeout) {
+ props.setProperty(activityType.getStartToCloseTimeoutConfigKey(),
Integer.toString(expectedTimeout));
+
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES,
"144");
+
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS,
"115");
+
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS,
"5550");
+
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT,
"7");
+
props.setProperty(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS,
"21");
+
+ ActivityOptions activityOptions = activityType.buildActivityOptions(props);
+
+ Assert.assertEquals(activityOptions.getStartToCloseTimeout(),
Duration.ofMinutes(expectedTimeout));
+ Assert.assertEquals(activityOptions.getHeartbeatTimeout(),
Duration.ofMinutes(144));
+
Assert.assertEquals(activityOptions.getRetryOptions().getInitialInterval(),
Duration.ofSeconds(115));
+
Assert.assertEquals(activityOptions.getRetryOptions().getMaximumInterval(),
Duration.ofSeconds(5550));
+
Assert.assertEquals(activityOptions.getRetryOptions().getBackoffCoefficient(),
7.0);
Review Comment:
since comparing double values is prone to rounding issues, also include a delta in
the assertion, e.g. `(..., 7.0, .001)`
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java:
##########
@@ -81,58 +81,20 @@
public class ExecuteGobblinWorkflowImpl implements ExecuteGobblinWorkflow {
public static final String PROCESS_WORKFLOW_ID_BASE = "ProcessWorkUnits";
- public static final Duration genWUsStartToCloseTimeout =
Duration.ofHours(2); // TODO: make configurable... also add activity heartbeats
-
- private static final RetryOptions GEN_WUS_ACTIVITY_RETRY_OPTS =
RetryOptions.newBuilder()
- .setInitialInterval(Duration.ofSeconds(3))
- .setMaximumInterval(Duration.ofSeconds(100))
- .setBackoffCoefficient(2)
- .setMaximumAttempts(4)
- .build();
-
- private static final ActivityOptions GEN_WUS_ACTIVITY_OPTS =
ActivityOptions.newBuilder()
- .setStartToCloseTimeout(genWUsStartToCloseTimeout)
- .setRetryOptions(GEN_WUS_ACTIVITY_RETRY_OPTS)
- .build();
-
- private final GenerateWorkUnits genWUsActivityStub =
Workflow.newActivityStub(GenerateWorkUnits.class, GEN_WUS_ACTIVITY_OPTS);
-
- private static final RetryOptions RECOMMEND_SCALING_RETRY_OPTS =
RetryOptions.newBuilder()
- .setInitialInterval(Duration.ofSeconds(3))
- .setMaximumInterval(Duration.ofSeconds(100))
- .setBackoffCoefficient(2)
- .setMaximumAttempts(4)
- .build();
-
- private static final ActivityOptions RECOMMEND_SCALING_ACTIVITY_OPTS =
ActivityOptions.newBuilder()
- .setStartToCloseTimeout(Duration.ofMinutes(5))
- .setRetryOptions(RECOMMEND_SCALING_RETRY_OPTS)
- .build();
- private final RecommendScalingForWorkUnits recommendScalingStub =
Workflow.newActivityStub(RecommendScalingForWorkUnits.class,
- RECOMMEND_SCALING_ACTIVITY_OPTS);
-
- private static final RetryOptions DELETE_WORK_DIRS_RETRY_OPTS =
RetryOptions.newBuilder()
- .setInitialInterval(Duration.ofSeconds(3))
- .setMaximumInterval(Duration.ofSeconds(100))
- .setBackoffCoefficient(2)
- .setMaximumAttempts(4)
- .build();
-
- private static final ActivityOptions DELETE_WORK_DIRS_ACTIVITY_OPTS =
ActivityOptions.newBuilder()
- .setStartToCloseTimeout(Duration.ofMinutes(10))
- .setRetryOptions(DELETE_WORK_DIRS_RETRY_OPTS)
- .build();
- private final DeleteWorkDirsActivity deleteWorkDirsActivityStub =
Workflow.newActivityStub(DeleteWorkDirsActivity.class,
DELETE_WORK_DIRS_ACTIVITY_OPTS);
-
@Override
public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext
eventSubmitterContext) {
TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext);
timerFactory.create(TimingEvent.LauncherTimings.JOB_PREPARE).submit(); //
update GaaS: `TimingEvent.JOB_START_TIME`
EventTimer jobSuccessTimer = timerFactory.createJobTimer();
Optional<GenerateWorkUnitsResult> optGenerateWorkUnitResult =
Optional.empty();
WUProcessingSpec wuSpec = createProcessingSpec(jobProps,
eventSubmitterContext);
+ // Filtering only temporal job properties to pass to child workflows to
avoid passing unnecessary properties
+ final Properties temporalJobProps =
PropertiesUtils.extractPropertiesWithPrefix(jobProps,
+
com.google.common.base.Optional.of(GobblinTemporalConfigurationKeys.PREFIX));
Review Comment:
import `com.google.common.base.Optional`
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/ActivityType.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity;
+
+import java.time.Duration;
+import java.util.Properties;
+
+import lombok.Getter;
+
+import io.temporal.activity.ActivityOptions;
+import io.temporal.common.RetryOptions;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.util.PropertiesUtils;
+
+
+/**
+ * Enum representing different types of activities in the Temporal workflow.
+ * Each activity type corresponds to a specific operation that can be
performed.
+ */
+public enum ActivityType {
+ /** Activity type for generating work units. */
+
GENERATE_WORKUNITS(GobblinTemporalConfigurationKeys.TEMPORAL_GENERATE_WORKUNITS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
+ /** Activity type for recommending scaling operations. */
+
RECOMMEND_SCALING(GobblinTemporalConfigurationKeys.TEMPORAL_RECOMMEND_SCALING_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
+ /** Activity type for deleting work directories. */
+
DELETE_WORK_DIRS(GobblinTemporalConfigurationKeys.TEMPORAL_DELETE_WORK_DIRS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
+ /** Activity type for processing a work unit. */
+
PROCESS_WORKUNIT(GobblinTemporalConfigurationKeys.TEMPORAL_PROCESS_WORKUNIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
+ /** Activity type for committing step. */
+
COMMIT(GobblinTemporalConfigurationKeys.TEMPORAL_COMMIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),
+
+ /** Default placeholder activity type. */
+
DEFAULT_ACTIVITY(GobblinTemporalConfigurationKeys.ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES);
+
+ @Getter private final String startToCloseTimeoutConfigKey;
+
+ ActivityType(String startToCloseTimeoutConfigKey) {
+ this.startToCloseTimeoutConfigKey = startToCloseTimeoutConfigKey;
+ }
+
+ public ActivityOptions buildActivityOptions(Properties props) {
+ return ActivityOptions.newBuilder()
+ .setStartToCloseTimeout(getStartToCloseTimeout(props))
+ .setHeartbeatTimeout(getHeartbeatTimeout(props))
+ .setRetryOptions(buildRetryOptions(props))
+ .build();
+ }
+
+ private Duration getStartToCloseTimeout(Properties props) {
+ return Duration.ofMinutes(PropertiesUtils.getPropAsInt(props,
this.startToCloseTimeoutConfigKey,
+
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES));
+ }
+
+ private Duration getHeartbeatTimeout(Properties props) {
+ return Duration.ofMinutes(PropertiesUtils.getPropAsInt(props,
+
GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES,
+
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES));
+ }
+
+ private RetryOptions buildRetryOptions(Properties props) {
+ return RetryOptions.newBuilder()
+
.setInitialInterval(Duration.ofSeconds(PropertiesUtils.getPropAsInt(props,
Review Comment:
we can also check that the initial interval is not greater than the maximum interval
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java:
##########
@@ -74,7 +74,7 @@ public interface GobblinTemporalConfigurationKeys {
String DYNAMIC_SCALING_POLLING_INTERVAL_SECS = DYNAMIC_SCALING_PREFIX +
"polling.interval.seconds";
int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;
-
+
Review Comment:
nit: remove space
Issue Time Tracking
-------------------
Worklog Id: (was: 958615)
Time Spent: 2h 40m (was: 2.5h)
> Implement ActivityTimeoutStrategy for all Temporal Activities
> -------------------------------------------------------------
>
> Key: GOBBLIN-2190
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2190
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Vivek Rai
> Priority: Major
> Time Spent: 2h 40m
> Remaining Estimate: 0h
>
> Currently the timeouts of all Temporal Activities are hardcoded and can't be
> changed at runtime; change them to be configurable.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)