Will-Lo commented on code in PR #3811: URL: https://github.com/apache/gobblin/pull/3811#discussion_r1375097940
########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/WFAddr.java: ########## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.util.nesting.work; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.List; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; + + +/** Hierarchical address for nesting workflows (0-based). */ +@NoArgsConstructor // IMPORTANT: for jackson (de)serialization +@RequiredArgsConstructor +public class WFAddr { Review Comment: Is it okay if we name it `WorkFlowAddr`? Shortening it seems unnecessary here, given that we refer to work and workflows in other classes without shortening them. ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/SeqBackedWorkSpan.java: ########## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.util.nesting.work; + +import java.util.Iterator; +import java.util.List; +import lombok.NoArgsConstructor; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; + + +/** Logical sub-sequence of `WORK_ITEM`s, backed for simplicity's sake by an in-memory collection */ +@NoArgsConstructor +@RequiredArgsConstructor +public class SeqBackedWorkSpan<WORK_ITEM> implements Workload.WorkSpan<WORK_ITEM> { + + @NonNull + private List<WORK_ITEM> elems; + // CAUTION: despite the "warning: @NonNull is meaningless on a primitive @lombok.RequiredArgsConstructor"... 
+ // if removed, no two-arg ctor is generated, so syntax error on `new CollectionBackedTaskSpan(elems, startIndex)` + @NonNull + private int startingIndex; + private transient Iterator<WORK_ITEM> statefulDelegatee = null; + + @Override + public int getNumElems() { + return elems.size(); + } + + @Override + public boolean hasNext() { + if (statefulDelegatee == null) { + statefulDelegatee = elems.iterator(); + } + return statefulDelegatee.hasNext(); + } + + @Override + public WORK_ITEM next() { + if (statefulDelegatee == null) { + throw new IllegalStateException("first call `hasNext()`!"); Review Comment: Is it conventional for iterators to require that `next()` be called only after `hasNext()`? When I used this class previously, I was tripped up here, because I expected `next()` to fail only when no elements remain in the list. ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java: ########## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.gobblin.temporal.util.nesting.workflow; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.compress.utils.Lists; + +import io.temporal.api.enums.v1.ParentClosePolicy; +import io.temporal.workflow.Async; +import io.temporal.workflow.ChildWorkflowOptions; +import io.temporal.workflow.Promise; +import io.temporal.workflow.Workflow; + +import org.apache.gobblin.temporal.util.nesting.work.WFAddr; +import org.apache.gobblin.temporal.util.nesting.work.Workload; + + +/** Core skeleton of {@link NestingExecWorkflow}: realizing classes need only define {@link #launchAsyncActivity} */ Review Comment: Can we add more documentation on which values are expected for `maxBranchesPerTree`, `maxSubTreesPerTree`, etc., in order to produce a more balanced tree? ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/NestingExecWorkflow.java: ########## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.gobblin.temporal.util.nesting.workflow; + +import java.util.Optional; + +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; + +import org.apache.gobblin.temporal.util.nesting.work.WFAddr; +import org.apache.gobblin.temporal.util.nesting.work.Workload; + + +/** + * Process all `WORK_ITEM`s of {@link Workload}, from `startIndex` to the end by creating child workflows, where this and + * descendants should have at most `maxBranchesPerTree`, with at most `maxSubTreesPerTree` of those being child + * workflows. (Non-child-workflow (terminal) branches are the activity executions.) + * + * The underlying motivation is to create logical workflows of unbounded size, despite Temporal's event history limit + * of 50Ki events; see: https://docs.temporal.io/workflows#event-history + * + * IMPORTANT: `Math.sqrt(maxBranchesPerTree) == maxSubTreesPerTree` provides a good rule-of-thumb; `maxSubTreesPerTree + * must not exceed that. This enables consolidation, wherein continued expansion occurs only along the tree's right-most edges. + * + * @param <WORK_ITEM> the type of task for which to invoke an appropriate activity + * @param maxSubTreesForCurrentTreeOverride when the current tree should use different max sub-trees than descendants + */ +@WorkflowInterface +public interface NestingExecWorkflow<WORK_ITEM> { + @WorkflowMethod + int performWorkload( Review Comment: Can we add a javadoc for what the return value should represent? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
