Re: Flink 'Job Cluster' mode Ui Access

2019-11-27 Thread vino yang
Hi Jatin,

Flink web UI does not depend on any deployment mode.

You should check whether there are errors in the log file and whether the job
is in RUNNING state.

Best,
Vino

Jatin Banger  wrote on Thu, Nov 28, 2019 at 3:43 PM:

> Hi,
>
> It seems there is a web UI for the Flink session cluster, but for a Flink job
> cluster it is showing
>
> {"errors":["Not found."]}
>
> Is it the expected behavior for Flink Job Cluster Mode ?
>
> Best Regards,
> Jatin
>


Flink 'Job Cluster' mode Ui Access

2019-11-27 Thread Jatin Banger
Hi,

It seems there is a web UI for the Flink session cluster, but for a Flink job
cluster it is showing

{"errors":["Not found."]}

Is it the expected behavior for Flink Job Cluster Mode ?

Best Regards,
Jatin


Re: JobGraphs not cleaned up in HA mode

2019-11-27 Thread 曾祥才
The config (/flink is the NAS directory):


jobmanager.rpc.address: flink-jobmanager
taskmanager.numberOfTaskSlots: 16
web.upload.dir: /flink/webUpload
blob.server.port: 6124
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
high-availability: zookeeper
high-availability.cluster-id: /cluster-test
high-availability.storageDir: /flink/ha
high-availability.zookeeper.quorum: :2181
high-availability.jobmanager.port: 6123
high-availability.zookeeper.path.root: /flink/risk-insight
high-availability.zookeeper.path.checkpoints: /flink/zk-checkpoints
state.backend: filesystem
state.checkpoints.dir: file:///flink/checkpoints
state.savepoints.dir: file:///flink/savepoints
state.checkpoints.num-retained: 2
jobmanager.execution.failover-strategy: region
jobmanager.archive.fs.dir: file:///flink/archive/history

 





Re: JobGraphs not cleaned up in HA mode

2019-11-27 Thread 曾祥才
If I clean the ZooKeeper data, it runs fine. But the next time the jobmanager
fails and is redeployed, the error occurs again.









Re: JobGraphs not cleaned up in HA mode

2019-11-27 Thread Vijay Bhaskar
Can you share the flink configuration once?

Regards
Bhaskar

On Thu, Nov 28, 2019 at 12:09 PM 曾祥才  wrote:

> If I clean the ZooKeeper data, it runs fine. But the next time the jobmanager
> fails and is redeployed, the error occurs again.
>
>
>
>
> -- Original Message --
> *From:* "Vijay Bhaskar";
> *Sent:* Thursday, Nov 28, 2019, 3:05 PM
> *To:* "曾祥才";
> *Subject:* Re: JobGraphs not cleaned up in HA mode
>
> Again it could not find the state store file: "Caused by:
> java.io.FileNotFoundException: /flink/ha/submittedJobGraph0c6bcff01199 "
> Check why it is unable to find it.
> The better approach is: clean up the ZooKeeper state, check your configurations,
> correct them, and restart the cluster.
> Otherwise it always picks up the corrupted state from ZooKeeper and will
> never restart.
>
> Regards
> Bhaskar
>
> On Thu, Nov 28, 2019 at 11:51 AM 曾祥才  wrote:
>
>> I've made a mistake (the log before was from another cluster). The full
>> exception log is:
>>
>>
>> INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
>> Recovering all persisted jobs.
>> 2019-11-28 02:33:12,726 INFO
>> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl  -
>> Starting the SlotManager.
>> 2019-11-28 02:33:12,743 INFO
>> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
>> Released locks of job graph 639170a9d710bacfd113ca66b2aacefa from
>> ZooKeeper.
>> 2019-11-28 02:33:12,744 ERROR
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Fatal error
>> occurred in the cluster entrypoint.
>> org.apache.flink.runtime.dispatcher.DispatcherException: Failed to take
>> leadership with session id 607e1fe0-f1b4-4af0-af16-4137d779defb.
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$null$30(Dispatcher.java:915)
>> at
>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>> at
>> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>> at
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>> at
>> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
>> at
>> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:594)
>> at
>> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>> at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: java.lang.RuntimeException:
>> org.apache.flink.util.FlinkException: Could not retrieve submitted JobGraph
>> from state handle under /639170a9d710bacfd113ca66b2aacefa. This indicates
>> that the retrieved state handle is broken. Try cleaning the state handle
>> store.
>> at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:199)
>> at
>> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
>> at
>> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>> at
>> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
>> ... 7 more
>> Caused by: org.apache.flink.util.FlinkException: Could not retrieve
>> submitted JobGraph from state handle under
>> /639170a9d710bacfd113ca66b2aacefa. This indicates that the retrieved state
>> handle is broken. Try cleaning the state handle store.
>> at
>> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:190)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.recoverJob(Dispatcher.java:755)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.recoverJobGraphs(Dispatcher.java:740)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.recoverJobs(Dispatcher.java:721)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$null$27(Dispatcher.java:888)
>> at
>> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:73)
>> ... 9 more
>> Caused by: java.io.FileNotFoundException:
>> /flink/ha/submittedJobGraph0c6bcff01199 (No such file or directory)
>> at java.io.FileInputStream.open0(Native Method)
>> at java.io.FileInputStream.open(FileInputStream.java:195)
>>
>>
>>
>>
>> -- Original Message --
>> *From:* "Vijay Bhaskar";
>> *Sent:* Thursday, Nov 28, 2019, 2:46 PM
>> *To:* "曾祥才";
>> *Cc:* "User-Flink";
>> *Subject:* Re: JobGraphs not cleaned up in HA mode
>>
>> Is it filesystem or Hadoop? If it's NAS, then why the exception "Caused by:
>> org.apache.hadoop.hdfs.BlockMissingException: "?
>> It seems you configured a Hadoop state store but are giving it a NAS mount.
>> Reg

Re: JobGraphs not cleaned up in HA mode

2019-11-27 Thread Vijay Bhaskar
Is it filesystem or Hadoop? If it's NAS, then why the exception "Caused by:
org.apache.hadoop.hdfs.BlockMissingException: "?
It seems you configured a Hadoop state store but are giving it a NAS mount.

Regards
Bhaskar



On Thu, Nov 28, 2019 at 11:36 AM 曾祥才  wrote:

> /flink/checkpoints is an external persistent store (a NAS directory mounted
> to the job manager)
>
>
>
>
> -- Original Message --
> *From:* "Vijay Bhaskar";
> *Sent:* Thursday, Nov 28, 2019, 2:29 PM
> *To:* "曾祥才";
> *Cc:* "user";
> *Subject:* Re: JobGraphs not cleaned up in HA mode
>
> The following are the mandatory conditions to run in HA:
>
> a) You should have a persistent, common external store that the jobmanager and
> task managers can write their state to.
> b) You should have a persistent external store for ZooKeeper to store the
> JobGraph.
>
> ZooKeeper is referring to the path
> /flink/checkpoints/submittedJobGraph480ddf9572ed to get the job graph, but the
> jobmanager is unable to find it.
> It seems /flink/checkpoints is not the external persistent store.
>
>
> Regards
> Bhaskar
>
> On Thu, Nov 28, 2019 at 10:43 AM seuzxc  wrote:
>
>> Hi, I have the same problem with Flink 1.9.1. Any solution to fix it?
>> When k8s redeploys the jobmanager, the error looks like this (it seems ZK does
>> not remove the submitted job info, but the jobmanager removes the file):
>>
>>
>> Caused by: org.apache.flink.util.FlinkException: Could not retrieve
>> submitted JobGraph from state handle under
>> /147dd022ec91f7381ad4ca3d290387e9. This indicates that the retrieved state
>> handle is broken. Try cleaning the state handle store.
>> at
>>
>> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:208)
>> at
>>
>> org.apache.flink.runtime.dispatcher.Dispatcher.recoverJob(Dispatcher.java:696)
>> at
>>
>> org.apache.flink.runtime.dispatcher.Dispatcher.recoverJobGraphs(Dispatcher.java:681)
>> at
>>
>> org.apache.flink.runtime.dispatcher.Dispatcher.recoverJobs(Dispatcher.java:662)
>> at
>>
>> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$null$26(Dispatcher.java:821)
>> at
>>
>> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:72)
>> ... 9 more
>> Caused by: org.apache.hadoop.hdfs.BlockMissingException: Could not obtain
>> block: BP-1651346363-10.20.1.81-1525354906737:blk_1083182315_9441494
>> file=/flink/checkpoints/submittedJobGraph480ddf9572ed
>> at
>>
>> org.apache.hadoop.hdfs.DFSInputStream.refetchLocations(DFSInputStream.java:1052)
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: JobGraphs not cleaned up in HA mode

2019-11-27 Thread 曾祥才
/flink/checkpoints is an external persistent store (a NAS directory mounted
to the job manager)









Re: JobGraphs not cleaned up in HA mode

2019-11-27 Thread Vijay Bhaskar
The following are the mandatory conditions to run in HA:

a) You should have a persistent, common external store that the jobmanager and
task managers can write their state to.
b) You should have a persistent external store for ZooKeeper to store the
JobGraph.

ZooKeeper is referring to the path
/flink/checkpoints/submittedJobGraph480ddf9572ed to get the job graph, but the
jobmanager is unable to find it.
It seems /flink/checkpoints is not the external persistent store.
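
To make conditions (a) and (b) concrete: the HA storage directory and the
checkpoint/savepoint directories must point at storage that every jobmanager
and taskmanager (including a restarted jobmanager pod) can reach, not at a
node-local path. A minimal sketch of the relevant flink-conf.yaml keys,
assuming an HDFS (or similarly shared and persistent) filesystem is available;
the hdfs:/// paths below are placeholders:

high-availability: zookeeper
# shared, persistent directory for HA metadata (submitted JobGraphs, etc.)
high-availability.storageDir: hdfs:///flink/ha
# shared, persistent directories for checkpoint and savepoint data
state.checkpoints.dir: hdfs:///flink/checkpoints
state.savepoints.dir: hdfs:///flink/savepoints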


Regards
Bhaskar

On Thu, Nov 28, 2019 at 10:43 AM seuzxc  wrote:

> Hi, I have the same problem with Flink 1.9.1. Any solution to fix it?
> When k8s redeploys the jobmanager, the error looks like this (it seems ZK does
> not remove the submitted job info, but the jobmanager removes the file):
>
>
> Caused by: org.apache.flink.util.FlinkException: Could not retrieve
> submitted JobGraph from state handle under
> /147dd022ec91f7381ad4ca3d290387e9. This indicates that the retrieved state
> handle is broken. Try cleaning the state handle store.
> at
>
> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:208)
> at
>
> org.apache.flink.runtime.dispatcher.Dispatcher.recoverJob(Dispatcher.java:696)
> at
>
> org.apache.flink.runtime.dispatcher.Dispatcher.recoverJobGraphs(Dispatcher.java:681)
> at
>
> org.apache.flink.runtime.dispatcher.Dispatcher.recoverJobs(Dispatcher.java:662)
> at
>
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$null$26(Dispatcher.java:821)
> at
>
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:72)
> ... 9 more
> Caused by: org.apache.hadoop.hdfs.BlockMissingException: Could not obtain
> block: BP-1651346363-10.20.1.81-1525354906737:blk_1083182315_9441494
> file=/flink/checkpoints/submittedJobGraph480ddf9572ed
> at
>
> org.apache.hadoop.hdfs.DFSInputStream.refetchLocations(DFSInputStream.java:1052)
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: JobGraphs not cleaned up in HA mode

2019-11-27 Thread seuzxc
Hi, I have the same problem with Flink 1.9.1. Any solution to fix it?
When k8s redeploys the jobmanager, the error looks like this (it seems ZK does
not remove the submitted job info, but the jobmanager removes the file):


Caused by: org.apache.flink.util.FlinkException: Could not retrieve
submitted JobGraph from state handle under
/147dd022ec91f7381ad4ca3d290387e9. This indicates that the retrieved state
handle is broken. Try cleaning the state handle store.
at
org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:208)
at
org.apache.flink.runtime.dispatcher.Dispatcher.recoverJob(Dispatcher.java:696)
at
org.apache.flink.runtime.dispatcher.Dispatcher.recoverJobGraphs(Dispatcher.java:681)
at
org.apache.flink.runtime.dispatcher.Dispatcher.recoverJobs(Dispatcher.java:662)
at
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$null$26(Dispatcher.java:821)
at
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:72)
... 9 more
Caused by: org.apache.hadoop.hdfs.BlockMissingException: Could not obtain
block: BP-1651346363-10.20.1.81-1525354906737:blk_1083182315_9441494
file=/flink/checkpoints/submittedJobGraph480ddf9572ed
at
org.apache.hadoop.hdfs.DFSInputStream.refetchLocations(DFSInputStream.java:1052)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: What happens to the channels when there is backpressure?

2019-11-27 Thread yingjie cao
Hi Felipe,

That depends on what you mean by 'bandwidth'. If you mean the capability of
the network stack, the answer would be no.

Here is a post about Flink network stack which may help:
https://flink.apache.org/2019/06/05/flink-network-stack.html.

Thanks,
Yingjie

Felipe Gutierrez  wrote on Wed, Nov 27, 2019 at 11:13 PM:

> Hi community,
>
> I have a question about backpressure. Suppose a scenario that I have a map
> and a reducer, and the reducer is back pressuring the map operator. I know
> that the reducer is processing tuples at a lower rate than it is receiving.
>
> However, can I say that at least one channel between the map and the
> reducer is totally using its available bandwidth?
>
> My guess is it is not, at least in the beginning. But as the time goes on
> the tuples will be queued in the network buffer of the reducer and then the
> bandwidth will be 100% of usage. Am I right?
>
> Thanks,
> Felipe
>
>
>


Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

2019-11-27 Thread yingjie
Piotr is right; that depends on the data size you are reading and the memory
pressure. The memory occupied by the mmapped region can be recycled and used
by other processes if memory pressure is high, that is, other processes or
services on the same node won't be affected because the OS will recycle the
mmapped pages if needed. But currently, you can't assume a bound on the memory
that can be used, because it will use more memory as long as there is free
space and there is more new data to read.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Converting streaming to batch execution

2019-11-27 Thread Caizhi Weng
Hi Nick,

It seems to me that the slow part of the whole pipeline is the Derby sink.
Could you change it to another sink (for example, a CSV sink or even a
"discard everything" sink) and see if the throughput improves?

If this is the case, are you using the JDBC connector? If yes, you might
consider calling the `JDBCOutputFormatBuilder#setBatchInterval` method and
setting the batch interval to a larger value.
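
For illustration, a rough sketch of wiring that in with the flink-jdbc
JDBCOutputFormat (the driver class, URL, table and columns below are
placeholders, and resultStream stands for the existing DataStream<Row> the
pipeline already produces):

import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.types.Row;

// Sketch only: batch the JDBC writes so rows are flushed in groups
// instead of one statement per record.
JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
    .setDrivername("org.apache.derby.jdbc.ClientDriver")           // placeholder driver
    .setDBUrl("jdbc:derby://dbhost:1527/mydb")                     // placeholder URL
    .setQuery("INSERT INTO results (series_id, ts, val) VALUES (?, ?, ?)")
    .setBatchInterval(5000)   // flush every 5000 rows rather than per record
    .finish();

resultStream.writeUsingOutputFormat(jdbcOutput);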

Thanks

Nicholas Walton  wrote on Thu, Nov 28, 2019 at 1:57 AM:

> Hi,
>
> I’ve been working with a pipeline that was initially aimed at processing
> high speed sensor data, but for a proof of concept I’m feeding simulated
> data from a CSV file. Each row of the file is a sample across a number of
> time series, and I’ve been using the streaming environment to process each
> column of the file in parallel. The columns are each around 6 million
> samples in length. The outputs are being sunk into a Derby database table.
>
> However, I’m not getting a very high throughput even running across two
> 32G Dell laptops with the database on a 16Mb Macbook. According to the
> metrics it seems I’m only dropping records on to the database at a rate of
> around 20-30 per second (I assume per parallel pipe of which I’m running 16
> across the two laptops). The pipeline runs a couple of windowing operations
> one of length 10 and one of length 100 with a small amount of computation,
> but it does yield a considerable amount of output a 10G CSV file yielding a
> database of around 100Gb+.
>
> I’m thinking that the slow rate is due to using stream processing to process
> a batch. So I’ve been looking at the batch support in Flink (1.8),
> intending to move the code over from stream to batch execution. However, I
> can’t make head’n tail of the batch DataSet documentation. For example, in
> the streaming environment I was setting a watermark after I read each line
> of the file to keep the time series in order, and using keyBy to split up
> the individual time series by column number. I can’t find the equivalent
> operations in the batch interface.
>
> Could someone guide me to some relevant online documentation? And before
> anyone says it, I have chatted with Dr Google for a good while to no
> satisfactory outcome
>
> TIA
>
> Nick Walton


Re: ***UNCHECKED*** Error while confirming Checkpoint

2019-11-27 Thread Tony Wei
Hi Piotrek,

There is already an issue [1] and a PR for this thread. Should we mark it as a
duplicate or as a related issue?

Best,
Tony Wei

[1] https://issues.apache.org/jira/browse/FLINK-10377

Piotr Nowojski  wrote on Thu, Nov 28, 2019 at 12:17 AM:

> Hi Tony,
>
> Thanks for the explanation. Assuming that’s what’s happening, then I
> agree, this checkState should be removed. I created a ticket for this issue
> https://issues.apache.org/jira/browse/FLINK-14979
>
> Piotrek
>
> On 27 Nov 2019, at 16:28, Tony Wei  wrote:
>
> Hi Piotrek,
>
> The case here was that the first snapshot is a savepoint. I know that if
> the following checkpoint succeeded before the previous one, the previous
> one will be subsumed by JobManager. However, if that previous one is a
> savepoint, it won't be subsumed. That leads to the case that Chesnay said.
> The following checkpoint succeeded before the previous savepoint, handling
> both of their pending transaction, but savepoint still succeeded and sent
> the notification to each TaskManager. That led to this exception. Could you
> double check if this is the case? Thank you.
>
> Best,
> Tony Wei
>
Piotr Nowojski  wrote on Wed, Nov 27, 2019 at 8:50 PM:
>
>> Hi,
>>
>> Maybe Chesney you are right, but I’m not sure. TwoPhaseCommitSink was
>> based on Pravega’s sink for Flink, which was implemented by Stephan, and it
>> has the same logic [1]. If I remember the discussions with Stephan/Till,
>> the way how Flink is using Akka probably guarantees that messages will be
>> always delivered, except of some failure, so `notifyCheckpointComplete`
>> could be missed probably only if a failure happens between snapshot and
>> arrival of the notification. Receiving the same notification twice should
>> be impossible (based on the knowledge passed to me from Till/Stephan).
>>
>> However, for one thing, if that’s possible, then the code should adjusted
>> accordingly. On the other hand, maybe there is no harm in relaxing the
>> contract? Even if we miss this notification (because of some re-ordering?),
>> next one will subsume the missed one and commit everything.
>>
>> Piotrek
>>
>> [1]
>> https://github.com/pravega/flink-connectors/blob/master/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java#L567
>>
>> On 27 Nov 2019, at 13:02, Chesnay Schepler  wrote:
>>
>> This looks to me like the TwoPhaseCommitSinkFunction is a bit too strict.
>> The notification for complete checkpoints is not reliable; it may be late,
>> not come at all, possibly even in different order than expected.
>>
>> As such, if you have a simple case of snapshot -> snapshot -> notify -> notify
>> the sink will always fail with an exception.
>>
>> What it should do imo is either a) don't check that there is a pending
>> transaction or b) track the highest checkpoint id received and optionally
>> don't fail if the notification is for an older CP.
>>
>> @piotr WDYT?
>>
>> On 27/11/2019 08:59, Tony Wei wrote:
>>
>> Hi,
>>
>> As the follow up, it seem that savepoint can't be subsumed, so that its
>> notification could still be send to each TMs.
>> Is this a bug that need to be fixed in TwoPhaseCommitSinkFunction?
>>
>> Best,
>> Tony Wei
>>
>> Tony Wei  wrote on Wed, Nov 27, 2019 at 3:43 PM:
>>
>>> Hi,
>>>
>>> I want to raise this question again, since I have had this exception on
>>> my production job.
>>>
>>> The exception is as follows
>>>
>>>
 2019-11-27 14:47:29
>>>
>>>
>>>
>>> java.lang.RuntimeException: Error while confirming checkpoint at
 org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205) at
 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266) at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor
 .java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(
 ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:
 748) Caused by: java.lang.IllegalStateException: checkpoint completed,
 but no transaction pending at org.apache.flink.util.Preconditions
 .checkState(Preconditions.java:195) at
 org.apache.flink.streaming.api.functions.sink.
 TwoPhaseCommitSinkFunction.notifyCheckpointComplete(
 TwoPhaseCommitSinkFunction.java:267) at
 org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator
 .notifyCheckpointComplete(AbstractUdfStreamOperator.java:130) at
 org.apache.flink.streaming.runtime.tasks.StreamTask
 .notifyCheckpointComplete(StreamTask.java:822) at
 org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1200)
 ... 5 more
>>>
>>>
>>> And these are the checkpoint / savepoint before the job failed.
>>> 
>>>
>>> It seems that checkpoint # 675's notification handled the savepoint #
>>> 674's pending transaction holder, but savepoint #674's notification didn't
>>> be subsumed or be ignored by JM.
>>> Therefore, during the checkpoint #676, some tasks got notification
>>> before getting the che

Converting streaming to batch execution

2019-11-27 Thread Nicholas Walton
Hi,

I’ve been working with a pipeline that was initially aimed at processing high 
speed sensor data, but for a proof of concept I’m feeding simulated data from a 
CSV file. Each row of the file is a sample across a number of time series, and 
I’ve been using the streaming environment to process each column of the file in 
parallel. The columns are each around 6 million samples in length. The outputs 
are being sunk into a Derby database table.

However, I’m not getting a very high throughput even running across two 32G 
Dell laptops with the database on a 16Mb Macbook. According to the metrics it 
seems I’m only dropping records on to the database at a rate of around 20-30 
per second (I assume per parallel pipe of which I’m running 16 across the two 
laptops). The pipeline runs a couple of windowing operations one of length 10 
and one of length 100 with a small amount of computation, but it does yield a 
considerable amount of output a 10G CSV file yielding a database of around 
100Gb+.

I’m thinking that the slow rate is due to using stream processing to process a 
batch. So I’ve been looking at the batch support in Flink (1.8), intending 
to move the code over from stream to batch execution. However, I can’t make 
head’n tail of the batch DataSet documentation. For example, in the streaming 
environment I was setting a watermark after I read each line of the file to 
keep the time series in order, and using keyBy to split up the individual time 
series by column number. I can’t find the equivalent operations in the batch 
interface.

Could someone guide me to some relevant online documentation? And before anyone 
says it, I have chatted with Dr Google for a good while to no satisfactory outcome.

TIA

Nick Walton

Re: What happens to a Source's Operator State if it stops being initialized and snapshotted? Accidentally exponential?

2019-11-27 Thread Gyula Fóra
You are right Aaron.

I would say this is by design: since Flink doesn't require you to initialize
state in the open method, it has no safe way to delete the non-referenced
state.

What you can do is restore the state and clear it on all operators and not
reference it again. I know this feels like a workaround but I have no
better idea at the moment.
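
A rough sketch of that workaround for a CheckpointedFunction-based operator;
the state name "legacy-offsets" and the element type are placeholders and must
match the state that was previously snapshotted:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

// Sketch: re-declare the old union state once, clear the restored entries,
// and never touch it again so it disappears from future checkpoints.
public class DrainLegacyUnionState implements CheckpointedFunction {

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListState<Long> legacy = context.getOperatorStateStore()
            .getUnionListState(new ListStateDescriptor<>("legacy-offsets", Long.class));
        legacy.clear();   // drop whatever was restored from the savepoint
        // ... initialize the state the operator still actually needs ...
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // nothing written for the legacy state: it is gone from the next snapshot
    }
}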

Cheers,
Gyula

On Wed, Nov 27, 2019 at 6:08 PM Aaron Levin  wrote:

> Hi,
>
> Yes, we're using UNION state. I would assume, though, that if you are
> not reading the UNION state it would either stick around as a
> constant factor in your state size, or get cleared.
>
> Looks like I should try to recreate a small example and submit a bug
> if this is true. Otherwise it's impossible to remove union state from
> your operators.
>
> On Wed, Nov 27, 2019 at 6:50 AM Congxian Qiu 
> wrote:
> >
> > Hi
> >
> > Do you use UNION state in your scenario? When using UNION state, the JM
> may encounter OOM because each TDD will contain all the state of all
> subtasks [1]
> >
> > [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-operator-state
> > Best,
> > Congxian
> >
> >
> > Aaron Levin  wrote on Wed, Nov 27, 2019 at 3:55 AM:
> >>
> >> Hi,
> >>
> >> Some context: after a refactoring, we were unable to start our jobs.
> >> They started fine and checkpointed fine, but once the job restarted
> >> owing to a transient failure, the application was unable to start. The
> >> Job Manager was OOM'ing (even when I gave them 256GB of ram!). The
> >> `_metadata` file for the checkpoint was 1.3GB (usually 11MB). Inside
> >> the `_metadata` file we saw `- 1402496 offsets:
> >> com.stripe.flink.backfill.kafka-archive-file-progress`. This happened
> >> to be the operator state we were no longer initializing or
> >> snapshotting after the refactoring.
> >>
> >> Before I dig further into this and try to find a smaller reproducible
> >> test case I thought I would ask if someone knows what the expected
> >> behaviour is for the following scenario:
> >>
> >> suppose you have an operator (in this case a Source) which has some
> >> operator ListState. Suppose you run your flink job for some time and
> >> then later refactor your job such that you no longer use that state
> >> (so after the refactoring you're no longer initializing this operator
> >> state in initializeState, nor are you snapshotting the operator state
> >> in snapshotState). If you launch your new code from a recent
> >> savepoint, what do we expect to happen to the state? Do we anticipate
> >> the behaviour I explained above?
> >>
> >> My assumption would be that Flink would not read this state and so it
> >> would be removed from the next checkpoint or savepoint. Alternatively,
> >> I might assume it would not be read but would linger around every
> >> future checkpoint or savepoint. However, it feels like what is
> >> happening is it's not read and then possibly replicated by every
> >> instance of the task every time a checkpoint happens (hence the
> >> accidentally exponential behaviour).
> >>
> >> Thoughts?
> >>
> >> PS - in case someone asks: I was sure that we were calling `.clear()`
> >> appropriately in `snapshotState` (we, uh, already learned that lesson
> >> :D)
> >>
> >> Best,
> >>
> >> Aaron Levin
>


Re: What happens to a Source's Operator State if it stops being initialized and snapshotted? Accidentally exponential?

2019-11-27 Thread Aaron Levin
Hi,

Yes, we're using UNION state. I would assume, though, that if you are
not reading the UNION state it would either stick around as a
constant factor in your state size, or get cleared.

Looks like I should try to recreate a small example and submit a bug
if this is true. Otherwise it's impossible to remove union state from
your operators.

On Wed, Nov 27, 2019 at 6:50 AM Congxian Qiu  wrote:
>
> Hi
>
> Do you use UNION state in your scenario? When using UNION state, the JM may 
> encounter OOM because each TDD will contain all the state of all subtasks [1]
>
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-operator-state
> Best,
> Congxian
>
>
Aaron Levin  wrote on Wed, Nov 27, 2019 at 3:55 AM:
>>
>> Hi,
>>
>> Some context: after a refactoring, we were unable to start our jobs.
>> They started fine and checkpointed fine, but once the job restarted
>> owing to a transient failure, the application was unable to start. The
>> Job Manager was OOM'ing (even when I gave them 256GB of ram!). The
>> `_metadata` file for the checkpoint was 1.3GB (usually 11MB). Inside
>> the `_metadata` file we saw `- 1402496 offsets:
>> com.stripe.flink.backfill.kafka-archive-file-progress`. This happened
>> to be the operator state we were no longer initializing or
>> snapshotting after the refactoring.
>>
>> Before I dig further into this and try to find a smaller reproducible
>> test case I thought I would ask if someone knows what the expected
>> behaviour is for the following scenario:
>>
>> suppose you have an operator (in this case a Source) which has some
>> operator ListState. Suppose you run your flink job for some time and
>> then later refactor your job such that you no longer use that state
>> (so after the refactoring you're no longer initializing this operator
>> state in initializeState, nor are you snapshotting the operator state
>> in snapshotState). If you launch your new code from a recent
>> savepoint, what do we expect to happen to the state? Do we anticipate
>> the behaviour I explained above?
>>
>> My assumption would be that Flink would not read this state and so it
>> would be removed from the next checkpoint or savepoint. Alternatively,
>> I might assume it would not be read but would linger around every
>> future checkpoint or savepoint. However, it feels like what is
>> happening is it's not read and then possibly replicated by every
>> instance of the task every time a checkpoint happens (hence the
>> accidentally exponential behaviour).
>>
>> Thoughts?
>>
>> PS - in case someone asks: I was sure that we were calling `.clear()`
>> appropriately in `snapshotState` (we, uh, already learned that lesson
>> :D)
>>
>> Best,
>>
>> Aaron Levin


Re: Apache Flink - Throttling stream flow

2019-11-27 Thread Rong Rong
Hi Mans,

is this what you are looking for [1][2]?

--
Rong

[1] https://issues.apache.org/jira/browse/FLINK-11501
[2] https://github.com/apache/flink/pull/7679

On Mon, Nov 25, 2019 at 3:29 AM M Singh  wrote:

> Thanks Ciazhi & Thomas for your responses.
>
> I read the throttling example but want to see if that works with a
> distributed broker like Kinesis, and how to feed throttling feedback back to
> the Kinesis source so that it can vary the rate without interfering with
> watermarks, etc.
>
> Thanks again
>
> Mans
>
>
> On Monday, November 25, 2019, 05:55:21 AM EST, Thomas Julian <
> thomasjul...@zoho.com> wrote:
>
>
> related
>
> https://issues.apache.org/jira/browse/FLINK-13792
>
> Regards,
> Julian.
>
>
>  On Mon, 25 Nov 2019 15:25:14 +0530, Caizhi Weng wrote:
>
> Hi,
>
> As far as I know, Flink currently doesn't have a built-in throttling
> function. You can write your own user-defined function to achieve this.
> Your function just emits what it reads in and, at the same time, limits the
> rate at which it emits records.
>
> If you're not familiar with user-defined functions, see
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html
>
> Here is a throttling iterator example:
> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/utils/ThrottledIterator.java
>
> M Singh  wrote on Mon, Nov 25, 2019 at 5:50 AM:
>
> Hi:
>
> I have a Flink streaming application that invokes some other web
> services.  However, the web services have limited throughput.  So I wanted to
> find out if there are any recommendations on how to throttle the Flink
> datastream so that it doesn't overload the downstream services.  I am
> using Kinesis as source and sink in my application.
>
> Please let me know if there are any hooks available in Flink, what are the
> patterns that can be used, and what are the best practices/pitfalls for
> using them.
>
> Thanks
>
> Mans
>
>
>
>
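
To make the user-defined throttling function Caizhi describes above concrete,
here is a minimal sketch (the rate, types and class name are made up; note that
blocking in map() also propagates backpressure towards the source):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Sketch: a pass-through map that caps the per-subtask emission rate by
// sleeping when records arrive faster than maxRecordsPerSecond.
public class ThrottlingMap<T> extends RichMapFunction<T, T> {

    private final long maxRecordsPerSecond;
    private transient long emittedInWindow;
    private transient long windowStartMillis;

    public ThrottlingMap(long maxRecordsPerSecond) {
        this.maxRecordsPerSecond = maxRecordsPerSecond;
    }

    @Override
    public void open(Configuration parameters) {
        emittedInWindow = 0;
        windowStartMillis = System.currentTimeMillis();
    }

    @Override
    public T map(T value) throws Exception {
        if (emittedInWindow >= maxRecordsPerSecond) {
            long elapsed = System.currentTimeMillis() - windowStartMillis;
            if (elapsed < 1000) {
                Thread.sleep(1000 - elapsed);   // wait out the rest of the second
            }
            emittedInWindow = 0;
            windowStartMillis = System.currentTimeMillis();
        }
        emittedInWindow++;
        return value;   // records pass through unchanged
    }
}

// Usage sketch: stream.map(new ThrottlingMap<>(500)) caps each parallel
// subtask at roughly 500 records per second.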


Re: ***UNCHECKED*** Error while confirming Checkpoint

2019-11-27 Thread Piotr Nowojski
Hi Tony,

Thanks for the explanation. Assuming that’s what’s happening, then I agree, 
this checkState should be removed. I created a ticket for this issue 
https://issues.apache.org/jira/browse/FLINK-14979 


Piotrek

> On 27 Nov 2019, at 16:28, Tony Wei  wrote:
> 
> Hi Piotrek,
> 
> The case here was that the first snapshot is a savepoint. I know that if the 
> following checkpoint succeeded before the previous one, the previous one will 
> be subsumed by JobManager. However, if that previous one is a savepoint, it 
> won't be subsumed. That leads to the case that Chesnay said. The following 
> checkpoint succeeded before the previous savepoint, handling both of their 
> pending transaction, but savepoint still succeeded and sent the notification 
> to each TaskManager. That led to this exception. Could you double check if 
> this is the case? Thank you. 
> 
> Best,
> Tony Wei
> 
> Piotr Nowojski (pi...@ververica.com) wrote on Wed, Nov 27, 2019 at 8:50 PM:
> Hi,
> 
> Maybe Chesney you are right, but I’m not sure. TwoPhaseCommitSink was based 
> on Pravega’s sink for Flink, which was implemented by Stephan, and it has the 
> same logic [1]. If I remember the discussions with Stephan/Till, the way how 
> Flink is using Akka probably guarantees that messages will be always 
> delivered, except of some failure, so `notifyCheckpointComplete` could be 
> missed probably only if a failure happens between snapshot and arrival of the 
> notification. Receiving the same notification twice should be impossible 
> (based on the knowledge passed to me from Till/Stephan).
> 
> However, for one thing, if that’s possible, then the code should adjusted 
> accordingly. On the other hand, maybe there is no harm in relaxing the 
> contract? Even if we miss this notification (because of some re-ordering?), 
> next one will subsume the missed one and commit everything. 
> 
> Piotrek
> 
> [1] 
> https://github.com/pravega/flink-connectors/blob/master/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java#L567
>  
> 
> 
>> On 27 Nov 2019, at 13:02, Chesnay Schepler > > wrote:
>> 
>> This looks to me like the TwoPhaseCommitSinkFunction is a bit too strict. 
>> The notification for complete checkpoints is not reliable; it may be late, 
>> not come at all, possibly even in different order than expected.
>> 
>> As such, if you have a simple case of snapshot -> snapshot -> notify -> notify 
>> the sink will always fail with an exception.
>> 
>> What it should do imo is either a) don't check that there is a pending 
>> transaction or b) track the highest checkpoint id received and optionally 
>> don't fail if the notification is for an older CP.
>> 
>> @piotr WDYT?
>> 
>> On 27/11/2019 08:59, Tony Wei wrote:
>>> Hi, 
>>> 
>>> As the follow up, it seem that savepoint can't be subsumed, so that its 
>>> notification could still be send to each TMs.
>>> Is this a bug that need to be fixed in TwoPhaseCommitSinkFunction?
>>> 
>>> Best,
>>> Tony Wei
>>> 
>>> Tony Wei (tony19920...@gmail.com) wrote on Wed, Nov 27, 2019 at 3:43 PM:
>>> Hi, 
>>> 
>>> I want to raise this question again, since I have had this exception on my 
>>> production job.
>>> 
>>> The exception is as follows
>>>  
>>> 2019-11-27 14:47:29
>>>  
>>> java.lang.RuntimeException: Error while confirming checkpoint
>>> at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205)
>>> at 
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.IllegalStateException: checkpoint completed, but no 
>>> transaction pending
>>> at 
>>> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>>> at 
>>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:267)
>>> at 
>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:822)
>>> at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1200)
>>> ... 5 more
>>> 
>>> And these are the checkpoint / savepoint before the job failed.
>>> 
>>> 
>>> It seems that checkpoint # 675's notification handled the savepoint # 674's 
>>> pending transaction holder, but savepoint #674's notification didn't be 
>>> subsumed or be ignored by JM.
>>> Therefore, during the checkpoint #676, s

Re: ***UNCHECKED*** Error while confirming Checkpoint

2019-11-27 Thread Tony Wei
Hi Piotrek,

The case here was that the first snapshot is a savepoint. I know that if
the following checkpoint succeeded before the previous one, the previous
one will be subsumed by JobManager. However, if that previous one is a
savepoint, it won't be subsumed. That leads to the case that Chesnay said.
The following checkpoint succeeded before the previous savepoint, handling
both of their pending transactions, but the savepoint still succeeded and sent
the notification to each TaskManager. That led to this exception. Could you
double check if this is the case? Thank you.

Best,
Tony Wei

Piotr Nowojski  wrote on Wed, Nov 27, 2019 at 8:50 PM:

> Hi,
>
> Maybe Chesney you are right, but I’m not sure. TwoPhaseCommitSink was
> based on Pravega’s sink for Flink, which was implemented by Stephan, and it
> has the same logic [1]. If I remember the discussions with Stephan/Till,
> the way how Flink is using Akka probably guarantees that messages will be
> always delivered, except of some failure, so `notifyCheckpointComplete`
> could be missed probably only if a failure happens between snapshot and
> arrival of the notification. Receiving the same notification twice should
> be impossible (based on the knowledge passed to me from Till/Stephan).
>
> However, for one thing, if that’s possible, then the code should adjusted
> accordingly. On the other hand, maybe there is no harm in relaxing the
> contract? Even if we miss this notification (because of some re-ordering?),
> next one will subsume the missed one and commit everything.
>
> Piotrek
>
> [1]
> https://github.com/pravega/flink-connectors/blob/master/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java#L567
>
> On 27 Nov 2019, at 13:02, Chesnay Schepler  wrote:
>
> This looks to me like the TwoPhaseCommitSinkFunction is a bit too strict.
> The notification for complete checkpoints is not reliable; it may be late,
> not come at all, possibly even in different order than expected.
>
> As such, if you have a simple case of snapshot -> snapshot -> notify -> notify
> the sink will always fail with an exception.
>
> What it should do imo is either a) don't check that there is a pending
> transaction or b) track the highest checkpoint id received and optionally
> don't fail if the notification is for an older CP.
>
> @piotr WDYT?
>
> On 27/11/2019 08:59, Tony Wei wrote:
>
> Hi,
>
> As the follow up, it seem that savepoint can't be subsumed, so that its
> notification could still be send to each TMs.
> Is this a bug that need to be fixed in TwoPhaseCommitSinkFunction?
>
> Best,
> Tony Wei
>
Tony Wei  wrote on Wed, Nov 27, 2019 at 3:43 PM:
>
>> Hi,
>>
>> I want to raise this question again, since I have had this exception on
>> my production job.
>>
>> The exception is as follows
>>
>>
>>> 2019-11-27 14:47:29
>>
>>
>>
>> java.lang.RuntimeException: Error while confirming checkpoint at
>>> org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205) at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor
>>> .java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>>> ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.IllegalStateException: checkpoint completed, but
>>> no transaction pending at org.apache.flink.util.Preconditions
>>> .checkState(Preconditions.java:195) at
>>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction
>>> .notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:267) at
>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator
>>> .notifyCheckpointComplete(AbstractUdfStreamOperator.java:130) at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask
>>> .notifyCheckpointComplete(StreamTask.java:822) at
>>> org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1200) ...
>>> 5 more
>>
>>
>> And these are the checkpoint / savepoint before the job failed.
>> 
>>
>> It seems that checkpoint # 675's notification handled the savepoint #
>> 674's pending transaction holder, but savepoint #674's notification didn't
>> be subsumed or be ignored by JM.
>> Therefore, during the checkpoint #676, some tasks got notification before
>> getting the checkpoint barrier and led to this exception happened, because
>> there was no pending transaction in queue.
>>
>> Does anyone know the details about subsumed notifications mechanism and
>> how checkpoint coordinator handle this situation? Please correct me if I'm
>> wrong. Thanks.
>>
>> Best,
>> Tony Wei
>>
>> Stefan Richter  wrote on Mon, Oct 8, 2018 at 5:03 PM:
>>
>>> Hi Pedro,
>>>
>>> unfortunately the interesting parts are all removed from the log, we
>>> already know about the exception itself. In particular, what I would like
>>> to see is what checkpoints have been triggered and completed before the
>>> exception happens.
>>>
>>> Bes

What happens to the channels when there is backpressure?

2019-11-27 Thread Felipe Gutierrez
Hi community,

I have a question about backpressure. Suppose a scenario that I have a map
and a reducer, and the reducer is back pressuring the map operator. I know
that the reducer is processing tuples at a lower rate than it is receiving.

However, can I say that at least one channel between the map and the
reducer is totally using its available bandwidth?

My guess is it is not, at least in the beginning. But as time goes on,
the tuples will be queued in the network buffers of the reducer and then the
bandwidth usage will be 100%. Am I right?

Thanks,
Felipe


[PROPOSAL/SURVEY] Enable background cleanup for state with TTL by default

2019-11-27 Thread Andrey Zagrebin
Hi all,

We were thinking about enabling background cleanup for the state with TTL
by default:
StateTtlConfig#Builder#cleanupInBackground()
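
For reference, opting in today looks roughly like this, e.g. inside a stateful
function's open() (a sketch; the state name and TTL value are placeholders):

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

// Sketch: enable background TTL cleanup explicitly; the proposal would make
// this the default so the cleanupInBackground() call is no longer required.
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(7))          // placeholder TTL
    .cleanupInBackground()
    .build();

ValueStateDescriptor<Long> lastSeen = new ValueStateDescriptor<>("last-seen", Long.class);
lastSeen.enableTimeToLive(ttlConfig);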

Previously, we did not have it in the first implementation of TTL if you
remember.
So technically, we were a bit conservative to not enable it by default at
once.
In general, most TTL use cases should enable background cleanup in one way or
another, because it is usually needed. It therefore makes sense to enable it by
default so that users do not have to care about it.

I am starting this thread to collect any feedback for this change, which we
want to include in the 1.10 release. Basically, the question is whether there
have been any problems with the background cleanups that would postpone this change.

JIRA issue and PR:
https://issues.apache.org/jira/browse/FLINK-14898

I will let the thread hang for some time; if nothing speaks against it, we
will merge it next week and include the change in 1.10.

Thanks,
Andrey


AW: ArrayIndexOutOfBoundException on checkpoint creation

2019-11-27 Thread theo.diefent...@scoop-software.de
Sorry, I forgot to mention the environment.
We use Flink 1.9.1 on a Cloudera CDH 6.3.1 cluster (with Hadoop 3.0.0 but using 
Flink shaded 2.8.3-7. Might this be a problem? As it seems to arise from Kryo, 
I doubt it).
Our Flink is configured with defaults. Our job uses FsStateBackend and 
exactly-once processing with Kafka source and sink.
Best regards,
Theo
-------- Original Message --------
Subject: ArrayIndexOutOfBoundException on checkpoint creation
From: Theo Diefenthal
To: user
Cc:


Hi, 

We have a pipeline with a custom ProcessFunction and state (see [1], 
implemented as suggested by Fabian with a ValueState and 
ValueState>) 
The behavior of that function works fine in our unit tests and with low load in 
our test environment (100,000 records per minute). In the production 
environment, we observe reproducible crashes like the one attached. 
Any idea why this out-of-bounds error could be caused? Every time we read the 
state and modify it, we are certain that an .update() was called: 

2019-11-26T11:26:55+01:00 host19 java.lang.Exception: Could not materialize 
checkpoint 7 for operator our_operator) (4/8). 
2019-11-26T11:26:55+01:00 host19 at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
 
2019-11-26T11:26:55+01:00 host19 at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
 
2019-11-26T11:26:55+01:00 host19 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
2019-11-26T11:26:55+01:00 host19 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
2019-11-26T11:26:55+01:00 host19 at java.lang.Thread.run(Thread.java:745) 
2019-11-26T11:26:55+01:00 host19 Caused by: 
java.util.concurrent.ExecutionException: 
java.lang.ArrayIndexOutOfBoundsException: 67108864 
2019-11-26T11:26:55+01:00 host19 at 
java.util.concurrent.FutureTask.report(FutureTask.java:122) 
2019-11-26T11:26:55+01:00 host19 at 
java.util.concurrent.FutureTask.get(FutureTask.java:192) 
2019-11-26T11:26:55+01:00 host19 at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
 
2019-11-26T11:26:55+01:00 host19 at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:47)
 
2019-11-26T11:26:55+01:00 host19 at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
 
2019-11-26T11:26:55+01:00 host19 ... 3 more 
2019-11-26T11:26:55+01:00 host19 Caused by: 
java.lang.ArrayIndexOutOfBoundsException: 67108864 
2019-11-26T11:26:55+01:00 host19 at 
com.esotericsoftware.kryo.util.IdentityObjectIntMap.clear(IdentityObjectIntMap.java:364)
 
2019-11-26T11:26:55+01:00 host19 at 
com.esotericsoftware.kryo.util.MapReferenceResolver.reset(MapReferenceResolver.java:47)
 
2019-11-26T11:26:55+01:00 host19 at 
com.esotericsoftware.kryo.Kryo.reset(Kryo.java:836) 
2019-11-26T11:26:55+01:00 host19 at 
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:601) 
2019-11-26T11:26:55+01:00 host19 at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:305)
 
2019-11-26T11:26:55+01:00 host19 at 
org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:116)
 
2019-11-26T11:26:55+01:00 host19 at 
org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
 
2019-11-26T11:26:55+01:00 host19 at 
org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:37)
 
2019-11-26T11:26:55+01:00 host19 at 
org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:191)
 
2019-11-26T11:26:55+01:00 host19 at 
org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:158)
 
2019-11-26T11:26:55+01:00 host19 at 
org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
 
2019-11-26T11:26:55+01:00 host19 at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) 
2019-11-26T11:26:55+01:00 host19 at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
 
2019-11-26T11:26:55+01:00 host19 ... 5 more 
2019-11-26T11:26:55+01:00 host18 WARN  org.apache.hadoop.hdfs.DataStreamer  
 - DataStreamer Exception 
2019-11-26T11:26:55+01:00 host18 java.io.FileNotFoundException: File does not 
exist: 
/.../STATE/CHECKPOINTS/0a2e111b3a800aae0d3b49f33e0db6f3/chk-7/3da2a0a4-f5ef-4e8c-bc1a-9fe892cb0b18
 (inode 577546140) Holder DFSClient_NONMAPREDUCE_-1714419242_95 does not have 
any open files. 
2019-11-26T11:26:55+01:00 host18 at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesyste

Re: ***UNCHECKED*** Error while confirming Checkpoint

2019-11-27 Thread Piotr Nowojski
Hi,

Maybe Chesnay you are right, but I’m not sure. TwoPhaseCommitSink was based on 
Pravega’s sink for Flink, which was implemented by Stephan, and it has the same 
logic [1]. If I remember the discussions with Stephan/Till, the way Flink 
is using Akka probably guarantees that messages will always be delivered, 
except in case of some failure, so `notifyCheckpointComplete` could probably be 
missed only if a failure happens between the snapshot and the arrival of the 
notification. Receiving the same notification twice should be impossible (based 
on the knowledge passed to me from Till/Stephan).

However, for one thing, if that’s possible, then the code should be adjusted 
accordingly. On the other hand, maybe there is no harm in relaxing the 
contract? Even if we miss this notification (because of some re-ordering?), the 
next one will subsume the missed one and commit everything. 

Piotrek

[1] 
https://github.com/pravega/flink-connectors/blob/master/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java#L567
 


> On 27 Nov 2019, at 13:02, Chesnay Schepler  wrote:
> 
> This looks to me like the TwoPhaseCommitSinkFunction is a bit too strict. The 
> notification for complete checkpoints is not reliable; it may be late, not 
> come at all, possibly even in different order than expected.
> 
> As such, if you have a simple case of snapshot -> snapshot -> notify -> notify the 
> sink will always fail with an exception.
> 
> What it should do imo is either a) don't check that there is a pending 
> transaction or b) track the highest checkpoint id received and optionally 
> don't fail if the notification is for an older CP.
> 
> @piotr WDYT?
> 
> On 27/11/2019 08:59, Tony Wei wrote:
>> Hi, 
>> 
>> As the follow up, it seem that savepoint can't be subsumed, so that its 
>> notification could still be send to each TMs.
>> Is this a bug that need to be fixed in TwoPhaseCommitSinkFunction?
>> 
>> Best,
>> Tony Wei
>> 
>> Tony Wei (tony19920...@gmail.com) wrote on Wed, Nov 27, 2019 at 3:43 PM:
>> Hi, 
>> 
>> I want to raise this question again, since I have had this exception on my 
>> production job.
>> 
>> The exception is as follows
>>  
>> 2019-11-27 14:47:29
>>  
>> java.lang.RuntimeException: Error while confirming checkpoint
>> at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205)
>> at 
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.IllegalStateException: checkpoint completed, but no 
>> transaction pending
>> at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>> at 
>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:267)
>> at 
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:822)
>> at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1200)
>> ... 5 more
>> 
>> And these are the checkpoint / savepoint before the job failed.
>> 
>> 
>> It seems that checkpoint # 675's notification handled the savepoint # 674's 
>> pending transaction holder, but savepoint #674's notification didn't be 
>> subsumed or be ignored by JM.
>> Therefore, during the checkpoint #676, some tasks got notification before 
>> getting the checkpoint barrier and led to this exception happened, because 
>> there was no pending transaction in queue.
>> 
>> Does anyone know the details about subsumed notifications mechanism and how 
>> checkpoint coordinator handle this situation? Please correct me if I'm 
>> wrong. Thanks.
>> 
>> Best,
>> Tony Wei
>> 
>> Stefan Richter wrote on Mon, Oct 8, 2018 at 5:03 PM:
>> Hi Pedro,
>> 
>> unfortunately the interesting parts are all removed from the log, we already 
>> know about the exception itself. In particular, what I would like to see is 
>> what checkpoints have been triggered and completed before the exception 
>> happens.
>> 
>> Best,
>> Stefan
>> 
>> > On 08.10.2018 at 10:23, PedroMrChaves wrote:
>> > 
>> > Hello,
>> > 
>> > Find attached the jobmanager.log. I've omitted the log lines from other
>> > runs, only left the job manager info and the run with the error. 
>> > 
>> > jobmanager.log
>> > > >  
>> > 

Re: Apache Flink - Troubleshooting exception while starting the job from savepoint

2019-11-27 Thread Congxian Qiu
Hi,

As the doc [1] says, we should assign a uid to all the stateful operators. If
you do not set a uid for an operator, Flink will generate an operatorId for
it; AFAIK, the operatorId will not change as long as the job DAG does not change.

You can skip the state of operators which are not in the new job, please refer
to doc [2], and these operators will start from scratch.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/savepoints.html#should-i-assign-ids-to-all-operators-in-my-job
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/savepoints.html#what-happens-if-i-delete-an-operator-that-has-state-from-my-job
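
A short sketch of the recommendation in [1] (the operator, its uid and the
types below are made-up placeholders):

// Sketch: give every stateful operator a stable, explicit uid so its state
// can still be mapped to it from a savepoint after the job is refactored.
DataStream<Event> deduped = events
    .keyBy(Event::getKey)
    .flatMap(new DeduplicatingFlatMap())   // hypothetical stateful operator
    .uid("dedup-flatmap")                  // matched against the savepoint
    .name("dedup-flatmap");                // only affects logging and the web UI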
Best,
Congxian


M Singh  wrote on Tue, Nov 26, 2019 at 10:49 AM:

> Hi Kostas/Congxian:
>
> Thanks for your response.
>
> Based on your feedback, I found that I had missed adding uid to one of the
> stateful operators and correcting that resolved the issue.  I still have
> stateless operators which I have no uid specified in the application.
>
> So, I thought that adding uid was optional and if we don't add it and run
> another instance of the same app from a savepoint or checkpoint, it will
> pick up the state based on the generated uid.  Is that a correct
> understanding ?
>
> Also, if some stateful operators have uids but some don't, will it pick up
> the state for the operators with uid and the non-uid (using the generated
> uid) ones provided the application has not changed ?
>
> Thanks again for your response.
>
> Mans
>
> On Monday, November 25, 2019, 09:24:42 AM EST, Congxian Qiu <
> qcx978132...@gmail.com> wrote:
>
>
> Hi
>
> The problem is that the specified uid did not in the new job.
> 1. As far as I know, the answer is yes. There are some operators have
> their own state(such as window state), could you please share the minimal
> code of your job?
> 2.*truely* stateless operator do not need to have uid, but for the reason
> described in the above, assign uid to all operators is recommended.
> 3. if the previous job is still there, I'm not sure we can find the
> operatorId in the UI easily, maybe other people can answer the question.
> 4. for this purpose, maybe you can debug the savepoint meta with the new
> job locally, maybe CheckpointMetadataLoadingTest can help.
> 5. for this problem, 1.9 is same as 1.6
>
>
> Best,
> Congxian
>
>
Kostas Kloudas  wrote on Mon, Nov 25, 2019 at 9:42 PM:
>
> As a side note, I am assuming that you are using the same Flink Job
> before and after the savepoint and the same Flink version.
> Am I correct?
>
> Cheers,
> Kostas
>
> On Mon, Nov 25, 2019 at 2:40 PM Kostas Kloudas  wrote:
> >
> > Hi Singh,
> >
> > This behaviour is strange.
> > One thing I can recommend to see if the two jobs are identical is to
> > launch also the second job without a savepoint,
> > just start from scratch, and simply look at the web interface to see
> > if everything is there.
> >
> > Also could you please provide some code from your job, just to see if
> > there is anything problematic with the application code?
> > Normally there should be no problem with not providing UIDs for some
> > stateless operators.
> >
> > Cheers,
> > Kostas
> >
> > On Sat, Nov 23, 2019 at 11:16 AM M Singh  wrote:
> > >
> > >
> > > Hey Folks:
> > >
> > > Please let me know how to resolve this issue since using
> --allowNonRestoredState without knowing if any state will be lost seems
> risky.
> > >
> > > Thanks
> > > On Friday, November 22, 2019, 02:55:09 PM EST, M Singh <
> mans2si...@yahoo.com> wrote:
> > >
> > >
> > > Hi:
> > >
> > > I have a flink application in which some of the operators have uid and
> name and some stateless ones don't.
> > >
> > > I've taken a savepoint and tried to start another instance of the
> application from the savepoint - I get the following exception, which
> indicates that the operator is not available to the new program even though
> the second job is the same as the first, just running from the first job's
> savepoint.
> > >
> > > Caused by: java.lang.IllegalStateException: Failed to rollback to
> checkpoint/savepoint
> s3://mybucket/state/savePoint/mysavepointfolder/66s4c6402d7532801287290436fa9fadd/savepoint-664c64-fa235d26d379.
> Cannot map checkpoint/savepoint state for operator
> d1a56c5a9ce5e3f1b03e01cac458bb4f to the new program, because the operator
> is not available in the new program. If you want to allow to skip this, you
> can set the --allowNonRestoredState option on the CLI.
> > >
> > > at
> org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:205)
> > >
> > > at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1102)
> > >
> > > at
> org.apache.flink.runtime.jobmaster.JobMaster.tryRestoreExecutionGraphFromSavepoint(JobMaster.java:1219)
> > >
> > > at
> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1143)
> > >
> > > at
> org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:294)
> > >
> > > at
> org.apac

Re: ***UNCHECKED*** Error while confirming Checkpoint

2019-11-27 Thread Chesnay Schepler
This looks to me like the TwoPhaseCommitSinkFunction is a bit too
strict. The notification for completed checkpoints is not reliable; it
may be late, not come at all, possibly even arrive in a different order
than expected.


As such, with a simple sequence of snapshot -> snapshot -> notify ->
notify, the sink will always fail with an exception.


What it should do imo is either a) not check that there is a pending
transaction, or b) track the highest checkpoint id received and
optionally not fail if the notification is for an older CP.


@piotr WDYT?
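
For illustration, a rough sketch of option (b), covering only the guard
logic rather than Flink's actual internals; all names are made up:

// Remember the highest checkpoint id that has already been handled and
// ignore late / out-of-order notifications instead of failing.
public class NotificationGuard {

    private long highestHandledCheckpointId = Long.MIN_VALUE;

    /** Returns true if the commit for this notification should be performed. */
    public boolean shouldCommit(long notifiedCheckpointId, boolean hasPendingTransaction) {
        if (notifiedCheckpointId <= highestHandledCheckpointId) {
            // Notification for an older (or already handled) checkpoint: ignore it.
            return false;
        }
        if (!hasPendingTransaction) {
            // Nothing pending for this checkpoint: skip instead of throwing.
            return false;
        }
        highestHandledCheckpointId = notifiedCheckpointId;
        return true;
    }
}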

On 27/11/2019 08:59, Tony Wei wrote:

Hi,

As a follow-up, it seems that savepoints can't be subsumed, so their
notifications could still be sent to each TM.

Is this a bug that need to be fixed in TwoPhaseCommitSinkFunction?

Best,
Tony Wei

Tony Wei wrote on Wednesday, November 27, 2019 at 3:43 PM:


Hi,

I want to raise this question again, since I have had this
exception on my production job.

The exception is as follows

2019-11-27 14:47:29

java.lang.RuntimeException: Error while confirming checkpoint
at
org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at
java.util.concurrent.FutureTask.run(FutureTask.java:266)
at

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748) Caused by:
java.lang.IllegalStateException: checkpoint completed, but no
transaction pending at
org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
at

org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:267)
at

org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
at

org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:822)
at
org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1200)
... 5 more


And these are the checkpoint / savepoint before the job failed.
checkoint.png

It seems that checkpoint #675's notification handled savepoint #674's
pending transaction holder, but savepoint #674's notification wasn't
subsumed or ignored by the JM.
Therefore, during checkpoint #676, some tasks got the notification
before receiving the checkpoint barrier, which led to this exception
because there was no pending transaction in the queue.

Does anyone know the details about the subsumed-notification mechanism
and how the checkpoint coordinator handles this situation?
Please correct me if I'm wrong. Thanks.

Best,
Tony Wei

Stefan Richter wrote on Monday, October 8, 2018 at 5:03 PM:

Hi Pedro,

unfortunately the interesting parts are all removed from the
log, we already know about the exception itself. In
particular, what I would like to see is what checkpoints have
been triggered and completed before the exception happens.

Best,
Stefan

> On 08.10.2018 at 10:23, PedroMrChaves wrote:
>
> Hello,
>
> Find attached the jobmanager.log. I've omitted the log lines
from other
> runs, only left the job manager info and the run with the
error.
>
> jobmanager.log
>
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t612/jobmanager.log>
>
>
>
> Thanks again for your help.
>
> Regards,
> Pedro.
>
>
>
> -
> Best Regards,
> Pedro Chaves
> --
> Sent from:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: How to recover state from savepoint on embedded mode?

2019-11-27 Thread Congxian Qiu
Hi,

You can recover from a checkpoint/savepoint if the JM & TM can read from the
given path, no matter which mode the job is running on.

Best,
Congxian
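
For illustration, a minimal sketch of a job in a local JVM whose checkpoints
go to a filesystem path that the in-process JM and TM can both read; the
path, intervals and the trivial pipeline are placeholders. With a restart
strategy configured, such a job restores from the latest checkpoint
automatically after a task failure, whatever the deployment mode:

import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EmbeddedRecoveryExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoints are written to a path readable by both JM and TM.
        env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints"));
        env.enableCheckpointing(10_000);
        // Keep the latest checkpoint even if the job is cancelled.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // After a failure the job restarts in-process and restores from the latest checkpoint.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));

        env.fromElements(1, 2, 3).map(i -> i * 2).print();
        env.execute("embedded-recovery-example");
    }
}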


Reo Lei wrote on Tuesday, November 26, 2019 at 12:18 PM:

>
>
> -- Forwarded message -
> From: Reo Lei
> Date: Tuesday, November 26, 2019 at 9:53 AM
> Subject: Re: How to recover state from savepoint on embedded mode?
> To: Yun Tang 
>
>
> Hi Yun,
> Thanks for your reply. What I mean by embedded mode is the whole Flink
> cluster and job, including the jobmanager, taskmanager and the job application
> itself, running within a local JVM process, i.e. using the
> "LocalStreamEnvironment" within the job. And the start command looks like
> this: "java -Xmx512M -XX:... -Dlog.file=... -cp flink-job.jar
> com.a.b.c.MyJob > /dev/null &"
>
> The reason I am not using standalone mode to run the job is that the running
> env doesn't have ZooKeeper and will not have ZooKeeper installed. So I need
> to depend on the embedded mode to run my job.
>
> BR,
> Reo
>
> Yun Tang wrote on Tuesday, November 26, 2019 at 2:38 AM:
>
>> What does the embedded mode mean here? If you refer to the SQL embedded mode,
>> you cannot resume from a savepoint now; if you refer to a local standalone
>> cluster, you could use `bin/flink run -s` to resume on a local cluster.
>>
>>
>>
>> Best
>>
>> Yun Tang
>>
>>
>>
>> From: Reo Lei
>> Date: Tuesday, November 26, 2019 at 12:37 AM
>> To: "user@flink.apache.org"
>> Subject: How to recover state from savepoint on embedded mode?
>>
>>
>>
>> Hi,
>>
>> I have a job that needs to run in embedded mode, but it needs to init some
>> rule data from a database before starting. So I used the State Processor API
>> to construct my state data and save it to the local disk. When I wanted to
>> use this savepoint to recover my job, I found that resuming a job from a
>> savepoint requires the command `bin/flink run -s :savepointPath [:runArgs]`
>> to submit the job to a Flink cluster. That means the job runs in remote
>> mode, not embedded mode.
>>
>>
>>
>> And I was wondering why I can't resume a job from a savepoint in
>> embedded mode. If that is possible, what should I do?
>>
>> BTW, if we cannot resume a job from a savepoint in embedded mode, how can
>> I know the savepoint is constructed correctly in the development environment
>> and use IDEA to debug it?
>>
>>
>>
>> BR,
>>
>> Reo
>>
>>
>>
>


Re: ArrayIndexOutOfBoundException on checkpoint creation

2019-11-27 Thread Congxian Qiu
Hi

Which version are you using now? (If it is an old version, could you please
check whether this exception is still there on Flink 1.9.) On the other hand,
did you try the RocksDBStateBackend for this?

Best,
Congxian
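
For reference, a minimal sketch of switching to the RocksDB backend (it
requires the flink-statebackend-rocksdb dependency; the checkpoint URI and
the trivial pipeline are placeholders standing in for the real stateful job):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbBackendExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Keyed state lives off-heap in RocksDB and is (de)serialized on every access,
        // so snapshots do not go through the heap backend's CopyOnWriteStateMap path
        // seen in the stack trace below.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
        env.enableCheckpointing(60_000);

        env.fromElements(1, 2, 3).map(i -> i * 2).print();
        env.execute("rocksdb-backend-example");
    }
}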


Theo Diefenthal wrote on Tuesday, November 26, 2019 at 6:52 PM:

> Hi,
>
> We have a pipeline with a custom ProcessFunction and state (see [1],
> implemented as suggested by Fabian with a ValueState and
> ValueState>)
> The behavior of that function works fine in our unit tests and with low
> load in our test environment (100,000 records per minute). In the
> production environment, we observe reproducible crashes like the attached
> one.
> Any idea what could cause this out-of-bounds error? Every time we read the
> state and modify it, we are certain that an .update() was called:
>
> 2019-11-26T11:26:55+01:00 host19 java.lang.Exception: Could not materialize 
> checkpoint 7 for operator our_operator) (4/8).
> 2019-11-26T11:26:55+01:00 host19 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
> 2019-11-26T11:26:55+01:00 host19 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
> 2019-11-26T11:26:55+01:00 host19 at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 2019-11-26T11:26:55+01:00 host19 at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 2019-11-26T11:26:55+01:00 host19 at java.lang.Thread.run(Thread.java:745)
> 2019-11-26T11:26:55+01:00 host19 Caused by: 
> java.util.concurrent.ExecutionException: 
> java.lang.ArrayIndexOutOfBoundsException: 67108864
> 2019-11-26T11:26:55+01:00 host19 at 
> java.util.concurrent.FutureTask.report(FutureTask.java:122)
> 2019-11-26T11:26:55+01:00 host19 at 
> java.util.concurrent.FutureTask.get(FutureTask.java:192)
> 2019-11-26T11:26:55+01:00 host19 at 
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
> 2019-11-26T11:26:55+01:00 host19 at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
> 2019-11-26T11:26:55+01:00 host19 at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
> 2019-11-26T11:26:55+01:00 host19 ... 3 more
> 2019-11-26T11:26:55+01:00 host19 Caused by: 
> java.lang.ArrayIndexOutOfBoundsException: 67108864
> 2019-11-26T11:26:55+01:00 host19 at 
> com.esotericsoftware.kryo.util.IdentityObjectIntMap.clear(IdentityObjectIntMap.java:364)
> 2019-11-26T11:26:55+01:00 host19 at 
> com.esotericsoftware.kryo.util.MapReferenceResolver.reset(MapReferenceResolver.java:47)
> 2019-11-26T11:26:55+01:00 host19 at 
> com.esotericsoftware.kryo.Kryo.reset(Kryo.java:836)
> 2019-11-26T11:26:55+01:00 host19 at 
> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:601)
> 2019-11-26T11:26:55+01:00 host19 at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:305)
> 2019-11-26T11:26:55+01:00 host19 at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:116)
> 2019-11-26T11:26:55+01:00 host19 at 
> org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:121)
> 2019-11-26T11:26:55+01:00 host19 at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:37)
> 2019-11-26T11:26:55+01:00 host19 at 
> org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:191)
> 2019-11-26T11:26:55+01:00 host19 at 
> org.apache.flink.runtime.state.heap.HeapSnapshotStrategy$1.callInternal(HeapSnapshotStrategy.java:158)
> 2019-11-26T11:26:55+01:00 host19 at 
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
> 2019-11-26T11:26:55+01:00 host19 at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 2019-11-26T11:26:55+01:00 host19 at 
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
> 2019-11-26T11:26:55+01:00 host19 ... 5 more
> 2019-11-26T11:26:55+01:00 host18 WARN  org.apache.hadoop.hdfs.DataStreamer
>- DataStreamer Exception
> 2019-11-26T11:26:55+01:00 host18 java.io.FileNotFoundException: File does not 
> exist: 
> /.../STATE/CHECKPOINTS/0a2e111b3a800aae0d3b49f33e0db6f3/chk-7/3da2a0a4-f5ef-4e8c-bc1a-9fe892cb0b18
>  (inode 577546140) Holder DFSClient_NONMAPREDUCE_-1714419242_95 does not have 
> any open files.
> 2019-11-26T11:26:55+01:00 host18 at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2782)
> 2019-11-26T11:26:55+01:00 host18 at 
> org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.analyzeFileState(FSDirWriteFileOp.ja

Re: What happens to a Source's Operator State if it stops being initialized and snapshotted? Accidentally exponential?

2019-11-27 Thread Congxian Qiu
Hi

Do you use UNION state in your scenario? When using UNION state, the JM
may encounter OOM because each TDD (TaskDeploymentDescriptor) will contain
the state of all subtasks [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-operator-state
Best,
Congxian
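
For reference, a small sketch of where the choice between union and
even-split redistribution is made; the class is only a fragment that would
be mixed into a real source or operator, and the state name and values are
made up:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class OffsetTracker implements CheckpointedFunction {

    private transient ListState<Long> offsets;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Long> descriptor = new ListStateDescriptor<>("offsets", Long.class);

        // UNION redistribution: on restore every subtask gets the complete list,
        // and (per the mail above) its metadata is shipped with each task deployment.
        offsets = context.getOperatorStateStore().getUnionListState(descriptor);

        // Even-split redistribution instead: entries are divided among the subtasks.
        // offsets = context.getOperatorStateStore().getListState(descriptor);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        offsets.clear();
        offsets.add(42L); // hypothetical: this subtask's current offset
    }
}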


Aaron Levin wrote on Wednesday, November 27, 2019 at 3:55 AM:

> Hi,
>
> Some context: after a refactoring, we were unable to start our jobs.
> They started fine and checkpointed fine, but once the job restarted
> owing to a transient failure, the application was unable to start. The
> Job Manager was OOM'ing (even when I gave them 256GB of ram!). The
> `_metadata` file for the checkpoint was 1.3GB (usually 11MB). Inside
> the `_metadata` file we saw `- 1402496 offsets:
> com.stripe.flink.backfill.kafka-archive-file-progress`. This happened
> to be the operator state we were no longer initializing or
> snapshotting after the refactoring.
>
> Before I dig further into this and try to find a smaller reproducible
> test case I thought I would ask if someone knows what the expected
> behaviour is for the following scenario:
>
> suppose you have an operator (in this case a Source) which has some
> operator ListState. Suppose you run your flink job for some time and
> then later refactor your job such that you no longer use that state
> (so after the refactoring you're no longer initializing this operator
> state in initializeState, nor are you snapshotting the operator state
> in snapshotState). If you launch your new code from a recent
> savepoint, what do we expect to happen to the state? Do we anticipate
> the behaviour I explained above?
>
> My assumption would be that Flink would not read this state and so it
> would be removed from the next checkpoint or savepoint. Alternatively,
> I might assume it would not be read but would linger around every
> future checkpoint or savepoint. However, it feels like what is
> happening is it's not read and then possibly replicated by every
> instance of the task every time a checkpoint happens (hence the
> accidentally exponential behaviour).
>
> Thoughts?
>
> PS - in case someone asks: I was sure that we were calling `.clear()`
> appropriately in `snapshotState` (we, uh, already learned that lesson
> :D)
>
> Best,
>
> Aaron Levin
>


Re: Flink behavior as a slow consumer - out of Heap MEM

2019-11-27 Thread Robert Metzger
Hi Hanan,

Flink does handle backpressure gracefully. I guess your custom ZMQ source
is receiving events in a separate thread?
In a Flink source, the SourceContext.collect() method will not return if
the downstream operators are not able to process incoming data fast enough.

If my assumptions are right, I would suggest that you pull data from ZMQ in
small batches, forward them to .collect(), and pause the fetch while
collect() is blocked.
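
For illustration, a rough sketch of that pattern; the ZMQ polling itself is
left as a placeholder (pollBatchFromZmq and the batch size are assumptions,
not a working ZMQ client):

import java.util.List;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class BatchingZmqSource extends RichSourceFunction<byte[]> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<byte[]> ctx) throws Exception {
        while (running) {
            // Pull a small batch from ZMQ on this same thread.
            List<byte[]> batch = pollBatchFromZmq(100);

            for (byte[] msg : batch) {
                synchronized (ctx.getCheckpointLock()) {
                    // collect() blocks while downstream operators are saturated,
                    // which automatically pauses the polling loop above.
                    ctx.collect(msg);
                }
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    private List<byte[]> pollBatchFromZmq(int maxMessages) {
        // Placeholder: wire in the actual ZMQ receive logic here (bounded recv loop).
        throw new UnsupportedOperationException("not implemented in this sketch");
    }
}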


On Tue, Nov 26, 2019 at 6:59 AM vino yang  wrote:

> Hi Hanan,
>
> Sometimes, the behavior depends on your implementation.
>
> Since it's not a built-in connector, it would be better to share your
> customized source with the community
> so that the community can better help you figure out where the
> problem is.
>
> WDYT?
>
> Best,
> Vino
>
> Hanan Yehudai wrote on Tuesday, November 26, 2019 at 12:27 PM:
>
>> Hi, I am trying to do some performance tests on my Flink deployment.
>>
>> I am implementing an extremely simplistic use case
>>
>> I built a ZMQ Source
>>
>>
>>
>> The topology is ZMQ Source -> Mapper -> DiscardingSink (a sink that
>> does nothing).
>>
>>
>>
>> Data is pushed via ZMQ at a very high rate.
>>
>> When the incoming rate from ZMQ is higher than the rate Flink can keep
>> up with, I can see that the JVM heap is filling up (using Flink's
>> metrics).
>> When the heap is fully consumed, the TM chokes, a heartbeat is missed, and
>> the job fails.
>>
>>
>>
>> I was expecting Flink to handle this type of backpressure gracefully and
>> not fail the job.
>>
>>
>>
>> Note: The mapper has no state to persist.
>>
>> See the Grafana charts below; on the left is the TM heap used.
>>
>> On the right is the ZMQ output rate (yellow) vs. the Flink consuming rate
>> reported by the ZMQSource.
>>
>> 1GB is the configured heap size
>>
>>
>>
>>


Re: Some doubts about window start time and end time

2019-11-27 Thread Jun Zhang
Hi Caizhi:
1. If I add an offset:
   window(TumblingProcessingTimeWindows.of(Time.hours(6), Time.hours(-8)))
   it will get an error: TumblingProcessingTimeWindows parameters must
   satisfy abs(offset) < size.
2. If it is caused by not adding an offset, then why does the same code work
   when I set the window size to one hour, but cause a problem when I set the
   window size to six hours?

On 11/27/2019 18:21, Caizhi Weng wrote:

Re: Some doubts about window start time and end time

2019-11-27 Thread Caizhi Weng
Hi Jun,

You have to specify an offset when defining the windows. According to the
Java docs of TumblingProcessingTimeWindows: "if you are living in
somewhere which is not using UTC±00:00 time, such as China which is using
UTC+08:00, and you want a time window with size of one day, and window
begins at every 00:00:00 of local time, you may use `of(Time.days(1),
Time.hours(-8))`. The parameter of offset is `Time.hours(-8)` since
UTC+08:00 is 8 hours earlier than UTC time."

Does this solve the problem?
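
To make the offset concrete for the 6-hour case from this thread, a minimal
sketch (assuming local time is UTC+08:00; the sample data and key are made
up). Because abs(offset) must be smaller than the window size,
Time.hours(-8) cannot be used with a 6-hour window, but Time.hours(-2)
(equivalently Time.hours(4)) gives the local-midnight alignment:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class SixHourLocalWindows {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Long>> events =
                env.fromElements(Tuple2.of("device-1", 1L), Tuple2.of("device-2", 1L));

        events
                .keyBy(0)
                // Without an offset, 6h windows align to the UTC epoch: 00:00, 06:00, 12:00, 18:00 UTC,
                // which is 08:00, 14:00, 20:00, 02:00 in UTC+08:00 local time.
                // With an offset of -2h the starts become 22:00, 04:00, 10:00, 16:00 UTC,
                // i.e. 06:00, 12:00, 18:00, 00:00 local time.
                .window(TumblingProcessingTimeWindows.of(Time.hours(6), Time.hours(-2)))
                .sum(1)
                .print();

        env.execute("six-hour-local-windows");
    }
}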

Jun Zhang <825875...@qq.com> wrote on Wednesday, November 27, 2019 at 6:03 PM:

>
>
> Hi,Caizhi :
>
> the code like this :
>
>
> dataStream
>     .keyBy("device")
>     .window(TumblingProcessingTimeWindows.of(Time.hours(6)))
>     .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
>     .aggregate(new MyAggre(), new WindowResultFunction())
>     .print();
>
> I add a trigger for quick output
>
>
> On 11/27/2019 17:54, Caizhi Weng wrote:
>
> Hi Jun,
>
> How do you define your window? Could you please show us the code?
>
> Thanks.
>
Jun Zhang <825875...@qq.com> wrote on Wednesday, November 27, 2019 at 5:22 PM:
>
>> ,
>> Hi:
>> I defined a Tumbling window, I set the time size to one hour, and the
>> resulting windows are [00: 00: 00-01: 00: 00], [01: 00: 00-02: 00: 00]. 
>> This meets my expectations, but when I set the time size to 6 hours, the
>> resulting window size is [02: 00: 00-08: 00: 00], [08: 00: 00-14: 00: 00],
>> [14: 00: 00-20: 00: 00] ...
>> But my expected window size is [00: 00: 00-06: 00: 00], [06: 00: 00-12:
>> 00: 00] ...
>> Is it right to get such a window result?
>>
>> thanks
>>
>>
>>


Re: Some doubts about window start time and end time

2019-11-27 Thread Jun Zhang
Hi Caizhi:

the code looks like this:

dataStream
    .keyBy("device")
    .window(TumblingProcessingTimeWindows.of(Time.hours(6)))
    .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
    .aggregate(new MyAggre(), new WindowResultFunction())
    .print();

I added a trigger for quick output.

On 11/27/2019 17:54, Caizhi Weng wrote:

Re: Some doubts about window start time and end time

2019-11-27 Thread Caizhi Weng
Hi Jun,

How do you define your window? Could you please show us the code?

Thanks.

Jun Zhang <825875...@qq.com> wrote on Wednesday, November 27, 2019 at 5:22 PM:

> ,
> Hi:
> I defined a Tumbling window, I set the time size to one hour, and the
> resulting windows are [00: 00: 00-01: 00: 00], [01: 00: 00-02: 00: 00]. 
> This meets my expectations, but when I set the time size to 6 hours, the
> resulting window size is [02: 00: 00-08: 00: 00], [08: 00: 00-14: 00: 00],
> [14: 00: 00-20: 00: 00] ...
> But my expected window size is [00: 00: 00-06: 00: 00], [06: 00: 00-12:
> 00: 00] ...
> Is it right to get such a window result?
>
> thanks
>
>
>


Some doubts about window start time and end time

2019-11-27 Thread Jun Zhang
Hi:
I defined a tumbling window and set the time size to one hour; the resulting
windows are [00:00:00-01:00:00], [01:00:00-02:00:00], ...
This meets my expectations, but when I set the time size to 6 hours, the
resulting windows are [02:00:00-08:00:00], [08:00:00-14:00:00],
[14:00:00-20:00:00], ...
But my expected windows are [00:00:00-06:00:00], [06:00:00-12:00:00], ...
Is it right to get such a window result?



thanks

Re: ***UNCHECKED*** Error while confirming Checkpoint

2019-11-27 Thread Tony Wei
Hi,

As a follow-up, it seems that savepoints can't be subsumed, so their
notifications could still be sent to each TM.
Is this a bug that need to be fixed in TwoPhaseCommitSinkFunction?

Best,
Tony Wei

Tony Wei wrote on Wednesday, November 27, 2019 at 3:43 PM:

> Hi,
>
> I want to raise this question again, since I have had this exception on my
> production job.
>
> The exception is as follows
>
>
>> 2019-11-27 14:47:29
>
>
>
> java.lang.RuntimeException: Error while confirming checkpoint
>> at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1205)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors
>> .java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(
>> ThreadPoolExecutor.java:1149)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>> ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.IllegalStateException: checkpoint completed, but no
>> transaction pending
>> at org.apache.flink.util.Preconditions.checkState(Preconditions.java:
>> 195)
>> at org.apache.flink.streaming.api.functions.sink.
>> TwoPhaseCommitSinkFunction.notifyCheckpointComplete(
>> TwoPhaseCommitSinkFunction.java:267)
>> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator
>> .notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask
>> .notifyCheckpointComplete(StreamTask.java:822)
>> at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1200)
>> ... 5 more
>
>
> And these are the checkpoint / savepoint before the job failed.
> [image: checkoint.png]
>
> It seems that checkpoint #675's notification handled savepoint #674's
> pending transaction holder, but savepoint #674's notification wasn't
> subsumed or ignored by the JM.
> Therefore, during checkpoint #676, some tasks got the notification before
> receiving the checkpoint barrier, which led to this exception because
> there was no pending transaction in the queue.
>
> Does anyone know the details about the subsumed-notification mechanism and
> how the checkpoint coordinator handles this situation? Please correct me if
> I'm wrong. Thanks.
>
> Best,
> Tony Wei
>
> Stefan Richter wrote on Monday, October 8, 2018 at 5:03 PM:
>
>> Hi Pedro,
>>
>> unfortunately the interesting parts are all removed from the log, we
>> already know about the exception itself. In particular, what I would like
>> to see is what checkpoints have been triggered and completed before the
>> exception happens.
>>
>> Best,
>> Stefan
>>
>> > On 08.10.2018 at 10:23, PedroMrChaves wrote:
>> >
>> > Hello,
>> >
>> > Find attached the jobmanager.log. I've omitted the log lines from other
>> > runs, only left the job manager info and the run with the error.
>> >
>> > jobmanager.log
>> > <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t612/jobmanager.log>
>>
>> >
>> >
>> >
>> > Thanks again for your help.
>> >
>> > Regards,
>> > Pedro.
>> >
>> >
>> >
>> > -
>> > Best Regards,
>> > Pedro Chaves
>> > --
>> > Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>>