Re: [PR] [FLINK-34549][API] Introduce config, context and processingTimerService for DataStream API V2 [flink]

2024-05-21 Thread via GitHub


reswqa merged PR #24541:
URL: https://github.com/apache/flink/pull/24541


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34549][API] Introduce config, context and processingTimerService for DataStream API V2 [flink]

2024-05-21 Thread via GitHub


Sxnan commented on PR #24541:
URL: https://github.com/apache/flink/pull/24541#issuecomment-2123797964

   @reswqa Thanks for the update! LGTM.





Re: [PR] [FLINK-34549][API] Introduce config, context and processingTimerService for DataStream API V2 [flink]

2024-05-21 Thread via GitHub


reswqa commented on PR #24541:
URL: https://github.com/apache/flink/pull/24541#issuecomment-2123714167

   Thanks @Sxnan for the review! I have updated this PR according to your 
comments, PTAL again.





Re: [PR] [FLINK-34549][API] Introduce config, context and processingTimerService for DataStream API V2 [flink]

2024-05-21 Thread via GitHub


Sxnan commented on code in PR #24541:
URL: https://github.com/apache/flink/pull/24541#discussion_r1608053758


##
flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java:
##
@@ -18,23 +18,61 @@
 
 package org.apache.flink.datastream.impl.context;
 
+import org.apache.flink.datastream.api.common.Collector;
 import org.apache.flink.datastream.api.context.JobInfo;
 import org.apache.flink.datastream.api.context.NonPartitionedContext;
 import org.apache.flink.datastream.api.context.TaskInfo;
 import org.apache.flink.datastream.api.function.ApplyPartitionFunction;
 import org.apache.flink.metrics.MetricGroup;
 
+import java.util.Set;
+
 /** The default implementation of {@link NonPartitionedContext}. */
 public class DefaultNonPartitionedContext implements 
NonPartitionedContext {
 private final DefaultRuntimeContext context;
 
-public DefaultNonPartitionedContext(DefaultRuntimeContext context) {
+private final DefaultPartitionedContext partitionedContext;
+
+private final Collector collector;
+
+private final boolean isKeyed;
+
+private final Set keySet;
+
+public DefaultNonPartitionedContext(
+DefaultRuntimeContext context,
+DefaultPartitionedContext partitionedContext,
+Collector collector,
+boolean isKeyed,
+Set keySet) {
 this.context = context;
+this.partitionedContext = partitionedContext;
+this.collector = collector;
+this.isKeyed = isKeyed;
+this.keySet = keySet;
 }
 
 @Override
-public void applyToAllPartitions(ApplyPartitionFunction 
applyPartitionFunction) {
-// TODO implements this method.
+public void applyToAllPartitions(ApplyPartitionFunction 
applyPartitionFunction)
+throws Exception {
+if (isKeyed) {
+for (Object key : keySet) {

Review Comment:
   Sounds good.






Re: [PR] [FLINK-34549][API] Introduce config, context and processingTimerService for DataStream API V2 [flink]

2024-05-21 Thread via GitHub


reswqa commented on code in PR #24541:
URL: https://github.com/apache/flink/pull/24541#discussion_r1608050941


##
flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java:
##
@@ -18,23 +18,61 @@
 
 package org.apache.flink.datastream.impl.context;
 
+import org.apache.flink.datastream.api.common.Collector;
 import org.apache.flink.datastream.api.context.JobInfo;
 import org.apache.flink.datastream.api.context.NonPartitionedContext;
 import org.apache.flink.datastream.api.context.TaskInfo;
 import org.apache.flink.datastream.api.function.ApplyPartitionFunction;
 import org.apache.flink.metrics.MetricGroup;
 
+import java.util.Set;
+
 /** The default implementation of {@link NonPartitionedContext}. */
 public class DefaultNonPartitionedContext implements 
NonPartitionedContext {
 private final DefaultRuntimeContext context;
 
-public DefaultNonPartitionedContext(DefaultRuntimeContext context) {
+private final DefaultPartitionedContext partitionedContext;
+
+private final Collector collector;
+
+private final boolean isKeyed;
+
+private final Set keySet;
+
+public DefaultNonPartitionedContext(
+DefaultRuntimeContext context,
+DefaultPartitionedContext partitionedContext,
+Collector collector,
+boolean isKeyed,
+Set keySet) {
 this.context = context;
+this.partitionedContext = partitionedContext;
+this.collector = collector;
+this.isKeyed = isKeyed;
+this.keySet = keySet;
 }
 
 @Override
-public void applyToAllPartitions(ApplyPartitionFunction 
applyPartitionFunction) {
-// TODO implements this method.
+public void applyToAllPartitions(ApplyPartitionFunction 
applyPartitionFunction)
+throws Exception {
+if (isKeyed) {
+for (Object key : keySet) {

Review Comment:
   I think a `TODO` is enough, since this part of the code is just WIP and the 
following PR will handle it right away. WDYT?






Re: [PR] [FLINK-34549][API] Introduce config, context and processingTimerService for DataStream API V2 [flink]

2024-05-21 Thread via GitHub


Sxnan commented on code in PR #24541:
URL: https://github.com/apache/flink/pull/24541#discussion_r1608022155


##
flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java:
##
@@ -18,23 +18,61 @@
 
 package org.apache.flink.datastream.impl.context;
 
+import org.apache.flink.datastream.api.common.Collector;
 import org.apache.flink.datastream.api.context.JobInfo;
 import org.apache.flink.datastream.api.context.NonPartitionedContext;
 import org.apache.flink.datastream.api.context.TaskInfo;
 import org.apache.flink.datastream.api.function.ApplyPartitionFunction;
 import org.apache.flink.metrics.MetricGroup;
 
+import java.util.Set;
+
 /** The default implementation of {@link NonPartitionedContext}. */
 public class DefaultNonPartitionedContext implements 
NonPartitionedContext {
 private final DefaultRuntimeContext context;
 
-public DefaultNonPartitionedContext(DefaultRuntimeContext context) {
+private final DefaultPartitionedContext partitionedContext;
+
+private final Collector collector;
+
+private final boolean isKeyed;
+
+private final Set keySet;
+
+public DefaultNonPartitionedContext(
+DefaultRuntimeContext context,
+DefaultPartitionedContext partitionedContext,
+Collector collector,
+boolean isKeyed,
+Set keySet) {
 this.context = context;
+this.partitionedContext = partitionedContext;
+this.collector = collector;
+this.isKeyed = isKeyed;
+this.keySet = keySet;
 }
 
 @Override
-public void applyToAllPartitions(ApplyPartitionFunction 
applyPartitionFunction) {
-// TODO implements this method.
+public void applyToAllPartitions(ApplyPartitionFunction 
applyPartitionFunction)
+throws Exception {
+if (isKeyed) {
+for (Object key : keySet) {

Review Comment:
   If that is the case, how about we add a TODO and throw an exception here?
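
   A minimal sketch of such an interim guard inside `DefaultNonPartitionedContext`, assuming 
the keyed iteration is deferred to the follow-up PR (the exception type, message, and the 
non-keyed call shape are illustrative assumptions, not the PR's final code):

    @Override
    public void applyToAllPartitions(ApplyPartitionFunction applyPartitionFunction)
            throws Exception {
        if (isKeyed) {
            // TODO: back the key iteration by the state backend once state support lands.
            throw new UnsupportedOperationException(
                    "applyToAllPartitions is not yet supported on keyed streams.");
        }
        // Non-keyed case: apply the function once to the single (non-keyed) partition.
        applyPartitionFunction.apply(collector, partitionedContext); // assumed call shape
    }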






Re: [PR] [FLINK-34549][API] Introduce config, context and processingTimerService for DataStream API V2 [flink]

2024-05-21 Thread via GitHub


reswqa commented on code in PR #24541:
URL: https://github.com/apache/flink/pull/24541#discussion_r1607875012


##
flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java:
##
@@ -18,23 +18,61 @@
 
 package org.apache.flink.datastream.impl.context;
 
+import org.apache.flink.datastream.api.common.Collector;
 import org.apache.flink.datastream.api.context.JobInfo;
 import org.apache.flink.datastream.api.context.NonPartitionedContext;
 import org.apache.flink.datastream.api.context.TaskInfo;
 import org.apache.flink.datastream.api.function.ApplyPartitionFunction;
 import org.apache.flink.metrics.MetricGroup;
 
+import java.util.Set;
+
 /** The default implementation of {@link NonPartitionedContext}. */
 public class DefaultNonPartitionedContext implements 
NonPartitionedContext {
 private final DefaultRuntimeContext context;
 
-public DefaultNonPartitionedContext(DefaultRuntimeContext context) {
+private final DefaultPartitionedContext partitionedContext;
+
+private final Collector collector;
+
+private final boolean isKeyed;
+
+private final Set keySet;
+
+public DefaultNonPartitionedContext(
+DefaultRuntimeContext context,
+DefaultPartitionedContext partitionedContext,
+Collector collector,
+boolean isKeyed,
+Set keySet) {
 this.context = context;
+this.partitionedContext = partitionedContext;
+this.collector = collector;
+this.isKeyed = isKeyed;
+this.keySet = keySet;
 }
 
 @Override
-public void applyToAllPartitions(ApplyPartitionFunction 
applyPartitionFunction) {
-// TODO implements this method.
+public void applyToAllPartitions(ApplyPartitionFunction 
applyPartitionFunction)
+throws Exception {
+if (isKeyed) {
+for (Object key : keySet) {

Review Comment:
   That's a good question! Theoretically, we should put it in something like 
state, but support for state will have to wait until the next PR, and I think 
we shall support resuming it from a checkpoint in a future commit.






Re: [PR] [FLINK-34549][API] Introduce config, context and processingTimerService for DataStream API V2 [flink]

2024-05-21 Thread via GitHub


Sxnan commented on code in PR #24541:
URL: https://github.com/apache/flink/pull/24541#discussion_r1607724484


##
flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultJobInfo.java:
##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.context;
+
+import org.apache.flink.datastream.api.context.JobInfo;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+
+/** Default implementation of {@link JobInfo}. */
+public class DefaultJobInfo implements JobInfo {
+private final StreamingRuntimeContext operatorContext;
+
+public DefaultJobInfo(StreamingRuntimeContext streamingRuntimeContext) {
+this.operatorContext = streamingRuntimeContext;

Review Comment:
   I think it is better to pass only the necessary info to `DefaultJobInfo`, 
as is done for `DefaultTaskInfo`.
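
   A rough sketch of that shape, mirroring `DefaultTaskInfo`; the field set and the accessor 
names are assumptions for illustration only, not the interface's actual methods:

    /** Hypothetical shape: hold only the values DefaultJobInfo actually needs. */
    public class DefaultJobInfo implements JobInfo {
        private final String jobName;
        private final JobType jobType;

        public DefaultJobInfo(String jobName, JobType jobType) {
            this.jobName = jobName;
            this.jobType = jobType;
        }

        public String getJobName() {
            return jobName;
        }

        public JobType getJobType() {
            return jobType;
        }
    }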



##
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java:
##
@@ -64,7 +68,26 @@ public JobInformation(
 Configuration jobConfiguration,
 Collection requiredJarFileBlobKeys,
 Collection requiredClasspathURLs) {
+this(
+jobId,
+JobType.STREAMING,
+jobName,
+serializedExecutionConfig,
+jobConfiguration,
+requiredJarFileBlobKeys,
+requiredClasspathURLs);
+}
+
+public JobInformation(

Review Comment:
   I think the constructor without `jobType` is only used in tests. Instead of 
introducing a new constructor, how about just updating the existing one?



##
flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContext.java:
##
@@ -18,23 +18,61 @@
 
 package org.apache.flink.datastream.impl.context;
 
+import org.apache.flink.datastream.api.common.Collector;
 import org.apache.flink.datastream.api.context.JobInfo;
 import org.apache.flink.datastream.api.context.NonPartitionedContext;
 import org.apache.flink.datastream.api.context.TaskInfo;
 import org.apache.flink.datastream.api.function.ApplyPartitionFunction;
 import org.apache.flink.metrics.MetricGroup;
 
+import java.util.Set;
+
 /** The default implementation of {@link NonPartitionedContext}. */
 public class DefaultNonPartitionedContext implements 
NonPartitionedContext {
 private final DefaultRuntimeContext context;
 
-public DefaultNonPartitionedContext(DefaultRuntimeContext context) {
+private final DefaultPartitionedContext partitionedContext;
+
+private final Collector collector;
+
+private final boolean isKeyed;
+
+private final Set keySet;
+
+public DefaultNonPartitionedContext(
+DefaultRuntimeContext context,
+DefaultPartitionedContext partitionedContext,
+Collector collector,
+boolean isKeyed,
+Set keySet) {
 this.context = context;
+this.partitionedContext = partitionedContext;
+this.collector = collector;
+this.isKeyed = isKeyed;
+this.keySet = keySet;
 }
 
 @Override
-public void applyToAllPartitions(ApplyPartitionFunction 
applyPartitionFunction) {
-// TODO implements this method.
+public void applyToAllPartitions(ApplyPartitionFunction 
applyPartitionFunction)
+throws Exception {
+if (isKeyed) {
+for (Object key : keySet) {

Review Comment:
   What happens to the keySet when the operator restores from a checkpoint? I 
think this method cannot work properly on a keyed stream without a state 
backend.



##
flink-core-api/src/main/java/org/apache/flink/api/common/SlotSharingGroup.java:
##
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with 

Re: [PR] [FLINK-34549][API] Introduce config, context and processingTimerService for DataStream API V2 [flink]

2024-05-14 Thread via GitHub


reswqa commented on code in PR #24541:
URL: https://github.com/apache/flink/pull/24541#discussion_r1599450119


##
flink-core/src/main/java/org/apache/flink/api/common/operators/SlotSharingGroup.java:
##
@@ -137,6 +137,37 @@ public int hashCode() {
 return result;
 }
 
+/** Convert a {@link SlotSharingGroupDescriptor} to {@link 
SlotSharingGroup}. */
+public static SlotSharingGroup fromDescriptor(SlotSharingGroupDescriptor 
descriptor) {
+if (descriptor.getCpuCores() != null && descriptor.getTaskHeapMemory() 
!= null) {
+MemorySize taskOffHeapMemory =
+descriptor.getTaskOffHeapMemory() == null
+? MemorySize.ZERO
+: descriptor.getTaskOffHeapMemory();
+MemorySize managedMemory =
+descriptor.getManagedMemory() == null
+? MemorySize.ZERO
+: descriptor.getManagedMemory();
+return new SlotSharingGroup(
+descriptor.getName(),
+new CPUResource(descriptor.getCpuCores()),
+descriptor.getTaskHeapMemory(),
+taskOffHeapMemory,
+managedMemory,
+descriptor.getExternalResources());
+} else if (descriptor.getCpuCores() != null
+|| descriptor.getTaskHeapMemory() != null
+|| descriptor.getTaskOffHeapMemory() != null
+|| descriptor.getManagedMemory() != null
+|| !descriptor.getExternalResources().isEmpty()) {
+throw new IllegalArgumentException(
+"The cpu cores and task heap memory are required when 
specifying the resource of a slot sharing group. "
++ "You need to explicitly configure them with 
positive value.");
+} else {
+return new SlotSharingGroup(descriptor.getName());
+}

Review Comment:
   I kind of think that these two are not equivalent transformations 🤔






Re: [PR] [FLINK-34549][API] Introduce config, context and processingTimerService for DataStream API V2 [flink]

2024-05-14 Thread via GitHub


reswqa commented on code in PR #24541:
URL: https://github.com/apache/flink/pull/24541#discussion_r1599437315


##
flink-core-api/src/main/java/org/apache/flink/api/common/operators/SlotSharingGroupDescriptor.java:
##
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.configuration.MemorySize;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * The descriptor that describe the name and the different resource components 
of a slot sharing
+ * group.
+ */
+@Experimental
+public class SlotSharingGroupDescriptor {
+private final String name;
+
+/** How many cpu cores are needed. Can be null only if it is unknown. */
+@Nullable // can be null only for UNKNOWN
+private final Double cpuCores;
+
+/** How much task heap memory is needed. */
+@Nullable // can be null only for UNKNOWN
+private final MemorySize taskHeapMemory;
+
+/** How much task off-heap memory is needed. */
+@Nullable // can be null only for UNKNOWN
+private final MemorySize taskOffHeapMemory;
+
+/** How much managed memory is needed. */
+@Nullable // can be null only for UNKNOWN
+private final MemorySize managedMemory;
+
+/** A extensible field for user specified resources from {@link 
SlotSharingGroupDescriptor}. */
+private final Map externalResources = new HashMap<>();
+
+private SlotSharingGroupDescriptor(
+String name,
+@Nullable Double cpuCores,
+@Nullable MemorySize taskHeapMemory,
+@Nullable MemorySize taskOffHeapMemory,
+@Nullable MemorySize managedMemory,
+@Nullable Map extendedResources) {
+this.name = name;
+this.cpuCores = cpuCores;
+this.taskHeapMemory = taskHeapMemory;
+this.taskOffHeapMemory = taskOffHeapMemory;
+this.managedMemory = managedMemory;
+this.externalResources.putAll(extendedResources);

Review Comment:
   Yes, I shall remove the `@Nullable` marker from the constructor.






Re: [PR] [FLINK-34549][API] Introduce config, context and processingTimerService for DataStream API V2 [flink]

2024-05-08 Thread via GitHub


xintongsong commented on code in PR #24541:
URL: https://github.com/apache/flink/pull/24541#discussion_r1593914163


##
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/NonPartitionedRuntimeContext.java:
##
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.context;
+
+import org.apache.flink.annotation.Experimental;
+
+/**
+ * A {@link NonPartitionedRuntimeContext} contains all execution information 
unrelated to partition.
+ */
+@Experimental
+public interface NonPartitionedRuntimeContext {}

Review Comment:
   The contexts are becoming confusing.
   
   There are currently 5 contexts, and their relationships and differences are 
unclear:
   - RuntimeContext
   - PartitionedRuntimeContext
   - NonPartitionedRuntimeContext
   - NonPartitionedContext
   - TwoOutputNonPartitionedContext
   
   I think the information / functionality provided by these contexts can be 
categorized into 3 kinds:
   - Only needed in a partitioned context (PartitionedRuntimeContext)
   - Only needed in a non-partitioned context (NonPartitionedContext & 
TwoOutputNonPartitionedContext)
   - Needed by both partitioned and non-partitioned contexts 
(NonPartitionedRuntimeContext)
   
   Therefore, I'd suggest the following (a rough skeleton is sketched after 
this list):
   - RuntimeContext for things needed by both partitioned and non-partitioned 
contexts, which is exactly the current NonPartitionedRuntimeContext
   - OneOutputNonPartitionedContext and TwoOutputNonPartitionedContext, both 
extending RuntimeContext, for things only needed in a non-partitioned context; 
these are the current NonPartitionedContext & TwoOutputNonPartitionedContext
   - PartitionedContext, extending RuntimeContext, for things only needed in a 
partitioned context, which is the current RuntimeContext
   - The current PartitionedRuntimeContext is no longer needed, as it is 
neither directly implemented by any concrete class nor extended by multiple 
sub-interfaces/classes.
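
   A bare-bones skeleton of that hierarchy, purely for illustration (only the interface names 
come from the suggestion above; the method sets and the `TwoOutputApplyPartitionFunction` 
parameter type are assumptions):

    /** Information available in both partitioned and non-partitioned contexts. */
    public interface RuntimeContext {
        JobInfo getJobInfo();

        TaskInfo getTaskInfo();

        MetricGroup getMetricGroup();
    }

    /** Things only available per partition, e.g. state and processing-time timers. */
    public interface PartitionedContext extends RuntimeContext {
        StateManager getStateManager();

        ProcessingTimeManager getProcessingTimeManager();
    }

    /** Non-partitioned context for one-output process functions. */
    public interface OneOutputNonPartitionedContext extends RuntimeContext {
        void applyToAllPartitions(ApplyPartitionFunction applyPartitionFunction) throws Exception;
    }

    /** Non-partitioned context for two-output process functions. */
    public interface TwoOutputNonPartitionedContext extends RuntimeContext {
        void applyToAllPartitions(TwoOutputApplyPartitionFunction applyPartitionFunction)
                throws Exception;
    }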



##
flink-core-api/src/main/java/org/apache/flink/api/common/operators/SlotSharingGroupDescriptor.java:
##
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.configuration.MemorySize;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * The descriptor that describe the name and the different resource components 
of a slot sharing
+ * group.
+ */
+@Experimental
+public class SlotSharingGroupDescriptor {

Review Comment:
   Not sure about the name `xxxDescriptor`. Maybe we can keep the name but move 
it to another package for v2. Since we are no longer exposing the concept of 
operator, maybe just move it to the new package `o.a.f.api.common`. And please 
make sure we document the purpose of having two SSG classes and their 
differences clearly in JavaDocs.
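
   For instance, the class-level JavaDoc of the user-facing class could spell out the split 
along these lines (the wording is only a suggestion; the class name follows the quoted code):

    /**
     * Describes the name and the resource profile of a slot sharing group as declared through
     * the DataStream API V2. This is the user-facing counterpart of the internal
     * {@link org.apache.flink.api.common.operators.SlotSharingGroup} used by the runtime and
     * scheduler; the two are kept separate so that flink-core-api does not depend on internal
     * resource classes.
     */
    @Experimental
    public class SlotSharingGroupDescriptor {
        // ...
    }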






Re: [PR] [FLINK-34549][API] Introduce config, context and processingTimerService for DataStream API V2 [flink]

2024-04-29 Thread via GitHub


jeyhunkarimov commented on code in PR #24541:
URL: https://github.com/apache/flink/pull/24541#discussion_r1583695410


##
flink-core-api/src/main/java/org/apache/flink/api/common/operators/SlotSharingGroupDescriptor.java:
##
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.configuration.MemorySize;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * The descriptor that describe the name and the different resource components 
of a slot sharing
+ * group.
+ */
+@Experimental
+public class SlotSharingGroupDescriptor {
+private final String name;
+
+/** How many cpu cores are needed. Can be null only if it is unknown. */
+@Nullable // can be null only for UNKNOWN
+private final Double cpuCores;
+
+/** How much task heap memory is needed. */
+@Nullable // can be null only for UNKNOWN
+private final MemorySize taskHeapMemory;
+
+/** How much task off-heap memory is needed. */
+@Nullable // can be null only for UNKNOWN
+private final MemorySize taskOffHeapMemory;
+
+/** How much managed memory is needed. */
+@Nullable // can be null only for UNKNOWN
+private final MemorySize managedMemory;
+
+/** A extensible field for user specified resources from {@link 
SlotSharingGroupDescriptor}. */
+private final Map externalResources = new HashMap<>();
+
+private SlotSharingGroupDescriptor(
+String name,
+@Nullable Double cpuCores,
+@Nullable MemorySize taskHeapMemory,
+@Nullable MemorySize taskOffHeapMemory,
+@Nullable MemorySize managedMemory,
+@Nullable Map extendedResources) {
+this.name = name;
+this.cpuCores = cpuCores;
+this.taskHeapMemory = taskHeapMemory;
+this.taskOffHeapMemory = taskOffHeapMemory;
+this.managedMemory = managedMemory;
+this.externalResources.putAll(extendedResources);
+}
+
+private SlotSharingGroupDescriptor(String name) {
+this.name = name;
+this.cpuCores = null;
+this.taskHeapMemory = null;
+this.taskOffHeapMemory = null;
+this.managedMemory = null;
+}

Review Comment:
   Maybe utilize the above constructor (e.g., `this(name, null, null, null, 
null)`)?
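
   For example (sketch only; it assumes delegating with `Collections.emptyMap()` for the 
`extendedResources` parameter of the six-argument constructor is acceptable):

    private SlotSharingGroupDescriptor(String name) {
        this(name, null, null, null, null, Collections.emptyMap());
    }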



##
flink-core-api/src/main/java/org/apache/flink/api/common/operators/SlotSharingGroupDescriptor.java:
##
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.configuration.MemorySize;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * The descriptor that describe the name and the different resource components 
of a slot sharing
+ * group.
+ */
+@Experimental
+public class SlotSharingGroupDescriptor {
+private final String name;
+
+/** How many cpu cores are needed. Can be null only if it is unknown. */
+@Nullable // can be null only for UNKNOWN
+private final Double cpuCores;
+
+/** How 

Re: [PR] [FLINK-34549][API] Introduce config, context and processingTimerService for DataStream API V2 [flink]

2024-04-17 Thread via GitHub


reswqa commented on PR #24541:
URL: https://github.com/apache/flink/pull/24541#issuecomment-2062837816

   @flinkbot run azure





Re: [PR] [FLINK-34549][API] Introduce config, context and processingTimerService for DataStream API V2 [flink]

2024-04-16 Thread via GitHub


reswqa commented on code in PR #24541:
URL: https://github.com/apache/flink/pull/24541#discussion_r1566991099


##
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/TimestampManager.java:
##
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.context;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.util.Optional;
+
+/** This is responsibility for retrieving timestamp related things of process 
function. */
+@Experimental
+public interface TimestampManager {
+/**
+ * Get the timestamp of current processing record.
+ *
+ * @return the timestamp of current processed record. If it does not have 
timestamp, empty will
+ * be returned.
+ */
+Optional getCurrentRecordTimestamp();

Review Comment:
   It sounds more reasonable to introduce this when adding event-time support, 
so I will remove it for now.






Re: [PR] [FLINK-34549][API] Introduce config, context and processingTimerService for DataStream API V2 [flink]

2024-04-16 Thread via GitHub


reswqa commented on code in PR #24541:
URL: https://github.com/apache/flink/pull/24541#discussion_r1566988135


##
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/StateManager.java:
##
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.context;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.util.Optional;
+
+/** This is responsibility for managing runtime information related to state 
of process function. */
+@Experimental
+public interface StateManager {
+/**
+ * Get the key of current record.
+ *
+ * @return The key of current processed record. {@link Optional#empty()} 
if the key can not be
+ * extracted for this function.

Review Comment:
   We finally decided in FLIP-433 that we need to throw an exception for 
illegal access to state. I think the same is true for this case, so I've 
removed the `Optional`.
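
   The contract would then roughly read as follows (sketch; the concrete exception and the 
generic signature are assumptions):

    @Experimental
    public interface StateManager {
        /**
         * Get the key of the current record.
         *
         * @return the key of the currently processed record.
         * @throws RuntimeException (assumed) if the key cannot be extracted for this function,
         *     e.g. because the stream is not keyed.
         */
        <K> K getCurrentKey();
    }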






Re: [PR] [FLINK-34549][API] Introduce config, context and processingTimerService for DataStream API V2 [flink]

2024-04-16 Thread via GitHub


reswqa commented on code in PR #24541:
URL: https://github.com/apache/flink/pull/24541#discussion_r1566978504


##
flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/ProcessConfigurableDataStream.java:
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.stream;
+
+import org.apache.flink.api.common.operators.SlotSharingGroup;
+import org.apache.flink.api.common.operators.util.OperatorValidationUtils;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.datastream.api.stream.ProcessConfigurable;
+import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+/** A {@link DataStream} implementation which processing configurable. */
+@SuppressWarnings("unchecked")
+public class ProcessConfigurableDataStream>

Review Comment:
   You're totally right, binding configuration to streams is something we 
wanted to avoid in the first place, even at the implementation level. I have 
refactored the design here.






Re: [PR] [FLINK-34549][API] Introduce config, context and processingTimerService for DataStream API V2 [flink]

2024-04-16 Thread via GitHub


reswqa commented on code in PR #24541:
URL: https://github.com/apache/flink/pull/24541#discussion_r1566976014


##
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/ProcessingTimeManager.java:
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.context;
+
+import org.apache.flink.annotation.Experimental;
+
+/**
+ * This is responsibility for managing runtime information related to 
processing time of process
+ * function.
+ */
+@Experimental
+public interface ProcessingTimeManager {
+/**
+ * Register a processing timer for this process function. 
`onProcessingTimer` method of this
+ * function will be invoked as callback if the timer expires.
+ *
+ * @param timestamp to trigger timer callback.
+ */
+void registerProcessingTimer(long timestamp);

Review Comment:
   Yes, makes sense.






Re: [PR] [FLINK-34549][API] Introduce config, context and processingTimerService for DataStream API V2 [flink]

2024-04-11 Thread via GitHub


xintongsong commented on code in PR #24541:
URL: https://github.com/apache/flink/pull/24541#discussion_r1560578842


##
flink-core-api/src/main/java/org/apache/flink/api/common/operators/SlotSharingGroup.java:
##


Review Comment:
   Resource, CPUResource, ExternalResource, Preconditions are all @Internal. 
They should not be moved to core-api. Instead, we can split `SlotSharingGroup` 
into a pair of interface and implementation.
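
   A minimal sketch of such a split (names, methods, and module placement are assumptions):

    // In flink-core-api: the user-facing interface, free of @Internal types.
    @Experimental
    public interface SlotSharingGroup {
        String getName();
    }

    // In flink-core: the implementation may keep using CPUResource, ExternalResource, etc.
    @Internal
    public class DefaultSlotSharingGroup implements SlotSharingGroup {
        private final String name;

        public DefaultSlotSharingGroup(String name) {
            this.name = name;
        }

        @Override
        public String getName() {
            return name;
        }
    }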



##
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/ProcessingTimeManager.java:
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.context;
+
+import org.apache.flink.annotation.Experimental;
+
+/**
+ * This is responsibility for managing runtime information related to 
processing time of process
+ * function.
+ */
+@Experimental
+public interface ProcessingTimeManager {
+/**
+ * Register a processing timer for this process function. 
`onProcessingTimer` method of this
+ * function will be invoked as callback if the timer expires.
+ *
+ * @param timestamp to trigger timer callback.
+ */
+void registerProcessingTimer(long timestamp);

Review Comment:
   It's already a `ProcessingTimeManager`. I think we can simply name these 
methods as `registerTimer` / `deleteTimer` / `currentTime`.
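
   The interface would then read roughly as follows (the `deleteTimer` / `currentTime` 
signatures are assumed counterparts, not taken from the PR):

    @Experimental
    public interface ProcessingTimeManager {
        /** Register a processing timer; the function's timer callback fires when it expires. */
        void registerTimer(long timestamp);

        /** Delete the processing timer registered for the given timestamp, if any. */
        void deleteTimer(long timestamp);

        /** Get the current processing time of this process function. */
        long currentTime();
    }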



##
flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java:
##
@@ -58,7 +60,9 @@ public void open() throws Exception {
 operatorContext,
 taskInfo.getNumberOfParallelSubtasks(),
 taskInfo.getMaxNumberOfParallelSubtasks(),
-taskInfo.getTaskName());
+taskInfo.getTaskName(),
+this::currentKey,

Review Comment:
   Why not just pass in `Optional::empty` here? The comments can be moved as well.



##
flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultStateManager.java:
##
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.context;
+
+import org.apache.flink.datastream.api.context.StateManager;
+
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/**
+ * The default implementation of {@link StateManager}. This class supports 
eagerly set and reset the
+ * current key. The following rules must be observed:
+ *
+ * 1. The current key must be reset if setCurrentKey is called.
+ *
+ * 2. If setCurrentKey is called and not reset, getCurrentKey should always 
return this key.
+ */
+public class DefaultStateManager implements StateManager {
+/** This is used to store the original key when we overwrite the current 
key. */
+private Optional oldKey = Optional.empty();
+
+/**
+ * Retrieve the current key. When {@link #currentKeySetter} receives a 
key, this must return
+ * that key until it is reset.
+ */
+private final Supplier> currentKeySupplier;
+
+private final Consumer currentKeySetter;
+
+public DefaultStateManager(
+Supplier> currentKeySupplier, Consumer 
currentKeySetter) {
+this.currentKeySupplier = currentKeySupplier;
+this.currentKeySetter = currentKeySetter;
+}
+
+@Override
+@SuppressWarnings("unchecked")
+

Re: [PR] [FLINK-34549][API] Introduce config, context and processingTimerService for DataStream API V2 [flink]

2024-04-09 Thread via GitHub


reswqa commented on PR #24541:
URL: https://github.com/apache/flink/pull/24541#issuecomment-2045077406

   I have rebased this on master.





Re: [PR] [FLINK-34549][API] Introduce config, context and processingTimerService for DataStream API V2 [flink]

2024-03-20 Thread via GitHub


flinkbot commented on PR #24541:
URL: https://github.com/apache/flink/pull/24541#issuecomment-2009426855

   
   ## CI report:
   
   * 08547b7fa9baa619ec7b62bc893ccc6abcc86bb3 UNKNOWN
   
   
   Bot commands:
   The @flinkbot bot supports the following commands:

   - `@flinkbot run azure` re-run the last Azure build
   

