Re: Getting JobExecutionException: Could not set up JobManager when trying to upload new version

2019-04-23 Thread Avi Levi
Might be useful for someone regarding this issue: it seems that changing
the uid of the operator caused this mess.
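
For context, a minimal sketch of pinning operator uids so that savepoint state
can be mapped back onto a modified job (class and uid names here are
hypothetical, not from the original job):

// Every stateful operator gets an explicit, stable uid. Changing or
// omitting these uids is what triggers "Cannot map checkpoint/savepoint
// state for operator ..." on restore.
DataStream<Event> deduped = events
        .keyBy(Event::getKey)
        .flatMap(new DedupFunction())
        .uid("dedup-operator"); // must stay identical across job versions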

On Tue, Apr 16, 2019 at 6:31 PM Avi Levi  wrote:

> I am trying to upload a new version of the code but I am getting the
> exception below. The schema of the state has not changed for a while. What
> can be the reason for that (the log file is also attached)?
>>
>>
>> 2019-04-16 15:14:11.112 [flink-akka.actor.default-dispatcher-1138] ERROR
>> org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Failed to
>> submit job 693a02204ef5816f91ea3b135f544a7f.
>> java.lang.RuntimeException:
>> org.apache.flink.runtime.client.JobExecutionException: Could not set up
>> JobManager
>> at
>> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
>> at
>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>> at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>> at
>> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>> at
>> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could
>> not set up JobManager
>> at
>> org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:176)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
>> at
>> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>> ... 7 common frames omitted
>> Caused by: java.lang.IllegalStateException: Failed to rollback to
>> checkpoint/savepoint
>> gs://bv-flink-state/dev/state/savepoint-7cbaf2-48f14797. Cannot map
>> checkpoint/savepoint state for operator 3cfeb06db0484d5556a7de8db2025f09 to
>> the new program, because the operator is not available in the new program.
>> If you want to allow to skip this, you can set the --allowNonRestoredState
>> option on the CLI.
>> at
>> org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:205)
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1103)
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster.tryRestoreExecutionGraphFromSavepoint(JobMaster.java:1241)
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1165)
>> at org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:296)
>> at
>> org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:157)
>> ... 10 common frames omitted
>> 2019-04-16 15:14:11.242 [flink-akka.actor.default-dispatcher-1138] ERROR
>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler  - Exception
>> occurred in REST handler:
>> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit
>> job.
>> 2019-04-16 15:14:11.947 [flink-akka.actor.default-dispatcher-1155] ERROR
>> org.apache.flink.runtime.rest.handler.job.JobDetailsHandler  - Exception
>> occurred in REST handler: Job e43ed04cdedd73f9bb9836e87142afbf not found
>>
>
> Thanks for your help.
>
> Cheers
> Avi
>


Re: lack of function and low usability of provided function

2019-04-23 Thread 徐涛
Found another function that does not work as its documentation declares.
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/functions.html#temporal-functions
 

The function is TIMESTAMP string.

I use the following SQL, where the type of ttt is String:
insert into rslt select word,cast( TIMESTAMPADD(HOUR, 2,   TIMESTAMP ttt) 
as varchar) from xxx

But it throws the following exception (the Flink version is 1.7.2):
Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL 
parse failed. Encountered "TIMESTAMP ttt" at line 1, column 60.
Was expecting one of:
"+" ...
"-" ...
"NOT" ...
"EXISTS" ...
 ...
 ...
 ...
 ...
 ...
 ...
 ...
"TRUE" ...
"FALSE" ...
"UNKNOWN" ...
"NULL" ...
 ...
 ...
 ...
"DATE" ...
"TIME" ...
"TIMESTAMP"  ...
"INTERVAL" ...
"?" ...
"CAST" ...
"EXTRACT" ...
"POSITION" ...
"CONVERT" ...
"TRANSLATE" ...
"OVERLAY" ...
"FLOOR" ...
"CEIL" ...
"CEILING" ...
"SUBSTRING" ...
"TRIM" ...
"CLASSIFIER" ...
"MATCH_NUMBER" ...
"RUNNING" ...
"PREV" ...
"NEXT" ...
 ...
"MULTISET" ...
"ARRAY" ...
"PERIOD" ...
"SPECIFIC" ...
 ...
 ...
 ...
 ...
 ...
"ABS" ...
"AVG" ...
"CARDINALITY" ...
"CHAR_LENGTH" ...
"CHARACTER_LENGTH" ...
"COALESCE" ...
"COLLECT" ...
"COVAR_POP" ...
"COVAR_SAMP" ...
"CUME_DIST" ...
"COUNT" ...
"CURRENT_DATE" ...
"CURRENT_TIME" ...
"CURRENT_TIMESTAMP" ...
"DENSE_RANK" ...
"ELEMENT" ...
"EXP" ...
"FIRST_VALUE" ...
"FUSION" ...
"GROUPING" ...
"HOUR" ...
"LAG" ...
"LEAD" ...
"LAST_VALUE" ...
"LN" ...
"LOCALTIME" ...
"LOCALTIMESTAMP" ...
"LOWER" ...
"MAX" ...
"MIN" ...
"MINUTE" ...
"MOD" ...
"MONTH" ...
"NTH_VALUE" ...
"NTILE" ...
"NULLIF" ...
"OCTET_LENGTH" ...
"PERCENT_RANK" ...
"POWER" ...
"RANK" ...
"REGR_SXX" ...
"REGR_SYY" ...
"ROW_NUMBER" ...
"SECOND" ...
"SQRT" ...
"STDDEV_POP" ...
"STDDEV_SAMP" ...
"SUM" ...
"UPPER" ...
"TRUNCATE" ...
"USER" ...
"VAR_POP" ...
"VAR_SAMP" ...
"YEAR" ...
"CURRENT_CATALOG" ...
"CURRENT_DEFAULT_TRANSFORM_GROUP" ...
"CURRENT_PATH" ...
"CURRENT_ROLE" ...
"CURRENT_SCHEMA" ...
"CURRENT_USER" ...
"SESSION_USER" ...
"SYSTEM_USER" ...
"NEW" ...
"CASE" ...
"CURRENT" ...
"CURSOR" ...
"ROW" ...
"(" ...

  at 
org.apache.flink.table.calcite.FlinkPlannerImpl.parse(FlinkPlannerImpl.scala:94)
  at 
org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:803)
  at 
org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:777)
  at 
com.ximalaya.flink.dsl.application.simple.HelloWorldTable$.main(HelloWorldTable.scala:44)
  at 
com.ximalaya.flink.dsl.application.simple.HelloWorldTable.main(HelloWorldTable.scala)
Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered 
"TIMESTAMP ttt" at line 1, column 60.
Was expecting one of:
"+" ...
"-" ...
"NOT" ...
"EXISTS" ...
 ...
 ...
 ...
 ...
 ...
 ...
 ...
"TRUE" ...
"FALSE" ...
"UNKNOWN" ...
Disconnected from the target VM, address: '127.0.0.1:55077', transport: 'socket'
"NULL" ...
 ...
 ...
 ...
"DATE" ...
"TIME" ...
"TIMESTAMP"  ...
"INTERVAL" ...
"?" ...
"CAST" ...
"EXTRACT" ...
"POSITION" ...
"CONVERT" ...
"TRANSLATE" ...
"OVERLAY" ...
"FLOOR" ...
"CEIL" ...
"CEILING" ...
"SUBSTRING" ...
"TRIM" ...
"CLASSIFIER" ...
"MATCH_NUMBER" ...
"RUNNING" ...
"PREV" ...
"NEXT" ...
 ...
"MULTISET" ...
"ARRAY" ...
"PERIOD" ...
"SPECIFIC" ...
 ...
 ...
 ...
 ...
 ...
"ABS" ...
"AVG" ...
"CARDINALITY" ...
"CHAR_LENGTH" ...
"CHARACTER_LENGTH" ...
"COALESCE" ...
"COLLECT" ...
"COVAR_POP" ...
"COVAR_SAMP" ...
"CUME_DIST" ...
"COUNT" ...
"CURRENT_DATE" ...
"CURRENT_TIME" ...
"CURRENT_TIMESTAMP" ...
"DENSE_RANK" ...
"ELEMENT" ...
"EXP" ...
"FIRST_VALUE" ...
"FUSION" ...
"GROUPING" ...
"HOUR" ...
"LAG" ...
"LEAD" ...
"LAST_VALUE" ...
"LN" ...
"LOCALTIME" ...
"LOCALTIMESTAMP" ...
"LOWER" ...
"MAX" ...
"MIN" ...
"MINUTE" ...
"MOD" ...
"MONTH" ...
"NTH_VALUE" ...
"NTILE" ...
"NULLIF" ...
"OCTET_LENGTH" ...
"PERCENT_RANK" ...
"POWER" ...
"RANK" ...
"REGR_SXX" ...
"REGR_SYY" ...
"ROW_NUMBER" ...
"SECOND" ...
"SQRT" ...
"STDDEV_POP" ...
"STDDEV_SAMP" ...
"SUM" ...
"UPPER" ...
"TRUNCATE" ...
  

Re: Error restoring from checkpoint on Flink 1.8

2019-04-23 Thread Ning Shi
Hi Congxian,

I think I have figured out the issue. It's related to the checkpoint directory
collision issue you responded to in the other thread. We reproduced this bug on
1.6.1 after unchaining the operators.

There are two stateful operators in the chain, one is a
CoBroadcastWithKeyedOperator, the other one is a StreamMapper. The
CoBroadcastWithKeyedOperator creates timer states in RocksDB, the latter
doesn’t. Because of the checkpoint directory collision bug, we always end up
saving the states for CoBroadcastWithKeyedOperator.

After breaking these two operators apart, they try to restore from the same set
of saved states. When the StreamMapper opens the RocksDB files, it doesn’t care
about any of the column families in there, including the timer states. Hence the
error.
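
For reference, a minimal sketch of what breaking the chain apart looks like in
the DataStream API (operator names are hypothetical):

// Forcing the stateful mapper out of the chain that contains the
// CoBroadcastWithKeyedOperator, so each operator snapshots into its own
// checkpoint directory.
stream.map(new MyStatefulMapper())
      .uid("stateful-map")
      .startNewChain(); // or .disableChaining() to isolate it entirely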

--
Ning


Re: Non-zero ( 2 ) exit code on k8s StandaloneJobClusterEntryPoint when save point with cancel...

2019-04-23 Thread Vishal Santoshi
Anyone?



I think there is some race condition. These are the TM logs. I am puzzled
because in a larger pipe (there are about 32 slots on 8 replicas) it works.




2019-04-24 01:16:20,889 DEBUG
org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager  -
Releasing local state under allocation id 5a853ef886e1c599f86b9503306fffd2.

2019-04-24 01:16:20,894 DEBUG
org.apache.flink.runtime.taskexecutor.TaskExecutor- Close
JobManager connection for job .

org.apache.flink.util.FlinkException: Stopping JobMaster for job
EventCountJob().

at org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:355)

at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:504)

at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:170)

at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)

at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)

at
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)

at akka.actor.Actor$class.aroundReceive(Actor.scala:502)

at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)

at akka.actor.ActorCell.invoke(ActorCell.scala:495)

at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)

at akka.dispatch.Mailbox.run(Mailbox.scala:224)

at akka.dispatch.Mailbox.exec(Mailbox.scala:234)

at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

2019-04-24 01:16:20,895 INFO
org.apache.flink.runtime.taskexecutor.JobLeaderService- Cannot
reconnect to job  because it is not
registered.

2019-04-24 01:16:21,053 DEBUG
org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
heartbeat request from e61c2b7d992f151936e21db1ca0d.

2019-04-24 01:16:22,136 DEBUG
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
ping response for sessionid: 0x25add5478fb2ec6 after 0ms

2019-04-24 01:16:31,052 DEBUG
org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
heartbeat request from e61c2b7d992f151936e21db1ca0d.

2019-04-24 01:16:35,483 DEBUG
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
ping response for sessionid: 0x25add5478fb2ec6 after 0ms

On Tue, Apr 23, 2019 at 3:11 PM Vishal Santoshi 
wrote:

> I see this in the TM pod
>
> 2019-04-23 19:08:41,828 DEBUG
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
> ping response for sessionid: 0x15cc7f3d88466a5 after 0ms
>
> 2019-04-23 19:08:47,543 DEBUG
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
> heartbeat request from 6b7dd7b5032c089bff8a77f75de65c22.
>
> 2019-04-23 19:08:55,175 DEBUG
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
> ping response for sessionid: 0x15cc7f3d88466a5 after 1ms
>
> 2019-04-23 19:08:57,548 DEBUG
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
> heartbeat request from 6b7dd7b5032c089bff8a77f75de65c22.
>
> 2019-04-23 19:09:07,543 DEBUG
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
> heartbeat request from 6b7dd7b5032c089bff8a77f75de65c22.
>
> 2019-04-23 19:09:08,523 DEBUG
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
> ping response for sessionid: 0x15cc7f3d88466a5 after 0ms
>
> 2019-04-23 19:09:17,542 DEBUG
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
> heartbeat request from 6b7dd7b5032c089bff8a77f75de65c22.
>
> 2019-04-23 19:09:21,871 DEBUG
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
> ping response for sessionid: 0x15cc7f3d88466a5 after 0ms
>
> 2019-04-23 19:09:27,543 DEBUG
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
> heartbeat request from 6b7dd7b5032c089bff8a77f75de65c22.
>
> 2019-04-23 19:09:35,218 DEBUG
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
> ping response for sessionid: 0x15cc7f3d88466a5 after 0ms
>
> 2019-04-23 19:09:37,542 DEBUG
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
> heartbeat request from 6b7dd7b5032c089bff8a77f75de65c22.
>
>
>
> JM log has analogous..
>
>
> 2019-04-23 19:10:49,218 DEBUG
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
> ping response for sessionid: 0x25add5478fb2e7c after 0ms
>
>
>
> Does that ring a bell ?
>
>
>
> On Tue, Apr 23, 2019 at 2:04 PM Vishal Santoshi 
> wrote:
>
>> Adding the DEBUG  logs from the time I call a REST based cancel 

Re: Apache Flink - Question about dynamically changing window end time at run time

2019-04-23 Thread Rong Rong
Hi Mans,

I am not sure what you meant by "dynamically change the end-time of a
window". If you are referring to dynamically determining the firing time of
the window, then it fits the description of a session window [1].
If you want to handle the window end time dynamically, one approach I can
think of is the dynamic-gap session window [1], with which you can specify
the end time of a window based on input elements, provided that you are
maintaining a session window (a short sketch follows the references below).
Another way to look at it is through the Flink CEP library [2].

Thanks,
Rong


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/windows.html#session-windows
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/libs/cep.html#groups-of-patterns
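
A minimal sketch of the dynamic-gap approach from [1] (the element type and
the gap logic are illustrative assumptions):

// Session windows whose gap -- and therefore end time -- is derived from
// each incoming element.
stream
    .keyBy(e -> e.getKey())
    .window(EventTimeSessionWindows.withDynamicGap(
        (SessionWindowTimeGapExtractor<Event>) element ->
            element.getDesiredGapMillis())) // per-element gap in ms
    .process(new MyWindowFunction());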

On Tue, Apr 23, 2019 at 8:19 AM M Singh  wrote:

> Hi:
>
> I am working on a project and need to change the end time of the window
> dynamically. I want to find out whether the end time of the window is used
> internally (for sorting windows, etc.), apart from handling watermarks, in a
> way that would cause problems if the end time were changed at run time after
> the window has been created, even if no new event has arrived for that window.
>
> I don't want to use GlobalWindow since from my understanding it never gets
> destroyed.
>
> If there is any alternate way of dealing with this, please let me know.
>
> Thanks
>
> Mans
>


Re: How to implement custom stream operator over a window? And after the Count-Min Sketch?

2019-04-23 Thread Rong Rong
Hi Felipe,

At a glance, the answer can depend on how your window is defined (is there
any overlap, as with a sliding window?) and how much data you would like to
process.

In general, you can always buffer all the data in a ListState and apply
your window function by iterating through all those buffered elements [1],
provided that the data size is small enough to be held efficiently in the
state backend.
If the algorithm has some sort of pre-aggregation that can simplify the
buffering through an incremental, orderless aggregation, you can also think
about using [2].
With these two approaches, you do not necessarily need to implement your
own window operator (extending the window operator can be tricky), and you
also have access to the internal state [3]. A short sketch follows the
references below.

Hope this helps your exploration.

Thanks,
Rong

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#processwindowfunction
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#processwindowfunction-with-incremental-aggregation
[3]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction
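
A sketch of the buffered-elements route from [1], with the Count-Min update
reduced to a plain counter placeholder (all names and the window size are
illustrative assumptions):

// ProcessWindowFunction hands you an Iterable over the buffered window
// elements; a real implementation would feed each element into the sketch.
words.keyBy(w -> w)
     .window(TumblingEventTimeWindows.of(Time.minutes(1)))
     .process(new ProcessWindowFunction<String, Tuple2<String, Long>, String, TimeWindow>() {
         @Override
         public void process(String key, Context ctx, Iterable<String> elements,
                             Collector<Tuple2<String, Long>> out) {
             long count = 0; // stand-in for sketch.update(element)
             for (String ignored : elements) {
                 count++;
             }
             out.collect(Tuple2.of(key, count));
         }
     });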

On Tue, Apr 23, 2019 at 8:16 AM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hi,
>
> I want to implement my own operator that computes the Count-Min Sketch
> over a window in Flink. Then, I found this Jira issue [1]
>  which is exactly what
> I want. I believe that I have to work out my skills to arrive at a mature
> solution.
>
> So, the first thing that comes to my mind is to create my custom operator
> like the AggregateApplyWindowFunction [2]
> .
> Through this I can create the summary of my data over a window.
>
> Also, I found this custom JoinOperator example by Till Rohrmann [3]
> on which I think I can base my
> implementation, since it is done over a DataStream.
>
> What are your suggestions to me in order to start to implement a custom
> stream operator which computes Count-Min Sketch? Do you have any custom
> operator over window/keyBy that I can learn with the source code?
>
> P.S.: I have implemented a custom Combiner [4] (map-combiner-reduce)
> operator (looking at the Blink source code).
>
> [1] https://issues.apache.org/jira/browse/FLINK-2147
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyWindowFunction.html
> [3] https://github.com/tillrohrmann/custom-join
> [4]
> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/operator/AbstractRichMapStreamBundleOperator.java
>
> Thanks,
> Felipe
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> *
>


get custom gauge metric from WebMonitorEndpoint

2019-04-23 Thread Georgi Stoyanov
I've got a custom metric ->


[inline image: the custom gauge implementation -- not preserved in this archive]

And I'm using it as suggested in the documentation ->

getRuntimeContext().getMetricGroup().gauge("MyCustomMetric", new TestMetric());
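
(The screenshot above did not survive the archive; a sketch of what such a
gauge presumably looks like -- the returned value is an assumption:)

// Minimal user-defined gauge. User metrics like this are reported under the
// task/operator scope, e.g. /jobs/:jobid/vertices/:vertexid/metrics, rather
// than under the job-level endpoints.
public class TestMetric implements Gauge<Integer> {
    @Override
    public Integer getValue() {
        return 42; // whatever value the job wants to expose
    }
}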


I want to get this metric with a GET request, but so far I have tried almost
everything in the API documentation
(https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/rest_api.html)
and didn't find that metric. Do you know how (or even whether) I could get that
custom metric via the API?


Kind Regards,
Georgi




Looking for help in configuring Swift as State Backend

2019-04-23 Thread PoolakkalMukkath, Shakir
Hi,

I am looking for some help in configuring the Swift FS as the state backend. I am
unable to configure it; let me know if anyone has done this before or has
knowledge that could help me.

Do we still need to run an HDFS to use this feature ?

Thanks,
Shakir


Re: Non-zero ( 2 ) exit code on k8s StandaloneJobClusterEntryPoint when save point with cancel...

2019-04-23 Thread Vishal Santoshi
I see this in the TM pod

2019-04-23 19:08:41,828 DEBUG
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
ping response for sessionid: 0x15cc7f3d88466a5 after 0ms

2019-04-23 19:08:47,543 DEBUG
org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
heartbeat request from 6b7dd7b5032c089bff8a77f75de65c22.

2019-04-23 19:08:55,175 DEBUG
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
ping response for sessionid: 0x15cc7f3d88466a5 after 1ms

2019-04-23 19:08:57,548 DEBUG
org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
heartbeat request from 6b7dd7b5032c089bff8a77f75de65c22.

2019-04-23 19:09:07,543 DEBUG
org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
heartbeat request from 6b7dd7b5032c089bff8a77f75de65c22.

2019-04-23 19:09:08,523 DEBUG
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
ping response for sessionid: 0x15cc7f3d88466a5 after 0ms

2019-04-23 19:09:17,542 DEBUG
org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
heartbeat request from 6b7dd7b5032c089bff8a77f75de65c22.

2019-04-23 19:09:21,871 DEBUG
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
ping response for sessionid: 0x15cc7f3d88466a5 after 0ms

2019-04-23 19:09:27,543 DEBUG
org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
heartbeat request from 6b7dd7b5032c089bff8a77f75de65c22.

2019-04-23 19:09:35,218 DEBUG
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
ping response for sessionid: 0x15cc7f3d88466a5 after 0ms

2019-04-23 19:09:37,542 DEBUG
org.apache.flink.runtime.taskexecutor.TaskExecutor- Received
heartbeat request from 6b7dd7b5032c089bff8a77f75de65c22.



JM log has analogous..


2019-04-23 19:10:49,218 DEBUG
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
ping response for sessionid: 0x25add5478fb2e7c after 0ms



Does that ring a bell ?



On Tue, Apr 23, 2019 at 2:04 PM Vishal Santoshi 
wrote:

> Adding the DEBUG  logs from the time I call a REST based cancel with save
> point...
>
> On Tue, Apr 23, 2019 at 2:01 PM Vishal Santoshi 
> wrote:
>
>> Though looking at
>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java#L88
>>  it
>> does seem that the last log is expected.
>>
>> Not sure what part is hanging... I have more logs I can share...
>>
>> On Tue, Apr 23, 2019 at 1:59 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> I am seeing this weird issue where I do a save point with cancel on a
>>> job on k8s and it hangs for 5 minutes ( no INFO logs ) and then exits with
>>> code of 2.
>>>
>>>
>>> 2019-04-23 17:36:31,372 INFO
>>> org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint  -
>>> Shutting down rest endpoint.
>>>
>>> 2019-04-23 17:36:31,374 INFO
>>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService
>>> - Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
>>>
>>> 2019-04-23 17:36:31,377 INFO
>>> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  -
>>> Suspending SlotPool.
>>>
>>> 2019-04-23 17:36:31,378 DEBUG
>>> org.apache.flink.runtime.jobmaster.JobMaster  - Close
>>> ResourceManager connection 181a4fd61044033a2ea32e384096247f.
>>>
>>> org.apache.flink.util.FlinkException: JobManager is shutting down.
>>>
>>> at
>>> org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:365)
>>>
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:504)
>>>
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:170)
>>>
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>>>
>>> at
>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>>>
>>> at
>>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>>
>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>
>>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>>
>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>
>>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>>
>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>
>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>
>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>
>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>
>>> at
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>
>>> at
>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>
>>> at
>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>> 2019-04-23 17:36:31,381 INFO
>>> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl 

Re: Non-zero ( 2 ) exit code on k8s StandaloneJobClusterEntryPoint when save point with cancel...

2019-04-23 Thread Vishal Santoshi
Though looking at
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java#L88
it
does seem that the last log is expected.

Not sure what part is hanging... I have more logs I can share...

On Tue, Apr 23, 2019 at 1:59 PM Vishal Santoshi 
wrote:

> I am seeing this weird issue where I do a save point with cancel on a job
> on k8s and it hangs for 5 minutes ( no INFO logs ) and then exits with code
> of 2.
>
>
> 2019-04-23 17:36:31,372 INFO
> org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint  - Shutting
> down rest endpoint.
>
> 2019-04-23 17:36:31,374 INFO
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
> Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
>
> 2019-04-23 17:36:31,377 INFO
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  -
> Suspending SlotPool.
>
> 2019-04-23 17:36:31,378 DEBUG org.apache.flink.runtime.jobmaster.JobMaster
> - Close ResourceManager connection
> 181a4fd61044033a2ea32e384096247f.
>
> org.apache.flink.util.FlinkException: JobManager is shutting down.
>
> at org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:365)
>
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:504)
>
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:170)
>
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>
> at
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> 2019-04-23 17:36:31,381 INFO
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  - Stopping
> SlotPool.
>
> 2019-04-23 17:36:31,381 INFO
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
> Disconnect job manager a41a5dceae5ad3664ff1f0b79f3e47ef
> @akka.tcp://flink@kafka-to-prometheus:6123/user/jobmanager_0 for job
>  from the resource manager.
>
> 2019-04-23 17:36:31,385 INFO
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  -
> Stopping ZooKeeperLeaderElectionService
> ZooKeeperLeaderElectionService{leaderPath='/leader//job_manager_lock'}.
>
>
>
>
> and after 5 minutes ..
>
>
>
> 2019-04-23 17:41:32,187 DEBUG
> org.apache.flink.shaded.netty4.io.netty.buffer.PoolThreadCache  - Freed 8
> thread-local buffer(s) from thread: Finalizer
>
> 2019-04-23 17:41:32,198 INFO
> org.apache.flink.runtime.rpc.akka.AkkaRpcService  - Stopped
> Akka RPC service.
>
> 2019-04-23 17:41:32,200 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint
> - Terminating cluster entrypoint process
> StandaloneJobClusterEntryPoint with exit code 2.
>
> java.util.concurrent.TimeoutException
>
> at
> org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:942)
>
> at
> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
>
> at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$11(FutureUtils.java:360)
>
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>
>
>
>
>
>
> In the interim, I get this at a regular clip
>
>
>
> 2019-04-23 17:37:02,452 DEBUG
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  -
> Release TaskExecutor 3752235c49428b94a0520f04266973eb because it exceeded
> the idle timeout.
>
> 2019-04-23 17:37:02,453 DEBUG
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
> Worker 68c5fbd67ac2bbe6fc35ed068ce1c4b1 could not be stopped.
>
>
>
>
> Any ideas as to whether it is this inability to shut down the Worker that
> is causing this issue ?
>
>
>
>
>
> Regards.
>


Non-zero ( 2 ) exit code on k8s StandaloneJobClusterEntryPoint when save point with cancel...

2019-04-23 Thread Vishal Santoshi
I am seeing this weird issue where I do a save point with cancel on a job
on k8s and it hangs for 5 minutes ( no INFO logs ) and then exits with code
of 2.


2019-04-23 17:36:31,372 INFO
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint  - Shutting
down rest endpoint.

2019-04-23 17:36:31,374 INFO
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.

2019-04-23 17:36:31,377 INFO
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  - Suspending
SlotPool.

2019-04-23 17:36:31,378 DEBUG org.apache.flink.runtime.jobmaster.JobMaster
- Close ResourceManager connection
181a4fd61044033a2ea32e384096247f.

org.apache.flink.util.FlinkException: JobManager is shutting down.

at org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:365)

at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:504)

at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:170)

at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)

at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)

at
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)

at akka.actor.Actor$class.aroundReceive(Actor.scala:502)

at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)

at akka.actor.ActorCell.invoke(ActorCell.scala:495)

at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)

at akka.dispatch.Mailbox.run(Mailbox.scala:224)

at akka.dispatch.Mailbox.exec(Mailbox.scala:234)

at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

2019-04-23 17:36:31,381 INFO
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  - Stopping
SlotPool.

2019-04-23 17:36:31,381 INFO
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
Disconnect job manager a41a5dceae5ad3664ff1f0b79f3e47ef
@akka.tcp://flink@kafka-to-prometheus:6123/user/jobmanager_0 for job
 from the resource manager.

2019-04-23 17:36:31,385 INFO
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  -
Stopping ZooKeeperLeaderElectionService
ZooKeeperLeaderElectionService{leaderPath='/leader//job_manager_lock'}.




and after 5 minutes ..



2019-04-23 17:41:32,187 DEBUG
org.apache.flink.shaded.netty4.io.netty.buffer.PoolThreadCache  - Freed 8
thread-local buffer(s) from thread: Finalizer

2019-04-23 17:41:32,198 INFO
org.apache.flink.runtime.rpc.akka.AkkaRpcService  - Stopped
Akka RPC service.

2019-04-23 17:41:32,200 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint
- Terminating cluster entrypoint process
StandaloneJobClusterEntryPoint with exit code 2.

java.util.concurrent.TimeoutException

at
org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:942)

at
org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)

at
org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$11(FutureUtils.java:360)

at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)






In the interim, I get this at a regular clip



2019-04-23 17:37:02,452 DEBUG
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Release
TaskExecutor 3752235c49428b94a0520f04266973eb because it exceeded the idle
timeout.

2019-04-23 17:37:02,453 DEBUG
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
Worker 68c5fbd67ac2bbe6fc35ed068ce1c4b1 could not be stopped.




Any ideas as to whether it is this inability to shut down the Worker that
is causing this issue ?





Regards.


[DISCUSS] Temporarily remove support for job rescaling via CLI action "modify"

2019-04-23 Thread Gary Yao
Hi all,

As the subject states, I am proposing to temporarily remove support for
changing the parallelism of a job via the following syntax [1]:

./bin/flink modify [job-id] -p [new-parallelism]

This is an experimental feature that we introduced with the first rollout of
FLIP-6 (Flink 1.5). However, this feature comes with a few caveats:

* Rescaling does not work with HA enabled [2]
* New parallelism is not persisted, i.e., after a JobManager restart, the job
  will be recovered with the initial parallelism

Due to the above-mentioned issues, I believe that currently nobody uses
"modify -p" to rescale their jobs in production. Moreover, the rescaling
feature stands in the way of our current efforts to rework Flink's
scheduling
[3]. I therefore propose to remove the rescaling code for the time being.
Note
that it will still be possible to change the parallelism by taking a
savepoint
and restoring the job with a different parallelism [4].
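
For reference, a sketch of that savepoint-based path (paths and IDs are
placeholders):

./bin/flink cancel -s [targetDirectory] <jobId>   # take a savepoint, then cancel
./bin/flink run -s <savepointPath> -p <newParallelism> <jarFile>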

Any comments and suggestions will be highly appreciated.

Best,
Gary

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/cli.html
[2] https://issues.apache.org/jira/browse/FLINK-8902
[3] https://issues.apache.org/jira/browse/FLINK-10429
[4]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/savepoints.html#what-happens-when-i-change-the-parallelism-of-my-program-when-restoring


Re: Sinking messages in RabbitMQ

2019-04-23 Thread Oytun Tez
I think you should exchangeDeclare when you open the sink. When invoked,
you can channel.basicPublish(exchangeName).

Would this work? We have a single exchange, so we didn't explore this method.
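
For illustration, a sketch of that suggestion as a custom sink (the host, the
exchange name, and the FANOUT choice are assumptions):

// Declares the exchange once in open() and publishes to it in invoke(),
// instead of going through RMQSink's queue-only constructor.
public class ExchangeRMQSink extends RichSinkFunction<String> {
    private transient Connection connection;
    private transient Channel channel;

    @Override
    public void open(Configuration parameters) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        connection = factory.newConnection();
        channel = connection.createChannel();
        channel.exchangeDeclare("my-exchange", BuiltinExchangeType.FANOUT);
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        channel.basicPublish("my-exchange", "", null, value.getBytes());
    }

    @Override
    public void close() throws Exception {
        channel.close();
        connection.close();
    }
}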

---
Oytun Tez

*M O T A W O R D*
The World's Fastest Human Translation Platform.
oy...@motaword.com — www.motaword.com


On Tue, Apr 23, 2019 at 12:37 PM Soheil Pourbafrani 
wrote:

> I'm using Flink RabbitMQ Connector for Sinking Data but using the
> RMQConnectionConfig object I couldn't find any method to set the type of
> the exchange (Fanout, Topic, Direct). And also the RMQSink get just name of
> the queue as the parameter. Is there any way to specify the exchange type?
>


Re: Fast restart of a job with a large state

2019-04-23 Thread Sergey Zhemzhitsky
Hi Stefan, Paul,

Thanks for the tips! I have tried neither rescaling from checkpoints nor
task-local recovery yet. Now that's something to test.

In case it becomes necessary not just to rescale a job but also to change
its DAG - is there a way to have something like, let's call it, "local
savepoints" or "incremental savepoints", to avoid transferring the whole
state to and from a distributed storage?
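
(For reference, the task local recovery mentioned in this thread is switched
on in flink-conf.yaml; key names as I recall them from the 1.8 docs, and the
directory is a placeholder:)

state.backend.local-recovery: true
taskmanager.state.local.root-dirs: /data/flink/local-recovery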

Kind Regards,
Sergey


On Thu, Apr 18, 2019, 13:22 Stefan Richter  wrote:

> Hi,
>
> If rescaling is the problem, let me clarify that you can currently rescale
> from savepoints and all types of checkpoints (including incremental). If
> that was the only problem, then there is nothing to worry about - the
> documentation is only a bit conservative about this because we will not
> commit to an APU that all future types checkpoints will be resealable. But
> currently they are all, and this is also very unlikely to change anytime
> soon.
>
> Paul, just to comment on your suggestion as well, local recovery would
> only help with failover. 1) It does not help for restarts by the user and
> 2) also does not work for rescaling (2) is a consequence of 1) because
> failover never rescales, only restarts).
>
> Best,
> Stefan
>
> On 18. Apr 2019, at 12:07, Paul Lam  wrote:
>
> The URL in my previous mail is wrong, and it should be:
>
>
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#task-local-recovery
>
> Best,
> Paul Lam
>
> On Apr 18, 2019, at 18:04, Paul Lam wrote:
>
> Hi,
>
> Have you tried task local recovery [1]?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#retained-checkpoints
>
> Best,
> Paul Lam
>
> On Apr 17, 2019, at 17:46, Sergey Zhemzhitsky wrote:
>
> Hi Flinkers,
>
> Operating different flink jobs I've discovered that job restarts with
> a pretty large state (in my case this is up to 100GB+) take quite a
> lot of time. For example, to restart a job (e.g. to update it) the
> savepoint is created, and in case of savepoints all the state seems to
> be pushed into the distributed store (hdfs in my case) when stopping a
> job and pulled back when starting the new version of the
> job.
>
> What I've found so far while trying to speed up job restarts is:
> - using external retained checkpoints [1]; the drawback is that the
> job cannot be rescaled during restart
> - using external state and storage with the stateless jobs; the
> drawback is the necessity of additional network hops to this storage.
>
> So I'm wondering whether there are any best practices community knows
> and uses to cope with the cases like this?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#retained-checkpoints
>
>
>
>
>


Sinking messages in RabbitMQ

2019-04-23 Thread Soheil Pourbafrani
I'm using Flink RabbitMQ Connector for Sinking Data but using the
RMQConnectionConfig object I couldn't find any method to set the type of
the exchange (Fanout, Topic, Direct). And also the RMQSink get just name of
the queue as the parameter. Is there any way to specify the exchange type?


May be useful: our reference document for "Understanding State in Flink"

2019-04-23 Thread Oytun Tez
We keep a document with state-related use cases in our application, useful
for onboarding new engineers in the application. See attached PDF.

May be useful for others. And of course, corrections are welcome. (Couldn't
share our Wiki page)


---
Oytun Tez

*M O T A W O R D*
The World's Fastest Human Translation Platform.
oy...@motaword.com — www.motaword.com


MW-IPM-UnderstandingState-230419-1550.pdf
Description: Adobe PDF document


Sinking JSON data to Parquet files

2019-04-23 Thread scarlet


My previous message probably didn't go through; please ignore this if it's a duplicate.


I'd like to ask a question: I have a data source in JSON format (event tracking data) whose fields are not fixed. Is there any way to sink it to Parquet files? Thanks.

Re: Missing state in RocksDB checkpoints

2019-04-23 Thread Ning Shi
On Tue, 23 Apr 2019 10:53:52 -0400,
Congxian Qiu wrote:
> Sorry for misleading you. In the previous email, I just wanted to say that the
> problem is not caused by the UUID generation; it is caused by different
> operators sharing the same directory (because currently Flink uses the
> JobVertex as the directory name)

Ah, thank you for the clarification, Congxian. That makes sense.

Ning


Apache Flink - Question about dynamically changing window end time at run time

2019-04-23 Thread M Singh
Hi:
I am working on a project and need to change the end time of the window
dynamically. I want to find out whether the end time of the window is used
internally (for sorting windows, etc.), apart from handling watermarks, in a way
that would cause problems if the end time were changed at run time after the
window has been created, even if no new event has arrived for that window.

I don't want to use GlobalWindow since from my understanding it never gets 
destroyed.

If there is any alternate way of dealing with this, please let me know.

Thanks
Mans


How to implement custom stream operator over a window? And after the Count-Min Sketch?

2019-04-23 Thread Felipe Gutierrez
Hi,

I want to implement my own operator that computes the Count-Min Sketch over
a window in Flink. Then, I found this Jira issue [1]
 which is exactly what I
want. I believe that I have to work out my skills to arrive at a mature
solution.

So, the first thing that comes to my mind is to create my custom operator
like the AggregateApplyWindowFunction [2]
.
Through this I can create the summary of my data over a window.

Also, I found this custom JoinOperator example by Till Rohrmann [3]
on which I think I can base my
implementation, since it is done over a DataStream.

What are your suggestions to me in order to start to implement a custom
stream operator which computes Count-Min Sketch? Do you have any custom
operator over window/keyBy that I can learn with the source code?

P.S.: I have implemented a custom Combiner [4] (map-combiner-reduce) operator
(looking at the Blink source code).

[1] https://issues.apache.org/jira/browse/FLINK-2147
[2]
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyWindowFunction.html
[3] https://github.com/tillrohrmann/custom-join
[4]
https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/operator/AbstractRichMapStreamBundleOperator.java

Thanks,
Felipe
*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
*


Re: Missing state in RocksDB checkpoints

2019-04-23 Thread Congxian Qiu
Hi Ning,
Sorry for misleading you. In the previous email, I just wanted to say that the
problem is not caused by the UUID generation; it is caused by different operators
sharing the same directory (because currently Flink uses the JobVertex as the
directory name).

Best, Congxian
On Apr 23, 2019, 19:41 +0800, Ning Shi , wrote:
> Congxian,
>
> Thank you for creating the ticket and providing the relevant code. I’m
> curious why you think the directory collision is not a problem. What we
> observe is that the states of one of the operators are not included in the
> checkpoint and data is lost on restore. That’s a pretty serious problem,
> especially when Flink doesn’t generate any error in the log. People could
> potentially be losing state silently.
>
> Please let me know how I can best help diagnose this issue and drive the 
> ticket forward. I’m happy to collect any relevant information.
>
> Thanks,
>
> —
> Ning
>
> > On Apr 23, 2019, at 2:54 AM, Congxian Qiu  wrote:
> >
> > From the log message you gave, the two operators share the same directory,
> > and when snapshotting, the directory will be deleted first if it
> > exists (RocksIncrementalSnapshotStrategy#prepareLocalSnapshotDirectory).
> >
> > I did not find an issue for this problem, and I don’t think this is a
> > UUID generation problem; please check the path generation logic
> > in LocalRecoveryDirectoryProviderImpl#subtaskSpecificCheckpointDirectory.
> >
> > I’ve created an issue for this problem.


Re: Missing state in RocksDB checkpoints

2019-04-23 Thread Ning Shi
Congxian,

We just did a test. Separating the two stateful operators from
chaining seems to have worked around the problem. The states for both
of them are successfully saved in the checkpoint.

Ning

On Tue, Apr 23, 2019 at 7:41 AM Ning Shi  wrote:
>
> Congxian,
>
> Thank you for creating the ticket and providing the relevant code. I’m
> curious why you think the directory collision is not a problem. What we
> observe is that the states of one of the operators are not included in the
> checkpoint and data is lost on restore. That’s a pretty serious problem,
> especially when Flink doesn’t generate any error in the log. People could
> potentially be losing state silently.
>
> Please let me know how I can best help diagnose this issue and drive the 
> ticket forward. I’m happy to collect any relevant information.
>
> Thanks,
>
> —
> Ning
>
> > On Apr 23, 2019, at 2:54 AM, Congxian Qiu  wrote:
> >
> > From the log message you gave, the two operators share the same directory,
> > and when snapshotting, the directory will be deleted first if it
> > exists (RocksIncrementalSnapshotStrategy#prepareLocalSnapshotDirectory).
> >
> > I did not find an issue for this problem, and I don’t think this is a
> > UUID generation problem; please check the path generation logic
> > in LocalRecoveryDirectoryProviderImpl#subtaskSpecificCheckpointDirectory.
> >
> > I’ve created an issue for this problem.


Re: PatternFlatSelectAdapter - Serialization issue after 1.8 upgrade

2019-04-23 Thread Oytun Tez
Thank you Guowei and Dawid! I am trying your suggestions today and will
report back.

- I assume the cleaning operation should be done only once because of the
upgrade - or should I run it every time the application starts?
- `static` sounds like a very simple fix to get rid of this. Any drawbacks here?
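
For reference, a minimal sketch of the static-field workaround (field and
type names are hypothetical):

// Declared static so the tag is not captured in the enclosing instance's
// closure; the anonymous subclass ({}) preserves the element type info.
private static final OutputTag<Project> PENDING_PROJECTS_TAG =
        new OutputTag<Project>("pending-projects") {};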




---
Oytun Tez

*M O T A W O R D*
The World's Fastest Human Translation Platform.
oy...@motaword.com — www.motaword.com


On Tue, Apr 23, 2019 at 2:56 AM Dawid Wysakowicz 
wrote:

> Hi Oytun,
>
> I think there is a regression introduced in 1.8 how we handle output tags.
> The problem is we do not call ClosureCleaner on OutputTag.
>
> There are two options how you can workaround this issue:
>
> 1. Declare the OutputTag static
>
> 2. Clean the closure explicitly as Guowei suggested:
> StreamExecutionEnvironment.clean(pendingProjectsTag)
>
> I also opened a jira issue to fix this (FLINK-12297[1])
>
> Best,
>
> Dawid
>
> [1] https://issues.apache.org/jira/browse/FLINK-12297
> On 22/04/2019 03:06, Guowei Ma wrote:
>
> I think you could try
> StreamExecutionEnvironment.clean(pendingProjectsTag).
>
>
> Oytun Tez 于2019年4月19日 周五下午9:58写道:
>
>> Forgot to answer one of your points: the parent class compiles well
>> without this CEP selector (with timeout signature)...
>>
>>
>> ---
>> Oytun Tez
>>
>> *M O T A W O R D*
>> The World's Fastest Human Translation Platform.
>> oy...@motaword.com — www.motaword.com
>>
>>
>> On Fri, Apr 19, 2019 at 9:40 AM Oytun Tez  wrote:
>>
>>> Hey JingsongLee!
>>>
>>> Here are some findings...
>>>
>>>- flatSelect *without timeout* works normally:
>>>patternStream.flatSelect(PatternFlatSelectFunction), this compiles
>>>well.
>>>- Converted both the timeout and select selectors to an *inner class*
>>>(not static), yielded the same results, doesn't compile.
>>>- flatSelect *without* timeout, but with an inner class for
>>>PatternFlatSelectFunction, it compiles (same as first bullet).
>>>- Tried both of these selectors with *empty* body. Just a skeleton
>>>class. Doesn't compile either. Empty body example is in my first email.
>>>- Tried making both selectors *static public inner* classes, doesn't
>>>compile either.
>>>- Extracted both timeout and flat selectors to their own *independent
>>>classes* in separate files. Doesn't compile.
>>>- I am putting the *error stack* below.
>>>- Without the timeout selector in any class or lambda shape, with
>>>empty or full body, flatSelect compiles well.
>>>
>>> Would these findings help? Any ideas?
>>>
>>> Here is an error stack:
>>>
>>> 09:36:51,925 ERROR
>>> com.motaword.ipm.kernel.error.controller.ExceptionHandler -
>>> org.apache.flink.api.common.InvalidProgramException: The implementation
>>> of the PatternFlatSelectAdapter is not serializable. The object probably
>>> contains or references non serializable fields.
>>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
>>> at
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1558)
>>> at
>>> org.apache.flink.cep.PatternStreamBuilder.clean(PatternStreamBuilder.java:86)
>>> at org.apache.flink.cep.PatternStream.process(PatternStream.java:114)
>>> at org.apache.flink.cep.PatternStream.flatSelect(PatternStream.java:451)
>>> at org.apache.flink.cep.PatternStream.flatSelect(PatternStream.java:408)
>>> at
>>> com.motaword.ipm.business.invitation.controller.PendingProjects.getPending(PendingProjects.java:89)
>>> at
>>> com.motaword.ipm.business.invitation.controller.PendingProjects.run(PendingProjects.java:45)
>>> at
>>> com.motaword.ipm.business.invitation.boundary.InvitationJob.run(InvitationJob.java:31)
>>> at com.motaword.ipm.kernel.Application.main(Application.java:63)
>>> Caused by: java.io.NotSerializableException:
>>> org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>> at
>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>> at
>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>> at
>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>> at
>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>> at
>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>> at
>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>> at
>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>> at
>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>> at
>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>> at 

Re: How to pass application name when using FlinkKinesisConsumer

2019-04-23 Thread Xixi Li
Hey Liu,

Thank you for your response. By applicationName, I meant the name of the
control table in Amazon DynamoDB used when creating a consumer. Usually, in a
non-Flink environment, we are able to control it by passing the applicationName
in the configuration properties when creating a Worker using the Kinesis Client
Library. But we didn't find such an option when using FlinkKinesisConsumer.

Regards,
Xixi



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: status on FLINK-7129

2019-04-23 Thread Hao Sun
+1

On Tue, Apr 23, 2019, 05:18 Vishal Santoshi 
wrote:

> +1
>
> On Tue, Apr 23, 2019, 4:57 AM kant kodali  wrote:
>
>> Thanks all for the reply. I believe this is one of the most important
>> features that differentiate Flink from other stream processing engines, as
>> others don't even have CEP yet. So it would be great if this issue could get
>> more attention, as I don't think anyone wants to restart the job every
>> time they want to detect a new pattern.
>>
>> On Mon, Apr 22, 2019 at 11:30 PM Dawid Wysakowicz 
>> wrote:
>>
>>> Hi Kant,
>>>
>>> I'm afraid Konstantin is right. Unfortunately AFAIK there is no active
>>> development on that issue.
>>>
>>> Best,
>>>
>>> Dawid
>>> On 22/04/2019 18:20, Konstantin Knauf wrote:
>>>
>>> Hi Kant,
>>>
>>> as far as I know, no one is currently working on this. Dawid (cc) maybe
>>> knows more.
>>>
>>> Cheers,
>>>
>>> Konstantin
>>>
>>> On Sat, Apr 20, 2019 at 12:12 PM kant kodali  wrote:
>>>
 Hi All,

 There seems to be a lot of interest for
 https://issues.apache.org/jira/browse/FLINK-7129
 

 Any rough idea on the status of this issue?

 Thanks!

>>>
>>>
>>> --
>>>
>>> Konstantin Knauf | Solutions Architect
>>>
>>> +49 160 91394525
>>>
>>> Planned Absences: 17.04.2019 - 26.04.2019
>>>
>>> 
>>>
>>> Follow us @VervericaData
>>>
>>> --
>>>
>>> Join Flink Forward
>>>  - The
>>> Apache Flink Conference
>>>
>>> Stream Processing | Event Driven | Real Time
>>>
>>> --
>>>
>>> Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>
>>> --
>>> Data Artisans GmbH Registered at Amtsgericht Charlottenburg: HRB 158244
>>> B Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>>>
>>>


Apache Flink survey by Ververica

2019-04-23 Thread Robert Metzger
Hi everyone!

Ververica is running a brief survey to understand Apache Flink usage
and the needs of the community. We are hoping that this survey will help
identify common usage patterns, as well as pinpoint what are the most
needed features for Flink.

We'll share a report with a summary of findings at the conclusion of the
survey with the community. All of the responses will remain confidential,
and only aggregate statistics will be shared.

I expect the survey to take about 10 minutes, and all questions are
optional--we appreciate any feedback that you're willing to provide.

As a thank you, respondents will be entering a draw to win a trip
to Flink Forward.

The survey is available here:
https://docs.google.com/forms/d/e/1FAIpQLSdbNS1O-l07aORzKFx6zr3OV13lOyCx79ZRgiC1jYGb57C_hg/viewform

Looking forward to hearing back from you!

Best,
Robert


Flink Customized read text file

2019-04-23 Thread Soheil Pourbafrani
Hi,

I want to know is it possible to use PipedInutStream and PipedOutputStream
in Flink for reading text data from a file?
For example extending a RichSourceFunction for it and readata like this:

DataStream<String> raw = env.addSource(new PipedSource(file_path));

Actually i tried to implement a class for it but as PipedInputStream and
PipedOutputStream should be on seperate Threads, I have no idea how to
implement that.

Here is my template class

public static class PipedFile extends RichSourceFunction<String> {
    PipedOutputStream outputPipe = new PipedOutputStream();
    PipedInputStream inputPipe = new PipedInputStream();
    FileInputStream fis;

    PipedFile(String s) throws IOException {
        outputPipe.connect(inputPipe);
        fis = new FileInputStream(s); // was hardcoded to "data_source.csv"
    }

    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        // Writes the file into the pipe, but nothing reads inputPipe or
        // calls sourceContext.collect() yet -- this is the missing half.
        int length;
        byte[] buffer = new byte[1024];
        while ((length = fis.read(buffer, 0, 1024)) != -1) {
            outputPipe.write(buffer, 0, length);
        }
    }

    @Override
    public void cancel() {
        try {
            outputPipe.close();
            inputPipe.close();
            fis.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
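
A sketch of one way to wire this up, assuming text-line records: the piped
streams are created in open() (non-transient instance fields would not be
serializable), and the writer side runs on its own thread, as the
PipedInput/OutputStream pair requires:

// Hypothetical rework: a writer thread pumps the file into the
// PipedOutputStream while run() reads the PipedInputStream and emits lines.
public static class PipedFileSource extends RichSourceFunction<String> {
    private final String path;
    private transient PipedOutputStream outputPipe;
    private transient PipedInputStream inputPipe;
    private volatile boolean running = true;

    public PipedFileSource(String path) { this.path = path; }

    @Override
    public void open(Configuration parameters) throws Exception {
        outputPipe = new PipedOutputStream();
        inputPipe = new PipedInputStream(outputPipe);
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        Thread writer = new Thread(() -> {
            try (FileInputStream fis = new FileInputStream(path)) {
                byte[] buffer = new byte[1024];
                int length;
                while ((length = fis.read(buffer, 0, 1024)) != -1) {
                    outputPipe.write(buffer, 0, length);
                }
                outputPipe.close(); // signals EOF to the reader side
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
        writer.start();
        BufferedReader reader = new BufferedReader(new InputStreamReader(inputPipe));
        String line;
        while (running && (line = reader.readLine()) != null) {
            ctx.collect(line);
        }
        writer.join();
    }

    @Override
    public void cancel() {
        running = false;
    }
}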


Sinking JSON data to Parquet files

2019-04-23 Thread icy
I'd like to ask a question: I have a data source in JSON format whose fields are not fixed. Is there any way to sink it to Parquet files? Thanks.

Sent from my iPhone

Re: status on FLINK-7129

2019-04-23 Thread Vishal Santoshi
+1

On Tue, Apr 23, 2019, 4:57 AM kant kodali  wrote:

> Thanks all for the reply. I believe this is one of the most important
> features that differentiate Flink from other stream processing engines, as
> others don't even have CEP yet. So it would be great if this issue could get
> more attention, as I don't think anyone wants to restart the job every
> time they want to detect a new pattern.
>
> On Mon, Apr 22, 2019 at 11:30 PM Dawid Wysakowicz 
> wrote:
>
>> Hi Kant,
>>
>> I'm afraid Konstantin is right. Unfortunately AFAIK there is no active
>> development on that issue.
>>
>> Best,
>>
>> Dawid
>> On 22/04/2019 18:20, Konstantin Knauf wrote:
>>
>> Hi Kant,
>>
>> as far as I know, no one is currently working on this. Dawid (cc) maybe
>> knows more.
>>
>> Cheers,
>>
>> Konstantin
>>
>> On Sat, Apr 20, 2019 at 12:12 PM kant kodali  wrote:
>>
>>> Hi All,
>>>
>>> There seems to be a lot of interest for
>>> https://issues.apache.org/jira/browse/FLINK-7129
>>>
>>> Any rough idea on the status of this issue?
>>>
>>> Thanks!
>>>
>>
>>
>> --
>>
>> Konstantin Knauf | Solutions Architect
>>
>> +49 160 91394525
>>
>> Planned Absences: 17.04.2019 - 26.04.2019
>>
>> 
>>
>> Follow us @VervericaData
>>
>> --
>>
>> Join Flink Forward  - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Data Artisans GmbH Registered at Amtsgericht Charlottenburg: HRB 158244 B 
>> Managing
>> Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>>
>>


Re: Missing state in RocksDB checkpoints

2019-04-23 Thread Ning Shi
Congxian,

Thank you for creating the ticket and providing the relevant code. I’m curious
why you think the directory collision is not a problem. What we observe is that
the states of one of the operators are not included in the checkpoint and data
is lost on restore. That’s a pretty serious problem, especially when Flink
doesn’t generate any error in the log. People could potentially be losing state
silently.

Please let me know how I can best help diagnose this issue and drive the ticket 
forward. I’m happy to collect any relevant information.

Thanks,

—
Ning

> On Apr 23, 2019, at 2:54 AM, Congxian Qiu  wrote:
> 
> From the log message you gave, the two operators share the same directory, and
> when snapshotting, the directory will be deleted first if it
> exists (RocksIncrementalSnapshotStrategy#prepareLocalSnapshotDirectory).
>
> I did not find an issue for this problem, and I don’t think this is a
> UUID generation problem; please check the path generation logic in
> LocalRecoveryDirectoryProviderImpl#subtaskSpecificCheckpointDirectory.
> 
> I’ve created an issue for this problem.


Re: status on FLINK-7129

2019-04-23 Thread kant kodali
Thanks all for the reply. I believe this is one of the most important
features that differentiates Flink from other stream processing engines, as
others don't even have CEP yet. So it would be great if this issue could get
more attention, as I don't think anyone wants to restart the job every
time they want to detect a new pattern.

On Mon, Apr 22, 2019 at 11:30 PM Dawid Wysakowicz 
wrote:

> Hi Kant,
>
> I'm afraid Konstantin is right. Unfortunately AFAIK there is no active
> development on that issue.
>
> Best,
>
> Dawid
> On 22/04/2019 18:20, Konstantin Knauf wrote:
>
> Hi Kant,
>
> as far as I know, no one is currently working on this. Dawid (cc) maybe
> knows more.
>
> Cheers,
>
> Konstantin
>
> On Sat, Apr 20, 2019 at 12:12 PM kant kodali  wrote:
>
>> Hi All,
>>
>> There seems to be a lot of interest for
>> https://issues.apache.org/jira/browse/FLINK-7129
>>
>> Any rough idea on the status of this issue?
>>
>> Thanks!
>>
>
>
> --
>
> Konstantin Knauf | Solutions Architect
>
> +49 160 91394525
>
> Planned Absences: 17.04.2019 - 26.04.2019
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Data Artisans GmbH Registered at Amtsgericht Charlottenburg: HRB 158244 B 
> Managing
> Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>
>


Re: [Discuss] Add JobListener (hook) in flink job lifecycle

2019-04-23 Thread Till Rohrmann
I think we should not expose the ClusterClient configuration via the
ExecutionEnvironment (env.getClusterClient().addJobListener) because this
is effectively the same as exposing the JobListener interface directly on
the ExecutionEnvironment. Instead I think it could be possible to provide a
ClusterClient factory which is picked up from the Configuration or some
other mechanism for example. That way it would not need to be exposed via
the ExecutionEnvironment at all.

Cheers,
Till

On Fri, Apr 19, 2019 at 11:12 AM Jeff Zhang  wrote:

> >>> The ExecutionEnvironment is usually used by the user who writes the
> code and this person (I assume) would not be really interested in these
> callbacks.
>
> Usually the ExecutionEnvironment is used by the person who writes the code,
> but it doesn't need to be created and configured by that person. E.g. in a
> Zeppelin notebook, the ExecutionEnvironment is created by Zeppelin; users
> just use it to write Flink programs. You are right that the end user would
> not be interested in these callbacks, but a third-party library that
> integrates with Zeppelin would be.
>
> >>> In your case, it could be sufficient to offer some hooks for the
> ClusterClient or being able to provide a custom ClusterClient.
>
> Actually, in my initial PR (https://github.com/apache/flink/pull/8190) I
> do pass the JobListener to the ClusterClient and invoke it there.
> But IMHO, ClusterClient is not supposed to be a public API for users;
> JobClient is the public API that users should use to control jobs. So
> adding hooks to ClusterClient directly, or providing a custom
> ClusterClient, doesn't make sense to me. IIUC, you are suggesting the
> following approach:
>  env.getClusterClient().addJobListener(jobListener)
> but I don't see its benefit compared to this:
>  env.addJobListener(jobListener)
>
> Overall, I think adding hooks is orthogonal to fine-grained job control.
> And I agree that we should refactor the Flink client component, but I
> don't think that would affect the JobListener interface. What do you
> think?
>
>
>
>
> Till Rohrmann wrote on Thursday, April 18, 2019 at 8:57 PM:
>
>> Thanks for starting this discussion Jeff. I can see the need for
>> additional hooks for third party integrations.
>>
>> The thing I'm wondering is whether we really need/want to expose a
>> JobListener via the ExecutionEnvironment. The ExecutionEnvironment is
>> usually used by the user who writes the code and this person (I assume)
>> would not be really interested in these callbacks. If he would, then one
>> should rather think about a better programmatic job control where the
>> `ExecutionEnvironment#execute` call returns a `JobClient` instance.
>> Moreover, we would effectively make this part of the public API and every
>> implementation would need to offer it.
>>
>> In your case, it could be sufficient to offer some hooks for the
>> ClusterClient or being able to provide a custom ClusterClient. The
>> ClusterClient is the component responsible for the job submission and
>> retrieval of the job result and, hence, would be able to signal when a job
>> has been submitted or completed.
>>
>> Cheers,
>> Till
>>
>> On Thu, Apr 18, 2019 at 8:57 AM vino yang  wrote:
>>
>>> Hi Jeff,
>>>
>>> I personally like this proposal. From the perspective of
>>> programmability, a JobListener would make integrating third-party
>>> programs much easier.
>>>
>>> The scenario where I need the listener is the Flink cube engine for Apache
>>> Kylin. In that case, the Flink job program is embedded into Kylin's
>>> executable context.
>>>
>>> If we could have this listener, it would be easier to integrate with
>>> Kylin.
>>>
>>> Best,
>>> Vino
>>>
>>> Jeff Zhang wrote on Thursday, April 18, 2019 at 1:30 PM:
>>>

 Hi All,

 I created FLINK-12214 for adding a
 JobListener (hook) to the Flink job lifecycle. Since this is a new public API
 for Flink, I'd like to discuss it more widely in the community to get more
 feedback.

 The background and motivation is that I am integrating Flink into Apache
 Zeppelin (which is a notebook, in case you
 don't know), and I'd like to capture some job context (like the jobId) in the
 lifecycle of a Flink job (submitted, executed, cancelled) so that I can
 manipulate the job with more fine-grained control. E.g. I can capture the
 jobId when the job is submitted, associate it with one paragraph, and when
 the user clicks the cancel button, call the Flink cancel API to cancel
 this job.

 I believe other projects which integrate Flink would need a similar
 mechanism. I plan to add an addJobListener API to
 ExecutionEnvironment/StreamExecutionEnvironment so that users can add
 customized hooks into the Flink job lifecycle.

 Here's the draft JobListener interface:

 public interface JobListener {

 void onJobSubmitted(JobID jobId);

 void 
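
The archived message is truncated after the first callback. A sketch of how the rest of the draft might read, reconstructed only from the lifecycle events named above (submitted, executed, cancelled); the actual signatures proposed in FLINK-12214 may differ:

public interface JobListener {

    void onJobSubmitted(JobID jobId);

    // The two callbacks below are assumptions inferred from the proposal's
    // wording ("submission, executed, cancelled"), not the author's draft.
    void onJobExecuted(JobID jobId);

    void onJobCanceled(JobID jobId);
}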

HA lock nodes, Checkpoints, and JobGraphs not being removed after failure

2019-04-23 Thread dyana . rose
Originally posted to the dev list, but things can easily get buried there, and 
this may concern other HA users.

Flink v1.7.1

After a Flink reboot we've been seeing some unexpected issues with excess 
retained checkpoints not being able to be removed from ZooKeeper after a new 
checkpoint is created.

I believe I've got my head around the role of ZK and lockNodes in Checkpointing 
after going through the code. Could you check my logic on this and add any 
insight, especially if I've got it wrong?

The situation:
1) Say we run JM1 and JM2 and retain 10 checkpoints and are running in HA with 
S3 as the backing store.

2) JM1 and JM2 start up and each instance of ZooKeeperStateHandleStore has its 
own lockNode UUID. JM1 is elected leader.

3) We submit a job, that JobGraph lockNode is added to ZK using JM1's JobGraph 
lockNode.

4) Checkpoints start rolling in, latest 10 are retained in ZK using JM1's 
checkpoint lockNode. We continue running, and checkpoints are successfully 
being created and excess checkpoints removed.

5) Both JM1 and JM2 now are rebooted.

6) The JobGraph is recovered by the leader, the job restarts from the latest 
checkpoint.

Now after every new checkpoint we see in the ZooKeeper logs:
INFO [ProcessThread(sid:3 cport:-1)::PrepRequestProcessor@653] - Got user-level 
KeeperException when processing sessionid:0x1047715000d type:delete 
cxid:0x210 zxid:0x71091 txntype:-1 reqpath:n/a Error 
Path:/flink/job-name/checkpoints/2fa0d694e245f5ec1f709630c7c7bf69/0057813
 Error:KeeperErrorCode = Directory not empty for 
/flink/job-name/checkpoints/2fa0d694e245f5ec1f709630c7c7bf69/005781
with an increasing checkpoint id on each subsequent call.

When JM1 and JM2 were rebooted the lockNode UUIDs would have rolled, right? As 
the old checkpoints were created under the old UUID, the new JMs will never be 
able to remove the old retained checkpoints from ZooKeeper.

Is that correct?

If so, would this also happen with JobGraphs in the following situation (we saw 
this just recently where we had a JobGraph for a cancelled job still in ZK):

Steps 1 through 3 above, then:
4) JM1 fails over to JM2, the job keeps running uninterrupted. JM1 restarts.

5) some time later while JM2 is still leader we hard cancel the job and restart 
the JMs

In this case JM2 would successfully remove the job from S3, but because its 
lockNode is different from JM1's, it cannot delete the lock file in the jobgraph 
folder and so can’t remove the JobGraph. Then Flink restarts and tries to 
process the JobGraph it has found, but the S3 files have already been deleted.

Possible related closed issues (fixes went in v1.7.0): 
https://issues.apache.org/jira/browse/FLINK-10184 and 
https://issues.apache.org/jira/browse/FLINK-10255

Thanks for any insight,
Dyana
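
For readers hitting the same "Directory not empty" error, a minimal sketch of what ZooKeeper is refusing to do (plain ZooKeeper client; the connection string and paths are illustrative placeholders, and the surviving child is the lock node created under the old JobManager's UUID):

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;

public class LockNodeDemo {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> {});
        // A retained checkpoint znode that still has a lock child beneath it.
        String checkpoint = "/flink/job-name/checkpoints/<job-id>/<checkpoint-id>";
        try {
            zk.delete(checkpoint, -1); // -1: ignore the znode version
        } catch (KeeperException.NotEmptyException e) {
            // The server logs this as "KeeperErrorCode = Directory not empty":
            // the znode cannot be deleted until its lock children are gone.
            System.out.println("Checkpoint still locked: " + e.getPath());
        }
        zk.close();
    }
}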


Re: flink?????????????? (user-zh)

2019-04-23 Thread 1900
The subject and most of this Chinese-language thread are mojibake-damaged in 
the archive; only fragments survive. The recoverable content: the original 
question from 1900 concerns Flink 1.7.2 on Hadoop 2.8.5, running a job on YARN 
in HA mode. The job reads from a Kafka topic with 8 partitions, collects 
records in a time window into a list, and sinks them to a DB. The exchange 
with Shi Quan touches on operator chaining, the Kafka source, and how keyBy 
with a KeySelector hashes keys onto operator subtasks, leaving the load 
unevenly spread across slots. The remaining details are not recoverable.

Re: PatternFlatSelectAdapter - Serialization issue after 1.8 upgrade

2019-04-23 Thread Dawid Wysakowicz
Hi Oytun,

I think there is a regression introduced in 1.8 in how we handle output
tags: the problem is that we do not call the ClosureCleaner on the OutputTag.

There are two ways you can work around this issue:

1. Declare the OutputTag static

2. Clean the closure explicitly as Guowei suggested:
StreamExecutionEnvironment.clean(pendingProjectsTag)

I also opened a jira issue to fix this (FLINK-12297[1])

Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-12297
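
A minimal sketch of option 1, borrowing the class and tag names from the stack trace below (the element type is an assumption): a static field is not captured into the selector's closure, so the non-serializable enclosing instance never needs to be serialized, while the anonymous subclass still preserves the element type:

import org.apache.flink.util.OutputTag;

public class PendingProjects {
    // Static: not pulled into the PatternFlatSelectAdapter's closure.
    private static final OutputTag<String> pendingProjectsTag =
            new OutputTag<String>("pending-projects") {};

    // patternStream.flatSelect(pendingProjectsTag, timeoutFn, selectFn) then
    // exposes timed-out matches via getSideOutput(pendingProjectsTag).
}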

On 22/04/2019 03:06, Guowei Ma wrote:
> I think you could try
> StreamExecutionEnvironment.clean(pendingProjectsTag). 
>
>
> Oytun Tez (oy...@motaword.com) wrote on Friday, April 19, 2019 at 9:58 PM:
>
> Forgot to answer one of your points: the parent class compiles
> well without this CEP selector (with timeout signature)...
>
>
> ---
> Oytun Tez
>
> *M O T A W O R D*
> The World's Fastest Human Translation Platform.
> oy...@motaword.com  — www.motaword.com
> 
>
>
> On Fri, Apr 19, 2019 at 9:40 AM Oytun Tez wrote:
>
> Hey JingsongLee!
>
> Here are some findings...
>
>   * flatSelect *without timeout* works normally:
> patternStream.flatSelect(PatternFlatSelectFunction), this
> compiles well.
>   * Converted the both timeout and select selectors to an
> *inner class* (not static), yielded the same results,
> doesn't compile.
>   * flatSelect *without* timeout, but with an inner class for
> PatternFlatSelectFunction, it compiles (same as first bullet).
>   * Tried both of these selectors with _empty_ body. Just a
> skeleton class. Doesn't compile either. Empty body example
> is in my first email.
>   * Tried making both selectors *static public inner* classes,
> doesn't compile either.
>   * Extracted both timeout and flat selectors to their own
> *independent classes* in separate files. Doesn't compile.
>   * I am putting the *error stack* below.
>   * Without the timeout selector in any class or lambda shape,
> with empty or full body, flatSelect compiles well.
>
> Would these findings help? Any ideas?
>
> Here is an error stack:
>
> 09:36:51,925 ERROR
> com.motaword.ipm.kernel.error.controller.ExceptionHandler     - 
> org.apache.flink.api.common.InvalidProgramException: The
> implementation of the PatternFlatSelectAdapter is not
> serializable. The object probably contains or references non
> serializable fields.
> at
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
> at
> 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1558)
> at
> 
> org.apache.flink.cep.PatternStreamBuilder.clean(PatternStreamBuilder.java:86)
> at
> org.apache.flink.cep.PatternStream.process(PatternStream.java:114)
> at
> org.apache.flink.cep.PatternStream.flatSelect(PatternStream.java:451)
> at
> org.apache.flink.cep.PatternStream.flatSelect(PatternStream.java:408)
> at
> 
> com.motaword.ipm.business.invitation.controller.PendingProjects.getPending(PendingProjects.java:89)
> at
> 
> com.motaword.ipm.business.invitation.controller.PendingProjects.run(PendingProjects.java:45)
> at
> 
> com.motaword.ipm.business.invitation.boundary.InvitationJob.run(InvitationJob.java:31)
> at com.motaword.ipm.kernel.Application.main(Application.java:63)
> Caused by: java.io.NotSerializableException:
> org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> at
> 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at
> 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at
> 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at
> 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at
> 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at
> 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at
> 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at
> 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at
> 
> 

Re: Missing state in RocksDB checkpoints

2019-04-23 Thread Congxian Qiu
Hi, Ning

From the log messages you gave, the two operators share the same directory, 
and when a snapshot is taken, the directory is deleted first if it already 
exists (RocksIncrementalSnapshotStrategy#prepareLocalSnapshotDirectory).

I did not find an existing issue for this problem, and I don’t think it is a 
problem with UUID generation; please check the path generation logic in 
LocalRecoveryDirectoryProviderImpl#subtaskSpecificCheckpointDirectory.

I’ve created an issue for this problem.

Best, Congxian
On Apr 23, 2019, 11:19 +0800, Ning Shi , wrote:
> We have a Flink job using the RocksDB state backend. We found that the state
> of one of the RichMapFunctions was not being saved in checkpoints or
> savepoints. After some digging, it seems that two operators in the same
> operator chain are colliding with each other during a checkpoint or
> savepoint, resulting in one operator's state being missing.
>
> I extracted the checkpoint directories for all operators from the RocksDB
> LOG files for one of the checkpoints. As you can see, the StreamMap operator
> shares the same checkpoint directory as the CoBroadcastWithKeyedOperator.
> They are in the same operator chain.
>
> /var/flink/data/localState/aid_AllocationID{37a99d74a8e452ff06257c61ab13a3c8}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_54b7f0cbe350c274d942032aa504dbdd_sti_1/chk_21244/rocks_db
>  CoStreamFlatMap_54b7f0cbe350c274d942032aa504dbdd__2_90__
> /var/flink/data/localState/aid_AllocationID{37a99d74a8e452ff06257c61ab13a3c8}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_567adb020dcc57a12c17bd43c00b0f55_sti_53/chk_21244/rocks_db
>  CoBroadcastWithKeyedOperator_567adb020dcc57a12c17bd43c00b0f55__54_90__
> /var/flink/data/localState/aid_AllocationID{37a99d74a8e452ff06257c61ab13a3c8}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_567adb020dcc57a12c17bd43c00b0f55_sti_53/chk_21244/rocks_db
>  StreamMap_3c5866a6cc097b462de842b2ef91910d__54_90__
> /var/flink/data/localState/aid_AllocationID{37a99d74a8e452ff06257c61ab13a3c8}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_bc2936094388a70852534bd6c0fce178_sti_77/chk_21244/rocks_db
>  WindowOperator_bc2936094388a70852534bd6c0fce178__78_90__
> /var/flink/data/localState/aid_AllocationID{5cde66b8a81c5202f7685928bb18ac00}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_54b7f0cbe350c274d942032aa504dbdd_sti_84/chk_21244/rocks_db
>  CoStreamFlatMap_54b7f0cbe350c274d942032aa504dbdd__85_90__
> /var/flink/data/localState/aid_AllocationID{5cde66b8a81c5202f7685928bb18ac00}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_567adb020dcc57a12c17bd43c00b0f55_sti_66/chk_21244/rocks_db
>  CoBroadcastWithKeyedOperator_567adb020dcc57a12c17bd43c00b0f55__67_90__
> /var/flink/data/localState/aid_AllocationID{5cde66b8a81c5202f7685928bb18ac00}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_567adb020dcc57a12c17bd43c00b0f55_sti_66/chk_21244/rocks_db
>  StreamMap_3c5866a6cc097b462de842b2ef91910d__67_90__
> /var/flink/data/localState/aid_AllocationID{5cde66b8a81c5202f7685928bb18ac00}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_bc2936094388a70852534bd6c0fce178_sti_15/chk_21244/rocks_db
>  WindowOperator_bc2936094388a70852534bd6c0fce178__16_90__
> /var/flink/data/localState/aid_AllocationID{61cf4a285199ab779ec85784980d15e2}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_54b7f0cbe350c274d942032aa504dbdd_sti_53/chk_21244/rocks_db
>  CoStreamFlatMap_54b7f0cbe350c274d942032aa504dbdd__54_90__
> /var/flink/data/localState/aid_AllocationID{61cf4a285199ab779ec85784980d15e2}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_567adb020dcc57a12c17bd43c00b0f55_sti_1/chk_21244/rocks_db
>  CoBroadcastWithKeyedOperator_567adb020dcc57a12c17bd43c00b0f55__2_90__
> /var/flink/data/localState/aid_AllocationID{61cf4a285199ab779ec85784980d15e2}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_567adb020dcc57a12c17bd43c00b0f55_sti_1/chk_21244/rocks_db
>  StreamMap_3c5866a6cc097b462de842b2ef91910d__2_90__
> /var/flink/data/localState/aid_AllocationID{61cf4a285199ab779ec85784980d15e2}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_bc2936094388a70852534bd6c0fce178_sti_56/chk_21244/rocks_db
>  WindowOperator_bc2936094388a70852534bd6c0fce178__57_90__
> /var/flink/data/localState/aid_AllocationID{a93df5bf51a4f9b3673cd18b46abbecb}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_54b7f0cbe350c274d942032aa504dbdd_sti_30/chk_21244/rocks_db
>  CoStreamFlatMap_54b7f0cbe350c274d942032aa504dbdd__31_90__
> /var/flink/data/localState/aid_AllocationID{a93df5bf51a4f9b3673cd18b46abbecb}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_567adb020dcc57a12c17bd43c00b0f55_sti_46/chk_21244/rocks_db
>  CoBroadcastWithKeyedOperator_567adb020dcc57a12c17bd43c00b0f55__47_90__
> /var/flink/data/localState/aid_AllocationID{a93df5bf51a4f9b3673cd18b46abbecb}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_567adb020dcc57a12c17bd43c00b0f55_sti_46/chk_21244/rocks_db
>  StreamMap_3c5866a6cc097b462de842b2ef91910d__47_90__
> /var/flink/data/localState/aid_AllocationID{a93df5bf51a4f9b3673cd18b46abbecb}/jid_6241b30b0adb82bd50cd5d37aa6128d1/vtx_bc2936094388a70852534bd6c0fce178_sti_12/chk_21244/rocks_db
> 
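
An aside for readers unfamiliar with these paths (not part of the thread itself): the aid_*/jid_*/vtx_* directories under localState are produced by Flink's task-local recovery. A sketch of the flink-conf.yaml settings involved, with the root directory taken from the paths above:

state.backend.local-recovery: true
# Root under which the subtask-specific chk_* directories above are created.
taskmanager.state.local.root-dirs: /var/flink/data/localState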

Re: status on FLINK-7129

2019-04-23 Thread Dawid Wysakowicz
Hi Kant,

I'm afraid Konstantin is right. Unfortunately AFAIK there is no active
development on that issue.

Best,

Dawid

On 22/04/2019 18:20, Konstantin Knauf wrote:
> Hi Kant,
>
> as far as I know, no one is currently working on this. Dawid (cc)
> maybe knows more.
>
> Cheers,
>
> Konstantin
>
> On Sat, Apr 20, 2019 at 12:12 PM kant kodali wrote:
>
> Hi All,
>
> There seems to be a lot of interest
> for https://issues.apache.org/jira/browse/FLINK-7129
>
> Any rough idea on the status of this issue?
>
> Thanks!
>
>
>
> -- 
>
> Konstantin Knauf | Solutions Architect
>
> +49 160 91394525
>
> Planned Absences: 17.04.2019 - 26.04.2019
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward - The Apache Flink Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
>
> Data Artisans GmbH Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen

