Re: How to support the below HIVE SQL in Flink

2020-04-19 Thread Rui Li
Hey Xiaohua & Jark,

I'm sorry for overlooking the email. Adding to Jark's answers:

DISTRIBUTE BY => the functionality and syntax are not supported. We can
consider this as a candidate feature for 1.12.
named_struct => you should be able to call this function with the Hive module
LATERAL VIEW => the syntax is not supported. As Jark mentioned, you can
rewrite the SQL to achieve the same functionality
row format => defining the row format in DDL will be supported in FLIP-123
delimited fields => defining the field delimiter in DDL will be supported in
FLIP-123
STR_TO_MAP => you should be able to call this function with the Hive module,
but there's a known issue with this function[1]
Array => you should be able to call this function with the Hive module (see
the sketch of loading the Hive module right after this list)
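
A minimal, untested sketch of loading the Hive module so that Hive built-in
functions such as named_struct and str_to_map become callable from Flink SQL.
The module name "myhive" and the Hive version string are placeholders for your
own setup, and the flink-connector-hive dependency is assumed to be on the
classpath:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.module.hive.HiveModule;

public class HiveModuleExample {

    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Load Hive's built-in functions under the (placeholder) module name "myhive".
        tableEnv.loadModule("myhive", new HiveModule("2.3.4"));

        // Hive functions can now be resolved by the SQL engine.
        tableEnv.sqlQuery("SELECT str_to_map('a=1,b=2', ',', '=') AS m");
    }
}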

Feel free to raise questions if anything is still unclear or if you hit any
issues with these features.

[1] https://issues.apache.org/jira/browse/FLINK-16732


On Thu, Apr 9, 2020 at 12:04 PM Jark Wu  wrote:

> Hi Xiaohua,
>
> I'm not very familiar with Hive SQL, I will try to answer some of them:
>
> COALESCE => there is also a COALESCE built-in function in Flink [1]. From
> the documentation, I think they are identical.
> STR_TO_MAP =>  there is also a STR_TO_MAP built-in function in Flink blink
> planner[1]. But the default delimiter is different from Hive's.
> OVERWRITE => Blink planner supports INSERT OVERWRITE [2].
> FULL OUTER JOIN => Blink planner also supports this in both streaming mode
> and batch mode.
> Rlike => Blink planner has REGEXP [1] built-in function which I think is
> similar to Hive's Rlike?
> LATERAL VIEW => This is called UDTF in Flink, see how to use UDTF in docs
> [3] "Join with Table Function (UDTF)"
>
> I cc'ed Rui Li who is working on FLIP-123 "DDL and DML compatibility for
> Hive", he may have more insights on this and please correct me if I give a
> wrong answer above.
>
> Best,
> Jark
>
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/functions/systemFunctions.html
> [2]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/insert.html#insert-from-select-queries
> [3]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#joins
>
> On Thu, 9 Apr 2020 at 11:23, Xiaohua  wrote:
>
>> Hi,
>>
>> We met some issues when migrating from Hive/Spark to Flink. Could you please
>> help me?
>>
>> Below is the Hive SQL we used:
>>
>> DISTRIBUTE BY
>> named_struct
>> COALESCE
>> LATERAL VIEW
>> row format
>> delimited fields
>> STR_TO_MAP
>> OVERWRITE
>> FULL OUTER JOIN
>> Rlike
>> Array
>>
>> How can we do these using Flink SQL?
>>
>> Thank you~
>>
>> BR
>> Xiaohua
>>
>>
>>
>>
>

-- 
Cheers,
Rui Li


Job manager URI rpc address:port

2020-04-19 Thread Som Lima
Hi,

After running

$ ./bin/start-cluster.sh

The following line of code defaults the jobmanager to localhost:6123

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

which is the same as in Spark:

val spark =
SparkSession.builder.master("local[*]").appName("anapp").getOrCreate

However, if I wish to run the servers on a different physical computer,
then in Spark I can do it this way, using the Spark URI in my IDE:

conf = SparkConf().setMaster("spark://<host>:<port>").setAppName("anapp")

Can you please tell me the equivalent change to make so I can run my
servers and my IDE on different physical computers?


Re: Job manager URI rpc address:port

2020-04-19 Thread tison
You can change the flink-conf.yaml "jobmanager.rpc.address" or "jobmanager.rpc.port"
options before running the program, or take a look at RemoteStreamEnvironment,
which enables configuring the host and port.
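
For reference, a minimal sketch of the programmatic route. The host name, port
and jar path below are placeholders, not values from this thread, and the port
must match whatever endpoint your JobManager actually exposes to clients:

import org.apache.flink.api.java.ExecutionEnvironment;

public class RemoteEnvExample {

    public static void main(String[] args) throws Exception {
        // Point the client at a remote JobManager instead of relying on
        // ExecutionEnvironment.getExecutionEnvironment() and the localhost defaults.
        // "jobmanager-host", 8081 and the jar path are placeholders.
        ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
                "jobmanager-host", 8081, "/path/to/my-job.jar");

        env.fromElements(1, 2, 3).print();
    }
}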

Best,
tison.


On Sun, 19 Apr 2020 at 17:58, Som Lima wrote:

> Hi,
>
> After running
>
> $ ./bin/start-cluster.sh
>
> The following line of code defaults jobmanager  to localhost:6123
>
> final  ExecutionEnvironment env = Environment.getExecutionEnvironment();
>
> which is same on spark.
>
> val spark =
> SparkSession.builder.master(local[*]).appname("anapp").getOrCreate
>
> However if I wish to run the servers on a different physical computer.
> Then in Spark I can do it this way using the spark URI in my IDE.
>
> Conf =
> SparkConf().setMaster("spark://:").setAppName("anapp")
>
> Can you please tell me the equivalent change to make so I can run my
> servers and my IDE from different physical computers.
>
>
>
>
>
>
>
>
>
>
>
>
>


Re: Job manager URI rpc address:port

2020-04-19 Thread Som Lima
Thanks.
flink-conf.yaml does allow me to do what I need to do without making any
changes to client source code.

But
RemoteStreamEnvironment constructor  expects a jar file as the third
parameter also.

RemoteStreamEnvironment(String host, int port, String... jarFiles)
Creates a new RemoteStreamEnvironment that points to the master
(JobManager) described by the given host name and port.
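
For completeness, a hedged sketch of how those constructor arguments would
typically be supplied for the streaming API. The jar would usually be the
artifact built from your own project; the host, port and path below are
placeholders:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RemoteStreamingExample {

    public static void main(String[] args) throws Exception {
        // The trailing jarFiles vararg ships the jar(s) containing the job's
        // user classes to the remote cluster; all values below are placeholders.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
                "jobmanager-host", 8081, "/path/to/my-job.jar");

        env.fromElements("a", "b", "c").print();
        env.execute("remote-streaming-example");
    }
}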

On Sun, 19 Apr 2020, 11:02 tison,  wrote:

> You can change flink-conf.yaml "jobmanager.address" or "jobmanager.port"
> options before run the program or take a look at RemoteStreamEnvironment
> which enables configuring host and port.
>
> Best,
> tison.
>
>
> On Sun, 19 Apr 2020 at 17:58, Som Lima wrote:
>
>> Hi,
>>
>> After running
>>
>> $ ./bin/start-cluster.sh
>>
>> The following line of code defaults jobmanager  to localhost:6123
>>
>> final  ExecutionEnvironment env = Environment.getExecutionEnvironment();
>>
>> which is same on spark.
>>
>> val spark =
>> SparkSession.builder.master(local[*]).appname("anapp").getOrCreate
>>
>> However if I wish to run the servers on a different physical computer.
>> Then in Spark I can do it this way using the spark URI in my IDE.
>>
>> Conf =
>> SparkConf().setMaster("spark://:").setAppName("anapp")
>>
>> Can you please tell me the equivalent change to make so I can run my
>> servers and my IDE from different physical computers.
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>


Re: Job manager URI rpc address:port

2020-04-19 Thread Zahid Rahman
Hi Tison,

I think I may have found what I want in example 22.
https://www.programcreek.com/java-api-examples/?api=org.apache.flink.configuration.Configuration

I need to create a Configuration object first, as shown there.

Also, I think the flink-conf.yaml file may contain configuration for the client
rather than the server. So setting it before starting is irrelevant.
I am going to play around and see, but if the Configuration class allows me
to set the configuration programmatically and override the yaml file, then that
would be great.
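
A rough, untested sketch of that programmatic-Configuration idea. The option
constants come from org.apache.flink.configuration.JobManagerOptions, and the
host, port and jar path are placeholders; whether these values actually take
precedence over flink-conf.yaml is exactly what the experiment would verify:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ProgrammaticConfExample {

    public static void main(String[] args) throws Exception {
        // Build the client-side configuration in code rather than in flink-conf.yaml.
        Configuration clientConf = new Configuration();
        clientConf.setString(JobManagerOptions.ADDRESS, "jobmanager-host");
        clientConf.setInteger(JobManagerOptions.PORT, 6123);

        // RemoteStreamEnvironment also accepts the client configuration plus the job jar(s).
        StreamExecutionEnvironment env = new RemoteStreamEnvironment(
                "jobmanager-host", 8081, clientConf, "/path/to/my-job.jar");

        env.fromElements(1, 2, 3).print();
        env.execute("programmatic-conf-example");
    }
}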



On Sun, 19 Apr 2020, 11:35 Som Lima,  wrote:

> Thanks.
> flink-conf.yaml does allow me to do what I need to do without making any
> changes to client source code.
>
> But
> RemoteStreamEnvironment constructor  expects a jar file as the third
> parameter also.
>
> RemoteStreamEnvironment(String host, int port, String... jarFiles)
> Creates a new RemoteStreamEnvironment that points to the master
> (JobManager) described by the given host name and port.
>
> On Sun, 19 Apr 2020, 11:02 tison,  wrote:
>
>> You can change flink-conf.yaml "jobmanager.address" or "jobmanager.port"
>> options before run the program or take a look at RemoteStreamEnvironment
>> which enables configuring host and port.
>>
>> Best,
>> tison.
>>
>>
>> On Sun, 19 Apr 2020 at 17:58, Som Lima wrote:
>>
>>> Hi,
>>>
>>> After running
>>>
>>> $ ./bin/start-cluster.sh
>>>
>>> The following line of code defaults jobmanager  to localhost:6123
>>>
>>> final  ExecutionEnvironment env = Environment.getExecutionEnvironment();
>>>
>>> which is same on spark.
>>>
>>> val spark =
>>> SparkSession.builder.master(local[*]).appname("anapp").getOrCreate
>>>
>>> However if I wish to run the servers on a different physical computer.
>>> Then in Spark I can do it this way using the spark URI in my IDE.
>>>
>>> Conf =
>>> SparkConf().setMaster("spark://:").setAppName("anapp")
>>>
>>> Can you please tell me the equivalent change to make so I can run my
>>> servers and my IDE from different physical computers.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>


Re: Job manager URI rpc address:port

2020-04-19 Thread Jeff Zhang
Hi Som,

You can take a look at Flink on Zeppelin. In Zeppelin you can connect to a
remote Flink cluster via a few configuration settings, and you don't need to worry
about the jars; the Flink interpreter will ship the necessary jars for you. Here's
a list of tutorials.

1) Get started https://link.medium.com/oppqD6dIg5
2) Batch https://link.medium.com/3qumbwRIg5
3) Streaming https://link.medium.com/RBHa2lTIg5
4) Advanced usage https://link.medium.com/CAekyoXIg5


On Sun, 19 Apr 2020 at 19:27, Zahid Rahman wrote:

> Hi Tison,
>
> I think I may have found what I want in example 22.
>
> https://www.programcreek.com/java-api-examples/?api=org.apache.flink.configuration.Configuration
>
> I need to create Configuration object first as shown .
>
> Also I think  flink-conf.yaml file may contain configuration for client
> rather than  server. So before starting is irrelevant.
> I am going to play around and see but if the Configuration class allows me
> to set configuration programmatically and overrides the yaml file then that
> would be great.
>
>
>
> On Sun, 19 Apr 2020, 11:35 Som Lima,  wrote:
>
>> Thanks.
>> flink-conf.yaml does allow me to do what I need to do without making any
>> changes to client source code.
>>
>> But
>> RemoteStreamEnvironment constructor  expects a jar file as the third
>> parameter also.
>>
>> RemoteStreamEnvironment(String host, int port, String... jarFiles)
>> Creates a new RemoteStreamEnvironment that points to the master
>> (JobManager) described by the given host name and port.
>>
>> On Sun, 19 Apr 2020, 11:02 tison,  wrote:
>>
>>> You can change flink-conf.yaml "jobmanager.address" or "jobmanager.port"
>>> options before run the program or take a look at RemoteStreamEnvironment
>>> which enables configuring host and port.
>>>
>>> Best,
>>> tison.
>>>
>>>
>>> On Sun, 19 Apr 2020 at 17:58, Som Lima wrote:
>>>
 Hi,

 After running

 $ ./bin/start-cluster.sh

 The following line of code defaults jobmanager  to localhost:6123

 final  ExecutionEnvironment env = Environment.getExecutionEnvironment();

 which is same on spark.

 val spark =
 SparkSession.builder.master(local[*]).appname("anapp").getOrCreate

 However if I wish to run the servers on a different physical computer.
 Then in Spark I can do it this way using the spark URI in my IDE.

 Conf =
 SparkConf().setMaster("spark://:").setAppName("anapp")

 Can you please tell me the equivalent change to make so I can run my
 servers and my IDE from different physical computers.














-- 
Best Regards

Jeff Zhang


Modelling time for complex events generated out of simple ones

2020-04-19 Thread Salva Alcántara
My flink application generates output (complex) events based on the
processing of (simple) input events. The generated output events are to be
consumed by other external services. My application works using event-time
semantics, so I am a bit in doubt regarding what I should use as the output
events' timestamp.

Should I use:

- the processing time at the moment of generating them?
- the event time (given by the watermark value)?
- both?

For my use case, I am using both for now. But maybe you can come up with
examples/justifications for each of the given options. 





Re: Modelling time for complex events generated out of simple ones

2020-04-19 Thread Yun Tang
Hi Salva

I think this depends on the relationship between your output and input
events. If the output ones are just simple wrappers of the input ones, e.g. adding
some simple properties or just reading from one place and writing to another place,
I think the output events could hold the time inherited from the input ones.
That is to say, event-time semantics might be more proper.
On the other hand, if the output events have a more independent relationship with
the input ones, and the tasks in the Flink TM could be treated as the event
generator, I think you can make the time the processing time when generating
them.
I think there are no absolute rules; it all depends on your actual scenario.

Best
Yun Tang

From: Salva Alcántara 
Sent: Monday, April 20, 2020 2:03
To: user@flink.apache.org 
Subject: Modelling time for complex events generated out of simple ones

My flink application generates output (complex) events based on the
processing of (simple) input events. The generated output events are to be
consumed by other external services. My application works using event-time
semantics, so I am bit in doubt regarding what should I use as the output
events' timestamp.

Should I use:

- the processing time at the moment of generating them?
- the event time (given by the watermark value)?
- both?

For my use case, I am using both for now. But maybe you can come up with
examples/justifications for each of the given options.





Re: Job manager URI rpc address:port

2020-04-19 Thread Som Lima
Thanks for the info and links.

I had a lot of problems; I am not sure what I was doing wrong.

Maybe there were conflicts with the setup from Apache Spark. I think I may need to set up
users for each development environment.


Anyway I kept doing fresh installs about four altogether I think.

Everything works fine now
Including remote access  of zeppelin on machines across the local area
network.

Next step  setup remote clusters
 Wish me luck !







On Sun, 19 Apr 2020, 14:58 Jeff Zhang,  wrote:

> Hi Som,
>
> You can take a look at flink on zeppelin, in zeppelin you can connect to a
> remote flink cluster via a few configuration, and you don't need to worry
> about the jars. Flink interpreter will ship necessary jars for you. Here's
> a list of tutorials.
>
> 1) Get started https://link.medium.com/oppqD6dIg5
> 2) Batch https://link.medium.com/3qumbwRIg5
> 3) Streaming https://link.medium.com/RBHa2lTIg5
> 4) Advanced usage https://link.medium.com/CAekyoXIg5
> 
>
>
> On Sun, 19 Apr 2020 at 19:27, Zahid Rahman wrote:
>
>> Hi Tison,
>>
>> I think I may have found what I want in example 22.
>>
>> https://www.programcreek.com/java-api-examples/?api=org.apache.flink.configuration.Configuration
>>
>> I need to create Configuration object first as shown .
>>
>> Also I think  flink-conf.yaml file may contain configuration for client
>> rather than  server. So before starting is irrelevant.
>> I am going to play around and see but if the Configuration class allows
>> me to set configuration programmatically and overrides the yaml file then
>> that would be great.
>>
>>
>>
>> On Sun, 19 Apr 2020, 11:35 Som Lima,  wrote:
>>
>>> Thanks.
>>> flink-conf.yaml does allow me to do what I need to do without making any
>>> changes to client source code.
>>>
>>> But
>>> RemoteStreamEnvironment constructor  expects a jar file as the third
>>> parameter also.
>>>
>>> RemoteStreamEnvironment(String host, int port, String... jarFiles)
>>> Creates a new RemoteStreamEnvironment that points to the master
>>> (JobManager) described by the given host name and port.
>>>
>>> On Sun, 19 Apr 2020, 11:02 tison,  wrote:
>>>
 You can change flink-conf.yaml "jobmanager.address" or
 "jobmanager.port" options before run the program or take a look at
 RemoteStreamEnvironment which enables configuring host and port.

 Best,
 tison.


 On Sun, 19 Apr 2020 at 17:58, Som Lima wrote:

> Hi,
>
> After running
>
> $ ./bin/start-cluster.sh
>
> The following line of code defaults jobmanager  to localhost:6123
>
> final  ExecutionEnvironment env =
> Environment.getExecutionEnvironment();
>
> which is same on spark.
>
> val spark =
> SparkSession.builder.master(local[*]).appname("anapp").getOrCreate
>
> However if I wish to run the servers on a different physical computer.
> Then in Spark I can do it this way using the spark URI in my IDE.
>
> Conf =
> SparkConf().setMaster("spark://:").setAppName("anapp")
>
> Can you please tell me the equivalent change to make so I can run my
> servers and my IDE from different physical computers.
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Problem getting watermark right with event time

2020-04-19 Thread Sudan S
Hi,

I am having a problem getting the watermark right. The setup is:
- I have a Flink job which reads from a Kafka topic, uses Protobuf
deserialization, uses a sliding window of (120 seconds, 30 seconds), sums up
the values and finally returns the result.

The code is pasted below.

The problem here is that I'm not able to reach the sink. I am able to reach the
timestamp assigner when the timestamp arrives, but past that, neither the process
function nor the sink function is getting invoked in spite of pumping
events regularly. I'm not able to figure out how to debug this issue.
Please help.

public class StreamingJob {

public static void main(String[] args) throws Exception {

Properties kafkaConsumerProps = new Properties();
kafkaConsumerProps.setProperty("bootstrap.servers",
"{bootstrap_servers}");
kafkaConsumerProps.setProperty("group.id", "{group_id}");


final StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new
Configuration());
env.enableCheckpointing(100);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setMaxParallelism(5);
env.setParallelism(5);

SingleOutputStreamOperator<Eventi.Event> texStream = env
.addSource(new FlinkKafkaConsumer011<>("auth", new
EventiSchema(), kafkaConsumerProps)).setParallelism(5).setMaxParallelism(5);
SlidingEventTimeWindows window =
SlidingEventTimeWindows.of(Time.seconds(120), Time.seconds(30));
texStream.assignTimestampsAndWatermarks(new
AscendingTimestampExtractor<Eventi.Event>() {
@Override
public long extractAscendingTimestamp(Eventi.Event element) {
return element.getEventTime().getSeconds() * 1000;
}
}).keyBy(Eventi.Event::getEventTime).window(window).process(new
ProcessWindowFunction<Eventi.Event, Integer, Timestamp, TimeWindow>() {
@Override
public void process(Timestamp timestamp, Context context,
Iterable<Eventi.Event> elements, Collector<Integer> out) throws Exception {
int sum = 0;
for (Eventi.Event element : elements) {
sum++;
}
out.collect(sum);
}
}).print();

env.execute();
}
}



[ANNOUNCE] Weekly Community Update 2020/16

2020-04-19 Thread Konstantin Knauf
Dear community,

happy to share this (and last) week's community update after a short Easter
break. A lot has happened in the community in the meantime. Stateful
Functions 2.0.0 was released, the releases of Flink 1.10.1 and 1.9.3 are
around the corner, a couple of new FLIPs and blog posts...

...and, of course, Flink Forward 2020 Virtual is on next week Wed - Fri!

Flink Development
==

* [releases] Apache Flink Stateful Functions 2.0.0 is out. [1] Check out
Stephan's announcement blog post for an overview. [2]

* [releases] Dian Fu recently proposed releasing Flink 1.9.3 [3]. The only
remaining blocker was merged quickly and the first release candidate is out
already [4]

* [releases] There is only one blocker left for Flink 1.10.1 and we are
expecting a first release candidate soon. [5]

* [sql] Zhenghua Gao has started a discussion on FLIP-71 to finish initial
end-to-end view support in Flink SQL. [6]

* [sql, hive] Rui Li has authored a FLIP (-123) to increase the
compatibility of Flink with Hive's SQL dialect. It proposes to add an
additional parser for the Hive dialect and to support a limited set of
DDL and DML features of Hive that Flink currently does not understand. The
vote has already passed. [7,8]

* [python] Xingbo has started a discussion on supporting Cython for Python User
Defined Functions in the Table API. The discussion was quick and the vote has
already gone through. [9,10]

* [distribution] Aljoscha has initiated a discussion on releasing a "fat"
and "slim" Flink distribution going forward. The slim distribution would
only contain an absolute minimal set of dependencies (less than today),
while the fat distribution would contain many convenience dependencies like
connectors and formats. The goal is to improve the initial user
experience particularly for Table API/SQL users. Discussion still ongoing.
[11]

* [connectors] Dawid proposes (FLIP-124) to add a "Rich" version of
(De)SerializationSchema (adding open/close methods and a Collector). [12]
This will allow emitting an arbitrary number of records for a single source
record, initialization code, as well as the registration of metrics in
serializers. The vote has already started. [13,14]

* [docs] Marta proposes to apply to Google's Season of Docs 2020. Season of
Docs brings together technical writers and Open Source projects to improve
their documentation. Marta is looking for (documentation) project ideas as
well as volunteers to mentor potential technical writers. [15]

* [development process] Yun Tang has enabled autolinks from Github commits
to Jira tickets for all Apache Flink repositories. "FLINK-" in a
commit message will now automatically link to the corresponding Jira
ticket. Check out https://github.com/apache/flink/commits/master. [16]

* [development process] Aljoscha reminds everyone to check that their email
address is correctly configured in Github so that it shows up in your
commits to Apache Flink instead of nore...@github.com. [17]

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-Stateful-Functions-2-0-0-released-tp39963.html
[2] https://flink.apache.org/news/2020/04/07/release-statefun-2.0.0.html
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Releasing-Flink-1-9-3-tp40086.html
[4]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-1-9-3-release-candidate-1-tp40441.html
[5]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Releasing-Flink-1-10-1-tp38689.html
[6]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-71-E2E-View-support-in-Flink-SQL-tp40059.html
[7]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-123-DDL-and-DML-compatibility-for-Hive-connector-tp39633.html
[8]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-123-DDL-and-DML-compatibility-for-Hive-connector-tp40183.html
[9]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-121-Support-Cython-Optimizing-Python-User-Defined-Function-tp39577.html
[10]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/RESULT-VOTE-FLIP-121-Support-Cython-Optimizing-Python-User-Defined-Function-tp40163.html
[11]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Releasing-fat-and-slim-Flink-distributions-tp40237.html
[12]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148645988&src=contextnavpagetreemode
[13]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-124-Add-open-close-and-Collector-to-De-SerializationSchema-tp39864.html
[14]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-124-Add-open-close-and-Collector-to-De-SerializationSchema-tp40318.html
[15]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/PROPOSAL-Google-Season-of-Docs-2020-td40264.html
[16]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Configuring-autolinks-to-Flink-JIRA-ticket

Re: Job manager URI rpc address:port

2020-04-19 Thread Jeff Zhang
Som, Let us know when you have any problems

On Mon, 20 Apr 2020 at 02:31, Som Lima wrote:

> Thanks for the info and links.
>
> I had a lot of problems I am not sure what I was doing wrong.
>
> May be conflicts with setup from apache spark.  I think I may need to
> setup users for each development.
>
>
> Anyway I kept doing fresh installs about four altogether I think.
>
> Everything works fine now
> Including remote access  of zeppelin on machines across the local area
> network.
>
> Next step  setup remote clusters
>  Wish me luck !
>
>
>
>
>
>
>
> On Sun, 19 Apr 2020, 14:58 Jeff Zhang,  wrote:
>
>> Hi Som,
>>
>> You can take a look at flink on zeppelin, in zeppelin you can connect to
>> a remote flink cluster via a few configuration, and you don't need to worry
>> about the jars. Flink interpreter will ship necessary jars for you. Here's
>> a list of tutorials.
>>
>> 1) Get started https://link.medium.com/oppqD6dIg5
>> 2) Batch https://link.medium.com/3qumbwRIg5
>> 3) Streaming https://link.medium.com/RBHa2lTIg5
>> 4) Advanced usage https://link.medium.com/CAekyoXIg5
>> 
>>
>>
>> On Sun, 19 Apr 2020 at 19:27, Zahid Rahman wrote:
>>
>>> Hi Tison,
>>>
>>> I think I may have found what I want in example 22.
>>>
>>> https://www.programcreek.com/java-api-examples/?api=org.apache.flink.configuration.Configuration
>>>
>>> I need to create Configuration object first as shown .
>>>
>>> Also I think  flink-conf.yaml file may contain configuration for client
>>> rather than  server. So before starting is irrelevant.
>>> I am going to play around and see but if the Configuration class allows
>>> me to set configuration programmatically and overrides the yaml file then
>>> that would be great.
>>>
>>>
>>>
>>> On Sun, 19 Apr 2020, 11:35 Som Lima,  wrote:
>>>
 Thanks.
 flink-conf.yaml does allow me to do what I need to do without making
 any changes to client source code.

 But
 RemoteStreamEnvironment constructor  expects a jar file as the third
 parameter also.

 RemoteStreamEnvironment(String host, int port, String... jarFiles)
 Creates a new RemoteStreamEnvironment that points to the master
 (JobManager) described by the given host name and port.

 On Sun, 19 Apr 2020, 11:02 tison,  wrote:

> You can change flink-conf.yaml "jobmanager.address" or
> "jobmanager.port" options before run the program or take a look at
> RemoteStreamEnvironment which enables configuring host and port.
>
> Best,
> tison.
>
>
> On Sun, 19 Apr 2020 at 17:58, Som Lima wrote:
>
>> Hi,
>>
>> After running
>>
>> $ ./bin/start-cluster.sh
>>
>> The following line of code defaults jobmanager  to localhost:6123
>>
>> final  ExecutionEnvironment env =
>> Environment.getExecutionEnvironment();
>>
>> which is same on spark.
>>
>> val spark =
>> SparkSession.builder.master(local[*]).appname("anapp").getOrCreate
>>
>> However if I wish to run the servers on a different physical computer.
>> Then in Spark I can do it this way using the spark URI in my IDE.
>>
>> Conf =
>> SparkConf().setMaster("spark://:").setAppName("anapp")
>>
>> Can you please tell me the equivalent change to make so I can run my
>> servers and my IDE from different physical computers.
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>

-- 
Best Regards

Jeff Zhang


Re: Job manager URI rpc address:port

2020-04-19 Thread Som Lima
I will, thanks. Once I had it set up and working,
I switched my computers around from client to server and server to client.
With your excellent instructions I was able to do it in 5 minutes.

On Mon, 20 Apr 2020, 00:05 Jeff Zhang,  wrote:

> Som, Let us know when you have any problems
>
> On Mon, 20 Apr 2020 at 02:31, Som Lima wrote:
>
>> Thanks for the info and links.
>>
>> I had a lot of problems I am not sure what I was doing wrong.
>>
>> May be conflicts with setup from apache spark.  I think I may need to
>> setup users for each development.
>>
>>
>> Anyway I kept doing fresh installs about four altogether I think.
>>
>> Everything works fine now
>> Including remote access  of zeppelin on machines across the local area
>> network.
>>
>> Next step  setup remote clusters
>>  Wish me luck !
>>
>>
>>
>>
>>
>>
>>
>> On Sun, 19 Apr 2020, 14:58 Jeff Zhang,  wrote:
>>
>>> Hi Som,
>>>
>>> You can take a look at flink on zeppelin, in zeppelin you can connect to
>>> a remote flink cluster via a few configuration, and you don't need to worry
>>> about the jars. Flink interpreter will ship necessary jars for you. Here's
>>> a list of tutorials.
>>>
>>> 1) Get started https://link.medium.com/oppqD6dIg5
>>> 2) Batch https://link.medium.com/3qumbwRIg5
>>> 3) Streaming https://link.medium.com/RBHa2lTIg5
>>> 4) Advanced usage https://link.medium.com/CAekyoXIg5
>>> 
>>>
>>>
>>> On Sun, 19 Apr 2020 at 19:27, Zahid Rahman wrote:
>>>
 Hi Tison,

 I think I may have found what I want in example 22.

 https://www.programcreek.com/java-api-examples/?api=org.apache.flink.configuration.Configuration

 I need to create Configuration object first as shown .

 Also I think  flink-conf.yaml file may contain configuration for client
 rather than  server. So before starting is irrelevant.
 I am going to play around and see but if the Configuration class allows
 me to set configuration programmatically and overrides the yaml file then
 that would be great.



 On Sun, 19 Apr 2020, 11:35 Som Lima,  wrote:

> Thanks.
> flink-conf.yaml does allow me to do what I need to do without making
> any changes to client source code.
>
> But
> RemoteStreamEnvironment constructor  expects a jar file as the third
> parameter also.
>
> RemoteStreamEnvironment(String host, int port, String... jarFiles)
> Creates a new RemoteStreamEnvironment that points to the master
> (JobManager) described by the given host name and port.
>
> On Sun, 19 Apr 2020, 11:02 tison,  wrote:
>
>> You can change flink-conf.yaml "jobmanager.address" or
>> "jobmanager.port" options before run the program or take a look at
>> RemoteStreamEnvironment which enables configuring host and port.
>>
>> Best,
>> tison.
>>
>>
>> On Sun, 19 Apr 2020 at 17:58, Som Lima wrote:
>>
>>> Hi,
>>>
>>> After running
>>>
>>> $ ./bin/start-cluster.sh
>>>
>>> The following line of code defaults jobmanager  to localhost:6123
>>>
>>> final  ExecutionEnvironment env =
>>> Environment.getExecutionEnvironment();
>>>
>>> which is same on spark.
>>>
>>> val spark =
>>> SparkSession.builder.master(local[*]).appname("anapp").getOrCreate
>>>
>>> However if I wish to run the servers on a different physical
>>> computer.
>>> Then in Spark I can do it this way using the spark URI in my IDE.
>>>
>>> Conf =
>>> SparkConf().setMaster("spark://:").setAppName("anapp")
>>>
>>> Can you please tell me the equivalent change to make so I can run my
>>> servers and my IDE from different physical computers.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: Job manager URI rpc address:port

2020-04-19 Thread Jeff Zhang
Glad to hear that.

On Mon, 20 Apr 2020 at 08:08, Som Lima wrote:

> I will thanks.  Once I had it set up and working.
> I switched  my computers around from client to server to server to client.
> With your excellent instructions I was able to do it in 5 .minutes
>
> On Mon, 20 Apr 2020, 00:05 Jeff Zhang,  wrote:
>
>> Som, Let us know when you have any problems
>>
>> On Mon, 20 Apr 2020 at 02:31, Som Lima wrote:
>>
>>> Thanks for the info and links.
>>>
>>> I had a lot of problems I am not sure what I was doing wrong.
>>>
>>> May be conflicts with setup from apache spark.  I think I may need to
>>> setup users for each development.
>>>
>>>
>>> Anyway I kept doing fresh installs about four altogether I think.
>>>
>>> Everything works fine now
>>> Including remote access  of zeppelin on machines across the local area
>>> network.
>>>
>>> Next step  setup remote clusters
>>>  Wish me luck !
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Sun, 19 Apr 2020, 14:58 Jeff Zhang,  wrote:
>>>
 Hi Som,

 You can take a look at flink on zeppelin, in zeppelin you can connect
 to a remote flink cluster via a few configuration, and you don't need to
 worry about the jars. Flink interpreter will ship necessary jars for you.
 Here's a list of tutorials.

 1) Get started https://link.medium.com/oppqD6dIg5
 2) Batch https://link.medium.com/3qumbwRIg5
 3) Streaming https://link.medium.com/RBHa2lTIg5
 4) Advanced usage https://link.medium.com/CAekyoXIg5


 On Sun, 19 Apr 2020 at 19:27, Zahid Rahman wrote:

> Hi Tison,
>
> I think I may have found what I want in example 22.
>
> https://www.programcreek.com/java-api-examples/?api=org.apache.flink.configuration.Configuration
>
> I need to create Configuration object first as shown .
>
> Also I think  flink-conf.yaml file may contain configuration for
> client rather than  server. So before starting is irrelevant.
> I am going to play around and see but if the Configuration class
> allows me to set configuration programmatically and overrides the yaml 
> file
> then that would be great.
>
>
>
> On Sun, 19 Apr 2020, 11:35 Som Lima,  wrote:
>
>> Thanks.
>> flink-conf.yaml does allow me to do what I need to do without making
>> any changes to client source code.
>>
>> But
>> RemoteStreamEnvironment constructor  expects a jar file as the third
>> parameter also.
>>
>> RemoteStreamEnvironment(String host, int port, String... jarFiles)
>> Creates a new RemoteStreamEnvironment that points to the master
>> (JobManager) described by the given host name and port.
>>
>> On Sun, 19 Apr 2020, 11:02 tison,  wrote:
>>
>>> You can change flink-conf.yaml "jobmanager.address" or
>>> "jobmanager.port" options before run the program or take a look at
>>> RemoteStreamEnvironment which enables configuring host and port.
>>>
>>> Best,
>>> tison.
>>>
>>>
>>> On Sun, 19 Apr 2020 at 17:58, Som Lima wrote:
>>>
 Hi,

 After running

 $ ./bin/start-cluster.sh

 The following line of code defaults jobmanager  to localhost:6123

 final  ExecutionEnvironment env =
 Environment.getExecutionEnvironment();

 which is same on spark.

 val spark =
 SparkSession.builder.master(local[*]).appname("anapp").getOrCreate

 However if I wish to run the servers on a different physical
 computer.
 Then in Spark I can do it this way using the spark URI in my IDE.

 Conf =
 SparkConf().setMaster("spark://:").setAppName("anapp")

 Can you please tell me the equivalent change to make so I can run
 my servers and my IDE from different physical computers.














 --
 Best Regards

 Jeff Zhang

>>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>

-- 
Best Regards

Jeff Zhang


Re:Re: multi-sql checkpoint fail

2020-04-19 Thread forideal
Hi Tison, Jark Wu:


   Thanks for your reply !!!


   What's the statebackend are you using? Is it Heap statebackend?

We use the RocksDB backend with incremental checkpoints.


   Could you share the stack traces?

I looked at the flame graph myself and found that it was stuck at the end
of the window where the calculation took place.

I also found that the checkpoint failed because of the watermark. For the same operator,
the difference between the max watermark and the min watermark is 30 minutes (my
checkpoint interval is 10 minutes). This may be caused by the slow calculation
of the windows.




Best forideal










At 2020-04-18 21:51:13, "Jark Wu"  wrote:

Hi,


What's the statebackend are you using? Is it Heap statebackend?

Best,
Jark


On Sat, 18 Apr 2020 at 07:06, tison  wrote:

Hi,


Could you share the stack traces?


Best,
tison.




On Sat, 18 Apr 2020 at 00:33, forideal wrote:


Hello friend

I have two SQL statements, and the checkpoint fails all the time. One task opens a sliding
window of one hour, and then another task consumes the output data of the
previous task. There is no problem when the two tasks are submitted separately.
-- first: Calculation
-- second: Write the calculation to redis

-- first
insert into dw_access_log
select time_key, query_nor, query_nor_counter, '1' as group_key
from (
    select
        HOP_START(event_time_fake, interval '1' MINUTE, interval '60' MINUTE) as time_key,
        query_nor,
        count(1) as query_nor_counter
    from (
        select
            RED_JSON_VALUE(request, '$.query_nor') as query_nor,
            RED_JSON_VALUE(request, '$.target') as target,
            event_time_fake
        from (
            select red_pb_parser(body, 'request') as request, event_time_fake
            from access_log_source
        )
    )
    group by query_nor,
        -- sliding window size one hour, step one minute
        HOP(event_time_fake, interval '1' MINUTE, interval '60' MINUTE)
)
where query_nor_counter > 100;

-- second
insert into dw_sink_access_log
select
    'fix_key' as `key`,
    get_json_value(query_nor, query_nor_counter) as `value` -- agg_func
from dw_access_log
group by tumble(time_key_fake, interval '1' MINUTE), group_key
Article Link:https://zhuanlan.zhihu.com/p/132764573
Picture Link:
https://pic4.zhimg.com/80/v2-d3b1105b1419fef3ea6b9176085a5597_1440w.jpg 
https://pic3.zhimg.com/80/v2-b6ea7b4a8368c4bae03afb94c723bcca_1440w.jpg


Best, forideal








 

Re: Flink Conf "yarn.flink-dist-jar" Question

2020-04-19 Thread Yang Wang
Hi tison,

I think I get your concerns and points.

Taking both FLINK-13938[1] and FLINK-14964[2] into account, I will proceed with the
following steps.
* Enrich "-yt/--yarnship" to support HDFS directories
* Enrich "-yt/--yarnship" to specify local resource visibility. It is
"APPLICATION" by default. It could also be configured to "PUBLIC",
which means shared by all applications, or "PRIVATE", which means shared by
the same user.
* Add a new config option to control whether to optimize the
submission (default is false). When configured to true, the Flink client will
try to filter the jars and files by name and size to avoid unnecessary
uploading.

A very rough submission command could be issued as follows.

./bin/flink run -m yarn-cluster -d \
  -yt hdfs://myhdfs/flink/release/flink-1.11:PUBLIC,hdfs://myhdfs/user/someone/mylib \
  -yD yarn.submission-optimization.enable=true \
  examples/streaming/WindowJoin.jar

cc @Rong Rong, since you also helped to review the old
PR for FLINK-13938, maybe you could also share some thoughts.


[1]. https://issues.apache.org/jira/browse/FLINK-13938
[2]. https://issues.apache.org/jira/browse/FLINK-14964


Best,
Yang



On Sat, 18 Apr 2020 at 12:12, tison wrote:

> Hi Yang,
>
> Name filtering & schema special handling makes sense for me. We can enrich
> later if there is requirement without breaking interface.
>
> For #1, from my perspective your first proposal is
>
>   having an option specifies remote flink/lib, then we turn off auto
> uploading local flink/lib and register that path as local resources
>
> It seems we here add another special logic for handling one kind of
> things...what I propose is we do these two steps explicitly separated:
>
> 1. an option turns off auto uploading local flink/lib
> 2. a general option register remote files as local resources
>
> The rest thing here is that you propose we handle flink/lib as PUBLIC
> visibility while other files as APPLICATION visibility, whether a
> composite configuration or name filtering to special handle libs makes
> sense though.
>
> YarnClusterDescriptor already has a lot of special handling logics which
> introduce a number of config options and keys, which should
> have been configured in few of common options and validated at the runtime.
>
> Best,
> tison.
>
>
> On Fri, 17 Apr 2020 at 23:42, Yang Wang wrote:
>
>> Hi tison,
>>
>> For #3, if you mean registering remote HDFS file as local resource, we
>> should make the "-yt/--yarnship"
>> to support remote directory. I think it is the right direction.
>>
>> For #1, if the users could ship remote directory, then they could also
>> specify like this
>> "-yt hdfs://hdpdev/flink/release/flink-1.x,
>> hdfs://hdpdev/user/someone/mylib". Do you mean we add an
>> option for whether trying to avoid unnecessary uploading? Maybe we could
>> filter by names and file size.
>> I think this is a good suggestion, and we do not need to introduce a new
>> config option "-ypl".
>>
>> For #2, for flink-dist, the #1 could already solve the problem. We do not
>> need to support remote schema.
>> It will confuse the users when we only support HDFS, not S3, OSS, etc.
>>
>>
>> Best,
>> Yang
>>
>> On Fri, 17 Apr 2020 at 20:05, tison wrote:
>>
>>> Hi Yang,
>>>
>>> I agree that these two of works would benefit from single assignee. My
>>> concern is as below
>>>
>>> 1. Both share libs & remote flink dist/libs are remote ship files. I
>>> don't think we have to implement multiple codepath/configuration.
>>> 2. So, for concept clarification, there are
>>>   (1) an option to disable shipping local libs
>>>   (2) flink-dist supports multiple schema at least we said "hdfs://"
>>>   (3) an option for registering remote shipfiles with path & visibility.
>>> I think new configuration system helps.
>>>
>>> the reason we have to special handling (2) instead of including it in
>>> (3) is because when shipping flink-dist to TM container, we specially
>>> detect flink-dist. Of course we can merge it into general ship files and
>>> validate shipfiles finally contain flink-dist, which is an alternative.
>>>
>>> The *most important* difference is (1) and (3) which we don't have an
>>> option for only remote libs. Is this clarification satisfy your proposal?
>>>
>>> Best,
>>> tison.
>>>
>>>
>>> On Fri, 17 Apr 2020 at 19:49, Till Rohrmann wrote:
>>>
 Hi Yang,

 from what I understand it sounds reasonable to me. Could you sync with
 Tison on FLINK-14964 on how to proceed. I'm not super deep into these
 issues but they seem to be somewhat related and Tison already did some
 implementation work.

 I'd say it be awesome if we could include this kind of improvement into
 the release.

 Cheers,
 Till

 On Thu, Apr 16, 2020 at 4:43 AM Yang Wang 
 wrote:

> Hi All, thanks a lot for reviving this discussion.
>
> I think we could unify the FLINK-13938 and FLINK-14964 since they have
> the similar
> purpose, avoid unnecessary uploading and downloading jars in YARN
> deployment.
> The differenc

Re: Modelling time for complex events generated out of simple ones

2020-04-19 Thread Salva Alcántara
In my case, the relationship between input and output events is that output
events are generated out of some rules based on input events. Essentially,
output events correspond to specific patterns / sequences of input events.
You can think of output events as detecting certain anomalies or abnormal
conditions. So I guess we are more in the second case you mention where the
Flink TM can be regarded as a generator and hence using the processing time
makes sense.

Indeed, I am using both the processing time and the event time watermark
value at the moment of generating the output events. I think both convey
useful information. In particular, the processing time looks like the logical
timestamp for the output events. However, although that would be an
exception, it might also happen that my flink app is processing old data at
some point. That is why I am also adding another timestamp with the current
event-time watermark value. This allows the consumer of the output events to
detect whether the output event corresponds to old data or not (by comparing
the difference between the processing time and event time timestamps, which
should in normal conditions be close to each other, except when processing
old data).
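
For illustration, a minimal sketch of stamping an output event with both times
from inside a keyed process function (the OutputEvent type and its field names
are placeholders made up for this example):

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class StampingFunction extends KeyedProcessFunction<String, String, StampingFunction.OutputEvent> {

    public static class OutputEvent {
        public String payload;
        public long eventTime;      // derived from the event-time clock (watermark)
        public long processingTime; // wall-clock time at generation
    }

    @Override
    public void processElement(String value, Context ctx, Collector<OutputEvent> out) {
        OutputEvent event = new OutputEvent();
        event.payload = value;
        // Current event-time watermark of this operator; ctx.timestamp() would give
        // the timestamp of the triggering input record instead.
        event.eventTime = ctx.timerService().currentWatermark();
        // Processing time at the moment the complex event is generated.
        event.processingTime = ctx.timerService().currentProcessingTime();
        out.collect(event);
    }
}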

In the case of using both, what naming would you use for the two fields?
Something along the lines of event_time and processing_time seems to leak
implementation details of my app to the external services...





Re: Flink 1.10 Out of memory

2020-04-19 Thread Xintong Song
Hi Lasse,

From what I understand, your problem is that the JVM tries to fork some native
process (if you look at the exception stack, the root exception is thrown
from a native method) but there's not enough memory for doing that. This
could happen when either Mesos is using cgroup strict mode for memory
control, or there's no more memory on the machine. Flink cannot prevent
native processes from using more memory. It can only reserve a certain amount
of memory for such native usage when requesting worker memory from the
deployment environment (in your case Mesos) and allocating Java heap /
direct memory.

My suggestion is to try increasing the JVM overhead configuration. You can
leverage the configuration options
'taskmanager.memory.jvm-overhead.[min|max|fraction]'. See more details in
the documentation[1].

Thank you~

Xintong Song


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#taskmanager-memory-jvm-overhead-max

On Sat, Apr 18, 2020 at 4:02 AM Zahid Rahman  wrote:

> https://betsol.com/java-memory-management-for-java-virtual-machine-jvm/
>
> Backbutton.co.uk
> ¯\_(ツ)_/¯
> ♡۶Java♡۶RMI ♡۶
> Make Use Method {MUM}
> makeuse.org
> 
>
>
> On Fri, 17 Apr 2020 at 14:07, Lasse Nedergaard <
> lassenedergaardfl...@gmail.com> wrote:
>
>> Hi.
>>
>> We have migrated to Flink 1.10 and face an out-of-memory exception, and
>> hopefully someone can point us in the right direction.
>>
>> We have a job that uses broadcast state, and we sometimes get out of memory
>> when it creates a savepoint. See the stack trace below.
>> We have assigned 2.2 GB per task manager and
>> configured taskmanager.memory.process.size: 2200m
>> In Flink 1.9 our container was terminated because of OOM, so 1.10 does a
>> better job, but it is still not working and the task manager is leaking memory
>> for each OOM and is finally killed by Mesos
>>
>>
>> Any idea what we can do to figure out what settings we need to change?
>>
>> Thanks in advance
>>
>> Lasse Nedergaard
>>
>>
>> WARN o.a.flink.runtime.state.filesystem.FsCheckpointStreamFactory - Could
>> not close the state stream for
>> s3://flinkstate/dcos-prod/checkpoints/fc9318cc236d09f0bfd994f138896d6c/chk-3509/cf0714dc-ad7c-4946-b44c-96d4a131a4fa.
>> java.io.IOException: Cannot allocate memory at
>> java.io.FileOutputStream.writeBytes(Native Method) at
>> java.io.FileOutputStream.write(FileOutputStream.java:326) at
>> java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) at
>> java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) at
>> java.io.FilterOutputStream.flush(FilterOutputStream.java:140) at
>> java.io.FilterOutputStream.close(FilterOutputStream.java:158) at
>> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:995)
>> at
>> org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
>> at
>> org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
>> at
>> org.apache.flink.fs.s3presto.common.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
>> at
>> org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
>> at
>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.close(FsCheckpointStreamFactory.java:277)
>> at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:263) at
>> org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:250) at
>> org.apache.flink.util.AbstractCloseableRegistry.close(AbstractCloseableRegistry.java:122)
>> at
>> org.apache.flink.runtime.state.AsyncSnapshotCallable.closeSnapshotIO(AsyncSnapshotCallable.java:167)
>> at
>> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:83)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at
>> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)
>> at
>> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:53)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>>
>> INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
>> Discarding checkpoint 3509 of job fc9318cc236d09f0bfd994f138896d6c.
>> org.apache.flink.util.SerializedThrowable: Could not materialize checkpoint
>> 3509 for operator Feature extraction (8/12). at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1238)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1180)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at
>> java.util.concurrent.T