xintongsong commented on code in PR #24541:
URL: https://github.com/apache/flink/pull/24541#discussion_r1560578842


##########
flink-core-api/src/main/java/org/apache/flink/api/common/operators/SlotSharingGroup.java:
##########


Review Comment:
   `Resource`, `CPUResource`, `ExternalResource` and `Preconditions` are all
`@Internal`. They should not be moved to core-api. Instead, we can split
`SlotSharingGroup` into a pair of interface and implementation.
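
A minimal sketch of what such a split could look like, assuming the public interface exposes only plain Java types. `SlotSharingGroupImpl`, `getCpuCores` and the constructor shape are hypothetical names chosen for illustration, not the PR's actual design:

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical public-facing interface that could live in core-api.
// It only uses plain Java types, so no @Internal class leaks into the API module.
interface SlotSharingGroup {
    String getName();

    // Assumed accessor: the CPU requirement as a plain number, empty if unspecified.
    Optional<Double> getCpuCores();
}

// Hypothetical implementation that would stay in the module that owns
// Resource, CPUResource, ExternalResource and Preconditions.
final class SlotSharingGroupImpl implements SlotSharingGroup {
    private final String name;
    private final Double cpuCores; // stand-in for the internal CPUResource; null if unspecified

    SlotSharingGroupImpl(String name, Double cpuCores) {
        // Stand-in for Preconditions.checkNotNull in the real code base.
        this.name = Objects.requireNonNull(name, "name");
        this.cpuCores = cpuCores;
    }

    @Override
    public String getName() {
        return name;
    }

    @Override
    public Optional<Double> getCpuCores() {
        return Optional.ofNullable(cpuCores);
    }
}
```

With this shape, user code would depend only on the interface, and the internal resource classes would not need to move.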



##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/ProcessingTimeManager.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.context;
+
+import org.apache.flink.annotation.Experimental;
+
+/**
+ * This is responsibility for managing runtime information related to processing time of process
+ * function.
+ */
+@Experimental
+public interface ProcessingTimeManager {
+    /**
+     * Register a processing timer for this process function. `onProcessingTimer` method of this
+     * function will be invoked as callback if the timer expires.
+     *
+     * @param timestamp to trigger timer callback.
+     */
+    void registerProcessingTimer(long timestamp);

Review Comment:
   It's already a `ProcessingTimeManager`. I think we can simply name these 
methods as `registerTimer` / `deleteTimer` / `currentTime`.
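
For illustration, the interface with the shorter names might read as below. Only `registerProcessingTimer(long)` appears in the quoted diff, so the parameter of `deleteTimer` and the return type of `currentTime` are assumptions, and `InMemoryProcessingTimeManager` is a hypothetical toy implementation added only to make the sketch runnable:

```java
import java.util.TreeSet;

// Sketch of the interface using the names proposed in the comment.
interface ProcessingTimeManager {
    void registerTimer(long timestamp);

    // Assumed signature: delete the timer registered for this timestamp.
    void deleteTimer(long timestamp);

    // Assumed signature: current processing time in milliseconds.
    long currentTime();
}

// Hypothetical in-memory implementation, for illustration only.
final class InMemoryProcessingTimeManager implements ProcessingTimeManager {
    final TreeSet<Long> timers = new TreeSet<>();

    @Override
    public void registerTimer(long timestamp) {
        timers.add(timestamp);
    }

    @Override
    public void deleteTimer(long timestamp) {
        timers.remove(timestamp);
    }

    @Override
    public long currentTime() {
        return System.currentTimeMillis();
    }
}
```

A call site then reads `ctx.getProcessingTimeManager().registerTimer(ts)`, which avoids repeating "processing time" in both the manager name and the method name.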



##########
flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java:
##########
@@ -58,7 +60,9 @@ public void open() throws Exception {
                         operatorContext,
                         taskInfo.getNumberOfParallelSubtasks(),
                         taskInfo.getMaxNumberOfParallelSubtasks(),
-                        taskInfo.getTaskName());
+                        taskInfo.getTaskName(),
+                        this::currentKey,

Review Comment:
   Why not just pass in `Optional::empty` here? The comments can be moved as well.



##########
flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultStateManager.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.context;
+
+import org.apache.flink.datastream.api.context.StateManager;
+
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/**
+ * The default implementation of {@link StateManager}. This class supports eagerly set and reset the
+ * current key. The following rules must be observed:
+ *
+ * <p>1. The current key must be reset if setCurrentKey is called.
+ *
+ * <p>2. If setCurrentKey is called and not reset, getCurrentKey should always return this key.
+ */
+public class DefaultStateManager implements StateManager {
+    /** This is used to store the original key when we overwrite the current key. */
+    private Optional<Object> oldKey = Optional.empty();
+
+    /**
+     * Retrieve the current key. When {@link #currentKeySetter} receives a key, this must return
+     * that key until it is reset.
+     */
+    private final Supplier<Optional<Object>> currentKeySupplier;
+
+    private final Consumer<Object> currentKeySetter;
+
+    public DefaultStateManager(
+            Supplier<Optional<Object>> currentKeySupplier, Consumer<Object> currentKeySetter) {
+        this.currentKeySupplier = currentKeySupplier;
+        this.currentKeySetter = currentKeySetter;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <K> Optional<K> getCurrentKey() {
+        try {
+            return (Optional<K>) currentKeySupplier.get();
+        } catch (Exception e) {
+            return Optional.empty();
+        }
+    }
+
+    public void setCurrentKey(Object key) {
+        oldKey = currentKeySupplier.get();
+        currentKeySetter.accept(key);
+    }
+
+    public void resetCurrentKey() {
+        oldKey.ifPresent(currentKeySetter);

Review Comment:
   Not sure about storing `oldKey` as a field of this class. The correctness
relies heavily on `resetCurrentKey` always being called after each
`setCurrentKey`. Instead, we may provide a static util function
`executeInKeyContext` or something like that, which stores the original key,
sets the new key, executes the runnable and resets the original key.
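
A minimal sketch of such a helper, assuming it receives the same supplier and setter that `DefaultStateManager` already holds. The class name `KeyContextUtils` and the parameter order are hypothetical:

```java
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical utility class; only the method name comes from the review comment.
final class KeyContextUtils {
    private KeyContextUtils() {}

    static void executeInKeyContext(
            Runnable runnable,
            Object newKey,
            Supplier<Optional<Object>> currentKeySupplier,
            Consumer<Object> currentKeySetter) {
        // Keep the original key in a local variable instead of a field,
        // so set and reset can no longer get out of sync.
        Optional<Object> oldKey = currentKeySupplier.get();
        currentKeySetter.accept(newKey);
        try {
            runnable.run();
        } finally {
            // Restore the original key even if the runnable throws.
            // As in the reviewed code, nothing is restored when there was no key.
            oldKey.ifPresent(currentKeySetter);
        }
    }
}
```

Because the save, set, run and restore steps live in one method, callers cannot forget the reset, and nested calls restore keys in the right order.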



##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/StateManager.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.context;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.util.Optional;
+
+/** This is responsibility for managing runtime information related to state of process function. */
+@Experimental
+public interface StateManager {
+    /**
+     * Get the key of current record.
+     *
+     * @return The key of current processed record. {@link Optional#empty()} if the key can not be
+     *     extracted for this function.

Review Comment:
   Does this mean `Optional.empty()` will be returned on, e.g., a
non-partitioned stream?



##########
flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableDataStream.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.stream;
+
+import org.apache.flink.api.common.operators.SlotSharingGroup;
+import org.apache.flink.api.common.operators.util.OperatorValidationUtils;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.datastream.api.stream.ProcessConfigurable;
+import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+/** A {@link DataStream} implementation which processing configurable. */
+@SuppressWarnings("unchecked")
+public class ProcessConfigurableDataStream<T, S extends ProcessConfigurable<S>>

Review Comment:
   This is confusing. How can streams be categorized as process-configurable or
not? I understand that only the upstream transformation of a global stream
cannot be configured. However, that shouldn't be tied to the stream classes.
Instead, we may provide some utils to wrap streams together with the configure
handle into one data structure.



##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/TimestampManager.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.context;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.util.Optional;
+
+/** This is responsibility for retrieving timestamp related things of process function. */
+@Experimental
+public interface TimestampManager {
+    /**
+     * Get the timestamp of current processing record.
+     *
+     * @return the timestamp of current processed record. If it does not have timestamp, empty will
+     *     be returned.
+     */
+    Optional<Long> getCurrentRecordTimestamp();

Review Comment:
   What is this for? Shouldn't this be part of the event time support?



##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/RuntimeContext.java:
##########
@@ -41,6 +42,9 @@ public interface RuntimeContext {
     /** Get the {@link ProcessingTimeManager} of this process function. */
     ProcessingTimeManager getProcessingTimeManager();
 
+    /** Get the metric group of this process function. */
+    OperatorMetricGroup getMetricGroup();

Review Comment:
   Why would we expose the entire `OperatorMetricGroup`? It might be good enough
to expose a `MetricGroup` that allows users to define custom metrics in a
process function.



##########
flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputNonPartitionedContext.java:
##########
@@ -47,4 +48,10 @@ public JobInfo getJobInfo() {
     public TaskInfo getTaskInfo() {
         return context.getTaskInfo();
     }
+
+    @Override
+    public StateManager getStateManager() {
+        // state is partition-aware, so it's always empty in non-partitioned context.
+        return EmptyStateManager.INSTANCE;

Review Comment:
   Maybe we should have another `PartitionedContext` interface, so that 
`getStateManager` is only declared for them.
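
One possible shape for that hierarchy, as a rough sketch. `BaseContext`, `NonPartitionedContext`, `getTaskName` and the empty `StateManager` stand-in are hypothetical and only serve to show where `getStateManager` would be declared:

```java
// Stand-in for the StateManager interface introduced in the PR.
interface StateManager {}

// Hypothetical base interface with members that every context can serve.
interface BaseContext {
    String getTaskName();
}

// Non-partitioned contexts expose only the shared members,
// so no EmptyStateManager placeholder is needed.
interface NonPartitionedContext extends BaseContext {}

// Only partitioned contexts declare access to partition-aware state.
interface PartitionedContext extends BaseContext {
    StateManager getStateManager();
}
```

With this split, calling `getStateManager` on a non-partitioned context becomes a compile-time error instead of returning an always-empty manager.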



##########
flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultKeyedTwoOutputNonPartitionedContext.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.context;
+
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.TwoOutputNonPartitionedContext;
+import org.apache.flink.datastream.api.function.TwoOutputApplyPartitionFunction;
+
+import java.util.Iterator;
+
+/**
+ * {@link TwoOutputNonPartitionedContext} for keyed operator. This will take care of the key context
+ * when apply to all keyed partitions.
+ */
+public class DefaultKeyedTwoOutputNonPartitionedContext<OUT1, OUT2>
+        extends DefaultTwoOutputNonPartitionedContext<OUT1, OUT2> {

Review Comment:
   Not sure about further extending the class. Maybe we can simply use an 
argument to distinguish whether the stream is keyed or not. 
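
A rough sketch of the single-class alternative, assuming the keyed variant only differs in that it switches the key context before applying the function to each partition. `NonPartitionedContextSketch`, `isKeyed`, `allKeys` and the `Runnable` parameter are hypothetical simplifications of the PR's types:

```java
import java.util.function.Consumer;

// Hypothetical single context class that covers both keyed and non-keyed streams.
final class NonPartitionedContextSketch {
    private final boolean isKeyed;
    private final Iterable<Object> allKeys; // only consulted when isKeyed is true
    private final Consumer<Object> currentKeySetter;

    NonPartitionedContextSketch(
            boolean isKeyed, Iterable<Object> allKeys, Consumer<Object> currentKeySetter) {
        this.isKeyed = isKeyed;
        this.allKeys = allKeys;
        this.currentKeySetter = currentKeySetter;
    }

    void applyToAllPartitions(Runnable function) {
        if (!isKeyed) {
            // Non-keyed stream: there is no key context to manage.
            function.run();
            return;
        }
        // Keyed stream: switch the key context before each invocation.
        // Restoring the previous key afterwards is omitted in this sketch.
        for (Object key : allKeys) {
            currentKeySetter.accept(key);
            function.run();
        }
    }
}
```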



##########
flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputNonPartitionedContext.java:
##########
@@ -47,4 +48,10 @@ public JobInfo getJobInfo() {
     public TaskInfo getTaskInfo() {
         return context.getTaskInfo();
     }
+
+    @Override
+    public StateManager getStateManager() {
+        // state is partition-aware, so it's always empty in non-partitioned context.
+        return EmptyStateManager.INSTANCE;

Review Comment:
   Same for `getProcessingTimeManager`



##########
flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/AllKeysContext.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.context;
+
+import java.util.Iterator;
+
+/**
+ * This class maintains the context of all input keys. It will be notified once a key is selected.
+ */
+public interface AllKeysContext {

Review Comment:
   1. Why make this an interface?
   2. Wouldn't it be good enough to add a `HashSet` in `ProcessOperator` and
override `setKeyContext`?
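
The second point could be sketched as follows. `BaseOperatorSketch` is a stand-in for the real operator base class, and `setKeyContext` simply follows the name used in the comment; the actual method to override in Flink may be named or shaped differently:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Stand-in for the operator base class; not the real Flink API.
class BaseOperatorSketch {
    private Object currentKey;

    void setKeyContext(Object key) {
        this.currentKey = key;
    }

    Object getCurrentKey() {
        return currentKey;
    }
}

// Hypothetical ProcessOperator that records every key it has seen.
class ProcessOperatorSketch extends BaseOperatorSketch {
    private final Set<Object> allKeys = new HashSet<>();

    @Override
    void setKeyContext(Object key) {
        super.setKeyContext(key);
        // Record the key whenever it is selected; duplicates are ignored by the set.
        allKeys.add(key);
    }

    Set<Object> getAllKeys() {
        return Collections.unmodifiableSet(allKeys);
    }
}
```

This keeps the bookkeeping inside the operator and avoids a separate `AllKeysContext` type.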


