Re: Question Regarding a sink..

2017-03-23 Thread Tzu-Li (Gordon) Tai
Hi Steve,

This normally shouldn’t happen, unless there simply are two copies of the data.

What is the source of the topology? Also, this might be obvious, but if you 
have broadcast your input stream to the sink, then each sink instance will 
get all records in the input stream.
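
As a rough illustration of the broadcasting point, here is a toy model in plain Java. It is not Flink code: the lists merely stand in for parallel sink instances, and the class and method names are made up for this sketch. It compares round-robin distribution with broadcast distribution at a parallelism of two.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: plain Java lists stand in for parallel sink instances.
class BroadcastSketch {

    // Round-robin distribution: each record goes to exactly one sink instance.
    static List<List<String>> rebalance(List<String> records, int parallelism) {
        List<List<String>> sinks = emptySinks(parallelism);
        for (int i = 0; i < records.size(); i++) {
            sinks.get(i % parallelism).add(records.get(i));
        }
        return sinks;
    }

    // Broadcast distribution: every sink instance receives every record.
    static List<List<String>> broadcast(List<String> records, int parallelism) {
        List<List<String>> sinks = emptySinks(parallelism);
        for (String record : records) {
            for (List<String> sink : sinks) {
                sink.add(record);
            }
        }
        return sinks;
    }

    private static List<List<String>> emptySinks(int parallelism) {
        List<List<String>> sinks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            sinks.add(new ArrayList<>());
        }
        return sinks;
    }

    // Total number of writes performed across all sink instances.
    static int totalWrites(List<List<String>> sinks) {
        int total = 0;
        for (List<String> sink : sinks) {
            total += sink.size();
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a", "b", "c", "d");
        System.out.println("rebalance writes: " + totalWrites(rebalance(records, 2)));
        System.out.println("broadcast writes: " + totalWrites(broadcast(records, 2)));
    }
}
```

With two sink instances, the broadcast variant performs twice as many writes as there are input records, which would match a trace where every record shows up twice at the sink.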

Cheers,
Gordon

On March 24, 2017 at 9:11:35 AM, Steve Jerman (st...@kloudspot.com) wrote:

Hi,

I have a sink writing data to InfluxDB. I’ve noticed that the sink gets 
multiple copies of upstream records..

Why does this happen, and how can I avoid it… ?

Below is a trace …showing 2 records (I have a parallelism of two) for each 
record in the ‘.printToError’ for the same stream.

Any help/suggestions appreciated.

Steve


1> Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=2F:A4:BD:56:EC:4D, sessionTime=15000}]
2> Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=17:8E:FC:7E:F7:20, sessionTime=2}]
1> Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=40:07:2D:CB:41:07, sessionTime=5000}]
2> Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=28:24:0B:B6:42:CA, sessionTime=5000}]
1> Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=8F:13:AC:4A:DA:93, sessionTime=5000}]
2> Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=43:7D:8A:D4:7D:D7, sessionTime=5000}]
2> Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=77:CD:BD:48:EE:D8, sessionTime=5000}]
18:41:27,351 INFO  com.kloudspot.flink.sink.InfluxSink                          
 - Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=2F:A4:BD:56:EC:4D, sessionTime=15000}]
18:41:27,351 INFO  com.kloudspot.flink.sink.InfluxSink                          
 - Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=2F:A4:BD:56:EC:4D, sessionTime=15000}]
18:41:27,351 INFO  com.kloudspot.flink.sink.InfluxSink                          
 - Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=17:8E:FC:7E:F7:20, sessionTime=2}]
18:41:27,351 INFO  com.kloudspot.flink.sink.InfluxSink                          
 - Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=17:8E:FC:7E:F7:20, sessionTime=2}]
18:41:27,351 INFO  com.kloudspot.flink.sink.InfluxSink                          
 - Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=40:07:2D:CB:41:07, sessionTime=5000}]
18:41:27,351 INFO  com.kloudspot.flink.sink.InfluxSink                          
 - Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=40:07:2D:CB:41:07, sessionTime=5000}]
18:41:27,351 INFO  com.kloudspot.flink.sink.InfluxSink                          
 - Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
associated=false, dwellTime=less_than_fifteen_mins, location=1OPUA7IKN9, 
returned=false, sessionEnd=false}, precision=MILLISECONDS, 
fields={client=28:24:0B:B6:42:CA, sessionTime=5000}]
18:41:27,351 INFO  com.kloudspot.flink.sink.InfluxSink                          
 - Point [name=Presence, time=1490316086000, tags={ap=1OPUA7IKN9, 
ass

Re: How to rewind Kafka cursors into a Flink job ?

2017-03-29 Thread Tzu-Li (Gordon) Tai
Hi Dominique,

What your plan A is suggesting is that a downstream operator can provide 
signals to upstream operators and alter their behaviour.
In general, this isn’t possible, as in a distributed streaming environment it’s 
hard to guarantee what records exactly will be altered by the behaviour.

I would say plan B is the approach to go for here.
Also, in Flink 1.3.0, the FlinkKafkaConsumer will allow users to define if they 
want to start from the earliest, latest, or some specific offset, completely 
independent of the committed consumer group offsets in Kafka.
This should also come in handy for what you have in mind. Have a look at 
https://issues.apache.org/jira/browse/FLINK-4280 for more details on this :)

Cheers,
Gordon

On March 28, 2017 at 12:35:38 AM, Dominique De Vito (ddv36...@gmail.com) wrote:

Hi,

Is there a way to rewind Kafka cursors (while consuming from Kafka here) 
from inside a Flink job?

Use case [plan A]
* The Flink job would listen to 1 main "data" topic + 1 secondary "event" topic
* In case of a given event, the Flink job would rewind all Kafka cursors of the 
"data" topic, to go back to the latest cursors and reprocess data from there.

Use case-bis [plan A-bis]:
* The Flink job would listen to 1 main "data" topic, dealing with data according 
to some params
* This Flink job would listen to a WS and, in case of a given event, the Flink job 
would rewind all Kafka cursors of the "data" topic, to go back from the latest 
cursors and reprocess data from there, according to some new params.

Plan B ;-)
* Listen to the events from outside Flink, and in case of an event, stop the Flink 
job and relaunch it.

So, if someone has any hint about how to rewind for [plan A] and/or [plan 
A-bis] => thank you !
 
Regards,
Dominique



Re: Apache Flink Hackathon

2017-03-29 Thread Tzu-Li (Gordon) Tai
Sounds like a cool event! Thanks for sharing this!


On March 27, 2017 at 11:40:24 PM, Lior Amar (lior.a...@parallelmachines.com) 
wrote:

Hi all,

My name is Lior and I am working at Parallel Machines (a startup company 
located in the Silicon Valley).

We are hosting a Flink Hackathon on April 10, 3pm - 8pm at Hotel Majestic in 
San Francisco.
(During the first day of Flink Forward, training day)

More details at the meet up event:
https://www.meetup.com/Parallel-Machines-Meetup/events/238390498/


See you there :-)

--lior

Re: Flink 1.2 time window operation

2017-03-29 Thread Tzu-Li (Gordon) Tai
Hi Dominik,

Was the job running with processing time or event time? If event time, how are 
you producing the watermarks?
Normally, to understand how windows are firing in Flink, these two factors are 
the place to look.
I can try to explain this further once you provide this information. Also, are 
you using Kafka 0.10?

Cheers,
Gordon

On March 27, 2017 at 11:25:49 PM, Dominik Safaric (dominiksafa...@gmail.com) 
wrote:

Hi all,  

Lately I’ve been investigating the performance characteristics of Flink 
as part of our internal benchmark. As part of this, we’ve developed and deployed an 
application that polls data from Kafka and groups the data by a key during a fixed 
time window of one minute.  

In total, the topic that the KafkaConsumer polled from consists of 100 million 
messages, each 100 bytes in size. What we were expecting is that no records would 
be read from or produced back to Kafka for the first minute of the window 
operation - however, this is unfortunately not the case. Below you may find a 
plot showing the number of records produced per second.  

Could anyone provide an explanation for the behaviour shown in the graph 
below? What are the reasons behind consuming/producing messages from/to Kafka 
while the window has not expired yet?  



Re: 20 times higher throughput with Window function vs fold function, intended?

2017-03-30 Thread Tzu-Li (Gordon) Tai
I'm wondering what I can tweak further to increase this. I was reading in this 
blog: https://data-artisans.com/extending-the-yahoo-streaming-benchmark/
about 3 million per sec with only 20 partitions. So I'm sure I should be able 
to squeeze more out of it.

Not really sure if it is relevant in the context of your case, but you could 
perhaps try tweaking the maximum size of Kafka records fetched on each poll on 
the partitions.
You can do this by setting a higher value for “max.partition.fetch.bytes” in 
the provided config properties when instantiating the consumer; that will 
directly configure the internal Kafka clients.
Generally, all Kafka settings are applicable through the provided config 
properties, so you can perhaps take a look at the Kafka docs to see what else 
there is to tune for the clients.
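
A minimal sketch of what such config properties could look like, using only `java.util.Properties`. The broker address, the group id, and the 4 MiB value are placeholders chosen for illustration, not recommendations; the resulting object is what would be passed to the consumer's constructor.

```java
import java.util.Properties;

class KafkaFetchConfigSketch {

    // Builds the config properties that would be handed to the Kafka consumer.
    // The broker address, group id, and 4 MiB value are placeholders.
    static Properties consumerProperties() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "benchmark-group");
        // Kafka's default for this setting is 1 MiB (1048576 bytes) per partition.
        props.setProperty("max.partition.fetch.bytes", String.valueOf(4 * 1024 * 1024));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProperties().getProperty("max.partition.fetch.bytes"));
    }
}
```

Whether a larger fetch size actually helps depends on record size and available memory, so it is worth measuring before and after the change.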

On March 30, 2017 at 6:11:27 PM, Kamil Dziublinski 
(kamil.dziublin...@gmail.com) wrote:

I'm wondering what I can tweak further to increase this. I was reading in this 
blog: https://data-artisans.com/extending-the-yahoo-streaming-benchmark/
about 3 million per sec with only 20 partitions. So I'm sure I should be able 
to squeeze more out of it.

Re: Flink 1.2 time window operation

2017-03-30 Thread Tzu-Li (Gordon) Tai
Hi,

Thanks for the clarification.

What are the reasons behind consuming/producing messages from/to Kafka while 
the window has not expired yet?
First, some remarks here -  sources (in your case the Kafka consumer) will not 
stop fetching / producing data when the windows haven’t fired yet. Does this 
explain what you have plotted in the diagram you attached (sorry, I can’t 
really reason about the diagram because I’m not so sure what the values of the 
x-y axes represent)?

If you’re writing the outputs of the window operation to Kafka (by adding a 
Kafka sink after the windowing), then yes it should only write to Kafka when 
the window has fired. The characteristics will also differ for different types 
of windows, so you should definitely take a look at the Windowing docs [1] 
about them.

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#window-assigners
On March 30, 2017 at 2:37:41 PM, Dominik Safaric (dominiksafa...@gmail.com) 
wrote:

What are the reasons behind consuming/producing messages from/to Kafka while 
the window has not expired yet?

Re: Flink 1.2 time window operation

2017-03-31 Thread Tzu-Li (Gordon) Tai
Hi Dominik,

I see, thanks for explaining the diagram.

This is expected because the 1 minute window in your case is aligned with the 
beginning of every minute.

For example, if the first element comes at 12:10:45, then the element 
will be put in the window of 12:10:00 to 12:10:59.
Therefore, it will fire after 14 seconds instead of 1 minute.
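
The alignment arithmetic can be sketched in a few lines of plain Java. This is an illustration, not Flink's own code: it assumes non-negative timestamps, no window offset, and a window whose end is exclusive. Counting to the exclusive end at 12:11:00 gives 15 seconds for the example above, while counting to the last covered second (12:10:59) gives the 14 seconds mentioned.

```java
class WindowAlignmentSketch {

    // Start of the aligned tumbling window that contains the timestamp.
    // Assumes non-negative timestamps and no window offset.
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    // Milliseconds from the timestamp until the (exclusive) end of its window.
    static long millisUntilWindowEnd(long timestampMillis, long windowSizeMillis) {
        return windowStart(timestampMillis, windowSizeMillis) + windowSizeMillis - timestampMillis;
    }

    public static void main(String[] args) {
        long oneMinute = 60_000L;
        // 12:10:45 expressed as milliseconds since midnight.
        long firstElement = ((12 * 60 + 10) * 60 + 45) * 1000L;
        System.out.println("window start (ms since midnight): " + windowStart(firstElement, oneMinute));
        System.out.println("ms until window end: " + millisUntilWindowEnd(firstElement, oneMinute));
    }
}
```

Either way, the first window closes well before a full minute has passed since the first element arrived.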

Does that explain what you are experiencing?

Cheers,
Gordon


On March 31, 2017 at 3:06:56 AM, Dominik Safaric (dominiksafa...@gmail.com) 
wrote:

First, some remarks here -  sources (in your case the Kafka consumer) will not 
stop fetching / producing data when the windows haven’t fired yet.

This is for sure true. However, the plot shows the number of records produced 
per second, where each record was assigned a created-at timestamp while being 
created and before being pushed back to Kafka. Sorry I did not clarify this 
before. Anyway, because of this I would expect to have a certain lag. Of 
course, messages will not only be produced into Kafka exactly at window expiry, 
with the producer then shutting down - however, what concerns me is that messages 
were produced to Kafka before the first window expired - hence the questions. 

If you’re writing the outputs of the window operation to Kafka (by adding a 
Kafka sink after the windowing), then yes it should only write to Kafka when 
the window has fired.

Hence, the behaviour that you’ve described and we’ve expected did not occur. 

If it would help, I can share the source code and a detailed Flink configuration. 

Cheers,
Dominik




Re: Reply: Re: flink one transformation end,the next transformation start

2017-03-31 Thread Tzu-Li (Gordon) Tai
Sorry, I just realized our previous conversation on this question was done via 
private email and not to user@flink.apache.org

Forwarding the previous content of the conversation back to the mailing list:

On March 30, 2017 at 4:15:46 PM, rimin...@sina.cn (rimin...@sina.cn) wrote:

The job runs successfully, but the result is wrong.
Record 1 and record 14 are identical, so the cosine value computed from their 
vectors should be 1, but on YARN the value is not 1, and the other values also 
differ from the results of the local run.

So, I guess, the steps are:
1 val data = env.readTextFile("hdfs:///") // DataSet[(String,Array[String])]
2 val dataVec = computeDataVect(data) // DataSet[(String,Int,Array[(Int,Double)])]
3 val rescomm = computeCosSims(dataVec) // DataSet[(String,Array[(String,Double)])]
The records flow through steps 1, 2 and 3; but step 3 must start only when step 2 
has ended, because computing the cosine similarity for every record in step 3 
requires all of the data. So is there some operation that makes step 3 start 
only when step 2 has ended?
----- Original Message -----
From: "Tzu-Li (Gordon) Tai" 
To: rimin...@sina.cn
Subject: Re: flink one transformation end,the next transformation start
Date: 2017-03-30 15:54

Hi,

What exactly is the error you’re running into on YARN? You should be able to 
find it in the TM logs.
It’ll be helpful for understanding the problem if you can provide the logs (just 
the relevant parts of the error will do).
Otherwise, I currently cannot tell much from the information here.

Cheers,
Gordon


On March 30, 2017 at 3:33:53 PM, rimin...@sina.cn (rimin...@sina.cn) wrote:

Hi all,
I run a job, which is:
-
val data = env.readTextFile("hdfs:///") // DataSet[(String,Array[String])]
val dataVec = computeDataVect(data) // DataSet[(String,Int,Array[(Int,Double)])]
val rescomm = computeCosSims(dataVec) // DataSet[(String,Array[(String,Double)])]

But when run on the YARN cluster, the result is wrong even though the job 
succeeds; when run locally, in Eclipse on my computer, the result is correct.

So I ran it in two parts.
First:
val data = env.readTextFile("hdfs:///") // DataSet[(String,Array[String])]
val dataVec = computeDataVect(data) // DataSet[(String,Int,Array[(Int,Double)])]
dataVec.writeAsText("hdfs:///vec") // the vector is correct

Second:
val readVec = env.readTextFile("hdfs:///vec").map(...) // DataSet[(String,Int,Array[(Int,Double)])]
val rescomm = computeCosSims(readVec) // DataSet[(String,Array[(String,Double)])]
and the result is correct, the same as locally in Eclipse.
--
Can someone help solve this problem?



Re: elasticsearch version compatibility with 1.3-SNAPSHOT version of flink

2017-04-02 Thread Tzu-Li (Gordon) Tai
Hi Sathi,

Elasticsearch 5.x will be supported in Flink 1.3.0 (will be released at end of 
May).
Right now, if you want to use it you’ll need to build 1.3-SNAPSHOT. I generally 
do not recommend this though, as usually the latest snapshot builds are still 
under heavy development.

Cheers,
Gordon

On April 3, 2017 at 2:44:59 AM, Sathi Chowdhury (sathi.chowdh...@elliemae.com) 
wrote:

In order to use the latest Kibana, I wanted to know if I can use Elasticsearch 5.x 
and integrate it with flink-connector-elasticsearch_2.10 version 
1.3-SNAPSHOT?
Thanks
Sathi

Re: Doubt Regarding producing to kafka using flink

2017-04-02 Thread Tzu-Li (Gordon) Tai
Hi Archit!

You’ll need to assign timestamps to the records in your stream before producing 
them to Kafka (i.e. before the FlinkKafkaProducer operator).
Have a look at [1] and [2] on how to do that. Feel free to ask further 
questions if you bump into any!

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_timestamps_watermarks.html
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_timestamp_extractors.html

On April 2, 2017 at 6:38:13 PM, Archit Mittal (marchi...@gmail.com) wrote:

Hi 

I am using flink-connector-kafka-0.10_2.10

While producing, I am getting this error:

java.lang.IllegalArgumentException: Invalid timestamp -9223372036854775808
    at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60) ~[kafka-clients-0.10.0.1.jar:na]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:249) ~[flink-connector-kafka-0.10_2.10-1.2.0.jar:1.2.0]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.processElement(FlinkKafkaProducer010.java:345)
 

How do I put a timestamp in my object before producing?

Thanks
Archit

Re: Doubt Regarding producing to kafka using flink

2017-04-03 Thread Tzu-Li (Gordon) Tai
Hi Archit,

The problem is that you need to assign the returned `DataStream` from 
`stream.assignTimestampsAndWatermarks` to a separate variable, and use that 
when instantiating the Kafka 0.10 sink.
The `assignTimestampsAndWatermarks` method returns a new `DataStream` instance 
with records that have assigned timestamps. Calling it does not affect the 
original `DataStream` instance.
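
To make the "returns a new instance" behaviour concrete, here is a toy model in plain Java. `ToyStream` and `assignTimestamp` are hypothetical stand-ins invented for this sketch, not Flink classes or methods; `Long.MIN_VALUE` plays the role of the "no timestamp" value that appears in the reported error (-9223372036854775808).

```java
class TimestampedStreamSketch {

    // Hypothetical stand-in for a stream; this is not a Flink class.
    static final class ToyStream {
        // Long.MIN_VALUE means "no timestamp assigned".
        final long timestamp;

        ToyStream(long timestamp) {
            this.timestamp = timestamp;
        }

        // Returns a NEW stream carrying the timestamp; 'this' is left untouched.
        ToyStream assignTimestamp(long newTimestamp) {
            return new ToyStream(newTimestamp);
        }
    }

    public static void main(String[] args) {
        ToyStream stream = new ToyStream(Long.MIN_VALUE);

        // Result discarded: 'stream' still has no timestamp afterwards.
        stream.assignTimestamp(1490316086000L);
        System.out.println("original:  " + stream.timestamp);

        // Result kept: this is the instance a sink should be built from.
        ToyStream withTimestamps = stream.assignTimestamp(1490316086000L);
        System.out.println("returned:  " + withTimestamps.timestamp);
    }
}
```

Applied to the snippet in the question, this means keeping the stream returned by `assignTimestampsAndWatermarks` in its own variable and passing that variable, rather than `stream`, to `writeToKafkaWithTimestamps`.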

Cheers,
Gordon

On April 3, 2017 at 5:15:03 PM, Archit Mittal (marchi...@gmail.com) wrote:

Hi Gordon
This is the function snippet I am using, but I am still getting the invalid timestamp error:
   
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "word");
properties.setProperty("auto.offset.reset", "earliest");

DataStream<WordCount> stream = env.fromElements(wordCount);
stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<WordCount>() {
    @Override
    public long extractAscendingTimestamp(WordCount element) {
        return DateTime.now().getMillis();
    }
});

FlinkKafkaProducer010.FlinkKafkaProducer010Configuration config =
    FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, KAFKA_TOPIC, new WordCountSchema(), properties);
config.setWriteTimestampToKafka(true);

env.execute("job");



Re: Custom timer implementation using Flink

2017-04-11 Thread Tzu-Li (Gordon) Tai
Hi,

I just need to 
start a timer of x days/hours (lets say) and when it is fired just trigger 
something.
Flink’s lower-level ProcessFunction [1] should be very suitable for implementing 
this. Have you taken a look at it to see if it suits your case?

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html
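
As a rough model of the timer logic described in the question, here is a plain-Java sketch. It does not use any Flink classes; the class and method names are invented for illustration. The idea it mirrors is keeping one deadline per key in state, arming it on a "start" event, clearing it on a cancelling event, and acting when time passes the deadline. In a ProcessFunction, the per-key state and the timer callback would take the place of the map and the `advanceTo` method.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of per-key SLA timers; plain Java, no Flink classes involved.
class SlaTimerSketch {

    // Per-key state: the deadline currently armed for that key, if any.
    private final Map<String, Long> deadlineByKey = new HashMap<>();
    private final long slaMillis;

    SlaTimerSketch(long slaMillis) {
        this.slaMillis = slaMillis;
    }

    // A "start" event arms (or re-arms) the timer for its key.
    void onStart(String key, long nowMillis) {
        deadlineByKey.put(key, nowMillis + slaMillis);
    }

    // A cancelling event clears the state, so the timer never fires.
    void onDone(String key) {
        deadlineByKey.remove(key);
    }

    // Advances time and returns (sorted) the keys whose SLA was violated.
    List<String> advanceTo(long nowMillis) {
        List<String> violated = new ArrayList<>();
        for (Map.Entry<String, Long> entry : deadlineByKey.entrySet()) {
            if (entry.getValue() <= nowMillis) {
                violated.add(entry.getKey());
            }
        }
        for (String key : violated) {
            deadlineByKey.remove(key); // a fired timer must not fire again
        }
        Collections.sort(violated);
        return violated;
    }

    public static void main(String[] args) {
        SlaTimerSketch timers = new SlaTimerSketch(1000L);
        timers.onStart("order-1", 0L);
        timers.onStart("order-2", 0L);
        timers.onDone("order-2"); // cancelled before the deadline
        System.out.println(timers.advanceTo(500L));
        System.out.println(timers.advanceTo(1000L));
    }
}
```

The state held per key is a single timestamp, regardless of whether the SLA is seconds or days, which speaks to the memory concern raised in the question.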

Cheers,
Gordon


On April 11, 2017 at 3:25:39 AM, jaxbihani (jagad...@helpshift.com) wrote:

I have a use case which I am trying to solve using Flink. I need advice to  
decide on the correct approach.  

Use case:  
--  
I have a stream of events partitioned by a key. For some events, I need to  
start a timer (consider this as an SLA, i.e. if something is not done in x  
secs/minutes, do something). Now when that timer expires I need to perform  
some arbitrary action (like writing to a database etc). There will be some  
events which can cancel the timers (i.e. if the event comes before x secs we  
need not run the SLA violation action etc.). We are considering Flink because  
then we can reuse the scaling and fault tolerance provided by the engine rather  
than building our own. Current rps is ~ 200-300 but it can be expected to  
increase quickly.  

Solutions in mind:  
---  
1. We can think of it as a CEP use case, with an encoding like "event1  
followed by event2" with "not" in x seconds, i.e. when event 2 has "not"  
arrived in x seconds. I assume there will be NOT operator support. I am not  
sure about memory consumption in CEP, because x seconds can be x days as  
well and I do not need any batching of events in memory. I just need to  
start a timer of x days/hours (lets say) and when it is fired just trigger  
something. So there is no notion of a window as such. Can CEP fit this type  
of use case? If the time between events is in days, how about the memory  
consumption?  

2. Use Flink for event processing and delegate the task of timers to  
another service, i.e. when an event occurs, send it to Kafka with timer  
information, and then another service handles the timers and sends back the  
event once that is done etc. This looks like many hops, and  
latency will be high if the SLA is in seconds (I am thinking of using Kafka  
here).  

Is anyone aware of a better way of doing this in flink?  





Re: Problems with Kerberos Kafka connection in version 1.2.0

2017-04-11 Thread Tzu-Li (Gordon) Tai
Hi Diego,

I think the problem is here:

security.kerberos.login.contexts: Client, KafkaClient

The space between “Client,” and “KafkaClient” is causing the problem.
Removing it should fix your issue.
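
To show why a single space can matter, here is a small plain-Java illustration. It is not Flink's parsing code; it only assumes a parser that splits the configured value on commas without trimming, which would be consistent with the "Could not find a 'KafkaClient' entry" error reported in the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class LoginContextsSketch {

    // Naive parsing: split on commas and keep each piece exactly as written.
    static List<String> splitWithoutTrim(String value) {
        return Arrays.asList(value.split(","));
    }

    // Defensive parsing: strip surrounding whitespace from each piece.
    static List<String> splitAndTrim(String value) {
        List<String> names = new ArrayList<>();
        for (String piece : value.split(",")) {
            names.add(piece.trim());
        }
        return names;
    }

    public static void main(String[] args) {
        String configured = "Client, KafkaClient";
        // With the space, the second entry is " KafkaClient", not "KafkaClient".
        System.out.println(splitWithoutTrim(configured).contains("KafkaClient"));
        System.out.println(splitAndTrim(configured).contains("KafkaClient"));
        // Without the space, even the naive parser finds the entry.
        System.out.println(splitWithoutTrim("Client,KafkaClient").contains("KafkaClient"));
    }
}
```

Writing the value as `Client,KafkaClient` sidesteps the question of how the value is parsed.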
Cheers,
Gordon

On April 11, 2017 at 3:24:20 AM, Diego Fustes Villadóniga (dfus...@oesia.com)
wrote:

Hi all,



I’m trying to connect to a kerberized Kafka cluster from Flink 1.2.0. I’ve
configured Flink correctly following instructions to get the credentials
from a given keytab. Here is the configuration:





security.kerberos.login.keytab: /home/biguardian/biguardian.keytab

security.kerberos.login.principal: biguardian

security.kerberos.login.use-ticket-cache: false

security.kerberos.login.contexts: Client, KafkaClient





However, I get this error:



org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException:
Could not find a 'KafkaClient' entry in `/tmp/jaas-522055201249067191.conf`



Indeed, the JAAS file is empty.



Do you know what may be happening?



Diego Fustes Villadóniga, Arquitecto Big Data, CCIM


Re: Data duplication on a High Availability activated cluster after a Task Manager failure recovery

2017-04-16 Thread Tzu-Li (Gordon) Tai
Hi,

A few things to clarify first:

1. What is the sink you are using? Checkpointing in Flink allows for 
exactly-once state updates. Whether or not end-to-end exactly-once delivery can 
be achieved depends on the sink. For data store sinks such as Cassandra / 
Elasticsearch, this can be made effectively exactly-once using idempotent 
writes (depending on the application logic). For a Kafka topic as a sink, 
currently the delivery is only at-least-once. You can check out [1] for an 
overview.

2. Also note that if there are already duplicates in the consumed 
Kafka topic (which may occur since Kafka producing does not support any kind of 
transactions at the moment), then they will all be consumed and processed by 
Flink.
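
The idempotent-write idea from point 1 can be sketched with plain Java collections. This is a toy model, not a connector: the record ids, the in-memory stores, and the class name are assumptions made for illustration. It replays one record, as would happen when processing resumes from the last checkpoint after a failure, and compares an append-only store with an upsert keyed by a deterministic id.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class IdempotentSinkSketch {

    // Each record is {id, payload}. evt-2 appears twice to model a replay
    // after recovery from the last checkpoint.
    static List<String[]> sampleWithReplay() {
        return Arrays.asList(
                new String[] {"evt-1", "a"},
                new String[] {"evt-2", "b"},
                new String[] {"evt-2", "b"},
                new String[] {"evt-3", "c"});
    }

    // Append-only sink: a replayed record is stored a second time.
    static List<String> appendAll(List<String[]> records) {
        List<String> store = new ArrayList<>();
        for (String[] record : records) {
            store.add(record[1]);
        }
        return store;
    }

    // Idempotent sink: writes are upserts keyed by a deterministic record id,
    // so writing the same record again leaves the store unchanged.
    static Map<String, String> upsertAll(List<String[]> records) {
        Map<String, String> store = new LinkedHashMap<>();
        for (String[] record : records) {
            store.put(record[0], record[1]);
        }
        return store;
    }

    public static void main(String[] args) {
        List<String[]> records = sampleWithReplay();
        System.out.println("append-only rows: " + appendAll(records).size());
        System.out.println("upserted rows:    " + upsertAll(records).size());
    }
}
```

This only works when the application can derive the same id for the same logical event on every replay, which is why the guarantee depends on the application logic.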

However, this does not explain missing data, as this should not happen.
So for this, yes, I would try to check if there’s an issue with the application 
logic or the events simply were not in the consumed Kafka topic in the first 
place.

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/guarantees.html


On 17 April 2017 at 12:14:00 PM, F.Amara (fath...@wso2.com) wrote:

Hi all,  

I'm using Flink 1.2.0. I have a distributed system where Flink High  
Availability feature is activated. Data is produced using a Kafka broker and  
on a TM failure scenario, the cluster restarts. Checkpointing is enabled  
with exactly once processing.  
Problem encountered is, at the end of data processing I receive duplicated  
data and some data are also missing. (ex: if 2000 events are sent it loses  
around 800 events and some events are duplicated at the receiving end).  

Is this an issue with the Flink version or would it be an issue from my  
program logic?  





Re: Flink errors out and job fails--IOException from CollectSink.open()

2017-04-16 Thread Tzu-Li (Gordon) Tai
Hi,

The NPE should definitely be fixed. Here’s the filed JIRA for it: 
https://issues.apache.org/jira/browse/FLINK-6311.

@Sathi: When building Flink, we suggest avoiding Maven 3.3.x+; as you have 
experienced yourself, it is subject to incorrect shading of some of the dependencies.

Cheers,
Gordon


On 16 April 2017 at 6:10:18 PM, Ted Yu (yuzhih...@gmail.com) wrote:

Can you describe the transitive dependency issue in more detail (since a lot of 
people use 3.3.x) ?

Thanks

On Apr 16, 2017, at 1:56 AM, Sathi Chowdhury  
wrote:

Ok, I was finally able to run the job fine ... the culprit was an older version of aws 
inside the flink kinesis connector jar. I also rebuilt Flink from source with 
Maven 3.2.5; I was on 3.3.* and it had weird transitive dependency problems.




On Apr 15, 2017, at 11:13 AM, Sathi Chowdhury  
wrote:

I have redone the pipeline with flink-1.3-SNAPSHOT and running on EMR 5.4, the 
aws-sdk-java latest libraries directly in flink lib dir.

I have come to the point where I get the java.net.SocketException: Broken pipe 
(Write failed). Eager to get a reply and a clue on this!

 

java.io.IOException: Error sending data back to client (ip-172-31-42-238/172.31.42.238:42753)
    at org.apache.flink.contrib.streaming.CollectSink.invoke(CollectSink.java:64)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:38)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:208)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:261)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:679)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: java.net.SocketException: Broken pipe (Write failed)
    at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
    at com.esotericsoftware.kryo.io.Output.require(Output.java:142)
    at com.esotericsoftware.kryo.io.Output.writeString_slow(Output.java:444)
    at com.esotericsoftware.kryo.io.Output.writeString(Output.java:345)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:173)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:166)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
    at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:95)
    at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:202)
    at org.apache.flink.contrib.streaming.CollectSink.invoke(CollectSink.java:61)
    ... 6 more
Caused by: java.net.SocketException: Broken pipe (Write failed)
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:41)
    at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
    ... 17 more

Thanks

 

 

From: Sathi Chowdhury 
Date: Friday, April 14, 2017 at 1:29 AM
To: "user@flink.apache.org" 
Subject: Re: Flink errors out and job fails--IOException from CollectSink.open()

 

I am consistently seeing the same behavior, even after raising the memory for 
the jobmanager and taskmanager:

taskmanager.rpc.port: 6123
taskmanager.data.port: 4964
taskmanager.heap.mb: 39000
taskmanager.numberOfTaskSlots: 1
taskmanager.network.numberOfBuffers: 16368
taskmanager.memory.preallocate: false
parallelization.degree.default: 4

Even though the jobmanager restarts the Flink job, the subtasks do not come 
back once they have reached the cancelled state.

I have no clue how to approach this.

Thanks

 

 

From: Sathi Chowdhury 
Date: Thursday, April 13, 2017 at 9:52 PM
To: Ted Yu , "user@flink.apache.org" 

Subject: Re: Flink errors out and job fails--IOException from CollectSink.open()

 

In my jobmanager log I see this exception, which is probably the root cause of 
the whole job being killed. Is there a memory problem in the jobmanager? Any 
clue about the error below?

I ran the yarn-session

 

 

And my flink-conf.yaml is pretty much unmodified:

jobmanager.heap.mb: 256
taskmanager.heap.mb: 512
taskmanager.numberOfTaskSlots: 2
taskmanager.memory.preallocate: false
parallelism.default: 1

 

Invocation of yarn-session.sh:

HADOOP_CONF_DIR=/etc/hadoop/conf $FLINK_HOME/bin/y

Re: Kafka offset commits

2017-04-16 Thread Tzu-Li (Gordon) Tai
Hi,

The FlinkKafkaConsumer in 1.2 is able to restore from older version state 
snapshots and bridge the migration, so there should be no problem in reading 
the offsets from older state. The smallest or highest offsets will only be used 
if the offset no longer exists due to Kafka data retention settings.

Besides this, there was a fix related to the Kafka 0.8 offsets in Flink 
1.2.0 [1]. In short, before the fix, the offsets committed to ZK were off by 
1 (with respect to how Kafka itself defines committed offsets).
However, this should not affect the behavior of restoring from offsets in 
savepoints, so it should be fine.

Cheers,
Gordon

[1] https://issues.apache.org/jira/browse/FLINK-4723

On 13 April 2017 at 10:55:40 PM, Gwenhael Pasquiers 
(gwenhael.pasqui...@ericsson.com) wrote:

Hello,

 

We’re going to migrate some applications that consume data from Kafka 0.8 
from Flink 1.0 to Flink 1.2.

We are wondering if the offset commit mechanism changed between those two 
versions: is there a risk that the Flink 1.2-based application will start with 
no offset (and thus use either the smallest or highest one)?

Or can we assume that the Flink 1.2 app will resume its work at the same offset 
at which the Flink 1.0 app stopped (if they use the same consumer group id)?

 

B.R.

Re: Using Contextual Data

2017-04-16 Thread Tzu-Li (Gordon) Tai
Hi,

It seems like you want to load the context tables from external databases when 
the CEP operators start, and allow the pattern detection logic to refer to those 
tables. I’m not entirely sure if this is possible with the Flink CEP library.
I’ve looped in Klou, who is more familiar with the CEP library and would 
probably have more insight on this (it’s currently the national holidays, so we 
might not be responsive at the moment).

Otherwise, this is definitely achievable using the DataStream API. More 
specifically, you would want to implement the rich variant functions and load 
the contextual tables in `open()`. The occurrence frequency of events would be 
kept as registered state in the case you described.
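
To make the shape of that approach concrete, here is a minimal plain-JDK 
sketch. It deliberately does not use the Flink API: `ContextualCounter`, its 
`open()` and `process()` methods, and the map standing in for the database are 
all hypothetical names chosen for illustration. In a real job the counts would 
live in Flink's registered keyed state rather than in a `HashMap`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-JDK stand-in for a "rich" function: open() loads the context once,
 * process() updates per-key counts and checks the rule.
 * All names here are hypothetical; this is not Flink code.
 */
final class ContextualCounter {
    private final int threshold; // the "x" in "seen more than x times"
    private final Map<String, Integer> countsByKey = new HashMap<>();
    private boolean opened = false;

    ContextualCounter(int threshold) {
        this.threshold = threshold;
    }

    /** Called once before any event; the "database" is just a map passed in. */
    void open(Map<String, Integer> initialCountsFromDatabase) {
        countsByKey.putAll(initialCountsFromDatabase);
        opened = true;
    }

    /** Updates the count for the key and returns an alert if the threshold is exceeded. */
    List<String> process(String key) {
        if (!opened) {
            throw new IllegalStateException("open() must be called first");
        }
        int count = countsByKey.merge(key, 1, Integer::sum);
        List<String> out = new ArrayList<>();
        if (count > threshold) {
            out.add("ALERT " + key + " seen " + count + " times");
        }
        return out;
    }

    public static void main(String[] args) {
        ContextualCounter counter = new ContextualCounter(3);
        Map<String, Integer> preloaded = new HashMap<>();
        preloaded.put("user-a", 2); // count loaded at start-up
        counter.open(preloaded);
        System.out.println(counter.process("user-a")); // count is 3, not above the threshold
        System.out.println(counter.process("user-a")); // count is 4, alert is emitted
    }
}
```

The point of the split is that the expensive load happens once per operator 
instance, while the rule check only touches local state.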

Hope this helps!

Cheers,
Gordon


On 14 April 2017 at 7:41:46 AM, Doron Keller (do...@exabeam.com) wrote:

Hello Flink,

 

Is it possible to use contextual data as part of Event Processing (CEP)?

 

I would like to create some tables associated with each key. These tables would 
need to be updated as information streams in. The values in these tables would 
also have to be referenced as part of the rule logic. E.g. if a certain value 
was seen more than x times, trigger.

 

Finally, when the system is starting up these tables would have to be loaded 
from a database.

 

Thanks,

Doron

Re: Why I am getting Null pointer exception while accessing RuntimeContext in FlinkKafkaProducer010 ?

2017-04-17 Thread Tzu-Li (Gordon) Tai
Hi,

How are you using your extended sink?

The runtime context is provided to the UDF by the system when the job is 
executed.
So, for example, if you’re just testing out your UDF in unit tests, 
`getRuntimeContext` would return null (simply because it isn’t set). In such 
cases, you would need to inject a mock runtime context to perform the test.
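
As a rough illustration of why the context is null outside a running job, 
here is a plain-JDK sketch. `FakeRuntimeContext` and `CountingSink` are 
hypothetical stand-ins, not Flink classes; they only mimic the pattern of a 
context that the framework sets before the function is used.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Demo entry point; all types in this file are hypothetical stand-ins, not Flink classes. */
final class RuntimeContextDemo {
    public static void main(String[] args) {
        CountingSink sink = new CountingSink();
        try {
            sink.invoke("a"); // nobody has set the context yet
        } catch (NullPointerException e) {
            System.out.println("no context set: NullPointerException");
        }

        AtomicLong counter = new AtomicLong();
        sink.setRuntimeContext(name -> counter); // hand-written stub in place of a mock
        sink.invoke("a");
        sink.invoke("b");
        System.out.println("records counted: " + counter.get());
    }
}

/** Minimal context exposing a named counter. */
interface FakeRuntimeContext {
    AtomicLong getCounter(String name);
}

/** A sink that depends on a context being injected before invoke() is called. */
class CountingSink {
    private FakeRuntimeContext runtimeContext; // stays null until the "framework" sets it

    void setRuntimeContext(FakeRuntimeContext ctx) {
        this.runtimeContext = ctx;
    }

    void invoke(String record) {
        // Throws NullPointerException if no context was injected.
        runtimeContext.getCounter("records").incrementAndGet();
    }
}
```

In a unit test, the stub passed to `setRuntimeContext` plays the role of the 
mock runtime context mentioned above.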

Does this explain your case?

Cheers,
Gordon

On 17 April 2017 at 6:45:31 PM, sohimankotia (sohimanko...@gmail.com) wrote:

I have a class extending FlinkKafkaProducer010. In the invoke method I am using 
this.getRuntimeContext to increment a Flink counter.

But I am getting a null RuntimeContext, which leads to a 
NullPointerException.

Using Flink 1.2.





Re: Parallel execution but keep order of kafka messages

2017-04-17 Thread Tzu-Li (Gordon) Tai
Hi Benjamin,

In your case, the tumbling window subtasks would each have 3 input streams, 1 
for each of the 3 FlinkKafkaConsumer operator subtasks.

> I thought that each subtask of the window would get only elements from one 
> partition and therefore the watermarks would be calculated independently per 
> stream.

This is a misunderstanding. After the keyBy, the window subtasks could get 
input from any of the consumer subtasks, and would therefore need to wait for 
broadcast watermarks from all of them. It just happens that in your 
case, each consumer subtask will only produce records with exactly one key.
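
A small plain-JDK sketch of that rule, under the assumption that the window 
operator's event-time clock is the minimum of the watermarks received from 
its inputs and that a window fires once that clock reaches the window's last 
timestamp. `MinWatermark` and the concrete numbers are made up for 
illustration; this is not Flink code.

```java
import java.util.Arrays;

/** Plain-JDK illustration of "operator event time = minimum over input watermarks". */
final class MinWatermark {
    /** The operator's event-time clock is the minimum watermark over all of its inputs. */
    static long operatorWatermark(long[] inputWatermarks) {
        return Arrays.stream(inputWatermarks).min().orElse(Long.MIN_VALUE);
    }

    /** A window [start, end) can fire once the operator watermark reaches end - 1. */
    static boolean canFire(long windowEndMillis, long[] inputWatermarks) {
        return operatorWatermark(inputWatermarks) >= windowEndMillis - 1;
    }

    public static void main(String[] args) {
        long twoHours = 2 * 60 * 60 * 1000L;
        long c = 1_000_000L; // watermark of the consumer subtask reading partition C
        long[] inputs = {c + twoHours, c + twoHours, c}; // A and B are two hours ahead

        long windowEndForA = c + twoHours; // window holding recent records of partition A
        long windowEndForC = c;            // window holding recent records of partition C

        System.out.println(operatorWatermark(inputs) == c); // true: C holds the clock back
        System.out.println(canFire(windowEndForA, inputs)); // false: waits for C to catch up
        System.out.println(canFire(windowEndForC, inputs)); // true
    }
}
```

This matches the symptom described in the question: only the windows for the 
partition with the oldest timestamps produce output, while the windows for the 
other two partitions wait.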

Moving back a bit to your original setup: it seems like what you want to 
achieve is a simple window on each partition independently, and then produce 
the window output to a new partition.
In your original setup where each topic and its corresponding output topic each 
has 1 partition, I’d actually just have separate jobs for each topic-to-topic 
pipeline, instead of bundling them into one job. Was there any specific reason 
for bundling them together?

Cheers,
Gordon

On 17 April 2017 at 5:04:26 PM, Benjamin Reißaus (benjamin.reiss...@gmail.com) 
wrote:

Hi, 

So I have been rearranging my architecture to where I only have one input and 
one output topic, each with 3 partitions and in my flink job I have one 
consumer and one producer running with parallelism of 3. To run in parallel, I 
extract the partition from the metadata information per kafka message and keyBy 
that very partition. The code sample is at the bottom. 

Now it seems though, that my tumbling window of 1 second that I run on all 
partitions and that I use to calculate statistics only gives output on one 
partition. The reason seems to be that the timestamps of partition A and B are 
2 hours ahead of partition C. In the documentation I read that the event time 
of an operator following a keyBy (my tumbling window) is the minimum of its 
input streams’ event times. 

But is that even the case for me? Does my tumbling window have multiple input 
streams? I thought that each subtask of the window would get only elements from 
one partition and therefore the watermarks would be calculated independently 
per stream. 

I would appreciate any input! Again, my goal is to run the same queries on 
independent kafka streams. 

Best regards,
Ben

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.windowing.time.Time
import org.hpi.esb.flink.datamodel.{SimpleRecord, Statistics}

class StatisticsQuery(windowSize: Int)
  extends Query[(String, SimpleRecord), (String, Statistics)] {

  override def execute(stream: DataStream[(String, SimpleRecord)]): DataStream[(String, Statistics)] = {
    stream
      .keyBy(_._1)
      .timeWindow(Time.milliseconds(windowSize))
      .fold(("", new Statistics())) { (acc, value) => Statistics.fold(acc, value) }
  }
}

2017-04-14 19:22 GMT+02:00 Benjamin Reißaus :
Hi everybody,

 

I have the following flink/kafka setup:

 

I have 3 Kafka “input” topics and 3 “output” topics, each with 1 partition (only 
1 partition because the order of the messages is important). I also have 1 
master and 2 Flink slave nodes with a total of 16 task slots.

In my flink program I have created 3 consumers - each for one of the input 
topics.

On each of the datastreams I run a query that generates statistics over a 
window of 1 second and I write the result to the corresponding output topic. 
You can find the execution plan with parallelism set to 2 attached.

 

This setup with parallelism=2 sometimes seems to give me the wrong order of the 
statistics results. I assume it is because of the rebalancing before the last 
map which leads to a race condition when writing to kafka.

 

If I set parallelism to 1 no rebalancing will be done but only one task slot is 
used.

 

This has led me to the following questions:

 

Why is only 1 task slot used with my 3 pipelines when parallelism is set to 1? 
As far as I understand, the parallelism refers to the number of parallel 
instances a task can be split into. Therefore, I would assume that I could 
still run multiple different tasks (e.g. different maps or window functions on 
different streams) in different task slots, right?

 

And to come back to my requirement: Is it not possible to run all 3 pipelines 
in parallel and still keep the order of the messages and results?

 

I also asked these questions on Stack Overflow. And it seems that I have similar 
trouble understanding the terms “task slot”, “subtasks” etc. as Flavio 
mentioned in this Flink mail thread.

 

Thank you and I would appreciate any input!

 

Best regards,

Ben




Re: Kafka offset commits

2017-04-19 Thread Tzu-Li (Gordon) Tai
Thanks for the clarification Aljoscha!
Yes, you cannot restore from a 1.0 savepoint in Flink 1.2 (sorry, I missed the 
“1.0” part on my first reply).

@Gwenhael, I’ll try to clarify some of the questions you asked:

> Does that mean that Flink does not rely on the offset written to ZooKeeper 
> anymore, but relies on the snapshot data, implying that it’s crucial to keep 
> the same snapshot folder before and after the migration to Flink 1.2?

For the case of 1.0 -> 1.2, you’ll have to rely on committed offsets in Kafka / 
ZK for the migration. State migration from 1.0 to 1.2 is not possible.

As Aljoscha pointed out, if you are using the same “group.id”, then there 
shouldn’t be a problem w.r.t. retaining the offset position. You just have to 
keep [1] in mind, as you would need to manually increase all committed 
offsets in Kafka / ZK by 1 for that consumer group.
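
For illustration only, the adjustment itself is a plain "+1 per partition". 
The sketch below assumes you have already read the committed offsets for the 
consumer group into a map of partition to offset; `OffsetShift` and 
`shiftByOne` are hypothetical names, and the code does not talk to Kafka or 
ZooKeeper at all.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper: computes the corrected offsets, nothing is written anywhere. */
final class OffsetShift {
    /** Returns a copy of the committed offsets with every value increased by one. */
    static Map<Integer, Long> shiftByOne(Map<Integer, Long> committedByPartition) {
        Map<Integer, Long> shifted = new TreeMap<>();
        for (Map.Entry<Integer, Long> e : committedByPartition.entrySet()) {
            shifted.put(e.getKey(), e.getValue() + 1);
        }
        return shifted;
    }

    public static void main(String[] args) {
        Map<Integer, Long> fromOldJob = new TreeMap<>();
        fromOldJob.put(0, 41L); // partition 0, offset as committed before the fix
        fromOldJob.put(1, 99L); // partition 1
        System.out.println(shiftByOne(fromOldJob)); // prints {0=42, 1=100}
    }
}
```

How the shifted values are written back depends on your tooling, which is why 
that part is left out here.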

Note that there is no state migration happening here, but just simply relying 
on offsets committed in Kafka / ZK to define the starting position when you’re 
starting the job in 1.2.

> We were also wondering if the Flink consumer is able to restore its offset 
> from ZooKeeper.

For FlinkKafkaConsumer08, the starting offset is actually always read from ZK.
Again, this isn’t a “restore”, but just defining the start position using committed 
offsets.

> Another question: is there an expiration on the snapshots? We’ve been having 
> issues with an app that we forgot to restart. We did it after a couple of days, 
> but it looks like it did not restore the offset correctly and it started 
> consuming from the oldest offset, creating duplicated data (the Kafka queue has 
> over a week of buffer).

There is no expiration to the offsets stored in the snapshots. The only issue 
would be if Kafka has expired that offset due to data retention settings.
If you’re sure that at the time of the restore the data hasn’t expired yet, 
there might be something weird going on.
AFAIK, the only issue that was previously known to possibly cause this was [2].
Could you check if that issue may be the case?

[1] https://issues.apache.org/jira/browse/FLINK-4723
[2] https://issues.apache.org/jira/browse/FLINK-6006
On 19 April 2017 at 5:14:35 PM, Aljoscha Krettek (aljos...@apache.org) wrote:

Hi,
AFAIK, restoring a Flink 1.0 savepoint should not be possible on Flink 1.2. 
Only restoring from Flink 1.1 savepoints is supported.

@Gordon If the consumer group stays the same the new Flink job should pick up 
where the old one stopped, right?

Best,
Aljoscha

On 18. Apr 2017, at 16:19, Gwenhael Pasquiers  
wrote:

Thanks for your answer.
Does that mean that Flink does not rely on the offset written to ZooKeeper 
anymore, but relies on the snapshot data, implying that it’s crucial to keep 
the same snapshot folder before and after the migration to Flink 1.2?
We were also wondering if the Flink consumer is able to restore its offset 
from ZooKeeper.
Another question: is there an expiration on the snapshots? We’ve been having 
issues with an app that we forgot to restart. We did it after a couple of days, 
but it looks like it did not restore the offset correctly and it started 
consuming from the oldest offset, creating duplicated data (the Kafka queue has 
over a week of buffer).
B.R.
 



Re: Read from and Write to Kafka through flink

2017-04-19 Thread Tzu-Li (Gordon) Tai
Hi Pradeep,

There is no single API or connector that takes a file as input and writes it to 
Kafka.
In Flink, this operation consists of 2 parts: 1) a source reading from the input, 
and 2) a sink producing to Kafka.
So, all you need is a job that consists of that source and sink.

You’ve already figured out 2). For 1), you can take a look at the built-in file 
reading source: `StreamExecutionEnvironment.readFile`.

> The program quickly executes comes out.

I might need some more information here:
Do you mean that the job finished executing very fast?
If so, there should be an error of some kind. Could you find and paste it here?

If the job is actually running, and you’re constantly writing to the Kafka 
topic, but the job just isn’t consuming them, there are a few things you could 
probably check:
1) are you sure the Kafka broker is the same version as the connector you are 
using?
2) make sure that you are using different consumer groups, if the offsets are 
committed back to Kafka. Check out [1] to see under which conditions offsets are 
committed.

By the way, I’m continuing this thread only on the user@ mailing list, as 
that’s the more suitable place for this.

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration
On 20 April 2017 at 7:38:36 AM, Pradeep Anumala (pradeep.anuma...@gmail.com) 
wrote:

Hi,  
I am a beginner with Apache Flink. I am trying to write to Kafka through  
a file and read the data from kafka. I see there is an API to read from and  
write to kafka.  

The following writes to kafka  
FlinkKafkaProducer08 myProducer = new FlinkKafkaProducer08(
        "localhost:9092",            // broker list
        "my-topic",                  // target topic
        new SimpleStringSchema());   // serialization schema

Is there any API which takes input as file and writes the file content to  
kafka ?  


My second question  
-  
I have run the kafka producer on the terminal  
I am trying to read from kafka using the below code. But this doesn't print  
any output though I am giving some input in the producer terminal.  
The program quickly executes comes out. Please let me know how I can read  
from kafka ?  

DataStream data = env.addSource(
        new FlinkKafkaConsumer010("myTopic", new SimpleStringSchema(), props)).print();


Re: Flink Kafka Consumer Behaviour

2017-04-20 Thread Tzu-Li (Gordon) Tai
Hi Sandeep,

It isn’t fixed yet, so I think external tools like the Kafka offset checker 
still won’t work.
If you’re using 08 and are currently stuck with this issue, you can still 
directly query ZK to get the offsets.

I think for FlinkKafkaConsumer09 the offset is exposed to Flink's metric system 
using Kafka’s own returned metrics, but for 08 this is still missing.

There is this JIRA [1] that aims at exposing consumer lag across all Kafka 
consumer versions to Flink metrics. Perhaps it would make sense to also 
generally expose the offset for all Kafka consumer versions to Flink metrics as 
well.

- Gordon

[1] https://issues.apache.org/jira/browse/FLINK-6109


On 19 April 2017 at 5:11:11 AM, sandeep6 (vr1meghash...@gmail.com) wrote:

Is this fixed now? If not, is there any way to monitor kafka offset that is  
being processed by Flink? This should be a use case for everyone who uses  
Flink with Kafka.  





Re: Flink Kafka Consumer Behaviour

2017-04-20 Thread Tzu-Li (Gordon) Tai
One additional note:

In FlinkKafkaConsumer 0.9+, the current read offset should already exist in 
Flink metrics.
See https://issues.apache.org/jira/browse/FLINK-4186.

But yes, this is still missing for 0.8, so you need to directly query ZK for 
this.

Cheers,
Gordon



Re: put record to kinesis and then trying consume using flink connector

2017-04-23 Thread Tzu-Li (Gordon) Tai
Hi Sathi,

Here, in the producer-side log, it says:
2017-04-22 19:46:53,608 INFO  [main] DLKnesisPublisher: Successfully published 
record, of bytes :162810 partition key 
:fb043b73-1d9d-447a-a593-b9c1bf3d4974-1492915604228 ShardId: 
shardId-Sequence 
number49572539577762637793132019619873654976833479400677703682 Stream 
Name:mystream

The stream the record was inserted into is “mystream”.

However,

    DataStream outputStream = see.addSource(new FlinkKinesisConsumer<>(
        "myStream", new MyDeserializerSchema(), consumerConfig));

you seem to be consuming from “myStream”.

Could the capital “S” be the issue?

Cheers,
Gordon


On 24 April 2017 at 12:33:28 AM, Sathi Chowdhury (sathi.chowdh...@elliemae.com) 
wrote:

Hi Flink Dev,

I thought this would be simple and work easily with Flink, yet I 
am struggling to make it work.

I am using the Flink Kinesis connector, version 1.3-SNAPSHOT.

Basically, I am trying to bootstrap a stream with one event pushed into it as a 
warm-up inside the Flink job’s main method, and I use the AWS Kinesis client to 
simply put a record into a given stream.

My expectation is that if I now addSource on a Kinesis stream, the data stream 
will consume the event I pushed.

 

 

 

    // This is the method that pushes to the Kinesis stream "mystream"
    publishToKinesis("mystream", regionName, data);

    Properties consumerConfig = new Properties();
    consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, region);
    consumerConfig.setProperty(ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION,
        ConsumerConfigConstants.InitialPosition.LATEST.toString());
    consumerConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");

    final LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(new Configuration(), false);
    cluster.start();
    ObjectMapper mapper = new ObjectMapper();
    final StreamExecutionEnvironment see = StreamExecutionEnvironment.createRemoteEnvironment(
        "localhost", cluster.getLeaderRPCPort());

    DataStream<String> outputStream = see.addSource(new FlinkKinesisConsumer<>(
        "myStream", new MyDeserializerSchema(), consumerConfig));

    for (Iterator<String> it = DataStreamUtils.collect(outputStream); it.hasNext(); ) {
        String actualOut = it.next();
        ObjectNode actualOutNode = (ObjectNode) mapper.readTree(actualOut);
        // then I want to either print it or do some further validation etc.
    }
 

 

……..

 

I am not sure why FlinkKinesisConsumer does not react to the record that I 
published; it keeps waiting for it at the step it.next();

I print out the sequence number of the record I put:

2017-04-22 19:46:53,608 INFO  [main] DLKnesisPublisher: Successfully published 
record, of bytes :162810 partition key 
:fb043b73-1d9d-447a-a593-b9c1bf3d4974-1492915604228 ShardId: 
shardId-Sequence 
number49572539577762637793132019619873654976833479400677703682 Stream 
Name:mystream

 

 

And the Flink job logs this at the point where it is waiting:

2017-04-22 19:47:29,423 INFO  [Source: Custom Source (1/1)] KinesisDataFetcher: 
Subtask 0 will be seeded with initial shard 
KinesisStreamShard{streamName='mystream', shard='{ShardId: 
shardId-,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 
340282366920938463463374607431768211455},SequenceNumberRange: 
{StartingSequenceNumber: 
49572531519165352852103352736022695959324427654906511362,}}'}, starting state 
set as sequence number LATEST_SEQUENCE_NUM

2017-04-22 19:47:29,425 INFO  [Source: Custom Source (1/1)] KinesisDataFetcher: 
Subtask 0 will start consuming seeded shard 
KinesisStreamShard{streamName=’mystream’, shard='{ShardId: 
shardId-,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 
340282366920938463463374607431768211455},SequenceNumberRange: 
{StartingSequenceNumber: 
49572531519165352852103352736022695959324427654906511362,}}'} from sequence 
number LATEST_SEQUENCE_NUM with ShardConsumer 0

 

Any clue will be awesome to clear my confusion.

Thanks

Sathi


Re: Kinesis connector SHARD_GETRECORDS_MAX default value

2017-04-23 Thread Tzu-Li (Gordon) Tai
Thanks for filing the JIRA!

Would you also be up for opening a PR for the change? That would be very 
helpful :)

Cheers,
Gordon

On 24 April 2017 at 3:27:48 AM, Steffen Hausmann (stef...@hausmann-family.de) 
wrote:

Hi Gordon,  

thanks for looking into this and sorry it took me so long to file the  
issue: https://issues.apache.org/jira/browse/FLINK-6365.  

Really appreciate your contributions for the Kinesis connector!  

Cheers,  
Steffen  

On 22/03/2017 20:21, Tzu-Li (Gordon) Tai wrote:  
> Hi Steffen,
>
> I have to admit that I didn’t put too much thought into the default
> values for the Kinesis consumer.
>  
> I’d say it would be reasonable to change the default values to follow  
> KCL’s settings. Could you file a JIRA for this?  
>  
> In general, we might want to reconsider all the default values for  
> configs related to the getRecords call, i.e.  
> - SHARD_GETRECORDS_MAX  
> - SHARD_GETRECORDS_INTERVAL_MILLIS  
> - SHARD_GETRECORDS_BACKOFF_*  
>  
> Cheers,  
> Gordon  
>  
> On March 23, 2017 at 2:12:32 AM, Steffen Hausmann
> (stef...@hausmann-family.de) wrote:
>  
>> Hi there,  
>>  
>> I recently ran into problems with a Flink job running on an EMR cluster
>> consuming events from a Kinesis stream receiving roughly 15k
>> events/second. Although the EMR cluster was substantially scaled and CPU
>> utilization and system load were well below any alarming threshold, the
>> processing of events of the stream increasingly fell behind.
>>
>> Eventually, it turned out that SHARD_GETRECORDS_MAX defaults to 100,
>> which apparently causes too much overhead when consuming events from
>> the stream. Increasing the value to 5000 (a single GetRecords call to
>> Kinesis can retrieve up to 10k records) made the problem go away.
>>
>> I wonder why the default value for SHARD_GETRECORDS_MAX is chosen so low
>> (100x less than it could be). The Kinesis Client Library defaults to
>> 5000 and it's recommended to use this default value:
>> http://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#consumer-app-reading-slower
>>
>> Thanks for the clarification!
>> Cheers,  
>> Steffen  


Re: put record to kinesis and then trying consume using flink connector

2017-04-26 Thread Tzu-Li (Gordon) Tai
Hi Sathi,

Just curious: you mentioned that you’re writing some records in the main method 
of your job application. I assume that this is just for testing purposes, 
correct? If so, you can perhaps just use “EARLIEST” as the starting position. 
Or “AT_TIMESTAMP”, as you are currently doing.

And yes, you’re correct about the observation that the job transformations are 
lazily executed, so the Kinesis consumer connects to your Kinesis stream 
only after everything in the main method has run.
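
To picture the effect of the start position, here is a toy in-memory sketch. 
It is not the Kinesis or Flink API: `InitialPositionDemo`, its enum, and the 
list standing in for the stream are hypothetical, and “EARLIEST” is used only 
as the label from this thread.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory toy stream illustrating LATEST vs. EARLIEST; not the Kinesis API. */
final class InitialPositionDemo {
    enum InitialPosition { LATEST, EARLIEST }

    private final List<String> records = new ArrayList<>();

    void put(String record) {
        records.add(record);
    }

    /** Index at which a consumer that connects right now would begin reading. */
    int startIndex(InitialPosition position) {
        return position == InitialPosition.LATEST ? records.size() : 0;
    }

    /** Everything a consumer has been able to see so far, given where it started. */
    List<String> readFrom(int startIndex) {
        return new ArrayList<>(records.subList(startIndex, records.size()));
    }

    public static void main(String[] args) {
        InitialPositionDemo stream = new InitialPositionDemo();

        // main(): the warm-up record is written while the job graph is only being built.
        stream.put("warm-up");

        // Job execution: only now do the consumers connect and pick a start position.
        int latest = stream.startIndex(InitialPosition.LATEST);
        int earliest = stream.startIndex(InitialPosition.EARLIEST);

        stream.put("later-record");

        System.out.println(stream.readFrom(latest));   // [later-record]
        System.out.println(stream.readFrom(earliest)); // [warm-up, later-record]
    }
}
```

A consumer that starts at the latest position never sees the warm-up record, 
which is consistent with `it.next()` blocking in the original test.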

Also, regarding one of your earlier questions:

> I also had a question about how long data that you broadcast in a stream, 
> and that does not change, stays available in the operator’s JVM. Will it be 
> available for as long as the operator is alive?
>
> What happens when a slot dies? Does the new slot automatically become aware 
> of the broadcast data?

I’m not sure what you mean here. Could you elaborate a bit more?

Cheers,
Gordon


On 26 April 2017 at 7:01:14 AM, Sathi Chowdhury (sathi.chowdh...@elliemae.com) 
wrote:

Hi ,

I also had a question about how long data that you broadcast in a stream, 
and that does not change, stays available in the operator’s JVM. Will it be 
available for as long as the operator is alive?

What happens when a slot dies? Does the new slot automatically become aware 
of the broadcast data?

Thanks

Sathi

 

From: Sathi Chowdhury 
Date: Tuesday, April 25, 2017 at 3:56 PM
To: "Tzu-Li (Gordon) Tai" , "user@flink.apache.org" 

Subject: Re: put record to kinesis and then trying consume using flink connector

 

Hi Gordon,

That was a typo, as I was trying to mask off the stream name. I still had 
issues with using LATEST as the initial stream position, so I moved to using 
AT_TIMESTAMP to solve it. It works fine now.

Thanks so much for your response.

Sathi

 


Re: REST API call in stream transformation

2017-04-27 Thread Tzu-Li (Gordon) Tai
Hi Vijay,

Generally, for asynchronous operations to enrich (or in your case, fetching the 
algorithm for the actual transformation of the data), you’ll want to look at 
Flink’s Async I/O [1].

For your second question, I can see it as a stateful `FlatMapFunction` that 
keeps the seen results as managed state. Once all results are seen (i.e. the 
last result arrives at the operator), you join them and emit your final result 
further downstream.
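
The bookkeeping such a function would do can be sketched in plain Java, with no
Flink dependency. The `ResultJoiner` class, its method names, and the use of an
in-memory map in place of Flink managed state are all assumptions made for
illustration; in a real job the per-key map would live in keyed state inside
the `FlatMapFunction`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical helper: buffers partial results per window key and releases
// the joined result once all expected algorithms have answered.
class ResultJoiner {
    private final int expected;
    // windowKey -> (algorithm name -> result); stands in for Flink keyed state.
    private final Map<String, Map<String, String>> seen = new HashMap<>();

    ResultJoiner(int expected) {
        this.expected = expected;
    }

    // Called once per arriving result. Returns the joined results (ordered by
    // algorithm name) when the last one arrives, otherwise an empty Optional.
    Optional<List<String>> add(String windowKey, String algorithm, String result) {
        Map<String, String> partial = seen.computeIfAbsent(windowKey, k -> new TreeMap<>());
        partial.put(algorithm, result);
        if (partial.size() < expected) {
            return Optional.empty();
        }
        seen.remove(windowKey); // clear the buffer so it does not grow unbounded
        return Optional.of(new ArrayList<>(partial.values()));
    }
}
```

The first two calls for a given window return an empty `Optional`; only the
call that delivers the third result returns the joined list to emit downstream.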

Does this help with the use case you have in mind?

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/asyncio.html


On 27 April 2017 at 5:29:05 PM, G.S.Vijay Raajaa (gsvijayraa...@gmail.com) 
wrote:

Hi,

I have just started to explore Flink and have a couple of questions. I am
wondering if it's possible to call a REST endpoint asynchronously and pipe the
response to the next stage of my transformation on the stream. The idea is
that, after collecting my data in a predefined time window, I would like to
apply some algorithm/transformation to the window of data, external to Flink.
The algorithms have been exposed as REST endpoints.

My second question is an extension of the previous one: if I need to transform
my window data by applying three different algorithms in parallel, exposed via
separate REST endpoints, how do I wait until the individual REST endpoints
respond? Potentially I need to join the results of the three algorithms
before I call the sink.

Regards,
Vijay Raajaa 

Re: Regarding exception relating to FlinkKafkaConsumer09

2017-04-27 Thread Tzu-Li (Gordon) Tai
Hi!

The `PropertiesUtil.getBoolean` currently only exists in `1.3-SNAPSHOT`. The 
method was added along with one of the Kafka consumer changes recently.

Generally, you should always use matching versions of the Flink installation
and the library; otherwise these kinds of errors can always be expected.

Cheers,
Gordon


On 24 April 2017 at 11:56:44 PM, Abdul Salam Shaikh 
(abd.salam.sha...@gmail.com) wrote:

Hi, 

I am using 1.2-SNAPSHOT of Apache Flink and 1.3-SNAPSHOT of 
flink-connector-kafka-0.9_2.11. 

It was executing without any errors before but it is giving the following
exception at the moment:

java.lang.NoSuchMethodError: org.apache.flink.util.PropertiesUtil.getBoolean(Ljava/util/Properties;Ljava/lang/String;Z)Z
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.getIsAutoCommitEnabled(FlinkKafkaConsumer09.java:235)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:330)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:157)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:383)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:259)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:654)
    at java.lang.Thread.run(Thread.java:745)

Has anything changed recently on this front?

Thanks in anticipation!





Re: Flink first project

2017-04-27 Thread Tzu-Li (Gordon) Tai
Hi Georg,

Simply from the aspect of a Flink source that listens to a REST endpoint for 
input data, there should be quite a variety of options to do that. The Akka 
streaming source from Bahir should also serve this purpose well. It would also 
be quite straightforward to implement one yourself.

On the other hand, what Jörn was suggesting was that you would want to first
persist the incoming data from the REST endpoint to a replayable storage /
queue, and have your Flink job read from that replayable storage / queue.
The reason for this is that Flink’s checkpointing mechanism for exactly-once
guarantees relies on a replayable source (see [1]), and since a REST endpoint
is not replayable, you’ll not be able to benefit from the fault-tolerance
guarantees provided by Flink. The most popular source used with Flink for
exactly-once, currently, is Kafka [2]. The only extra latency in this setup,
compared to fetching from the REST endpoint directly, is writing to the
intermediate Kafka topic.

Of course, if you’re just testing around and just getting to know Flink, this 
setup isn’t necessary.
You can just start off with a source such as the Flink Akka connector in Bahir, 
and start writing your first Flink job right away :)

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/guarantees.html
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html

On 24 April 2017 at 4:02:14 PM, Georg Heiler (georg.kf.hei...@gmail.com) wrote:

Wouldn't adding flume -> Kafka -> flink also introduce additional latency?

Georg Heiler wrote on Sun, 23 Apr 2017 at 20:23:
So you would suggest flume over a custom akka-source from bahir?

Jörn Franke wrote on Sun, 23 Apr 2017 at 18:59:
I would use Flume to import these sources to HDFS and then use Flink or Hadoop
or whatever to process them. While it is possible to do it in Flink, you do not
want your processing to fail because the web service is not available etc.
With Flume, which is suitable for this kind of task, it is more controlled and
reliable.

On 23. Apr 2017, at 18:02, Georg Heiler  wrote:

Being new to Flink, I would like to do a small project to get a better feeling
for it. I am thinking of getting some stats from several REST APIs (e.g.
Bitcoin exchange rates from different exchanges) and comparing prices across
exchanges in real time.

Are there already some REST API sources for Flink that could serve as a sample
to get started implementing a custom REST source?

I was thinking about using https://github.com/timmolter/XChange to connect to 
several exchanges. E.g. to make a single api call by hand would look similar to

val currencyPair = new CurrencyPair(Currency.XMR, Currency.BTC)
CertHelper.trustAllCerts()
val poloniex =
  ExchangeFactory.INSTANCE.createExchange(classOf[PoloniexExchange].getName)
val dataService = poloniex.getMarketDataService

generic(dataService)
raw(dataService.asInstanceOf[PoloniexMarketDataServiceRaw])
System.out.println(dataService.getTicker(currencyPair))

What would be a proper way to make this available as a Flink source? I have seen
https://github.com/apache/flink/blob/master/flink-contrib/flink-connector-wikiedits/src/main/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSource.java
but, being new to Flink, I am a bit unsure how to proceed.

Regards,
Georg

Re: Join two kafka topics to do CEP

2017-04-27 Thread Tzu-Li (Gordon) Tai
Hi,

> Here is my test but it does not work as data arrives i have to re-run, can
> anyone help me please ?

I think you meant to send a code snippet? Either way, a code snippet
would probably help in understanding what you’re trying to achieve :)

You mentioned "re-run and no data", so one thing that I could probably point
out now: the Kafka consumer will commit offsets back to Kafka / ZK for the
consumer group ("group.id") you’re currently using. So, if in your tests you
are simply restarting the job, make sure you’re using different consumer
groups if you want to read previous data in the topics.
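
As a rough illustration, a test could generate a fresh consumer group for every
run. The helper class and the group-id prefix below are hypothetical;
`bootstrap.servers`, `group.id`, and `auto.offset.reset` are standard Kafka
consumer property keys. Setting `auto.offset.reset` to `earliest` is an
addition made here on the assumption that a group without committed offsets
would otherwise start from the latest records.

```java
import java.util.Properties;
import java.util.UUID;

// Hypothetical helper for tests: builds Kafka consumer properties with a
// consumer group that has never committed any offsets.
class TestConsumerProps {
    static Properties freshGroup(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // A new group id per run means no previously committed offsets apply.
        props.setProperty("group.id", "cep-test-" + UUID.randomUUID());
        // With no committed offsets, start from the beginning of the topic.
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }
}
```

The resulting `Properties` object would then be handed to the Kafka consumer
when the source is created.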

Cheers,
Gordon


On 28 April 2017 at 12:20:17 AM, tarek26 (tarek.khal.leta...@gmail.com) wrote:

I want to join two Kafka topics (Data, Rules) into one DataStream.

The two streams must have the same id for the join.
Events are the data coming from the sensors.
Rules contains the rules that we will check with CEP.
Here is my test but it does not work as data arrives i have to re-run, can  
anyone help me please ?  





--
View this message in context:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Join-two-kafka-topics-to-do-CEP-tp12865.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.


[ANNOUNCE] Stateful Functions Docker images are now hosted on Dockerhub at apache/flink-statefun

2021-01-18 Thread Tzu-Li (Gordon) Tai
Hi,

We have created an "apache/flink-statefun" Dockerhub repository managed by
the Flink PMC, at:
https://hub.docker.com/r/apache/flink-statefun

The images for the latest stable StateFun release, 2.2.2, have already been
pushed there.
Going forward, it will be part of the release process to make official
images of newer release versions available there as well.

Cheers,
Gordon


Re: Stateful Functions - accessing the state aside of normal processing

2021-01-27 Thread Tzu-Li (Gordon) Tai
Hi Stephan,

Great to hear about your experience with StateFun so far!

I think what you are looking for is a way to read StateFun checkpoints,
which are basically an immutable consistent point-in-time snapshot of all
the states across all your functions, and run some computation or simply to
explore the state values.
StateFun checkpoints are essentially adopted from Flink, so you can find
more detail about that here [1].

Currently, StateFun does provide a means for state "bootstrapping": running
a batch offline job to write and compose a StateFun checkpoint [2].
What is still missing is the "reading / analysis" side of things, to do
exactly what you described: running a separate batch offline job for
reading and processing an existing StateFun checkpoint.

Before we dive into details on what that may look like, do you think that is
what you would need?

Although I don't think we would be able to support such a feature yet since
we're currently focused on reworking the SDKs and request-reply protocol,
in any case it would be interesting to discuss if this feature would be
important for multiple users already.

Cheers,
Gordon

[1]
https://ci.apache.org/projects/flink/flink-docs-master/concepts/stateful-stream-processing.html#checkpointing
[2]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/deployment-and-operations/state-bootstrap.html

On Wed, Jan 27, 2021 at 11:41 PM Stephan Pelikan 
wrote:

> Hi,
>
>
>
> We are trying to use Statefuns for our tool and it seems to be a good fit.
> I already adopted it and it works quite well. However, we have millions of
> different states (all the same FunctionType but different ids) and each
> state consists of several @Persisted values (values and tables). We want to
> build an administration tool for examining the crowd of states (count,
> histogram, etc.) and each state in detail (the persisted-tables and
> -values).
>
>
>
> Additionally we need some kind of dig-down functionality for finding those
> individual states. For example some of those persisted values can be used
> to categorize the crowd of states.
>
>
>
> My question now is how to achieve this. Is there a way to browse and
> examine statefuns in a read-only fashion (their ids, their persisted
> values)? How can one achieve this without duplicating status in e.g. a
> relational database?
>
>
>
> Thanks,
>
> Stephan
>
>
>
> PS: I have another questions but I will send them in separate mails to
> avoid mixing up topics.
>


Re: [Stateful Functions] Problems with Protobuf Versions

2021-02-01 Thread Tzu-Li (Gordon) Tai
Hi,

This hints at an incompatibility between the Protobuf classes generated by the
protoc compiler and the Protobuf runtime dependency used by the code.

Could you try to make sure the `protoc` compiler version matches the
Protobuf version in your code?

Cheers,
Gordon

On Fri, Jan 29, 2021 at 6:07 AM Jan Brusch 
wrote:

> Hi,
>
> I have a bit of a strange problem: I can't get a Statefun Application to
> Compile or Run (Depending on the exact Protobuf version) with a Protobuf
> version newer than 3.3.0. I have had this problem over multiple project
> setups and multiple versions of Flink Statefun with Java8.
>
> Protobuf 3.3.0 works fine and all, but it does seem a bit odd...
>
>
> The most common error behaviour is a successful maven build and the
> following Runtime Error on Startup:
>
> java.lang.NoClassDefFoundError:
> com/google/protobuf/GeneratedMessageV3$UnusedPrivateParameter
>
>
> Does anyone else have this Problem or found a solution for this in the
> past?
>
>
> Best regards
>
> Jan


Re: Question a possible use can for Iterative Streams.

2021-02-02 Thread Tzu-Li (Gordon) Tai
Hi Marco,

In the ideal setup, enrichment data existing in external databases is
bootstrapped into the streaming job via Flink's State Processor API, and any
follow-up changes to the enrichment data are streamed into the job as a
second union input on the enrichment operator.
For this solution to scale, lookups to the enrichment data need to be by
the same key as the input data, i.e. the enrichment data is co-partitioned
with the input data stream.

I assume you've already thought about whether or not this would work for
your case, as it's a common setup for streaming enrichment.

Otherwise, I believe your brainstorming is heading in the right direction,
in the case that remote database lookups + local caching in state is a must.
I'm personally not familiar with the iterative streams in Flink, but in
general I think it is currently discouraged to use it.

On the other hand, I think using Stateful Functions' [1] programming
abstraction might work here, as it allows arbitrary messaging between
functions and cyclic dataflows.
There's also an SDK that allows you to embed StateFun functions within a
Flink DataStream job [2].

Very briefly, the way you would model this database cache hit / remote
lookup is by implementing a function, e.g. called DatabaseCache.
The function would expect messages of type Lookup(lookupKey), and reply
with a response of Result(lookupKey, value). The abstraction allows you, for
each incoming message, to register state (similar to vanilla Flink), as well
as register async operations, which you'll use to perform remote
database lookups in case of a cache / state miss. It also provides means for
"timers" in the form of delayed messages sent to itself, if you need
some mechanism for cache invalidation.
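
The cache-hit / remote-lookup flow can be sketched in plain Java, independent
of the StateFun SDK. Everything named here is hypothetical:
`DatabaseCacheSketch`, the `remoteLookup` function standing in for a database
client, and the in-memory map standing in for the function's persisted state.
It is meant to show the control flow only, not the SDK's API, and it is not
thread-safe.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical stand-in for a "DatabaseCache" function: answers Lookup(key)
// from local state when possible, otherwise performs an async remote lookup.
class DatabaseCacheSketch {
    // Stands in for the function's persisted state.
    private final Map<String, String> state = new HashMap<>();
    private final Function<String, CompletableFuture<String>> remoteLookup;
    int remoteCalls = 0; // exposed only so the behaviour is easy to observe

    DatabaseCacheSketch(Function<String, CompletableFuture<String>> remoteLookup) {
        this.remoteLookup = remoteLookup;
    }

    // Handles Lookup(lookupKey); the future completes with the value that
    // would be sent back as Result(lookupKey, value).
    CompletableFuture<String> lookup(String lookupKey) {
        String cached = state.get(lookupKey);
        if (cached != null) {
            return CompletableFuture.completedFuture(cached); // cache hit
        }
        remoteCalls++;
        // Cache miss: query the database asynchronously, then remember the answer.
        return remoteLookup.apply(lookupKey).thenApply(value -> {
            state.put(lookupKey, value);
            return value;
        });
    }

    // What a delayed "invalidate" message sent to itself would trigger.
    void invalidate(String lookupKey) {
        state.remove(lookupKey);
    }
}
```

A second lookup for the same key is served from state without another remote
call, until an invalidation removes the entry.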

Hope this provides some direction for you to think about!

Cheers,
Gordon

[1] https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/
[2]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/sdk/flink-datastream.html



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Question on Flink and Rest API

2021-02-02 Thread Tzu-Li (Gordon) Tai
Hi,

There is no out-of-box Flink source/sink connector for this, but it isn't
unheard of that users have implemented something to support what you
outlined.

One way to possibly achieve this is: in terms of a Flink streaming job
graph, what you would need to do is co-locate the source (which exposes the
endpoint and maintains a pool of open client connections mapped by request
ID) and the sink operators (which receive processed results with the
original request IDs attached, and are in charge of replying to the original
requests). The open client connections need to be process-wide accessible
(e.g. via a static reference), so that when a co-located sink operator
receives a result, it can directly fetch the corresponding client connection
and return a response.
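
A minimal sketch of such a process-wide registry, in plain Java. The
`PendingRequests` class and its method names are hypothetical, and a
`CompletableFuture` stands in for an open client connection; how the source
actually exposes the endpoint is left out.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical process-wide registry shared by the co-located source and sink.
// A CompletableFuture stands in for an open client connection.
final class PendingRequests {
    private static final ConcurrentMap<String, CompletableFuture<String>> OPEN =
            new ConcurrentHashMap<>();

    private PendingRequests() {}

    // Source side: remember the connection before emitting the request downstream.
    static CompletableFuture<String> register(String requestId) {
        CompletableFuture<String> response = new CompletableFuture<>();
        OPEN.put(requestId, response);
        return response;
    }

    // Sink side: look up the connection by request ID and reply to it.
    // Returns false if no connection is registered under that ID in this process.
    static boolean complete(String requestId, String result) {
        CompletableFuture<String> response = OPEN.remove(requestId);
        return response != null && response.complete(result);
    }
}
```

This only works when the sink subtask that receives a result runs in the same
JVM as the source that registered the request, which is why the co-location
matters.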

The specifics are of course a bit more involved; you will probably need to dig
through previous Flink Forward conference talks to get a better picture.
Hopefully this gives you a starting point to think about.

Cheers,
Gordon





Re: Statefun: cancel "sendAfter"

2021-02-02 Thread Tzu-Li (Gordon) Tai
Hi,

You are right, currently StateFun does not support deleting a scheduled
delayed message.

StateFun supports delayed messages by building on top of two Flink
constructs: 1) registering processing time timers, and 2) buffering the
message payload to be sent in state.

The delayed messages are kept in the Flink state of the sending operator,
and timers are registered on the sending operator as well. So technically,
there doesn't seem to be a blocker for deleting a delayed message and its
associated timer, if it hasn't been sent yet.

Can you maybe open a JIRA ticket for this, so we have something that tracks
it?
Also cc'ing Igal, who might have more comments on whether supporting this
makes sense.

Cheers,
Gordon


On Wed, Feb 3, 2021 at 3:51 AM Stephan Pelikan 
wrote:

> Hi,
>
>
>
> I think about using „sendAfter“ to implement some kind of timer
> functionality. I’m wondering if there is no possibility to cancel delayed
> sent message!
>
>
>
> In my use case it is possible that intermediate events make the delayed
> message obsolete. In some cases the statefun of that certain ID is cleared
> (clear all state variables) and does not exist anymore. In other cases the
> statefun of that ID still exists (and its state). In the latter case I
> could ignore the delayed message, but what about those statefun which do
> not exist anymore?
>
>
>
> Additionally there can be millions of delayed messages which I do not need
> any more and some delays are also hours, days or even months. I don’t want
> to pollute my state with this because it will inflate the size of my
> checkpoints.
>
>
>
> There are no hints in the docs (
> https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/sdk/java.html#sending-delayed-messages)
> how those situations are treated. I found in the Flink’s docs that timers
> of keyed processors can be deleted. As far as I know statefuns are based on
> those processors, so I hope that there is something about it. I hope
> someone can clarify what I can expect and how those situations are handled
> internally.
>
>
>
> Thanks,
>
> Stephan
>


Re: Statefun: cancel "sendAfter"

2021-02-05 Thread Tzu-Li (Gordon) Tai
Hi Stephan,

Thanks for providing the details of the use case! It does indeed sound like
being able to delete scheduled delayed messages would help here.

And yes, please do proceed with creating an issue. As for details on the
implementation, we can continue to discuss that on the JIRA.

Cheers,
Gordon

On Wed, Feb 3, 2021 at 3:43 PM Stephan Pelikan 
wrote:

> Hi,
>
>
>
> thank you Gordon for the clarification. My use case is processing business
> events of customers. Those events are triggered by ourselves or by the
> customer, depending on the current state of the ongoing customer’s
> business use case. We need to monitor delayed/missing business events which
> belong to previous events. For example: the customer has to confirm
> something we did. Depending on what it is, the confirmation has to arrive
> within hours, days or even months. If there is a delay we need to know. But
> if the customer confirms in time we want to clean up to keep the state small.
>
>
>
> I dug a little bit into the code. May I create an issue to discuss my
> ideas?
>
>
>
> Cheers,
>
> Stephan
>
>
>
>
>
> *From:* Tzu-Li (Gordon) Tai
> *Sent:* Wednesday, 3 February 2021 07:58
> *To:* Stephan Pelikan
> *Cc:* user@flink.apache.org; Igal Shilman
> *Subject:* Re: Statefun: cancel "sendAfter"
>
>
>
> Hi,
>
> You are right, currently StateFun does not support deleting a scheduled
> delayed message.
>
> StateFun supports delayed messages by building on top of two Flink
> constructs: 1) registering processing time timers, and 2) buffering the
> message payload to be sent in state.
>
> The delayed messages are kept in the Flink state of the sending operator,
> and timers are registered on the sending operator as well. So technically,
> there doesn't seem to be a blocker for deleting a delayed message and its
> associated timer, if it hasn't been sent yet.
>
> Can you maybe open a JIRA ticket for this, so we have something that
> tracks it?
> Also cc'ing Igal, who might have more comments on whether supporting this
> makes sense.
>
> Cheers,
> Gordon
>
>
>
> On Wed, Feb 3, 2021 at 3:51 AM Stephan Pelikan 
> wrote:
>
> Hi,
>
>
>
> I think about using „sendAfter“ to implement some kind of timer
> functionality. I’m wondering if there is no possibility to cancel delayed
> sent message!
>
>
>
> In my use case it is possible that intermediate events make the delayed
> message obsolete. In some cases the statefun of that certain ID is cleared
> (clear all state variables) and does not exist anymore. In other cases the
> statefun of that ID still exists (and its state). In the latter case I
> could ignore the delayed message, but what about those statefun which do
> not exist anymore?
>
>
>
> Additionally there can be millions of delayed messages which I do not need
> any more and some delays are also hours, days or even months. I don’t want
> to pollute my state with this because it will inflate the size of my
> checkpoints.
>
>
>
> There are no hints in the docs (
> https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/sdk/java.html#sending-delayed-messages)
> how those situations are treated. I found in the Flink’s docs that timers
> of keyed processors can be deleted. As far as I know statefuns are based on
> those processors, so I hope that there is something about it. I hope
> someone can clarify what I can expect and how those situations are handled
> internally.
>
>
>
> Thanks,
>
> Stephan
>
>


Re: Flink 1.11.3 not able to restore with savepoint taken on Flink 1.9.3

2021-02-19 Thread Tzu-Li (Gordon) Tai
Hi,

I'm not aware of any breaking changes in the savepoint formats from 1.9.3 to
1.11.3.

Let's first try to rule out any obvious causes of this:
- Were any data types / classes that were used in state changed across the
restores? Remember that key types are also written as part of state
snapshots.
- Did you register any Kryo types in the 1.9.3 execution, and were those
configurations changed across the restores?
- Was unaligned checkpointing enabled in the 1.11.3 restore?

As of now it's a bit hard to debug this with just an EOFException, as the
corrupted read could have happened anywhere before that point. If it's
possible to reproduce a minimal job of yours that has the same restore
behaviour, that could also help a lot.

Thanks,
Gordon





Re: [Statefun] Dynamic behavior

2021-02-21 Thread Tzu-Li (Gordon) Tai
Hi,

FWIW, there is this JIRA that is tracking a pubsub / broadcast messaging
primitive in StateFun:
https://issues.apache.org/jira/browse/FLINK-16319

This is probably what you are looking for. And I do agree, in the case that
the control stream (which updates the application logic) is high volume,
redeploying functions may not work well.

I don't think there really is a "recommended" way of doing the "broadcast
control stream, join with main stream" pattern with StateFun at the moment,
at least without FLINK-16319.
On the other hand, it could be possible to use stateful functions to
implement a pub-sub model in user space for the time being. I've actually
left some ideas for implementing that in the comments of FLINK-16319.

Cheers,
Gordon


On Mon, Feb 22, 2021 at 6:38 AM Miguel Araújo  wrote:

> Hi everyone,
>
> What is the recommended way of achieving the equivalent of a broadcast in
> Flink when using Stateful Functions?
>
> For instance, assume we are implementing something similar to Flink's
> demo fraud detection but in Stateful Functions - how can one dynamically
> update the application's logic then?
> There was a similar question in this mailing list in the past where it was
> recommended moving the dynamic logic to a remote function so
> that one could achieve that by deploying a new container. I think that's
> not very realistic as updates might happen with a frequency that's not
> compatible with that approach (e.g., sticking to the fraud detection
> example, updating fraud detection rules every hour is not unusual), nor
> should one be deploying a new container when data (not code) changes.
>
> Is there a way of, for example, modifying FunctionProviders on the fly?
>
> Thanks,
> Miguel
>


Re: Run the code in the UI

2021-02-21 Thread Tzu-Li (Gordon) Tai
Hi,

Could you elaborate on what exactly you mean?

If you wish to run a Flink job within the IDE, but also have the web UI
running for it, you can use
`StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(Configuration)`
to create the execution environment.
The default port 8081 will be used unless specified via `rest.port` in the
configuration.

Cheers,
Gordon





Re: Flink Statefun TTL

2021-02-24 Thread Tzu-Li (Gordon) Tai
Hi Timothy,

Starting from StateFun 2.2.x, in the module.yaml file, you can set for each
individual state of a function an "expireMode" field, whose value can be
either "after-invoke" or "after-write". For example:

```
- function:
    meta:
      ...
    spec:
      states:
        - name: state-1
          expireMode: after-write
          expireAfter: 1min
        - name: state-2
          expireMode: after-invoke
          expireAfter: 5sec
```

In earlier versions, expireMode can not be individually set for each state.
This is more flexible with 2.2.x.

As a side note which is somewhat related, all state related configurations
will be removed from the module.yaml, instead to be defined by the language
SDKs starting from StateFun 3.0.
This opens up even more flexibility, such as zero-downtime upgrades of
remote functions which allows adding / removing state declarations without
restarting the StateFun cluster.
We're planning to reach out to the language SDK developers we know of
(which includes you for the Haskell SDK ;) ) soon on a briefing of this
change, as there is a change in the remote invocation protocol and will
require existing SDKs to be updated in order to work with StateFun 3.0.

Cheers,
Gordon

On Wed, Feb 24, 2021 at 11:00 PM Timothy Bess  wrote:

> Hey,
>
> I noticed that the Flink Statefun 2.1.0 release notes had this snippet
> with regards to TTL:
>
> Note: The state expiration mode for remote functions is currently
>> restricted to AFTER_READ_AND_WRITE, and the actual TTL being set is the
>> longest duration across all registered state, not for each individual state
>> entry. This is planned to be improved in upcoming releases (FLINK-17954).
>>
>
> I noticed that the Ticket and PR for this have been closed with a
> reference to commit "289c30e8cdb54d2504ee47a57858a1d179f9a540". Does this
> mean that if I upgrade to 2.2.2 and set an expiration in my modules.yaml it
> is now "per function id" rather than across instances of said function?
>
> Thanks,
>
> Tim
>


Re: Flink Statefun TTL

2021-02-24 Thread Tzu-Li (Gordon) Tai
On Thu, Feb 25, 2021 at 12:06 PM Timothy Bess  wrote:

> Hi Gordon,
>
> Ah so when it said "all registered state" that means all state keys
> defined in the "module.yaml", not all state for all function instances. So
> the expiration has always been _per_ instance then and not across all
> instances of a function.
>

Exactly! Expiration happens individually for each function instance per
declared state.


>
> Thanks for the heads up, that sounds like a good change! I definitely like
> the idea of putting more configuration into the SDK so that there's not two
> sources that have to be kept up to date. Would be neat if eventually the
> SDK just hosts some "/spec" endpoint that serves a list of functions and
> all their configuration options to Statefun on boot.
>

> Btw, I ended up also making a Scala replica of my Haskell library to use
> at work (some of my examples in the microsite are a bit out of date, need
> to revisit that):
> https://github.com/BlueChipFinancial/flink-statefun4s
>
> I know it seems weird to not use an embedded function, but it keeps us
> from having to deal with mismatched Scala versions since Flink is still on
> 2.12, and generally reduces friction using stuff in the Scala Cats
> ecosystem.
>

Really cool to hear about your efforts on a Scala SDK!

I would not say it is weird to implement a Scala SDK for remote functions.
In fact, with the changes upcoming in 3.0, the community is doubling down
on remote as the primary deployment mode for functions, and would like to
have a wider array of supported language SDKs. There's actually a remote
Java SDK that was just merged to master and to be released in 3.0 [1].

Cheers,
Gordon

[1] https://github.com/apache/flink-statefun/tree/master/statefun-sdk-java


> Thanks,
>
> Tim
>
> On Wed, Feb 24, 2021 at 11:49 AM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi Timothy,
>>
>> Starting from StateFun 2.2.x, in the module.yaml file, you can set for
>> each individual state of a function an "expireMode" field, whose value can
>> be either "after-invoke" or "after-write". For example:
>>
>> ```
>> - function:
>>     meta:
>>       ...
>>     spec:
>>       states:
>>         - name: state-1
>>           expireMode: after-write
>>           expireAfter: 1min
>>         - name: state-2
>>           expireMode: after-invoke
>>           expireAfter: 5sec
>> ```
>>
>> In earlier versions, expireMode can not be individually set for each
>> state. This is more flexible with 2.2.x.
>>
>> As a side note which is somewhat related, all state related
>> configurations will be removed from the module.yaml, instead to be defined
>> by the language SDKs starting from StateFun 3.0.
>> This opens up even more flexibility, such as zero-downtime upgrades of
>> remote functions which allows adding / removing state declarations without
>> restarting the StateFun cluster.
>> We're planning to reach out to the language SDK developers we know of
>> (which includes you for the Haskell SDK ;) ) soon on a briefing of this
>> change, as there is a change in the remote invocation protocol and will
>> require existing SDKs to be updated in order to work with StateFun 3.0.
>>
>> Cheers,
>> Gordon
>>
>> On Wed, Feb 24, 2021 at 11:00 PM Timothy Bess  wrote:
>>
>>> Hey,
>>>
>>> I noticed that the Flink Statefun 2.1.0 release notes had this snippet
>>> with regards to TTL:
>>>
>>> Note: The state expiration mode for remote functions is currently
>>>> restricted to AFTER_READ_AND_WRITE, and the actual TTL being set is the
>>>> longest duration across all registered state, not for each individual state
>>>> entry. This is planned to be improved in upcoming releases (FLINK-17954).
>>>>
>>>
>>> I noticed that the Ticket and PR for this have been closed with a
>>> reference to commit "289c30e8cdb54d2504ee47a57858a1d179f9a540". Does this
>>> mean that if I upgrade to 2.2.2 and set an expiration in my modules.yaml it
>>> is now "per function id" rather than across instances of said function?
>>>
>>> Thanks,
>>>
>>> Tim
>>>
>>


Re: Job downgrade

2021-03-04 Thread Tzu-Li (Gordon) Tai
Hi Alexey,

Are you using the heap backend? If that's the case, then for whatever state
was registered at the time of a savepoint, Flink will attempt to restore it
to the heap backends.
This essentially means that state "B" will be read as well, which would
explain why Flink is trying to locate class B in the classpath.

For this scenario, class B needs to be in the classpath if you downgrade
back to version 1, with a savepoint taken with version 2 of the job.

- Gordon

On Thu, Mar 4, 2021 at 4:04 AM Alexey Trenikhun  wrote:

> If I copy class A into version 1+ it works. But it is a problem from a CD
> perspective - I want to introduce a feature which requires new state: first
> I need to make version 1+ with class B but no other changes, then version 2
> with class B and logic changes, upgrade the job and, if the job doesn’t do
> what is expected, “roll back” to version 1+.
>
> --
> *From:* Piotr Nowojski 
> *Sent:* Wednesday, March 3, 2021 11:47:45 AM
> *To:* Alexey Trenikhun 
> *Cc:* Flink User Mail List 
> *Subject:* Re: Job downgrade
>
> Hi,
>
> I'm not sure what's the reason behind this. Probably classes are somehow
> attached to the state and this would explain why you are experiencing this
> issue. I've asked someone else from the community to chip in, but in the
> meantime, can't you just prepare a new "version 1" of the job, with just
> some empty `class B` on the class path? Or if this doesn't work, just copy
> the whole `class B` from version 2?
>
> Best,
> Piotrek
>
> On Sat, Feb 27, 2021 at 7:10 PM Alexey Trenikhun  wrote:
>
> Hello,
> Let's say version 1 of my job uses keyed state with name "a" and type A,
> which is some Avro generated class. Then I upgrade to version 2, which in
> addition uses keyed state "b" and type B (another concrete Avro generated
> class). I take a savepoint with version 2 and decide to downgrade to version
> 1 and start from that savepoint. Can I do it? On one hand, version 1
> doesn't have state "b", but it seems Flink still tries to call
> restoreSerializer and it tries to read runtimeType (`class B`), which is
> not available in version 1.
>
> Thanks,
> Alexey
>
>


Re: Job downgrade

2021-03-07 Thread Tzu-Li (Gordon) Tai
Hi Alexey,

Thanks for confirming.

Can you send me a copy of the exception stack trace? That could help me
pinpoint the exact issue.

Cheers,
Gordon

On Fri, Mar 5, 2021 at 2:02 PM Alexey Trenikhun  wrote:

> Hi Gordon,
> I was using RocksDB backend
> Alexey
>


Re: Best practices for complex state manipulation

2021-03-10 Thread Tzu-Li (Gordon) Tai
Hi Dan,

For a deeper dive into state backends and how they manage state, or
performance critical aspects such as state serialization and choosing
appropriate state structures, I highly recommend starting from this webinar
done by my colleague Seth Wiesman:
https://www.youtube.com/watch?v=9GF8Hwqzwnk.

Cheers,
Gordon

On Wed, Mar 10, 2021 at 1:58 AM Dan Hill  wrote:

> Hi!
>
> I'm working on a join setup that does fuzzy matching in case the client
> does not send enough parameters to join by a foreign key.  There's a few
> ways I can store the state.  I'm curious about best practices around this.
> I'm using rocksdb as the state storage.
>
> I was reading the code for IntervalJoin
> 
> and was a little shocked by the implementation.  It feels designed for very
> short join intervals.
>
> I read this set of pages
> 
> but I'm looking for one level deeper.  E.g. what are performance
> characteristics of different types of state crud operations with rocksdb?
> E.g. I could create extra MapState to act as an index.  When is this worth
> it?
>
>
>


Re: Relation between Two Phase Commit and Kafka's transaction aware producer

2021-03-10 Thread Tzu-Li (Gordon) Tai
Hi Kevin,

Perhaps the easiest way to answer your question is to go through how the
exactly-once FlinkKafkaProducer uses a 2PC implementation on top of
Flink's checkpointing mechanism.

The phases can be broken down as follows (simplified assuming max 1
concurrent checkpoint and that checkpoint completion notifications are
never late):

   1. BEGIN_TXN: In between each Flink checkpoint, each FlinkKafkaProducer
   sink operator creates a new Kafka transaction. You can assume that on
   startup, a new Kafka transaction is created immediately for records that
   occur before the first checkpoint.
   2. PRE_COMMIT: Once a FlinkKafkaProducer sink operator receives Flink's
   checkpoint barrier, it flushes pending records to the current open
   transaction, and opens a new one for future records, which belongs to the
   next checkpoint and thus should be written to the next transaction. Once
   flushed, the sink operator acknowledges it has completed its checkpoint.
   3. COMMIT: Once all sinks acknowledge checkpoint completion, the Flink
   checkpoint is considered complete (containing state of all operators +
   consumer offsets). Once that happens, Flink notifies each sink operator of
   the completion, and only upon receiving this notification, can the sink
   operator commit the previous transaction.
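
The phases above can be sketched as a small in-memory simulation. This is
only an illustration under assumptions: it does not use the Flink or Kafka
APIs, and ToySink, ToyCoordinator, onBarrier and notifyCheckpointComplete
are made-up names for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory stand-in for one parallel sink subtask (hypothetical, not a Flink class). */
class ToySink {
    // BEGIN_TXN: a transaction is open from startup onwards.
    private List<String> openTxn = new ArrayList<>();
    // Flushed on the barrier, waiting for the COMMIT notification.
    private List<String> preCommitted = null;
    // Records that a read_committed consumer would be able to see.
    final List<String> committed = new ArrayList<>();

    void write(String record) {
        openTxn.add(record);
    }

    /** PRE_COMMIT: on the checkpoint barrier, flush the open txn and begin the next one. */
    boolean onBarrier() {
        preCommitted = openTxn;
        openTxn = new ArrayList<>();
        return true; // acknowledge the checkpoint
    }

    /** COMMIT: only called once the whole checkpoint is complete. */
    void notifyCheckpointComplete() {
        if (preCommitted != null) {
            committed.addAll(preCommitted);
            preCommitted = null;
        }
    }
}

/** Plays the role of the JM as the 2PC coordinator (hypothetical). */
class ToyCoordinator {
    static boolean checkpoint(List<ToySink> sinks) {
        boolean allAcked = true;
        for (ToySink s : sinks) {
            allAcked &= s.onBarrier();
        }
        // Sinks are notified only after every participant has acknowledged.
        if (allAcked) {
            for (ToySink s : sinks) {
                s.notifyCheckpointComplete();
            }
        }
        return allAcked;
    }
}
```

In this toy model, records written after a barrier land in the next
transaction and stay invisible until the following checkpoint completes.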

There are some edge cases that are handled, e.g. a checkpoint is considered
complete, but before all sinks receive the completion notification and
commit their transactions, the job fails (that's why txn ids are written
into the checkpoint as well, to make sure all txns belonging to that
checkpoint are still eventually committed after restore).

The general takeaway is that each parallel sink operator can commit the
Kafka transactions only after all participants in the 2PC (i.e. all Flink
operators and sinks) acknowledge that they are ready to commit.
In Flink terms, the JM is the coordinator, and an operator / sink
completing their checkpoint is acknowledging that they are ready for
committing.

From an end-to-end point of view, downstream consumers of the output Kafka
topic will not see records (assuming they are consuming in Kafka's
read_committed mode) until the upstream Flink application sink commits the
open Kafka transactions.
This boils down to the following: the read latency for downstream
applications is at least the upstream Flink app's checkpoint interval.

Hope this helps!

Cheers,
Gordon

On Wed, Mar 10, 2021 at 5:20 PM Kevin Kwon  wrote:

> Hi team, I'm a bit confused about how Two Phase Commit and Kafka's
> transaction-aware producer (using transactional.id and enable.auto.commit)
> play together.
>
> what I understand of Flink checkpoint (correct me if I'm wrong) is that it
> saves the transaction ID as well as the consumer's commit offsets, so when
> application fails and restarts, it will reprocess everything from the last
> checkpoint and data will be idempotently processed in the Kafka side.
> (exactly-once processing rather than exactly-once delivery)
>
> the question is where does 2 phase commit play a role here?
>


Re: Extracting state keys for a very large RocksDB savepoint

2021-03-14 Thread Tzu-Li (Gordon) Tai
Hi Andrey,

Perhaps the functionality you described is worth adding to the State
Processor API.
Your observation on how the library currently works is correct; basically it
tries to restore the state backends as is.

In your current implementation, do you see it worthwhile to try to add this?

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: [Statefun] Interaction Protocol for Statefun

2021-03-15 Thread Tzu-Li (Gordon) Tai
Hi,

Interesting idea! Just some initial thoughts and questions, maybe others can
chime in as well.

In general I think the idea of supporting more high-level protocols on top
of the existing StateFun messaging primitives is good.

For example, what probably could be categorized under this effort is, we've
already been thinking about a pub/sub / broadcast / fan-out implementation
with StateFun [1].

As for the DSL specification language for protocols, that definitely sounds
like a stretch goal for the near future.

I'm curious, if you were to start with adding support for one interaction
protocol, which one would you start with and would find most useful for
users?

Cheers,
Gordon

[1] https://issues.apache.org/jira/browse/FLINK-16319





Re: uniqueness of name when constructing a StateDescriptor

2021-03-15 Thread Tzu-Li (Gordon) Tai
Hi,

The scope is per individual operator, i.e. a single KeyedProcessFunction
instance cannot have multiple registered states with the same name.

Cheers,
Gordon





Re: EOFException on attempt to scale up job with RocksDB state backend

2021-03-15 Thread Tzu-Li (Gordon) Tai
Hi,

Could you provide info on the Flink version used?

Cheers,
Gordon





Re: Relation between Two Phase Commit and Kafka's transaction aware producer

2021-03-15 Thread Tzu-Li (Gordon) Tai
+ user@f.a.o  (adding the conversation back to the user mailing list)

On Fri, Mar 12, 2021 at 6:06 AM Kevin Kwon  wrote:

> Thanks Tzu-Li
>
> Interesting algorithm. Is consumer offset also committed to Kafka at the
> last COMMIT stage after the checkpoint has completed?
>

Flink does commit the offsets back to Kafka when sources perform
checkpoints, but those offsets are not used for fault-tolerance and restore
by Flink. They are purely used as a means for exposing consumption progress.
Flink only respects the offsets being written to its checkpoints. Those
offsets are essentially the state of the FlinkKafkaConsumer sources, and
are written to checkpoints by the sources.
As previously explained, the last COMMIT stage comes after that, i.e. after
all Flink operators complete their state checkpoint.


>
> Also does the coordinator (JM) write any data in write-ahead-log before
> sending out commit messages to all Flink entities? I'm concerned when JM
> succeeds sending a commit message to some entities but fails to others and
> dies.
>

No. And indeed, while Flink guarantees that checkpoint complete
notifications will eventually be received by all listening operators (e.g.
the Kafka sinks), the job can indeed fail when only some of the sinks
have received the notification (and committed).
The way Flink handles the issue you mentioned is that all pending-commit
transaction ids are part of the sink's state.
When a sink checkpoints its state (during the pre-commit phase), it writes
all pending-commit transaction ids. If for any reason the job fails and
failover is triggered, the restored latest complete checkpoint will
contain those pending-commit transaction ids.
Flink will then attempt to commit those pending transactions.
So, in the end, all transactions will eventually be
successfully committed, even in the event of a failure.
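
As a rough sketch of that recovery path, here is a toy in-memory model. It
is not the FlinkKafkaProducer implementation: ToyBroker and RecoverableSink
are hypothetical names, and the assumption that committing an
already-committed transaction is a harmless no-op is a simplification made
for this example.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for the transaction log; a repeated commit is a no-op here. */
class ToyBroker {
    final Set<String> committedTxns = new HashSet<>();

    void commit(String txnId) {
        committedTxns.add(txnId);
    }
}

/** Hypothetical sink whose checkpointed state is its list of pending-commit txn ids. */
class RecoverableSink {
    private final ToyBroker broker;
    private final List<String> pendingCommit = new ArrayList<>();

    RecoverableSink(ToyBroker broker) {
        this.broker = broker;
    }

    /** Pre-commit: remember the txn id and return a copy of the state for the checkpoint. */
    List<String> snapshotState(String preCommittedTxnId) {
        pendingCommit.add(preCommittedTxnId);
        return new ArrayList<>(pendingCommit);
    }

    /** Normal path: the checkpoint completion notification arrived. */
    void notifyCheckpointComplete() {
        for (String id : pendingCommit) {
            broker.commit(id);
        }
        pendingCommit.clear();
    }

    /** Failover path: re-attempt the commit for every txn id in the restored checkpoint. */
    static RecoverableSink restore(ToyBroker broker, List<String> checkpointedTxnIds) {
        for (String id : checkpointedTxnIds) {
            broker.commit(id);
        }
        return new RecoverableSink(broker);
    }
}
```

If one sink commits and the job fails before the other is notified,
restoring both sinks from the checkpointed ids still ends with every
transaction committed in this model.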


>
> Finally, it seems 2PC is implemented in order to keep 3 entities (Kafka
> producer data / Kafka consumer offset / Flink checkpoint) in a
> consistent state. However, since a checkpoint is an ever-increasing state,
> like a ledger that prunes the previous state as it goes, isn't a
> write-ahead log on the sink side enough to handle the exactly-once
> processing guarantee? What I mean is checking the state between the WAL and
> the current checkpoint status, and conservatively rolling back to the
> previous checkpoint and replaying all data.
>

Re: StateFun examples in scala

2021-03-30 Thread Tzu-Li (Gordon) Tai
Hi Jose!

For Scala, we would suggest waiting until StateFun 3.0.0 is released, which
is actually happening very soon (likely within 1-2 weeks) as there is an
ongoing release candidate vote [1].

The reason for this is that version 3.0 adds a remote SDK for Java, which
you should be able to use with Scala (or any other JVM language) seamlessly.
With StateFun <= 2.x, you only have the option to use embedded functions if
you'd like to use Java / Scala, which is a bit problematic and we can't
guarantee that it'll work for all Scala versions.

You can take a look at a preview of the new Java SDK here [2], this is a
nice tutorial that runs you through all the SDK fundamentals.
Note that this is not completely finalized yet, as the release voting is
still ongoing.

Would be great if you want to try the release candidate out already, or
have some feedback for the new SDK!

Cheers,
Gordon

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Apache-Flink-Stateful-Functions-3-0-0-release-candidate-1-td49821.html
[2]
https://github.com/apache/flink-statefun-playground/tree/dev/java/showcase

On Tue, Mar 30, 2021 at 8:21 PM Till Rohrmann  wrote:

> Hi Jose,
>
> I am pulling in Gordon who will be able to help you with your question.
>
> Personally, I am not aware of any limitations which prohibit the usage of
> Scala.
>
> Cheers,
> Till
>
> On Tue, Mar 30, 2021 at 11:55 AM jose farfan  wrote:
>
>> Hi
>>
>> I am trying to find some examples of StateFun written in Scala,
>> but I cannot find any.
>>
>> My questions are:
>>
>>    1. Is there any problem with using StateFun with Scala?
>>    2. Is there any place with examples written in Scala?
>>
>> BR
>> Jose
>>
>


Re: Support for sending generic class

2021-03-30 Thread Tzu-Li (Gordon) Tai
Hi Le,

Thanks for reaching out with this question! It's actually a good segue to
allow me to introduce you to StateFun 3.0.0 :)

StateFun 3.0+ comes with a new type system that would eliminate this
hassle. You can take a sneak peek here [1].
This is part 1 of a series of tutorials on fundamentals on the upcoming new
Java SDK (you can find tutorials for other languages there as well), and it
guides you through a bit on the new type system.

For your specific case, what you would do is implement a `Type` for your
Tuple3 messages. The `Type` contains information including a typename to
identify the data type, and a serializer for de-/serializing the data.
This `Type` can then be used when creating messages to be sent to other
functions and egresses, or used as the type specification for persisted
state values.
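
As a rough illustration of the serializer half of such a `Type`, here is a
plain-Java sketch of a byte-level round trip for a message shaped like your
Tuple3. It deliberately does not touch the StateFun SDK: Tuple3Message, its
field names and the byte layout are assumptions made for this example, and
wiring the two functions into an actual `Type` is covered by the tutorial
in [1].

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical message shape standing in for Tuple3<Long, List<Long>, Long>. */
final class Tuple3Message {
    final long first;
    final List<Long> second;
    final long third;

    Tuple3Message(long first, List<Long> second, long third) {
        this.first = first;
        this.second = second;
        this.third = third;
    }

    /** Layout: first (8 bytes), element count (4 bytes), elements (8 bytes each), third (8 bytes). */
    static byte[] serialize(Tuple3Message m) {
        ByteBuffer buf = ByteBuffer.allocate(8 + 4 + 8 * m.second.size() + 8);
        buf.putLong(m.first);
        buf.putInt(m.second.size());
        for (long v : m.second) {
            buf.putLong(v);
        }
        buf.putLong(m.third);
        return buf.array();
    }

    /** Reads the fields back in the same order they were written. */
    static Tuple3Message deserialize(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        long first = buf.getLong();
        int n = buf.getInt();
        List<Long> second = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            second.add(buf.getLong());
        }
        long third = buf.getLong();
        return new Tuple3Message(first, second, third);
    }
}
```

The point of the sketch is that the generic parameters never need to be
discovered by reflection; the serializer pair fully defines the wire format.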

If you're not in production usage already, I would highly suggest waiting a
bit for StateFun 3.0.0 as it is just around the corner with an ongoing
release candidate vote [2] and is expected to be available within 1-2 weeks.

Let me know if this helps!

Cheers,
Gordon

[1]
https://github.com/apache/flink-statefun-playground/blob/dev/java/showcase/src/main/java/org/apache/flink/statefun/playground/java/showcase/part1/types/TypeSystemShowcaseFn.java
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Apache-Flink-Stateful-Functions-3-0-0-release-candidate-1-td49821.html

On Tue, Mar 30, 2021 at 8:17 PM Till Rohrmann  wrote:

> Hi Le,
>
> I am pulling in Gordon who might be able to help you with your question.
>
> Looking at the interface Context, it looks that you cannot easily specify
> a TypeHint for the message you want to send. Hence, I guess that you
> explicitly need to register these types.
>
> Cheers,
> Till
>
> On Tue, Mar 30, 2021 at 8:20 AM Le Xu  wrote:
>
>> Hello!
>>
>> I'm trying to figure out whether Flink Statefun supports sending object
>> with class that has generic parameter types (and potentially nested types).
>> For example, I send a message that looks like this:
>>
>> context.send(SINK_EVENT, idString, new Tuple3<>(someLongObject,
>> listOfLongObject, Long));
>>
>> And obviously I'm getting complaints like this:
>>
>> Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot extract
>> TypeInformation from Class alone, because generic parameters are missing.
>> Please use TypeInformation.of(TypeHint) instead, or another equivalent
>> method in the API that accepts a TypeHint instead of a Class. For example
>> for a Tuple2<Long, String> pass a 'new TypeHint<Tuple2<Long, String>>(){}'.
>> at
>> org.apache.flink.api.common.typeinfo.TypeInformation.of(TypeInformation.java:214)
>> at
>> org.apache.flink.statefun.flink.core.types.DynamicallyRegisteredTypes.typeInformation(DynamicallyRegisteredTypes.java:60)
>> at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
>> at
>> org.apache.flink.statefun.flink.core.types.DynamicallyRegisteredTypes.registerType(DynamicallyRegisteredTypes.java:49)
>> at
>> org.apache.flink.statefun.flink.core.state.FlinkState.createFlinkStateTableAccessor(FlinkState.java:100)
>> at
>> org.apache.flink.statefun.flink.core.state.FlinkStateBinder.bindTable(FlinkStateBinder.java:54)
>> at
>> org.apache.flink.statefun.sdk.state.StateBinder.bind(StateBinder.java:39)
>> at
>> org.apache.flink.statefun.flink.core.state.PersistedStates.findReflectivelyAndBind(PersistedStates.java:42)
>> at
>> org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.load(StatefulFunctionRepository.java:74)
>> at
>> org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.get(StatefulFunctionRepository.java:59)
>> at
>> org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.newActivation(LocalFunctionGroup.java:75)
>> at
>> org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.enqueue(LocalFunctionGroup.java:52)
>> at
>> org.apache.flink.statefun.flink.core.functions.LocalSink.accept(LocalSink.java:36)
>> at
>> org.apache.flink.statefun.flink.core.functions.ReusableContext.send(ReusableContext.java:92)
>> at org.apache.flink.statefun.sdk.Context.send(Context.java:88)
>> at
>> benchmark.HotItemsPersisted$ParseEventFunction.invoke(HotItemsPersisted.java:292)
>> at
>> org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:48)
>>
>>
>> Is there any API function in StateFun that supports parameterized classes
>> like this, or does the user function need to handle the serialization
>> process? Or is there any way to quickly modify the StateFun message
>> interface to support this functionality?
>>
>> Thanks!
>>
>> Le
>>


Re: How to specific key serializer

2021-03-31 Thread Tzu-Li (Gordon) Tai
Hi CZ,

The issue here is that the Scala DataStream API uses Scala macros to decide
the serializer to be used. Since that recognizes Scala case classes, the
CaseClassSerializer will be used.
However, in the State Processor API, those Scala macros do not come into
play, and therefore it directly goes to Flink's type extraction for Java
classes, which recognizes this as an Avro-generated class.
In general, currently the State Processor API doesn't support savepoints
written by Scala DataStream jobs that well.

You can try using TypeInfo annotations to specify a TypeInformationFactory
for your key class [1].
This allows you to plug in the TypeInformation that Flink extracts for a
given class. That custom TypeInformation should then return the
correct serializer.

Best,
Gordon

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#defining-type-information-using-a-factory

On Mon, Mar 29, 2021 at 2:42 PM ChangZhuo Chen (陳昌倬) 
wrote:

> Hi,
>
> Currently we use sbt-avrohugger [0] to generate key class for keyed
> state.  The key class generated by sbt-avrohugger is both case class,
> and AVRO specific record. However, in the following scenarios, Flink
> uses different serializers:
>
>
> * In streaming application, Flink uses CaseClassSerializer for key
>   class.
> * In state processor API application, Flink uses AvroSerializer for key
>   class.
>
>
> Since they use different serializers for key, they are not compatible.
> Is there any way to specify the key serializer so that both applications
> use the same serializer?
>
>
>
> [0] https://github.com/julianpeeters/sbt-avrohugger
>
> --
> ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
> http://czchen.info/
> Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B
>


Re: Why is Hive dependency flink-sql-connector-hive not available on Maven Central?

2021-04-06 Thread Tzu-Li (Gordon) Tai
Hi,

I'm pulling in Rui Li (cc'ed) who might be able to help you here as he
actively maintains the hive connectors.

Cheers,
Gordon


On Fri, Apr 2, 2021 at 11:36 AM Yik San Chan 
wrote:

> The question is cross-posted in StackOverflow
> https://stackoverflow.com/questions/66914119/flink-why-is-hive-dependency-flink-sql-connector-hive-not-available-on-maven-ce
>
> According to [Flink SQL Hive: Using bundled hive jar](
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#using-bundled-hive-jar
> ):
>
> > The following tables list all available bundled hive jars. You can pick
> one to the /lib/ directory in Flink distribution.
> > - flink-sql-connector-hive-1.2.2 (download link)
> > - flink-sql-connector-hive-2.2.0 (download link)
> > ...
>
> However, these dependencies are not available from Maven central. As a
> work around, I use [user defined dependencies](
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#user-defined-dependencies),
> but this is not recommended:
>
> > the recommended way to add dependency is to use a bundled jar. Separate
> jars should be used only if bundled jars don’t meet your needs.
>
> I wonder why the bundle jars are not available in Maven central?
>
> Follow-up: Since they are not available from Maven central, I wonder how
> to include them in pom.xml in order to run `mvn package`?
>
> Thanks!
>


Re: How to know if task-local recovery kicked in for some nodes?

2021-04-06 Thread Tzu-Li (Gordon) Tai
Hi Sonam,

Pulling in Till (cc'ed), I believe he would likely be able to help you here.

Cheers,
Gordon

On Fri, Apr 2, 2021 at 8:18 AM Sonam Mandal  wrote:

> Hello,
>
> We are experimenting with task local recovery and I wanted to know whether
> there is a way to validate that some tasks of the job recovered from the
> local state rather than the remote state.
>
> We've currently set this up to have 2 Task Managers with 2 slots each, and
> we run a job with parallelism 4. To simulate failure, we kill one of the
> Task Manager pods (we run on Kubernetes). I want to see if the local state
> of the other Task Manager was used or not. I do understand that the state
> for the killed Task Manager will need to be fetched from the checkpoint.
>
> Also, do you have any suggestions on how to test such failure scenarios in
> a better way?
>
> Thanks,
> Sonam
>


[ANNOUNCE] Apache Flink Stateful Functions 3.0.0 released

2021-04-15 Thread Tzu-Li (Gordon) Tai
The Apache Flink community is very happy to announce the release of Apache
Flink Stateful Functions (StateFun) 3.0.0.

StateFun is a cross-platform stack for building Stateful Serverless
applications, making it radically simpler to develop scalable, consistent,
and elastic distributed applications.

Please check out the release blog post for an overview of the release:
https://flink.apache.org/news/2021/04/15/release-statefun-3.0.0.html

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for StateFun can be found at:
https://search.maven.org/search?q=g:org.apache.flink%20statefun

Python SDK for StateFun published to the PyPI index can be found at:
https://pypi.org/project/apache-flink-statefun/

Official Docker images for StateFun are published to Docker Hub:
https://hub.docker.com/r/apache/flink-statefun

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12348822

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Cheers,
Gordon


Re: Stateful Functions, Kinesis, and ConsumerConfigConstants

2021-04-29 Thread Tzu-Li (Gordon) Tai
Hi Ammon,

Unfortunately you're right. I think the Flink Kinesis Consumer specific
configs, e.g. keys in the ConsumerConfigConstants class, were overlooked in
the initial design.

One way to work around this is to use the `SourceFunctionSpec` [1]. Using
that spec, you can use any Flink SourceFunction (e.g. a
FlinkKinesisConsumer) as the ingress.
Simply instantiate a `SourceFunctionSpec` with the desired ID, and provide
a custom FlinkKinesisConsumer that you create directly (which should allow
you to provide the ConsumerConfigConstants).

As a side note, I've created this JIRA to address the issue you
encountered, as I believe this should be supported in the native StateFun
Kinesis ingress [2].

Cheers,
Gordon

[1]
https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-io/src/main/java/org/apache/flink/statefun/flink/io/datastream/SourceFunctionSpec.java
[2] https://issues.apache.org/jira/browse/FLINK-22529

On Thu, Apr 29, 2021 at 7:25 AM Ammon Diether  wrote:

>
> When using Flink Stateful Function's KinesisIngressBuilder, I do not see a
> way to set things like ConsumerConfigConstants.SHARD_GETRECORDS_MAX or
> ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS
>
> Looking at KinesisSourceProvider, it appears that this is the spot that
> creates the FlinkKinesisConsumer. The function named
> propertiesFromSpec(kinesisIngressSpec) only allows for AWS properties and a
> few startup position properties.
> ConsumerConfigConstants.SHARD_GETRECORDS_MAX cannot be provided.
>
> Is there an obvious workaround?
>
>


Re: Statefun 2.2.2 Checkpoint restore NPE

2021-05-28 Thread Tzu-Li (Gordon) Tai
Hi Timothy,

It would indeed be hard to figure this out without any stack traces.

Have you tried changing to debug level logs? Maybe you can also try using
the StateFun Harness to restore and run your job in the IDE - in that case
you should be able to see which code exactly is throwing this exception.

Cheers,
Gordon

On Fri, May 28, 2021 at 12:39 PM Timothy Bess  wrote:

> Hi,
>
> Just checking to see if anyone has experienced this error. Might just be a
> Flink thing that's irrelevant to statefun, but my job keeps failing over
> and over with this message:
>
> 2021-05-28 03:51:13,001 INFO org.apache.flink.streaming.connectors.kafka.
> FlinkKafkaProducer [] - Starting FlinkKafkaInternalProducer (10/10) to
> produce into default topic
> __stateful_functions_random_topic_lNVlkW9SkYrtZ1oK
> 2021-05-28 03:51:13,001 INFO
> org.apache.flink.streaming.connectors.kafka.internal.
> FlinkKafkaInternalProducer [] - Attempting to resume transaction
> feedback-union -> functions -> Sink:
> bluesteel-kafka_egress-egress-dd0a6f77c8b5eccd4b7254cdfd577ff9-45 with
> producerId 31 and epoch 3088
> 2021-05-28 03:51:13,017 WARN org.apache.flink.runtime.taskmanager.Task []
> - Source: lead-leads-ingress -> router (leads) (10/10)
> (ff51aacdb850c6196c61425b82718862) switched from RUNNING to FAILED.
> java.lang.NullPointerException: null
>
> The null pointer doesn't come with any stack traces or anything. It's
> really mystifying. Seems to just fail while restoring continuously.
>
> Thanks,
>
> Tim
>


Re: Configure Kafka ingress through property files in Stateful function 3.0.0

2021-05-28 Thread Tzu-Li (Gordon) Tai
Hi Jessy,

I assume "consumer.properties" is a file you have included in your StateFun
application's image?

The ingress.spec.properties field in the module YAML specification file
expects a list of key value pairs, not a properties file. See for example
[1].

I think it could make sense to support specifying property files directly
as well. Could you open a JIRA for this?

Thanks,
Gordon

[1]
https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-io-bundle/src/test/resources/routable-protobuf-kafka-ingress.yaml#L36



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: State name uniqueness

2020-01-20 Thread Tzu-Li (Gordon) Tai
Hi Vasily,

State names need to be unique within operators only.

Cheers,
Gordon

On Mon, Jan 20, 2020 at 10:58 AM Vasily Melnik <
vasily.mel...@glowbyteconsulting.com> wrote:

> Hi all,
>
> I'm a bit confused with state name uniqueness.
> Should it be unique within operator only, or within entire job?
>
> Best regards,
> Василий Мельник
>


Re: Questions of "State Processing API in Scala"

2020-01-20 Thread Tzu-Li (Gordon) Tai
Hi Izual,

Thanks for reporting this! I'm also forwarding this to the user mailing
list, as that is the more suitable place for this question.

I think the usability of the State Processor API in Scala is indeed
something that hasn’t been looked at closely yet.

On Tue, Jan 21, 2020 at 8:12 AM izual  wrote:

> Hi community,
>
> When I use state in Scala, something confuses me. I followed these
> steps to generate and read state:
>
> a. Implement the example[1] `CountWindowAverage` in Scala (exactly the same),
> and run jobA => works fine.
>
> b. Execute `flink cancel -s ${JobID}` => a savepoint was generated as
> expected.
>
> c. Implement the example[2] `StatefulFunctionWithTime` in Scala (code
> below), and run jobB => failed; the exception shows "Caused by:
> org.apache.flink.util.StateMigrationException: The new key serializer must
> be compatible."
>
>
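> The ReaderFunction code is as follows:
>
> ```
> import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
> import org.apache.flink.api.scala.createTypeInformation
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.state.api.functions.KeyedStateReaderFunction
> import org.apache.flink.util.Collector
>
> // Reads the "average" state written by jobA; the key type is Scala's Long.
> class ReaderFunction extends KeyedStateReaderFunction[Long, (Long, Long)] {
>
>   var countState: ValueState[(Long, Long)] = _
>
>   override def open(parameters: Configuration): Unit = {
>     val stateDescriptor =
>       new ValueStateDescriptor("average", createTypeInformation[(Long, Long)])
>     countState = getRuntimeContext().getState(stateDescriptor)
>   }
>
>   override def readKey(
>       key: Long,
>       ctx: KeyedStateReaderFunction.Context,
>       out: Collector[(Long, Long)]): Unit = {
>     out.collect(countState.value())
>   }
> }
> ```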
>
> d. Then I tried using java.lang.Long instead of Long as the key type, and
> ran jobB => the exception disappeared and it works.
>
> This confuses me. Did I miss some feature of the State Processor API,
> such as `magic-implicits`?
>

This part is explainable. The "magic-implicits" actually happen in the
DataStream Scala API.
Any primitive Scala types will be inferred and serialized as their Java
counterparts.
AFAIK, this does not happen in the State Processor API yet, which is why
you are getting the StateMigrationException.
When using Scala types directly with the State Processor API, I would guess
that Kryo (as a generic fallback) was being used to access state.
This can probably be confirmed by looking at the exception stack trace. Can
you post a full copy of that?

This should be resolvable by properly supporting Scala in the State
Processor API; it's just that, up to this point, we haven't had a plan
for that.
Can you open a JIRA for this? I think it'll be a reasonable extension to
the API.


>
> And when I change `xxx.keyBy(_._1)` to `xxx.keyBy(0)`, the same exception
> comes up again. This time I tried to use Tuple(java.lang.Long) or something
> else, but it does not work.
>

I'm not sure what you mean here. Where is this keyBy happening? In the
Scala DataStream job, or the State Processor API?


>
> Hope.
>
> 1:
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state
>
> 2:
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state


Cheers,
Gordon


Re: Old offsets consumed from Kafka after Flink upgrade to 1.9.1 (from 1.2.1)

2020-01-23 Thread Tzu-Li (Gordon) Tai
Hi Somya,

I'll have to take a closer look at the JIRA history to refresh my memory on
potential past changes that caused this.

My first suspicion is this:
It is expected that the Kafka consumer will *ignore* the configured startup
position if the job was restored from a savepoint.
It will always use the offsets that were persisted at the time of the
savepoint.
Would this already explain what you are seeing?

What I'm not sure of yet is whether this was a behavioural change that
occurred between versions 1.2.x and 1.3.x or later versions.
I'll take a closer look once I'm back from travelling tomorrow and get back
to you on that.

Cheers,
Gordon

On Thu, Jan 23, 2020, 7:52 PM Chesnay Schepler  wrote:

> @gordon Do you remember whether we changed any behavior of the Kafka 0.10
> consumer after 1.3.3?
>
> On 23/01/2020 12:02, Somya Maithani wrote:
>
> Hey,
>
> Any ideas about this? We are blocked on the upgrade because we want async
> timer checkpointing.
>
> Regards,
>
> Somya Maithani
> Software Developer II
> Helpshift Pvt Ltd
>
>
> On Fri, Jan 17, 2020 at 10:37 AM Somya Maithani 
> wrote:
>
>> Hey Team,
>>
>> *Problem*
>> Recently, we were trying to upgrade Flink infrastructure to version 1.9.1
>> and we noticed that a week old offset was consumed from Kafka even though
>> the configuration says latest.
>>
>> *Pretext*
>> 1. Our current Flink version in production is 1.2.1.
>> 2. We use RocksDB + Hadoop as our backend / checkpointing data store.
>> 3. We consume and produce messages to / from Kafka.
>>
>> *Release Plan*
>> 1. Upgrade Flink 1.2.1 to 1.3.
>> 2. Upgrade Flink 1.3.3 to 1.9.1
>> Note: We have a transitioning version (1.3.3) because of the
>> serialisation change in checkpointing.
>>
>> After performing step 1, the service was consuming latest Kafka events
>> but after performing step 2 we noticed that the service was consuming one
>> week old Kafka messages from the source topic. We did not see any
>> exceptions but since the number of messages consumed increased a lot for
>> our Flink infrastructure, our task managers started crashing eventually.
>>
>> We did not change Kafka configuration in the service for the upgrade but
>> we did upgrade the Flink dependencies for Kafka.
>>
>> Old dependency:
>>
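>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-streaming-java_2.10</artifactId>
>>   <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-clients_2.10</artifactId>
>>   <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-connector-kafka-0.10_2.10</artifactId>
>>   <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-statebackend-rocksdb_2.10</artifactId>
>>   <version>${flink.version}</version>
>> </dependency>
>>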
>>
>>
>> New dependency:
>>
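>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-streaming-java_2.12</artifactId>
>>   <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-clients_2.12</artifactId>
>>   <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
>>   <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
>>   <version>${flink.version}</version>
>> </dependency>
>>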
>>
>>
>> Do we know why this would be happening?
>>
>> Regards,
>>
>> Somya Maithani
>> Software Developer II
>> Helpshift Pvt Ltd
>>
>
>


Re: FsStateBackend vs RocksDBStateBackend

2020-01-29 Thread Tzu-Li (Gordon) Tai
Hi Ran,

On Thu, Jan 30, 2020 at 9:39 AM Ran Zhang  wrote:

> Hi all,
>
> We have a Flink app that uses a KeyedProcessFunction, and in the function
> it requires a ValueState(of TreeSet) and the processElement method needs to
> access and update it. We tried to use RocksDB as our stateBackend but the
> performance is not good, and intuitively we think it was because of the
> serialization / deserialization on each processElement call.
>

As you have already pointed out, serialization behaviour is a major
difference between the 2 state backends, and will directly impact
performance due to the extra runtime overhead in RocksDB.
If you plan to continue using the RocksDB state backend, make sure to use
MapState instead of ValueState where possible, since every access to the
ValueState in the RocksDB backend requires serializing / deserializing the
whole value.
For MapState, de-/serialization happens per K-V access. Whether or not this
makes sense would of course depend on your state access pattern.
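
To make that difference concrete, here is a toy sketch in plain Java. It is
not Flink code: the class and method names are made up, and Java serialization
merely stands in for Flink's serializers. It only compares how many bytes have
to be serialized to add one element when the whole collection is a single
value (as with ValueState) versus when each element is its own entry (as with
MapState).

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.TreeSet;

// Toy model only: Java serialization stands in for Flink's serializers.
final class StateAccessCostSketch {

    // Number of bytes produced when serializing the given object.
    static int serializedSize(Serializable value) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
            out.flush();
            return bytes.size();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    // ValueState-like layout: the whole TreeSet is one value, so adding one
    // element means writing the entire set back.
    static int bytesWrittenWholeValue(TreeSet<Long> set, long newElement) {
        set.add(newElement);
        return serializedSize(set);
    }

    // MapState-like layout: each element is its own entry, so adding one
    // element only writes that entry.
    static int bytesWrittenPerEntry(long newElement) {
        return serializedSize(Long.valueOf(newElement));
    }
}
```

In this model, the cost of `bytesWrittenWholeValue` grows with the size of the
set, whereas the cost of `bytesWrittenPerEntry` stays constant.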


> Then we tried switching to FsStateBackend (which keeps the in-flight
> data in the TaskManager’s memory according to the docs), and it resolved
> the performance issue. *So we want to better understand the tradeoffs
> in choosing between these 2 state backends.* Our checkpoint size
> is 200 - 300 GB in stable state. For now we know one benefit of RocksDB is
> that it supports incremental checkpoints, but we would love to know what
> else we are losing by choosing FsStateBackend.
>

As of now, feature-wise both backends support asynchronous snapshotting,
state schema evolution, and access via the State Processor API.
In the end, the major factor for deciding between the two state backends
would be your expected state size: the FsStateBackend keeps working state on
the TaskManager's JVM heap, whereas RocksDB keeps it on local disk and can
therefore hold state that is larger than the available memory.
That being said, it could be possible in the future that savepoint formats
for the backends are changed to be compatible, meaning that you will be
able to switch between different backends upon restore [1].


>
> Thanks a lot!
> Ran Zhang
>

Cheers,
Gordon

 [1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-41%3A+Unify+Binary+format+for+Keyed+State


Re: filter and flatMap operation VS only a flatMap operation

2020-01-29 Thread Tzu-Li (Gordon) Tai
Hi,

If your filter and flatMap operators are chained, then the performance
difference should not be noticeable.
If a shuffle (i.e. a keyBy operation) occurs after the filter and before
the flatMap, then applying the filter first will be more efficient.
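
For illustration, the two variants from the question look like this when
written against plain Java streams. This is a stand-in only, not the Flink
API; the class name and the whitespace-splitting logic are made up for the
example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Plain Java streams as a stand-in; not Flink's DataStream API.
final class FilterFlatMapSketch {

    // Variant 1: a separate filter, followed by a flatMap.
    static List<String> filterThenFlatMap(List<String> lines) {
        return lines.stream()
                .filter(line -> !line.isEmpty())
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .collect(Collectors.toList());
    }

    // Variant 2: a single flatMap that drops unwanted records itself.
    static List<String> flatMapOnly(List<String> lines) {
        return lines.stream()
                .flatMap(line -> line.isEmpty()
                        ? Stream.<String>empty()
                        : Arrays.stream(line.split(" ")))
                .collect(Collectors.toList());
    }
}
```

Both methods return the same result for the same input; the only difference
is where the filtering happens.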

Cheers,
Gordon

On Thu, Jan 30, 2020 at 4:03 AM Soheil Pourbafrani 
wrote:

> Hi,
>
> In case we need a filter operation followed by a transformation, which
> one is more efficient in Flink: applying the filter operation first and
> then a flatMap operation separately, OR using only a flatMap operation that
> internally includes the filter logic, too?
>
> best
> Soheil
>


Re: Old offsets consumed from Kafka after Flink upgrade to 1.9.1 (from 1.2.1)

2020-01-30 Thread Tzu-Li (Gordon) Tai
Update:
I can confirm my previous guess based on the changes in
https://issues.apache.org/jira/browse/FLINK-4280 that were merged for Flink
1.3.0.
When upgrading from Flink 1.2.x -> 1.3.0, the new startup position
configurations were respected over the checkpointed offsets (only once for
the first restore after upgrade).
After that, all restores from savepoints would only ever respect the
checkpointed offsets (regardless of whether or not it was the first restore
after upgrade).
This would explain the behaviour you encountered.

If you actually prefer to not have your Kafka consumer progress carried
over after the upgrade and want to just start consuming from the latest
offset,
one way to achieve that is to assign a new uid to the Kafka consumer
operator, and allow non-restored state when restoring.
With this change, Flink should consider the Kafka consumer operator to not
have any prior snapshotted state (i.e. offsets) and respect the startup
configuration.
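
As a rough mental model of that behaviour, here is a toy sketch in plain Java.
It is not Flink's restore code: the class, the method, and the idea of a
savepoint as a simple uid-to-offset map are simplifications made up for
illustration.

```java
import java.util.Map;
import java.util.Set;

// Toy model only; this is not Flink's restore code.
final class OffsetRestoreSketch {

    // Decides where a source with the given uid starts reading.
    // jobUids: uids of the operators in the job being started.
    // savepointOffsets: operator uid -> offset stored in the savepoint.
    static long startOffset(String sourceUid,
                            Set<String> jobUids,
                            Map<String, Long> savepointOffsets,
                            boolean allowNonRestoredState,
                            long configuredStartOffset) {
        // State that matches no operator in the job is only tolerated if
        // non-restored state is explicitly allowed.
        for (String uid : savepointOffsets.keySet()) {
            if (!jobUids.contains(uid) && !allowNonRestoredState) {
                throw new IllegalStateException(
                        "Savepoint state for uid '" + uid + "' cannot be mapped to the job");
            }
        }
        Long restored = savepointOffsets.get(sourceUid);
        // Restored offsets win over the configured startup position.
        return restored != null ? restored : configuredStartOffset;
    }
}
```

In an actual job, the corresponding steps would be setting a new id via
`.uid(...)` on the Kafka source, and passing `--allowNonRestoredState` (`-n`)
to `flink run` when restoring from the savepoint.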

Let me know if this works for you!

Cheers,
Gordon

On Thu, Jan 23, 2020 at 9:12 PM Tzu-Li (Gordon) Tai 
wrote:

> Hi Somya,
>
> I'll have to take a closer look at the JIRA history to refresh my memory
> on potential past changes that caused this.
>
> My first suspicion is this:
> It is expected that the Kafka consumer will *ignore* the configured
> startup position if the job was restored from a savepoint.
> It will always use the offsets that were persisted at the time of the
> savepoint.
> Would this already explain what you are seeing?
>
> What I'm not sure of yet is whether this was a behavioural change that
> occurred between versions 1.2.x and 1.3.x or later versions.
> I'll take a closer look once I'm back from travelling tomorrow and get
> back to you on that.
>
> Cheers,
> Gordon
>
> On Thu, Jan 23, 2020, 7:52 PM Chesnay Schepler  wrote:
>
>> @gordon Do you remember whether we changed any behavior of the Kafka 0.10
>> consumer after 1.3.3?
>>
>> On 23/01/2020 12:02, Somya Maithani wrote:
>>
>> Hey,
>>
>> Any ideas about this? We are blocked on the upgrade because we want async
>> timer checkpointing.
>>
>> Regards,
>>
>> Somya Maithani
>> Software Developer II
>> Helpshift Pvt Ltd
>>
>>
>> On Fri, Jan 17, 2020 at 10:37 AM Somya Maithani <
>> somyamaithan...@gmail.com> wrote:
>>
>>> Hey Team,
>>>
>>> *Problem*
>>> Recently, we were trying to upgrade Flink infrastructure to version
>>> 1.9.1 and we noticed that a week old offset was consumed from Kafka even
>>> though the configuration says latest.
>>>
>>> *Pretext*
>>> 1. Our current Flink version in production is 1.2.1.
>>> 2. We use RocksDB + Hadoop as our backend / checkpointing data store.
>>> 3. We consume and produce messages to / from Kafka.
>>>
>>> *Release Plan*
>>> 1. Upgrade Flink 1.2.1 to 1.3.
>>> 2. Upgrade Flink 1.3.3 to 1.9.1
>>> Note: We have a transitioning version (1.3.3) because of the
>>> serialisation change in checkpointing.
>>>
>>> After performing step 1, the service was consuming latest Kafka events
>>> but after performing step 2 we noticed that the service was consuming one
>>> week old Kafka messages from the source topic. We did not see any
>>> exceptions but since the number of messages consumed increased a lot for
>>> our Flink infrastructure, our task managers started crashing eventually.
>>>
>>> We did not change Kafka configuration in the service for the upgrade but
>>> we did upgrade the Flink dependencies for Kafka.
>>>
>>> Old dependency:
>>>
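>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-streaming-java_2.10</artifactId>
>>>   <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-clients_2.10</artifactId>
>>>   <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-connector-kafka-0.10_2.10</artifactId>
>>>   <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-statebackend-rocksdb_2.10</artifactId>
>>>   <version>${flink.version}</version>
>>> </dependency>
>>>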
>>>
>>>
>>> New dependency:
>>>
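>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-streaming-java_2.12</artifactId>
>>>   <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-clients_2.12</artifactId>
>>>   <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
>>>   <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
>>>   <version>${flink.version}</version>
>>> </dependency>
>>>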
>>>
>>>
>>> Do we know why this would be happening?
>>>
>>> Regards,
>>>
>>> Somya Maithani
>>> Software Developer II
>>> Helpshift Pvt Ltd
>>>
>>
>>


Re: Issue with committing Kafka offsets

2020-01-31 Thread Tzu-Li (Gordon) Tai
Hi,

There are no upper limits on the number of Kafka consumers per job.

For each one of your FlinkKafkaConsumers, are you using the same group.id?
That could maybe explain why you are experiencing higher commit times as
you are adding more FlinkKafkaConsumers, as AFAIK on the broker side, the
commit operations for the same consumer group are enqueued together.

As a side note, as the warning message already mentions, this does not
affect Flink's exactly-once guarantees.
If the only reason that you want to commit the offsets back to Kafka is to
have a way to monitor progress, it should be fine to define different
consumer group ids for each FlinkKafkaConsumer.
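
A minimal sketch of what that could look like, in plain Java. The helper
class, the method name and the prefix-plus-topic naming scheme are made up for
illustration; `bootstrap.servers` and `group.id` are the standard Kafka
consumer property keys.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper; the naming scheme for the group ids is an assumption.
final class ConsumerGroupPropsSketch {

    // Builds one Properties object per topic, each with its own group.id.
    static Map<String, Properties> perTopicProperties(
            String bootstrapServers, String groupIdPrefix, List<String> topics) {
        Map<String, Properties> result = new LinkedHashMap<>();
        for (String topic : topics) {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", bootstrapServers);
            props.setProperty("group.id", groupIdPrefix + "-" + topic);
            result.put(topic, props);
        }
        return result;
    }
}
```

Each resulting Properties object would then be handed to the
FlinkKafkaConsumer that reads the corresponding topic.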

Hope this helps,
Gordon

On Sat, Feb 1, 2020 at 12:54 AM RKandoji  wrote:

> Can someone please help me here.
>
> Thanks
> RK
>
>
> On Thu, Jan 30, 2020 at 7:51 PM RKandoji  wrote:
>
>> Hi Team,
>>
>> I'm running into a strange issue, pasted below:
>>
>> Committing offsets to Kafka takes longer than the checkpoint interval.
>> Skipping commit of previous offsets because newer complete checkpoint
>> offsets are available. This does not compromise Flink's checkpoint
>> integrity.
>>
>>
>> I read data from more than 10 different Kafka topics. I started noticing
>> this issue as I added more Kafka consumers reading from their
>> respective topics.
>>
>> Wondering if there is any upper limit on the number of Kafka consumers
>> (Kafka topics) per job?
>>
>> If not could someone please shed some light on why this could be
>> happening?
>>
>> Thanks,
>> RK
>>
>


Re: State Processor API Keyed State

2020-02-18 Thread Tzu-Li (Gordon) Tai
Hi,

Just to clarify -
I quickly went through the README of the project, and saw this:
"This error is seen after trying to read from a savepoint that was created
using the same case class as a key."

So, if I understood correctly, you were attempting to use the State
Processor API to access a savepoint that was written with a Scala
DataStream job, correct?

If that's the case, I'm afraid this would not work as of now. See [1] for a
similar scenario that others had also bumped into.
TL;DR is - the State Processor API currently is not guaranteed to work for
snapshots that are written with Scala DataStream jobs.

For now, I'll add a big warning about this to the docs.
But in general, it seems like we might want to consider bumping up the
priority for enabling this, as quite a few users are using the Scala
DataStream API for their jobs.

Just as a side comment: this repo looks like a very interesting project!

Cheers,
Gordon

[1] https://issues.apache.org/jira/browse/FLINK-15719

On Wed, Feb 19, 2020 at 7:03 AM Mark Niehe  wrote:

> Hey all,
>
> I've run into an issue with the State Processor API. To highlight the
> issues I've been having, I've created a reference repository that will
> demonstrate the issue (repository:
> https://github.com/segmentio/flink-state-management).
>
> The current implementation of the pipeline has left us with keyed state
> that we no longer need, and we don't have references to some of the old keys.
> My plan was to:
> 1. create a savepoint
> 2. read the keys from each operator (using State Processor API)
> 3. filter out all the keys that are no longer used
> 4. bootstrap a new savepoint that contains the filtered state
>
> I managed to get this working using a sample pipeline and a very basic key
> (a string), but when I switched the key to be something more complex (a
> case class of two strings), I started seeing this exception:
> Caused by: org.apache.flink.util.StateMigrationException: The new key
> serializer must be compatible.
> at
> org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:194)
> at
> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:170)
> at
> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:157)
> at
> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:141)
> at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270)
> ... 13 more
>
> Has anyone come across this before and figured out a fix? Any help you can
> give would be greatly appreciated!
>
> Thanks,
> --
> 
> Mark Niehe ·  Software Engineer
>


Re: State Processor API Keyed State

2020-02-18 Thread Tzu-Li (Gordon) Tai
There might be a possible workaround for this, for now:

Basically, the trick is to explicitly tell the State Processor API to use a
specified type information to access the keyed state.
You can do that with the `ExistingSavepoint#readKeyedState(String uid,
KeyedStateReaderFunction function, TypeInformation keyTypeInfo,
TypeInformation outTypeInfo)`.
This would allow the State Processor API to bypass the Java type
information extraction process (which is not compatible with how it is done
in Scala DataStream right now, hence the StateMigrationException you are
getting).

What you'd have to do is, in your pipeline job, explicitly generate the
serializer / type information using either the Scala DataStream macro
`createTypeInformation` or a custom serializer.
Then, tell the State Processor API to use that same serializer / type info
when reading keyed state.
Simply put: you'll be specifying explicitly what serializer to use for the
keys, and telling the State Processor API to also use that serializer to
access state.

This is not nice, but should work for now. Would be interesting to hear how
that works out for you.
As mentioned above, eventually a possible ideal solution is that type
information extraction should be converged for the Java / Scala DataStream
APIs.

Cheers,
Gordon

On Wed, Feb 19, 2020 at 10:20 AM Tzu-Li (Gordon) Tai 
wrote:

> Hi,
>
> Just to clarify -
> I quickly went through the README of the project, and saw this:
> "This error is seen after trying to read from a savepoint that was created
> using the same case class as a key."
>
> So, if I understood correctly, you were attempting to use the State
> Processor API to access a savepoint that was written with a Scala
> DataStream job, correct?
>
> If that's the case, I'm afraid this would not work as of now. See [1] for
> a similar scenario that others had also bumped into.
> TL;DR is - the State Processor API currently is not guaranteed to work for
> snapshots that are written with Scala DataStream jobs.
>
> For now, I'll add a big warning about this to the docs.
> But in general, it seems like we might want to consider bumping up the
> priority for enabling this, as quite a few users are using the Scala
> DataStream API for their jobs.
>
> Just as a side comment: this repo looks like a very interesting project!
>
> Cheers,
> Gordon
>
> [1] https://issues.apache.org/jira/browse/FLINK-15719
>
> On Wed, Feb 19, 2020 at 7:03 AM Mark Niehe  wrote:
>
>> Hey all,
>>
>> I've run into an issue with the State Processor API. To highlight the
>> issues I've been having, I've created a reference repository that will
>> demonstrate the issue (repository:
>> https://github.com/segmentio/flink-state-management).
>>
>> The current implementation of the pipeline has left us with keyed state
>> that we no longer need, and we don't have references to some of the old keys.
>> My plan was to:
>> 1. create a savepoint
>> 2. read the keys from each operator (using State Processor API)
>> 3. filter out all the keys that are no longer used
>> 4. bootstrap a new savepoint that contains the filtered state
>>
>> I managed to get this working using a sample pipeline and a very basic
>> key (a string), but when I switched the key to be something more complex (a
>> case class of two strings), I started seeing this exception:
>> Caused by: org.apache.flink.util.StateMigrationException: The new key
>> serializer must be compatible.
>> at
>> org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:194)
>> at
>> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:170)
>> at
>> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:157)
>> at
>> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:141)
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270)
>> ... 13 more
>>
>> Has anyone come across this before and figured out a fix? Any help you
>> can give would be greatly appreciated!
>>
>> Thanks,
>> --
>> <http://segment.com/>
>> Mark Niehe ·  Software Engineer
>>
>


Re: Writing a POJO Schema Evolution E2E test in Java

2020-02-20 Thread Tzu-Li (Gordon) Tai
Hi Theo,

This is indeed a tricky feature to test!

On Thu, Feb 20, 2020 at 8:59 PM Theo Diefenthal <
theo.diefent...@scoop-software.de> wrote:

> Hi,
>
> We have a pipeline which internally uses Java POJOs and also needs to keep
> some events entirely in state for some time.
>
> From time to time, our POJOs evolve, like attributes are added or removed.
>
> Now I wanted to write an E2E test that proves the schema migration works
> (having different schemas in the source Kafka topic, the Flink pipeline
> state and the sink) for bounded scenarios (attribute added or removed).
>
> I figured out that in my test, I can instantiate a
> MiniClusterWithClientResource, receive a client, start a job over the
> client and also cancel the job with a savepoint. My idea was to start the
> job, put some records in, cancel with a savepoint and restart the job from
> savepoint, but with a slightly different POJO (added another attribute and
> removed an existing one).
>
> Currently, I'm sadly missing two pieces:
> 1. I don't see a way to restart a job from savepoint via the client
> obtained from the MiniClusterWithClientResource in my test
> 2. According to a Flink blog post [1], schema evolution of POJOs is more
> limited; in particular, the evolved POJO must have the same "namespace"
> (i.e. Java package?!) and class name.
>

The way tests in Flink around schema / serializer evolution get around this
is to have two different classes (with different class names) and reload
them in new classloaders, so that they can be "relocated" to have the same
name at runtime.
In Flink, we use a `ClassRelocator` utility to do this. You can check out
example usages of it in the `PojoSerializerUpgradeTest` and
`TypeSerializerUpgradeTestBase`.

I'm not entirely sure if it would work in your scenario, but I think it's
worth giving it a try since it'll make writing such tests easier.

If this doesn't work, then you could try doing it such that you have
separate modules (i.e. jars) for the old / new Pojo definition, and then a
separate module that does the actual test logic while loading the jars
containing the old / new Pojos with different classloaders.
That would resemble what happens in reality more closely.


> Point 2 especially seems to make it impossible for me to automate testing
> of the evolution, so I would need to do it manually.
>
> Do you have any idea how I could overcome these limitations so that I can
> build a proper end to end test for the schema migration to work?
>
> Best regards
> Theo
>
> [1]
> https://flink.apache.org/news/2020/01/29/state-unlocked-interacting-with-state-in-apache-flink.html
>

Hope that helps! Would be great to hear back from you on how it works out.

Cheers,
Gordon


Re: Correct way to e2e test a Flink application?

2020-03-02 Thread Tzu-Li (Gordon) Tai
Hi Laurent,

You can take a look at Flink's MiniClusterResource JUnit test rule, and its
usages in the codebase for that.
The rule launches a Flink MiniCluster within the same JVM, and submitting a
job to the mini cluster resembles submitting to an actual Flink cluster, so
you would already be able to catch problems such as operator serialization
errors.

Cheers,
Gordon





Re: How is state stored in rocksdb?

2020-03-02 Thread Tzu-Li (Gordon) Tai
Hi,

First of all, state is only managed by Flink (and therefore Flink's state
backends) if the state is registered by the user.
You can take a look at the documentation here [1] for details on how to
register state.
A state has to be registered for it to be persisted in checkpoints /
savepoints, and be fault-tolerant across Flink job restarts.

As for the second part of your question, on serialization:
Once you take a look at how to register state to be managed by Flink, you'd
quickly realize that you can specify the serializer for registered state. If
you simply provide the class of the state data type, then Flink will use its
own type extraction to figure out the serializer to use for the type. Please
see [2] for details on that. Otherwise, a custom serializer implementation
can be provided.
In general, you can find quite a bit about state serialization here [3].

Cheers,
Gordon

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/state.html#using-managed-keyed-state
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/types_serialization.html
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html





Re: Flink on AWS - ActiveMQ connector

2020-03-02 Thread Tzu-Li (Gordon) Tai
Hi,

The connectors that are listed in the AWS documentation page that you
referenced are not provided by AWS. They are bundled connectors shipped by
the Apache Flink community as part of official Flink releases, and are
discoverable as artifacts from the Maven central repository. See the
respective Flink connector documentation pages (for example [1] for Flink's
Apache Kafka connector) on how to use those connectors in your jobs.

As for the ActiveMQ connector provided by Apache Bahir, there's also a Maven
artifact for that shipped by Apache Bahir [2].

Cheers,
Gordon

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html
[2] https://bahir.apache.org/docs/flink/current/flink-streaming-activemq/





Re: Has the "Stop" Button next to the "Cancel" Button been removed in Flink's 1.9 web interface?

2020-03-02 Thread Tzu-Li (Gordon) Tai
Hi Kaymak,

To answer your last question:
there will be no data loss in that scenario you described, but there could
be duplicate processed records.

With checkpointing enabled, the Flink Kafka consumer does not commit
offsets back to Kafka until offsets in Flink checkpoints have been
persisted.

That external offset commit, however, is not guaranteed to happen, and the
committed offsets always "lag" behind the offsets maintained internally in
Flink checkpoints.
That is why there may be duplicate consumed records if you rely on the
committed offsets on startup, instead of the offsets maintained within Flink.

The rule of thumb is:
Offsets committed back to Kafka by the Flink Kafka consumer are only a means
to expose progress to the outside world,
and there is no guarantee that those committed offsets are consistent with
operator states in the streaming job.
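
A toy calculation in plain Java shows why this leads to duplicates rather than
loss. The class and method names are made up, and offsets are modelled as
plain numbers; this is only a sketch of the reasoning, not of how the consumer
is implemented.

```java
// Toy model only: offsets are plain longs and "processing" is just counting.
final class CommittedOffsetLagSketch {

    // Records read a second time when restarting from startOffset, although
    // the job had already processed everything below processedUpTo.
    static long duplicates(long startOffset, long processedUpTo) {
        return Math.max(0L, processedUpTo - startOffset);
    }

    // Records skipped entirely when restarting from startOffset.
    static long lost(long startOffset, long processedUpTo) {
        return Math.max(0L, startOffset - processedUpTo);
    }
}
```

For example, if the job had processed everything up to offset 1000 but the
last offset committed to Kafka was 900, restarting from the committed offset
re-reads 100 records and skips none, since the committed offset never runs
ahead of what was processed.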

BR,
Gordon


On Mon, Mar 2, 2020, 11:18 PM Kaymak, Tobias 
wrote:

> Thank you! One last question regarding Gordon's response. When a pipeline
> stops consuming and cleanly shuts down and there is no error during that
> process, and then it gets started again and uses the last committed offset
> in Kafka - there should be no data loss - or am I missing something?
>
> In what scenario should I expect a data loss? (I can only think of the
> jobmanager or taskmanager getting killed before the shutdown is done.)
>
> Best,
> Tobi
>
> On Mon, Mar 2, 2020 at 1:45 PM Piotr Nowojski  wrote:
>
>> Hi,
>>
>> Sorry for my previous slightly confusing response, please take a look at
>> the response from Gordon.
>>
>> Piotrek
>>
>> On 2 Mar 2020, at 12:05, Kaymak, Tobias  wrote:
>>
>> Hi,
>>
>> let me refine my question: My pipeline is generated from Beam, so the
>> Flink pipeline is a translated Beam pipeline. When I update my Apache Beam
>> pipeline code, working with a snapshot in Flink to stop the pipeline is not
>> an option, as the snapshot will use the old representation of the Flink
>> pipeline when resuming from that snapshot.
>>
>> Meaning that I am looking for a way to drain the pipeline cleanly and
>> use the last committed offset in Kafka to resume processing after I
>> start it again (launching it through Beam will regenerate the Flink
>> pipeline and it should resume at the offset where it left off, that is the
>> latest committed offset in Kafka).
>>
>> Can this be achieved with a cancel or stop of the Flink pipeline?
>>
>> Best,
>> Tobias
>>
>> On Mon, Mar 2, 2020 at 11:09 AM Piotr Nowojski 
>> wrote:
>>
>>> Hi Tobi,
>>>
>>> No, FlinkKafkaConsumer is not using committed Kafka’s offsets for
>>> recovery. Offsets where to start from are stored in the checkpoint itself.
>>> Updating the offsets back to Kafka is an optional, purely cosmetic thing
>>> from the Flink’s perspective, so the job will start from the correct
>>> offsets.
>>>
>>> However, if you, for whatever reason, restart the job from a
>>> savepoint/checkpoint that’s not the latest one, this will violate
>>> exactly-once guarantees - there will be some duplicated records in the
>>> sinks, simply because some records would be processed and committed
>>> twice. Committing happens on checkpoint, so if you are recovering
>>> to some previous checkpoint, there is nothing Flink can do - some records
>>> were already committed before.
>>>
>>> Piotrek
>>>
>>> On 2 Mar 2020, at 10:12, Kaymak, Tobias 
>>> wrote:
>>>
>>> Thank you Piotr!
>>>
>>> One last question - let's assume my source is a Kafka topic - if I stop
>>> via the CLI with a savepoint in Flink 1.9, but do not use that savepoint
>>> when restarting my job - the job would continue from the last offset that
>>> has been committed in Kafka and thus I would also not experience a loss of
>>> data in my sink. Is that correct?
>>>
>>> Best,
>>> Tobi
>>>
>>> On Fri, Feb 28, 2020 at 3:17 PM Piotr Nowojski 
>>> wrote:
>>>
 Yes, that’s correct. There shouldn’t be any data loss. Stop with
 savepoint is a solution to make sure, that if you are stopping a job
 (either permanently or temporarily) that all of the results are
 published/committed to external systems before you actually stop the job.

 If you just cancel/kill/crash a job, in some rare cases (if a
 checkpoint was completing at the time cluster was crashing), some records
 might not be committed before the cancellation/kill/crash happened. Also
 note that doesn’t mean there is a data loss, just those records will be
 published once you restore your job from a checkpoint. If you want to stop
 the job permanently, that might not happen, hence we need stop with
 savepoint.

 Piotrek

 On 28 Feb 2020, at 15:02, Kaymak, Tobias 
 wrote:

 Thank you! For understanding the matter: When I have a streaming
 pipeline (reading from Kafka, writing somewhere) and I click "cancel" and
 after that I restart the pipeline - I should not expect any data to be lost
 - is that correct?

 Best,
 Tobias

 On F

Re: java.time.LocalDateTime in POJO type

2020-03-02 Thread Tzu-Li (Gordon) Tai
Hi,

What that log message means (i.e. "must be processed as a Generic Type") is
that Flink will have to fall back to using Kryo to serialize that type.

You should be concerned about that if:
1) That type is being used for some persisted state in snapshots. That would
be the case if you've registered state of that type, or if it is used as the
input for some built-in operator that persists input records in state (e.g.
window operators). Kryo generally does not have a friendly schema evolution
story, so you would want to avoid that going into production.
2) Kryo itself is not the fastest compared to Flink's POJO serializer, so
that would be something to consider as well even if the type is only used
for transient, on-wire data.

I think in your case, since your POJO contains an inner field that cannot be
recognized as a POJO (i.e. the LocalDateTime), then your outer class is also
not recognized as a POJO.
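
One possible way to sidestep the Kryo fallback, sketched below under the
assumption that millisecond precision in UTC is good enough for your use case,
is to keep the timestamp in the POJO as a plain `long` and convert at the
edges. The class and field names (`Event`, `eventTimeMillis`) are made up for
illustration; only the `java.time` calls are real APIs.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

final class PojoTimestampSketch {

    /** Hypothetical event type: public, no-arg constructor, only simple public fields. */
    public static class Event {
        public String id;
        public long eventTimeMillis; // epoch milliseconds (UTC) instead of a LocalDateTime field

        public Event() {}

        public Event(String id, LocalDateTime eventTime) {
            this.id = id;
            this.eventTimeMillis = toEpochMillis(eventTime);
        }

        /** Convenience accessor; deliberately not named getXxx so it is not mistaken for a field getter. */
        public LocalDateTime eventTimeUtc() {
            return fromEpochMillis(eventTimeMillis);
        }
    }

    /** Interprets the local date-time as UTC and converts it to epoch milliseconds. */
    static long toEpochMillis(LocalDateTime t) {
        return t.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    /** Inverse of toEpochMillis; sub-millisecond precision is not preserved. */
    static LocalDateTime fromEpochMillis(long millis) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneOffset.UTC);
    }
}
```

Whether the outer type is then treated as a POJO can be verified by checking
that the "Generic Type" log message no longer appears for it.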

BR,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: what is the hash function that Flink creates the UID?

2020-03-02 Thread Tzu-Li (Gordon) Tai
Hi,

Flink currently performs a 128-bit murmur hash on the user-provided uids to
generate the final node hashes in the stream graph. Specifically, this
library is being used [1] as the hash function.

If what you are looking for is for Flink to use exactly the provided hash,
you can use `setUidHash` for that - Flink will use that provided uid hash as
is for the generated node hashes.
However, that was exposed as a means for manual workarounds to allow for
backwards compatibility in legacy breaking cases, so it is not advised to
use that in your case.

BR,
Gordon

[1]
https://guava.dev/releases/19.0/api/docs/com/google/common/hash/Hashing.html#murmur3_128(int)





Re: Very large _metadata file

2020-03-04 Thread Tzu-Li (Gordon) Tai
Hi Jacob,

Apart from what Klou already mentioned, one other possible reason:

If you are using the FsStateBackend, it is also possible that your state is
small enough to be considered to be stored inline within the metadata file.
That is governed by the "state.backend.fs.memory-threshold" configuration,
with a default value of 1024 bytes, or can also be configured with the
`fileStateSizeThreshold` argument when constructing the `FsStateBackend`.
The purpose of that threshold is to ensure that the backend does not create
a large amount of very small files, where potentially the file pointers are
actually larger than the state itself.
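
To illustrate how this can inflate the metadata file, here is a small
plain-Java model of the decision (not Flink code). The 1024-byte default is
the one quoted above; the exact handling of a state chunk that is precisely at
the threshold is an assumption of this sketch.

```java
import java.util.List;

final class InlineStateSketch {

    /** Default of "state.backend.fs.memory-threshold" as quoted in this thread. */
    static final int DEFAULT_THRESHOLD_BYTES = 1024;

    /**
     * Simplified rule: a state chunk that fits within the threshold is embedded
     * in the _metadata file instead of being written to its own file.
     * (Treating "equal to the threshold" as inline is an assumption.)
     */
    static boolean storedInline(int stateSizeBytes, int thresholdBytes) {
        return stateSizeBytes <= thresholdBytes;
    }

    /** Sums up the state bytes that would end up inside the metadata file. */
    static long inlineBytes(List<Integer> stateSizesBytes, int thresholdBytes) {
        long total = 0;
        for (int size : stateSizesBytes) {
            if (storedInline(size, thresholdBytes)) {
                total += size;
            }
        }
        return total;
    }
}
```

With a very large number of small state chunks, the inline total can dominate
the size of the metadata file even though each chunk is tiny.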

Cheers,
Gordon



On Wed, Mar 4, 2020 at 6:17 PM Kostas Kloudas  wrote:

> Hi Jacob,
>
> Could you specify which StateBackend you are using?
>
> The reason I am asking is that, from the documentation in [1]:
>
> "Note that if you use the MemoryStateBackend, metadata and savepoint
> state will be stored in the _metadata file. Since it is
> self-contained, you may move the file and restore from any location."
>
> I am also cc'ing Gordon who may know a bit more about state formats.
>
> I hope this helps,
> Kostas
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/savepoints.html
>
> On Wed, Mar 4, 2020 at 1:25 AM Jacob Sevart  wrote:
> >
> > Per the documentation:
> >
> > "The meta data file of a Savepoint contains (primarily) pointers to all
> files on stable storage that are part of the Savepoint, in form of absolute
> paths."
> >
> > I somehow have a _metadata file that's 1.9GB. Running strings on it I
> find 962 strings, most of which look like HDFS paths, which leaves a lot of
> that file-size unexplained. What else is in there, and how exactly could
> this be happening?
> >
> > We're running 1.6.
> >
> > Jacob
>


Re: RichAsyncFunction + BroadcastProcessFunction

2020-03-17 Thread Tzu-Li (Gordon) Tai
Hi John,

Have you considered letting the BroadcastProcessFunction output events that
indicate that extra external HTTP requests need to be performed, and having
them consumed by a downstream async I/O operator that completes the HTTP
requests?
That could work depending on what exactly you need to do in your specific
case.
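
The shape of that split can be sketched in plain Java (no Flink classes
involved): one stage only decides which requests are needed, and a second
stage completes them asynchronously. All names here (`decideRequests`,
`completeRequests`, `fakeHttpCall`) are placeholders, and the "HTTP call" is
simulated.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

final class DecideThenFetchSketch {

    /** Stage 1 (stands in for the broadcast-aware function): emit a request only for watched keys. */
    static List<String> decideRequests(List<String> events, Set<String> watchedKeys) {
        List<String> requests = new ArrayList<>();
        for (String event : events) {
            if (watchedKeys.contains(event)) {
                requests.add("GET /lookup/" + event); // hypothetical request description
            }
        }
        return requests;
    }

    /** Stage 2 (stands in for the async I/O operator): start all requests, then gather the results. */
    static List<String> completeRequests(List<String> requests) {
        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (String request : requests) {
            pending.add(CompletableFuture.supplyAsync(() -> fakeHttpCall(request)));
        }
        List<String> responses = new ArrayList<>();
        for (CompletableFuture<String> future : pending) {
            responses.add(future.join()); // results are collected in request order
        }
        return responses;
    }

    /** Placeholder for a real HTTP client call. */
    static String fakeHttpCall(String request) {
        return "200 OK for " + request;
    }
}
```

The point of the split is that the first stage never blocks on the network; it
only describes the work, and the waiting happens in the second stage.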

Cheers,
Gordon





Re: what is the difference between map vs process on a datastream?

2020-03-17 Thread Tzu-Li (Gordon) Tai
Hi,

As David already explained, they are similar in that you may output zero to
multiple records for both process and flatMap functions.

However, ProcessFunctions also expose to the user much more powerful
functionality, such as registering timers, outputting to side outputs, etc.

Cheers,
Gordon






Re: Apache Airflow - Question about checkpointing and re-run a job

2020-03-17 Thread Tzu-Li (Gordon) Tai
Hi,

I believe that the title of this email thread was a typo, and should be
"Apache Flink - Question about checkpointing and re-run a job."
I assume this because the contents of the previous conversations seem to be
purely about Flink.

Otherwise, as far as I know, there doesn't seem to be any publicly available
Airflow operators for Flink right now.

Cheers,
Gordon





Re: Help me understand this Exception

2020-03-17 Thread Tzu-Li (Gordon) Tai
Hi,

The exception stack you posted simply means that the next operator in the
chain failed to process the output watermark.
There should be another exception that explains why some operator was
closed / failed, eventually leading to the above exception.
That would provide more insight into exactly why your job is failing.

Cheers,
Gordon

On Tue, Mar 17, 2020 at 11:27 PM aj  wrote:

> Hi,
> I am running a streaming job with generating watermark like this :
>
> public static class SessionAssigner implements
>         AssignerWithPunctuatedWatermarks<GenericRecord> {
>     @Override
>     public long extractTimestamp(GenericRecord record, long previousElementTimestamp) {
>         long timestamp = (long) record.get("event_ts");
>         // "{}" placeholder added so the value is actually logged (assumes an SLF4J-style logger)
>         LOGGER.info("timestamp {}", timestamp);
>         return timestamp;
>     }
>
>     @Override
>     public Watermark checkAndGetNextWatermark(GenericRecord record, long extractedTimestamp) {
>         // simply emit a watermark with every event
>         LOGGER.info("extractedTimestamp {}", extractedTimestamp);
>         return new Watermark(extractedTimestamp);
>     }
> }
>
> Please help me understand what this exception means:
>
> java.lang.RuntimeException: Exception occurred while processing valve
> output watermark:
> at org.apache.flink.streaming.runtime.io.
> StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(
> StreamOneInputProcessor.java:216)
> at org.apache.flink.streaming.runtime.streamstatus.
> StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(
> StatusWatermarkValve.java:189)
> at org.apache.flink.streaming.runtime.streamstatus.
> StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
> .processElement(StreamOneInputProcessor.java:169)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
> .processInput(StreamOneInputProcessor.java:143)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
> StreamTask.java:279)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask
> .java:301)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:406)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.streaming.runtime.tasks.
> ExceptionInChainedOperatorException: Could not forward element to next
> operator
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:707)
> at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:660)
> at org.apache.flink.streaming.api.operators.
> AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:
> 727)
> at org.apache.flink.streaming.api.operators.
> AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:
> 705)
> at org.apache.flink.streaming.api.operators.TimestampedCollector.
> collect(TimestampedCollector.java:51)
> at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java:
> 137)
> at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java:
> 116)
> at org.apache.flink.streaming.runtime.operators.windowing.functions.
> InternalIterableProcessWindowFunction.process(
> InternalIterableProcessWindowFunction.java:50)
> at org.apache.flink.streaming.runtime.operators.windowing.functions.
> InternalIterableProcessWindowFunction.process(
> InternalIterableProcessWindowFunction.java:32)
> at org.apache.flink.streaming.runtime.operators.windowing.
> WindowOperator.emitWindowContents(WindowOperator.java:549)
> at org.apache.flink.streaming.runtime.operators.windowing.
> WindowOperator.onEventTime(WindowOperator.java:457)
> at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl
> .advanceWatermark(InternalTimerServiceImpl.java:276)
> at org.apache.flink.streaming.api.operators.InternalTimeServiceManager
> .advanceWatermark(InternalTimeServiceManager.java:128)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator
> .processWatermark(AbstractStreamOperator.java:784)
> at org.apache.flink.streaming.runtime.io.
> StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(
> StreamOneInputProcessor.java:213)
> ... 10 more
>
> --
> Thanks & Regards,
> Anuj Jain
>
>
>
> 
>


Re: state schema evolution for case classes

2020-03-17 Thread Tzu-Li (Gordon) Tai
Hi Apoorv,

Flink currently does not natively support schema evolution for state types
using Scala case classes [1].

So, as Roman has pointed out, there are 2 possible ways for you to do that:
- Implementing a custom serializer that supports schema evolution for your
specific Scala case classes, as Roman suggested.
- or, using the State Processor API [2] to migrate your case classes
offline as a batch job

For your question on how to implement a schema-evolution supporting
serializer, can you share with me the problems you have met so far?
Otherwise, if you take a look at the PojoSerializerSnapshot class, that
would be a starting point to implement something similar for your case
classes.

As you will quickly realize, it's not simple, so I would strongly suggest
trying out the approach of using the State Processor API.
Either way, if you bump into any problems, feel free to let me know.
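
To give a feel for what a schema-evolution-aware serializer has to do, here is
a minimal plain-Java sketch (it does not use Flink's TypeSerializer or
TypeSerializerSnapshot interfaces). It tags every record with a format version
so that old bytes can still be read after a field is added. Flink keeps this
kind of information in the serializer snapshot rather than in each record; the
per-record tag is only there to keep the sketch short. The type and field
names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class VersionedRecordSketch {
    static final int V1 = 1; // layout: a, b
    static final int V2 = 2; // layout: a, b, e (field added later)

    /** Hypothetical state type in its newest shape. */
    static final class Record {
        final boolean a;
        final boolean b;
        final int e;

        Record(boolean a, boolean b, int e) {
            this.a = a;
            this.b = b;
            this.e = e;
        }
    }

    /** Produces bytes in the old V1 layout, as an older job version would have written them. */
    static byte[] writeV1(boolean a, boolean b) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(V1);
            out.writeBoolean(a);
            out.writeBoolean(b);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    /** Always writes the newest (V2) layout. */
    static byte[] write(Record r) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(V2);
            out.writeBoolean(r.a);
            out.writeBoolean(r.b);
            out.writeInt(r.e);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    /** Reads either layout; a field that is missing in old data gets a default value. */
    static Record read(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            int version = in.readInt();
            if (version != V1 && version != V2) {
                throw new IOException("unknown format version " + version);
            }
            boolean a = in.readBoolean();
            boolean b = in.readBoolean();
            int e = (version >= V2) ? in.readInt() : 0; // default for the added field
            return new Record(a, b, e);
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }
}
```

A real implementation additionally has to tell Flink whether old and new
formats are compatible, which is the part that PojoSerializerSnapshot handles
for POJOs.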

Cheers,
Gordon

[1] https://issues.apache.org/jira/browse/FLINK-10896
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html

On Wed, Mar 18, 2020 at 1:04 PM Apoorv Upadhyay <
apoorv.upadh...@razorpay.com> wrote:

> Thanks a lot. Also, can you share an example where this has been
> implemented? I have gone through the docs, but it still does not work.
>
> On Wed, Feb 26, 2020 at 7:59 PM Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
>> Hi Apoorv,
>>
>> You can achieve this by implementing custom serializers for your state.
>> Please refer to
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/custom_serialization.html
>>
>> Regards,
>> Roman
>>
>>
>> On Wed, Feb 26, 2020 at 6:53 AM Apoorv Upadhyay <
>> apoorv.upadh...@razorpay.com> wrote:
>>
>>> Hi Roman,
>>>
>>> I have successfully migrated to Flink 1.8.2 with the savepoint created
>>> by Flink 1.6.2.
>>> Now I have to modify a few case classes due to a new requirement. I have
>>> created a savepoint, and when I run the app with the modified classes from
>>> that savepoint it throws the error "state not compatible".
>>> Previously no serializer was used.
>>> I now wish to support state schema evolution, so I need suggestions on how
>>> to achieve that.
>>>
>>> Regards
>>>
>>> On Tue, Feb 25, 2020 at 9:08 PM Khachatryan Roman <
>>> khachatryan.ro...@gmail.com> wrote:
>>>
 Hi ApoorvK,

 I understand that you have a savepoint created by Flink 1.6.2 and you
 want to use it with Flink 1.8.2. The classes themselves weren't modified.
 Is that correct?
 Which serializer did you use?

 Regards,
 Roman


 On Tue, Feb 25, 2020 at 8:38 AM ApoorvK 
 wrote:

> Hi Team,
>
> Earlier we developed on Flink 1.6.2, so there are lots of case classes
> which have Map or nested case class fields within them, for example:
>
> case class MyCaseClass(var a: Boolean,
>                        var b: Boolean,
>                        var c: Boolean,
>                        var d: NestedCaseClass,
>                        var e: Int) {
>   // auxiliary no-arg constructor; it must pass all five arguments
>   def this() { this(false, false, false, new NestedCaseClass, 0) }
> }
>
>
> Now that we have migrated to Flink 1.8.2, I need help figuring out how I
> can achieve state schema evolution for such classes.
>
> 1. Would creating Avro schemas for these classes and implementing Avro
> serialization work?
> 2. Or should I register a Kryo serializer with a protobuf serializer on the
> environment?
>
> Please suggest what can be done here, or point me to an Avro
> serialization example.
>
> Thanks
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>



Re: state schema evolution for case classes

2020-03-27 Thread Tzu-Li (Gordon) Tai
Hi Apoorv,

Sorry for the late reply, have been quite busy with backlog items the past
days.

On Fri, Mar 20, 2020 at 4:37 PM Apoorv Upadhyay <
apoorv.upadh...@razorpay.com> wrote:

> Thanks Gordon for the suggestion,
>
> I am going by this repo :
> https://github.com/mrooding/flink-avro-state-serialization
>
> So far I am able to alter the Scala case classes and restore from a
> savepoint using the memory state backend, but when I use RocksDB as the
> state backend and try to restore from the savepoint, it breaks with the
> following error:
>

When you say restoring it with the RocksDB backend, was the savepoint you
are attempting to restore from taken with the RocksDB backend as well?
I'm asking that, because currently you cannot change the state backend
across restores, as they have different savepoint binary formats.
This is also the case when you use the State Processor API - when you load
an existing savepoint, you first have to load it with the same state
backend that was used to create the savepoint. You can change the state
backend using the State Processor API, by creating a new savepoint with
your desired target backend, and dumping all state data extracted from the
loaded savepoint into the new fresh savepoint.
There have been previous proposals (FLIP-41) [1] to unify the savepoint
formats, which would make a lot of this easier, but AFAIK this isn't on the
roadmap in the near future.

Best Regards,
Gordon

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-41%3A+Unify+Binary+format+for+Keyed+State


>
> org.apache.flink.util.FlinkRuntimeException: Error while retrieving data from 
> RocksDB.
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:92)
>   at 
> nl.mrooding.ProductProcessor.processElement1(ProductProcessor.scala:14)
>   at 
> nl.mrooding.ProductProcessor.processElement1(ProductProcessor.scala:8)
>   at 
> org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.processElement1(KeyedCoProcessOperator.java:78)
>   at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:238)
>   at 
> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:117)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.EOFException
>   at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:473)
>   at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:128)
>   at org.apache.avro.io.BinaryDecoder.readIndex(BinaryDecoder.java:423)
>   at 
> org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
>   at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
>   at 
> org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
>   at 
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>   at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>   at 
> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>   at 
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>   at 
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>   at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>   at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>   at 
> nl.mrooding.state.CustomAvroSerializer$class.deserialize(CustomAvroSerializer.scala:42)
>   at 
> nl.mrooding.state.ProductSerializer.deserialize(ProductSerializer.scala:9)
>   at 
> nl.mrooding.state.ProductSerializer.deserialize(ProductSerializer.scala:9)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:90)
>   ... 8 more
>
>
>
>
> On Wed, Mar 18, 2020 at 10:56 AM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi Apoorv,
>>
>> Flink currently does not natively support schema evolution for state
>> types using Scala case classes [1].
>>
>> So, as Roman has pointed out, there are 2 possible ways for you to do
>> that:
>> - Implementing a custom serializer that supports schema evolution for your
>> specific Scala case classes, as Roman suggested.
>> - or, using the State Processor API [2] to migrate your case classes
>> offline as a batch job
>>
>> For your question on how to implement a schema-evolution supporting
>> serializer, can you sh

Fwd: Lack of KeyedBroadcastStateBootstrapFunction

2020-03-30 Thread Tzu-Li (Gordon) Tai
It seems like Seth's reply didn't make it to the mailing lists somehow.
Forwarding his reply below:

-- Forwarded message -
From: Seth Wiesman 
Date: Thu, Mar 26, 2020 at 5:16 AM
Subject: Re: Lack of KeyedBroadcastStateBootstrapFunction
To: Dawid Wysakowicz 
Cc: , Tzu-Li (Gordon) Tai 


As Dawid mentioned, you can implement your own operator using the transform
method to do this yourself. Unfortunately, that is fairly low level and
would require you to understand some amount of Flink internals.

The real problem is that the state processor api does not support two input
operators. We originally skipped that because there were a number of open
questions about how best to do it and it wasn't clear that it would be a
necessary feature. Typically, flink users use two input operators to do
some sort of join. And when bootstrapping state, you typically only want to
pre-fill one side of that join. KeyedBroadcastState is clearly a good
counter-argument to that.

I've opened a ticket for the feature if you would like to comment there.

https://issues.apache.org/jira/browse/FLINK-16784

On Tue, Mar 24, 2020 at 9:17 AM Dawid Wysakowicz 
wrote:

> Hi,
>
> I am not very familiar with the State Processor API, but from a brief look
> at it, I think you are right. I think the State Processor API does not
> support mixing different kinds of states in a single operator for now. At
> least not in a nice way. Probably you could implement the
> KeyedBroadcastStateBootstrapFunction yourself and use it with
> KeyedOperatorTransformation#transform(org.apache.flink.state.api.SavepointWriterOperatorFactory).
> I understand this is probably not the easiest task.
>
> I am not aware if there are plans to support that out of the box, but I
> cc'ed Gordon and Seth who if I remember correctly worked on that API. I
> hope they might give you some more insights.
>
> Best,
>
> Dawid
>  On 23/03/2020 17:36, Mark Niehe wrote:
>
> Hey all,
>
> I have another question about the State Processor API. I can't seem to
> find a way to create a KeyedBroadcastStateBootstrapFunction operator. The
> two options currently available to bootstrap a savepoint with state are
> KeyedStateBootstrapFunction and BroadcastStateBootstrapFunction. Because
> these are the only two options, it's not possible to bootstrap both keyed
> and broadcast state for the same operator. Are there any plans to add that
> functionality or did I miss it entirely when going through the API docs?
>
> Thanks,
> --
> Mark Niehe ·  Software Engineer
>
>


Re: Lack of KeyedBroadcastStateBootstrapFunction

2020-03-30 Thread Tzu-Li (Gordon) Tai
Thanks! Looking forward to that.

On Tue, Mar 31, 2020 at 1:02 AM Mark Niehe  wrote:

> Hi Gordon and Seth,
>
> Thanks for the explanation and for opening the ticket. I'll add some details
> in the ticket to explain what we're trying to do, which will hopefully add
> some context.
>
> --
> Mark Niehe ·  Software Engineer
>
> On Mon, Mar 30, 2020 at 1:04 AM Tzu-Li (Gordon) Tai 
> wrote:
>
>> It seems like Seth's reply didn't make it to the mailing lists somehow.
>> Forwarding his reply below:
>>
>> -- Forwarded message -
>> From: Seth Wiesman 
>> Date: Thu, Mar 26, 2020 at 5:16 AM
>> Subject: Re: Lack of KeyedBroadcastStateBootstrapFunction
>> To: Dawid Wysakowicz 
>> Cc: , Tzu-Li (Gordon) Tai 
>>
>>
>> As Dawid mentioned, you can implement your own operator using the
>> transform method to do this yourself. Unfortunately, that is fairly low
>> level and would require you to understand some amount of Flink internals.
>>
>> The real problem is that the state processor api does not support two
>> input operators. We originally skipped that because there were a number of
>> open questions about how best to do it and it wasn't clear that it would be
>> a necessary feature. Typically, flink users use two input operators to do
>> some sort of join. And when bootstrapping state, you typically only want to
>> pre-fill one side of that join. KeyedBroadcastState is clearly a good
>> counter-argument to that.
>>
>> I've opened a ticket for the feature if you would like to comment there.
>>
>> https://issues.apache.org/jira/browse/FLINK-16784
>>
>> On Tue, Mar 24, 2020 at 9:17 AM Dawid Wysakowicz 
>> wrote:
>>
>>> Hi,
>>>
>>> I am not very familiar with the State Processor API, but from a brief
>>> look at it, I think you are right. I think the State Processor API does not
>>> support mixing different kinds of states in a single operator for now. At
>>> least not in a nice way. Probably you could implement the
>>> KeyedBroadcastStateBootstrapFunction yourself and use it with
>>> KeyedOperatorTransformation#transform(org.apache.flink.state.api.SavepointWriterOperatorFactory).
>>> I understand this is probably not the easiest task.
>>>
>>> I am not aware if there are plans to support that out of the box, but I
>>> cc'ed Gordon and Seth who if I remember correctly worked on that API. I
>>> hope they might give you some more insights.
>>>
>>> Best,
>>>
>>> Dawid
>>>  On 23/03/2020 17:36, Mark Niehe wrote:
>>>
>>> Hey all,
>>>
>>> I have another question about the State Processor API. I can't seem to
>>> find a way to create a KeyedBroadcastStateBootstrapFunction operator. The
>>> two options currently available to bootstrap a savepoint with state are
>>> KeyedStateBootstrapFunction and BroadcastStateBootstrapFunction. Because
>>> these are the only two options, it's not possible to bootstrap both keyed
>>> and broadcast state for the same operator. Are there any plans to add that
>>> functionality or did I miss it entirely when going through the API docs?
>>>
>>> Thanks,
>>> --
>>> Mark Niehe ·  Software Engineer
>>>
>>>


Re: state schema evolution for case classes

2020-04-02 Thread Tzu-Li (Gordon) Tai
Hi,

I see the problem that you are bumping into as the following:
- In your previous job, you seem to be falling back to Kryo for the state
serialization.
- In your new job, you are trying to change that to use a custom serializer.

You can confirm this by looking at the stack trace of the "new state
serializer is not compatible" exception.
Could you maybe post me the stack trace?

If that is indeed the case, I'm afraid that right now, you'll only be able
to resolve this using the State Processor API to migrate this serializer
offline.
The reason for this is as follows:
- In order to perform the serializer migration at restore time, Flink needs
to understand that the old serializer (i.e. the Kryo serializer in your
case) is compatible with the new serializer (the custom TestDataNested
serializer in your case).
- Currently, Flink cannot reason about serializer compatibility when the
serializer changes completely to a different class than before. Therefore,
if the serializer class changes, to be safe, right now Flink always assumes
that the new serializer is not compatible and therefore fails the restore.

You can manually force this migration offline, as I said using the State
Processor API:
- The steps would be to load the previous savepoint, and when reading your
`testdata-join` state values, use the previous way of providing the
serializer (i.e. classOf[TestDataNested]).
- Then, bootstrap a new savepoint with the state values read from
`testdata-join`. You may use whatever new serializer you want to write the
state into the new savepoint.
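
The two steps above amount to "decode with the old format, re-encode with the
new one". The following plain-Java sketch models only that idea and does not
use the State Processor API: a map stands in for the savepoint, Java
serialization stands in for the Kryo fallback, and a UTF-8 encoding stands in
for the new custom serializer. All names are illustrative.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

final class OfflineMigrationSketch {

    /** Old format: Java serialization, standing in for the Kryo fallback. */
    static byte[] oldEncode(String value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(bytes);
            out.writeObject(value);
            out.close();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Reads a value the "previous way", i.e. with the old format. */
    static String oldDecode(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (String) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** New format: plain UTF-8 bytes, standing in for a custom serializer. */
    static byte[] newEncode(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    static String newDecode(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    /** Step 1: read every value with the old format. Step 2: write it into a fresh snapshot with the new one. */
    static Map<String, byte[]> migrate(Map<String, byte[]> oldSnapshot) {
        Map<String, byte[]> newSnapshot = new LinkedHashMap<>();
        for (Map.Entry<String, byte[]> entry : oldSnapshot.entrySet()) {
            String value = oldDecode(entry.getValue());
            newSnapshot.put(entry.getKey(), newEncode(value));
        }
        return newSnapshot;
    }
}
```

Because the old bytes are only ever read by the old format and the new bytes
only by the new one, no compatibility check between the two is needed.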

As a side note, I have been thinking about two options that would allow an
easier path for users to do this:
Option #1: The Kryo serializer should assume that new serializers are
always compatible, given that the target serialized classes are the same
(which is true for your case). This allows users to opt-out of Kryo
serialization, which has always just been a fallback that many users did
not realize they were using when Flink cannot interpret the state type.
Option #2: Maybe add a "force-migration" option when restoring from
savepoints. This would essentially be an online version of the State
Processor API process I explained above, but instead of happening offline,
the migration would happen at restore from savepoints.

TL;DR: for now, I would suggest to try using the State Processor API to
migrate the serializer for your specific case.

Cheers,
Gordon

On Thu, Apr 2, 2020 at 11:14 PM Apoorv Upadhyay <
apoorv.upadh...@razorpay.com> wrote:

> Hi Gordon,
>
> Thanks for your response. I have done a POC on state migration using
> Avro, and it seems to work well.
>
> I am using a custom Avro serializer (with an Avro schema plus a
> TypeSerializer and TypeSerializerSnapshot), and based on that I have written
> my own custom serializer for the Scala case class that I am serializing (I
> am using RocksDB as the state backend).
>
> When I evolve the class, I just change the .avsc file (the Avro schema
> JSON) and provide both the old and the new schema, so that data already in
> RocksDB is read with the old schema and written back with the new one. This
> works fine, so I can add new classes to my application that support schema
> evolution.
>
> I have defined the state like this:
>
>
> private[this] lazy val stateDescriptorTest: ValueStateDescriptor[TestDataNested] =
>   new ValueStateDescriptor[TestDataNested]("testdata-join", TestDataNested.serializer)
> private[this] lazy val stateTest: ValueState[TestDataNested] =
>   getRuntimeContext.getState(stateDescriptorTest)
>
>
> The problem is with the existing classes in my current application, where
> we have defined the state as follows (for example):
>
>
> private[this] lazy val stateDescriptorTest: ValueStateDescriptor[TestDataNested] =
>   new ValueStateDescriptor[TestDataNested]("testdata-join", classOf[TestDataNested])
> private[this] lazy val stateTest: ValueState[TestDataNested] =
>   getRuntimeContext.getState(stateDescriptorTest)
>
> So when I provide TestDataNested.serializer instead of
> classOf[TestDataNested] in my current application, i.e. replace the
> serializer, it throws the "new state serializer is not compatible"
> exception.
>
> What can I do here? Any help would be great. Thanks in advance.
>
> On Fri, Mar 27, 2020 at 1:19 PM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi Apoorv,
>>
>> Sorry for the late reply, have been quite busy with backlog items the
>> past days.
>>
>> On Fri, Mar 20, 2020 at 4:37 PM Apoorv Upadhyay <
>> apoorv.upadh...@razorpay.com> wrote:
>>
>>> Thanks Gordon for the suggestion,
>>>
>>> I am going by this repo :
>>> https://github.com/mrooding/flink-avro-state-serialization
>>>
>>> So far I am able to alter the scal

[ANNOUNCE] Apache Flink Stateful Functions 2.0.0 released

2020-04-07 Thread Tzu-Li (Gordon) Tai
The Apache Flink community is very happy to announce the release of Apache
Flink Stateful Functions 2.0.0.

Stateful Functions is an API that simplifies building distributed stateful
applications.
It's based on functions with persistent state that can interact dynamically
with strong consistency guarantees.

Please check out the release blog post for an overview of the release:
https://flink.apache.org/news/2020/04/07/release-statefun-2.0.0.html

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Stateful Functions can be found at:
https://search.maven.org/search?q=g:org.apache.flink%20statefun

Python SDK for Stateful Functions published to the PyPI index can be found
at:
https://pypi.org/project/apache-flink-statefun/

Official Docker image for building Stateful Functions applications is
currently being published to Docker Hub.
Dockerfiles for this release can be found at:
https://github.com/apache/flink-statefun-docker/tree/master/2.0.0
Progress for creating the Docker Hub repository can be tracked at:
https://github.com/docker-library/official-images/pull/7749

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12346878

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Cheers,
Gordon


Re: StateFun - Multiple modules example

2020-04-08 Thread Tzu-Li (Gordon) Tai
Hi Oytun!

You can see here an example of how to package a StateFun application image
that contains multiple modules:
https://ci.apache.org/projects/flink/flink-statefun-docs-stable/deployment-and-operations/packaging.html#images

Essentially, for each module you want to include in your application, you
add the jar containing the service file (for embedded JVM functions)
or the YAML module definition file (for remote functions) under the
"/opt/statefun/modules" directory of the packaged image.

Cheers,
Gordon


On Thu, Apr 9, 2020 at 7:43 AM Oytun Tez  wrote:

> Hi there,
>
> Does anyone have any statefun 2.0 examples with multiple modules?
>
>
>
>  --
>
> Oytun Tez
> M O T A W O R D | CTO & Co-Founder
> oy...@motaword.com
>
>   
>


Re: [Stateful Functions] Using Flink CEP

2020-04-13 Thread Tzu-Li (Gordon) Tai
Hi!

It isn't possible to use Flink CEP within Stateful Functions.

That could be an interesting primitive, to add CEP-based function
constructs.
Could you briefly describe what you are trying to achieve?

On the other hand, there are plans to integrate Stateful Functions more
closely with the Flink APIs.
One direction we've been thinking about is to, for example, support Flink
DataStreams as StateFun ingress / egresses. In this case, you'll be able to
use Flink CEP to detect patterns, and use the results as an ingress which
invokes functions within a StateFun app.

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Question about Writing Incremental Graph Algorithms using Apache Flink Gelly

2020-04-13 Thread Tzu-Li (Gordon) Tai
Hi,

As you mentioned, Gelly Graphs are backed by Flink DataSets, and therefore
work primarily on static graphs. I don't think it'll be possible to
implement the incremental algorithms described in your SO question.

Have you tried looking at Stateful Functions, an API recently added to
Flink?
It supports arbitrary messaging between functions, which may allow you to
build what you have in mind.
Take a look at Seth's and Igal's comments here [1], where there seems to be a
similar incremental graph-processing use case for sessionization.

Cheers,
Gordon

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Complex-graph-based-sessionization-potential-use-for-stateful-functions-td34000.html#a34017





Re: Question about EventTimeTrigger

2020-04-13 Thread Tzu-Li (Gordon) Tai
Hi,

Could you briefly describe what you are trying to achieve?

By definition, a GlobalWindow includes all data - the ending timestamp for
these windows is therefore Long.MAX_VALUE. An event time trigger wouldn't
make sense here, since that trigger would never fire (the watermark cannot
pass the end timestamp of a GlobalWindow).
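To make the firing condition concrete, here is a minimal sketch in plain Java. It is a simplified model and not Flink code: the class and method names are made up for illustration, and the only assumptions taken from the explanation above are that a global window ends at Long.MAX_VALUE and that an event time trigger fires once the watermark reaches the window's end timestamp.

```java
// Simplified model of the firing condition described above; this is not Flink code.
final class GlobalWindowTriggerModel {
    // A global window covers all data, so its end timestamp is the largest long value.
    static final long GLOBAL_WINDOW_MAX_TIMESTAMP = Long.MAX_VALUE;

    // Models an event time trigger: fire once the watermark reaches the window end.
    static boolean eventTimeTriggerFires(long windowMaxTimestamp, long watermark) {
        return watermark >= windowMaxTimestamp;
    }

    public static void main(String[] args) {
        long watermark = 1_586_736_000_000L; // an ordinary event-time watermark in ms
        long boundedWindowEnd = watermark - 1; // a time window that has already ended

        // A bounded time window is passed by the watermark, so the trigger fires.
        System.out.println(eventTimeTriggerFires(boundedWindowEnd, watermark)); // true

        // No ordinary watermark reaches Long.MAX_VALUE, so this never fires.
        System.out.println(eventTimeTriggerFires(GLOBAL_WINDOW_MAX_TIMESTAMP, watermark)); // false
    }
}
```

In this model, any watermark derived from real event timestamps stays below Long.MAX_VALUE, which is why pairing an event time trigger with a global window does not produce results while data is flowing.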

Gordon





Re: Benchmark for Stateful Functions

2020-05-03 Thread Tzu-Li (Gordon) Tai
Hi Omid,

There currently aren't any benchmarks that I know of for Stateful Functions.

However, Stateful Functions applications run on top of Apache Flink and
therefore share the same network stack / runtime. So, if throughput and
latency are your only concerns, you should be able to carry over any results
from Flink.

What in particular are you interested in?
As with any benchmark, it'd only be useful with a specific use case /
scenario in mind.
It would be interesting to hear what you have in mind.

Cheers,
Gordon





Re: Assertion failed: (last_ref), function ~ColumnFamilySet, file db/column_family.cc, line 1238

2020-05-08 Thread Tzu-Li (Gordon) Tai
Hi,

The last time I saw this error, it was caused by a mismatch between the
flink-state-processor-api version in use and other core Flink dependencies.
Could you confirm that?

Also, are you seeing this assertion error consistently, or only
occasionally?
cc'ing Seth, maybe he has other clues on the cause.

Cheers,
Gordon

On Fri, May 8, 2020 at 3:06 PM luisfaamaral 
wrote:

> No one? :)
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Rich Function Thread Safety

2020-05-10 Thread Tzu-Li (Gordon) Tai
As others have mentioned already, it is true that method calls on operators
(e.g. processing events and snapshotting state) will not happen
concurrently.
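As a rough illustration of why this guarantee matters, here is a minimal sketch in plain Java. It is only a model under stated assumptions: it does not use Flink's operator API or runtime, the class and method names are hypothetical, and a single-threaded executor stands in for the one thread that invokes all calls on an operator.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Simplified model only: this is not Flink's runtime or its operator API.
final class SingleThreadedOperatorModel {
    // Plain, unsynchronized field. Safe here only because every call on the
    // operator is executed by the same thread, one call at a time.
    private long count = 0;

    void processElement(String event) {
        count++;
    }

    long snapshotState() {
        return count;
    }

    // Feeds `events` elements to the operator, then takes a snapshot, all on one thread.
    static long runAndSnapshot(int events) {
        SingleThreadedOperatorModel operator = new SingleThreadedOperatorModel();
        ExecutorService taskThread = Executors.newSingleThreadExecutor();
        try {
            for (int i = 0; i < events; i++) {
                final String event = "event-" + i;
                Runnable processing = () -> operator.processElement(event);
                taskThread.execute(processing);
            }
            Callable<Long> snapshotting = operator::snapshotState;
            Future<Long> snapshot = taskThread.submit(snapshotting);
            return snapshot.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            taskThread.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndSnapshot(1000)); // prints 1000
    }
}
```

Because processing and snapshotting are serialized on one thread in this model, the snapshot always sees every element processed before it, without any locking in the function itself.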

As for your findings in reading through the documentation, that might be a
hint that we could add a bit more explanation mentioning this.
Could you suggest where you would have expected to see this mentioned,
based on your read-through?

Cheers,
Gordon





Re: Statefun 2.0 questions

2020-05-10 Thread Tzu-Li (Gordon) Tai
Hi,

Correct me if I'm wrong, but from the discussion so far it seems like what
Wouter is looking for is an HTTP-based ingress / egress.

We have been thinking about this in the past. The specifics of the
implementation are still to be discussed, but to be able to ensure
exactly-once processing semantics, behind the scenes of an HTTP-based
ingress, external messages / responses will still likely be routed through
durable messaging systems such as Kafka / Pulsar / etc.

Gordon





Re: Assertion failed: (last_ref), function ~ColumnFamilySet, file db/column_family.cc, line 1238

2020-05-11 Thread Tzu-Li (Gordon) Tai
In that case, the most likely cause would be
https://issues.apache.org/jira/browse/FLINK-16313, the fix for which is
included in Flink 1.10.1 (to be released).

The release candidate process for Flink 1.10.1 is currently ongoing; would it
be possible for you to try a release candidate out and see if the error still
occurs?

On Mon, May 11, 2020 at 4:11 PM luisfaamaral 
wrote:

> Thanks Gordon and Seth for the reply.
>
> So.. the main project contains the below flink dependencies...
>
>
>
> And the state processor project contains the following:
> 1.9.0
>
>
>
> At first sight, I would say all the libraries match the 1.9.0 Flink
> libraries within both projects.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: How To subscribe a Kinesis Stream using enhance fanout?

2020-05-14 Thread Tzu-Li (Gordon) Tai
Hi Xiaolong,

You are right, the way the Kinesis connector is implemented / the way the
AWS APIs are used, does not allow it to consume Kinesis streams with
enhanced fan-out enabled consumers [1].
Could you open a JIRA ticket for this?
As far as I can tell, this could be a valuable contribution to the
connector for Kinesis users who require dedicated throughput isolated from
other running consumers.

Cheers,
Gordon

[1]
https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-api.html

On Wed, May 13, 2020 at 1:44 PM Xiaolong Wang 
wrote:

> Hello Flink Community!
>
>   I'm currently coding on a project relying on AWS Kinesis. With the
> provided connector (flink-connector-kinesis_2.11;1.10.0), I can consume the
> message.
>
>  But as the main stream is used among several other teams, I was
> required to use the enhanced fan-out of Kinesis. I checked the connector code
> and found no implementations.
>
>  Has this issue occurred to anyone before?
>
> Thanks for your help.
>


Re: Stateful functions Harness

2020-05-21 Thread Tzu-Li (Gordon) Tai
Hi,

On Fri, May 22, 2020 at 7:19 AM Boris Lublinsky <
boris.lublin...@lightbend.com> wrote:

> Also, where do I put flink-conf.yaml in IDEA to add additional required
> config parameter:
>
> classloader.parent-first-patterns.additional: 
> org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
>
>
>
> On May 21, 2020, at 12:22 PM, Boris Lublinsky <
> boris.lublin...@lightbend.com> wrote:
>
> Hi,
> I am trying to run
> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-greeter-example
>  locally
> using
> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-flink-harness-example
>
> And have several questions.
> 1. It seems fairly straightforward to use it with in memory message
> generators, but I can’t figure out how to add Kafka ingress/Egress so that
> I can use it with Kafk
>

Could you provide some context on why you would want to do that?

The StateFun Flink Harness was not intended to work with the usual shipped
ingress / egresses, but purely as a utility for users to run StateFun
applications in a consolidated local setup.
For testing against Kafka, I would suggest looking at how the StateFun
end-to-end tests do it, using testcontainers.
The tests are located under the `statefun-e2e-tests` module.

If you still want to use the Flink Harness for this, you may be able to use
the withFlinkSourceFunction function to directly supply the Flink Kafka
connector.
This only works for the ingress side, though.

> 2. GreetingModule already creates StatefulFunctionUniverse  and so does
> Harness. Is there a way to short circuit it and have Harness get
> StatefulFunctionUniverse directly
>

That is not possible. The StatefulFunctionUniverse that the Harness
utility provides is always a "mock" one, which contains the defined
in-memory ingress and egresses.
As previously mentioned, that is because the Flink Harness was intended for
running StateFun applications without the need to interact with any other
external systems.

> 3. Is there an example on how to write Flink main for stateful function?
>

At the moment, it is not possible to directly integrate Flink APIs and
Stateful Functions APIs in a single job.
What do you have in mind for what you want to achieve?

> 4. Is there an example anywhere on how to run such examples in the IDE
> with Kafka?
>

The tests in `statefun-e2e-tests` can be run in the IDE and test against
Kafka. They do require Docker to be available, though.

> 5. There is a great stateful functions example
> https://github.com/ververica/flink-statefun-workshop, but its readme does
> not really describe implementation and neither does this article,
> referencing it
> https://dev.to/morsapaes/flink-stateful-functions-where-to-start-2j39. Is
> there anything that describes this implementation?
>

I think the bottom half of the article provides some details of the
example, including the messaging between functions and a rough sketch of
the functions. Maybe it's not detailed enough?
In particular, what parts of the example would you want to have more
details on?

Cheers,
Gordon


Re: Stateful functions Harness

2020-05-21 Thread Tzu-Li (Gordon) Tai
Sorry, forgot to cc user@ as well in the last reply.

On Fri, May 22, 2020 at 12:01 PM Tzu-Li (Gordon) Tai 
wrote:

> As an extra note, the utilities you will find in `statefun-e2e-tests`,
> such as `StatefulFunctionsAppsContainers`, are not yet intended for users.
> This was, however, discussed before. Would be great to hear
> feedback from you on how it works for you if you do decide to give that a
> try.
>
> On Fri, May 22, 2020 at 11:58 AM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi,
>>
>> On Fri, May 22, 2020 at 7:19 AM Boris Lublinsky <
>> boris.lublin...@lightbend.com> wrote:
>>
>>> Also, where do I put flink-conf.yaml in IDEA to add additional required
>>> config parameter:
>>>
>>> classloader.parent-first-patterns.additional: 
>>> org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
>>>
>>>
>>>
>>> On May 21, 2020, at 12:22 PM, Boris Lublinsky <
>>> boris.lublin...@lightbend.com> wrote:
>>>
>>> Hi,
>>> I am trying to run
>>> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-greeter-example
>>>  locally
>>> using
>>> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-flink-harness-example
>>>
>>> And have several questions.
>>> 1. It seems fairly straightforward to use it with in memory message
>>> generators, but I can’t figure out how to add Kafka ingress/Egress so that
>>> I can use it with Kafk
>>>
>>
>> Could you provide some context on why you would want to do that?
>>
>> The StateFun Flink Harness was not intended to work with the usual
>> shipped ingress / egresses, but purely as a utility for users to run
>> StateFun applications in a consolidated local setup.
>> For testing against Kafka, I would suggest looking at how the StateFun
>> end-to-end tests do it, using testcontainers.
>> The tests are located under `statefun-e2e-tests` module.
>>
>> If you still want to use the Flink Harness for this, you may be able to
>> use the withFlinkSourceFunction function to directly supply the Flink Kafka
>> connector.
>> This only works for the ingress side, though.
>>
>>> 2. GreetingModule already creates StatefulFunctionUniverse  and so does
>>> Harness. Is there a way to short circuit it and have Harness get
>>> StatefulFunctionUniverse directly
>>>
>>
>> That is not possible. The StatefulFunctionUniverse that the Harness
>> utility provides is always a "mock" one, which contains the defined
>> in-memory ingress and egresses.
>> As previously mentioned, that is because the Flink Harness was intended
>> for running StateFun applications without the need to interact with any
>> other external systems.
>>
>>> 3. Is there an example on how to write Flink main for stateful function?
>>>
>>
>> At the moment, it is not possible to directly integrate Flink APIs and
>> Stateful Functions APIs in a single job.
>> What do you have in mind for what you want to achieve?
>>
>>> 4. Is there an example anywhere on how to run such examples in the IDE
>>> with Kafka?
>>>
>>
>> The tests in `statefun-e2e-tests` can be run in the IDE and test
>> against Kafka. They do require Docker to be available, though.
>>
>>> 5. There is a great stateful functions example
>>> https://github.com/ververica/flink-statefun-workshop, but its readme
>>> does not really describe implementation and neither does this article,
>>> referencing it
>>> https://dev.to/morsapaes/flink-stateful-functions-where-to-start-2j39.
>>> Is there anything that describes this implementation?
>>>
>>
>> I think the bottom half of the article provides some details of the
>> example, including the messaging between functions and a rough sketch of
>> the functions. Maybe it's not detailed enough?
>> In particular, what parts of the example would you want to have more
>> details on?
>>
>> Cheers,
>> Gordon
>>
>>
>


Re: Stateful functions Harness

2020-05-21 Thread Tzu-Li (Gordon) Tai
Are you getting an exception from running the Harness?
The Harness should already have the required configurations, such as the
parent first classloading config.

Otherwise, if you would like to add your own configuration, use the
`withConfiguration` method on the `Harness` class.
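For context on what the setting quoted below does: as far as I know, Flink treats each entry of `classloader.parent-first-patterns.additional` as a simple prefix that is checked against the fully qualified class name. The following is a minimal sketch of that matching in plain Java, under that assumption. It is not Flink's actual implementation, and the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Simplified model of prefix-based parent-first matching; not Flink's implementation.
final class ParentFirstPatternModel {
    // Splits the semicolon-separated config value into individual prefixes.
    static List<String> parsePatterns(String configValue) {
        return Arrays.stream(configValue.split(";"))
                .map(String::trim)
                .filter(pattern -> !pattern.isEmpty())
                .collect(Collectors.toList());
    }

    // A class is resolved parent-first if its name starts with any configured prefix.
    static boolean isParentFirst(String className, List<String> patterns) {
        return patterns.stream().anyMatch(className::startsWith);
    }

    public static void main(String[] args) {
        List<String> patterns = parsePatterns(
                "org.apache.flink.statefun;org.apache.kafka;com.google.protobuf");

        // Matches the "org.apache.kafka" prefix.
        System.out.println(isParentFirst(
                "org.apache.kafka.clients.consumer.KafkaConsumer", patterns)); // true

        // Matches none of the prefixes (hypothetical user class).
        System.out.println(isParentFirst("com.example.MyFunction", patterns)); // false
    }
}
```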

On Fri, May 22, 2020 at 7:19 AM Boris Lublinsky <
boris.lublin...@lightbend.com> wrote:

> Also, where do I put flink-conf.yaml in IDEA to add additional required
> config parameter:
>
> classloader.parent-first-patterns.additional: 
> org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
>
>
>
> On May 21, 2020, at 12:22 PM, Boris Lublinsky <
> boris.lublin...@lightbend.com> wrote:
>
> Hi,
> I am trying to run
> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-greeter-example
>  locally
> using
> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-flink-harness-example
>
> And have several questions.
> 1. It seems fairly straightforward to use it with in memory message
> generators, but I can’t figure out how to add Kafka ingress/Egress so that
> I can use it with Kafk
> 2. GreetingModule already creates StatefulFunctionUniverse  and so does
> Harness. Is there a way to short circuit it and have Harness get
> StatefulFunctionUniverse directly
> 3. Is there an example on how to write Flink main for stateful function?
> 4. Is there an example anywhere on how to run such examples in the IDE
> with Kafka?
> 5. There is a great stateful functions example
> https://github.com/ververica/flink-statefun-workshop, but its readme does
> not really describe implementation and neither does this article,
> referencing it
> https://dev.to/morsapaes/flink-stateful-functions-where-to-start-2j39. Is
> there anything that describes this implementation?
>
>
>

