xintongsong commented on code in PR #24541:
URL: https://github.com/apache/flink/pull/24541#discussion_r1560578842

##########
flink-core-api/src/main/java/org/apache/flink/api/common/operators/SlotSharingGroup.java:
##########

Review Comment:
   Resource, CPUResource, ExternalResource, and Preconditions are all @Internal, so they should not be moved to core-api. Instead, we can split `SlotSharingGroup` into an interface and an implementation.

##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/ProcessingTimeManager.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.context;
+
+import org.apache.flink.annotation.Experimental;
+
+/**
+ * This is responsibility for managing runtime information related to processing time of process
+ * function.
+ */
+@Experimental
+public interface ProcessingTimeManager {
+    /**
+     * Register a processing timer for this process function. `onProcessingTimer` method of this
+     * function will be invoked as callback if the timer expires.
+     *
+     * @param timestamp to trigger timer callback.
+     */
+    void registerProcessingTimer(long timestamp);

Review Comment:
   It's already a `ProcessingTimeManager`. I think we can simply name these methods `registerTimer` / `deleteTimer` / `currentTime`.
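
The rename suggested in the comment above could look roughly like the sketch below. It is not the PR's code: only the three method names come from the review, Flink's `@Experimental` annotation is omitted to keep the block self-contained, and `ManualProcessingTimeManager` with its `advanceTo` method is a hypothetical, manually driven implementation added only so the interface can be exercised without a running job.

```java
import java.util.TreeSet;
import java.util.function.LongConsumer;

/** Sketch of ProcessingTimeManager with the shorter names proposed in review (annotation omitted). */
interface ProcessingTimeManager {
    /** Registers a processing-time timer; the callback fires once the current time reaches it. */
    void registerTimer(long timestamp);

    /** Deletes a previously registered timer; a no-op if no timer exists for this timestamp. */
    void deleteTimer(long timestamp);

    /** Returns the current processing time. */
    long currentTime();
}

/** Hypothetical clock driven by hand, e.g. for unit tests. Not part of Flink. */
class ManualProcessingTimeManager implements ProcessingTimeManager {
    // Sorted so that due timers can be fired in timestamp order.
    private final TreeSet<Long> timers = new TreeSet<>();
    private long now;

    @Override
    public void registerTimer(long timestamp) {
        timers.add(timestamp);
    }

    @Override
    public void deleteTimer(long timestamp) {
        timers.remove(timestamp);
    }

    @Override
    public long currentTime() {
        return now;
    }

    /** Moves the clock to {@code time} and fires every timer whose timestamp is <= time. */
    void advanceTo(long time, LongConsumer onTimer) {
        now = time;
        while (!timers.isEmpty() && timers.first() <= time) {
            onTimer.accept(timers.pollFirst());
        }
    }
}
```

Because the manager is already scoped to processing time, the shorter names drop no information. Note that the `TreeSet` in this toy also collapses duplicate registrations of the same timestamp into one timer; whether the real API should behave that way is an assumption of the sketch, not something the PR states.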

##########
flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java:
##########
@@ -58,7 +60,9 @@ public void open() throws Exception {
                         operatorContext,
                         taskInfo.getNumberOfParallelSubtasks(),
                         taskInfo.getMaxNumberOfParallelSubtasks(),
-                        taskInfo.getTaskName());
+                        taskInfo.getTaskName(),
+                        this::currentKey,

Review Comment:
   Why not just pass in `Optional::empty` here? Comments can be moved as well.

##########
flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultStateManager.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.context;
+
+import org.apache.flink.datastream.api.context.StateManager;
+
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/**
+ * The default implementation of {@link StateManager}. This class supports eagerly set and reset the
+ * current key. The following rules must be observed:
+ *
+ * <p>1. The current key must be reset if setCurrentKey is called.
+ *
+ * <p>2. If setCurrentKey is called and not reset, getCurrentKey should always return this key.
+ */
+public class DefaultStateManager implements StateManager {
+    /** This is used to store the original key when we overwrite the current key. */
+    private Optional<Object> oldKey = Optional.empty();
+
+    /**
+     * Retrieve the current key. When {@link #currentKeySetter} receives a key, this must return
+     * that key until it is reset.
+     */
+    private final Supplier<Optional<Object>> currentKeySupplier;
+
+    private final Consumer<Object> currentKeySetter;
+
+    public DefaultStateManager(
+            Supplier<Optional<Object>> currentKeySupplier, Consumer<Object> currentKeySetter) {
+        this.currentKeySupplier = currentKeySupplier;
+        this.currentKeySetter = currentKeySetter;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <K> Optional<K> getCurrentKey() {
+        try {
+            return (Optional<K>) currentKeySupplier.get();
+        } catch (Exception e) {
+            return Optional.empty();
+        }
+    }
+
+    public void setCurrentKey(Object key) {
+        oldKey = currentKeySupplier.get();
+        currentKeySetter.accept(key);
+    }
+
+    public void resetCurrentKey() {
+        oldKey.ifPresent(currentKeySetter);

Review Comment:
   I'm not sure about storing `oldKey` as a field of this class. Its correctness relies heavily on every `setCurrentKey` being followed by exactly one `resetCurrentKey`, with no other `setCurrentKey` in between. Instead, we may provide a static utility function, `executeInKeyContext` or something like that, which stores the original key, sets the new key, executes the runnable, and then restores the original key.

##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/StateManager.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.context;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.util.Optional;
+
+/** This is responsibility for managing runtime information related to state of process function. */
+@Experimental
+public interface StateManager {
+    /**
+     * Get the key of current record.
+     *
+     * @return The key of current processed record. {@link Optional#empty()} if the key can not be
+     *     extracted for this function.

Review Comment:
   Does this mean `Optional.empty()` will be returned on, e.g., a non-partitioned stream?

##########
flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableDataStream.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.stream;
+
+import org.apache.flink.api.common.operators.SlotSharingGroup;
+import org.apache.flink.api.common.operators.util.OperatorValidationUtils;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.datastream.api.stream.ProcessConfigurable;
+import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+/** A {@link DataStream} implementation which processing configurable. */
+@SuppressWarnings("unchecked")
+public class ProcessConfigurableDataStream<T, S extends ProcessConfigurable<S>>

Review Comment:
   This is confusing. How can streams be categorized as process-configurable or not? I understand that only the upstream transformation of a global stream cannot be configured. However, that shouldn't be tied to the stream classes. Instead, we may provide some utilities that wrap a stream together with its configuration handle into one data structure.

##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/TimestampManager.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.context;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.util.Optional;
+
+/** This is responsibility for retrieving timestamp related things of process function. */
+@Experimental
+public interface TimestampManager {
+    /**
+     * Get the timestamp of current processing record.
+     *
+     * @return the timestamp of current processed record. If it does not have timestamp, empty will
+     *     be returned.
+     */
+    Optional<Long> getCurrentRecordTimestamp();

Review Comment:
   What is this for? Shouldn't this be part of the event-time support?

##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/RuntimeContext.java:
##########
@@ -41,6 +42,9 @@ public interface RuntimeContext {
     /** Get the {@link ProcessingTimeManager} of this process function. */
     ProcessingTimeManager getProcessingTimeManager();
+
+    /** Get the metric group of this process function. */
+    OperatorMetricGroup getMetricGroup();

Review Comment:
   Why would we expose the entire OperatorMetricGroup? It might be good enough to expose a MetricGroup that allows users to define custom metrics in a process function.

##########
flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputNonPartitionedContext.java:
##########
@@ -47,4 +48,10 @@ public JobInfo getJobInfo() {
     public TaskInfo getTaskInfo() {
         return context.getTaskInfo();
     }
+
+    @Override
+    public StateManager getStateManager() {
+        // state is partition-aware, so it's always empty in non-partitioned context.
+        return EmptyStateManager.INSTANCE;

Review Comment:
   Maybe we should have a separate `PartitionedContext` interface, so that `getStateManager` is only declared on partitioned contexts.

##########
flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultKeyedTwoOutputNonPartitionedContext.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.context;
+
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.TwoOutputNonPartitionedContext;
+import org.apache.flink.datastream.api.function.TwoOutputApplyPartitionFunction;
+
+import java.util.Iterator;
+
+/**
+ * {@link TwoOutputNonPartitionedContext} for keyed operator. This will take care of the key context
+ * when apply to all keyed partitions.
+ */
+public class DefaultKeyedTwoOutputNonPartitionedContext<OUT1, OUT2>
+        extends DefaultTwoOutputNonPartitionedContext<OUT1, OUT2> {

Review Comment:
   I'm not sure about extending the class further. Maybe we can simply use an argument to distinguish whether the stream is keyed or not.

##########
flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputNonPartitionedContext.java:
##########
@@ -47,4 +48,10 @@ public JobInfo getJobInfo() {
     public TaskInfo getTaskInfo() {
         return context.getTaskInfo();
     }
+
+    @Override
+    public StateManager getStateManager() {
+        // state is partition-aware, so it's always empty in non-partitioned context.
+        return EmptyStateManager.INSTANCE;

Review Comment:
   Same for `getProcessingTimeManager`.

##########
flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/AllKeysContext.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.context;
+
+import java.util.Iterator;
+
+/**
+ * This class maintains the context of all input keys. It will be notified once a key is selected.
+ */
+public interface AllKeysContext {

Review Comment:
   1. Why make this an interface?
   2. Wouldn't it be good enough to add a HashSet in ProcessOperator and override setKeyContext?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
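
Two of the review threads above touch the same mechanism: the `executeInKeyContext` suggestion on `DefaultStateManager`, and the question of whether a plain HashSet in `ProcessOperator` would make `AllKeysContext` unnecessary. The following is a minimal sketch of both ideas together under stated assumptions, not the PR's code and not Flink's API: `KeyContextUtils`, `KeyTrackingOperator`, and `applyToAllKeys` are hypothetical names, keys are plain `Object`s (the PR's `DefaultStateManager` uses `Optional<Object>`), and `setKeyContext` here takes a key directly instead of a record.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical helper in the spirit of the executeInKeyContext utility suggested in review. */
final class KeyContextUtils {
    private KeyContextUtils() {}

    /**
     * Saves the current key, switches to {@code key}, runs {@code action}, and restores the saved
     * key even if the action throws. The saved key is a local variable, not a field, so nested
     * calls each restore their own original key.
     */
    static void executeInKeyContext(
            Supplier<Object> currentKeyGetter,
            Consumer<Object> currentKeySetter,
            Object key,
            Runnable action) {
        Object originalKey = currentKeyGetter.get();
        currentKeySetter.accept(key);
        try {
            action.run();
        } finally {
            currentKeySetter.accept(originalKey);
        }
    }
}

/**
 * Hypothetical stand-in for ProcessOperator: instead of a separate AllKeysContext interface, it
 * records every key it is switched to in a plain HashSet.
 */
class KeyTrackingOperator {
    private final Set<Object> allKeys = new HashSet<>();
    private Object currentKey;

    /** Plays the role of the overridden setKeyContext hook: remember the key, then switch to it. */
    void setKeyContext(Object key) {
        allKeys.add(key);
        setCurrentKey(key);
    }

    void setCurrentKey(Object key) {
        currentKey = key;
    }

    Object getCurrentKey() {
        return currentKey;
    }

    Set<Object> getAllKeys() {
        return allKeys;
    }

    /** Runs the action once per key seen so far, with that key set as the current key. */
    void applyToAllKeys(Consumer<Object> action) {
        for (Object key : allKeys) {
            KeyContextUtils.executeInKeyContext(
                    this::getCurrentKey, this::setCurrentKey, key, () -> action.accept(key));
        }
    }
}
```

Because the original key lives in a local variable of `executeInKeyContext` rather than in a field like `oldKey`, nested calls and exceptions both leave the correct key in place afterwards, which is the failure mode the review points out. The sketch does not guard against the action registering new keys while `applyToAllKeys` is iterating; a real implementation would need to decide how to handle that.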