Re: [PR] [FLINK-34977][API] Introduce State Access on DataStream API V2 [flink]

2024-06-12 Thread via GitHub


reswqa closed pull request #24725: [FLINK-34977][API] Introduce State Access on DataStream API V2
URL: https://github.com/apache/flink/pull/24725


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34977][API] Introduce State Access on DataStream API V2 [flink]

2024-06-12 Thread via GitHub


jeyhunkarimov commented on PR #24725:
URL: https://github.com/apache/flink/pull/24725#issuecomment-2163282560

   Thanks a lot @reswqa 





Re: [PR] [FLINK-34977][API] Introduce State Access on DataStream API V2 [flink]

2024-06-12 Thread via GitHub


jeyhunkarimov commented on PR #24725:
URL: https://github.com/apache/flink/pull/24725#issuecomment-2162579459

   @flinkbot run azure





Re: [PR] [FLINK-34977][API] Introduce State Access on DataStream API V2 [flink]

2024-06-09 Thread via GitHub


jeyhunkarimov commented on PR #24725:
URL: https://github.com/apache/flink/pull/24725#issuecomment-2156736006

   Hi @reswqa, thanks a lot for the review. Could you please do another pass when you have time? Thanks!





Re: [PR] [FLINK-34977][API] Introduce State Access on DataStream API V2 [flink]

2024-06-07 Thread via GitHub


jeyhunkarimov commented on code in PR #24725:
URL: https://github.com/apache/flink/pull/24725#discussion_r1631634734


##
flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultStateManager.java:
##
@@ -49,6 +80,82 @@ public <K> K getCurrentKey() {
 return (K) currentKeySupplier.get();
 }
 
+@Override
+public <T> Optional<ValueState<T>> getState(ValueStateDeclaration<T> stateDeclaration)
+throws Exception {
+ValueStateDescriptor<T> valueStateDescriptor =
+new ValueStateDescriptor<>(
+stateDeclaration.getName(),
+stateDeclaration.getTypeInformation().getTypeClass());
+return Optional.of(operatorContext.getState(valueStateDescriptor));
+}
+
+@Override
+public <T> Optional<ListState<T>> getState(ListStateDeclaration<T> stateDeclaration)
+throws Exception {
+
+ListStateDescriptor<T> listStateDescriptor =
+new ListStateDescriptor<>(
+stateDeclaration.getName(),
+stateDeclaration.getTypeInformation().getTypeClass());
+
+if (stateDeclaration.getRedistributionMode()
+== StateDeclaration.RedistributionMode.IDENTICAL) {

Review Comment:
   Yes, good catch






Re: [PR] [FLINK-34977][API] Introduce State Access on DataStream API V2 [flink]

2024-06-07 Thread via GitHub


jeyhunkarimov commented on code in PR #24725:
URL: https://github.com/apache/flink/pull/24725#discussion_r1631491625


##
flink-core-api/src/main/java/org/apache/flink/api/common/typeinfo/TypeDescriptors.java:
##
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeinfo;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.typeinfo.utils.TypeUtils;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/** Descriptor interface to create TypeInformation instances. */
+@Experimental
+public class TypeDescriptors implements Serializable {
+
+@SuppressWarnings("unchecked")
+public static <K, V> TypeDescriptor<Map<K, V>> mapTypeDescriptor(

Review Comment:
   Makes sense






Re: [PR] [FLINK-34977][API] Introduce State Access on DataStream API V2 [flink]

2024-06-06 Thread via GitHub


reswqa commented on code in PR #24725:
URL: https://github.com/apache/flink/pull/24725#discussion_r1630562709


##
flink-datastream/src/test/java/org/apache/flink/datastream/impl/common/BasicTypeDescriptorTest.java:
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.common;
+
+import org.apache.flink.api.common.typeinfo.TypeDescriptors;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link org.apache.flink.api.common.typeinfo.descriptor.BasicTypeDescriptorImpl}. */
+class BasicTypeDescriptorTest {
+
+@Test
+void testBasicTypeDescriptorString() {
+assertThat(TypeDescriptors.STRING_TYPE.toString())
+.isEqualTo("BasicTypeDescriptorImpl [basicTypeInfo=String]");
+}
+
+@Test
+void testBasicTypeDescriptorInt() {
+assertThat(TypeDescriptors.INT_TYPE.toString())
+.isEqualTo("BasicTypeDescriptorImpl [basicTypeInfo=Integer]");
+}

Review Comment:
   Could we simplify these tests? There is very little difference between them. 
I think we can reduce them to a single test case by looping or parameterizing.
   
   Same goes for the other `XXXTypeDescriptorTest`.
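
   The looping variant suggested above could look roughly like the sketch below. It is a minimal, self-contained illustration rather than code from the PR: `FakeBasicTypeDescriptor`, `cases()` and `countMismatches()` are hypothetical names, and the stand-in class only imitates the `toString()` format asserted in the quoted tests. In the real test one would presumably iterate over `TypeDescriptors.STRING_TYPE`, `TypeDescriptors.INT_TYPE` and so on, and assert with AssertJ or a JUnit 5 `@ParameterizedTest`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DescriptorToStringLoopSketch {

    /** Hypothetical stand-in for a basic type descriptor; only toString() is modeled. */
    static final class FakeBasicTypeDescriptor {
        private final Class<?> type;

        FakeBasicTypeDescriptor(Class<?> type) {
            this.type = type;
        }

        @Override
        public String toString() {
            return "BasicTypeDescriptorImpl [basicTypeInfo=" + type.getSimpleName() + "]";
        }
    }

    /** Builds one table of (descriptor, expected toString) pairs instead of one test per type. */
    public static Map<Object, String> cases() {
        Map<Object, String> cases = new LinkedHashMap<>();
        cases.put(
                new FakeBasicTypeDescriptor(String.class),
                "BasicTypeDescriptorImpl [basicTypeInfo=String]");
        cases.put(
                new FakeBasicTypeDescriptor(Integer.class),
                "BasicTypeDescriptorImpl [basicTypeInfo=Integer]");
        return cases;
    }

    /** Counts the entries whose toString() differs from the expected text. */
    public static int countMismatches(Map<Object, String> cases) {
        int mismatches = 0;
        for (Map.Entry<Object, String> entry : cases.entrySet()) {
            if (!entry.getKey().toString().equals(entry.getValue())) {
                mismatches++;
            }
        }
        return mismatches;
    }

    public static void main(String[] args) {
        // A single loop covers every descriptor; adding a type means adding one table row.
        System.out.println("mismatches: " + countMismatches(cases()));
    }
}
```

   Keeping the expectations in a single table means a new descriptor type only adds one row, which is the main benefit of either the looping or the parameterized form.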



##
flink-core-api/src/main/java/org/apache/flink/api/common/typeinfo/TypeDescriptors.java:
##
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeinfo;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.typeinfo.utils.TypeUtils;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/** Descriptor interface to create TypeInformation instances. */
+@Experimental
+public class TypeDescriptors implements Serializable {
+
+@SuppressWarnings("unchecked")
+public static <K, V> TypeDescriptor<Map<K, V>> mapTypeDescriptor(
+TypeDescriptor<K> keyTypeDescriptor, TypeDescriptor<V> valueTypeDescriptor)
+throws ReflectiveOperationException {
+
+return (TypeDescriptor<Map<K, V>>)
+TypeUtils.getInstance(
+"org.apache.flink.api.common.typeinfo.descriptor.MapTypeDescriptorImpl",
+keyTypeDescriptor,
+valueTypeDescriptor);
+}
+
+@SuppressWarnings("unchecked")
+public static <T> TypeDescriptor<List<T>> listTypeDescriptor(TypeDescriptor<T> typeDescriptor)

Review Comment:
   ```suggestion
   public static <T> TypeDescriptor<List<T>> listTypeDescriptor(TypeDescriptor<T> elementTypeDescriptor)
   ```



##
flink-core-api/src/main/java/org/apache/flink/api/common/typeinfo/TypeDescriptors.java:
##
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or