Re: [PR] [FLINK-34977][API] Introduce State Access on DataStream API V2 [flink]
reswqa closed pull request #24725: [FLINK-34977][API] Introduce State Access on DataStream API V2 URL: https://github.com/apache/flink/pull/24725 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34977][API] Introduce State Access on DataStream API V2 [flink]
jeyhunkarimov commented on PR #24725: URL: https://github.com/apache/flink/pull/24725#issuecomment-2163282560 Thanks a lot @reswqa -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34977][API] Introduce State Access on DataStream API V2 [flink]
jeyhunkarimov commented on PR #24725: URL: https://github.com/apache/flink/pull/24725#issuecomment-2162579459 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34977][API] Introduce State Access on DataStream API V2 [flink]
jeyhunkarimov commented on PR #24725: URL: https://github.com/apache/flink/pull/24725#issuecomment-2156736006 Hi @reswqa thanks a lot for the review. Could you please do another pass in your available time? Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34977][API] Introduce State Access on DataStream API V2 [flink]
jeyhunkarimov commented on code in PR #24725: URL: https://github.com/apache/flink/pull/24725#discussion_r1631634734 ## flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultStateManager.java: ## @@ -49,6 +80,82 @@ public K getCurrentKey() { return (K) currentKeySupplier.get(); } +@Override +public Optional> getState(ValueStateDeclaration stateDeclaration) +throws Exception { +ValueStateDescriptor valueStateDescriptor = +new ValueStateDescriptor<>( +stateDeclaration.getName(), +stateDeclaration.getTypeInformation().getTypeClass()); +return Optional.of(operatorContext.getState(valueStateDescriptor)); +} + +@Override +public Optional> getState(ListStateDeclaration stateDeclaration) +throws Exception { + +ListStateDescriptor listStateDescriptor = +new ListStateDescriptor<>( +stateDeclaration.getName(), +stateDeclaration.getTypeInformation().getTypeClass()); + +if (stateDeclaration.getRedistributionMode() +== StateDeclaration.RedistributionMode.IDENTICAL) { Review Comment: Yes, good catch -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34977][API] Introduce State Access on DataStream API V2 [flink]
jeyhunkarimov commented on code in PR #24725: URL: https://github.com/apache/flink/pull/24725#discussion_r1631491625 ## flink-core-api/src/main/java/org/apache/flink/api/common/typeinfo/TypeDescriptors.java: ## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeinfo; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.typeinfo.utils.TypeUtils; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +/** Descriptor interface to create TypeInformation instances. */ +@Experimental +public class TypeDescriptors implements Serializable { + +@SuppressWarnings("unchecked") +public static TypeDescriptor> mapTypeDescriptor( Review Comment: Makes sense -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34977][API] Introduce State Access on DataStream API V2 [flink]
reswqa commented on code in PR #24725: URL: https://github.com/apache/flink/pull/24725#discussion_r1630562709 ## flink-datastream/src/test/java/org/apache/flink/datastream/impl/common/BasicTypeDescriptorTest.java: ## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.impl.common; + +import org.apache.flink.api.common.typeinfo.TypeDescriptors; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link org.apache.flink.api.common.typeinfo.descriptor.BasicTypeDescriptorImpl}. */ +class BasicTypeDescriptorTest { + +@Test +void testBasicTypeDescriptorString() { +assertThat(TypeDescriptors.STRING_TYPE.toString()) +.isEqualTo("BasicTypeDescriptorImpl [basicTypeInfo=String]"); +} + +@Test +void testBasicTypeDescriptorInt() { +assertThat(TypeDescriptors.INT_TYPE.toString()) +.isEqualTo("BasicTypeDescriptorImpl [basicTypeInfo=Integer]"); +} Review Comment: Could we simplify these tests? There is only little difference between them. I think we can reduce them into one test case by looping or parameterizing them. Same goes for the other `XXXTypeDescriptorTest`. ## flink-core-api/src/main/java/org/apache/flink/api/common/typeinfo/TypeDescriptors.java: ## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeinfo; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.typeinfo.utils.TypeUtils; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +/** Descriptor interface to create TypeInformation instances. */ +@Experimental +public class TypeDescriptors implements Serializable { + +@SuppressWarnings("unchecked") +public static TypeDescriptor> mapTypeDescriptor( +TypeDescriptor keyTypeDescriptor, TypeDescriptor valueTypeDescriptor) +throws ReflectiveOperationException { + +return (TypeDescriptor>) +TypeUtils.getInstance( + "org.apache.flink.api.common.typeinfo.descriptor.MapTypeDescriptorImpl", +keyTypeDescriptor, +valueTypeDescriptor); +} + +@SuppressWarnings("unchecked") +public static TypeDescriptor> listTypeDescriptor(TypeDescriptor typeDescriptor) Review Comment: ```suggestion public static TypeDescriptor> listTypeDescriptor(TypeDescriptor elementTypeDescriptor) ``` ## flink-core-api/src/main/java/org/apache/flink/api/common/typeinfo/TypeDescriptors.java: ## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or