Re: CRD compatible with native and standalone mode

2021-04-20 Thread Yang Wang
Exactly.

I think most of the fields could be shared by standalone and native mode.


Best,
Yang

gaurav kulkarni  于2021年4月21日周三 上午10:17写道:

> Thanks a lot for the response, folks! I appreciate it. I plan to use
> native mode in future mostly for the resource management it plans to offer.
> Let me go through the links provided.
>
> @Yang Wang "Since the CR is defined in yaml[2], native and standalone
> could have some dedicated fields. And you could easily parse them in your
> K8s operator.":  Do you mean having fields that are needed for both native
> and standalone mode in the CRD (probably making them optional in the CRD)
> and each operator type (standalone/native) using fields that are relevant
> for it?
>
> Thanks,
> Gaurav
>
>
> On Tuesday, April 20, 2021, 08:23:34 AM PDT, Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>
> Hi Gaurav,
>
> I think the name "Native Kubernetes" is a bit misleading – this just means
> that you can use the Flink CLI/ scripts to run Flink applications on
> Kubernetes without using the Kubernetes APIs/ kubectl directly. What
> features are you looking to use in the native mode?
>
> I think it would be difficult to use this directly inside an operator, but
> keeping "feature parity" with it is a good goal for your CRDs. Since CRDs
> are essentially just a new API, the design approach should be user/
> feature-first. By feature parity, I mean taking currently supported "Native
> Kubernetes" functionality as the feature list for your CRDs, for example:
> * Allowing Secrets to be mounted as files and environment variables [1]
> * Allowing templating of the JobManager and TaskManager Pods (containers,
> etc.) [2]
> * Easy use of built-in plugins [3]
> * etc.
>
> Other "Native Kubernetes" "features", like RBAC and logs, will come "out
> of the box" by defining Custom Resources.
>
> Yang's resources are a great place to start, though I'd suggest defining
> your API spec within the CRD explicitly[4], which will be clearer for your
> users and will allow for schema validation by other tools.
>
> Best,
> Austin
>
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#using-secrets
> [2]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#pod-template
> [3]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#using-plugins
> [4]:
> https://kubernetes.io/docs/tasks/extend-kubernetes/custom-resources/custom-resource-definitions/#specifying-a-structural-schema
>
> On Tue, Apr 20, 2021 at 2:13 AM Yang Wang  wrote:
>
> I think the compatibility depends on you. For example, you could have the
> same
> CustomResourceDefinition for standalone and native Flink applications.
> They could
> look like this[1].
>
> Since the CR is defined in yaml[2], native and standalone could have some
> dedicated fields.
> And you could easily parse them in your K8s operator.
>
> [1].
> https://github.com/wangyang0918/flink-native-k8s-operator/blob/master/deploy/crd.yaml
> [2].
> https://github.com/wangyang0918/flink-native-k8s-operator/blob/master/deploy/cr.yaml
>
>
> Best,
> Yang
>
> gaurav kulkarni  于2021年4月20日周二 上午8:57写道:
>
> Hi,
>
> I plan to create a Flink K8s operator which supports standalone mode, and
> switch to native mode sometime later. I was wondering what are some of
> the approaches to ensure that CRD is compatible with both native and
> standalone mode?
>
> Thanks
>
>


idleTimeMsPerSecond exceeds 1000

2021-04-20 Thread Alexey Trenikhun
Hello,
When the Flink job is mostly idle, idleTimeMsPerSecond for a given task_name and
subtask_index sometimes exceeds 1000; I have seen values up to 1350, but usually not
higher than 1020. Is this due to the accuracy of nanoTime/currentTimeMillis, or is
there a bug in the calculations?

Thanks,
Alexey


Re: CRD compatible with native and standalone mode

2021-04-20 Thread gaurav kulkarni
Thanks a lot for the response, folks! I appreciate it. I plan to use native
mode in future mostly for the resource management it plans to offer. Let me go
through the links provided.

@Yang Wang "Since the CR is defined in yaml[2], native and standalone could
have some dedicated fields. And you could easily parse them in your K8s
operator.": Do you mean having fields that are needed for both native and
standalone mode in the CRD (probably making them optional in the CRD) and each
operator type (standalone/native) using fields that are relevant for it?

Thanks,
Gaurav

On Tuesday, April 20, 2021, 08:23:34 AM PDT, Austin Cawley-Edwards 
 wrote:  
 
 Hi Gaurav,
I think the name "Native Kubernetes" is a bit misleading – this just means that 
you can use the Flink CLI/ scripts to run Flink applications on Kubernetes 
without using the Kubernetes APIs/ kubectl directly. What features are you 
looking to use in the native mode?

I think it would be difficult to use this directly inside an operator, but 
keeping "feature parity" with it is a good goal for your CRDs. Since CRDs are 
essentially just a new API, the design approach should be user/ feature-first. 
By feature parity, I mean taking currently supported "Native Kubernetes" 
functionality as the feature list for your CRDs, for example:
* Allowing Secrets to be mounted as files and environment variables [1]
* Allowing templating of the JobManager and TaskManager Pods (containers, etc.) [2]
* Easy use of built-in plugins [3]
* etc.

Other "Native Kubernetes" "features", like RBAC and logs, will come "out of the 
box" by defining Custom Resources.
Yang's resources are a great place to start, though I'd suggest defining your 
API spec within the CRD explicitly[4], which will be clearer for your users and 
will allow for schema validation by other tools.
Best,
Austin


[1]: https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#using-secrets
[2]: https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#pod-template
[3]: https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#using-plugins
[4]: https://kubernetes.io/docs/tasks/extend-kubernetes/custom-resources/custom-resource-definitions/#specifying-a-structural-schema
On Tue, Apr 20, 2021 at 2:13 AM Yang Wang  wrote:

I think the compatibility depends on you. For example, you could have the same
CustomResourceDefinition for standalone and native Flink applications. They could
look like this[1].

Since the CR is defined in yaml[2], native and standalone could have some
dedicated fields. And you could easily parse them in your K8s operator.

[1]. https://github.com/wangyang0918/flink-native-k8s-operator/blob/master/deploy/crd.yaml
[2]. https://github.com/wangyang0918/flink-native-k8s-operator/blob/master/deploy/cr.yaml

Best,
Yang
gaurav kulkarni  于2021年4月20日周二 上午8:57写道:

Hi,

I plan to create a Flink K8s operator which supports standalone mode, and
switch to native mode sometime later. I was wondering what are some of the
approaches to ensure that the CRD is compatible with both native and standalone
mode?

Thanks

  

Contiguity and state storage in CEP library

2021-04-20 Thread tbud
We are evaluating a use case where hundreds of events per second will be coming
in on a stream and we want to run a fixed set of pattern-matching rules on
them, and I use relaxed contiguity rules as described in the documentation.
For example:
a pattern sequence "a b+ c" on the stream "a", "b1", "d1", "b2", "d2",
"b3", "c" will have the results {a b1 c}, {a b1 b2 c}, {a b1 b2 b3 c}, {a
b2 c}, {a b2 b3 c}, {a b3 c},
and I specify the time window to be 60 minutes using the within() clause for this
pattern.

Does this mean that the events which don't match, i.e. "d2", won't be stored
in state at all? Does the CEP library store only matching events in state for
60 minutes?

This question is important to estimate the state backend size required for
the usecase and to make sure that the application doesn't go out of memory
due to ever increasing state.
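
For reference, a minimal sketch of the pattern described above with the Java CEP
API (the Event POJO, its getType()/getKey() accessors and the input stream are
assumptions, not code from this thread):

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.windowing.time.Time;

// "a b+ c" with relaxed contiguity and a 60-minute time constraint.
Pattern<Event, ?> pattern = Pattern.<Event>begin("a")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event e) { return "a".equals(e.getType()); }
        })
        // followedBy() = relaxed contiguity: non-matching events such as "d1"/"d2"
        // may occur between the matching ones.
        .followedBy("b").oneOrMore()
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event e) { return e.getType().startsWith("b"); }
        })
        .followedBy("c")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event e) { return "c".equals(e.getType()); }
        })
        .within(Time.minutes(60));   // the within() clause mentioned above

PatternStream<Event> matches = CEP.pattern(input.keyBy(Event::getKey), pattern);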



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Contiguity in SQL vs CEP

2021-04-20 Thread tbud
There are three different types of contiguity defined in the CEP documentation [1]
(looping and non-looping): strict, relaxed and non-deterministic relaxed.
There's no equivalent in the SQL documentation [2]. Can someone shed some
light on what's achievable in SQL and what isn't?
Related question: it seems as if the SQL default, if I define a pattern such as
(A B+ C), is the relaxed mode. If this is true, then when keeping state using
SQL, are the non-matching events dropped from the state?


[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/cep.html#combining-patterns
[2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/match_recognize.html#defining-a-pattern
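
For reference, the (A B+ C) pattern discussed above can be written in SQL with
MATCH_RECOGNIZE roughly as in the sketch below; the events table, its columns and
the measures are assumptions, and the sketch does not answer the contiguity or
state-retention question, which remains open:

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());

// Assumes a registered table "events" with columns user_id, event_time and event_type.
tEnv.executeSql(
    "SELECT * " +
    "FROM events " +
    "  MATCH_RECOGNIZE ( " +
    "    PARTITION BY user_id " +
    "    ORDER BY event_time " +
    "    MEASURES A.event_time AS start_time, LAST(C.event_time) AS end_time " +
    "    ONE ROW PER MATCH " +
    "    AFTER MATCH SKIP PAST LAST ROW " +
    "    PATTERN (A B+ C) WITHIN INTERVAL '60' MINUTE " +
    "    DEFINE " +
    "      A AS A.event_type = 'a', " +
    "      B AS B.event_type = 'b', " +
    "      C AS C.event_type = 'c' " +
    "  )");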



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Flink Event specific window

2021-04-20 Thread s_penakalap...@yahoo.com
Hi All,
I have one requirement where I need to calculate the total amount of transactions
done by each user in the last 1 hour. Say Customer1 has done 2 transactions, one
at 11:00 am and the other at 11:20 am. Customer2 has done 1 transaction, at
10:00 am. Customer3 has done 3 transactions, at 11:20 am, 11:40 am and 11:45 am.

Whenever a customer does a transaction we receive an event in the source topic;
we consume the data and need to calculate the total amount spent by that customer
in the last 1 hour.

If I receive a new Customer1 transaction event at 11:30 am, then I need to
calculate the sum of the 3 events done by Customer1 in the last 1 hour (i.e. 11:00,
11:20 and 11:30 am - all 3 events fall in the last 1 hour window). Now say I receive
a new Customer2 transaction event at 11:30 am; then for this customer I need to
consider only one event, 11:30 (ignoring the event at 10:00 am as it does not
fall in the last 1 hr). If a new Customer3 transaction is done at 12:40 pm, then for
this customer I need to calculate the sum of (11:40 am, 11:45 am and 12:40 pm) as
all 3 fall within the last 1 hr.

Approach I am planning to try: every event has the transaction time, which I am
using as the event time to assign a WatermarkStrategy; keyBy customerId;
SlidingEventTimeWindows of 1 hr; then process all elements using a
ProcessWindowFunction.
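
In code, that planned approach might look roughly like the following sketch (the
Transaction POJO, its accessors and the 5-minute slide are assumptions; only the
1-hour window size comes from the description above):

DataStream<Transaction> transactions = ...; // consumed from the source topic

transactions
    .assignTimestampsAndWatermarks(
        WatermarkStrategy.<Transaction>forBoundedOutOfOrderness(Duration.ofMinutes(1))
            .withTimestampAssigner((tx, ts) -> tx.getTransactionTime()))
    .keyBy(Transaction::getCustomerId)
    .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
    .process(new ProcessWindowFunction<Transaction, Tuple2<String, Double>, String, TimeWindow>() {
        @Override
        public void process(String customerId, Context ctx,
                            Iterable<Transaction> txs, Collector<Tuple2<String, Double>> out) {
            double total = 0.0;
            for (Transaction tx : txs) {
                total += tx.getAmount();
            }
            out.collect(Tuple2.of(customerId, total)); // total spent in this 1-hour window
        }
    });

Note that a sliding window emits results when the watermark passes each window's
end rather than on every incoming event, so the slide interval controls how often
the per-customer totals are produced.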

Kindly suggest the approach I need to follow to achieve the above scenario
using Flink Java/SQL. I am using Flink 1.12.0.

Regards,
Sunitha

Re: Producing to Kafka topics dynamically without redeployment

2021-04-20 Thread Ejaskhan S
Hi Ahmed,

If you have the logic to identify the destination cluster along with the
target topic, you will be able to achieve this with the above solution.

1. Create one kafka producer for each cluster. If 10 clusters are there,
create 10 producers.

2. Add a new attribute called 'clusterId' or something more meaningful for
identifying the cluster inside TransformedEvent.

3. Filter the Datastream based on the clusterId. And add
the corresponding cluster related producer into the filtered stream.


Thanks
Ejaskhan
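
For reference, a minimal sketch of this per-cluster filtering (the cluster IDs,
bootstrap servers and the TransformedEvent.getClusterId() accessor are
assumptions):

DataStream<TransformedEvent> events = ...; // stream carrying clusterId, topicName and event

Map<String, String> clusters = new HashMap<>();
clusters.put("cluster-a", "broker-a:9092");
clusters.put("cluster-b", "broker-b:9092");

for (Map.Entry<String, String> cluster : clusters.entrySet()) {
    String clusterId = cluster.getKey();

    Properties producerConfig = new Properties();
    producerConfig.setProperty("bootstrap.servers", cluster.getValue());

    events
        .filter(e -> clusterId.equals(e.getClusterId()))   // route by clusterId
        .addSink(new FlinkKafkaProducer<>(
            "DEFAULT",                                     // fallback topic
            new CustomKafkaSchema(),                       // picks the topic per record
            producerConfig,
            FlinkKafkaProducer.Semantic.AT_LEAST_ONCE));
}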

On Wed, Apr 21, 2021, 1:49 AM Ahmed A.Hamid  wrote:

> Thank you, Ejaskhan.
>
> I think your suggestion would only work if all the topics were on the same
> Kafka cluster. In my use-case, the topics can be on different clusters,
> which is why I was thinking of rolling a custom sink that detects config
> changes and instantiates Kafka producers on demand as needed.
>
>
> On Tuesday, April 20, 2021, 01:11:08 PM PDT, Ejaskhan S <
> iamejask...@gmail.com> wrote:
>
>
> Hi Ahmed,
>
> If you want to dynamically produce events to different topics and you have
> the logic to identify the target topics,  you will be able to achieve this
> in the following way.
>
>
>- Suppose this is your event after the transformation logic(if any) :
>EVENT.
>- This is the target topic for this event, TOPIC_1.  ( I hope,  you
>have the logic available to identify the topic dynamically)
>
>
>
>    - Create a new dataStream (custom DS) containing the following
>    attributes: topicName and event.
>
>
> class TransformedEvent implements java.io.Serializable {
>     String topicName;
>     Event event;
> }
>
>
>    - Create the serialization schema for the topic as below,
>
> class CustomKafkaSchema implements KafkaSerializationSchema<TransformedEvent>,
>         KafkaContextAware<TransformedEvent> {
>
>     @Override
>     public ProducerRecord<byte[], byte[]> serialize(TransformedEvent element,
>             @Nullable Long timestamp) {
>         byte[] serialized = new customSerliazer().serialize(element.getEvent());
>         return new ProducerRecord<>(getTargetTopic(element),
>                 null, null, null, serialized);
>     }
>
>     @Override
>     public String getTargetTopic(TransformedEvent element) {
>         return element.getTopicName();
>     }
> }
>
>
>    - Create the producer as below,
>
> FlinkKafkaProducer<TransformedEvent> producer = new FlinkKafkaProducer<>(
>         "DEFAULT",
>         new CustomKafkaSchema(),
>         producerConfiguration,
>         FlinkKafkaProducer.Semantic.EXACTLY_ONCE / AT_LEAST_ONCE);
>
>
>
> Thanks
> Ejas khan
>
>
>
> On Wed, Apr 21, 2021 at 1:08 AM Ahmed A.Hamid 
> wrote:
>
> Hello everyone,
>
> I have a use-case where I need to have a Flink application produce to a
> variable number of Kafka topics (specified through configuration),
> potentially in different clusters, without having to redeploy the app.
> Let's assume I maintain the set of destination clusters/topics in config
> files, and have code in my Flink app to detect and reload any changes in
> these config files at runtime.
>
> I have two questions:
>
>1. Is that a sound/reasonable thing to do? Or is it going to be
>riddled with issues?
>
>2. To implement that, should I write a custom SinkFunction that
>maintains a set of Kafka producers? Or a custom SinkFunction that delegates
>the work to a collection of FlinkKafkaProducer instances? Is there a better
>approach?
>
> Thanks in advance.
>
> Truly,
> Ahmed
>
>


Re: Producing to Kafka topics dynamically without redeployment

2021-04-20 Thread Ahmed A.Hamid
 Thank you, Ejaskhan.
I think your suggestion would only work if all the topics were on the same 
Kafka cluster. In my use-case, the topics can be on different clusters, which 
is why I was thinking of rolling a custom sink that detects config changes and 
instantiates Kafka producers on demand as needed.
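
For reference, a minimal sketch of such a sink, keeping one plain kafka-clients
producer per cluster and creating them lazily (the TransformedEvent accessors and
the ClusterConfig/ConfigStore lookup are assumptions; note that raw KafkaProducer
instances used this way sit outside Flink's checkpointing, so the delivery
guarantees are weaker than with FlinkKafkaProducer):

public class DynamicKafkaSink extends RichSinkFunction<TransformedEvent> {

    // One producer per bootstrap-server string, created on first use.
    private transient Map<String, KafkaProducer<byte[], byte[]>> producers;

    @Override
    public void open(Configuration parameters) {
        producers = new HashMap<>();
    }

    @Override
    public void invoke(TransformedEvent element, Context context) {
        // Hypothetical lookup of the destination cluster from the reloadable config.
        ClusterConfig cluster = ConfigStore.lookup(element.getClusterId());

        KafkaProducer<byte[], byte[]> producer = producers.computeIfAbsent(
            cluster.getBootstrapServers(),
            servers -> {
                Properties props = new Properties();
                props.setProperty("bootstrap.servers", servers);
                props.setProperty("key.serializer", ByteArraySerializer.class.getName());
                props.setProperty("value.serializer", ByteArraySerializer.class.getName());
                return new KafkaProducer<>(props);
            });

        producer.send(new ProducerRecord<>(element.getTopicName(), element.getSerializedPayload()));
    }

    @Override
    public void close() {
        if (producers != null) {
            producers.values().forEach(KafkaProducer::close);
        }
    }
}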


On Tuesday, April 20, 2021, 01:11:08 PM PDT, Ejaskhan S 
 wrote:  
 
 Hi Ahmed,
If you want to dynamically produce events to different topics and you have the 
logic to identify the target topics,  you will be able to achieve this in the 
following way.

   
   - Suppose this is your event after the transformation logic (if any): EVENT.
   - This is the target topic for this event, TOPIC_1. (I hope you have the
     logic available to identify the topic dynamically)

   - Create a new dataStream (custom DS) containing the following attributes:
     topicName and event.


class TransformedEvent implements java.io.Serializable {
    String topicName;
    Event event;
}

   - Create the serialization schema for the topic as below,

class CustomKafkaSchema implements KafkaSerializationSchema<TransformedEvent>,
        KafkaContextAware<TransformedEvent> {

    @Override
    public ProducerRecord<byte[], byte[]> serialize(TransformedEvent element,
            @Nullable Long timestamp) {
        byte[] serialized = new customSerliazer().serialize(element.getEvent());
        return new ProducerRecord<>(getTargetTopic(element),
                null, null, null, serialized);
    }

    @Override
    public String getTargetTopic(TransformedEvent element) {
        return element.getTopicName();
    }
}

   - Create the producer as below,

FlinkKafkaProducer<TransformedEvent> producer = new FlinkKafkaProducer<>(
        "DEFAULT",
        new CustomKafkaSchema(),
        producerConfiguration,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE / AT_LEAST_ONCE);



Thanks
Ejas khan


On Wed, Apr 21, 2021 at 1:08 AM Ahmed A.Hamid  wrote:

Hello everyone,
I have a use-case where I need to have a Flink application produce to a 
variable number of Kafka topics (specified through configuration), potentially 
in different clusters, without having to redeploy the app. Let's assume I 
maintain the set of destination clusters/topics in config files, and have code 
in my Flink app to detect and reload any changes in these config files at 
runtime.
I have two questions:

   - Is that a sound/reasonable thing to do? Or is it going to be riddled with
     issues?

   - To implement that, should I write a custom SinkFunction that maintains a
     set of Kafka producers? Or a custom SinkFunction that delegates the work to a
     collection of FlinkKafkaProducer instances? Is there a better approach?

Thanks in advance.

Truly,
Ahmed

  

Re: Producing to Kafka topics dynamically without redeployment

2021-04-20 Thread Ejaskhan S
Hi Ahmed,

If you want to dynamically produce events to different topics and you have
the logic to identify the target topics,  you will be able to achieve this
in the following way.


   - Suppose this is your event after the transformation logic(if any) :
   EVENT.
   - This is the target topic for this event, TOPIC_1.  ( I hope,  you have
   the logic available to identify the topic dynamically)



   - Create a new dataStream (custom DS) containing the following
   attributes: topicName and event.


class TransformedEvent implements java.io.Serializable {
    String topicName;
    Event event;
}



   - Create the serialization schema for the topic as below,


class CustomKafkaSchema implements KafkaSerializationSchema<TransformedEvent>,
        KafkaContextAware<TransformedEvent> {

    @Override
    public ProducerRecord<byte[], byte[]> serialize(TransformedEvent element,
            @Nullable Long timestamp) {
        byte[] serialized = new customSerliazer().serialize(element.getEvent());
        return new ProducerRecord<>(getTargetTopic(element),
                null, null, null, serialized);
    }

    @Override
    public String getTargetTopic(TransformedEvent element) {
        return element.getTopicName();
    }
}



   - Create the producer as below,


FlinkKafkaProducer<TransformedEvent> producer = new FlinkKafkaProducer<>(
        "DEFAULT",
        new CustomKafkaSchema(),
        producerConfiguration,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE / AT_LEAST_ONCE);



Thanks
Ejas khan



On Wed, Apr 21, 2021 at 1:08 AM Ahmed A.Hamid  wrote:

> Hello everyone,
>
> I have a use-case where I need to have a Flink application produce to a
> variable number of Kafka topics (specified through configuration),
> potentially in different clusters, without having to redeploy the app.
> Let's assume I maintain the set of destination clusters/topics in config
> files, and have code in my Flink app to detect and reload any changes in
> these config files at runtime.
>
> I have two questions:
>
>1. Is that a sound/reasonable thing to do? Or is it going to be
>riddled with issues?
>
>2. To implement that, should I write a custom SinkFunction that
>maintains a set of Kafka producers? Or a custom SinkFunction that delegates
>the work to a collection of FlinkKafkaProducer instances? Is there a better
>approach?
>
> Thanks in advance.
>
> Truly,
> Ahmed
>
>


Producing to Kafka topics dynamically without redeployment

2021-04-20 Thread Ahmed A.Hamid
Hello everyone,
I have a use-case where I need to have a Flink application produce to a 
variable number of Kafka topics (specified through configuration), potentially 
in different clusters, without having to redeploy the app. Let's assume I 
maintain the set of destination clusters/topics in config files, and have code 
in my Flink app to detect and reload any changes in these config files at 
runtime.
I have two questions:

   - Is that a sound/reasonable thing to do? Or is it going to be riddled with
     issues?

   - To implement that, should I write a custom SinkFunction that maintains a
     set of Kafka producers? Or a custom SinkFunction that delegates the work to a
     collection of FlinkKafkaProducer instances? Is there a better approach?

Thanks in advance.

Truly,
Ahmed


Re: Alternatives to JDBCAppendTableSink in Flink 1.11

2021-04-20 Thread Austin Cawley-Edwards
Hi Sambaran,

I'm not sure if this is the best approach, though I don't know your full
use case/ implementation.

What kind of error do you get when trying to map into a PreparedStatement?
I assume you tried something like this?

SingleOutputStreamOperator stream =
env.fromElements(Row.of("YourProcedureA"), Row.of("YourProcedureB"));

stream.addSink(JdbcSink.sink(
   "EXEC ?",
   (preparedStatement, row) -> {
  // extend `preparedStatement` with row info (JDBC parameter indices are 1-based)
  preparedStatement.setString(1, (String) row.getField(0));
   },
   new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
  .withUrl("jdbc:derby:memory:ebookshop")
  .withDriverName("org.apache.derby.jdbc.EmbeddedDriver")
  .build()));

Best,
Austin

On Tue, Apr 20, 2021 at 12:42 PM Sambaran  wrote:

> Hi Austin,
>
> We are using this for jdbc interfacing to populate postgres tables based
> on the data coming from an event source.
>
> We tried with the approach mentioned in the doc
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/jdbc.html
> but did not find a suitable way to map SingleOutputStreamOperator .
> Can you please let me know if this is the right approach and if yes, how do
> we map the SingleOutputStreamOperator to the preparedstatement in
> JdbcStatementBuilder?
>
> Thanks for your help!
>
> Regards
> Sambaran
>
> On Tue, Apr 20, 2021 at 6:30 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Great, thanks for the clarification! I'm checking with others now. Are
>> you using other parts of the Table/SQL APIs, or just this for JDBC
>> interfacing?
>>
>> Best,
>> Austin
>>
>> On Tue, Apr 20, 2021 at 12:20 PM Sambaran  wrote:
>>
>>> Hi Austin,
>>>
>>> Thanks for replying. This is exactly as you mentioned here. Do we have a
>>> way to execute the procedure with 1.11 or upper version as
>>> JDBCAppendTableSink is no longer available with these?
>>>
>>> Regards
>>> Sambaran
>>>
>>> On Tue, Apr 20, 2021 at 6:11 PM Austin Cawley-Edwards <
>>> austin.caw...@gmail.com> wrote:
>>>
 Hey Sambaran,

 I'm not too familiar with the 1.7 JDBCAppendTableSink, but to make sure
> I understand what your current solution looks like, it's something like
 the following, where you're triggering a procedure on each element of a
 stream?

   JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
 .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
 .setDBUrl("jdbc:derby:memory:ebookshop")
 .setQuery("EXEC YourProcedure")
 .build();

 SingleOutputStreamOperator stream =
 env.fromElements(Row.of("a"), Row.of("b"));
 sink.emitDataStream(stream);

 Or something else?

 Best,
 Austin




 On Tue, Apr 20, 2021 at 11:10 AM Sambaran 
 wrote:

> Hi,
>
> I am currently using JDBCAppendTableSink to execute database stored
> procedures from flink to populate data to external tables using
>  SingleOutputStreamOperator (version 1.7). Now we are trying to update to
> Flink 1.11/ later and found JDBCAppendTableSink has been removed, 
> currently
> when looking for an alternative I could not find any suitable approach
> which would call database stored procedure. Is there any alternative
> approach to resolve this?
>
> Regards
> Sambaran
>



Re: Flink Statefun Python Batch

2021-04-20 Thread Timothy Bess
Hi Igal,

Yes! that's exactly what I was thinking. The batching will naturally happen
as the model applies backpressure. We're using pandas and it's pretty
costly to create a dataframe and everything to process a single event.
Internally the SDK has access to the batch and is calling my function,
which creates a dataframe for each individual event. This causes a ton of
overhead since we basically get destroyed by the constant factors around
creating and operating on dataframes.

Knowing how the SDK works, it seems like it'd be easy to do something like
your example and maybe have a different decorator for "batch functions"
where the SDK just passes in everything at once.

Also just out of curiosity are there plans to build out more introspection
into statefun's flink state? I was thinking it would be super useful to add
either Queryable state or have some control topic that statefun listens to
that allows me to send events to introspect or modify flink state.

For example like:

// control topic request
{"type": "FunctionIdsReq", "namespace": "foo", "type": "bar"}
// response
{"type": "FunctionIdsResp", "ids": [ "1", "2", "3", ... ] }

Or

{"type": "SetState", "namespace": "foo", "type": "bar", "id": "1", value:
"base64bytes"}
{"type": "DeleteState", "namespace": "foo", "type": "bar", "id": "1"}

Also having opentracing integration where Statefun passes b3 headers with
each request so we can trace a message's route through statefun would be
_super_ useful. We'd literally be able to see the entire path of an event
from ingress to egress and time spent in each function. Not sure if there
are any plans around that, but since we're live with a statefun project
now, it's possible we could contribute some if you guys are open to it.

Thanks,

Tim

On Tue, Apr 20, 2021 at 9:25 AM Igal Shilman  wrote:

> Hi Tim!
>
> Indeed the StateFun SDK / StateFun runtime, has an internal concept of
> batching, that kicks in the presence of a slow
> /congested remote function. Keep in mind that under normal circumstances
> batching does not happen (effectively a batch of size 1 will be sent). [1]
> This batch is not currently exposed via the SDKs (both Java and Python) as
> it is an implementation detail (see [2]).
>
> The way I understand your message (please correct me if I'm wrong): is
> that evaluation of the ML model is costly, and it would benefit from some
> sort of batching (like pandas do i assume ?)
> instead of being applied for every event individually.
> If this is the case, perhaps exposing this batch can be a useful feature
> to add.
>
> For example:
>
> @functions.bind_tim(..)
> def ml(context, messages: typing.List[Message]):
>   ...
>
>
>
> Let me know what you think,
> Igal.
>
>
>
> [1]
> https://github.com/apache/flink-statefun/blob/master/statefun-sdk-protos/src/main/protobuf/sdk/request-reply.proto#L80
> [2]
> https://github.com/apache/flink-statefun/blob/master/statefun-sdk-python/statefun/request_reply_v3.py#L219
>
> On Fri, Apr 16, 2021 at 11:48 PM Timothy Bess  wrote:
>
>> Hi everyone,
>>
>> Is there a good way to access the batch of leads that Statefun sends to
>> the Python SDK rather than processing events one by one? We're trying to
>> run our data scientist's machine learning model through the SDK, but the
>> code is very slow when we do single events and we don't get many of the
>> benefits of Pandas/etc.
>>
>> Thanks,
>>
>> Tim
>>
>


Re: Max-parallelism limitation

2021-04-20 Thread Austin Cawley-Edwards
Hi Olivier,

Someone will correct me if I'm wrong, but I believe the max-parallelism
limitation, where you cannot scale up past the previously defined
max-parallelism, applies to all stateful jobs no matter which type of state
you are using.

If you haven't seen it already, I think the Production Readiness checklist
[1] offers a good explanation for this behavior/ best practices for setting
it.

Best,
Austin

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/production_ready/#set-an-explicit-max-parallelism
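
For reference, a minimal sketch of setting it explicitly (the value 128, the
events stream and MyStatefulMapper are illustrative placeholders):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setMaxParallelism(128); // upper bound for keyed state, fixed for the job's lifetime
env.setParallelism(4);      // current runtime parallelism, which can still be changed

// It can also be set per operator:
events.keyBy(MyEvent::getKey)
      .map(new MyStatefulMapper())
      .setMaxParallelism(128);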

On Tue, Apr 20, 2021 at 9:33 AM Olivier Nouguier 
wrote:

> Hi, thank you all for your reply, by limitation I meant the impossibility
> to resume a job when scaling up because of this max-parallelism.
> To be more precise, in our deployment, the operator (max) parallelism is
> computed from the number of available slots ( ~~ task-manager * core ),
> this approach is probably too naive, at least with keyed state.
>
>
>
> On Tue, Apr 20, 2021 at 10:46 AM Chesnay Schepler 
> wrote:
>
>> @Olivier Could you clarify which limitation you are referring to?
>>
>> On 4/20/2021 5:23 AM, Guowei Ma wrote:
>>
>> Hi, Olivier
>> Yes. The introduction of this concept is to solve the problem of
>> rescaling the keystate.
>> Best,
>> Guowei
>>
>>
>> On Mon, Apr 19, 2021 at 8:56 PM Olivier Nouguier <
>> olivier.nougu...@teads.com> wrote:
>>
>>> Hi,
>>>   May I have the confirmation that the max-parallelism limitation only
>>> occurs when keyed states are used ?
>>>
>>>
>>> --
>>>
>>> Olivier Nouguier
>>>
>>> SSE
>>>
>>> e | olivier.nougu...@teads.com m | 0651383971
>>>
>>> Teads France SAS, 159 rue de Thor, Business Plaza, Bat. 4, 34000
>>> Montpellier, France
>>>
>>
>>
>
> --
>
> Olivier Nouguier
>
> SSE
>
> e | olivier.nougu...@teads.com m | 0651383971
>
> Teads France SAS, 159 rue de Thor, Business Plaza, Bat. 4, 34000
> Montpellier, France
>


Re: Are configs stored as part of savepoints

2021-04-20 Thread Austin Cawley-Edwards
Hi Gaurav,

Which configs are you referring to? Everything usually stored in
`flink-conf.yaml`[1]? The State Processor API[2] is also a good resource to
understand what is actually stored, and how you can access it outside of a
running job. The SavepointMetadata class[3] is another place to reference,
which seems to say that the only stored bit that could be influenced by the
config is the `maxParallelism`.

When restoring from a savepoint, you usually want to specify the
configuration as well.

Hope that helps,
Austin


[1]:
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#configuration
[2]:
https://ci.apache.org/projects/flink/flink-docs-master/docs/libs/state_processor_api/
[3]:
https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadata.java
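
For reference, a minimal sketch of inspecting a savepoint with the State Processor
API (the savepoint path, operator uid and reader function are placeholders):

ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();

// Load an existing savepoint (path and state backend are illustrative).
ExistingSavepoint savepoint = Savepoint.load(bEnv, "s3://bucket/path/to/savepoint", new MemoryStateBackend());

// Read the keyed state of the operator with uid "my-operator".
DataSet<MyStateRecord> state = savepoint.readKeyedState("my-operator", new MyKeyedStateReaderFunction());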

On Mon, Apr 19, 2021 at 8:54 PM gaurav kulkarni 
wrote:

> Hi,
>
> I was wondering if configs applied while creating a flink application are
> also stored as part of the savepoint? If yes, when an app is restored from a
> savepoint, does it start with the same configs?
>
> Thanks
>
>


Re: Alternatives to JDBCAppendTableSink in Flink 1.11

2021-04-20 Thread Austin Cawley-Edwards
Hey Sambaran,

I'm not too familiar with the 1.7 JDBCAppendTableSink, but to make sure I
understand what your current solution looks like, it's something like the
following, where you're triggering a procedure on each element of a stream?

  JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
.setDBUrl("jdbc:derby:memory:ebookshop")
.setQuery("EXEC YourProcedure")
.build();

SingleOutputStreamOperator stream =
env.fromElements(Row.of("a"), Row.of("b"));
sink.emitDataStream(stream);

Or something else?

Best,
Austin




On Tue, Apr 20, 2021 at 11:10 AM Sambaran  wrote:

> Hi,
>
> I am currently using JDBCAppendTableSink to execute database stored
> procedures from flink to populate data to external tables using
>  SingleOutputStreamOperator (version 1.7). Now we are trying to update to
> Flink 1.11/ later and found JDBCAppendTableSink has been removed, currently
> when looking for an alternative I could not find any suitable approach
> which would call database stored procedure. Is there any alternative
> approach to resolve this?
>
> Regards
> Sambaran
>


Re: Flink support for Kafka versions

2021-04-20 Thread Austin Cawley-Edwards
Hi Prasanna,

It looks like the Kafka 2.5.0 connector upgrade is tied to dropping support
for Scala 2.11. The best place to track that would be the ticket for Scala
2.13 support, FLINK-13414 [1], and its subtask FLINK-20845 [2].

I have listed FLINK-20845 as a blocker for FLINK-19168 for better
visibility.

Best,
Austin

[1]: https://issues.apache.org/jira/browse/FLINK-13414
[2]: https://issues.apache.org/jira/browse/FLINK-20845

On Tue, Apr 20, 2021 at 9:08 AM Prasanna kumar <
prasannakumarram...@gmail.com> wrote:

> Hi Flinksters,
>
> We are researching whether we could use the latest version of Kafka
> (2.6.1 or 2.7.0).
>
> Since we are using Flink as a processor , we came across this
> https://issues.apache.org/jira/browse/FLINK-19168.
>
> It says that it does not support version 2.5.0 and beyond.
>
> That was created 8 months back , just checking if there is any effort on
> that front.
>
> Thanks,
> Prasanna
>


Re: CRD compatible with native and standalone mode

2021-04-20 Thread Austin Cawley-Edwards
Hi Gaurav,

I think the name "Native Kubernetes" is a bit misleading – this just means
that you can use the Flink CLI/ scripts to run Flink applications on
Kubernetes without using the Kubernetes APIs/ kubectl directly. What
features are you looking to use in the native mode?

I think it would be difficult to use this directly inside an operator, but
keeping "feature parity" with it is a good goal for your CRDs. Since CRDs
are essentially just a new API, the design approach should be user/
feature-first. By feature parity, I mean taking currently supported "Native
Kubernetes" functionality as the feature list for your CRDs, for example:
* Allowing Secrets to be mounted as files and environment variables [1]
* Allowing templating of the JobManager and TaskManager Pods (containers,
etc.) [2]
* Easy use of built-in plugins [3]
* etc.

Other "Native Kubernetes" "features", like RBAC and logs, will come "out of
the box" by defining Custom Resources.

Yang's resources are a great place to start, though I'd suggest defining
your API spec within the CRD explicitly[4], which will be clearer for your
users and will allow for schema validation by other tools.

Best,
Austin


[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#using-secrets
[2]:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#pod-template
[3]:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#using-plugins
[4]:
https://kubernetes.io/docs/tasks/extend-kubernetes/custom-resources/custom-resource-definitions/#specifying-a-structural-schema

On Tue, Apr 20, 2021 at 2:13 AM Yang Wang  wrote:

> I think the compatibility depends on you. For example, you could have the
> same
> CustomResourceDefinition for standalone and native Flink applications.
> They could
> look like this[1].
>
> Since the CR is defined in yaml[2], native and standalone could have some
> dedicated fields.
> And you could easily parse them in your K8s operator.
>
> [1].
> https://github.com/wangyang0918/flink-native-k8s-operator/blob/master/deploy/crd.yaml
> [2].
> https://github.com/wangyang0918/flink-native-k8s-operator/blob/master/deploy/cr.yaml
>
>
> Best,
> Yang
>
> gaurav kulkarni  于2021年4月20日周二 上午8:57写道:
>
>> Hi,
>>
>> I plan to create a Flink K8s operator which supports standalone mode, and
>> switch to native mode sometime later. I was wondering what are some of
>> the approaches to ensure that CRD is compatible with both native and
>> standalone mode?
>>
>> Thanks
>>
>


Alternatives to JDBCAppendTableSink in Flink 1.11

2021-04-20 Thread Sambaran
Hi,

I am currently using JDBCAppendTableSink to execute database stored
procedures from flink to populate data to external tables using
 SingleOutputStreamOperator (version 1.7). Now we are trying to update to
Flink 1.11/ later and found JDBCAppendTableSink has been removed, currently
when looking for an alternative I could not find any suitable approach
which would call database stored procedure. Is there any alternative
approach to resolve this?

Regards
Sambaran


Re: How to config the flink to load libs in myself path

2021-04-20 Thread cxydevelop
For example, I have my custom table source or sink built as an independent jar,
and my main code depends on it. But I don't want to package the custom connector
jar together with the main code into one fat jar; in other words, I want a thin
jar, not a fat jar. So I think I can put the custom connector jar in flink/lib
before I run my job. In fact, it really works. My jobmanager yaml looks like this:

containers:
  ...
  volumeMounts:
    - mountPath: /opt/flink/conf
      name: flink-config-volume
    - mountPath: /opt/flink/lib
      name: volume-1618910657181
    - mountPath: /opt/flink/flink-uploadjar
      name: volume-1618911748381
    - mountPath: /opt/flink/plugins/oss-fs-hadoop/flink-oss-fs-hadoop-1.12.2.jar
      name: volume-1618916463815
volumes:
  - configMap:
      defaultMode: 420
      items:
        - key: flink-conf.yaml
          path: flink-conf.yaml
        - key: log4j-console.properties
          path: log4j-console.properties
      name: flink-config
    name: flink-config-volume
  - hostPath:
      path: /data/volumes/flink/volume-for-session/cxylib-common-jar
      type: ''
    name: volume-1618910657181
  - hostPath:
      path: /home/uploadjar
      type: ''
    name: volume-1618911748381
  - hostPath:
      path: /data/volumes/flink/volume-for-session/plugins/oss-fs-hadoop/flink-oss-fs-hadoop-1.12.2.jar
      type: ''
    name: volume-1618916463815

As the yaml shows, I have to mount a host machine path into the container. I
deploy Flink in a K8s cluster with three nodes, so I have to put all my jars on
all three nodes, and whenever I change some code I have to package and copy them
to the three nodes again. If Flink supported configuring the lib path, I could
use an Aliyun OSS PV and PVC to mount the OSS path directly, like in my other
yaml below:

containers:
  ...
  volumeMounts:
    - mountPath: /data
      name: volume-trino-volume
  ...
volumes:
  - name: volume-trino-volume
    persistentVolumeClaim:
      claimName: trino-volume
  ...

So if Flink supported a config like "flink.lib.path: /data/myself/lib", it would
be very convenient. I don't know if you know what I mean.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Max-parallelism limitation

2021-04-20 Thread Olivier Nouguier
Hi, thank you all for your reply, by limitation I meant the impossibility
to resume a job when scaling up because of this max-parallelism.
To be more precise, in our deployment, the operator (max) parallelism is
computed from the number of available slots ( ~~ task-manager * core ),
this approach is probably too naive, at least with keyed state.



On Tue, Apr 20, 2021 at 10:46 AM Chesnay Schepler 
wrote:

> @Olivier Could you clarify which limitation you are referring to?
>
> On 4/20/2021 5:23 AM, Guowei Ma wrote:
>
> Hi, Olivier
> Yes. The introduction of this concept is to solve the problem of rescaling
> the keystate.
> Best,
> Guowei
>
>
> On Mon, Apr 19, 2021 at 8:56 PM Olivier Nouguier <
> olivier.nougu...@teads.com> wrote:
>
>> Hi,
>>   May I have the confirmation that the max-parallelism limitation only
>> occurs when keyed states are used ?
>>
>>
>> --
>>
>> Olivier Nouguier
>>
>> SSE
>>
>> e | olivier.nougu...@teads.com m | 0651383971
>>
>> Teads France SAS, 159 rue de Thor, Business Plaza, Bat. 4, 34000
>> Montpellier, France
>>
>
>

-- 

Olivier Nouguier

SSE

e | olivier.nougu...@teads.com m | 0651383971

Teads France SAS, 159 rue de Thor, Business Plaza, Bat. 4, 34000
Montpellier, France


Re: Flink Statefun Python Batch

2021-04-20 Thread Igal Shilman
Hi Tim!

Indeed the StateFun SDK / StateFun runtime, has an internal concept of
batching, that kicks in the presence of a slow
/congested remote function. Keep in mind that under normal circumstances
batching does not happen (effectively a batch of size 1 will be sent). [1]
This batch is not currently exposed via the SDKs (both Java and Python) as
it is an implementation detail (see [2]).

The way I understand your message (please correct me if I'm wrong): is that
evaluation of the ML model is costly, and it would benefit from some sort
of batching (like pandas do i assume ?)
instead of being applied for every event individually.
If this is the case, perhaps exposing this batch can be a useful feature to
add.

For example:

@functions.bind_tim(..)
def ml(context, messages: typing.List[Message]):
  ...



Let me know what you think,
Igal.



[1]
https://github.com/apache/flink-statefun/blob/master/statefun-sdk-protos/src/main/protobuf/sdk/request-reply.proto#L80
[2]
https://github.com/apache/flink-statefun/blob/master/statefun-sdk-python/statefun/request_reply_v3.py#L219

On Fri, Apr 16, 2021 at 11:48 PM Timothy Bess  wrote:

> Hi everyone,
>
> Is there a good way to access the batch of leads that Statefun sends to
> the Python SDK rather than processing events one by one? We're trying to
> run our data scientist's machine learning model through the SDK, but the
> code is very slow when we do single events and we don't get many of the
> benefits of Pandas/etc.
>
> Thanks,
>
> Tim
>


Flink support for Kafka versions

2021-04-20 Thread Prasanna kumar
Hi Flinksters,

We are researching whether we could use the latest version of Kafka (2.6.1
or 2.7.0).

Since we are using Flink as a processor , we came across this
https://issues.apache.org/jira/browse/FLINK-19168.

It says that it does not support version 2.5.0 and beyond.

That was created 8 months back , just checking if there is any effort on
that front.

Thanks,
Prasanna


Re: Accessing columns from input stream table during Window operations

2021-04-20 Thread Sumeet Malhotra
Thanks Dian, Guowei. I think it makes sense to roll with this approach.

On Tue, Apr 20, 2021 at 8:29 AM Guowei Ma  wrote:

> Hi, Sumeet
> Thank you for sharing. As Dian suggested, I think you could use b as
> your `group_by`'s key and so the b could be output directly.
> I think it is more simple.
> Best,
> Guowei
>
>
> On Mon, Apr 19, 2021 at 7:31 PM Dian Fu  wrote:
>
>> Hi Sumeet,
>>
>> Thanks for the sharing.
>>
>> Then I guess you could use `.group_by(col('w'), input.a, input.b)`.
>> Since the value for input.a is always the same, it’s equal to group_by(
>> col(‘w'), input.b) logically. The benefit is that you could access
>> input.a directly in the select clause.
>>
>> Regards,
>> Dian
>>
>> 2021年4月19日 下午6:29,Sumeet Malhotra  写道:
>>
>> Hi Guowei,
>>
>> Let me elaborate the use case with an example.
>>
>> Sample input table looks like this:
>>
>> time    a   b   c
>> -----------------
>> t0      a0  b0  1
>> t1      a0  b1  2
>> t2      a0  b2  3
>> t3      a0  b0  6
>> t4      a0  b1  7
>> t5      a0  b2  8
>>
>> Basically, every time interval there are new readings from a fixed set of
>> sensors (b0, b1 and b2). All these rows have a few constant fields
>> representing metadata about the input (a0).
>>
>> Desired output for every time interval is the average reading for every
>> sensor (b0, b1, b2), along with the constant metadata (a0):
>>
>> a0    b0    avg(c)
>> a0    b1    avg(c)
>> a0    b2    avg(c)
>>
>> This is what I was trying to build using a simple Tumble window:
>>
>> input.window(Tumble.over(lit(10).seconds).on(input.Timestamp).alias("w"))
>> \
>> .group_by(col('w'), input.b) \
>> .select(
>> input.a,  <=== constant metadata field, same for every input record
>> input.b,  <=== group_by field, to compute averages
>> input.c.avg.alias('avg_value')) \
>> .execute_insert('MySink') \
>> .wait()
>>
>> The example above is highly simplified, but I hope it explains what I'm
>> trying to achieve.
>>
>> Thanks,
>> Sumeet
>>
>>
>> On Mon, Apr 19, 2021 at 3:21 PM Dian Fu  wrote:
>>
>>> Hi Sumeet,
>>>
>>> 1) Regarding to the above exception, it’s a known issue and has been
>>> fixed in FLINK-21922  
>>> [1]. It
>>> will be available in the coming 1.12.3. You could also cherry-pick that fix
>>> to 1.12.2 and build from source following the instruction described in [2]
>>> if needed.
>>>
>>> 2) Regarding to your requirements, could you describe what you want to
>>> do with group window or over window?
>>> For group window(e.g. tumble window, hop window, session window, etc),
>>> it will output one row for multiple inputs belonging to the same window.
>>> You could not just passing through it from input to sink as it is
>>> non-determinitic which row to use as there are multiple input rows. That’s
>>> the reason why you have to declare a field in the group by clause if you
>>> want to access it directly in the select clause. For over window, it will
>>> output one row for each input and so you could pass through it directly.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-21922.
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/flinkDev/building.html#build-pyflink
>>>
>>>
>>> 2021年4月19日 下午5:16,Sumeet Malhotra  写道:
>>>
>>> Thanks Guowei. I'm trying out Over Windows, as follows:
>>>
>>> input \
>>> .over_window(
>>> Over.partition_by(col(input.a)) \
>>> .order_by(input.Timestamp) \
>>> .preceding(lit(10).seconds) \
>>> .alias('w')) \
>>> .select(
>>> input.b,
>>> input.c.avg.over(col('w'))) \
>>> .execute_insert('MySink') \
>>> .wait()
>>>
>>> But running into following exception:
>>>
>>> py4j.protocol.Py4JError: An error occurred while calling
>>> z:org.apache.flink.table.api.Over.partitionBy. Trace:
>>> org.apache.flink.api.python.shaded.py4j.Py4JException: Method
>>> partitionBy([class org.apache.flink.table.api.ApiExpression]) does not exist
>>>
>>> Is there any extra Jar that needs to be included for Over Windows. From
>>> the code it doesn't appear so.
>>>
>>> Thanks,
>>> Sumeet
>>>
>>>
>>> On Mon, Apr 19, 2021 at 1:10 PM Guowei Ma  wrote:
>>>
 Hi, Sumeet

 Maybe you could try the Over Windows[1], which could keep the
 "non-group-key" column.

 [1]
 https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#over-windows

 Best,
 Guowei


 On Mon, Apr 19, 2021 at 3:25 PM Sumeet Malhotra <
 sumeet.malho...@gmail.com> wrote:

> Thanks Guowei! Regarding "input.c.avg" you're right, that doesn't
> cause any issues. It's only when I want to use "input.b".
>
> My use case is to basically emit "input.b" in the final sink as is,
> and not really perform any aggregation on that column - more like pass
> through from input to sink. What's the best way to ac

Re: Correctly serializing "Number" as state in ProcessFunction

2021-04-20 Thread Klemens Muthmann
Hi,

I guess this is more of a Java Problem than a Flink Problem. If you want it 
quick and dirty you could implement a class such as:

public class Value {
    private boolean isLongSet = false;
    private long longValue = 0L;
    private boolean isIntegerSet = false;
    private int intValue = 0;

    public Value(final long value) {
        setLong(value);
    }

    public void setLong(final long value) {
        longValue = value;
        isLongSet = true;
    }

    public long getLong() {
        if (isLongSet) {
            return longValue;
        }
        throw new IllegalStateException("No long value was set.");
    }

    // Add the same methods for int.
    // To satisfy POJO requirements you will also need to add a no-argument
    // constructor as well as getters and setters for the boolean flags.
}
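
For example, the wrapper could then be used as keyed state roughly like this sketch
(assuming the no-argument constructor and getters/setters mentioned in the comments
have been added; MyEvent and MyResult are placeholder types):

public class MyFunction extends KeyedProcessFunction<String, MyEvent, MyResult> {

    private transient ValueState<Value> state;

    @Override
    public void open(Configuration parameters) {
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("number", Value.class));
    }

    @Override
    public void processElement(MyEvent event, Context ctx, Collector<MyResult> out) throws Exception {
        Value current = state.value();
        if (current == null) {
            current = new Value(0L);
        }
        // ... update either the long or the int part depending on what this key stores ...
        state.update(current);
    }
}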

I guess a cleaner solution would be possible using a custom Kryo serializer as 
explained here: 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/custom_serializers.html

Regards
  Klemens



> Am 20.04.2021 um 10:34 schrieb Miguel Araújo :
> 
> Hi everyone,
> 
> I have a ProcessFunction which needs to store different number types for 
> different keys, e.g., some keys need to store an integer while others need to 
> store a double.
> 
> I tried to use java.lang.Number as the type for the ValueState, but I got the 
> expected "No fields were detected for class java.lang.Number so it cannot be 
> used as a POJO type and must be processed as GenericType." 
> 
> I have the feeling that this is not the right approach, but the exact type to 
> be stored is only known at runtime which makes things a bit trickier. Is 
> there a way to register these classes correctly, or Is it preferable to use 
> different ValueState's for different types?
> 
> Thanks,
> Miguel



Re: Max-parallelism limitation

2021-04-20 Thread Chesnay Schepler

@Olivier Could you clarify which limitation you are referring to?

On 4/20/2021 5:23 AM, Guowei Ma wrote:

Hi, Olivier
Yes. The introduction of this concept is to solve the problem of 
rescaling the keystate.

Best,
Guowei


On Mon, Apr 19, 2021 at 8:56 PM Olivier Nouguier
<olivier.nougu...@teads.com> wrote:


Hi,
  May I have the confirmation that the max-parallelism limitation
only occurs when keyed states are used ?


-- 


Olivier Nouguier

SSE

e | olivier.nougu...@teads.com 
m | 0651383971

Teads France SAS, 159 rue de Thor, Business Plaza, Bat. 4, 34000
Montpellier, France






Correctly serializing "Number" as state in ProcessFunction

2021-04-20 Thread Miguel Araújo
Hi everyone,

I have a ProcessFunction which needs to store different number types for
different keys, e.g., some keys need to store an integer while others need
to store a double.

I tried to use java.lang.Number as the type for the ValueState, but I got
the expected "No fields were detected for class java.lang.Number so it
cannot be used as a POJO type and must be processed as GenericType."

I have the feeling that this is not the right approach, but the exact type
to be stored is only known at runtime which makes things a bit trickier. Is
there a way to register these classes correctly, or Is it preferable to use
different ValueState's for different types?

Thanks,
Miguel