[
https://issues.apache.org/jira/browse/GOBBLIN-1968?focusedWorklogId=893914&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-893914
]
ASF GitHub Bot logged work on GOBBLIN-1968:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 04/Dec/23 23:14
Start Date: 04/Dec/23 23:14
Worklog Time Spent: 10m
Work Description: umustafi commented on code in PR #3829:
URL: https://github.com/apache/gobblin/pull/3829#discussion_r1414609489
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
+import io.temporal.failure.ApplicationFailure;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.commit.DeliverySemantics;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.SafeDatasetCommit;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.util.Either;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.executors.IteratorExecutor;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
Review Comment:
Please add a Javadoc comment to this implementation describing its high-level logic.
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
+import io.temporal.failure.ApplicationFailure;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.commit.DeliverySemantics;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.SafeDatasetCommit;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.util.Either;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.executors.IteratorExecutor;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+@Slf4j
+public class CommitActivityImpl implements CommitActivity {
+
+ static int DEFAULT_NUM_DESERIALIZATION_THREADS = 10;
+ static int DEFAULT_NUM_COMMIT_THREADS = 1;
+ @Override
+ public int commit(WUProcessingSpec workSpec) {
+ // TODO: Make this configurable
+ int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
+ try {
+ FileSystem fs = Help.loadFileSystem(workSpec);
+ JobState jobState = Help.loadJobState(workSpec, fs);
+ SharedResourcesBroker<GobblinScopeTypes> instanceBroker =
JobStateUtils.getSharedResourcesBroker(jobState);
+ JobContext globalGobblinContext = new
JobContext(jobState.getProperties(), log, instanceBroker, null);
+ // TODO: Task state dir is a stub with the assumption it is always
colocated with the workunits dir (as in the case of MR which generates
workunits)
+ Path jobIdParent = new Path(workSpec.getWorkUnitsDir()).getParent();
+ Path jobOutputPath = new Path(new Path(jobIdParent, "output"),
jobIdParent.getName());
+ log.info("Output path at: " + jobOutputPath + " with fs at " +
fs.getUri());
+ StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec,
fs);
+ Collection<TaskState> taskStateQueue =
+ ImmutableList.copyOf(
+
TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore,
jobOutputPath, numDeserializationThreads));
+ commitTaskStates(jobState, taskStateQueue, globalGobblinContext);
+ return taskStateQueue.size();
+ } catch (Exception e) {
+ //TODO: IMPROVE GRANULARITY OF RETRIES
+ throw ApplicationFailure.newNonRetryableFailureWithCause(
+ "Failed to commit dataset state for some dataset(s) of job
<jobStub>",
+ IOException.class.toString(),
+ new IOException(e),
+ null
+ );
+ }
+ }
+
+ /**
+ * Commit task states to the dataset state store.
+ * @param jobState
+ * @param taskStates
+ * @param jobContext
+ * @throws IOException
+ */
+ void commitTaskStates(State jobState, Collection<TaskState> taskStates,
JobContext jobContext) throws IOException {
+ Map<String, JobState.DatasetState> datasetStatesByUrns =
createDatasetStatesByUrns(taskStates);
+ final boolean shouldCommitDataInJob =
JobContext.shouldCommitDataInJob(jobState);
+ final DeliverySemantics deliverySemantics =
DeliverySemantics.AT_LEAST_ONCE;
+ //TODO: Make this configurable
+ final int numCommitThreads = DEFAULT_NUM_COMMIT_THREADS;
+ if (!shouldCommitDataInJob) {
+ log.info("Job will not commit data since data are committed by tasks.");
+ }
+
+ try {
+ if (!datasetStatesByUrns.isEmpty()) {
+ log.info("Persisting {} dataset urns.", datasetStatesByUrns.size());
+ }
+
+ List<Either<Void, ExecutionException>> result = new
IteratorExecutor<>(Iterables
+ .transform(datasetStatesByUrns.entrySet(),
+ new Function<Map.Entry<String, JobState.DatasetState>,
Callable<Void>>() {
+ @Nullable
+ @Override
+ public Callable<Void> apply(final Map.Entry<String,
JobState.DatasetState> entry) {
+ return new SafeDatasetCommit(shouldCommitDataInJob, false,
deliverySemantics, entry.getKey(),
+ entry.getValue(), false, jobContext);
+ }
+ }).iterator(), numCommitThreads,
+ ExecutorsUtils.newThreadFactory(Optional.of(log),
Optional.of("Commit-thread-%d")))
+ .executeAndGetResults();
+
+ IteratorExecutor.logFailures(result, null, 10);
+
+ if (!IteratorExecutor.verifyAllSuccessful(result)) {
+ // TODO: propagate cause of failure
+ String jobName =
jobState.getProperties().getProperty(ConfigurationKeys.JOB_NAME_KEY,
"<job_name_stub>");
+ throw new IOException("Failed to commit dataset state for some
dataset(s) of job " + jobName);
+ }
+ } catch (InterruptedException exc) {
+ throw new IOException(exc);
+ }
+ }
+
+ /**
+ * Organize task states by dataset urns.
+ * @param taskStates
+ * @return
+ */
+ public static Map<String, JobState.DatasetState>
createDatasetStatesByUrns(Collection<TaskState> taskStates) {
+ Map<String, JobState.DatasetState> datasetStatesByUrns = Maps.newHashMap();
+
+ //TODO: handle skipped tasks?
+ for (TaskState taskState : taskStates) {
+ String datasetUrn = createDatasetUrn(datasetStatesByUrns, taskState);
+ datasetStatesByUrns.get(datasetUrn).incrementTaskCount();
+ datasetStatesByUrns.get(datasetUrn).addTaskState(taskState);
+ }
+
+ return datasetStatesByUrns;
+ }
+
+ private static String createDatasetUrn(Map<String, JobState.DatasetState>
datasetStatesByUrns, TaskState taskState) {
Review Comment:
In which cases is a datasetUrn already set on the task state, and in which cases is it not?
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
+import io.temporal.failure.ApplicationFailure;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.commit.DeliverySemantics;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.SafeDatasetCommit;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.util.Either;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.executors.IteratorExecutor;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+@Slf4j
+public class CommitActivityImpl implements CommitActivity {
+
+ static int DEFAULT_NUM_DESERIALIZATION_THREADS = 10;
+ static int DEFAULT_NUM_COMMIT_THREADS = 1;
+ @Override
+ public int commit(WUProcessingSpec workSpec) {
+ // TODO: Make this configurable
+ int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
+ try {
+ FileSystem fs = Help.loadFileSystem(workSpec);
+ JobState jobState = Help.loadJobState(workSpec, fs);
+ SharedResourcesBroker<GobblinScopeTypes> instanceBroker =
JobStateUtils.getSharedResourcesBroker(jobState);
+ JobContext globalGobblinContext = new
JobContext(jobState.getProperties(), log, instanceBroker, null);
+ // TODO: Task state dir is a stub with the assumption it is always
colocated with the workunits dir (as in the case of MR which generates
workunits)
+ Path jobIdParent = new Path(workSpec.getWorkUnitsDir()).getParent();
+ Path jobOutputPath = new Path(new Path(jobIdParent, "output"),
jobIdParent.getName());
Review Comment:
It's not immediately obvious to me what the difference between these two paths
will be. Do we only append "output" to the end of the `jobIdParent` path? Why
is that?
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
+import io.temporal.failure.ApplicationFailure;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.commit.DeliverySemantics;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.SafeDatasetCommit;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.util.Either;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.executors.IteratorExecutor;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+@Slf4j
+public class CommitActivityImpl implements CommitActivity {
+
+ static int DEFAULT_NUM_DESERIALIZATION_THREADS = 10;
+ static int DEFAULT_NUM_COMMIT_THREADS = 1;
+ @Override
+ public int commit(WUProcessingSpec workSpec) {
+ // TODO: Make this configurable
+ int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
+ try {
+ FileSystem fs = Help.loadFileSystem(workSpec);
+ JobState jobState = Help.loadJobState(workSpec, fs);
+ SharedResourcesBroker<GobblinScopeTypes> instanceBroker =
JobStateUtils.getSharedResourcesBroker(jobState);
+ JobContext globalGobblinContext = new
JobContext(jobState.getProperties(), log, instanceBroker, null);
+ // TODO: Task state dir is a stub with the assumption it is always
colocated with the workunits dir (as in the case of MR which generates
workunits)
+ Path jobIdParent = new Path(workSpec.getWorkUnitsDir()).getParent();
+ Path jobOutputPath = new Path(new Path(jobIdParent, "output"),
jobIdParent.getName());
+ log.info("Output path at: " + jobOutputPath + " with fs at " +
fs.getUri());
+ StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec,
fs);
+ Collection<TaskState> taskStateQueue =
+ ImmutableList.copyOf(
+
TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore,
jobOutputPath, numDeserializationThreads));
+ commitTaskStates(jobState, taskStateQueue, globalGobblinContext);
+ return taskStateQueue.size();
+ } catch (Exception e) {
+ //TODO: IMPROVE GRANULARITY OF RETRIES
+ throw ApplicationFailure.newNonRetryableFailureWithCause(
+ "Failed to commit dataset state for some dataset(s) of job
<jobStub>",
+ IOException.class.toString(),
+ new IOException(e),
+ null
+ );
+ }
+ }
+
+ /**
+ * Commit task states to the dataset state store.
+ * @param jobState
+ * @param taskStates
+ * @param jobContext
+ * @throws IOException
+ */
+ void commitTaskStates(State jobState, Collection<TaskState> taskStates,
JobContext jobContext) throws IOException {
+ Map<String, JobState.DatasetState> datasetStatesByUrns =
createDatasetStatesByUrns(taskStates);
+ final boolean shouldCommitDataInJob =
JobContext.shouldCommitDataInJob(jobState);
+ final DeliverySemantics deliverySemantics =
DeliverySemantics.AT_LEAST_ONCE;
+ //TODO: Make this configurable
+ final int numCommitThreads = DEFAULT_NUM_COMMIT_THREADS;
+ if (!shouldCommitDataInJob) {
+ log.info("Job will not commit data since data are committed by tasks.");
+ }
+
+ try {
+ if (!datasetStatesByUrns.isEmpty()) {
+ log.info("Persisting {} dataset urns.", datasetStatesByUrns.size());
+ }
+
+ List<Either<Void, ExecutionException>> result = new
IteratorExecutor<>(Iterables
+ .transform(datasetStatesByUrns.entrySet(),
+ new Function<Map.Entry<String, JobState.DatasetState>,
Callable<Void>>() {
+ @Nullable
+ @Override
+ public Callable<Void> apply(final Map.Entry<String,
JobState.DatasetState> entry) {
+ return new SafeDatasetCommit(shouldCommitDataInJob, false,
deliverySemantics, entry.getKey(),
+ entry.getValue(), false, jobContext);
+ }
+ }).iterator(), numCommitThreads,
+ ExecutorsUtils.newThreadFactory(Optional.of(log),
Optional.of("Commit-thread-%d")))
+ .executeAndGetResults();
+
+ IteratorExecutor.logFailures(result, null, 10);
+
+ if (!IteratorExecutor.verifyAllSuccessful(result)) {
+ // TODO: propagate cause of failure
+ String jobName =
jobState.getProperties().getProperty(ConfigurationKeys.JOB_NAME_KEY,
"<job_name_stub>");
+ throw new IOException("Failed to commit dataset state for some
dataset(s) of job " + jobName);
+ }
+ } catch (InterruptedException exc) {
+ throw new IOException(exc);
+ }
+ }
+
+ /**
+ * Organize task states by dataset urns.
+ * @param taskStates
+ * @return
+ */
+ public static Map<String, JobState.DatasetState>
createDatasetStatesByUrns(Collection<TaskState> taskStates) {
+ Map<String, JobState.DatasetState> datasetStatesByUrns = Maps.newHashMap();
+
+ //TODO: handle skipped tasks?
+ for (TaskState taskState : taskStates) {
+ String datasetUrn = createDatasetUrn(datasetStatesByUrns, taskState);
+ datasetStatesByUrns.get(datasetUrn).incrementTaskCount();
+ datasetStatesByUrns.get(datasetUrn).addTaskState(taskState);
+ }
+
+ return datasetStatesByUrns;
+ }
+
+ private static String createDatasetUrn(Map<String, JobState.DatasetState>
datasetStatesByUrns, TaskState taskState) {
Review Comment:
Let's add a short Javadoc comment describing this method.
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
+import io.temporal.failure.ApplicationFailure;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.commit.DeliverySemantics;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.SafeDatasetCommit;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.util.Either;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.executors.IteratorExecutor;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+@Slf4j
+public class CommitActivityImpl implements CommitActivity {
+
+ static int DEFAULT_NUM_DESERIALIZATION_THREADS = 10;
+ static int DEFAULT_NUM_COMMIT_THREADS = 1;
+ @Override
+ public int commit(WUProcessingSpec workSpec) {
+ // TODO: Make this configurable
+ int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
+ try {
+ FileSystem fs = Help.loadFileSystem(workSpec);
+ JobState jobState = Help.loadJobState(workSpec, fs);
+ SharedResourcesBroker<GobblinScopeTypes> instanceBroker =
JobStateUtils.getSharedResourcesBroker(jobState);
+ JobContext globalGobblinContext = new
JobContext(jobState.getProperties(), log, instanceBroker, null);
+ // TODO: Task state dir is a stub with the assumption it is always
colocated with the workunits dir (as in the case of MR which generates
workunits)
+ Path jobIdParent = new Path(workSpec.getWorkUnitsDir()).getParent();
+ Path jobOutputPath = new Path(new Path(jobIdParent, "output"),
jobIdParent.getName());
+ log.info("Output path at: " + jobOutputPath + " with fs at " +
fs.getUri());
+ StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec,
fs);
+ Collection<TaskState> taskStateQueue =
+ ImmutableList.copyOf(
+
TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore,
jobOutputPath, numDeserializationThreads));
+ commitTaskStates(jobState, taskStateQueue, globalGobblinContext);
+ return taskStateQueue.size();
+ } catch (Exception e) {
+ //TODO: IMPROVE GRANULARITY OF RETRIES
+ throw ApplicationFailure.newNonRetryableFailureWithCause(
+ "Failed to commit dataset state for some dataset(s) of job
<jobStub>",
+ IOException.class.toString(),
+ new IOException(e),
+ null
+ );
+ }
+ }
+
+ /**
+ * Commit task states to the dataset state store.
+ * @param jobState
+ * @param taskStates
+ * @param jobContext
+ * @throws IOException
+ */
+ void commitTaskStates(State jobState, Collection<TaskState> taskStates,
JobContext jobContext) throws IOException {
+ Map<String, JobState.DatasetState> datasetStatesByUrns =
createDatasetStatesByUrns(taskStates);
+ final boolean shouldCommitDataInJob =
JobContext.shouldCommitDataInJob(jobState);
+ final DeliverySemantics deliverySemantics =
DeliverySemantics.AT_LEAST_ONCE;
+ //TODO: Make this configurable
+ final int numCommitThreads = DEFAULT_NUM_COMMIT_THREADS;
+ if (!shouldCommitDataInJob) {
+ log.info("Job will not commit data since data are committed by tasks.");
+ }
+
+ try {
+ if (!datasetStatesByUrns.isEmpty()) {
+ log.info("Persisting {} dataset urns.", datasetStatesByUrns.size());
Review Comment:
Let's include dataset information in this log message; otherwise, it cannot be
connected to a particular job.
Issue Time Tracking
-------------------
Worklog Id: (was: 893914)
Time Spent: 1h 20m (was: 1h 10m)
> Gobblin Commit Step Runs on Temporal
> ------------------------------------
>
> Key: GOBBLIN-1968
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1968
> Project: Apache Gobblin
> Issue Type: New Feature
> Components: gobblin-core
> Reporter: William Lo
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> Temporal is the next-gen distributed computing platform Gobblin is using to
> replace MapReduce for many of its data movement workflows. We want to
> integrate Gobblin's commit step that tracks task states and reports their
> status with Temporal
--
This message was sent by Atlassian Jira
(v8.20.10#820010)