Re: high availability with automated disaster recovery using zookeeper

2018-07-10 Thread Till Rohrmann
Hi Tovi,

that is an interesting use case you are describing here. I think, however,
it depends mainly on the capabilities of ZooKeeper to produce the intended
behavior. Flink itself relies on ZooKeeper for leader election in HA mode
but does not expose any means to influence the leader election process. To
be more precise ZK is used as a blackbox which simply tells a JobManager
that it is now the leader, independent of any data center preferences. I'm
not sure whether it is possible to tell ZooKeeper about these preferences.
If not, then an alternative, at the moment, could be to implement one's own
high availability services which take these preferences into account.

Cheers,
Till

On Mon, Jul 9, 2018 at 1:48 PM Sofer, Tovi  wrote:

> Hi all,
>
>
>
> We are now examining how to achieve high availability for Flink, and also to
> support automatic recovery in a disaster scenario - when a whole DC goes down.
>
> We have DC1, where we usually want the work to be done, and DC2 – which is more
> remote and where we want work to go only when DC1 is down.
>
>
>
> We examined a few options and would be glad to hear feedback or a suggestion
> for another way to achieve this.
>
> · Two separate ZooKeeper and Flink clusters on the two
> data centers.
>
> Only the cluster on DC1 is running, and state is copied to DC2 in an offline
> process.
>
> To achieve automatic recovery we need to use some kind of watchdog which
> will check DC1 availability, and if it is down will start DC2 (and the same
> later if DC2 is down).
>
> Is there a recommended tool for this?
>
> · ZooKeeper “stretch cluster” across data centers – with 2 nodes
> on DC1, 2 nodes on DC2 and one observer node.
>
> Also a Flink cluster with jobmanager1 on DC1 and jobmanager2 on DC2.
>
> This way when DC1 is down, zookeeper will notice this automatically and
> will transfer work to jobmanager2 on DC2.
>
> However we would like the ZooKeeper leader and the Flink JobManager leader
> (the primary one) to be from DC1 – unless it is down.
>
> Is there a way to achieve this?
>
>
>
> Thanks and regards,
>
>
> *Tovi Sofer*
>
> Software Engineer
> +972 (3) 7405756
>
>
>
>


Re: Checkpointing in Flink 1.5.0

2018-07-10 Thread Sampath Bhat
Chesnay - Why is the absolute file check required in
RocksDBStateBackend.setDbStoragePaths(String... paths)? I think this is
causing the issue. It's not related to GlusterFS or the file system. The same
problem can be reproduced with the following configuration on a local
machine. The Flink application should support checkpointing. We get an
IllegalArgumentException (relative file paths not allowed).

state.backend: rocksdb
state.checkpoints.dir: file:///home/demo/checkpoints/ext_checkpoints
state.savepoints.dir: file:///home/demo/checkpoints/checkpoints/savepoints
state.backend.fs.checkpointdir:
file:///home/demo/checkpoints/checkpoints/fs_state
#state.backend.rocksdb.checkpointdir:
file:///home/demo/checkpoints/checkpoints/rocksdb_state
state.backend.rocksdb.localdir:
/home/demo/checkpoints/checkpoints/rocksdb_state

Any insights would be helpful.

On Wed, Jul 4, 2018 at 2:27 PM, Chesnay Schepler  wrote:

> Reference: https://issues.apache.org/jira/browse/FLINK-9739
>
>
> On 04.07.2018 10:46, Chesnay Schepler wrote:
>
> It's not really path-parsing logic, but path handling I suppose; see
> RocksDBStateBackend#setDbStoragePaths().
>
> I went ahead and converted said method into a simple test method, maybe
> this is enough to debug the issue.
>
> I assume this regression was caused by FLINK-6557, which refactored the
> state backend to rely on java Files instead of Flink paths.
> I'll open a JIRA to document it.
>
> The deprecation notice is not a problem.
>
> public static void testPaths(String... paths) {
>    if (paths.length == 0) {
>       throw new IllegalArgumentException("empty paths");
>    } else {
>       File[] pp = new File[paths.length];
>       for (int i = 0; i < paths.length; i++) {
>          final String rawPath = paths[i];
>          final String path;
>          if (rawPath == null) {
>             throw new IllegalArgumentException("null path");
>          } else {
>             // we need this for backwards compatibility, to allow URIs like 'file:///'...
>             URI uri = null;
>             try {
>                uri = new Path(rawPath).toUri();
>             } catch (Exception e) {
>                // cannot parse as a path
>             }
>
>             if (uri != null && uri.getScheme() != null) {
>                if ("file".equalsIgnoreCase(uri.getScheme())) {
>                   path = uri.getPath();
>                } else {
>                   throw new IllegalArgumentException("Path " + rawPath + " has a non-local scheme");
>                }
>             } else {
>                path = rawPath;
>             }
>          }
>
>          pp[i] = new File(path);
>          if (!pp[i].isAbsolute()) { // my suspicion is that this categorically fails for GlusterFS paths
>             throw new IllegalArgumentException("Relative paths are not supported");
>          }
>       }
>    }
> }
>
>
>
> On 03.07.2018 16:35, Jash, Shaswata (Nokia - IN/Bangalore) wrote:
>
> Hello Chesnay,
>
>
>
> A cluster-wide (in Kubernetes) checkpointing directory using a glusterfs
> volume mount (thus the file access protocol file:///) was working fine up to
> 1.4.2 for us. So we would like to understand where the breakage happened in
> 1.5.0.
>
> Can you please point me to the relevant source code files related to the
> rocksdb “custom file path” parsing logic? We would be interested to
> investigate this.
>
>
>
> I also observed below in the log –
>
>
>
> Config uses deprecated configuration key 
> 'state.backend.rocksdb.checkpointdir' instead of proper key 
> 'state.backend.rocksdb.localdir'
>
> Regards,
>
> Shaswata
>
>
>
> *From:* Chesnay Schepler [mailto:ches...@apache.org ]
> *Sent:* Tuesday, July 03, 2018 5:52 PM
> *To:* Data Engineer  
> *Cc:* user@flink.apache.org
> *Subject:* Re: Checkpointing in Flink 1.5.0
>
>
>
> The code appears to be working fine.
>
> This may happen because you're using a GlusterFS volume.
> The RocksDBStateBackend uses java Files internally (NOT nio Paths), which
> AFAIK only work properly against the plain local file-system.
>
> The GlusterFS nio FileSystem implementation also explicitly does not
> support conversions to File
> 
> .
>
> On 03.07.2018 13:53, Chesnay Schepler wrote:
>
> Thanks. Looks like RocksDBStateBackend.setDbStoragePaths has some custom
> file path parsing logic, will probe it a bit to see what the issue is.
>
> On 03.07.2018 13:45, Data Engineer wrote:
>
> 2018-07-03 11:30:35,703 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - 
> 
>
> 2018-07-03 11:30:35,705 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  Starting 
> StandaloneSessionClusterEntrypoint (Version: , Rev:c61b108, 
> Date:24.05.2018 @ 16:54:44 CEST)
>
> 2018-07-03 11:30:35,705 INFO  

Re: 1.5 some thing weird

2018-07-10 Thread Till Rohrmann
Hi Vishal,

it looks as if the flushing of the checkpoint data to HDFS failed due to
some expired lease on the checkpoint file. Therefore, Flink aborted the
checkpoint `chk-125` and removed it. This is the normal behaviour if Flink
cannot complete a checkpoint. As you can see, afterwards, the checkpoints
are again successful.

Cheers,
Till

On Mon, Jul 9, 2018 at 7:15 PM Vishal Santoshi 
wrote:

> drwxr-xr-x   - root hadoop  0 2018-07-09 12:33
> /flink/kpi_unique/392d0436e53f3ef5e494ba3cc63428bf/chk-123
> drwxr-xr-x   - root hadoop  0 2018-07-09 12:35
> /flink/kpi_unique/392d0436e53f3ef5e494ba3cc63428bf/chk-124
> drwxr-xr-x   - root hadoop  0 2018-07-09 12:51
> /flink/kpi_unique/392d0436e53f3ef5e494ba3cc63428bf/chk-126
> drwxr-xr-x   - root hadoop  0 2018-07-09 12:53
> /flink/kpi_unique/392d0436e53f3ef5e494ba3cc63428bf/chk-127
> drwxr-xr-x   - root hadoop  0 2018-07-09 12:55
> /flink/kpi_unique/392d0436e53f3ef5e494ba3cc63428bf/chk-128
>
> See the missing chk-125
>
> So I see the above checkpoints for a job. At 2018-07-09 12:38:43 this
> exception was thrown:
>
>
> the  chk-125 is missing from hdfs and the job complains about it
> Caused by:
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):
> No lease on
> /flink/kpi_unique/392d0436e53f3ef5e494ba3cc63428bf/chk-125/e9d6886c-e693-4827-97bc-dd3fd526b64e
> (inode 1987098987): File does not exist. Holder
> DFSClient_NONMAPREDUCE_1527557459_11240 does not have any open files.
>
> At about the same time
>
> ID: 125   Failure Time: 12:38:23   Cause: Checkpoint expired before completing.
>
>
> Is this some race condition? A checkpoint had to be taken, and that was
> chk-125; it took longer than the configured time (1 minute). It
> aborted the pipe. Should it have? It actually did not even create chk-125
> but then refers to it and aborts the pipe.
>
>
>
>
>
>
>
>
> This is the full exception.
>
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 
> 125 for operator 360 minute interval -> 360 minutes to TimeSeries.Entry.2 
> (5/6).}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1154)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:948)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:885)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 125 for 
> operator 360 minute interval -> 360 minutes to TimeSeries.Entry.2 (5/6).
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:943)
>   ... 6 more
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
> Could not flush and close the file system output stream to 
> hdfs://nn-crunchy.bf2.tumblr.net:8020/flink/kpi_unique/392d0436e53f3ef5e494ba3cc63428bf/chk-125/e9d6886c-e693-4827-97bc-dd3fd526b64e
>  in order to obtain the stream state handle
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:47)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:854)
>   ... 5 more
> Caused by: java.io.IOException: Could not flush and close the file system 
> output stream to 
> hdfs://nn-crunchy.bf2.tumblr.net:8020/flink/kpi_unique/392d0436e53f3ef5e494ba3cc63428bf/chk-125/e9d6886c-e693-4827-97bc-dd3fd526b64e
>  in order to obtain the stream state handle
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:325)
>   at 
> org.apache.flink.runtime.state.CheckpointStreamWithResultProvider$PrimaryStreamOnly.closeAndFinalizeCheckpointStreamResult(CheckpointStreamWithResultProvider.java:77)
>   at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:705)
>   at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:641)
>   at 
> org.apache.flink.runtime.io.async.Abstra

Re: REST API time out ( flink 1.5 ) on SP

2018-07-10 Thread Till Rohrmann
Hi Vishal,

you need to give us a little bit more context in order to understand your
question.

Cheers,
Till

On Mon, Jul 9, 2018 at 10:36 PM Vishal Santoshi 
wrote:

> java.util.concurrent.CompletionException:
> akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka://flink/user/jobmanager_10#-862909719]] after [1 ms].
> Sender[null] sent message of type
> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>   at .
>
>
> which exact configuration is this..
>


Access the data in a stream after writing to a sink

2018-07-10 Thread Teena Kappen // BPRISE
Hi,

Is it possible to access the data in a stream that was written to a sink? I 
have a Cassandra Sink in my stream job and I have to access all the records 
that were written to the Cassandra sink and write it to another sink. Is there 
any way to do that?

Regards,
Teena


RE: Access the data in a stream after writing to a sink

2018-07-10 Thread Teena Kappen // BPRISE
Adding to the previous question, is it possible to check if each record in a 
stream was written without any exceptions to a Cassandra Sink? I have to write 
the records to the next sink only if the first write is successful. So, 
replicating the streams before the write is not an option.

From: Teena Kappen // BPRISE 
Sent: 10 July 2018 12:50
To: user@flink.apache.org
Subject: Access the data in a stream after writing to a sink

Hi,

Is it possible to access the data in a stream that was written to a sink? I 
have a Cassandra Sink in my stream job and I have to access all the records 
that were written to the Cassandra sink and write it to another sink. Is there 
any way to do that?

Regards,
Teena


Re: 1.5 some thing weird

2018-07-10 Thread Vishal Santoshi
That makes sense; what does not make sense is that the pipeline restarted.
I would have imagined that an aborted checkpoint would not abort the
pipeline.

On Tue, Jul 10, 2018 at 3:16 AM, Till Rohrmann  wrote:

> Hi Vishal,
>
> it looks as if the flushing of the checkpoint data to HDFS failed due to
> some expired lease on the checkpoint file. Therefore, Flink aborted the
> checkpoint `chk-125` and removed it. This is the normal behaviour if Flink
> cannot complete a checkpoint. As you can see, afterwards, the checkpoints
> are again successful.
>
> Cheers,
> Till
>
> On Mon, Jul 9, 2018 at 7:15 PM Vishal Santoshi 
> wrote:
>
>> drwxr-xr-x   - root hadoop  0 2018-07-09 12:33 /flink/kpi_unique/
>> 392d0436e53f3ef5e494ba3cc63428bf/chk-123
>> drwxr-xr-x   - root hadoop  0 2018-07-09 12:35 /flink/kpi_unique/
>> 392d0436e53f3ef5e494ba3cc63428bf/chk-124
>> drwxr-xr-x   - root hadoop  0 2018-07-09 12:51 /flink/kpi_unique/
>> 392d0436e53f3ef5e494ba3cc63428bf/chk-126
>> drwxr-xr-x   - root hadoop  0 2018-07-09 12:53 /flink/kpi_unique/
>> 392d0436e53f3ef5e494ba3cc63428bf/chk-127
>> drwxr-xr-x   - root hadoop  0 2018-07-09 12:55 /flink/kpi_unique/
>> 392d0436e53f3ef5e494ba3cc63428bf/chk-128
>>
>> See the missing chk-125
>>
>> So I see the above checkpoints for a job. at the  2018-07-09, 12:38:43   this
>> exception was thrown
>>
>>
>> the  chk-125 is missing from hdfs and the job complains about it
>> Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.
>> hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on
>> /flink/kpi_unique/392d0436e53f3ef5e494ba3cc63428
>> bf/chk-125/e9d6886c-e693-4827-97bc-dd3fd526b64e (inode 1987098987): File
>> does not exist. Holder DFSClient_NONMAPREDUCE_1527557459_11240 does not
>> have any open files.
>>
>> At about the same time
>>
>> ID: 125Failure Time: 12:38:23Cause: Checkpoint expired before
>> completing..
>>
>>
>> Is this some race condition. A checkpoint had to be taken and , that was
>> was chk-125, it took longer than the configure time ( 1 minute ).  It
>> aborted the pipe. Should it have ? It actually did not even create the 
>> chk-125
>> but then refers to it and aborts the pipe.
>>
>>
>>
>>
>>
>>
>>
>>
>> This is the full exception.
>>
>> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 
>> 125 for operator 360 minute interval -> 360 minutes to TimeSeries.Entry.2 
>> (5/6).}
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1154)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:948)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:885)
>>  at 
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>  at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.Exception: Could not materialize checkpoint 125 for 
>> operator 360 minute interval -> 360 minutes to TimeSeries.Entry.2 (5/6).
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:943)
>>  ... 6 more
>> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
>> Could not flush and close the file system output stream to 
>> hdfs://nn-crunchy.bf2.tumblr.net:8020/flink/kpi_unique/392d0436e53f3ef5e494ba3cc63428bf/chk-125/e9d6886c-e693-4827-97bc-dd3fd526b64e
>>  in order to obtain the stream state handle
>>  at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>  at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>  at 
>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
>>  at 
>> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:47)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:854)
>>  ... 5 more
>> Caused by: java.io.IOException: Could not flush and close the file system 
>> output stream to 
>> hdfs://nn-crunchy.bf2.tumblr.net:8020/flink/kpi_unique/392d0436e53f3ef5e494ba3cc63428bf/chk-125/e9d6886c-e693-4827-97bc-dd3fd526b64e
>>  in order to obtain the stream state handle
>>  at 
>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:325)
>>  at 
>> org.apache.flink.runtime.state.CheckpointStreamWithResultProvider$PrimaryStreamOnly.closeAndFinalizeCheckpointStreamResult(CheckpointStreamWithResultProvider.java:77)
>>  at 
>> org.apache

Re: REST API time out ( flink 1.5 ) on SP

2018-07-10 Thread Vishal Santoshi
Aah sorry, while taking a savepoint without cancel, we hit this timeout
(appears to be 10 seconds). The savepoint does succeed; in this case it
takes roughly 13-15 seconds. Wanted to know which configuration to change
to increase the timeout on the REST call. It does not seem to be
akka.client.timeout, which is set to 120 s as was the case in 1.4, and I
could not figure out from
https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/config.html
which setting to work with.

Thanks much.



On Tue, Jul 10, 2018 at 3:18 AM, Till Rohrmann  wrote:

> Hi Vishal,
>
> you need to give us a little bit more context in order to understand your
> question.
>
> Cheers,
> Till
>
> On Mon, Jul 9, 2018 at 10:36 PM Vishal Santoshi 
> wrote:
>
>> java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException:
>> Ask timed out on [Actor[akka://flink/user/jobmanager_10#-862909719]]
>> after [1 ms]. Sender[null] sent message of type
>> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>>   at .
>>
>>
>> which exact configuration is this..
>>
>


Want to write Kafka Sink to SQL Client by Flink-1.5

2018-07-10 Thread Shivam Sharma
Hi All,

We want to write Kafka Sink functionality for the Flink (1.5) SQL Client. We
have read the code and chalked out a rough plan for the implementation.

Any guidance for this implementation will be very helpful.

Thanks
-- 
Shivam Sharma
Data Engineer @ Goibibo
Indian Institute Of Information Technology, Design and Manufacturing
Jabalpur
Mobile No- (+91) 8882114744
Email:- 28shivamsha...@gmail.com
LinkedIn:- https://www.linkedin.com/in/28shivamsharma


Support for detached mode for Flink1.5 SQL Client

2018-07-10 Thread Shivam Sharma
Hi All,

Is there any way to run Flink 1.5 sql-client queries in detached mode?
Actually, we need to run multiple queries for different use cases, and the
sql-client shell will be opened by the user on demand.

-- 
Shivam Sharma
Data Engineer @ Goibibo
Indian Institute Of Information Technology, Design and Manufacturing
Jabalpur
Mobile No- (+91) 8882114744
Email:- 28shivamsha...@gmail.com
LinkedIn:- https://www.linkedin.com/in/28shivamsharma


State sharing across trigger and evictor

2018-07-10 Thread Jayant Ameta
Hi,
I'm using the GlobalWindow with a custom CountTrigger (similar to the
CountTrigger provided by flink).
I'm also using an evictor to remove some of the elements from the window.
Is it possible to update the count when an element is evicted? For example:
can I access the ReducingState used by the countTrigger from my Evictor
class, so that the count always reflects the number of elements in the
windows?

Jayant Ameta


Re: 1.5 some thing weird

2018-07-10 Thread Till Rohrmann
Whether a Flink task should fail in case of a checkpoint error or not can
be configured via the CheckpointConfig which you can access via the
StreamExecutionEnvironment. You have to call
`CheckpointConfig#setFailOnCheckpointingErrors(false)` to deactivate the
default behaviour where the task always fails in case of a checkpoint error.
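
For illustration, a minimal sketch of that call (the surrounding job setup and
the checkpoint interval below are made up for the example, not taken from this
thread):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointFailureConfigExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 60 seconds (interval chosen only for illustration).
        env.enableCheckpointing(60_000);
        // With 'false', an expired or failed checkpoint is dropped but the task
        // keeps running instead of failing the whole job.
        env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
    }
}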

Cheers,
Till

On Tue, Jul 10, 2018 at 10:50 AM Vishal Santoshi 
wrote:

> That makes sense, what does not make sense is that the pipeline restarted.
> I would have imagined that an aborted chk point would not abort the
> pipeline.
>
> On Tue, Jul 10, 2018 at 3:16 AM, Till Rohrmann 
> wrote:
>
>> Hi Vishal,
>>
>> it looks as if the flushing of the checkpoint data to HDFS failed due to
>> some expired lease on the checkpoint file. Therefore, Flink aborted the
>> checkpoint `chk-125` and removed it. This is the normal behaviour if Flink
>> cannot complete a checkpoint. As you can see, afterwards, the checkpoints
>> are again successful.
>>
>> Cheers,
>> Till
>>
>> On Mon, Jul 9, 2018 at 7:15 PM Vishal Santoshi 
>> wrote:
>>
>>> drwxr-xr-x   - root hadoop  0 2018-07-09 12:33
>>> /flink/kpi_unique/392d0436e53f3ef5e494ba3cc63428bf/chk-123
>>> drwxr-xr-x   - root hadoop  0 2018-07-09 12:35
>>> /flink/kpi_unique/392d0436e53f3ef5e494ba3cc63428bf/chk-124
>>> drwxr-xr-x   - root hadoop  0 2018-07-09 12:51
>>> /flink/kpi_unique/392d0436e53f3ef5e494ba3cc63428bf/chk-126
>>> drwxr-xr-x   - root hadoop  0 2018-07-09 12:53
>>> /flink/kpi_unique/392d0436e53f3ef5e494ba3cc63428bf/chk-127
>>> drwxr-xr-x   - root hadoop  0 2018-07-09 12:55
>>> /flink/kpi_unique/392d0436e53f3ef5e494ba3cc63428bf/chk-128
>>>
>>> See the missing chk-125
>>>
>>> So I see the above checkpoints for a job. at the  2018-07-09, 12:38:43
>>>  this exception was thrown
>>>
>>>
>>> the  chk-125 is missing from hdfs and the job complains about it
>>> Caused by:
>>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):
>>> No lease on
>>> /flink/kpi_unique/392d0436e53f3ef5e494ba3cc63428bf/chk-125/e9d6886c-e693-4827-97bc-dd3fd526b64e
>>> (inode 1987098987): File does not exist. Holder
>>> DFSClient_NONMAPREDUCE_1527557459_11240 does not have any open files.
>>>
>>> At about the same time
>>>
>>> ID: 125Failure Time: 12:38:23Cause: Checkpoint expired before
>>> completing..
>>>
>>>
>>> Is this some race condition. A checkpoint had to be taken and , that was
>>> was chk-125, it took longer than the configure time ( 1 minute ).  It
>>> aborted the pipe. Should it have ? It actually did not even create the 
>>> chk-125
>>> but then refers to it and aborts the pipe.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> This is the full exception.
>>>
>>> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 
>>> 125 for operator 360 minute interval -> 360 minutes to TimeSeries.Entry.2 
>>> (5/6).}
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1154)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:948)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:885)
>>> at 
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.Exception: Could not materialize checkpoint 125 for 
>>> operator 360 minute interval -> 360 minutes to TimeSeries.Entry.2 (5/6).
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:943)
>>> ... 6 more
>>> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
>>> Could not flush and close the file system output stream to 
>>> hdfs://nn-crunchy.bf2.tumblr.net:8020/flink/kpi_unique/392d0436e53f3ef5e494ba3cc63428bf/chk-125/e9d6886c-e693-4827-97bc-dd3fd526b64e
>>>  in order to obtain the stream state handle
>>> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>> at 
>>> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
>>> at 
>>> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:47)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:854)
>>> ... 5 more
>>> Caused by: java.io.IOException: Could not flush and close the file system 
>>> output stream to 
>>> hdfs://nn-crunchy.bf2.tumblr.net:8020/flink/kpi_unique/392

Re: REST API time out ( flink 1.5 ) on SP

2018-07-10 Thread Till Rohrmann
In order to configure the timeouts for the REST handlers, please use
`web.timeout`. For the client timeout use `akka.client.timeout`.
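
For example, in flink-conf.yaml (the values below are only illustrative, not
recommendations):

# REST handler timeout in milliseconds; raise it above the expected savepoint duration.
web.timeout: 60000
# Client-side ask timeout used by the CLI.
akka.client.timeout: 120 s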

Cheers,
Till

On Tue, Jul 10, 2018 at 10:54 AM Vishal Santoshi 
wrote:

> Aah sorry, while taking a save point without cancel, we hit this timeout
> ( appears to be 10 seconds ).   The save point does succeed, in this case
> it takes roughly 13-15 seconds. Wanted to know which configuration to
> change to increase the time out on the REST call.  It does not seem to be
> aka.client.timeout which is set to 120 s  which was the case in 1.4 and I
> could not  figure out from
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/config.html
> which setting to work with.
>
> Thanks much.
>
>
>
> On Tue, Jul 10, 2018 at 3:18 AM, Till Rohrmann 
> wrote:
>
>> Hi Vishal,
>>
>> you need to give us a little bit more context in order to understand your
>> question.
>>
>> Cheers,
>> Till
>>
>> On Mon, Jul 9, 2018 at 10:36 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> java.util.concurrent.CompletionException:
>>> akka.pattern.AskTimeoutException: Ask timed out on
>>> [Actor[akka://flink/user/jobmanager_10#-862909719]] after [1 ms].
>>> Sender[null] sent message of type
>>> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>>>   at .
>>>
>>>
>>> which exact configuration is this..
>>>
>>
>


Is Flink using even-odd versioning system

2018-07-10 Thread Alexander Smirnov
to denote development and stable releases?


Re: Confusions About JDBCOutputFormat

2018-07-10 Thread Hequn Cheng
Hi wangsan,

I agree with you. It would be kind of you to open a jira to check the
problem.

For the first problem, I think we need to establish a connection each time we
execute a batch write. And, it is better to get the connection from a
connection pool.
For the second problem, to avoid the multithreading problem, I think we should
synchronize on the batch object in the flush() method.
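
A rough sketch of how those two points could look (the names are made up and
plain JDBC is used here instead of the actual JDBCOutputFormat code):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

// flush() is synchronized so the record-writing path and the snapshotting path
// cannot race on the batch, and the connection is re-validated before each
// batch so a server-side idle timeout does not break the write.
public class SynchronizedJdbcBatchWriter {

    private final String jdbcUrl;
    private final String insertSql;
    private final int batchInterval;
    private final List<String> buffer = new ArrayList<>();

    private Connection connection;

    public SynchronizedJdbcBatchWriter(String jdbcUrl, String insertSql, int batchInterval) {
        this.jdbcUrl = jdbcUrl;
        this.insertSql = insertSql;
        this.batchInterval = batchInterval;
    }

    // Called for every record; flushes once the batch threshold is reached.
    public synchronized void writeRecord(String value) throws SQLException {
        buffer.add(value);
        if (buffer.size() >= batchInterval) {
            flush();
        }
    }

    // Also called while snapshotting state; synchronized to avoid the race.
    public synchronized void flush() throws SQLException {
        if (buffer.isEmpty()) {
            return;
        }
        // Re-open the connection if the database closed it while it was idle.
        if (connection == null || !connection.isValid(5)) {
            connection = DriverManager.getConnection(jdbcUrl);
        }
        try (PreparedStatement statement = connection.prepareStatement(insertSql)) {
            for (String value : buffer) {
                statement.setString(1, value);
                statement.addBatch();
            }
            statement.executeBatch();
        }
        buffer.clear();
    }
}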

What do you think?

Best, Hequn



On Tue, Jul 10, 2018 at 2:36 PM, wangsan  wrote:

> Hi all,
>
> I'm going to use JDBCAppendTableSink and JDBCOutputFormat in my Flink
> application. But I am confused with the implementation of JDBCOutputFormat.
>
> 1. The Connection was established when JDBCOutputFormat is opened, and
> will be used all the time. But if this connection lies idle for a long time,
> the database will force close the connection, thus errors may occur.
> 2. The flush() method is called when batchCount exceeds the threshold, but
> it is also called while snapshotting state. So two threads may modify
> upload and batchCount, but without synchronization.
>
> Please correct me if I am wrong.
>
> ——
> wangsan
>


Re: Access the data in a stream after writing to a sink

2018-07-10 Thread Hequn Cheng
Hi Teena,

It seems that a sink can not output data into another sink. Maybe we can
implement a combined user defined sink. In the combined sink, only write to
the next sink if the first write is successful.
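
A bare-bones sketch of such a combined sink; the two write methods are
placeholders for the real Cassandra write and the second target, not an
existing API:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// The record is only forwarded to the second target if the first write did not throw.
public class CombinedSink<T> extends RichSinkFunction<T> {

    @Override
    public void open(Configuration parameters) throws Exception {
        // Open connections to both targets here, e.g. a Cassandra session and
        // the client for the second system.
    }

    @Override
    public void invoke(T value) throws Exception {
        // Synchronous write to the first target (e.g. Cassandra). If it throws,
        // we never reach the second write.
        writeToFirstTarget(value);
        writeToSecondTarget(value);
    }

    @Override
    public void close() throws Exception {
        // Release both connections here.
    }

    private void writeToFirstTarget(T value) throws Exception {
        // placeholder for the real Cassandra write
    }

    private void writeToSecondTarget(T value) throws Exception {
        // placeholder for the write to the next sink/system
    }
}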

On Tue, Jul 10, 2018 at 3:23 PM, Teena Kappen // BPRISE <
teena.kap...@bprise.com> wrote:

> Adding to the previous question, is it possible to check if each record in
> a stream was written without any exceptions to a Cassandra Sink? I have to
> write the records to the next sink only if the first write is successful.
> So, replicating the streams before the write is not an option.
>
>
>
> *From:* Teena Kappen // BPRISE 
> *Sent:* 10 July 2018 12:50
> *To:* user@flink.apache.org
> *Subject:* Access the data in a stream after writing to a sink
>
>
>
> Hi,
>
>
>
> Is it possible to access the data in a stream that was written to a sink?
> I have a Cassandra Sink in my stream job and I have to access all the
> records that were written to the Cassandra sink and write it to another
> sink. Is there any way to do that?
>
>
>
> Regards,
>
> Teena
>


Re: 1.5 some thing weird

2018-07-10 Thread Vishal Santoshi
Will try the setting out. I do not want to push it, but the exception can
be much more descriptive :)

Thanks much

On Tue, Jul 10, 2018 at 7:48 AM, Till Rohrmann  wrote:

> Whether a Flink task should fail in case of a checkpoint error or not can
> be configured via the CheckpointConfig which you can access via the
> StreamExecutionEnvironment. You have to call `CheckpointConfig#
> setFailOnCheckpointingErrors(false)` to deactivate the default behaviour
> where the task always fails in case of a checkpoint error.
>
> Cheers,
> Till
>
> On Tue, Jul 10, 2018 at 10:50 AM Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> That makes sense, what does not make sense is that the pipeline
>> restarted. I would have imagined that an aborted chk point would not abort
>> the pipeline.
>>
>> On Tue, Jul 10, 2018 at 3:16 AM, Till Rohrmann 
>> wrote:
>>
>>> Hi Vishal,
>>>
>>> it looks as if the flushing of the checkpoint data to HDFS failed due to
>>> some expired lease on the checkpoint file. Therefore, Flink aborted the
>>> checkpoint `chk-125` and removed it. This is the normal behaviour if Flink
>>> cannot complete a checkpoint. As you can see, afterwards, the checkpoints
>>> are again successful.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Mon, Jul 9, 2018 at 7:15 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 drwxr-xr-x   - root hadoop  0 2018-07-09 12:33
 /flink/kpi_unique/392d0436e53f3ef5e494ba3cc63428bf/chk-123
 drwxr-xr-x   - root hadoop  0 2018-07-09 12:35
 /flink/kpi_unique/392d0436e53f3ef5e494ba3cc63428bf/chk-124
 drwxr-xr-x   - root hadoop  0 2018-07-09 12:51
 /flink/kpi_unique/392d0436e53f3ef5e494ba3cc63428bf/chk-126
 drwxr-xr-x   - root hadoop  0 2018-07-09 12:53
 /flink/kpi_unique/392d0436e53f3ef5e494ba3cc63428bf/chk-127
 drwxr-xr-x   - root hadoop  0 2018-07-09 12:55
 /flink/kpi_unique/392d0436e53f3ef5e494ba3cc63428bf/chk-128

 See the missing chk-125

 So I see the above checkpoints for a job. at the  2018-07-09,
 12:38:43   this exception was thrown


 the  chk-125 is missing from hdfs and the job complains about it
 Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.
 hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on
 /flink/kpi_unique/392d0436e53f3ef5e494ba3cc63428
 bf/chk-125/e9d6886c-e693-4827-97bc-dd3fd526b64e (inode 1987098987):
 File does not exist. Holder DFSClient_NONMAPREDUCE_1527557459_11240
 does not have any open files.

 At about the same time

 ID: 125Failure Time: 12:38:23Cause: Checkpoint expired before
 completing..


 Is this some race condition. A checkpoint had to be taken and , that
 was was chk-125, it took longer than the configure time ( 1 minute ).  It
 aborted the pipe. Should it have ? It actually did not even create the 
 chk-125
 but then refers to it and aborts the pipe.








 This is the full exception.

 AsynchronousException{java.lang.Exception: Could not materialize 
 checkpoint 125 for operator 360 minute interval -> 360 minutes to 
 TimeSeries.Entry.2 (5/6).}
at 
 org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1154)
at 
 org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:948)
at 
 org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:885)
at 
 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
 Caused by: java.lang.Exception: Could not materialize checkpoint 125 for 
 operator 360 minute interval -> 360 minutes to TimeSeries.Entry.2 (5/6).
at 
 org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:943)
... 6 more
 Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
 Could not flush and close the file system output stream to 
 hdfs://nn-crunchy.bf2.tumblr.net:8020/flink/kpi_unique/392d0436e53f3ef5e494ba3cc63428bf/chk-125/e9d6886c-e693-4827-97bc-dd3fd526b64e
  in order to obtain the stream state handle
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at 
 org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
at 
 org.apache.flink.streaming.api.operators.OperatorSnapshotFinaliz

Re: REST API time out ( flink 1.5 ) on SP

2018-07-10 Thread Vishal Santoshi
That should do.

Thanks much.

On Tue, Jul 10, 2018 at 7:52 AM, Till Rohrmann  wrote:

> In order to configure the timeouts for the REST handlers, please use
> `web.timeout`. For the client timeout use `akka.client.timeout`.
>
> Cheers,
> Till
>
> On Tue, Jul 10, 2018 at 10:54 AM Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> Aah sorry, while taking a save point without cancel, we hit this timeout
>> ( appears to be 10 seconds ).   The save point does succeed, in this case
>> it takes roughly 13-15 seconds. Wanted to know which configuration to
>> change to increase the time out on the REST call.  It does not seem to be
>> aka.client.timeout which is set to 120 s  which was the case in 1.4 and I
>> could not  figure out from https://ci.apache.org/
>> projects/flink/flink-docs-release-1.5/ops/config.html which setting to
>> work with.
>>
>> Thanks much.
>>
>>
>>
>> On Tue, Jul 10, 2018 at 3:18 AM, Till Rohrmann 
>> wrote:
>>
>>> Hi Vishal,
>>>
>>> you need to give us a little bit more context in order to understand
>>> your question.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Mon, Jul 9, 2018 at 10:36 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException:
 Ask timed out on [Actor[akka://flink/user/jobmanager_10#-862909719]]
 after [1 ms]. Sender[null] sent message of type
 "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
   at .


 which exact configuration is this..

>>>
>>


Re: REST API time out ( flink 1.5 ) on SP

2018-07-10 Thread Vishal Santoshi
As we are on configurations, let me take the liberty to ask this:

Does akka.jvm-exit-on-fatal-error: true
have any relevance vis-à-vis quarantining (it seems that we have our own
gossip protocol), and if not, in what other places is it used where it would be
relevant?

On Tue, Jul 10, 2018 at 10:19 AM, Vishal Santoshi  wrote:

> That should do.
>
> Thanks much.
>
> On Tue, Jul 10, 2018 at 7:52 AM, Till Rohrmann 
> wrote:
>
>> In order to configure the timeouts for the REST handlers, please use
>> `web.timeout`. For the client timeout use `akka.client.timeout`.
>>
>> Cheers,
>> Till
>>
>> On Tue, Jul 10, 2018 at 10:54 AM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Aah sorry, while taking a save point without cancel, we hit this
>>> timeout  ( appears to be 10 seconds ).   The save point does succeed, in
>>> this case it takes roughly 13-15 seconds. Wanted to know which
>>> configuration to change to increase the time out on the REST call.  It does
>>> not seem to be  aka.client.timeout which is set to 120 s  which was the
>>> case in 1.4 and I could not  figure out from https://ci.apache.org/pro
>>> jects/flink/flink-docs-release-1.5/ops/config.html which setting to
>>> work with.
>>>
>>> Thanks much.
>>>
>>>
>>>
>>> On Tue, Jul 10, 2018 at 3:18 AM, Till Rohrmann 
>>> wrote:
>>>
 Hi Vishal,

 you need to give us a little bit more context in order to understand
 your question.

 Cheers,
 Till

 On Mon, Jul 9, 2018 at 10:36 PM Vishal Santoshi <
 vishal.santo...@gmail.com> wrote:

> java.util.concurrent.CompletionException:
> akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka://flink/user/jobmanager_10#-862909719]] after [1 ms].
> Sender[null] sent message of type "org.apache.flink.runtime.rpc.
> messages.LocalFencedMessage".
>   at .
>
>
> which exact configuration is this..
>

>>>
>


Yarn run single job

2018-07-10 Thread Garrett Barton
Greetings all,
 The docs say that I can skip creating a cluster and let the jobs create
their own clusters on yarn.  The example given is:

./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar


What I cannot figure out is what the -m option is meant for.  In my opinion
there is no jobmanager to specify, I expect flink to start one.  Skipping
the option doesn't work as it defaults to the conf one which has a comment
saying flink manages it for yarn deployments.

I tried pointing it at my yarn resource manager, it didn't like any of the
ports.

Any ideas?


Re: Yarn run single job

2018-07-10 Thread Chesnay Schepler

-m yarn-cluster switches the client into yarn mode.

yarn-cluster is not a placeholder or anything, you have to literally 
type that in.


On 10.07.2018 17:02, Garrett Barton wrote:

Greetings all,
 The docs say that I can skip creating a cluster and let the jobs 
create their own clusters on yarn.  The example given is:

./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar

What I cannot figure out is what the -m option is meant for.  In my 
opinion there is no jobmanager to specify, I expect flink to start 
one.  Skipping the option doesn't work as it defaults to the conf one 
which has a comment saying flink manages it for yarn deployments.


I tried pointing it at my yarn resource manager, it didn't like any of 
the ports.


Any ideas?





Re: Filter columns of a csv file with Flink

2018-07-10 Thread françois lacombe
Hi Hequn,

2018-07-10 3:47 GMT+02:00 Hequn Cheng :

> Maybe I misunderstand you. So you don't want to skip the whole file?
>
Yes I do.
By skipping the whole file I mean "throw an Exception to stop the process
and inform the user that the file is invalid for a given reason" and not "the
process goes fully right and imports 0 rows".


> If does, then "extending CsvTableSource and provide the avro schema to
> the constructor without creating a custom AvroInputFormat" is ok.
>

Then we agree on this.
Is there any plan to give avro schemas a better role in Flink in future
versions?
Avro schemas are perfect for building a CsvTableSource with code like:

for (Schema field_nfo : sch.getTypes()) {
    // Test if csv file header actually contains a field corresponding to schema
    if (!csv_headers.contains(field_nfo.getName())) {
        throw new NoSuchFieldException(field_nfo.getName());
    }

    // Declare the field in the source Builder
    src_builder.field(field_nfo.getName(), primitiveTypes.get(field_nfo.getType()));
}

All the best

François



> On Mon, Jul 9, 2018 at 11:03 PM, françois lacombe <
> francois.laco...@dcbrain.com> wrote:
>
>> Hi Hequn,
>>
>> 2018-07-09 15:09 GMT+02:00 Hequn Cheng :
>>
>>> The first step requires an AvroInputFormat because the source needs
>>> AvroInputFormat to read avro data if data match schema.
>>>
>>
>> I don't want avro data, I just want to check if my csv file have the same
>> fields than defined in a given avro schema.
>> Processing should stop if and only if I find missing columns.
>>
>> A record which not match the schema (types mainly) should be rejected and
>> logged in a dedicated file but the processing can go on.
>>
>> How about extending CsvTableSource and provide the avro schema to the
>> constructor without creating a custom AvroInputFormat?
>>
>>
>> François
>>
>
>


Re: Confusions About JDBCOutputFormat

2018-07-10 Thread wangsan
Hi Hequn,

Establishing a connection for each batch write may also have the idle connection
problem, since we are not sure when the connection will be closed. We call the
flush() method when a batch is finished or when we snapshot state, but what if
snapshotting is not enabled and the batch size is not reached before the
connection is closed?

Maybe we could use a Timer to test the connection periodically and keep it
alive. What do you think?

I will open a jira and try to work on that issue.

Best, 
wangsan



> On Jul 10, 2018, at 8:38 PM, Hequn Cheng  wrote:
> 
> Hi wangsan,
> 
> I agree with you. It would be kind of you to open a jira to check the problem.
> 
> For the first problem, I think we need to establish connection each time 
> execute batch write. And, it is better to get the connection from a 
> connection pool.
> For the second problem, to avoid multithread problem, I think we should 
> synchronized the batch object in flush() method.
> 
> What do you think?
> 
> Best, Hequn
> 
> 
> 
> On Tue, Jul 10, 2018 at 2:36 PM, wangsan  > wrote:
> Hi all,
> 
> I'm going to use JDBCAppendTableSink and JDBCOutputFormat in my Flink 
> application. But I am confused with the implementation of JDBCOutputFormat.
> 
> 1. The Connection was established when JDBCOutputFormat is opened, and will 
> be used all the time. But if this connction lies idle for a long time, the 
> database will force close the connetion, thus errors may occur.
> 2. The flush() method is called when batchCount exceeds the threshold, but it 
> is also called while snapshotting state. So two threads may modify upload and 
> batchCount, but without synchronization.
> 
> Please correct me if I am wrong.
> 
> ——
> wangsan
> 



Re: Yarn run single job

2018-07-10 Thread Garrett Barton
AHH it works! It never occurred to me that it meant literally typing in
yarn-cluster.

Thank you!

On Tue, Jul 10, 2018 at 11:17 AM Chesnay Schepler 
wrote:

> -m yarn-cluster switches the client into yarn mode.
>
> yarn-cluster is not a placeholder or anything, you have to literally type
> that in.
>
> On 10.07.2018 17:02, Garrett Barton wrote:
>
> Greetings all,
>  The docs say that I can skip creating a cluster and let the jobs create
> their own clusters on yarn.  The example given is:
>
> ./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar
>
>
> What I cannot figure out is what the -m option is meant for.  In my
> opinion there is no jobmanager to specify, I expect flink to start one.
> Skipping the option doesn't work as it defaults to the conf one which has a
> comment saying flink manages it for yarn deployments.
>
> I tried pointing it at my yarn resource manager, it didn't like any of the
> ports.
>
> Any ideas?
>
>
>


Re: How to trigger a function on the state periodically?

2018-07-10 Thread anna stax
sure. I will go ahead with this for now. Thanks for your suggestions.

On Mon, Jul 9, 2018 at 11:10 PM, Hequn Cheng  wrote:

> Hi,
> It depends on how many different users. In most cases, the performance
> will be fine. I think it worth to give a try. :-)
> Of course, there are ways to reduce the number of timers, for example
> keyBy(userId%1024), and use a MapState to store different users for the
> same group.
>
> On Tue, Jul 10, 2018 at 1:54 PM, anna stax  wrote:
>
>> Thanks Hequn. I think so too, the large number of timers could be a
>> problem.
>>
>> On Mon, Jul 9, 2018 at 10:23 PM, Hequn Cheng 
>> wrote:
>>
>>> Hi anna,
>>>
>>> According to your description, I think we can use the Timer to solve
>>> your problem. The TimerService deduplicates timers per key and timestamp.
>>> Also, note that a large number of timers can significantly increase
>>> checkpointing time.
>>>
>>> On Tue, Jul 10, 2018 at 11:38 AM, anna stax 
>>> wrote:
>>>
 Thanks Hequn, for the links.

 This is my use case..

 When there is no user activity for n weeks, I need to send a
 Notification to user.
 The activity stream is usually very high volume for most users.
 I thought it is not a good idea to use windowing for this, because of
 the stream volume and window size.
 I want to store in the state, for every user the last activity date and
 process them once daily.

 I want to make sure I am heading in the right direction. Thank you for
 your suggestions.

 -Anna

 On Mon, Jul 9, 2018 at 7:16 PM, Hequn Cheng 
 wrote:

> Hi anna,
>
> > I need to trigger a function once every day
> If you want to trigger by the function itself, you can use the
> Timer[1]. Both types of timers (processing-time and event-time) are
> internally maintained by the TimerService, and onTimer() method will be
> called once a timer fires.
> If you want to trigger the function of different
> parallelism synchronously, then the broadcast state[2] may be helpful.
>
> Hope this helps.
> Hequn
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/s
> tream/operators/process_function.html#timers
> [2] https://ci.apache.org/projects/flink/flink-docs-master/d
> ev/stream/state/broadcast_state.html
>
> On Tue, Jul 10, 2018 at 7:47 AM, anna stax 
> wrote:
>
>> Hi all,
>>
>> I need to trigger a function once every day to read the state and
>> create kafka events and also remove some records from state if they are 
>> too
>> old.
>>
>> Is there a way to do this? I am new to Flink, appreciate any feedback
>> and suggestions.
>>
>> Thanks
>> Anna
>>
>
>

>>>
>>
>
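
For reference, an illustrative sketch of the timer-based approach discussed in
this thread (the class, state names and threshold values are made up; the input
is assumed to be a stream of user ids keyed by user id):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class InactivityNotifier extends ProcessFunction<String, String> {

    private static final long ONE_DAY = 24 * 60 * 60 * 1000L;
    private static final long INACTIVITY_THRESHOLD = 14 * ONE_DAY; // e.g. two weeks

    private transient ValueState<String> userId;
    private transient ValueState<Long> lastActivity;

    @Override
    public void open(Configuration parameters) {
        userId = getRuntimeContext().getState(new ValueStateDescriptor<>("userId", String.class));
        lastActivity = getRuntimeContext().getState(new ValueStateDescriptor<>("lastActivity", Long.class));
    }

    @Override
    public void processElement(String user, Context ctx, Collector<String> out) throws Exception {
        long now = ctx.timerService().currentProcessingTime();
        userId.update(user);
        lastActivity.update(now);
        // Round the timer to the next day boundary so the TimerService can
        // deduplicate it per key, keeping the number of timers small.
        long nextCheck = (now / ONE_DAY + 1) * ONE_DAY;
        ctx.timerService().registerProcessingTimeTimer(nextCheck);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        Long last = lastActivity.value();
        if (last == null) {
            return; // state already cleaned up
        }
        if (timestamp - last >= INACTIVITY_THRESHOLD) {
            out.collect("notify inactive user " + userId.value());
            userId.clear();
            lastActivity.clear();
        } else {
            // Still active recently: check again tomorrow.
            ctx.timerService().registerProcessingTimeTimer(timestamp + ONE_DAY);
        }
    }
}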


RE: high availability with automated disaster recovery using zookeeper

2018-07-10 Thread Sofer, Tovi
Hi Till, group,

Thank you for your response.
After reading further online about Mesos – can’t Mesos fulfil the requirement of
running the job manager on the primary server?
By using: “constraints”: [[“datacenter”, “CLUSTER”, “main”]]
(See 
http://www.stratio.com/blog/mesos-multi-data-center-architecture-for-disaster-recovery/
 )

Is this supported by Flink cluster on Mesos ?

Thanks again
Tovi

From: Till Rohrmann 
Sent: Tuesday, 10 July 2018 10:11
To: Sofer, Tovi [ICG-IT] 
Cc: user 
Subject: Re: high availability with automated disaster recovery using zookeeper

Hi Tovi,

that is an interesting use case you are describing here. I think, however, it 
depends mainly on the capabilities of ZooKeeper to produce the intended 
behavior. Flink itself relies on ZooKeeper for leader election in HA mode but 
does not expose any means to influence the leader election process. To be more 
precise ZK is used as a blackbox which simply tells a JobManager that it is now 
the leader, independent of any data center preferences. I'm not sure whether it 
is possible to tell ZooKeeper about these preferences. If not, then an 
alternative could be to implement one's own high availability services which 
does that at the moment.

Cheers,
Till

On Mon, Jul 9, 2018 at 1:48 PM Sofer, Tovi 
mailto:tovi.so...@citi.com>> wrote:
Hi all,

We are now examining how to achieve high availability for Flink, and to support 
also automatic recovery in disaster scenario- when all DC goes down.
We have DC1 which we usually want work to be done, and DC2 – which is more 
remote and we want work to go there only when DC1 is down.

We examined few options and would be glad to hear feedback a suggestion for 
another way to achieve this.

• Two zookeeper separate zookeeper and flink clusters on the two data 
centers.
Only the cluster on DC1 are running, and state is copied to DC2 in offline 
process.

To achieve automatic recovery we need to use some king of watch dog which will 
check DC1 availability , and if it is down will start DC2 (and same later if 
DC2 is down).

Is there recommended tool for this?

• Zookeeper “stretch cluster” cross data centers – with 2 nodes on DC1, 
2 nodes on DC2 and one observer node.

Also flink cluster jobmabnager1 on DC1 and jobmanager2 on DC2.

This way when DC1 is down, zookeeper will notice this automatically and will 
transfer work to jobmanager2 on DC2.

However we would like zookeeper leader, and flink jobmanager leader (primary 
one) to be from DC1 – unless it is down.

Is there a way to achieve this?

Thanks and regards,
Tovi Sofer
Software Engineer
+972 (3) 7405756



How to find the relation between a running job and the original jar?

2018-07-10 Thread Lasse Nedergaard
Hi.

I am working on a solution where I want to check if a running job uses the
right jar in the right version.

Does anyone know if it is possible through the REST API to find information
about a running job that contains the jar id or something similar, so it is
possible to look up the original jar?

Need to work for Flink 1.5.0 or 1.4.2

Any help appreciated

Thanks in advance

Lasse Nedergaard


Re: Flink dynamic scaling 1.5

2018-07-10 Thread Anil
Thanks for the reply Till. Resubmitting the job is an option. 
I was wondering if there's any way that Flink could be configured to detect
issues like a memory issue and rescale without me submitting the job again. 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Is Flink using even-odd versioning system

2018-07-10 Thread Bowen Li
Hi Alexander,

AFAIK, Flink releases don't do that. The community has done its best to
ensure every release is at its best state.

Thanks,
Bowen


On Tue, Jul 10, 2018 at 4:54 AM Alexander Smirnov <
alexander.smirn...@gmail.com> wrote:

> to denote development and stable releases?
>


RE: high availability with automated disaster recovery using zookeeper

2018-07-10 Thread Sofer, Tovi
To add one thing to Mesos question-
My assumption that  constraints on JobManager  can work, is based on the 
sentence from link bleow
“When running Flink with Marathon, the whole Flink cluster including the job 
manager will be run as Mesos tasks in the Mesos cluster.”
https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/mesos.html

[Not sure this is accurate, since it seems to contradict the image in link below
https://mesosphere.com/blog/apache-flink-on-dcos-and-apache-mesos ]

From: Sofer, Tovi [ICG-IT]
Sent: יום ג 10 יולי 2018 20:04
To: 'Till Rohrmann' ; user 
Cc: Gardi, Hila [ICG-IT] 
Subject: RE: high availability with automated disaster recovery using zookeeper

Hi Till, group,

Thank you for your response.
After reading further online on Mesos – Can’t Mesos fill the requirement of 
running job manager in primary server?
By using: “constraints”: [[“datacenter”, “CLUSTER”, “main”]]
(See 
http://www.stratio.com/blog/mesos-multi-data-center-architecture-for-disaster-recovery/
 )

Is this supported by Flink cluster on Mesos ?

Thanks again
Tovi

From: Till Rohrmann mailto:trohrm...@apache.org>>
Sent: Tuesday, 10 July 2018 10:11
To: Sofer, Tovi [ICG-IT] 
mailto:ts72...@imceu.eu.ssmb.com>>
Cc: user mailto:user@flink.apache.org>>
Subject: Re: high availability with automated disaster recovery using zookeeper

Hi Tovi,

that is an interesting use case you are describing here. I think, however, it 
depends mainly on the capabilities of ZooKeeper to produce the intended 
behavior. Flink itself relies on ZooKeeper for leader election in HA mode but 
does not expose any means to influence the leader election process. To be more 
precise ZK is used as a blackbox which simply tells a JobManager that it is now 
the leader, independent of any data center preferences. I'm not sure whether it 
is possible to tell ZooKeeper about these preferences. If not, then an 
alternative could be to implement one's own high availability services which 
does that at the moment.

Cheers,
Till

On Mon, Jul 9, 2018 at 1:48 PM Sofer, Tovi 
mailto:tovi.so...@citi.com>> wrote:
Hi all,

We are now examining how to achieve high availability for Flink, and to support 
also automatic recovery in disaster scenario- when all DC goes down.
We have DC1 which we usually want work to be done, and DC2 – which is more 
remote and we want work to go there only when DC1 is down.

We examined few options and would be glad to hear feedback a suggestion for 
another way to achieve this.

• Two zookeeper separate zookeeper and flink clusters on the two data 
centers.
Only the cluster on DC1 are running, and state is copied to DC2 in offline 
process.

To achieve automatic recovery we need to use some king of watch dog which will 
check DC1 availability , and if it is down will start DC2 (and same later if 
DC2 is down).

Is there recommended tool for this?

• Zookeeper “stretch cluster” cross data centers – with 2 nodes on DC1, 
2 nodes on DC2 and one observer node.

Also flink cluster jobmabnager1 on DC1 and jobmanager2 on DC2.

This way when DC1 is down, zookeeper will notice this automatically and will 
transfer work to jobmanager2 on DC2.

However we would like zookeeper leader, and flink jobmanager leader (primary 
one) to be from DC1 – unless it is down.

Is there a way to achieve this?

Thanks and regards,
Tovi Sofer
Software Engineer
+972 (3) 7405756



Re: Description of Flink event time processing

2018-07-10 Thread Elias Levy
Thanks for all the comments.  I've updated the document to account for the
feedback.  Please take a look.

On Fri, Jul 6, 2018 at 2:33 PM Elias Levy 
wrote:

> Apologies.  Comments are now enabled.
>
> On Thu, Jul 5, 2018 at 6:09 PM Rong Rong  wrote:
>
>> Hi Elias,
>>
>> Thanks for putting together the document. This is actually a very good,
>> well-rounded document.
>> I think you did not to enable access for comments for the link. Would you
>> mind enabling comments for the google doc?
>>
>> Thanks,
>> Rong
>>
>>
>> On Thu, Jul 5, 2018 at 8:39 AM Fabian Hueske  wrote:
>>
>>> Hi Elias,
>>>
>>> Thanks for the great document!
>>> I made a pass over it and left a few comments.
>>>
>>> I think we should definitely add this to the documentation.
>>>
>>> Thanks,
>>> Fabian
>>>
>>> 2018-07-04 10:30 GMT+02:00 Fabian Hueske :
>>>
 Hi Elias,

 I agree, the docs lack a coherent discussion of event time features.
 Thank you for this write up!
 I just skimmed your document and will provide more detailed feedback
 later.

 It would be great to add such a page to the documentation.

 Best, Fabian

 2018-07-03 3:07 GMT+02:00 Elias Levy :

> The documentation of how Flink handles event time and watermarks is
> spread across several places.  I've been wanting a single location that
> summarizes the subject, and as none was available, I wrote one up.
>
> You can find it here:
> https://docs.google.com/document/d/1b5d-hTdJQsPH3YD0zTB4ZqodinZVHFomKvt41FfUPMc/edit?usp=sharing
>
> I'd appreciate feedback, particularly about the correctness of the
> described behavior.
>


>>>


Reply: How to find the relation between a running job and the original jar?

2018-07-10 Thread Tang Cloud
Hi Lasse

As far as I know, if you use the POST /jars/upload REST API to submit your job,
you can then GET /jars to list the user jar you just uploaded. More information
can be found at
https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/rest_api.html#dispatcher

Thanks, Yun

From: Lasse Nedergaard 
Sent: July 10, 2018 17:40
To: user
Subject: How to find the relation between a running job and the original jar?

Hi.

I'm working on a solution where I want to check whether a running job uses the right 
jar in the right version.

Does anyone know if it is possible through the REST API to find information about a 
running job that contains the jar id or something similar, so it is possible to 
look up the original jar?

Need to work for Flink 1.5.0 or 1.4.2

Any help appreciated

Thanks in advance

Lasse Nedergaard



Re: Re: How to find the relation between a running job and the original jar?

2018-07-10 Thread Lasse Nedergaard
Hi Tang

Thanks for the link. Yes, you are right and it works fine. But when I use the 
REST API for getting running jobs I can’t find any reference back to the jar 
used to start the job. 

Med venlig hilsen / Best regards
Lasse Nedergaard


> On 11 Jul 2018, at 05:22, Tang Cloud wrote:
> 
> Hi Lasse
> 
> As far as I know, if you use the POST /jars/upload REST API to submit your 
> job, you can then GET /jars to list the user jar you just uploaded. More 
> information is available at 
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/rest_api.html#dispatcher
> Thanks, Yun
> From: Lasse Nedergaard 
> Sent: 10 July 2018 17:40
> To: user
> Subject: How to find the relation between a running job and the original jar?
>  
> Hi.
> 
> I'm working on a solution where I want to check whether a running job uses the right 
> jar in the right version.
> 
> Does anyone know if it is possible through the REST API to find information about 
> a running job that contains the jar id or something similar, so it is 
> possible to look up the original jar?
> 
> Need to work for Flink 1.5.0 or 1.4.2
> 
> Any help appreciated 
> 
> Thanks in advance
> 
> Lasse Nedergaard
> 


RE: Access the data in a stream after writing to a sink

2018-07-10 Thread Teena Kappen // BPRISE
Yes, a custom sink with the required checks seems to be the only option.

From: Hequn Cheng 
Sent: 10 July 2018 18:23
To: Teena Kappen // BPRISE 
Cc: user@flink.apache.org
Subject: Re: Access the data in a stream after writing to a sink

Hi Teena,

It seems that a sink cannot output data into another sink. Maybe we can 
implement a combined user-defined sink. In the combined sink, only write to the 
next sink if the first write is successful.
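
A minimal sketch of such a combined sink, assuming blocking client calls
(writePrimary/writeSecondary are placeholders for e.g. a synchronous Cassandra
write and the write to the second store, not an existing Flink API):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Combined sink: a record is only forwarded to the secondary store if the
// write to the primary store succeeded (i.e. did not throw an exception).
public abstract class CombinedSink<T> extends RichSinkFunction<T> {

    @Override
    public void open(Configuration parameters) throws Exception {
        // open connections to the primary and secondary stores here
    }

    @Override
    public void invoke(T value) throws Exception {
        // if this throws, the record is never written to the secondary store
        writePrimary(value);
        writeSecondary(value);
    }

    // placeholders for the actual client calls
    protected abstract void writePrimary(T value) throws Exception;

    protected abstract void writeSecondary(T value) throws Exception;
}

Note that this gives no transactional guarantee: if writeSecondary fails, the
primary write is not rolled back.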

On Tue, Jul 10, 2018 at 3:23 PM, Teena Kappen // BPRISE 
<teena.kap...@bprise.com> wrote:
Adding to the previous question, is it possible to check if each record in a 
stream was written without any exceptions to a Cassandra Sink? I have to write 
the records to the next sink only if the first write is successful. So, 
replicating the streams before the write is not an option.

From: Teena Kappen // BPRISE <teena.kap...@bprise.com>
Sent: 10 July 2018 12:50
To: user@flink.apache.org
Subject: Access the data in a stream after writing to a sink

Hi,

Is it possible to access the data in a stream that was written to a sink? I 
have a Cassandra Sink in my stream job and I have to access all the records 
that were written to the Cassandra sink and write it to another sink. Is there 
any way to do that?

Regards,
Teena



Re: Filter columns of a csv file with Flink

2018-07-10 Thread Hequn Cheng
Hi francois,

> Is there any plan to give avro schemas a better role in Flink in further
versions?
I haven't heard about Avro schema support for CSV. You can open a JIRA for it. Maybe
also contribute it to Flink :-)


On Tue, Jul 10, 2018 at 11:32 PM, françois lacombe <
francois.laco...@dcbrain.com> wrote:

> Hi Hequn,
>
> 2018-07-10 3:47 GMT+02:00 Hequn Cheng :
>
>> Maybe I misunderstand you. So you don't want to skip the whole file?
>>
> Yes I do
> By skipping the whole file I mean “throw an Exception to stop the process
> and inform the user that the file is invalid for a given reason” and not “the
> process goes fully right and imports 0 rows”
>
>
>> If so, then "extending CsvTableSource and providing the avro schema to
>> the constructor without creating a custom AvroInputFormat" is ok.
>>
>
> Then we agree on this
> Is there any plan to give avro schemas a better role in Flink in further
> versions?
> Avro schemas are perfect to build CSVTableSource with code like
>
> for (Schema field_nfo : sch.getTypes()) {
>   // Test if the csv file header actually contains a field corresponding to the schema
>   if (!csv_headers.contains(field_nfo.getName())) {
>     throw new NoSuchFieldException(field_nfo.getName());
>   }
>
>   // Declare the field in the source Builder
>   src_builder.field(field_nfo.getName(), primitiveTypes.get(field_nfo.getType()));
> }
>
> All the best
>
> François
>
>
>
>> On Mon, Jul 9, 2018 at 11:03 PM, françois lacombe <
>> francois.laco...@dcbrain.com> wrote:
>>
>>> Hi Hequn,
>>>
>>> 2018-07-09 15:09 GMT+02:00 Hequn Cheng :
>>>
 The first step requires an AvroInputFormat because the source needs
 AvroInputFormat to read avro data if the data matches the schema.

>>>
>>> I don't want avro data, I just want to check if my csv file has the
>>> same fields as defined in a given avro schema.
>>> Processing should stop if and only if I find missing columns.
>>>
>>> A record which does not match the schema (mainly types) should be rejected
>>> and logged in a dedicated file, but the processing can go on.
>>>
>>> How about extending CsvTableSource and providing the avro schema to the
>>> constructor without creating a custom AvroInputFormat?
>>>
>>>
>>> François
>>>
>>
>>
>


Re: Confusions About JDBCOutputFormat

2018-07-10 Thread Hequn Cheng
Hi wangsan,

What I mean is establishing a connection each time we write data into JDBC,
i.e., establishing a connection in the flush() function. I think this will make
sure the connection is ok. What do you think?
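
A rough, self-contained sketch of the idea (the class, field and method names
here are made up for illustration and are not the actual JDBCOutputFormat code):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Illustrative only: re-validate (and if needed re-open) the connection before
// each batch write, and make the methods synchronized so the batch-size path
// and the snapshot path cannot run executeBatch() concurrently.
public class ResilientJdbcWriter {

    private final String dbUrl;
    private final String insertQuery;
    private Connection connection;
    private PreparedStatement statement;
    private int batchCount;

    public ResilientJdbcWriter(String dbUrl, String insertQuery) {
        this.dbUrl = dbUrl;
        this.insertQuery = insertQuery;
    }

    public synchronized void addToBatch(Object[] row) throws SQLException {
        ensureConnection();
        for (int i = 0; i < row.length; i++) {
            statement.setObject(i + 1, row[i]);
        }
        statement.addBatch();
        batchCount++;
    }

    public synchronized void flush() throws SQLException {
        if (batchCount == 0) {
            return; // nothing buffered
        }
        ensureConnection();
        statement.executeBatch();
        batchCount = 0;
    }

    private void ensureConnection() throws SQLException {
        // re-open the connection if the database closed it while it was idle;
        // note that rows buffered in the old statement's batch are lost here,
        // so a real implementation would have to buffer and re-add them
        if (connection == null || !connection.isValid(5)) {
            connection = DriverManager.getConnection(dbUrl);
            statement = connection.prepareStatement(insertQuery);
            batchCount = 0;
        }
    }
}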

On Wed, Jul 11, 2018 at 12:12 AM, wangsan  wrote:

> Hi Hequn,
>
> Establishing a connection for each batch write may also have an idle
> connection problem, since we are not sure when the connection will be
> closed. We call the flush() method when a batch is finished or when we snapshot state,
> but what if the snapshot is not enabled and the batch size is not reached
> before the connection is closed?
>
> Maybe we could use a Timer to test the connection periodically and keep
> it alive. What do you think?
>
> I will open a jira and try to work on that issue.
>
> Best,
> wangsan
>
>
>
> On Jul 10, 2018, at 8:38 PM, Hequn Cheng  wrote:
>
> Hi wangsan,
>
> I agree with you. It would be kind of you to open a jira to check the
> problem.
>
> For the first problem, I think we need to establish a connection each time
> we execute a batch write. And it is better to get the connection from a
> connection pool.
> For the second problem, to avoid the multithreading problem, I think we should
> synchronize the batch object in the flush() method.
>
> What do you think?
>
> Best, Hequn
>
>
>
> On Tue, Jul 10, 2018 at 2:36 PM, wangsan  wrote:
>
>> Hi all,
>>
>> I'm going to use JDBCAppendTableSink and JDBCOutputFormat in my Flink
>> application. But I am confused with the implementation of JDBCOutputFormat.
>>
>> 1. The Connection was established when JDBCOutputFormat is opened, and
>> will be used all the time. But if this connection lies idle for a long time,
>> the database will force close the connection, thus errors may occur.
>> 2. The flush() method is called when batchCount exceeds the threshold,
>> but it is also called while snapshotting state. So two threads may modify
>> upload and batchCount, but without synchronization.
>>
>> Please correct me if I am wrong.
>>
>> ——
>> wangsan
>>
>
>
>