[jira] [Commented] (FLINK-29108) Kubernetes operator: Support queryable state

2022-08-28 Thread Ron Crocker (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17596924#comment-17596924
 ] 

Ron Crocker commented on FLINK-29108:
-

[~gyfora] I can accept starting with "extend operator via plugin mechanism". 

> Kubernetes operator: Support queryable state
> 
>
> Key: FLINK-29108
> URL: https://issues.apache.org/jira/browse/FLINK-29108
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Ron Crocker
>Priority: Minor
>
> Enable the Kubernetes operator to deploy jobs where queryable state is 
> desired.
> When queryable state is desired, the operator should configure the deployed 
> job as follows:
>  # Apply {{queryable-state.enabled: true}} to the deployed job.
>  # Configure the Queryable State proxy and Queryable State server (via the 
> {{queryable-state.proxy}} and {{queryable-state.server}} configuration 
> sections respectively). If these sections aren't provided, the default 
> configuration is used.
> The operator will also need to create a Kubernetes service fronting the Task 
> Managers' {{QueryableStateClientProxy}} port (as configured above).
> Tearing down the job also tears down the service.





[jira] [Commented] (FLINK-29108) Kubernetes operator: Support queryable state

2022-08-26 Thread Ron Crocker (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17585531#comment-17585531
 ] 

Ron Crocker commented on FLINK-29108:
-

[~masteryhx] In a word, yes.

There are two things in play:
 # While it's "reaching end of life," Queryable State remains a supported 
capability in all of the Flink versions that the operator currently supports. 
My company would like to use this Kubernetes operator to manage our Flink jobs, 
and some of those jobs require Queryable State. We can't use this operator for 
those jobs until it supports Queryable State.
 # I'm trying to rescue Queryable State from deprecation. In [my recent 
presentation at Flink 
Forward|https://www.slideshare.net/FlinkForward/using-queryable-state-for-fun-and-profit] 
I made what I'd claim is a fairly strong argument for keeping Queryable State 
in the Flink feature set. (_TL;DR: Using Flink Queryable State, I can save >90% 
of the cost of the equivalent Redis-based solution._)

I'm looking for allies in the fight to keep Queryable State alive.

> Kubernetes operator: Support queryable state
> 
>
> Key: FLINK-29108
> URL: https://issues.apache.org/jira/browse/FLINK-29108
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Ron Crocker
>Priority: Minor
>
> Enable the Kubernetes operator to deploy jobs where queryable state is 
> desired.
> When queryable state is desired, the operator should configure the deployed 
> job as follows:
>  # Apply {{queryable-state.enabled: true}} to the deployed job.
>  # Configure the Queryable State proxy and Queryable State server (via the 
> {{queryable-state.proxy}} and {{queryable-state.server}} configuration 
> sections respectively). If these sections aren't provided, the default 
> configuration is used.
> The operator will also need to create a Kubernetes service fronting the Task 
> Managers' {{QueryableStateClientProxy}} port (as configured above).
> Tearing down the job also tears down the service.





[jira] [Created] (FLINK-29108) Kubernetes operator: Support queryable state

2022-08-25 Thread Ron Crocker (Jira)
Ron Crocker created FLINK-29108:
---

 Summary: Kubernetes operator: Support queryable state
 Key: FLINK-29108
 URL: https://issues.apache.org/jira/browse/FLINK-29108
 Project: Flink
  Issue Type: New Feature
  Components: Kubernetes Operator
Reporter: Ron Crocker


Enable the Kubernetes operator to deploy jobs where queryable state is desired.

When queryable state is desired, the operator should configure the deployed job 
as follows:
 # Apply {{queryable-state.enabled: true}} to the deployed job.
 # Configure the Queryable State proxy and Queryable State server (via the 
{{queryable-state.proxy}} and {{queryable-state.server}} configuration sections 
respectively). If these sections aren't provided, the default configuration is 
used.

The operator will also need to create a Kubernetes service fronting the Task 
Managers' {{QueryableStateClientProxy}} port (as configured above).

Tearing down the job also tears down the service.
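
For illustration, here is a minimal sketch of how a client outside the job could 
query state through such a service. It assumes the default proxy port 
({{queryable-state.proxy.ports: 9069}}), a hypothetical service name 
{{word-count-queryable-state}}, and a job that registered a queryable 
{{ValueState}} named {{query-name}} with {{Integer}} keys; none of these names 
come from the operator itself.
{code}
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.queryablestate.client.QueryableStateClient;

import java.util.concurrent.CompletableFuture;

public class QueryableStateServiceClient {

    public static void main(String[] args) throws Exception {
        // Hypothetical Kubernetes service created by the operator, fronting the
        // TaskManagers' queryable-state proxy port (default 9069).
        QueryableStateClient client =
                new QueryableStateClient("word-count-queryable-state", 9069);

        JobID jobId = JobID.fromHexString(args[0]); // JobID of the deployed job

        // Must match the descriptor the job used when registering the state.
        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("count", Types.LONG);

        // "query-name" and the Integer key are assumptions about the queried job.
        CompletableFuture<ValueState<Long>> future =
                client.getKvState(jobId, "query-name", 42,
                        BasicTypeInfo.INT_TYPE_INFO, descriptor);

        System.out.println("value = " + future.get().value());
        client.shutdownAndWait();
    }
}
{code}
The client side is the stock {{flink-queryable-state-client-java}} artifact; the 
only operator-specific piece is the service (and the proxy port it fronts).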





[jira] [Created] (FLINK-8127) Add New Relic Metric Reporter

2017-11-21 Thread Ron Crocker (JIRA)
Ron Crocker created FLINK-8127:
--

 Summary: Add New Relic Metric Reporter
 Key: FLINK-8127
 URL: https://issues.apache.org/jira/browse/FLINK-8127
 Project: Flink
  Issue Type: Improvement
  Components: Metrics
Reporter: Ron Crocker


Add a MetricReporter that reports to New Relic.

This will likely look similar to the Datadog metric reporter: an opt-in 
library distributed with Flink that communicates directly with New Relic, much 
like one of its APM agents, and is configured accordingly.

I'll take this ticket myself.
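
A rough sketch of the shape this could take is below. The class and option names 
are placeholders; only the Flink {{MetricReporter}} and {{Scheduled}} interfaces 
and the {{MetricConfig}}/{{MetricGroup}} calls are real, and the actual New Relic 
transport is left out.
{code}
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.Scheduled;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class NewRelicReporter implements MetricReporter, Scheduled {

    private final Map<String, Metric> metrics = new ConcurrentHashMap<>();
    private String apiKey;

    @Override
    public void open(MetricConfig config) {
        // "apikey" is a hypothetical reporter option read from flink-conf.yaml.
        apiKey = config.getString("apikey", null);
    }

    @Override
    public void close() {}

    @Override
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
        metrics.put(group.getMetricIdentifier(metricName), metric);
    }

    @Override
    public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
        metrics.remove(group.getMetricIdentifier(metricName));
    }

    @Override
    public void report() {
        // Periodically push a snapshot of the current values to New Relic.
        for (Map.Entry<String, Metric> entry : metrics.entrySet()) {
            Metric metric = entry.getValue();
            if (metric instanceof Counter) {
                send(entry.getKey(), ((Counter) metric).getCount());
            } else if (metric instanceof Gauge) {
                Object value = ((Gauge<?>) metric).getValue();
                if (value instanceof Number) {
                    send(entry.getKey(), ((Number) value).doubleValue());
                }
            }
        }
    }

    private void send(String name, double value) {
        // Placeholder for the call into the New Relic client, authenticated with apiKey.
    }
}
{code}
It would be wired up like the existing reporters, e.g. {{metrics.reporters: newrelic}} 
and {{metrics.reporter.newrelic.class: ...NewRelicReporter}} in {{flink-conf.yaml}}.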





[jira] [Created] (FLINK-8074) Launch Flink jobs using Maven coordinates

2017-11-14 Thread Ron Crocker (JIRA)
Ron Crocker created FLINK-8074:
--

 Summary: Launch Flink jobs using Maven coordinates
 Key: FLINK-8074
 URL: https://issues.apache.org/jira/browse/FLINK-8074
 Project: Flink
  Issue Type: Improvement
Reporter: Ron Crocker
Priority: Minor


As a Flink user, I want to be able to submit my job using the Maven coordinates 
(see https://maven.apache.org/pom.html#Maven_Coordinates) of its jar instead of 
a path to a local copy of that jar. 

For example, instead of submitting my job using:
{{bin/flink run /local/path/to/word-count-1.0.1.jar}}

I would specify its Maven coordinates:
{{bin/flink run com.newrelic:word-count:1.0.1}}

This latter form would contact known Maven repositories to acquire the jar at 
the specified coordinates and submit that to the cluster. 
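
To make the resolution step concrete, here is a rough sketch (assumptions only, 
not a proposed implementation) of mapping coordinates onto the standard Maven 
repository layout:
{code}
import java.net.URI;

public class MavenCoordinateResolver {

    /** Maps "groupId:artifactId:version" onto a jar URI in a Maven-layout repository. */
    public static URI resolveJar(String repositoryBaseUrl, String coordinates) {
        String[] parts = coordinates.split(":");
        if (parts.length != 3) {
            throw new IllegalArgumentException(
                    "Expected groupId:artifactId:version, got: " + coordinates);
        }
        String groupId = parts[0];
        String artifactId = parts[1];
        String version = parts[2];

        // Reject indeterminate versions such as "com.newrelic:word-count:+".
        if (version.contains("+")) {
            throw new IllegalArgumentException("Indeterminate version: " + version);
        }

        // Standard Maven layout: dots in the groupId become path segments.
        String path = groupId.replace('.', '/') + "/" + artifactId + "/" + version
                + "/" + artifactId + "-" + version + ".jar";
        String base = repositoryBaseUrl.endsWith("/") ? repositoryBaseUrl : repositoryBaseUrl + "/";
        return URI.create(base + path);
    }

    public static void main(String[] args) {
        // Prints .../com/newrelic/word-count/1.0.1/word-count-1.0.1.jar
        System.out.println(resolveJar("https://repo1.maven.org/maven2",
                "com.newrelic:word-count:1.0.1"));
    }
}
{code}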

Considerations:
* No transitive dependencies should be included - the target, specified either 
as a jar file in the local file system or by its maven coordinates, should be a 
complete Flink job.
* Maven repositories need to be specified _somewhere_. It's reasonable to 
expect that these repositories are independent of the cluster configurations.
* Specified repositories must implement the Maven repository API, but don't need 
to be Maven proper - Artifactory, for example, is a valid repository as long as 
it speaks that API.
* _minor point_: _Indeterminate versions_ should be prohibited - that is, 
consider {{com.newrelic:word-count:+}} an invalid coordinate specification.





[jira] [Commented] (FLINK-3697) keyBy() with nested POJO computes invalid field position indexes

2016-04-05 Thread Ron Crocker (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15226504#comment-15226504
 ] 

Ron Crocker commented on FLINK-3697:


[~rmetzger] This code is fine to use for your test case. 

The key aspect to include is a key field that is lexicographically "greater than" 
a field holding a nested POJO. This is what causes the index of the key field (in 
the flattened representation of the POJO) to fall beyond the natural (unflattened) 
fields of the outer POJO.

I would have provided a shorter example, but I didn't have the time to do so.
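
For reference, assuming {{PojoTypeInfo}} orders the flattened fields 
lexicographically by name (which matches the positions 0, 1, and 14 reported in 
the description below), the layout for the {{Data}} POJO works out to:
{code}
 0 aaa          5 policy.d      10 stats.d
 1 abc          6 stats.a       11 stats.e
 2 policy.a     7 stats.b       12 t1
 3 policy.b     8 stats.c       13 t2
 4 policy.c     9 stats.count   14 wxyz   <- flat position of the "wxyz" key
{code}
{{Data}} itself declares only 7 direct fields, so {{getTypeAt(14)}} is reported 
as out of range.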

> keyBy() with nested POJO computes invalid field position indexes
> 
>
> Key: FLINK-3697
> URL: https://issues.apache.org/jira/browse/FLINK-3697
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.0.0
> Environment: MacOS X 10.10
>Reporter: Ron Crocker
>Priority: Critical
>  Labels: pojo
>
> Using named keys in keyBy() for nested POJO types results in failure. The 
> indexes for named key fields are used inconsistently with nested POJO types. 
> In particular, {{PojoTypeInfo.getFlatFields()}} returns the field's position 
> after (apparently) flattening the structure, but that position is then resolved 
> against the unflattened version of the POJO type by {{PojoTypeInfo.getTypeAt()}}.
> In the example below, getFlatFields() returns positions 0, 1, and 14. These 
> positions appear correct in the flattened structure of the Data class. 
> However, in {{KeySelector getSelectorForKeys(Keys keys, 
> TypeInformation typeInfo, ExecutionConfig executionConfig)}}, a call to 
> {{compositeType.getTypeAt(logicalKeyPositions[i])}} for the third key results 
> in {{PojoTypeInfo.getTypeAt()}} declaring it out of range, as it compares the 
> number of directly named fields of the object against the number of fields in 
> the flattened version of that type.
> Concrete Example:
> Consider this graph:
> {code}
> DataStream dataStream = see.addSource(new 
> FlinkKafkaConsumer08<>(timesliceConstants.topic, new DataDeserialzer(), 
> kafkaConsumerProperties));
> dataStream
>   .flatMap(new DataMapper())
>   .keyBy("aaa", "abc", "wxyz")
> {code}
> {{DataDeserialzer}} returns a "NativeDataFormat" object; {{DataMapper}} takes 
> this NativeDataFormat object and extracts individual Data objects: {code}
> public class Data {
> public int aaa;
> public int abc;
> public long wxyz;
> public int t1;
> public int t2;
> public Policy policy;
> public Stats stats;
> public Data() {}
> }
> {code}
> A {{Policy}} object is an instance of this class:
> {code}
> public class Policy {
> public short a;
> public short b;
> public boolean c;
> public boolean d;
> public Policy() {}
> }
> {code}
> A {{Stats}} object is an instance of this class:
> {code}
> public class Stats {
> public long count;
> public float a;
> public float b;
> public float c;
> public float d;
> public float e;
> public Stats() {}
> }
> {code}





[jira] [Updated] (FLINK-3697) keyBy() with nested POJO computes invalid field position indexes

2016-04-04 Thread Ron Crocker (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Crocker updated FLINK-3697:
---
Description: 
Using named keys in keyBy() for nested POJO types results in failure. The 
indexes for named key fields are used inconsistently with nested POJO types. 
In particular, {{PojoTypeInfo.getFlatFields()}} returns the field's position 
after (apparently) flattening the structure, but that position is then resolved 
against the unflattened version of the POJO type by {{PojoTypeInfo.getTypeAt()}}.

In the example below, getFlatFields() returns positions 0, 1, and 14. These 
positions appear correct in the flattened structure of the Data class. However, 
in {{KeySelector getSelectorForKeys(Keys keys, TypeInformation 
typeInfo, ExecutionConfig executionConfig)}}, a call to 
{{compositeType.getTypeAt(logicalKeyPositions[i])}} for the third key results in 
{{PojoTypeInfo.getTypeAt()}} declaring it out of range, as it compares the 
number of directly named fields of the object against the number of fields in 
the flattened version of that type.

Concrete Example:
Consider this graph:
{code}
DataStream dataStream = see.addSource(new 
FlinkKafkaConsumer08<>(timesliceConstants.topic, new DataDeserialzer(), 
kafkaConsumerProperties));

dataStream
  .flatMap(new DataMapper())
  .keyBy("aaa", "abc", "wxyz")
{code}

{{DataDeserialzer}} returns a "NativeDataFormat" object; {{DataMapper}} takes 
this NativeDataFormat object and extracts individual Data objects: {code}
public class Data {
public int aaa;
public int abc;
public long wxyz;
public int t1;
public int t2;
public Policy policy;
public Stats stats;

public Data() {}
}
{code}

A {{Policy}} object is an instance of this class:
{code}
public class Policy {
public short a;
public short b;
public boolean c;
public boolean d;

public Policy() {}
}
{code}

A {{Stats}} object is an instance of this class:
{code}
public class Stats {
public long count;
public float a;
public float b;
public float c;
public float d;
public float e;

public Stats() {}
}
{code}

  was:
Using named keys in keyBy() for nested POJO types results in failure. The 
indexes for named key fields are used inconsistently with nested POJO types. 
In particular, {{PojoTypeInfo.getFlatFields()}} returns the field's position 
after (apparently) flattening the structure, but that position is then resolved 
against the unflattened version of the POJO type by {{PojoTypeInfo.getTypeAt()}}.

In the example below, getFlatFields() returns positions 0, 1, and 14. These 
positions appear correct in the flattened structure of the Data class. However, 
in {{KeySelector getSelectorForKeys(Keys keys, TypeInformation 
typeInfo, ExecutionConfig executionConfig)}}, a call to 
{{compositeType.getTypeAt(logicalKeyPositions[i])}} for the third key results in 
{{PojoTypeInfo.getTypeAt()}} declaring it out of range, as it compares the 
number of directly named fields of the object against the number of fields in 
the flattened version of that type.

Concrete Example:
Consider this graph:
{code}
DataStream dataStream = see.addSource(new 
FlinkKafkaConsumer08<>(timesliceConstants.topic, new DataDeserialzer(), 
kafkaConsumerProperties));

dataStream
  .flatMap(new DataMapper())
  .keyBy("aaa", "abc", "wxyz")
{code}

{{DataDeserialzer}} returns a "NativeDataFormat" object; {{DataMapper}} takes 
this NativeDataFormat object and extracts individual Data objects: {code}
public class Data {
public int aaa;
public int abc;
public long wxyz;
public int t1;
public int t2;
public Policy policy;
public Stats stats;

public Data() {}
}
{code}

A {{Policy}} object is an instance of this class:
{code}
public class AggregatableMetricStoragePolicy implements MetricStoragePolicy {
public short a;
public short b;
public boolean c;
public boolean d;

public Policy() {}
}
{code}

A {{Stats}} object is an instance of this class:
{code}
public class Stats {
public long count;
public float a;
public float b;
public float c;
public float d;
public float e;

public Stats() {}
}
{code}


> keyBy() with nested POJO computes invalid field position indexes
> 
>
> Key: FLINK-3697
> URL: https://issues.apache.org/jira/browse/FLINK-3697
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.0.0
> Environment: MacOS X 10.10
>Reporter: Ron Crocker
>Priority: Minor
>  Labels: pojo
>
> Using named keys in keyBy() for nested POJO types results in failure. The 
> indexes for named key fields are used inconsistently with nested POJO types. 
> In particular, {{PojoTypeInfo.getFlatFields()}} returns the field's position 
> after (apparently) flattening the structure, but that position is then resolved 
> against the unflattened version of the POJO type by {{PojoTypeInfo.getTypeAt()}}.

[jira] [Created] (FLINK-3697) keyBy() with nested POJO computes invalid field position indexes

2016-04-04 Thread Ron Crocker (JIRA)
Ron Crocker created FLINK-3697:
--

 Summary: keyBy() with nested POJO computes invalid field position 
indexes
 Key: FLINK-3697
 URL: https://issues.apache.org/jira/browse/FLINK-3697
 Project: Flink
  Issue Type: Bug
  Components: DataStream API
Affects Versions: 1.0.0
 Environment: MacOS X 10.10
Reporter: Ron Crocker
Priority: Minor


Using named keys in keyBy() for nested POJO types results in failure. The 
indexes for named key fields are used inconsistently with nested POJO types. 
In particular, {{PojoTypeInfo.getFlatFields()}} returns the field's position 
after (apparently) flattening the structure, but that position is then resolved 
against the unflattened version of the POJO type by {{PojoTypeInfo.getTypeAt()}}.

In the example below, getFlatFields() returns positions 0, 1, and 14. These 
positions appear correct in the flattened structure of the Data class. However, 
in {{KeySelector getSelectorForKeys(Keys keys, TypeInformation 
typeInfo, ExecutionConfig executionConfig)}}, a call to 
{{compositeType.getTypeAt(logicalKeyPositions[i])}} for the third key results in 
{{PojoTypeInfo.getTypeAt()}} declaring it out of range, as it compares the 
number of directly named fields of the object against the number of fields in 
the flattened version of that type.

Concrete Example:
Consider this graph:
{code}
DataStream dataStream = see.addSource(new 
FlinkKafkaConsumer08<>(timesliceConstants.topic, new DataDeserialzer(), 
kafkaConsumerProperties));

dataStream
  .flatMap(new DataMapper())
  .keyBy("aaa", "abc", "wxyz")
{code}

{{DataDeserialzer}} returns a "NativeDataFormat" object; {{DataMapper}} takes 
this NativeDataFormat object and extracts individual Data objects: {code}
public class Data {
public int aaa;
public int abc;
public long wxyz;
public int t1;
public int t2;
public Policy policy;
public Stats stats;

public Data() {}
}
{code}

A {{Policy}} object is an instance of this class:
{code}
public class AggregatableMetricStoragePolicy implements MetricStoragePolicy {
public short a;
public short b;
public boolean c;
public boolean d;

public Policy() {}
}
{code}

A {{Stats}} object is an instance of this class:
{code}
public class Stats {
public long count;
public float a;
public float b;
public float c;
public float d;
public float e;

public Stats() {}
}
{code}


