[jira] [Commented] (FLINK-29108) Kubernetes operator: Support queryable state
[ https://issues.apache.org/jira/browse/FLINK-29108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17596924#comment-17596924 ] Ron Crocker commented on FLINK-29108: - [~gyfora] I can accept starting with "extend operator via plugin mechanism".
> Kubernetes operator: Support queryable state
> Key: FLINK-29108
> URL: https://issues.apache.org/jira/browse/FLINK-29108
> Project: Flink
> Issue Type: New Feature
> Components: Kubernetes Operator
> Reporter: Ron Crocker
> Priority: Minor
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-29108) Kubernetes operator: Support queryable state
[ https://issues.apache.org/jira/browse/FLINK-29108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17585531#comment-17585531 ] Ron Crocker commented on FLINK-29108: - [~masteryhx] In a word, yes. There are two things in play:
# While it's "reaching end of life," Queryable State remains a supported capability in all of the Flink versions that the operator currently supports. My company would like to use this Kubernetes operator to manage our Flink jobs, and some of those jobs require Queryable State. We can't use this operator for those jobs until it supports Queryable State.
# I'm trying to rescue Queryable State from deprecation. In [my recent presentation at Flink Forward|https://www.slideshare.net/FlinkForward/using-queryable-state-for-fun-and-profit] I made what I'd claim is a fairly strong argument for keeping Queryable State in the Flink feature set. (_TL;DR: Using Flink Queryable State, I can save >90% of the cost of the equivalent Redis-based solution._) I'm looking for allies in the fight to keep Queryable State alive.
> Kubernetes operator: Support queryable state
> Key: FLINK-29108
> URL: https://issues.apache.org/jira/browse/FLINK-29108
> Project: Flink
> Issue Type: New Feature
> Components: Kubernetes Operator
> Reporter: Ron Crocker
> Priority: Minor
[jira] [Created] (FLINK-29108) Kubernetes operator: Support queryable state
Ron Crocker created FLINK-29108:
---
Summary: Kubernetes operator: Support queryable state
Key: FLINK-29108
URL: https://issues.apache.org/jira/browse/FLINK-29108
Project: Flink
Issue Type: New Feature
Components: Kubernetes Operator
Reporter: Ron Crocker

Enable the Kubernetes operator to deploy jobs that need queryable state. When queryable state is requested, the operator should configure the deployed job as follows:
# Apply {{queryable-state.enable: true}} to the deployed job.
# Configure the Queryable State proxy and the Queryable State server via the {{queryable-state.proxy}} and {{queryable-state.server}} configuration sections, respectively. If these sections aren't provided, the default configuration is used.

The operator will also need to create a Kubernetes service fronting the Task Managers' {{QueryableStateClientProxy}} port (as configured above). Tearing down the job also tears down the service.
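A minimal sketch of the configuration step described above, assuming the operator can treat the job's Flink configuration as a string map. The class and method names are hypothetical and not part of the Flink Kubernetes operator; {{queryable-state.enable}}, {{queryable-state.proxy.ports}} and the proxy default of 9069 are Flink's documented settings. The Service selector labels ({{app}}, {{component: taskmanager}}) are an assumption about how TaskManager pods are labelled.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical helper (not operator code): switches on queryable state in a job's
 * Flink configuration and derives the port a fronting Kubernetes Service would target.
 */
public class QueryableStateConfigSketch {
    static final String ENABLE_KEY = "queryable-state.enable";
    static final String PROXY_PORTS_KEY = "queryable-state.proxy.ports";
    // Flink's default when queryable-state.proxy.ports is not set.
    static final String DEFAULT_PROXY_PORTS = "9069";

    /** Returns a copy of the user configuration with queryable state enabled. */
    static Map<String, String> withQueryableState(Map<String, String> userConf) {
        Map<String, String> conf = new LinkedHashMap<>(userConf);
        conf.put(ENABLE_KEY, "true");
        // Proxy/server settings are left untouched: if absent, Flink's defaults apply.
        return conf;
    }

    /** First port of a spec such as "9069", "50100-50200" or "50100-50200,50300". */
    static int proxyPort(Map<String, String> conf) {
        String spec = conf.getOrDefault(PROXY_PORTS_KEY, DEFAULT_PROXY_PORTS).trim();
        String firstEntry = spec.split(",")[0].trim();
        return Integer.parseInt(firstEntry.split("-")[0].trim());
    }

    /** Minimal Service manifest; the selector labels are an assumption. */
    static String serviceManifest(String jobName, int port) {
        return String.join("\n",
            "apiVersion: v1",
            "kind: Service",
            "metadata:",
            "  name: " + jobName + "-queryable-state",
            "spec:",
            "  selector:",
            "    app: " + jobName,
            "    component: taskmanager",
            "  ports:",
            "    - name: qs-proxy",
            "      port: " + port,
            "      targetPort: " + port);
    }

    public static void main(String[] args) {
        Map<String, String> user = new LinkedHashMap<>();
        user.put("taskmanager.numberOfTaskSlots", "2");
        Map<String, String> conf = withQueryableState(user);
        System.out.println(conf);
        System.out.println(serviceManifest("word-count", proxyPort(conf)));
    }
}
```

A Service needs a fixed {{targetPort}}, so the sketch takes the first port of the proxy port specification; a job that configures a port range would probably want to pin it to a single port instead.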
[jira] [Created] (FLINK-8127) Add New Relic Metric Reporter
Ron Crocker created FLINK-8127:
--
Summary: Add New Relic Metric Reporter
Key: FLINK-8127
URL: https://issues.apache.org/jira/browse/FLINK-8127
Project: Flink
Issue Type: Improvement
Components: Metrics
Reporter: Ron Crocker

Add a MetricReporter that reports to New Relic. This will likely look similar to the Datadog metric reporter: an opt-in library distributed with Flink that communicates directly with New Relic, much like one of its APM agents, and is configured to work with New Relic. I'll take this ticket myself.
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
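The reporter described above could take roughly the following shape. This is a dependency-free sketch: a real reporter would implement Flink's {{org.apache.flink.metrics.reporter.MetricReporter}} and {{Scheduled}} interfaces, whereas the class name, the plain-text payload format and the metric names used in {{main}} are hypothetical, and no New Relic API is called.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Supplier;

/**
 * Hypothetical sketch of the reporter lifecycle: metrics are registered and
 * unregistered as Flink announces them, and a scheduled report() snapshots them.
 */
public class NewRelicReporterSketch {
    // Registered metrics keyed by fully qualified name (sorted for stable output).
    private final Map<String, Supplier<? extends Number>> metrics = new TreeMap<>();

    /** Counterpart of MetricReporter#notifyOfAddedMetric: start tracking a metric. */
    public void notifyOfAddedMetric(String name, Supplier<? extends Number> value) {
        metrics.put(name, value);
    }

    /** Counterpart of MetricReporter#notifyOfRemovedMetric: stop tracking a metric. */
    public void notifyOfRemovedMetric(String name) {
        metrics.remove(name);
    }

    /** Counterpart of Scheduled#report: snapshot every metric into one payload. */
    public String report() {
        StringBuilder payload = new StringBuilder();
        for (Map.Entry<String, Supplier<? extends Number>> e : metrics.entrySet()) {
            payload.append(e.getKey()).append('=').append(e.getValue().get()).append('\n');
        }
        // A real reporter would hand this payload to a New Relic client here.
        return payload.toString();
    }

    public static void main(String[] args) {
        NewRelicReporterSketch reporter = new NewRelicReporterSketch();
        reporter.notifyOfAddedMetric("taskmanager.numRecordsIn", () -> 42L);
        reporter.notifyOfAddedMetric("taskmanager.heap.used", () -> 1024);
        System.out.print(reporter.report());
    }
}
```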
[jira] [Created] (FLINK-8074) Launch Flink jobs using Maven coordinates
Ron Crocker created FLINK-8074:
--
Summary: Launch Flink jobs using Maven coordinates
Key: FLINK-8074
URL: https://issues.apache.org/jira/browse/FLINK-8074
Project: Flink
Issue Type: Improvement
Reporter: Ron Crocker
Priority: Minor

As a Flink user, I want to be able to submit my job using the Maven coordinates (see https://maven.apache.org/pom.html#Maven_Coordinates) of its jar instead of a path to a local copy of that jar. For example, instead of submitting my job using:
{{bin/flink run /local/path/to/word-count-1.0.1.jar}}
I would specify its Maven coordinates:
{{bin/flink run com.newrelic:word-count:1.0.1}}
This latter form would contact known Maven repositories, fetch the jar at the specified coordinates, and submit it to the cluster.
Considerations:
* No transitive dependencies should be included: the target, specified either as a jar file in the local file system or by its Maven coordinates, should be a complete Flink job.
* Maven repositories need to be specified _somewhere_. It's reasonable to expect that these repositories are configured independently of the cluster configuration.
* Specified repositories must conform to the Maven repository API but don't need to be Maven itself; Artifactory, for example, is a valid repository as long as it conforms to that API.
* _Minor point_: _indeterminate versions_ should be prohibited; for example, {{com.newrelic:word-count:+}} should be rejected as an invalid coordinate specification.
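A minimal sketch of the client-side validation this would need, assuming coordinates are always {{groupId:artifactId:version}} (no classifier or packaging) and that "+" suffixes, version ranges, and the LATEST/RELEASE meta-versions are what counts as indeterminate. {{MavenCoordinate}} is a hypothetical class; the path it computes follows the standard Maven repository layout, relative to whichever repository root is configured.

```java
/**
 * Hypothetical parser for the "groupId:artifactId:version" form proposed in
 * FLINK-8074: rejects indeterminate versions and computes the jar's path
 * relative to a repository root (standard Maven repository layout).
 */
public final class MavenCoordinate {
    final String groupId;
    final String artifactId;
    final String version;

    private MavenCoordinate(String groupId, String artifactId, String version) {
        this.groupId = groupId;
        this.artifactId = artifactId;
        this.version = version;
    }

    static MavenCoordinate parse(String spec) {
        String[] parts = spec.trim().split(":");
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected groupId:artifactId:version, got " + spec);
        }
        for (String part : parts) {
            if (part.isEmpty()) {
                throw new IllegalArgumentException("empty component in " + spec);
            }
        }
        String v = parts[2];
        // Assumption: these forms are the "indeterminate versions" to prohibit.
        boolean indeterminate = v.endsWith("+")
            || v.startsWith("[") || v.startsWith("(")
            || v.equalsIgnoreCase("LATEST") || v.equalsIgnoreCase("RELEASE");
        if (indeterminate) {
            throw new IllegalArgumentException("indeterminate version not allowed: " + spec);
        }
        return new MavenCoordinate(parts[0], parts[1], v);
    }

    /** Path of the jar relative to a repository root. */
    String repositoryPath() {
        return groupId.replace('.', '/') + "/" + artifactId + "/" + version
            + "/" + artifactId + "-" + version + ".jar";
    }

    public static void main(String[] args) {
        // prints com/newrelic/word-count/1.0.1/word-count-1.0.1.jar
        System.out.println(parse("com.newrelic:word-count:1.0.1").repositoryPath());
    }
}
```

Prefixing that path with a repository root such as Maven Central's {{https://repo1.maven.org/maven2/}} gives the URL of the single jar to fetch. No POM is read, so no transitive dependencies are resolved, in line with the first consideration.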
[jira] [Commented] (FLINK-3697) keyBy() with nested POJO computes invalid field position indexes
[ https://issues.apache.org/jira/browse/FLINK-3697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15226504#comment-15226504 ] Ron Crocker commented on FLINK-3697: [~rmetzger] This code is fine to use for your test case. The key aspect to include is that a key field is lexicographically "greater than" a field that holds a nested POJO. That is what pushes the key field's index (in the flattened representation of the POJO) beyond the range of the outer POJO's natural (unflattened) fields. I would have provided a shorter example, but I didn't have the time to do so.
> keyBy() with nested POJO computes invalid field position indexes
> Key: FLINK-3697
> URL: https://issues.apache.org/jira/browse/FLINK-3697
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.0.0
> Environment: MacOS X 10.10
> Reporter: Ron Crocker
> Priority: Critical
> Labels: pojo
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
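The effect described in the comment can be reproduced without Flink. The sketch below assumes POJO fields are ordered by name (an ordering consistent with the positions 0, 1 and 14 reported in the issue) and treats any non-primitive field type outside {{java.*}} as a nested POJO; it is an illustration, not Flink's {{PojoTypeInfo}} code. For the issue's {{Data}} class it places {{aaa}}, {{abc}} and {{wxyz}} at flat positions 0, 1 and 14, while {{Data}} itself declares only 7 fields.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Illustrative (non-Flink) computation of flattened field positions. */
public class FlatPositions {
    public static class Policy { public short a; public short b; public boolean c; public boolean d; }
    public static class Stats { public long count; public float a, b, c, d, e; }
    public static class Data {
        public int aaa; public int abc; public long wxyz; public int t1; public int t2;
        public Policy policy; public Stats stats;
    }

    /** Public instance fields of a type, sorted by name. */
    static List<Field> sortedFields(Class<?> type) {
        List<Field> fields = new ArrayList<>();
        for (Field f : type.getFields()) {
            if (!Modifier.isStatic(f.getModifiers())) {
                fields.add(f);
            }
        }
        fields.sort(Comparator.comparing(Field::getName));
        return fields;
    }

    /** Assumption: anything non-primitive outside java.* is a nested POJO. */
    static boolean isNested(Class<?> type) {
        return !type.isPrimitive() && !type.getName().startsWith("java.");
    }

    /** Appends the dotted path of every leaf field, in flattened order. */
    static void flatten(Class<?> type, String prefix, List<String> out) {
        for (Field f : sortedFields(type)) {
            String path = prefix + f.getName();
            if (isNested(f.getType())) {
                flatten(f.getType(), path + ".", out);
            } else {
                out.add(path);
            }
        }
    }

    public static void main(String[] args) {
        List<String> flat = new ArrayList<>();
        flatten(Data.class, "", flat);
        for (String key : Arrays.asList("aaa", "abc", "wxyz")) {
            System.out.println(key + " -> flat position " + flat.indexOf(key));
        }
        System.out.println("top-level fields: " + sortedFields(Data.class).size()); // 7
        System.out.println("flat fields: " + flat.size());                          // 15
    }
}
```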
[jira] [Updated] (FLINK-3697) keyBy() with nested POJO computes invalid field position indexes
[ https://issues.apache.org/jira/browse/FLINK-3697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ron Crocker updated FLINK-3697:
---
Description:
Using named keys in keyBy() for nested POJO types results in failure. The indexes for named key fields are used inconsistently with nested POJO types. In particular, {{PojoTypeInfo.getFlatFields()}} returns the field's position after (apparently) flattening the structure, but that position is then used against the unflattened version of the POJO type by {{PojoTypeInfo.getTypeAt()}}.
In the example below, getFlatFields() returns positions 0, 1, and 14. These positions appear correct in the flattened structure of the Data class. However, in {{KeySelector getSelectorForKeys(Keys keys, TypeInformation typeInfo, ExecutionConfig executionConfig)}}, a call to {{compositeType.getTypeAt(logicalKeyPositions[i])}} for the third key results in {{PojoTypeInfo.getTypeAt()}} declaring the position out of range, because it compares the position against the number of directly named fields of the object rather than against the length of the flattened version of that type.
Concrete Example:
Consider this graph:
{code}
DataStream<NativeDataFormat> dataStream = see.addSource(new FlinkKafkaConsumer08<>(timesliceConstants.topic, new DataDeserialzer(), kafkaConsumerProperties));
dataStream
    .flatMap(new DataMapper())
    .keyBy("aaa", "abc", "wxyz");
{code}
{{DataDeserialzer}} returns a "NativeDataFormat" object; {{DataMapper}} takes this NativeDataFormat object and extracts individual Data objects:
{code}
public class Data {
    public int aaa;
    public int abc;
    public long wxyz;
    public int t1;
    public int t2;
    public Policy policy;
    public Stats stats;
    public Data() {}
}
{code}
A {{Policy}} object is an instance of this class:
{code}
public class Policy {
    public short a;
    public short b;
    public boolean c;
    public boolean d;
    public Policy() {}
}
{code}
A {{Stats}} object is an instance of this class:
{code}
public class Stats {
    public long count;
    public float a;
    public float b;
    public float c;
    public float d;
    public float e;
    public Stats() {}
}
{code}
was:
Using named keys in keyBy() for nested POJO types results in failure. The indexes for named key fields are used inconsistently with nested POJO types. In particular, {{PojoTypeInfo.getFlatFields()}} returns the field's position after (apparently) flattening the structure, but that position is then used against the unflattened version of the POJO type by {{PojoTypeInfo.getTypeAt()}}.
In the example below, getFlatFields() returns positions 0, 1, and 14. These positions appear correct in the flattened structure of the Data class. However, in {{KeySelector getSelectorForKeys(Keys keys, TypeInformation typeInfo, ExecutionConfig executionConfig)}}, a call to {{compositeType.getTypeAt(logicalKeyPositions[i])}} for the third key results in {{PojoTypeInfo.getTypeAt()}} declaring the position out of range, because it compares the position against the number of directly named fields of the object rather than against the length of the flattened version of that type.
Concrete Example:
Consider this graph:
{code}
DataStream<NativeDataFormat> dataStream = see.addSource(new FlinkKafkaConsumer08<>(timesliceConstants.topic, new DataDeserialzer(), kafkaConsumerProperties));
dataStream
    .flatMap(new DataMapper())
    .keyBy("aaa", "abc", "wxyz");
{code}
{{DataDeserialzer}} returns a "NativeDataFormat" object; {{DataMapper}} takes this NativeDataFormat object and extracts individual Data objects:
{code}
public class Data {
    public int aaa;
    public int abc;
    public long wxyz;
    public int t1;
    public int t2;
    public Policy policy;
    public Stats stats;
    public Data() {}
}
{code}
A {{Policy}} object is an instance of this class:
{code}
public class AggregatableMetricStoragePolicy implements MetricStoragePolicy {
    public short a;
    public short b;
    public boolean c;
    public boolean d;
    public Policy() {}
}
{code}
A {{Stats}} object is an instance of this class:
{code}
public class Stats {
    public long count;
    public float a;
    public float b;
    public float c;
    public float d;
    public float e;
    public Stats() {}
}
{code}
> keyBy() with nested POJO computes invalid field position indexes
> Key: FLINK-3697
> URL: https://issues.apache.org/jira/browse/FLINK-3697
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.0.0
> Environment: MacOS X 10.10
> Reporter: Ron Crocker
> Priority: Minor
> Labels: pojo
[jira] [Created] (FLINK-3697) keyBy() with nested POJO computes invalid field position indexes
Ron Crocker created FLINK-3697:
--
Summary: keyBy() with nested POJO computes invalid field position indexes
Key: FLINK-3697
URL: https://issues.apache.org/jira/browse/FLINK-3697
Project: Flink
Issue Type: Bug
Components: DataStream API
Affects Versions: 1.0.0
Environment: MacOS X 10.10
Reporter: Ron Crocker
Priority: Minor

Using named keys in keyBy() for nested POJO types results in failure. The indexes for named key fields are used inconsistently with nested POJO types. In particular, {{PojoTypeInfo.getFlatFields()}} returns the field's position after (apparently) flattening the structure, but that position is then used against the unflattened version of the POJO type by {{PojoTypeInfo.getTypeAt()}}.
In the example below, getFlatFields() returns positions 0, 1, and 14. These positions appear correct in the flattened structure of the Data class. However, in {{KeySelector getSelectorForKeys(Keys keys, TypeInformation typeInfo, ExecutionConfig executionConfig)}}, a call to {{compositeType.getTypeAt(logicalKeyPositions[i])}} for the third key results in {{PojoTypeInfo.getTypeAt()}} declaring the position out of range, because it compares the position against the number of directly named fields of the object rather than against the length of the flattened version of that type.
Concrete Example:
Consider this graph:
{code}
DataStream<NativeDataFormat> dataStream = see.addSource(new FlinkKafkaConsumer08<>(timesliceConstants.topic, new DataDeserialzer(), kafkaConsumerProperties));
dataStream
    .flatMap(new DataMapper())
    .keyBy("aaa", "abc", "wxyz");
{code}
{{DataDeserialzer}} returns a "NativeDataFormat" object; {{DataMapper}} takes this NativeDataFormat object and extracts individual Data objects:
{code}
public class Data {
    public int aaa;
    public int abc;
    public long wxyz;
    public int t1;
    public int t2;
    public Policy policy;
    public Stats stats;
    public Data() {}
}
{code}
A {{Policy}} object is an instance of this class:
{code}
public class AggregatableMetricStoragePolicy implements MetricStoragePolicy {
    public short a;
    public short b;
    public boolean c;
    public boolean d;
    public Policy() {}
}
{code}
A {{Stats}} object is an instance of this class:
{code}
public class Stats {
    public long count;
    public float a;
    public float b;
    public float c;
    public float d;
    public float e;
    public Stats() {}
}
{code}
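One way to see the mismatch described in this issue is to contrast two lookups of flat position 14 on a small, hypothetical model of the {{Data}} type, with fields pre-sorted by name. {{naiveLookup}} mirrors the bounds check described above and fails, because {{Data}} has only 7 top-level fields; {{flatLookup}} skips whole fields by their flattened width before descending and resolves the position to {{wxyz}}. This is a sketch of the idea, not the fix that was applied to Flink.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical model contrasting a top-level lookup with a flat-aware one. */
public class FlatLookup {
    /** A field is a leaf (no children) or a nested POJO whose children are in name order. */
    static final class Node {
        final String name;
        final List<Node> children;

        Node(String name, Node... children) {
            this.name = name;
            this.children = Arrays.asList(children);
        }

        /** Number of leaf fields this field contributes to the flattened type. */
        int flatArity() {
            if (children.isEmpty()) return 1;
            int sum = 0;
            for (Node child : children) sum += child.flatArity();
            return sum;
        }
    }

    /** Field order of Data after sorting by name (from the issue's example classes). */
    static final List<Node> DATA = Arrays.asList(
        new Node("aaa"), new Node("abc"),
        new Node("policy", new Node("a"), new Node("b"), new Node("c"), new Node("d")),
        new Node("stats", new Node("a"), new Node("b"), new Node("c"),
                 new Node("count"), new Node("d"), new Node("e")),
        new Node("t1"), new Node("t2"), new Node("wxyz"));

    /** As the issue describes: a flat position checked against top-level fields only. */
    static String naiveLookup(List<Node> fields, int flatPos) {
        if (flatPos < 0 || flatPos >= fields.size()) {
            throw new IndexOutOfBoundsException("position " + flatPos + " >= " + fields.size());
        }
        return fields.get(flatPos).name;
    }

    /** Flat-aware lookup: skip whole fields by their flat arity, then descend. */
    static String flatLookup(List<Node> fields, int flatPos) {
        int remaining = flatPos;
        for (Node field : fields) {
            int arity = field.flatArity();
            if (remaining < arity) {
                return field.children.isEmpty()
                    ? field.name
                    : field.name + "." + flatLookup(field.children, remaining);
            }
            remaining -= arity;
        }
        throw new IndexOutOfBoundsException("flat position " + flatPos + " out of range");
    }

    public static void main(String[] args) {
        System.out.println(flatLookup(DATA, 14)); // wxyz
        System.out.println(flatLookup(DATA, 9));  // stats.count
        try {
            naiveLookup(DATA, 14);
        } catch (IndexOutOfBoundsException e) {
            System.out.println("naive lookup fails: " + e.getMessage()); // position 14 >= 7
        }
    }
}
```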