[jira] [Created] (FLINK-31186) Removing topic from kafka source does nothing
Exidex created FLINK-31186:

Summary: Removing topic from kafka source does nothing
Key: FLINK-31186
URL: https://issues.apache.org/jira/browse/FLINK-31186
Project: Flink
Issue Type: Bug
Affects Versions: 1.15.3
Reporter: Exidex

As far as I can tell, there is no good way to remove a topic from the list of topics that the Kafka source consumes from. We use the {{StreamExecutionEnvironment.fromSource}} API with {{KafkaSource.setTopics}}, which accepts a list of topics, but when we later remove a topic from that list, the Flink Kafka source still consumes from it.

My guess is that it relates to this TODO in the code: [GitHub|https://github.com/apache/flink/blob/cc66d4855e6f8ee9986809a18f68a458bcfe3c12/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L305]

You can kind of work around this by removing the whole job state or by changing the uid of the Kafka source, but that affects either the whole job or the whole source. The other way is to use the State Processor API, but it doesn't expose source operator state, which in turn can be worked around using reflection and copying code from SourceCoordinator. None of those are satisfactory.

--
This message was sent by Atlassian Jira (v8.20.10#820010)
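To illustrate the suspected cause, here is a minimal plain-Java model with no Flink dependency. It assumes, as the report guesses, that partitions restored from checkpointed state are not checked against the currently configured topic list. Every name in it ({{TopicRemovalSketch}}, {{Split}}, {{restoreAsIs}}, {{restoreAndPrune}}) is hypothetical and does not correspond to a Flink class or method; it only sketches the kind of pruning step that would make a removed topic disappear.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/**
 * Plain-Java model of the reported behaviour. Hypothetical names only;
 * this is not Flink code and does not use the Flink API.
 */
public class TopicRemovalSketch {

    /** Hypothetical stand-in for a topic partition stored in checkpointed state. */
    public static final class Split {
        final String topic;
        final int partition;

        public Split(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    /** Assumed current behaviour: restored splits are kept, the configured topics are ignored. */
    public static List<Split> restoreAsIs(List<Split> checkpointed, Set<String> configuredTopics) {
        return new ArrayList<>(checkpointed);
    }

    /** Sketch of a pruning step: drop restored splits whose topic is no longer configured. */
    public static List<Split> restoreAndPrune(List<Split> checkpointed, Set<String> configuredTopics) {
        List<Split> kept = new ArrayList<>();
        for (Split split : checkpointed) {
            if (configuredTopics.contains(split.topic)) {
                kept.add(split);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // State written while the job still consumed both topics.
        List<Split> checkpointed = List.of(new Split("orders", 0), new Split("audit", 0));
        // The job is restarted with "audit" removed from the topic list.
        Set<String> configured = Set.of("orders");

        System.out.println(restoreAsIs(checkpointed, configured));     // [orders-0, audit-0]
        System.out.println(restoreAndPrune(checkpointed, configured)); // [orders-0]
    }
}
```

In this model the removed topic survives the restore unless something explicitly compares the restored state with the new configuration, which would match the behaviour described in the report.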
[jira] [Closed] (FLINK-29534) @TypeInfo on field requires field type to be valid Pojo
[ https://issues.apache.org/jira/browse/FLINK-29534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Exidex closed FLINK-29534.
Resolution: Invalid

> @TypeInfo on field requires field type to be valid Pojo
>
> Key: FLINK-29534
> URL: https://issues.apache.org/jira/browse/FLINK-29534
> Project: Flink
> Issue Type: Bug
> Components: API / Type Serialization System
> Affects Versions: 1.14.0, 1.15.0
> Reporter: Exidex
> Priority: Major
>
> The ability to place @TypeInfo on a field was added in [https://github.com/apache/flink/pull/8344]. But it seems that the fact that it requires the field to be a valid POJO was overlooked. In my case I was trying to add a custom serializer for Jackson's ObjectNode (wrapped in a List, though I am not sure whether that is relevant: https://issues.apache.org/jira/browse/FLINK-26470), which is not a valid POJO, and this requirement seems to defeat the whole purpose of such a feature. It also doesn't look like there's a way to register {{org.apache.flink.api.common.typeutils.TypeSerializer}} globally on 3rd-party types.
>
> Code snippet from TypeExtractor:
> {code:java}
> Type fieldType = field.getGenericType();
> if (!isValidPojoField(field, clazz, typeHierarchy) && clazz != Row.class) {
>     LOG.info(
>             "Class "
>                     + clazz
>                     + " cannot be used as a POJO type because not all fields are valid POJO fields, "
>                     + "and must be processed as GenericType. {}",
>             GENERIC_TYPE_DOC_HINT);
>     return null;
> }
> try {
>     final TypeInformation typeInfo;
>     List fieldTypeHierarchy = new ArrayList<>(typeHierarchy);
>     TypeInfoFactory factory = getTypeInfoFactory(field);
>     if (factory != null) {
> {code}
[jira] [Commented] (FLINK-29534) @TypeInfo on field requires field type to be valid Pojo
[ https://issues.apache.org/jira/browse/FLINK-29534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17614036#comment-17614036 ]

Exidex commented on FLINK-29534:

You are right, the problem I had was in a different place. Here it worked properly, but there was still a message in the logs that misled me.
[jira] [Updated] (FLINK-29534) @TypeInfo on field requires field type to be valid Pojo
[ https://issues.apache.org/jira/browse/FLINK-29534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Exidex updated FLINK-29534:

Description:
The ability to place @TypeInfo on a field was added in [https://github.com/apache/flink/pull/8344]. But it seems that the fact that it requires the field to be a valid POJO was overlooked. In my case I was trying to add a custom serializer for Jackson's ObjectNode (wrapped in a List, though I am not sure whether that is relevant: https://issues.apache.org/jira/browse/FLINK-26470), which is not a valid POJO, and this requirement seems to defeat the whole purpose of such a feature. It also doesn't look like there's a way to register {{org.apache.flink.api.common.typeutils.TypeSerializer}} globally on 3rd-party types.

Code snippet from TypeExtractor:
{code:java}
Type fieldType = field.getGenericType();
if (!isValidPojoField(field, clazz, typeHierarchy) && clazz != Row.class) {
    LOG.info(
            "Class "
                    + clazz
                    + " cannot be used as a POJO type because not all fields are valid POJO fields, "
                    + "and must be processed as GenericType. {}",
            GENERIC_TYPE_DOC_HINT);
    return null;
}
try {
    final TypeInformation typeInfo;
    List fieldTypeHierarchy = new ArrayList<>(typeHierarchy);
    TypeInfoFactory factory = getTypeInfoFactory(field);
    if (factory != null) {
{code}

was:
The ability to place @TypeInfo on a field was added in [https://github.com/apache/flink/pull/8344]. But it seems that the fact that it requires the field to be a valid POJO was overlooked. In my case I was trying to add a custom serializer for Jackson's ObjectNode (wrapped in a List, though I am not sure whether that is relevant: https://issues.apache.org/jira/browse/FLINK-26470), which is not a valid POJO, and this requirement seems to defeat the whole purpose of such a feature. It also doesn't look like there's a way to register {{org.apache.flink.api.common.typeutils.TypeSerializer}} globally.

Code snippet from TypeExtractor:
{code:java}
Type fieldType = field.getGenericType();
if (!isValidPojoField(field, clazz, typeHierarchy) && clazz != Row.class) {
    LOG.info(
            "Class "
                    + clazz
                    + " cannot be used as a POJO type because not all fields are valid POJO fields, "
                    + "and must be processed as GenericType. {}",
            GENERIC_TYPE_DOC_HINT);
    return null;
}
try {
    final TypeInformation typeInfo;
    List fieldTypeHierarchy = new ArrayList<>(typeHierarchy);
    TypeInfoFactory factory = getTypeInfoFactory(field);
    if (factory != null) {
{code}
[jira] [Updated] (FLINK-29534) @TypeInfo on field requires field type to be valid Pojo
[ https://issues.apache.org/jira/browse/FLINK-29534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Exidex updated FLINK-29534:

Description:
The ability to place @TypeInfo on a field was added in [https://github.com/apache/flink/pull/8344]. But it seems that the fact that it requires the field to be a valid POJO was overlooked. In my case I was trying to add a custom serializer for Jackson's ObjectNode (wrapped in a List, though I am not sure whether that is relevant: https://issues.apache.org/jira/browse/FLINK-26470), which is not a valid POJO, and this requirement seems to defeat the whole purpose of such a feature. It also doesn't look like there's a way to register {{org.apache.flink.api.common.typeutils.TypeSerializer}} globally.

Code snippet from TypeExtractor:
{code:java}
Type fieldType = field.getGenericType();
if (!isValidPojoField(field, clazz, typeHierarchy) && clazz != Row.class) {
    LOG.info(
            "Class "
                    + clazz
                    + " cannot be used as a POJO type because not all fields are valid POJO fields, "
                    + "and must be processed as GenericType. {}",
            GENERIC_TYPE_DOC_HINT);
    return null;
}
try {
    final TypeInformation typeInfo;
    List fieldTypeHierarchy = new ArrayList<>(typeHierarchy);
    TypeInfoFactory factory = getTypeInfoFactory(field);
    if (factory != null) {
{code}

was:
The ability to place @TypeInfo on a field was added in [https://github.com/apache/flink/pull/8344]. But it seems that the fact that it requires the field to be a valid POJO was overlooked. In my case I was trying to add a custom serializer for Jackson's ObjectNode (wrapped in a List, though I am not sure whether that is relevant: https://issues.apache.org/jira/browse/FLINK-26470), which is not a valid POJO, and this requirement seems to defeat the whole purpose of such a feature.

Code snippet from TypeExtractor:
{code:java}
Type fieldType = field.getGenericType();
if (!isValidPojoField(field, clazz, typeHierarchy) && clazz != Row.class) {
    LOG.info(
            "Class "
                    + clazz
                    + " cannot be used as a POJO type because not all fields are valid POJO fields, "
                    + "and must be processed as GenericType. {}",
            GENERIC_TYPE_DOC_HINT);
    return null;
}
try {
    final TypeInformation typeInfo;
    List fieldTypeHierarchy = new ArrayList<>(typeHierarchy);
    TypeInfoFactory factory = getTypeInfoFactory(field);
    if (factory != null) {
{code}
[jira] [Created] (FLINK-29534) @TypeInfo on field requires field type to be valid Pojo
Exidex created FLINK-29534:

Summary: @TypeInfo on field requires field type to be valid Pojo
Key: FLINK-29534
URL: https://issues.apache.org/jira/browse/FLINK-29534
Project: Flink
Issue Type: Bug
Components: API / Type Serialization System
Affects Versions: 1.15.0, 1.14.0
Reporter: Exidex

The ability to place @TypeInfo on a field was added in [https://github.com/apache/flink/pull/8344]. But it seems that the fact that it requires the field to be a valid POJO was overlooked. In my case I was trying to add a custom serializer for Jackson's ObjectNode (wrapped in a List, though I am not sure whether that is relevant: https://issues.apache.org/jira/browse/FLINK-26470), which is not a valid POJO, and this requirement seems to defeat the whole purpose of such a feature.

Code snippet from TypeExtractor:
{code:java}
Type fieldType = field.getGenericType();
if (!isValidPojoField(field, clazz, typeHierarchy) && clazz != Row.class) {
    LOG.info(
            "Class "
                    + clazz
                    + " cannot be used as a POJO type because not all fields are valid POJO fields, "
                    + "and must be processed as GenericType. {}",
            GENERIC_TYPE_DOC_HINT);
    return null;
}
try {
    final TypeInformation typeInfo;
    List fieldTypeHierarchy = new ArrayList<>(typeHierarchy);
    TypeInfoFactory factory = getTypeInfoFactory(field);
    if (factory != null) {
{code}