Re: The Flink job recovered with wrong checkpoint state.

2020-06-15 Thread Thomas Huang
@Yun Tang <mailto:myas...@live.com>, thanks.

From: Yun Tang 
Sent: Monday, June 15, 2020 11:30
To: Thomas Huang ; Flink 
Subject: Re: The Flink job recovered with wrong checkpoint state.

Hi Thomas

The answer is yes. Without high availability, once the job manager goes down, the 
job graph and the latest checkpoint metadata are not recovered, even if YARN 
relaunches the job manager.
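For reference, a minimal ZooKeeper HA sketch for flink-conf.yaml could look like the 
following (the quorum hosts, storage path, and attempt count are placeholders to adapt):

 high-availability: zookeeper
 high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
 high-availability.storageDir: hdfs:///flink/ha/
 yarn.application-attempts: 10

Without HA, a job can still be resumed manually by resubmitting it with
'flink run -s <checkpoint-or-savepoint-path> ...', assuming the checkpoint data has 
been retained and is still available.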

Best
Yun Tang

From: Thomas Huang 
Sent: Sunday, June 14, 2020 22:58
To: Flink 
Subject: The Flink job recovered with wrong checkpoint state.

Hi Flink Community,

Currently, I'm using yarn-cluster mode to submit Flink jobs on YARN. I haven't set 
up the high-availability configuration (ZooKeeper), but I have set a restart 
strategy:

 env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 3000))

i.e. up to 10 restart attempts, with the delay between attempts given in 
milliseconds (so 3000 here means 3 seconds, not 30).
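If a 30-second delay is what was intended, the Time-based overload of the same method 
makes the unit explicit (a small sketch against the same env as above):

 import java.util.concurrent.TimeUnit
 import org.apache.flink.api.common.restartstrategy.RestartStrategies
 import org.apache.flink.api.common.time.Time

 // 10 attempts, 30 seconds between attempts; Time avoids mixing up ms and s.
 env.getConfig.setRestartStrategy(
   RestartStrategies.fixedDelayRestart(10, Time.of(30, TimeUnit.SECONDS)))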

Today, while the infra team was rolling-restarting the YARN platform, the job 
manager restarted, but the job did not recover from the latest checkpoint, and all 
the task managers started from the default job configuration, which was not expected.

Does this mean I have to set up the high-availability configuration for yarn-cluster 
mode, or is this a bug?

Best wishes.




Re: Is it possible to change 'connector.startup-mode' option in the flink job

2020-05-18 Thread Thomas Huang
Hi Jingsong,

Cool, Thanks for your reply.


Best wishes.

From: Jingsong Li 
Sent: Tuesday, May 19, 2020 10:46
To: Thomas Huang 
Cc: Flink 
Subject: Re: Is it possible to change 'connector.startup-mode' option in the 
flink job

Hi Thomas,

Good to hear from you. This is a very common problem.
In 1.11, we have two FLIPs that address it; you can take a look at [1][2].
I think dynamic table options (table hints) are enough for your requirement; see the 
sketch after the links below.

[1]https://cwiki.apache.org/confluence/display/FLINK/FLIP-113%3A+Supports+Dynamic+Table+Options+for+Flink+SQL
[2]https://cwiki.apache.org/confluence/display/FLINK/FLIP-110%3A+Support+LIKE+clause+in+CREATE+TABLE
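As a rough sketch of the table-hints approach (assumptions on my side: Flink 1.11 with 
the blink planner, 'table.dynamic-table-options.enabled' switched on, and the Hive 
catalog already registered; 'latest-offset' is only a placeholder value, and the exact 
keys for a timestamp start depend on the connector version, so please check the docs):

 import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

 val settings = EnvironmentSettings.newInstance()
   .useBlinkPlanner()
   .inStreamingMode()
   .build()
 val tableEnv = TableEnvironment.create(settings)

 // Table hints are disabled by default.
 tableEnv.getConfig.getConfiguration
   .setBoolean("table.dynamic-table-options.enabled", true)

 // Override options for this query only; the table definition stored in the
 // Hive catalog is left untouched.
 val overridden = tableEnv.sqlQuery(
   "SELECT * FROM orders /*+ OPTIONS('connector.startup-mode'='latest-offset') */")

The LIKE clause from FLIP-110 is the other route: create a temporary table derived 
from the Hive table that overrides only the startup-related options.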

Best,
Jingsong Lee

On Tue, May 19, 2020 at 10:37 AM Thomas Huang <mailto:lyang...@hotmail.com> wrote:
Hi guys,

I'm using Hive to store Kafka topic metadata as follows:


CREATE TABLE orders (
    user_id    BIGINT,
    product    STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
    'connector.type'         = 'kafka',
    'connector.version'      = 'universal',
    'connector.topic'        = 'orders',
    'connector.startup-mode' = 'earliest-offset',
    'connector.properties.bootstrap.servers' = 'localhost:9092',
    'format.type' = 'json'
);

However, for incident handling I sometimes have to start from a specific timestamp 
instead of 'earliest-offset', but I don't want to change the option setting in the 
Hive table. Is there any way (API or DDL) to override the 'connector.startup-mode' 
option in a Flink job without affecting the options stored in Hive?


Best Wishes.


--
Best, Jingsong Lee




Re: Process available data and stop with savepoint

2020-05-18 Thread Thomas Huang
Hi,

Actually, it seems like Spark dynamic allocation saves more resources in that case.


From: Arvid Heise 
Sent: Monday, May 18, 2020 11:15:09 PM
To: Congxian Qiu 
Cc: Sergii Mikhtoniuk ; user 
Subject: Re: Process available data and stop with savepoint

Hi Sergii,

Your requirements feel a bit odd; they're neither batch nor streaming.

Could you tell us why it's not possible to let the job run as a streaming job 
that runs continuously? Is it just a matter of saving costs?
If so, you could monitor the number of records being processed and trigger 
stop/cancel-with-savepoint accordingly (a rough sketch follows below).
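As a minimal sketch of that idea, assuming Flink 1.10+ where executeAsync() returns a 
JobClient, and placed in the job's main method (sourceLooksIdle is a placeholder, not 
a real API):

 import java.util.concurrent.TimeUnit
 import org.apache.flink.streaming.api.scala._

 val env = StreamExecutionEnvironment.getExecutionEnvironment
 // ... build the file-reading pipeline on `env` here ...

 val jobClient = env.executeAsync("process-available-files")

 // Placeholder: in practice, poll the source's numRecordsOut metric via the
 // REST API and return true once it has stopped increasing.
 def sourceLooksIdle(): Boolean = true

 while (!sourceLooksIdle()) {
   TimeUnit.SECONDS.sleep(30)
 }

 // advanceToEndOfEventTime = false keeps the final MAX_WATERMARK from being
 // emitted, so event-time windows stay open for the next run.
 val savepointPath =
   jobClient.stopWithSavepoint(false, "hdfs:///flink/savepoints").get()
 println(s"Stopped with savepoint: $savepointPath")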

On Mon, May 18, 2020 at 7:19 AM Congxian Qiu <mailto:qcx978132...@gmail.com> wrote:
Hi Sergii

If I understand correctly, you want to process all the files in some directory 
and do not want to process them multiple times. I'm not sure whether using 
`FileProcessingMode#PROCESS_CONTINUOUSLY` instead of 
`FileProcessingMode#PROCESS_ONCE`[1] would satisfy your needs while keeping the 
job running 24/7.

But be careful: under `FileProcessingMode#PROCESS_CONTINUOUSLY`, when a file is 
modified, its contents are re-processed entirely. This can break the 
"exactly-once" semantics, as appending data at the end of a file will lead to 
all of its contents being re-processed.

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/datastream_api.html#data-sources
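A small sketch of the continuous variant (the directory path and the 60-second polling 
interval are just placeholders):

 import org.apache.flink.api.java.io.TextInputFormat
 import org.apache.flink.core.fs.Path
 import org.apache.flink.streaming.api.functions.source.FileProcessingMode
 import org.apache.flink.streaming.api.scala._

 val env = StreamExecutionEnvironment.getExecutionEnvironment
 val dir = "hdfs:///data/incoming"
 val format = new TextInputFormat(new Path(dir))

 // Re-scan the directory every 60 seconds; the job keeps running and picks up
 // new files as they appear.
 val lines: DataStream[String] =
   env.readFile(format, dir, FileProcessingMode.PROCESS_CONTINUOUSLY, 60000L)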

Best,
Congxian


Sergii Mikhtoniuk <mailto:mikhton...@gmail.com> wrote on Mon, May 18, 2020 at 5:47 AM:
Hello,

I'm migrating my Spark-based stream processing application to Flink (Calcite 
SQL and temporal tables look too attractive to resist).

My Spark app works as follows:
- application is started periodically
- it reads a directory of Parquet files as a stream
- SQL transformations are applied
- resulting append stream is written to another directory
- it runs until all available data is processed
- checkpoints its state
- and **exits**
- upon next run it resumes where it left off, processing only new data

I'm having difficulties replicating this start-stop-resume behavior with Flink.

When I set up my input stream using:

env.readFile[Row](..., FileProcessingMode.PROCESS_CONTINUOUSLY)

... I get an infinite stream, but the application will naturally keep running 
until aborted manually.

When I use FileProcessingMode.PROCESS_ONCE, the application exits after 
exhausting all inputs, but it seems that Flink also treats the end of the 
stream as the max watermark, so, for example, it will close all tumbling windows 
that I don't want closed yet, since more data will arrive on the next run.

Is there a way not to emit a max watermark with PROCESS_ONCE? If so, can I 
still trigger a savepoint when env.execute() returns?

Alternatively, if I use PROCESS_CONTINUOUSLY along with env.executeAsync(), is 
there a way for me to detect when the file stream has been exhausted so that I 
can call job.stopWithSavepoint()?

Thanks for your help!
- Sergii



--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) 
Cheng


Re: Incremental state with purging

2020-05-18 Thread Thomas Huang
I'm wondering why you would use a beta feature in production. Why not push the 
latest state into a downstream sink such as Redis, or HBase with Apache Phoenix?


From: Annemarie Burger 
Sent: Monday, May 18, 2020 11:19:23 PM
To: user@flink.apache.org 
Subject: Re: Incremental state with purging

Hi,

Thanks for your suggestions!
However, as I read the docs for queryable state, it says it can only be used
with processing time, while my windows are defined on event time. So I guess
this means I should use a KeyedProcessFunction. Could you maybe suggest a rough
implementation for this? I can't seem to get it working right.

Best,
Annemarie



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
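For reference, a rough sketch of the kind of KeyedProcessFunction asked about above: 
per-key state accumulated in event time and purged when an event-time timer fires. The 
key/value types, the window length, and the class name are assumptions for 
illustration, and timestamps/watermarks are assumed to be assigned upstream.

 import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction
 import org.apache.flink.util.Collector

 class CountThenPurge(windowSize: Long)
     extends KeyedProcessFunction[String, (String, Long), (String, Long)] {

   @transient private var count: ValueState[java.lang.Long] = _
   @transient private var timer: ValueState[java.lang.Long] = _

   override def open(parameters: Configuration): Unit = {
     count = getRuntimeContext.getState(
       new ValueStateDescriptor[java.lang.Long]("count", classOf[java.lang.Long]))
     timer = getRuntimeContext.getState(
       new ValueStateDescriptor[java.lang.Long]("timer", classOf[java.lang.Long]))
   }

   override def processElement(
       value: (String, Long),
       ctx: KeyedProcessFunction[String, (String, Long), (String, Long)]#Context,
       out: Collector[(String, Long)]): Unit = {
     // Incrementally accumulate per key.
     val current = Option(count.value()).map(_.longValue()).getOrElse(0L)
     count.update(current + value._2)
     // Register a single event-time timer per key, windowSize ms after the
     // first element's timestamp; it fires once the watermark passes it.
     if (timer.value() == null) {
       val fireAt = ctx.timestamp() + windowSize
       ctx.timerService().registerEventTimeTimer(fireAt)
       timer.update(fireAt)
     }
   }

   override def onTimer(
       timestamp: Long,
       ctx: KeyedProcessFunction[String, (String, Long), (String, Long)]#OnTimerContext,
       out: Collector[(String, Long)]): Unit = {
     out.collect((ctx.getCurrentKey, count.value()))
     // Purge so the per-key state does not grow without bound.
     count.clear()
     timer.clear()
   }
 }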


Re: Prometheus Pushgateway Reporter Can not DELETE metrics on pushgateway

2020-05-12 Thread Thomas Huang
I ran into this issue three months ago. We eventually concluded that the Prometheus 
pushgateway cannot handle high-throughput metric data, and we solved the problem via 
service discovery instead: we changed the Prometheus metric reporter code to add 
registration logic, so each job exposes its host and port to the discovery service, 
and then we wrote a plugin for Prometheus that reads the service list and pulls the 
metrics from the Flink jobs directly.


From: 李佳宸 
Sent: Wednesday, May 13, 2020 11:26:26 AM
To: user@flink.apache.org 
Subject: Prometheus Pushgateway Reporter Can not DELETE metrics on pushgateway

Hi,

I got stuck using Prometheus and Pushgateway to collect metrics. Here is my 
reporter configuration:

metrics.reporter.promgateway.class: 
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: localhost
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.jobName: myJob
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: true

And the version information:
Flink 1.9.1
Prometheus 2.18
PushGateway 1.2 & 0.9 (I have already tried them both)

I found that when the Flink cluster restarts, new metrics show up under a new 
jobName with a random suffix, but the metrics with the pre-restart jobName still 
exist (their values stop updating). Since Prometheus keeps periodically pulling 
the data from the pushgateway, I end up with a bunch of time series whose values 
never change.

It looks like:


# HELP flink_jobmanager_Status_JVM_CPU_Load Load (scope: 
jobmanager_Status_JVM_CPU)
# TYPE flink_jobmanager_Status_JVM_CPU_Load gauge
flink_jobmanager_Status_JVM_CPU_Load{host="localhost",instance="",job="myJobae71620b106e8c2fdf86cb5c65fd6414"}
 0
flink_jobmanager_Status_JVM_CPU_Load{host="localhost",instance="",job="myJobe50caa3be194aeb2ff71a64bced17cea"}
 0.0006602344673593189
# HELP flink_jobmanager_Status_JVM_CPU_Time Time (scope: 
jobmanager_Status_JVM_CPU)
# TYPE flink_jobmanager_Status_JVM_CPU_Time gauge
flink_jobmanager_Status_JVM_CPU_Time{host="localhost",instance="",job="myJobae71620b106e8c2fdf86cb5c65fd6414"}
 4.54512e+09
flink_jobmanager_Status_JVM_CPU_Time{host="localhost",instance="",job="myJobe50caa3be194aeb2ff71a64bced17cea"}
 8.24809e+09
# HELP flink_jobmanager_Status_JVM_ClassLoader_ClassesLoaded ClassesLoaded 
(scope: jobmanager_Status_JVM_ClassLoader)
# TYPE flink_jobmanager_Status_JVM_ClassLoader_ClassesLoaded gauge
flink_jobmanager_Status_JVM_ClassLoader_ClassesLoaded{host="localhost",instance="",job="myJobae71620b106e8c2fdf86cb5c65fd6414"}
 5984
flink_jobmanager_Status_JVM_ClassLoader_ClassesLoaded{host="localhost",instance="",job="myJobe50caa3be194aeb2ff71a64bced17cea"}
 6014
# HELP flink_jobmanager_Status_JVM_ClassLoader_ClassesUnloaded ClassesUnloaded 
(scope: jobmanager_Status_JVM_ClassLoader)
# TYPE flink_jobmanager_Status_JVM_ClassLoader_ClassesUnloaded gauge
flink_jobmanager_Status_JVM_ClassLoader_ClassesUnloaded{host="localhost",instance="",job="myJobae71620b106e8c2fdf86cb5c65fd6414"}
 0
flink_jobmanager_Status_JVM_ClassLoader_ClassesUnloaded{host="localhost",instance="",job="myJobe50caa3be194aeb2ff71a64bced17cea"}
 0

PS: This cluster has one JobManager.

In my understanding, when I set metrics.reporter.promgateway.deleteOnShutdown to 
true, the old metrics should be deleted from the pushgateway, but somehow that 
didn't happen. Is my understanding of this configuration right? Is there any way 
to delete the stale metrics from the pushgateway?

Thanks!