Re: Expressing Flink array aggregation using Table / SQL API

2019-03-12 Thread Kurt Young
Hi Piyush,

I think your second SQL is correct. The problem you have encountered is
that the outer aggregation (GROUP BY userId
& COLLECT(client_custom_aggregated)) will
emit a result immediately whenever it receives a result from the inner
aggregation. Hence Flink needs the sink to
1. either have the ability to retract previously emitted results, i.e. the
sink should be a `RetractStreamTableSink`, or
2. have something like a primary key and be able to update results by key.
In your case, userId would be the key.

I think you are trying to emit the result to an `AppendStreamTableSink`,
which is why you see that error.
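Kurt's earlier suggestion (quoted further down in this thread) amounts to moving the per-clientId map inside a single aggregate keyed only by userId, so each window emits exactly one insert-only row per user. A framework-agnostic Python model of that accumulator logic (illustrative only; this is not Flink's AggregateFunction API, and the per-client aggregate here is a simple event count):

```python
# Toy model of an accumulator that aggregates per clientId inside a
# single aggregate keyed by userId (illustration, not Flink API).
class PerClientAccumulator:
    def __init__(self):
        self.per_client = {}  # clientId -> list of (eventType, ts, dataField)

    def accumulate(self, client_id, event_type, ts, data):
        self.per_client.setdefault(client_id, []).append((event_type, ts, data))

    def get_value(self):
        # Emitted once per userId per window: a map clientId -> aggregate
        # (here just a count, standing in for the custom aggregation).
        return {cid: len(rows) for cid, rows in self.per_client.items()}

# Rows for one userId within one window: (userId, clientId, eventType, ts, data)
rows = [("u1", "c1", "click", 1, "a"),
        ("u1", "c2", "view", 2, "b"),
        ("u1", "c1", "view", 3, "c")]
acc = PerClientAccumulator()
for _user, client, etype, ts, data in rows:
    acc.accumulate(client, etype, ts, data)
print(acc.get_value())  # {'c1': 2, 'c2': 1}
```

Because the grouping key is userId alone, each window produces a single row per user and the result stays append-only, instead of an outer COLLECT re-aggregating inner window results and forcing retractions.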

Best,
Kurt


On Tue, Mar 12, 2019 at 9:46 PM Piyush Narang  wrote:

> Thanks for getting back Kurt. Yeah this might be an option to try out. I
> was hoping there would be a way to express this directly in the SQL though
> ☹.
>
>
>
> -- Piyush
>
>
>
>
>
> *From: *Kurt Young 
> *Date: *Tuesday, March 12, 2019 at 2:25 AM
> *To: *Piyush Narang 
> *Cc: *"user@flink.apache.org" 
> *Subject: *Re: Expressing Flink array aggregation using Table / SQL API
>
>
>
> Hi Piyush,
>
>
>
> Could you try to add clientId into your aggregate function, track a map
> (keyed by clientId) inside the new aggregate function, and assemble
> whatever result you need on emit?
>
> The SQL would look like:
>
>
>
> SELECT userId, some_aggregation(clientId, eventType, `timestamp`,
> dataField)
>
> FROM my_kafka_stream_table
>
> GROUP BY userId,  HOP(`timestamp`, INTERVAL '1' MINUTE, INTERVAL '1' HOUR)
>
>
>
> Kurt
>
>
>
>
>
> On Tue, Mar 12, 2019 at 11:03 AM Piyush Narang 
> wrote:
>
> Hi folks,
>
>
>
> I’m getting started with Flink and trying to figure out how to express
> aggregating some rows into an array to finally sink data into an
> AppendStreamTableSink.
>
> My data looks something like this:
>
> userId, clientId, eventType, timestamp, dataField
>
>
>
> I need to compute some custom aggregations using a UDAF while grouping by
> userId, clientId over a sliding window (10 mins, triggered every 1 min). My
> first attempt is:
>
> SELECT userId, clientId, my_aggregation(eventType, `timestamp`, dataField)
> as custom_aggregated
>
> FROM my_kafka_stream_table
>
> GROUP BY userId, clientId, HOP(`timestamp`, INTERVAL '1' MINUTE, INTERVAL
> '1' HOUR)
>
>
>
> This query works as I expect it to. In every time window I end up with
> inserts for unique userId + clientId combinations. What I want to do,
> though, is generate a single row per userId in each time window, and this
> is what I’m struggling to express given the restriction that I want to
> sink this to an AppendStreamTableSink. I was hoping to do something like
> this:
>
>
>
> SELECT userId, COLLECT(client_custom_aggregated)
>
> FROM
>
> (
>
>   SELECT userId, MAP[clientId, my_aggregation(eventType, `timestamp`,
> dataField) as custom_aggregated] as client_custom_aggregated
>
>   FROM my_kafka_stream_table
>
>   GROUP BY userId, clientId, HOP(`timestamp`, INTERVAL '1' MINUTE,
> INTERVAL '1' HOUR)
>
> ) GROUP BY userId
>
>
>
> Unfortunately when I try this (and a few other variants), I run into the
> error, “AppendStreamTableSink requires that Table has only insert changes”.
> Does anyone know if there’s a way for me to compute my collect aggregation
> to produce one row per userId for a given time window?
>
>
>
> Thanks,
>
>
>
> -- Piyush
>
>
>
>


Re: Task slot sharing: force reallocation

2019-03-12 Thread Le Xu
Thanks Till.

I switched to Flink 1.7.1 and it seems to solve part of my problem (the
downstream operator no longer sits on the same machine). But a new
question: does Flink implicitly place all subtasks of data sources
generated by a RichParallelSourceFunction in the same slot-sharing group?
The older version I used seemed to distribute all subtasks evenly over the
cluster so that each machine took the load evenly, but the new version
seems to prefer allocating all tasks to the same machine (e.g. when I run
one job whose source has a parallelism of 4, it takes up 4 slots on
machine 1 and leaves machine 2 empty instead of distributing these tasks
evenly).

I am aware of the slot sharing splitting strategy described in this link.
But I believe this only applies to separate streams from different
sources. Does Flink also support slot splitting among subtasks within a
single stream?

Thanks,

Le
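The "everything lands on machine 1" pattern matches a note quoted later in this thread from FLINK-11815: when slots are registered before requests arrive and are held in an insertion-ordered map (a LinkedHashMap), each request tends to take the oldest registered slot, grouping tasks onto one machine. A toy Python model of that tendency (pure illustration, not Flink code):

```python
# Toy model: free slots kept in insertion order, and each request takes
# the first free slot -> all subtasks land on the first machine.
from collections import OrderedDict

free_slots = OrderedDict()          # slot id -> free flag, insertion-ordered
for machine in ("machine1", "machine2"):
    for i in range(2):
        free_slots[f"{machine}-slot{i}"] = True

def allocate():
    slot_id = next(iter(free_slots))  # oldest registered slot wins
    del free_slots[slot_id]
    return slot_id

# A source with parallelism 2: both subtasks end up on machine1.
placements = [allocate() for _ in range(2)]
print(placements)  # ['machine1-slot0', 'machine1-slot1']
```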

On Thu, Mar 7, 2019 at 3:37 AM Till Rohrmann  wrote:

> The community no longer actively supports Flink < 1.6. Therefore I would
> try out whether you can upgrade to one of the latest versions. However, be
> aware that we reworked Flink's distributed architecture which slightly
> affected the scheduling behavior. In your case, it should actually be
> beneficial because it will do what you are looking for.
>
> Cheers,
> Till
>
> On Wed, Mar 6, 2019 at 8:13 PM Le Xu  wrote:
>
>> 1.3.2 -- should I update to the latest version?
>>
>> Thanks,
>>
>> Le
>>
>> On Wed, Mar 6, 2019 at 4:24 AM Till Rohrmann 
>> wrote:
>>
>>> Which version of Flink are you using?
>>>
>>> On Tue, Mar 5, 2019 at 10:58 PM Le Xu  wrote:
>>>
 Hi Till:

 Thanks for the reply. The setup of the jobs is roughly as follows: For
 a cluster with N machines, we deploy X simple map/reduce style jobs (the
 job DAG and settings are exactly the same, except they consumes different
 data). Each job has N mappers (they are evenly distributed, one mapper on
 each machine).There are X mappers on each machine (as there are X jobs in
 total). Each job has only one reducer where all mappers point to. What I'm
 observing is that all reducers are allocated to machine 1 (where all mapper
 1 from every job is allocated to).  It does make sense since reducer and
 mapper 1 are in the same slot group. The original purpose of the questions
 is to find out whether it is possible to explicitly specify that reducer
 can be co-located with another mapper (such as mapper 2 so the reducer of
 job 2 can be placed on machine 2). Just trying to figure out if it is all
 possible without using more expensive approach (through YARN for example).
 But if it is not possible I will see if I can move to job mode as Piotr
 suggests.

 Thanks,

 Le

 On Tue, Mar 5, 2019 at 9:24 AM Till Rohrmann 
 wrote:

> Hard to tell whether this is related to FLINK-11815.
>
> To me the setup is not fully clear. Let me try to sum it up: According
> to Le Xu's description there are n jobs running on a session cluster. I
> assume that every TaskManager has n slots. The observed behaviour is that
> every job allocates the slot for the first mapper and chained sink from 
> the
> first TM, right? Since Flink does not give strict guarantees for the slot
> allocation this is possible, however it should be highly unlikely or at
> least change when re-executing the same setup. At the moment there is no
> functionality in place to control the task-slot assignment.
>
> Chaining only affects which task will be grouped together and executed
> by the same Task (being executed by the same thread). Separate tasks can
> still be executed in the same slot if they have the same slot sharing
> group. This means that there can be multiple threads running in each slot.
>
> For me it would be helpful to get more information about the actual
> job deployments.
>
> Cheers,
> Till
>
> On Tue, Mar 5, 2019 at 12:00 PM Piotr Nowojski 
> wrote:
>
>> Hi Le,
>>
>> As I wrote, you can try running Flink in job mode, which spawns
>> separate clusters per each job.
>>
>> Till, is this issue covered by FLINK-11815
>>  ? Is this the
>> same as:
>>
>> > Known issues:
>> > 1. (…)
>> > 2. if task slots are registered before slot request, the code have
>> a tendency to group requests together on the same machine because we
>> are using a LinkedHashMap
>>
>> ?
>>
>> Piotrek
>>
>> On 4 Mar 2019, at 21:08, Le Xu  wrote:
>>
>> Thanks Piotr.
>>
>> I didn't realize that the email attachment isn't working so the
>> example I was referring to was this figure from Flink 

Re: flink 1.7.2 cluster crashed unexpectedly

2019-03-12 Thread Yun Tang
Hi

Did you perhaps not configure a checkpoint path, and not explicitly
configure FsStateBackend or RocksDBStateBackend? This should be a bug of
MemoryStateBackend when HA is configured but no checkpoint path is set;
see the JIRA I created earlier:
https://issues.apache.org/jira/browse/FLINK-11107

A related PR has already been submitted, but the community considers
MemoryStateBackend to be mostly for debugging or an experimental toy that
would not be used directly in production, and since everyone has been busy
with the release-1.8 release lately, the code has not been reviewed yet.
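If that analysis applies, a possible mitigation (an assumption to verify for your own setup, not a confirmed fix) is to configure an explicit state backend and checkpoint path in flink-conf.yaml, so the MemoryStateBackend-with-HA code path from FLINK-11107 is avoided; the HDFS path below is only an example:

```yaml
# Explicitly configure a state backend plus checkpoint path instead of
# relying on the default MemoryStateBackend (values are examples to adapt).
state.backend: filesystem
state.checkpoints.dir: hdfs://banma/flink/checkpoints
```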

Best regards,
Yun Tang

From: ppp Yun 
Sent: Wednesday, March 13, 2019 10:24
To: user-zh
Subject: flink 1.7.2 cluster crashed unexpectedly

Hi all,

 I wrote a test program. After running for less than three hours, the Flink cluster crashed and all nodes exited with the following error:

2019-03-12 20:45:14,623 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Tbox from 
Kafka Sink To Kafka And Print (21949294d4750b869b341c5d2942d499) switched from 
state RUNNING to FAILING.
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.FSLimitException$MaxDirectoryItemsExceededException):
 The directory item limit of /tmp/ha is exceeded: limit=1048576 items=1048576


Result of `hdfs count`:

20971514  124334563 hdfs://banma/tmp/ha


Below is the flink-conf.yaml configuration:

[hdfs@qa-hdpdn06 flink-1.7.2]$ cat conf/flink-conf.yaml |grep ^[^#]
jobmanager.rpc.address: 10.4.11.252
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 10
parallelism.default: 1
 high-availability: zookeeper
 high-availability.storageDir: hdfs://banma/tmp/ha
 high-availability.zookeeper.quorum: qa-hdpdn05.ebanma.com:2181
rest.port: 8081

Flink version: the latest official Flink 1.7.2

Why does the high-availability.storageDir directory accumulate so many subdirectories? What is stored in them? Under what circumstances are these writes triggered? And how can this problem be avoided?

Thanks!


Re: Conflicting Cassandra versions while using flink-connector-cassandra

2019-03-12 Thread Gustavo Momenté
Can I shade the driver inside `flink-connector-cassandra` myself? And if
so, do you know why it isn't shaded by default?

On Tue, Mar 12, 2019 at 23:00, Congxian Qiu
wrote:

> Hi Gustavo Momenté
> If you want both driver versions to coexist, maybe you could try the
> maven shade plugin [1]
>
> [1] https://maven.apache.org/plugins/maven-shade-plugin/
>
> Best, Congxian
> On Mar 13, 2019, 02:22 +0800, Gustavo Momenté ,
> wrote:
>
> I'm having trouble using CassandraSink while also
> using datastax's cassandra-driver. While digging through
> the flink-connector-cassandra.jar I realized that it bundles
> *cassandra-driver-core 3.0.0* while internally we use version *3.1.4* to
> read data from Cassandra.
>
> I couldn't find the reason why the connector comes bundled with the driver
> classes and I don't know what would be the best course of action to make
> both driver versions coexist.
>
> Best regards,
> Gustavo Momenté
>
>


Re: Conflicting Cassandra versions while using flink-connector-cassandra

2019-03-12 Thread Congxian Qiu
Hi Gustavo Momenté
If you want both driver versions to coexist, maybe you could try the maven
shade plugin [1]

[1] https://maven.apache.org/plugins/maven-shade-plugin/
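For illustration, such a relocation in the job's own pom.xml might look roughly like this; the `shadedPattern` prefix is an assumption to adapt, and relocating the Datastax driver should be tested carefully, since this is a sketch and not a verified recipe:

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <relocations>
          <!-- Move the job's own 3.1.4 driver to a private namespace so it
               cannot clash with the 3.0.0 classes bundled in the connector. -->
          <relocation>
            <pattern>com.datastax.driver</pattern>
            <shadedPattern>myorg.shaded.com.datastax.driver</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```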

Best, Congxian
On Mar 13, 2019, 02:22 +0800, Gustavo Momenté , 
wrote:
> I'm having trouble using CassandraSink while also using datastax's
> cassandra-driver. While digging through the flink-connector-cassandra.jar
> I realized that it bundles cassandra-driver-core 3.0.0 while internally
> we use version 3.1.4 to read data from Cassandra.
>
> I couldn't find the reason why the connector comes bundled with the driver 
> classes and I don't know what would be the best course of action to make both 
> driver versions coexist.
>
> Best regards,
> Gustavo Momenté


Conflicting Cassandra versions while using flink-connector-cassandra

2019-03-12 Thread Gustavo Momenté
I'm having trouble using CassandraSink while also
using datastax's cassandra-driver. While digging through
the flink-connector-cassandra.jar I realized that it bundles
*cassandra-driver-core 3.0.0* while internally we use version *3.1.4* to
read data from Cassandra.

I couldn't find the reason why the connector comes bundled with the driver
classes and I don't know what would be the best course of action to make
both driver versions coexist.

Best regards,
Gustavo Momenté


Re: How to join stream and dimension data in Flink?

2019-03-12 Thread Hequn Cheng
Hi Henry,

Yes, you are correct. Basically, there are two ways to join a Temporal
Table. One is provided in Flink and the other is provided in Blink, which
has been pushed as a branch[1] in the Flink repo.

- Join a Temporal Table in Flink[2][3][4]
As the document says, it is a join of an append-only table (left
input/probe side) with a temporal table (right input/build side), i.e., a
table that changes over time and tracks its changes. You need to define a
temporal table function, which provides access to the state of a temporal
table at a specific point in time. *Both rowtime and proctime are
supported.*
- Join a Temporal Table in Blink[5]
Different from the join in Flink, it can join an *append/upsert/retract*
stream (left input/probe side) with a temporal table (right input/build
side), i.e., a *remote dimension table* that changes over time. In order to
access data in a temporal table, you need to define a TableSource with
LookupableTableSource[6] (you could download the Blink code and take a look
at `HBase143TableSource`, which is an implementation of
LookupableTableSource). Currently, only proctime is supported.
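For illustration, a temporal table function join in Flink SQL looks roughly like the following; `Orders` and `Rates` follow the documentation's usual example, so treat the exact names and syntax as assumptions and check [2][3]:

```sql
-- Join each order with the version of Rates that was valid at the
-- order's rowtime; Rates is a registered temporal table function
-- defined over a table of changing currency rates.
SELECT
  o.amount * r.rate AS amount_in_target_currency
FROM Orders AS o,
  LATERAL TABLE (Rates(o.rowtime)) AS r
WHERE o.currency = r.currency
```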

I think you can choose one according to your scenarios.
There are some useful examples in the document I list below. They may be
very helpful for you. Feel free to ask if you have any other questions.

Best,
Hequn

[1] https://github.com/apache/flink/tree/blink
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table

[3]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html
[4]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html#joins
[5]
https://flink-china.org/doc/blink/dev/table/streaming/joins.html#join-with-a-temporal-table
[6]
https://flink-china.org/doc/blink/dev/table/sourcesinks#defining-a-tablesource-with-lookupable

On Tue, Mar 12, 2019 at 2:13 PM 徐涛  wrote:

> Hi Hequn,
> I want to implement stream join dimension in Flink SQL, I found there is a
> new feature named Temporal Tables delivered by Flink1.7, I think it maybe
> could be used to achieve the join between stream and dimension table. But I
> am not sure about that. Could anyone help me about it?
> Thanks a lot for your help.
>
> Best
> Henry
>
> 在 2018年9月26日,上午12:16,Hequn Cheng  写道:
>
> Hi vino,
>
> Thanks for sharing the link. It's a great book and I will take a look.
> There are kinds of join. Different joins have different semantics. From
> the link, I think it means the time versioned join.  FLINK-9712
>  enrichments joins with
> Time Versioned Functions and the result is deterministic under eventime.
>
> Best, Hequn
>
> On Tue, Sep 25, 2018 at 11:05 PM vino yang  wrote:
>
>> Hi Hequn,
>>
>> The specific content of the book does not give a right or wrong
>> conclusion, but it illustrates this phenomenon: with two streams of the
>> same input replayed and joined at the same time, the join results are
>> uncertain because of the order of events, since the two streams can
>> interleave in different ways. This has nothing to do with orderBy; it is
>> simply inherent in stream-stream joins. Of course, this phenomenon only
>> shows up in comparison with a non-stream join.
>>
>> In addition, I recommend this book, which is very famous on Twitter and
>> Amazon. Because you are also Chinese, there is a good translation here. If
>> I guess it is correct, the main translator is also from your company. This
>> part of what I mentioned is here.[1]
>>
>> [1]:
>> https://github.com/Vonng/ddia/blob/master/ch11.md#%E8%BF%9E%E6%8E%A5%E7%9A%84%E6%97%B6%E9%97%B4%E4%BE%9D%E8%B5%96%E6%80%A7
>>
>> Thanks, vino.
>>
>> Hequn Cheng  于2018年9月25日周二 下午9:45写道:
>>
>>> Hi vino,
>>>
>>> There are no order problems of stream-stream join in Flink. No matter
>>> what order the elements come, stream-stream join in Flink will output
>>> results which consistent with standard SQL semantics. I haven't read the
>>> book you mentioned. For join, it doesn't guarantee output orders. You have
>>> to do orderBy if you want to get ordered results.
>>>
>>> Best, Hequn
>>>
>>> On Tue, Sep 25, 2018 at 8:36 PM vino yang  wrote:
>>>
 Hi Fabian,

 I may not have stated it clearly: there is no semantic problem at the
 Flink implementation level. Rather, there may be “Time-dependence” here.
 [1]

 Yes, my initial answer was not to use this form of join in this
 scenario, but Henry said he converted the table into a stream table and
 asked about the feasibility of other methods.

 [1]: 《Designing Data-Intensive Applications》By Martin Kleppmann, Part
 3: Derived Data, Chapter 11: Stream Processing , Stream Joins.

 some content :

 *If the ordering of events across streams is undetermined, the join
 becomes nondeter‐ ministic [87], which means you cannot 

Re: Set partition number of Flink DataSet

2019-03-12 Thread Ken Krugler
Hi Qi,

If I understand what you’re trying to do, then this sounds like a variation of 
a bucketing sink.

That typically uses a field value to create a directory path or a file name
(though the filename case is only viable when the field is also what’s used
to partition the data).

But I don’t believe Flink has built-in support for that in batch mode (see
BucketingSink for streaming).

Maybe Blink has added that? Hoping someone who knows that codebase can chime in 
here.

Otherwise you’ll need to create a custom sink to implement the desired
behavior - though abusing a MapPartitionFunction would be easiest, I think.

— Ken
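The decoupling Qi asks for in the quoted message below (N output buckets with a parallelism M much smaller than N) can be modeled outside Flink: each of M writers owns several of the N buckets and routes records by key hash. A toy Python sketch of that routing (not Flink code; a custom sink or MapPartitionFunction would do the actual file writing):

```python
# Route records into N buckets using only M writers, decoupling the
# number of output files from the operator parallelism.
N_BUCKETS, M_WRITERS = 8, 2

def bucket_of(key):
    return hash(key) % N_BUCKETS      # which of the N output files

def writer_of(bucket):
    return bucket % M_WRITERS         # which parallel writer owns it

records = ["user%d" % i for i in range(16)]
per_writer = {w: {} for w in range(M_WRITERS)}
for rec in records:
    b = bucket_of(rec)
    per_writer[writer_of(b)].setdefault(b, []).append(rec)

# Each writer would open only the files for its own buckets.
for w, buckets in per_writer.items():
    print(w, sorted(buckets))
```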



> On Mar 12, 2019, at 2:28 AM, qi luo  wrote:
> 
> Hi Ken,
> 
> Thanks for your reply. I may not have made myself clear: our problem is
> not about reading but rather writing.
> 
> We need to write to N files based on key partitioning. We have to use 
> setParallelism() to set the output partition/file number, but when the 
> partition number is too large (~100K), the parallelism would be too high. Is 
> there any other way to achieve this?
> 
> Thanks,
> Qi
> 
>> On Mar 11, 2019, at 11:22 PM, Ken Krugler > > wrote:
>> 
>> Hi Qi,
>> 
>> I’m guessing you’re calling createInput() for each input file.
>> 
>> If so, then instead you want to do something like:
>> 
>>  Job job = Job.getInstance();
>> 
>>  for each file…
>>  FileInputFormat.addInputPath(job, new 
>> org.apache.hadoop.fs.Path(file path));
>> 
>>  env.createInput(HadoopInputs.createHadoopInput(…, job)
>> 
>> Flink/Hadoop will take care of parallelizing the reads from the files, given 
>> the parallelism that you’re specifying.
>> 
>> — Ken
>> 
>> 
>>> On Mar 11, 2019, at 5:42 AM, qi luo >> > wrote:
>>> 
>>> Hi,
>>> 
>>> We’re trying to distribute batch input data to (N) HDFS files partitioning 
>>> by hash using DataSet API. What I’m doing is like:
>>> 
>>> env.createInput(…)
>>>   .partitionByHash(0)
>>>   .setParallelism(N)
>>>   .output(…)
>>> 
>>> This works well for small number of files. But when we need to distribute 
>>> to large number of files (say 100K), the parallelism becomes too large and 
>>> we could not afford that many TMs.
>>> 
>>> In spark we can write something like ‘rdd.partitionBy(N)’ and control the 
>>> parallelism separately (using dynamic allocation). Is there anything 
>>> similar in Flink or other way we can achieve similar result? Thank you!
>>> 
>>> Qi
>> 
>> --
>> Ken Krugler
>> +1 530-210-6378
>> http://www.scaleunlimited.com 
>> Custom big data solutions & training
>> Flink, Solr, Hadoop, Cascading & Cassandra
>> 
> 

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: K8s job cluster and cancel and resume from a save point ?

2019-03-12 Thread Vishal Santoshi
Awesome, thanks!

On Tue, Mar 12, 2019 at 11:53 AM Gary Yao  wrote:

> The RC artifacts are only deployed to the Maven Central Repository when
> the RC
> is promoted to a release. As written in the 1.8.0 RC1 voting email [1], you
> can find the maven artifacts, and the Flink binaries here:
>
> -
> https://repository.apache.org/content/repositories/orgapacheflink-1210/
> - https://dist.apache.org/repos/dist/dev/flink/flink-1.8.0-rc1/
>
> Alternatively, you can apply the patch yourself, and build Flink 1.7 from
> sources [2]. On my machine this takes around 10 minutes if tests are
> skipped.
>
> Best,
> Gary
>
> [1]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-1-8-0-release-candidate-1-td27637.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/flinkDev/building.html#build-flink
>
> On Tue, Mar 12, 2019 at 4:01 PM Vishal Santoshi 
> wrote:
>
>> Do you have a mvn repository (at mvn central) set up for the 1.8 release
>> candidate? We could test it for you.
>>
>> Without 1.8 and this exit-code fix we are essentially held up.
>>
>> On Tue, Mar 12, 2019 at 10:56 AM Gary Yao  wrote:
>>
>>> Nobody can tell with 100% certainty. We want to give the RC some exposure
>>> first, and there is also a release process that is prescribed by the ASF
>>> [1].
>>> You can look at past releases to get a feeling for how long the release
>>> process lasts [2].
>>>
>>> [1] http://www.apache.org/legal/release-policy.html#release-approval
>>> [2]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/template/NamlServlet.jtp?macro=search_page=1=%5BVOTE%5D+Release=0
>>>
>>>
>>> On Tue, Mar 12, 2019 at 3:38 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 And when is the 1.8.0 release expected ?

 On Tue, Mar 12, 2019 at 10:32 AM Vishal Santoshi <
 vishal.santo...@gmail.com> wrote:

> :) That makes so much more sense. Is  k8s native flink a part of this
> release ?
>
> On Tue, Mar 12, 2019 at 10:27 AM Gary Yao  wrote:
>
>> Hi Vishal,
>>
>> This issue was fixed recently [1], and the patch will be released
>> with 1.8. If
>> the Flink job gets cancelled, the JVM should exit with code 0. There
>> is a
>> release candidate [2], which you can test.
>>
>> Best,
>> Gary
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-10743
>> [2]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-1-8-0-release-candidate-1-td27637.html
>>
>> On Tue, Mar 12, 2019 at 3:21 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Thanks Vijay,
>>>
>>> This is the larger issue.  The cancellation routine is itself broken.
>>>
>>> On cancellation flink does remove the checkpoint counter
>>>
>>> *2019-03-12 14:12:13,143
>>> INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter  
>>> -
>>> Removing /checkpoint-counter/ from
>>> ZooKeeper *
>>>
>>> but exits with a non-zero code
>>>
>>> *2019-03-12 14:12:13,477
>>> INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>>> Terminating cluster entrypoint process StandaloneJobClusterEntryPoint 
>>> with
>>> exit code 1444.*
>>>
>>>
>>> That I think is an issue. A cancelled job is a complete job and thus
>>> the exit code should be 0 for k8s to mark it complete.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Mar 12, 2019 at 10:18 AM Vijay Bhaskar <
>>> bhaskar.eba...@gmail.com> wrote:
>>>
 Yes Vishal. That's correct.

 Regards
 Bhaskar

 On Tue, Mar 12, 2019 at 7:14 PM Vishal Santoshi <
 vishal.santo...@gmail.com> wrote:

> This is really not cool, but here you go. This seems to work. Agreed
> that this cannot be this painful. The cancel does not exit with an 
> exit
> code of 0 and thus the job has to manually delete. Vijay does this 
> align
> with what you have had to do ?
>
>
>- Take a save point . This returns a request id
>
>curl  --header "Content-Type: application/json" --request POST 
> --data 
> '{"target-directory":"hdfs://nn-crunchy:8020/tmp/xyz14","cancel-job":false}'
> 
> https://*/jobs//savepoints
>
>
>
>- Make sure the save point succeeded
>
>curl  --request GET   
> https:///jobs//savepoints/2c053ce3bea31276aa25e63784629687
>
>
>
>- cancel the job
>
>curl  --request PATCH 
> https://***/jobs/?mode=cancel
>
>

Re: K8s job cluster and cancel and resume from a save point ?

2019-03-12 Thread Gary Yao
The RC artifacts are only deployed to the Maven Central Repository when the
RC
is promoted to a release. As written in the 1.8.0 RC1 voting email [1], you
can find the maven artifacts, and the Flink binaries here:

-
https://repository.apache.org/content/repositories/orgapacheflink-1210/
- https://dist.apache.org/repos/dist/dev/flink/flink-1.8.0-rc1/

Alternatively, you can apply the patch yourself, and build Flink 1.7 from
sources [2]. On my machine this takes around 10 minutes if tests are
skipped.

Best,
Gary

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-1-8-0-release-candidate-1-td27637.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/flinkDev/building.html#build-flink

On Tue, Mar 12, 2019 at 4:01 PM Vishal Santoshi 
wrote:

> Do you have a mvn repository (at mvn central) set up for the 1.8 release
> candidate? We could test it for you.
>
> Without 1.8 and this exit-code fix we are essentially held up.
>
> On Tue, Mar 12, 2019 at 10:56 AM Gary Yao  wrote:
>
>> Nobody can tell with 100% certainty. We want to give the RC some exposure
>> first, and there is also a release process that is prescribed by the ASF
>> [1].
>> You can look at past releases to get a feeling for how long the release
>> process lasts [2].
>>
>> [1] http://www.apache.org/legal/release-policy.html#release-approval
>> [2]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/template/NamlServlet.jtp?macro=search_page=1=%5BVOTE%5D+Release=0
>>
>>
>> On Tue, Mar 12, 2019 at 3:38 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> And when is the 1.8.0 release expected ?
>>>
>>> On Tue, Mar 12, 2019 at 10:32 AM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 :) That makes so much more sense. Is  k8s native flink a part of this
 release ?

 On Tue, Mar 12, 2019 at 10:27 AM Gary Yao  wrote:

> Hi Vishal,
>
> This issue was fixed recently [1], and the patch will be released with
> 1.8. If
> the Flink job gets cancelled, the JVM should exit with code 0. There
> is a
> release candidate [2], which you can test.
>
> Best,
> Gary
>
> [1] https://issues.apache.org/jira/browse/FLINK-10743
> [2]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-1-8-0-release-candidate-1-td27637.html
>
> On Tue, Mar 12, 2019 at 3:21 PM Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> Thanks Vijay,
>>
>> This is the larger issue.  The cancellation routine is itself broken.
>>
>> On cancellation flink does remove the checkpoint counter
>>
>> *2019-03-12 14:12:13,143
>> INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter  -
>> Removing /checkpoint-counter/ from
>> ZooKeeper *
>>
>> but exits with a non-zero code
>>
>> *2019-03-12 14:12:13,477
>> INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>> Terminating cluster entrypoint process StandaloneJobClusterEntryPoint 
>> with
>> exit code 1444.*
>>
>>
>> That I think is an issue. A cancelled job is a complete job and thus
>> the exit code should be 0 for k8s to mark it complete.
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On Tue, Mar 12, 2019 at 10:18 AM Vijay Bhaskar <
>> bhaskar.eba...@gmail.com> wrote:
>>
>>> Yes Vishal. That's correct.
>>>
>>> Regards
>>> Bhaskar
>>>
>>> On Tue, Mar 12, 2019 at 7:14 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 This is really not cool, but here you go. This seems to work. Agreed
 that this cannot be this painful. The cancel does not exit with an exit
 code of 0 and thus the job has to manually delete. Vijay does this 
 align
 with what you have had to do ?


- Take a save point . This returns a request id

curl  --header "Content-Type: application/json" --request POST 
 --data 
 '{"target-directory":"hdfs://nn-crunchy:8020/tmp/xyz14","cancel-job":false}'
 
 https://*/jobs//savepoints



- Make sure the save point succeeded

curl  --request GET   
 https:///jobs//savepoints/2c053ce3bea31276aa25e63784629687



- cancel the job

curl  --request PATCH 
 https://***/jobs/?mode=cancel



- Delete the job and deployment

kubectl delete -f 
 manifests/bf2-PRODUCTION/job-cluster-job-deployment.yaml

kubectl delete -f 
 

Re: local disk cleanup after crash

2019-03-12 Thread Gary Yao
Hi,

If no other TaskManager (TM) is running, you can delete everything. If
multiple TMs share the same host, as far as I know, you will have to parse
TM
logs to know what directories you can delete [1]. As for local recovery,
tasks
that were running on a crashed TM are lost. From the documentation [2]:

If a task manager is lost, the local state from all its task is lost.

Therefore, assuming that only one TM is running on each host, you can delete
everything.

Best,
Gary

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/What-are-blobstore-files-and-why-do-they-keep-filling-up-tmp-directory-td26323.html
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#task-local-recovery
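Assuming exactly one TaskManager per host and local recovery disabled (per the above), a pre-start cleanup script could look roughly like this; the directory patterns are the ones listed in the original question, and the paths are examples, not a verified recipe:

```shell
#!/bin/sh
# Sketch: pre-start cleanup of a TaskManager's local temp dir.
# Assumes exactly one TaskManager per host and no local recovery.
cleanup_tm_dir() {
  dir="$1"
  rm -rf "$dir"/blobStore-* "$dir"/flink-dist-cache-* \
         "$dir"/flink-io-* "$dir"/rocksdb-lib-*
  rm -rf "$dir/localState"   # keep this one if local recovery is enabled
}

# Demo against a scratch directory laid out like a crashed TM's tmp dir.
TM_DIR="$(mktemp -d)"
mkdir -p "$TM_DIR/blobStore-1" "$TM_DIR/flink-io-2" \
         "$TM_DIR/localState/chk-3" "$TM_DIR/rocksdb-lib-4"
cleanup_tm_dir "$TM_DIR"
ls -A "$TM_DIR"   # prints nothing: all leftovers were removed
```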

On Thu, Mar 7, 2019 at 10:45 PM Derek VerLee  wrote:

> I think effort is put in to have task managers clean up their
> folders; however, I have noticed that in some cases local folders are not
> cleaned up and can build up, eventually causing problems due to a full
> disk. As far as I know this only happens with crashes and other
> out-of-happy-path scenarios.
>
> I am thinking of writing a script to clean up local folders that runs
> before task-manager starts between restarts in the case of a crash.
>
> Assuming local recovery is not configured, what should I delete and what
> should I leave around?
>
> What should I keep if local recovery is configured?
>
>
> Under the "taskmanager.tmp.dirs" I see:
>
> blobStore-*
> flink-dist-cache-*
> flink-io-*
> localState/*
> rocksdb-lib-*
>
>
> Thanks
>


Re: K8s job cluster and cancel and resume from a save point ?

2019-03-12 Thread Vishal Santoshi
Do you have a mvn repository (at mvn central) set up for the 1.8 release
candidate? We could test it for you.

Without 1.8 and this exit-code fix we are essentially held up.

On Tue, Mar 12, 2019 at 10:56 AM Gary Yao  wrote:

> Nobody can tell with 100% certainty. We want to give the RC some exposure
> first, and there is also a release process that is prescribed by the ASF
> [1].
> You can look at past releases to get a feeling for how long the release
> process lasts [2].
>
> [1] http://www.apache.org/legal/release-policy.html#release-approval
> [2]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/template/NamlServlet.jtp?macro=search_page=1=%5BVOTE%5D+Release=0
>
>
> On Tue, Mar 12, 2019 at 3:38 PM Vishal Santoshi 
> wrote:
>
>> And when is the 1.8.0 release expected ?
>>
>> On Tue, Mar 12, 2019 at 10:32 AM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> :) That makes so much more sense. Is  k8s native flink a part of this
>>> release ?
>>>
>>> On Tue, Mar 12, 2019 at 10:27 AM Gary Yao  wrote:
>>>
 Hi Vishal,

 This issue was fixed recently [1], and the patch will be released with
 1.8. If
 the Flink job gets cancelled, the JVM should exit with code 0. There is
 a
 release candidate [2], which you can test.

 Best,
 Gary

 [1] https://issues.apache.org/jira/browse/FLINK-10743
 [2]
 http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-1-8-0-release-candidate-1-td27637.html

 On Tue, Mar 12, 2019 at 3:21 PM Vishal Santoshi <
 vishal.santo...@gmail.com> wrote:

> Thanks Vijay,
>
> This is the larger issue.  The cancellation routine is itself broken.
>
> On cancellation flink does remove the checkpoint counter
>
> *2019-03-12 14:12:13,143
> INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter  -
> Removing /checkpoint-counter/ from
> ZooKeeper *
>
> but exits with a non-zero code
>
> *2019-03-12 14:12:13,477
> INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> Terminating cluster entrypoint process StandaloneJobClusterEntryPoint with
> exit code 1444.*
>
>
> That I think is an issue. A cancelled job is a complete job and thus
> the exit code should be 0 for k8s to mark it complete.
>
>
>
>
>
>
>
>
>
> On Tue, Mar 12, 2019 at 10:18 AM Vijay Bhaskar <
> bhaskar.eba...@gmail.com> wrote:
>
>> Yes Vishal. Thats correct.
>>
>> Regards
>> Bhaskar
>>
>> On Tue, Mar 12, 2019 at 7:14 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> This really not cool but here you go. This seems to work. Agreed
>>> that this cannot be this painful. The cancel does not exit with an exit
>>> code pf 0 and thus the job has to manually delete. Vijay does this align
>>> with what you have had to do ?
>>>
>>>
>>>- Take a save point . This returns a request id
>>>
>>>curl  --header "Content-Type: application/json" --request POST 
>>> --data 
>>> '{"target-directory":"hdfs://nn-crunchy:8020/tmp/xyz14","cancel-job":false}'
>>> 
>>> https://*/jobs//savepoints
>>>
>>>
>>>
>>>- Make sure the save point succeeded
>>>
>>>curl  --request GET   
>>> https:///jobs//savepoints/2c053ce3bea31276aa25e63784629687
>>>
>>>
>>>
>>>- cancel the job
>>>
>>>curl  --request PATCH 
>>> https://***/jobs/?mode=cancel
>>>
>>>
>>>
>>>- Delete the job and deployment
>>>
>>>kubectl delete -f 
>>> manifests/bf2-PRODUCTION/job-cluster-job-deployment.yaml
>>>
>>>kubectl delete -f 
>>> manifests/bf2-PRODUCTION/task-manager-deployment.yaml
>>>
>>>
>>>
>>>- Edit the job-cluster-job-deployment.yaml. Add/Edit
>>>
>>>args: ["job-cluster",
>>>
>>>   "--fromSavepoint",
>>>
>>>   
>>> "hdfs:///tmp/xyz14/savepoint-00-1d4f71345e22",
>>>   "--job-classname", .
>>>
>>>
>>>
>>>- Restart
>>>
>>>kubectl create -f 
>>> manifests/bf2-PRODUCTION/job-cluster-job-deployment.yaml
>>>
>>>kubectl create -f 
>>> manifests/bf2-PRODUCTION/task-manager-deployment.yaml
>>>
>>>
>>>
>>>- Make sure from the UI, that it restored from the specific save
>>>point.
>>>
>>>
>>> On Tue, Mar 12, 2019 at 7:26 AM Vijay Bhaskar <
>>> bhaskar.eba...@gmail.com> wrote:
>>>
 Yes Its supposed to work.  But unfortunately it was not working.
 Flink community needs to respond to this behavior.

Re: K8s job cluster and cancel and resume from a save point ?

2019-03-12 Thread Gary Yao
Nobody can tell with 100% certainty. We want to give the RC some exposure
first, and there is also a release process that is prescribed by the ASF
[1].
You can look at past releases to get a feeling for how long the release
process lasts [2].

[1] http://www.apache.org/legal/release-policy.html#release-approval
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/template/NamlServlet.jtp?macro=search_page=1=%5BVOTE%5D+Release=0


Re: K8s job cluster and cancel and resume from a save point ?

2019-03-12 Thread Vishal Santoshi
This is really not cool, but here you go. This seems to work. Agreed that it
should not be this painful. The cancel does not exit with an exit code of 0,
and thus the job has to be deleted manually. Vijay, does this align with what
you have had to do?


   - Take a save point. This returns a request id

   curl  --header "Content-Type: application/json" --request POST
--data 
'{"target-directory":"hdfs://nn-crunchy:8020/tmp/xyz14","cancel-job":false}'
   https://*/jobs//savepoints



   - Make sure the save point succeeded

   curl  --request GET
https:///jobs//savepoints/2c053ce3bea31276aa25e63784629687



   - Cancel the job

   curl  --request PATCH
https://***/jobs/?mode=cancel



   - Delete the job and deployment

   kubectl delete -f manifests/bf2-PRODUCTION/job-cluster-job-deployment.yaml

   kubectl delete -f manifests/bf2-PRODUCTION/task-manager-deployment.yaml



   - Edit the job-cluster-job-deployment.yaml. Add/Edit

   args: ["job-cluster",

  "--fromSavepoint",

  "hdfs:///tmp/xyz14/savepoint-00-1d4f71345e22",
  "--job-classname", .



   - Restart

   kubectl create -f manifests/bf2-PRODUCTION/job-cluster-job-deployment.yaml

   kubectl create -f manifests/bf2-PRODUCTION/task-manager-deployment.yaml



   - Make sure from the UI, that it restored from the specific save point.
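
The manual cycle above can be sketched as one small shell script. This is a
sketch under assumptions, not the poster's setup: FLINK_URL, JOB_ID,
SAVEPOINT_DIR and the request-id handling are placeholders, and the check for
a COMPLETED status assumes the shape of Flink's async-operation response --
verify it against the REST API docs for your Flink version.

```shell
#!/usr/bin/env sh
# Sketch of the manual savepoint -> cancel -> redeploy cycle from the steps
# above. All endpoints, ids, and paths are placeholders.

FLINK_URL="${FLINK_URL:-https://flink.example.com}"    # hypothetical host
JOB_ID="${JOB_ID:-00000000000000000000000000000000}"   # hypothetical job id
SAVEPOINT_DIR="${SAVEPOINT_DIR:-hdfs:///tmp/savepoints}"

# Pure URL helpers, testable without a cluster.
savepoint_trigger_url() { printf '%s/jobs/%s/savepoints' "$1" "$2"; }
savepoint_status_url()  { printf '%s/jobs/%s/savepoints/%s' "$1" "$2" "$3"; }
cancel_url()            { printf '%s/jobs/%s?mode=cancel' "$1" "$2"; }

# Assumes the async-operation status JSON mentions COMPLETED once the
# savepoint has finished; check the docs for your Flink version.
savepoint_done() { case "$1" in *COMPLETED*) return 0 ;; *) return 1 ;; esac; }

run_cycle() {
  # 1. Trigger the savepoint; the JSON response carries a request id.
  curl -s -H 'Content-Type: application/json' -X POST \
    -d "{\"target-directory\":\"$SAVEPOINT_DIR\",\"cancel-job\":false}" \
    "$(savepoint_trigger_url "$FLINK_URL" "$JOB_ID")"
  # 2. Poll savepoint_status_url with the request id until savepoint_done.
  # 3. Cancel: curl -s -X PATCH "$(cancel_url "$FLINK_URL" "$JOB_ID")"
  # 4. kubectl delete the job + taskmanager manifests, add --fromSavepoint
  #    to the args, kubectl create them again, and confirm in the UI.
}
```

Keeping the URL builders pure means the wiring can be sanity-checked without
touching a live cluster; run_cycle is only ever invoked deliberately.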


On Tue, Mar 12, 2019 at 7:26 AM Vijay Bhaskar 
wrote:

 Yes, it's supposed to work. But unfortunately it was not working.
 The Flink community needs to respond to this behavior.
>
> Regards
> Bhaskar
>
> On Tue, Mar 12, 2019 at 3:45 PM Vishal Santoshi 
> wrote:
>
>> Aah.
>> Let me try this out and will get back to you.
>> Though I would assume that save point with cancel is a single atomic
>> step, rather than a save point *followed* by a cancellation ( else why
>> would that be an option ).
>> Thanks again.
>>
>>
>> On Tue, Mar 12, 2019 at 4:50 AM Vijay Bhaskar 
>> wrote:
>>
>>> Hi Vishal,
>>>
>>> yarn-cancel is not meant only for the yarn cluster. It works for all
>>> clusters. It's the recommended command.
>>>
>>> Use the following command to issue a save point.
>>>  curl  --header "Content-Type: application/json" --request POST --data
>>> '{"target-directory":"hdfs://*:8020/tmp/xyz1","cancel-job":false}'
>>> \ https://
>>> .ingress.***/jobs//savepoints
>>>
>>> Then issue yarn-cancel.
>>> After that, follow the process to restore the save point.
>>>
>>> Regards
>>> Bhaskar
>>>
>>> On Tue, Mar 12, 2019 at 2:11 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 Hello Vijay,

Thank you for the reply. This though is k8s deployment (
 rather than yarn ), but maybe they follow the same lifecycle. I issue a
 *save point with cancel* as documented here
 https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/rest_api.html#jobs-jobid-savepoints,
 a straight up
  curl  --header "Content-Type: application/json" --request POST --data
 '{"target-directory":"hdfs://*:8020/tmp/xyz1","cancel-job":true}'
 \ https://
 .ingress.***/jobs//savepoints

 I would assume that after taking the save point, the JVM should exit;
 after all, the k8s deployment is of kind: Job, and if it is a job cluster
 then a cancellation should exit the JVM and hence the pod. It does seem to
 do some things right. It stops a bunch of things ( the JobMaster, the
 SlotPool, the ZooKeeper coordinator, etc. ). It also removes the checkpoint
 counter but does not exit the job. And after a little while the job is
 restarted, which does not make sense and is absolutely not the right thing
 to do ( to me at least ).

 Further, if I delete the deployment and the job from k8s and restart the
 job and deployment fromSavePoint, it refuses to honor the fromSavePoint. I
 have to delete the zk chroot for it to consider the save point.


 Thus the process of cancelling and resuming from a SP on a k8s job
 cluster deployment  seems to be

- cancel with save point as defined here

 https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/rest_api.html#jobs-jobid-savepoints
- delete the job manager job and task manager deployments from k8s
almost immediately.
- clear the ZK chroot for the 000.. job and maybe the
checkpoints directory.
- resumeFromCheckPoint

 Can somebody confirm that this is indeed the process?



  Logs are attached.



 2019-03-12 08:10:43,871 INFO
 org.apache.flink.runtime.jobmaster.JobMaster  -
 Savepoint stored in
 

Re: K8s job cluster and cancel and resume from a save point ?

2019-03-12 Thread Gary Yao
Hi Vishal,

I'm afraid not, but there are open pull requests for that. You can track the
progress here:

https://issues.apache.org/jira/browse/FLINK-9953

Best,
Gary

Re: K8s job cluster and cancel and resume from a save point ?

2019-03-12 Thread Vishal Santoshi
And when is the 1.8.0 release expected?

Re: K8s job cluster and cancel and resume from a save point ?

2019-03-12 Thread Vishal Santoshi
:) That makes so much more sense. Is k8s-native Flink a part of this
release?

Re: K8s job cluster and cancel and resume from a save point ?

2019-03-12 Thread Gary Yao
Hi Vishal,

This issue was fixed recently [1], and the patch will be released with 1.8.
If
the Flink job gets cancelled, the JVM should exit with code 0. There is a
release candidate [2], which you can test.

Best,
Gary

[1] https://issues.apache.org/jira/browse/FLINK-10743
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-1-8-0-release-candidate-1-td27637.html
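
One way to check the fixed behavior on the release candidate is to read the
exit code the job-cluster container actually terminated with. This is a
sketch: the pod name is a placeholder, and it assumes the entrypoint runs in
the pod's first container.

```shell
#!/usr/bin/env sh
# Read the terminated container's exit code after cancelling the job.
# POD is a placeholder; on 1.8 a cancelled job should exit with 0.
POD="${POD:-flink-job-cluster-xxxxx}"

# jsonpath into the first container's terminated state.
EXITCODE_PATH='{.status.containerStatuses[0].state.terminated.exitCode}'

# Pure helper: Kubernetes marks the Job complete only on a zero exit code.
clean_exit() { [ "$1" = "0" ]; }

# Usage (needs kubectl and a real pod):
#   code=$(kubectl get pod "$POD" -o jsonpath="$EXITCODE_PATH")
#   clean_exit "$code" && echo "job-cluster terminated cleanly"
```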

Re: K8s job cluster and cancel and resume from a save point ?

2019-03-12 Thread Vijay Bhaskar
Oh, yeah, this is a larger issue indeed :)

Regards
Bhaskar

Re: K8s job cluster and cancel and resume from a save point ?

2019-03-12 Thread Vishal Santoshi
Thanks Vijay,

This is the larger issue.  The cancellation routine is itself broken.

On cancellation, Flink does remove the checkpoint counter

*2019-03-12 14:12:13,143
INFO  org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter  -
Removing /checkpoint-counter/ from
ZooKeeper *

but exits with a non-zero code

*2019-03-12 14:12:13,477
INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
Terminating cluster entrypoint process StandaloneJobClusterEntryPoint with
exit code 1444.*


That, I think, is an issue. A cancelled job is a completed job, and thus the
exit code should be 0 for k8s to mark it complete.









On Tue, Mar 12, 2019 at 10:18 AM Vijay Bhaskar 
wrote:

> Yes Vishal. That's correct.
>
> Regards
> Bhaskar
>
> On Tue, Mar 12, 2019 at 7:14 PM Vishal Santoshi 
> wrote:
>
>> This is really not cool, but here you go. This seems to work. Agreed that
>> this cannot be this painful. The cancel does not exit with an exit code of
>> 0, and thus the job has to be deleted manually. Vijay, does this align
>> with what you have had to do?
>>
>>
>>- Take a save point . This returns a request id
>>
>>curl  --header "Content-Type: application/json" --request POST --data 
>> '{"target-directory":"hdfs://nn-crunchy:8020/tmp/xyz14","cancel-job":false}' 
>>https://*/jobs//savepoints
>>
>>
>>
>>- Make sure the save point succeeded
>>
>>curl  --request GET   
>> https:///jobs//savepoints/2c053ce3bea31276aa25e63784629687
>>
>>
>>
>>- cancel the job
>>
>>curl  --request PATCH 
>> https://***/jobs/?mode=cancel
>>
>>
>>
>>- Delete the job and deployment
>>
>>kubectl delete -f manifests/bf2-PRODUCTION/job-cluster-job-deployment.yaml
>>
>>kubectl delete -f manifests/bf2-PRODUCTION/task-manager-deployment.yaml
>>
>>
>>
>>- Edit the job-cluster-job-deployment.yaml. Add/Edit
>>
>>args: ["job-cluster",
>>
>>   "--fromSavepoint",
>>
>>   
>> "hdfs:///tmp/xyz14/savepoint-00-1d4f71345e22",
>>   "--job-classname", .
>>
>>
>>
>>- Restart
>>
>>kubectl create -f manifests/bf2-PRODUCTION/job-cluster-job-deployment.yaml
>>
>>kubectl create -f manifests/bf2-PRODUCTION/task-manager-deployment.yaml
>>
>>
>>
>>- Make sure from the UI, that it restored from the specific save
>>point.
>>
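A minimal Python sketch of assembling the REST calls used in the steps above (the host, job id, and savepoint trigger id below are hypothetical placeholders; the actual values are elided in the thread):

```python
import json

def savepoint_request(base_url, job_id, target_dir, cancel=False):
    # POST this body to this URL to trigger a savepoint;
    # mirrors the curl command in the first step above.
    url = f"{base_url}/jobs/{job_id}/savepoints"
    body = json.dumps({"target-directory": target_dir, "cancel-job": cancel})
    return url, body

def savepoint_status_url(base_url, job_id, trigger_id):
    # GET this URL to check whether the savepoint completed (second step).
    return f"{base_url}/jobs/{job_id}/savepoints/{trigger_id}"

def cancel_url(base_url, job_id):
    # PATCH this URL to cancel the job (third step).
    return f"{base_url}/jobs/{job_id}?mode=cancel"

# Example with placeholder values:
url, body = savepoint_request("https://flink.example.com", "jobid-00",
                              "hdfs:///tmp/xyz14")
```

The HTTP calls themselves (via curl or any client) remain exactly as in the steps above; this only shows how the URLs and payloads fit together.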
>>
>> On Tue, Mar 12, 2019 at 7:26 AM Vijay Bhaskar 
>> wrote:
>>
>>> Yes, it's supposed to work. But unfortunately it was not working. The Flink
>>> community needs to respond to this behavior.
>>>
>>> Regards
>>> Bhaskar
>>>
>>> On Tue, Mar 12, 2019 at 3:45 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 Aah.
 Let me try this out and will get back to you.
 Though I would assume that save point with cancel is a single atomic
 step, rather than a save point *followed*  by a cancellation ( else
 why would that be an option ).
 Thanks again.


 On Tue, Mar 12, 2019 at 4:50 AM Vijay Bhaskar 
 wrote:

> Hi Vishal,
>
> yarn-cancel isn't meant only for YARN clusters. It works for all
> clusters. It's the recommended command.
>
> Use the following command to issue save point.
>  curl  --header "Content-Type: application/json" --request POST --data
> '{"target-directory":"hdfs://*:8020/tmp/xyz1",
> "cancel-job":false}'  \ https://
> .ingress.***/jobs//savepoints
>
> Then issue yarn-cancel.
> After that, follow the process to restore the save point.
>
> Regards
> Bhaskar
>
> On Tue, Mar 12, 2019 at 2:11 PM Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> Hello Vijay,
>>
>>Thank you for the reply. This though is k8s deployment
>> ( rather than yarn ) but maybe they follow the same lifecycle.  I issue 
>> a*
>> save point with cancel*  as documented here
>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/rest_api.html#jobs-jobid-savepoints,
>> a straight up
>>  curl  --header "Content-Type: application/json" --request POST
>> --data
>> '{"target-directory":"hdfs://*:8020/tmp/xyz1","cancel-job":true}'
>> \ https://
>> .ingress.***/jobs//savepoints
>>
>> I would assume that after taking the save point, the jvm should exit,
>> after all the k8s deployment is of kind: job and if it is a job cluster
>> then a cancellation should exit the jvm and hence the pod. It does seem 
>> to
>> do some things right. It stops a bunch of stuff ( the JobMaster, the
>> slotPol, zookeeper coordinator etc ) . It also remove the checkpoint
>> counter but does not exit  the job. And after a little bit the job 

Re: K8s job cluster and cancel and resume from a save point ?

2019-03-12 Thread Vijay Bhaskar
Yes Vishal. That's correct.

Regards
Bhaskar

On Tue, Mar 12, 2019 at 7:14 PM Vishal Santoshi 
wrote:

> This is really not cool, but here you go. This seems to work. Agreed that
> this cannot be this painful. The cancel does not exit with an exit code of
> 0, and thus the job has to be deleted manually. Vijay, does this align with
> what you have had to do?
>
>
>- Take a save point . This returns a request id
>
>curl  --header "Content-Type: application/json" --request POST --data 
> '{"target-directory":"hdfs://nn-crunchy:8020/tmp/xyz14","cancel-job":false}'  
>   https://*/jobs//savepoints
>
>
>
>- Make sure the save point succeeded
>
>curl  --request GET   
> https:///jobs//savepoints/2c053ce3bea31276aa25e63784629687
>
>
>
>- cancel the job
>
>curl  --request PATCH 
> https://***/jobs/?mode=cancel
>
>
>
>- Delete the job and deployment
>
>kubectl delete -f manifests/bf2-PRODUCTION/job-cluster-job-deployment.yaml
>
>kubectl delete -f manifests/bf2-PRODUCTION/task-manager-deployment.yaml
>
>
>
>- Edit the job-cluster-job-deployment.yaml. Add/Edit
>
>args: ["job-cluster",
>
>   "--fromSavepoint",
>
>   
> "hdfs:///tmp/xyz14/savepoint-00-1d4f71345e22",
>   "--job-classname", .
>
>
>
>- Restart
>
>kubectl create -f manifests/bf2-PRODUCTION/job-cluster-job-deployment.yaml
>
>kubectl create -f manifests/bf2-PRODUCTION/task-manager-deployment.yaml
>
>
>
>- Make sure from the UI, that it restored from the specific save point.
>
>
> On Tue, Mar 12, 2019 at 7:26 AM Vijay Bhaskar 
> wrote:
>
>> Yes, it's supposed to work. But unfortunately it was not working. The Flink
>> community needs to respond to this behavior.
>>
>> Regards
>> Bhaskar
>>
>> On Tue, Mar 12, 2019 at 3:45 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Aah.
>>> Let me try this out and will get back to you.
>>> Though I would assume that save point with cancel is a single atomic
>>> step, rather than a save point *followed*  by a cancellation ( else why
>>> would that be an option ).
>>> Thanks again.
>>>
>>>
>>> On Tue, Mar 12, 2019 at 4:50 AM Vijay Bhaskar 
>>> wrote:
>>>
 Hi Vishal,

 yarn-cancel isn't meant only for YARN clusters. It works for all
 clusters. It's the recommended command.

 Use the following command to issue save point.
  curl  --header "Content-Type: application/json" --request POST --data
 '{"target-directory":"hdfs://*:8020/tmp/xyz1",
 "cancel-job":false}'  \ https://
 .ingress.***/jobs//savepoints

 Then issue yarn-cancel.
 After that, follow the process to restore the save point.

 Regards
 Bhaskar

 On Tue, Mar 12, 2019 at 2:11 PM Vishal Santoshi <
 vishal.santo...@gmail.com> wrote:

> Hello Vijay,
>
>Thank you for the reply. This though is k8s deployment
> ( rather than yarn ) but maybe they follow the same lifecycle.  I issue 
> a*
> save point with cancel*  as documented here
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/rest_api.html#jobs-jobid-savepoints,
> a straight up
>  curl  --header "Content-Type: application/json" --request POST --data
> '{"target-directory":"hdfs://*:8020/tmp/xyz1","cancel-job":true}'
> \ https://
> .ingress.***/jobs//savepoints
>
> I would assume that after taking the save point, the jvm should exit,
> after all the k8s deployment is of kind: job and if it is a job cluster
> then a cancellation should exit the jvm and hence the pod. It does seem to
> do some things right. It stops a bunch of stuff ( the JobMaster, the
> slotPol, zookeeper coordinator etc ) . It also remove the checkpoint
> counter but does not exit  the job. And after a little bit the job is
> restarted which does not make sense and absolutely not the right thing to
> do  ( to me at least ).
>
> Further if I delete the deployment and the job from k8s and restart
> the job and deployment fromSavePoint, it refuses to honor the
> fromSavePoint. I have to delete the zk chroot for it to consider the save
> point.
>
>
> Thus the process of cancelling and resuming from a SP on a k8s job
> cluster deployment  seems to be
>
>- cancel with save point as defined hre
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/rest_api.html#jobs-jobid-savepoints
>- delete the job manger job  and task manager deployments from k8s
>almost immediately.
>- clear the ZK chroot for the 000.. job  and may be the
>checkpoints directory.
>- 

Re: K8s job cluster and cancel and resume from a save point ?

2019-03-12 Thread Vishal Santoshi
I must add that there has to be more love for k8s Flink deployments. IMHO
that is the way to go. Maintaining a captive/session cluster, if you have
k8s on premise, is pretty much a no-go for various reasons.

On Tue, Mar 12, 2019 at 9:44 AM Vishal Santoshi 
wrote:

> This is really not cool, but here you go. This seems to work. Agreed that
> this cannot be this painful. The cancel does not exit with an exit code of
> 0, and thus the job has to be deleted manually. Vijay, does this align with
> what you have had to do?
>
>
>- Take a save point . This returns a request id
>
>curl  --header "Content-Type: application/json" --request POST --data 
> '{"target-directory":"hdfs://nn-crunchy:8020/tmp/xyz14","cancel-job":false}'  
>   https://*/jobs//savepoints
>
>
>
>- Make sure the save point succeeded
>
>curl  --request GET   
> https:///jobs//savepoints/2c053ce3bea31276aa25e63784629687
>
>
>
>- cancel the job
>
>curl  --request PATCH 
> https://***/jobs/?mode=cancel
>
>
>
>- Delete the job and deployment
>
>kubectl delete -f manifests/bf2-PRODUCTION/job-cluster-job-deployment.yaml
>
>kubectl delete -f manifests/bf2-PRODUCTION/task-manager-deployment.yaml
>
>
>
>- Edit the job-cluster-job-deployment.yaml. Add/Edit
>
>args: ["job-cluster",
>
>   "--fromSavepoint",
>
>   
> "hdfs:///tmp/xyz14/savepoint-00-1d4f71345e22",
>   "--job-classname", .
>
>
>
>- Restart
>
>kubectl create -f manifests/bf2-PRODUCTION/job-cluster-job-deployment.yaml
>
>kubectl create -f manifests/bf2-PRODUCTION/task-manager-deployment.yaml
>
>
>
>- Make sure from the UI, that it restored from the specific save point.
>
>
> On Tue, Mar 12, 2019 at 7:26 AM Vijay Bhaskar 
> wrote:
>
>> Yes, it's supposed to work. But unfortunately it was not working. The Flink
>> community needs to respond to this behavior.
>>
>> Regards
>> Bhaskar
>>
>> On Tue, Mar 12, 2019 at 3:45 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Aah.
>>> Let me try this out and will get back to you.
>>> Though I would assume that save point with cancel is a single atomic
>>> step, rather than a save point *followed*  by a cancellation ( else why
>>> would that be an option ).
>>> Thanks again.
>>>
>>>
>>> On Tue, Mar 12, 2019 at 4:50 AM Vijay Bhaskar 
>>> wrote:
>>>
 Hi Vishal,

 yarn-cancel isn't meant only for YARN clusters. It works for all
 clusters. It's the recommended command.

 Use the following command to issue save point.
  curl  --header "Content-Type: application/json" --request POST --data
 '{"target-directory":"hdfs://*:8020/tmp/xyz1",
 "cancel-job":false}'  \ https://
 .ingress.***/jobs//savepoints

 Then issue yarn-cancel.
 After that, follow the process to restore the save point.

 Regards
 Bhaskar

 On Tue, Mar 12, 2019 at 2:11 PM Vishal Santoshi <
 vishal.santo...@gmail.com> wrote:

> Hello Vijay,
>
>Thank you for the reply. This though is k8s deployment
> ( rather than yarn ) but maybe they follow the same lifecycle.  I issue 
> a*
> save point with cancel*  as documented here
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/rest_api.html#jobs-jobid-savepoints,
> a straight up
>  curl  --header "Content-Type: application/json" --request POST --data
> '{"target-directory":"hdfs://*:8020/tmp/xyz1","cancel-job":true}'
> \ https://
> .ingress.***/jobs//savepoints
>
> I would assume that after taking the save point, the jvm should exit,
> after all the k8s deployment is of kind: job and if it is a job cluster
> then a cancellation should exit the jvm and hence the pod. It does seem to
> do some things right. It stops a bunch of stuff ( the JobMaster, the
> slotPol, zookeeper coordinator etc ) . It also remove the checkpoint
> counter but does not exit  the job. And after a little bit the job is
> restarted which does not make sense and absolutely not the right thing to
> do  ( to me at least ).
>
> Further if I delete the deployment and the job from k8s and restart
> the job and deployment fromSavePoint, it refuses to honor the
> fromSavePoint. I have to delete the zk chroot for it to consider the save
> point.
>
>
> Thus the process of cancelling and resuming from a SP on a k8s job
> cluster deployment  seems to be
>
>- cancel with save point as defined hre
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/rest_api.html#jobs-jobid-savepoints
>- delete the job manger job  and task manager deployments 

Re: Expressing Flink array aggregation using Table / SQL API

2019-03-12 Thread Piyush Narang
Thanks for getting back, Kurt. Yeah, this might be an option to try out. I was 
hoping there would be a way to express this directly in the SQL though ☹.

-- Piyush


From: Kurt Young 
Date: Tuesday, March 12, 2019 at 2:25 AM
To: Piyush Narang 
Cc: "user@flink.apache.org" 
Subject: Re: Expressing Flink array aggregation using Table / SQL API

Hi Piyush,

Could you try to add clientId into your aggregate function, and to track the 
map of  inside your new aggregate 
function, and assemble whatever result you want when emitting.
The SQL will look like:

SELECT userId, some_aggregation(clientId, eventType, `timestamp`, dataField)
FROM my_kafka_stream_table
GROUP BY userId,  HOP(`timestamp`, INTERVAL '1' MINUTE, INTERVAL '1' HOUR)

Kurt


On Tue, Mar 12, 2019 at 11:03 AM Piyush Narang 
mailto:p.nar...@criteo.com>> wrote:
Hi folks,

I’m getting started with Flink and trying to figure out how to express 
aggregating some rows into an array to finally sink data into an 
AppendStreamTableSink.
My data looks something like this:
userId, clientId, eventType, timestamp, dataField

I need to compute some custom aggregations using a UDAF while grouping by 
userId, clientId over a sliding window (10 mins, triggered every 1 min). My 
first attempt is:
SELECT userId, clientId, my_aggregation(eventType, `timestamp`, dataField) as 
custom_aggregated
FROM my_kafka_stream_table
GROUP BY userId, clientId, HOP(`timestamp`, INTERVAL '1' MINUTE, INTERVAL '1' 
HOUR)

This query works as I expect it to. In every time window I end up with inserts 
for unique userId + clientId combinations. What I want to do though, is 
generate a single row per userId in each time window and this is what I’m 
struggling with expressing along with the restriction that I want to sink this 
to an AppendStreamTableSink. I was hoping to do something like this:

SELECT userId, COLLECT(client_custom_aggregated)
FROM
(
  SELECT userId, MAP[clientId, my_aggregation(eventType, `timestamp`, 
dataField) as custom_aggregated] as client_custom_aggregated
  FROM my_kafka_stream_table
  GROUP BY userId, clientId, HOP(`timestamp`, INTERVAL '1' MINUTE, INTERVAL '1' 
HOUR)
) GROUP BY userId

Unfortunately when I try this (and a few other variants), I run into the error, 
“AppendStreamTableSink requires that Table has only insert changes”. Does 
anyone know if there’s a way for me to compute my collect aggregation to 
produce one row per userId for a given time window?

Thanks,

-- Piyush
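The shape of Kurt's suggestion above — moving clientId into the aggregate so that each window emits exactly one row per userId — can be sketched outside Flink. This is plain Python over one window's rows, not the Table API, and `agg_fn` is only a stand-in for the custom UDAF:

```python
from collections import defaultdict

def aggregate_window(rows, agg_fn):
    """One result row per userId for a single window.

    rows: (user_id, client_id, event_type, timestamp, data_field) tuples
    belonging to one window. agg_fn reduces the list of per-client events
    to one aggregated value.
    """
    per_user = defaultdict(lambda: defaultdict(list))
    for user_id, client_id, event_type, ts, data in rows:
        per_user[user_id][client_id].append((event_type, ts, data))
    # Emit a map clientId -> aggregate per user, i.e. one row per userId.
    return {
        user: {client: agg_fn(events) for client, events in clients.items()}
        for user, clients in per_user.items()
    }

# Example: count events per client, using len as a stand-in UDAF.
rows = [
    ("u1", "c1", "click", 1, "x"),
    ("u1", "c2", "view", 2, "y"),
    ("u1", "c1", "click", 3, "z"),
    ("u2", "c1", "view", 4, "w"),
]
result = aggregate_window(rows, agg_fn=len)
# result: {"u1": {"c1": 2, "c2": 1}, "u2": {"c1": 1}}
```

Because each window emits a single insert per userId, this avoids the updating outer GROUP BY that made the two-level query incompatible with an AppendStreamTableSink.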



Re: Backoff strategies for async IO functions?

2019-03-12 Thread Konstantin Knauf
Hi Shuyi,

I am not sure. You could handle retries in the user code within
org.apache.flink.streaming.api.functions.async.AsyncFunction#asyncInvoke
without using a DLQ as described in my original answer to William.  On the
other hand, I agree that it could easier for the user and it is indeed a
common scenario.

Two follow up questions come to mind:

   - When a Flink job fails and restarts, would you expect the "retry
   counter" to be reset or to continue where it left off?
   - With AsyncStream.orderedWait() when would you expect a record to be
   skipped? After the final timeout, after the first timeout?

Would you like to create a JIRA ticket [1] for this improvement, with
answers to the questions above? We can continue to discuss it there.

Best,

Konstantin

[1]
https://issues.apache.org/jira/projects/FLINK/issues/FLINK-11835?filter=allopenissues


On Sun, Mar 10, 2019 at 9:20 AM Shuyi Chen  wrote:

> Hi Konstantin,
>
> (cc Till since he owns the code)
>
> For async-IO, IO failure and retry is a common & expected pattern. In most
> of the use cases, users will need to deal with IO failure and retry.
> Therefore, I think it's better to address the problem in Flink rather than
> user implementing its custom logic in user code for a better dev
> experience. We do have similar problem in many of our use cases. To enable
> backoff and retry, we need to put the failed message to a DLQ (another
> Kafka topic) and re-ingest the message from the DLQ topic to retry, which
> is manual/cumbersome and require setting up extra Kafka topic.
>
> Can we add multiple strategies to handle async IO failure in the
> AsyncWaitOperator? I propose the following strategies:
>
>
>- FAIL_OPERATOR (default & current behavior)
>- FIX_INTERVAL_RETRY (retry with configurable fixed interval up to N
>times)
>- EXP_BACKOFF_RETRY (retry with exponential backoff up to N times)
>
> What do you guys think? Thanks a lot.
>
> Shuyi
>
> On Fri, Mar 8, 2019 at 3:17 PM Konstantin Knauf 
> wrote:
>
>> Hi William,
>>
>> the AsyncOperator does not have such a setting. It is "merely" a wrapper
>> around an asynchronous call, which provides integration with Flink's state
>> & time management.
>>
>> I think, the way to go would be to do the exponential back-off in the
>> user code and set the timeout of the AsyncOperator to the sum of the
>> timeouts in the user code (e.g. 2s + 4s + 8s + 16s).
>>
>> Cheers,
>>
>> Konstantin
>>
>>
>> On Thu, Mar 7, 2019 at 5:20 PM William Saar  wrote:
>>
>>> Hi,
>>> Is there a way to specify an exponential backoff strategy for when async
>>> function calls fail?
>>>
>>> I have an async function that does web requests to a rate-limited API.
>>> Can you handle that with settings on the async function call?
>>>
>>> Thanks,
>>> William
>>>
>>>
>>>
>>
>>
>

-- 

Konstantin Knauf | Solutions Architect

+49 160 91394525



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen


Re: What does "Continuous incremental cleanup" mean in Flink 1.8 release notes

2019-03-12 Thread Konstantin Knauf
Hi Tony,

yes, when taking a savepoint, the same strategy as during a
non-incremental checkpoint is used.

Best,

Konstantin
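The distinction discussed in this thread — pre-1.8 cleanup only on access or full snapshot, versus Flink 1.8's continuous background cleanup — can be modeled with a toy state map. The class and method names below are illustrative only, not Flink's state API:

```python
import time

class TtlState:
    """Toy keyed state with TTL; models the cleanup semantics only."""

    def __init__(self, ttl_seconds, clock=time.time):
        self.ttl = ttl_seconds
        self.clock = clock
        self._data = {}  # key -> (value, write_timestamp)

    def put(self, key, value):
        self._data[key] = (value, self.clock())

    def get(self, key):
        # Pre-1.8 behavior: an expired entry is removed (and None returned)
        # only when someone tries to read it.
        entry = self._data.get(key)
        if entry is None:
            return None
        value, ts = entry
        if self.clock() - ts >= self.ttl:
            del self._data[key]
            return None
        return value

    def background_sweep(self):
        # 1.8 behavior: expired entries are cleaned up continuously in the
        # background, independent of access or checkpointing.
        now = self.clock()
        for k in [k for k, (_, ts) in self._data.items() if now - ts >= self.ttl]:
            del self._data[k]

# With a controllable clock: an expired entry lingers until read or swept.
now = [0.0]
state = TtlState(ttl_seconds=10, clock=lambda: now[0])
state.put("a", 1)
now[0] = 11.0
lingering = len(state._data)  # still stored: nothing has accessed it yet
state.background_sweep()
swept = len(state._data)      # gone without any access
```

The sweep stands in for the continuous cleanup introduced by FLINK-10471/FLINK-10473; before 1.8, only the `get`-path (or a full snapshot) would remove the expired entry.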

On Mon, Mar 11, 2019 at 2:29 AM Tony Wei  wrote:

> Hi Konstantin,
>
> That is really helpful. Thanks.
>
> Another follow-up question: The document said "Cleanup in full
> snapshot" is not applicable for
> the incremental checkpointing in the RocksDB state backend. However, when a
> user manually
> triggers a savepoint and restarts the job from it, the expired state should be
> cleaned up as well, based
> on Flink 1.6's implementation. Am I right?
>
> Best,
> Tony Wei
>
> Konstantin Knauf  於 2019年3月9日 週六 上午7:00寫道:
>
>> Hi Tony,
>>
>> before Flink 1.8 expired state is only cleaned up, when you try to access
>> it after expiration, i.e. when user code tries to access the expired state,
>> the state value is cleaned and "null" is returned. There was also already
>> the option to clean up expired state during full snapshots (
>> https://github.com/apache/flink/pull/6460). With Flink 1.8 expired state
>> is cleaned up continuously in the background regardless of checkpointing or
>> any attempt to access it after expiration.
>>
>> As a reference the linked JIRA tickets should be a good starting point.
>>
>> Hope this helps.
>>
>> Konstantin
>>
>>
>>
>>
>> On Fri, Mar 8, 2019 at 10:45 AM Tony Wei  wrote:
>>
>>> Hi everyone,
>>>
>>> I read the Flink 1.8 release notes about state [1], and it said
>>>
>>> *Continuous incremental cleanup of old Keyed State with TTL*
 We introduced TTL (time-to-live) for Keyed state in Flink 1.6 (
 FLINK-9510 ). This
 feature allowed to clean up and make inaccessible keyed state entries when
 accessing them. In addition state would now also being cleaned up when
 writing a savepoint/checkpoint.
 Flink 1.8 introduces continous cleanup of old entries for both the
 RocksDB state backend (FLINK-10471
 ) and the heap
 state backend (FLINK-10473
 ). This means that
 old entries (according to the ttl setting) are continously being cleanup 
 up.
>>>
>>>
>>> I'm not familiar with TTL's implementation in Flink 1.6 or with the new
>>> features introduced in Flink
>>> 1.8. I don't understand the difference between these two release
>>> versions after reading the
>>> release notes. Did they change the outcome of the TTL feature, provide
>>> new TTL features, or just
>>> change the behavior of the TTL mechanism's execution?
>>>
>>> Could you give me more references to learn about it? A simple example
>>> to illustrate it would be much
>>> appreciated. Thank you.
>>>
>>> Best,
>>> Tony Wei
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/release-notes/flink-1.8.html#state
>>>
>>
>>
>>
>

-- 

Konstantin Knauf | Solutions Architect

+49 160 91394525



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen


Re: K8s job cluster and cancel and resume from a save point ?

2019-03-12 Thread Vijay Bhaskar
Yes, it's supposed to work. But unfortunately it was not working. The Flink
community needs to respond to this behavior.

Regards
Bhaskar

On Tue, Mar 12, 2019 at 3:45 PM Vishal Santoshi 
wrote:

> Aah.
> Let me try this out and will get back to you.
> Though I would assume that save point with cancel is a single atomic step,
> rather than a save point *followed*  by a cancellation ( else why would
> that be an option ).
> Thanks again.
>
>
> On Tue, Mar 12, 2019 at 4:50 AM Vijay Bhaskar 
> wrote:
>
>> Hi Vishal,
>>
>> yarn-cancel isn't meant only for YARN clusters. It works for all
>> clusters. It's the recommended command.
>>
>> Use the following command to issue save point.
>>  curl  --header "Content-Type: application/json" --request POST --data
>> '{"target-directory":"hdfs://*:8020/tmp/xyz1","cancel-job":false}'
>> \ https://
>> .ingress.***/jobs//savepoints
>>
>> Then issue yarn-cancel.
>> After that, follow the process to restore the save point.
>>
>> Regards
>> Bhaskar
>>
>> On Tue, Mar 12, 2019 at 2:11 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Hello Vijay,
>>>
>>>Thank you for the reply. This though is k8s deployment (
>>> rather than yarn ) but maybe they follow the same lifecycle.  I issue a*
>>> save point with cancel*  as documented here
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/rest_api.html#jobs-jobid-savepoints,
>>> a straight up
>>>  curl  --header "Content-Type: application/json" --request POST --data
>>> '{"target-directory":"hdfs://*:8020/tmp/xyz1","cancel-job":true}'
>>> \ https://
>>> .ingress.***/jobs//savepoints
>>>
>>> I would assume that after taking the save point, the jvm should exit,
>>> after all the k8s deployment is of kind: job and if it is a job cluster
>>> then a cancellation should exit the jvm and hence the pod. It does seem to
>>> do some things right. It stops a bunch of stuff ( the JobMaster, the
>>> slotPol, zookeeper coordinator etc ) . It also remove the checkpoint
>>> counter but does not exit  the job. And after a little bit the job is
>>> restarted which does not make sense and absolutely not the right thing to
>>> do  ( to me at least ).
>>>
>>> Further if I delete the deployment and the job from k8s and restart the
>>> job and deployment fromSavePoint, it refuses to honor the fromSavePoint. I
>>> have to delete the zk chroot for it to consider the save point.
>>>
>>>
>>> Thus the process of cancelling and resuming from a SP on a k8s job
>>> cluster deployment  seems to be
>>>
>>>- cancel with save point as defined hre
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/rest_api.html#jobs-jobid-savepoints
>>>- delete the job manger job  and task manager deployments from k8s
>>>almost immediately.
>>>- clear the ZK chroot for the 000.. job  and may be the
>>>checkpoints directory.
>>>- resumeFromCheckPoint
>>>
>>> If some body can say that this indeed is the process ?
>>>
>>>
>>>
>>>  Logs are attached.
>>>
>>>
>>>
>>> 2019-03-12 08:10:43,871 INFO
>>> org.apache.flink.runtime.jobmaster.JobMaster  -
>>> Savepoint stored in
>>> hdfs://*:8020/tmp/xyz3/savepoint-00-6d5bdc9b53ae. Now
>>> cancelling .
>>>
>>> 2019-03-12 08:10:43,871 INFO
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
>>> anomaly_echo () switched from state RUNNING
>>> to CANCELLING.
>>>
>>> 2019-03-12 08:10:44,227 INFO  
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>> - Completed checkpoint 10 for job 
>>> (7238 bytes in 311 ms).
>>>
>>> 2019-03-12 08:10:44,232 INFO
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph- Source:
>>> Barnacle Anomalies Kafka topic -> Map -> Sink: Logging Sink (1/1)
>>> (e2d02ca40a9a6c96a0c1882f5a2e4dd6) switched from RUNNING to CANCELING.
>>>
>>> 2019-03-12 08:10:44,274 INFO
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph- Source:
>>> Barnacle Anomalies Kafka topic -> Map -> Sink: Logging Sink (1/1)
>>> (e2d02ca40a9a6c96a0c1882f5a2e4dd6) switched from CANCELING to CANCELED.
>>>
>>> 2019-03-12 08:10:44,276 INFO
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
>>> anomaly_echo () switched from state
>>> CANCELLING to CANCELED.
>>>
>>> 2019-03-12 08:10:44,276 INFO  
>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>>> - Stopping checkpoint coordinator for job
>>> .
>>>
>>> 2019-03-12 08:10:44,277 INFO
>>> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  -
>>> Shutting down
>>>
>>> 2019-03-12 08:10:44,323 INFO  
>>> org.apache.flink.runtime.checkpoint.CompletedCheckpoint
>>>   - Checkpoint with ID 8 at
>>> 

Re: Random forest - Flink ML

2019-03-12 Thread Benoît Paris
There have been some developments at Apache SAMOA
for a forest of decision trees.

This is not a regular Random Forest, but a form of tree ensemble that can be
learned incrementally and fast. If I recall correctly, they also have adaptive
algorithms. Here are some resources:

*  VHT: Vertical Hoeffding Tree   

*  Apache SAMOA   

Now I don't know the status of the project, nor have I ever tried SAMOA; but
this is something that could fit your needs.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: K8s job cluster and cancel and resume from a save point ?

2019-03-12 Thread Vishal Santoshi
Aah.
Let me try this out and will get back to you.
Though I would assume that save point with cancel is a single atomic step,
rather than a save point *followed*  by a cancellation ( else why would
that be an option ).
Thanks again.


On Tue, Mar 12, 2019 at 4:50 AM Vijay Bhaskar 
wrote:

> Hi Vishal,
>
> yarn-cancel isn't meant only for YARN clusters. It works for all
> clusters. It's the recommended command.
>
> Use the following command to issue save point.
>  curl  --header "Content-Type: application/json" --request POST --data
> '{"target-directory":"hdfs://*:8020/tmp/xyz1","cancel-job":false}'
> \ https://
> .ingress.***/jobs//savepoints
>
> Then issue yarn-cancel.
> After that, follow the process to restore the save point.
>
> Regards
> Bhaskar
>
> On Tue, Mar 12, 2019 at 2:11 PM Vishal Santoshi 
> wrote:
>
>> Hello Vijay,
>>
>>Thank you for the reply. This though is k8s deployment (
>> rather than yarn ) but maybe they follow the same lifecycle.  I issue a*
>> save point with cancel*  as documented here
>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/rest_api.html#jobs-jobid-savepoints,
>> a straight up
>>  curl  --header "Content-Type: application/json" --request POST --data
>> '{"target-directory":"hdfs://*:8020/tmp/xyz1","cancel-job":true}'
>> \ https://
>> .ingress.***/jobs//savepoints
>>
>> I would assume that after taking the save point, the jvm should exit,
>> after all the k8s deployment is of kind: job and if it is a job cluster
>> then a cancellation should exit the jvm and hence the pod. It does seem to
>> do some things right. It stops a bunch of stuff ( the JobMaster, the
>> slotPol, zookeeper coordinator etc ) . It also remove the checkpoint
>> counter but does not exit  the job. And after a little bit the job is
>> restarted which does not make sense and absolutely not the right thing to
>> do  ( to me at least ).
>>
>> Further if I delete the deployment and the job from k8s and restart the
>> job and deployment fromSavePoint, it refuses to honor the fromSavePoint. I
>> have to delete the zk chroot for it to consider the save point.
>>
>>
>> Thus the process of cancelling and resuming from a SP on a k8s job
>> cluster deployment  seems to be
>>
>>- cancel with save point as defined hre
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/rest_api.html#jobs-jobid-savepoints
>>- delete the job manger job  and task manager deployments from k8s
>>almost immediately.
>>- clear the ZK chroot for the 000.. job  and may be the
>>checkpoints directory.
>>- resumeFromCheckPoint
>>
>> If some body can say that this indeed is the process ?
>>
>>
>>
>>  Logs are attached.
>>
>>
>>
>> 2019-03-12 08:10:43,871 INFO
>> org.apache.flink.runtime.jobmaster.JobMaster  -
>> Savepoint stored in
>> hdfs://*:8020/tmp/xyz3/savepoint-00-6d5bdc9b53ae. Now
>> cancelling .
>>
>> 2019-03-12 08:10:43,871 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
>> anomaly_echo () switched from state RUNNING
>> to CANCELLING.
>>
>> 2019-03-12 08:10:44,227 INFO  
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>> - Completed checkpoint 10 for job 
>> (7238 bytes in 311 ms).
>>
>> 2019-03-12 08:10:44,232 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph- Source:
>> Barnacle Anomalies Kafka topic -> Map -> Sink: Logging Sink (1/1)
>> (e2d02ca40a9a6c96a0c1882f5a2e4dd6) switched from RUNNING to CANCELING.
>>
>> 2019-03-12 08:10:44,274 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph- Source:
>> Barnacle Anomalies Kafka topic -> Map -> Sink: Logging Sink (1/1)
>> (e2d02ca40a9a6c96a0c1882f5a2e4dd6) switched from CANCELING to CANCELED.
>>
>> 2019-03-12 08:10:44,276 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
>> anomaly_echo () switched from state
>> CANCELLING to CANCELED.
>>
>> 2019-03-12 08:10:44,276 INFO  
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>> - Stopping checkpoint coordinator for job
>> .
>>
>> 2019-03-12 08:10:44,277 INFO
>> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  -
>> Shutting down
>>
>> 2019-03-12 08:10:44,323 INFO  
>> org.apache.flink.runtime.checkpoint.CompletedCheckpoint
>>   - Checkpoint with ID 8 at
>> 'hdfs://nn-crunchy:8020/tmp/xyz2/savepoint-00-859e626cbb00' not
>> discarded.
>>
>> 2019-03-12 08:10:44,437 INFO
>> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  -
>> Removing
>> /k8s_anomalyecho/k8s_anomalyecho/checkpoints/
>> from ZooKeeper
>>
>> 2019-03-12 08:10:44,437 INFO  
>> 

Re: Random forest - Flink ML

2019-03-12 Thread Avi Levi
Thanks Flavio,
I will definitely check it out, but from a quick glance it seems to be
missing an implementation of "random forest", which is something we are
looking for.
If anyone can recommend/suggest/share one, that would be greatly appreciated.

Best Regards
Avi


On Mon, Mar 11, 2019 at 10:01 PM Flavio Pompermaier 
wrote:

> I know there's an ongoing, promising effort to improve Flink ML in the
> Streamline project [1], but I don't know why it's not more widely
> considered/advertised.
>
> Best,
> Flavio
>
> [1] https://h2020-streamline-project.eu/apache-flink/
>
> Il Lun 11 Mar 2019, 15:40 Avi Levi  ha scritto:
>
>> Hi,
>>  According to Till's comment
>> 
>> I understand that flink-ml is going to be ditched. What will be the
>> alternative?
>> Looking for a "random forest" method that we can add to our pipeline
>> (Scala). Any suggestions?
>>
>> Thanks
>> Avi
>>
>>
>>
>>


Re: Set partition number of Flink DataSet

2019-03-12 Thread qi luo
Hi Ken,

Thanks for your reply. I may not have made myself clear: our problem is not
about reading but rather about writing.

We need to write to N files based on key partitioning. We have to use
setParallelism() to set the output partition/file count, but when the
partition number is too large (~100K), the parallelism would be too high. Is
there any other way to achieve this?

Thanks,
Qi
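One way to decouple the number of output files from the sink parallelism is to route N logical partitions onto P writer tasks, each writing one file per partition it owns (in Flink this could be built with partitionCustom plus an output format that keeps one file open per owned partition). A plain-Python sketch of the routing idea, with illustrative names and numbers, not Flink API:

```python
# Sketch: decouple the number of output files (N) from the writer
# parallelism (P). Each of the P writer tasks owns a subset of the N
# logical partitions and writes one file per partition it owns.
# All names and numbers here are illustrative, not Flink API.

def partition_of(key, n_partitions):
    """Hash a record key into one of N logical partitions."""
    return hash(key) % n_partitions

def writer_for(partition, p_parallelism):
    """Route a logical partition to one of P writer tasks."""
    return partition % p_parallelism

def route(records, n_partitions, p_parallelism):
    """Group records by (writer, partition); each group becomes a file."""
    files = {}
    for key, value in records:
        part = partition_of(key, n_partitions)
        writer = writer_for(part, p_parallelism)
        files.setdefault((writer, part), []).append((key, value))
    return files

if __name__ == "__main__":
    records = [(k, k * 10) for k in range(1000)]
    files = route(records, n_partitions=100, p_parallelism=4)
    # 100 output files even though only 4 writers exist
    print(len({part for (_, part) in files}))
```

The point is that parallelism only controls how many writer tasks run concurrently, while the partition count controls how many files come out.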

> On Mar 11, 2019, at 11:22 PM, Ken Krugler  wrote:
> 
> Hi Qi,
> 
> I’m guessing you’re calling createInput() for each input file.
> 
> If so, then instead you want to do something like:
> 
>   Job job = Job.getInstance();
> 
>   for each file…
>   FileInputFormat.addInputPath(job, new 
> org.apache.hadoop.fs.Path(file path));
> 
>   env.createInput(HadoopInputs.createHadoopInput(…, job)
> 
> Flink/Hadoop will take care of parallelizing the reads from the files, given 
> the parallelism that you’re specifying.
> 
> — Ken
> 
> 
>> On Mar 11, 2019, at 5:42 AM, qi luo wrote:
>> 
>> Hi,
>> 
>> We’re trying to distribute batch input data to (N) HDFS files partitioning 
>> by hash using DataSet API. What I’m doing is like:
>> 
>> env.createInput(…)
>>   .partitionByHash(0)
>>   .setParallelism(N)
>>   .output(…)
>> 
>> This works well for small number of files. But when we need to distribute to 
>> large number of files (say 100K), the parallelism becomes too large and we 
>> could not afford that many TMs.
>> 
>> In spark we can write something like ‘rdd.partitionBy(N)’ and control the 
>> parallelism separately (using dynamic allocation). Is there anything similar 
>> in Flink or other way we can achieve similar result? Thank you!
>> 
>> Qi
> 
> --
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com 
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
> 



Re: K8s job cluster and cancel and resume from a save point ?

2019-03-12 Thread Vijay Bhaskar
Hi Vishal,

yarn-cancel isn't meant only for YARN clusters; it works for all clusters
and is the recommended command.

Use the following command to issue a savepoint (without cancelling):
 curl  --header "Content-Type: application/json" --request POST --data
'{"target-directory":"hdfs://*:8020/tmp/xyz1","cancel-job":false}'
\ https://
.ingress.***/jobs//savepoints

Then issue yarn-cancel.
After that, follow the process to restore the savepoint.

Regards
Bhaskar

On Tue, Mar 12, 2019 at 2:11 PM Vishal Santoshi 
wrote:


Re: K8s job cluster and cancel and resume from a save point ?

2019-03-12 Thread Vishal Santoshi
Hello Vijay,

   Thank you for the reply. This though is a k8s deployment
(rather than YARN), but maybe they follow the same lifecycle. I issue a
*savepoint with cancel* as documented here
https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/rest_api.html#jobs-jobid-savepoints,
a straight up
 curl  --header "Content-Type: application/json" --request POST --data
'{"target-directory":"hdfs://*:8020/tmp/xyz1","cancel-job":true}'
\ https://
.ingress.***/jobs//savepoints

I would assume that after taking the savepoint the JVM should exit: after
all, the k8s deployment is of kind: job, and if it is a job cluster then a
cancellation should exit the JVM and hence the pod. It does seem to do some
things right. It stops a bunch of stuff (the JobMaster, the slotPool, the
ZooKeeper coordinator, etc.). It also removes the checkpoint counter, but
it does not exit the job. And after a little while the job is restarted,
which does not make sense and is absolutely not the right thing to do (to
me at least).

Further, if I delete the deployment and the job from k8s and restart the
job and deployment fromSavePoint, it refuses to honor the fromSavePoint. I
have to delete the ZK chroot for it to consider the savepoint.


Thus the process of cancelling and resuming from a savepoint on a k8s job
cluster deployment seems to be:

   - cancel with savepoint as defined here
   
https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/rest_api.html#jobs-jobid-savepoints
   - delete the job manager job and task manager deployments from k8s
   almost immediately.
   - clear the ZK chroot for the 000.. job and maybe the
   checkpoints directory.
   - resume fromSavePoint

Can somebody confirm that this is indeed the process?



 Logs are attached.



2019-03-12 08:10:43,871 INFO  org.apache.flink.runtime.jobmaster.JobMaster
- Savepoint stored in
hdfs://*:8020/tmp/xyz3/savepoint-00-6d5bdc9b53ae. Now
cancelling .

2019-03-12 08:10:43,871 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
anomaly_echo () switched from state RUNNING
to CANCELLING.

2019-03-12 08:10:44,227 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator
- Completed checkpoint 10 for job 
(7238 bytes in 311 ms).

2019-03-12 08:10:44,232 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- Source:
Barnacle Anomalies Kafka topic -> Map -> Sink: Logging Sink (1/1)
(e2d02ca40a9a6c96a0c1882f5a2e4dd6) switched from RUNNING to CANCELING.

2019-03-12 08:10:44,274 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- Source:
Barnacle Anomalies Kafka topic -> Map -> Sink: Logging Sink (1/1)
(e2d02ca40a9a6c96a0c1882f5a2e4dd6) switched from CANCELING to CANCELED.

2019-03-12 08:10:44,276 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
anomaly_echo () switched from state
CANCELLING to CANCELED.

2019-03-12 08:10:44,276 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator
- Stopping checkpoint coordinator for job
.

2019-03-12 08:10:44,277 INFO
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  -
Shutting down

2019-03-12 08:10:44,323 INFO
org.apache.flink.runtime.checkpoint.CompletedCheckpoint
  - Checkpoint with ID 8 at
'hdfs://nn-crunchy:8020/tmp/xyz2/savepoint-00-859e626cbb00' not
discarded.

2019-03-12 08:10:44,437 INFO
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  -
Removing
/k8s_anomalyecho/k8s_anomalyecho/checkpoints/
from ZooKeeper

2019-03-12 08:10:44,437 INFO
org.apache.flink.runtime.checkpoint.CompletedCheckpoint
  - Checkpoint with ID 10 at
'hdfs://*:8020/tmp/xyz3/savepoint-00-6d5bdc9b53ae' not
discarded.

2019-03-12 08:10:44,447 INFO
org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter  -
Shutting down.

2019-03-12 08:10:44,447 INFO
org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter  -
Removing /checkpoint-counter/ from ZooKeeper

2019-03-12 08:10:44,463 INFO
org.apache.flink.runtime.dispatcher.MiniDispatcher- Job
 reached globally terminal state CANCELED.

2019-03-12 08:10:44,467 INFO  org.apache.flink.runtime.jobmaster.JobMaster
- Stopping the JobMaster for job
anomaly_echo().

2019-03-12 08:10:44,468 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint
- Shutting StandaloneJobClusterEntryPoint down with application
status CANCELED. Diagnostics null.

2019-03-12 08:10:44,468 INFO
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint  - Shutting
down rest endpoint.

2019-03-12 08:10:44,473 INFO

Could not resolve ResourceManager address on Flink 1.7.1

2019-03-12 Thread Le Xu
Hello:

I am trying to set up a standalone Flink cluster (1.7.1) and I'm getting a
very similar error to the one the user reported in this thread. However, I
believe the root cause is different, as I tried starting the job manager
using both start-cluster.sh and jobmanager.sh, and both failed with the
same error.
The error I got on the task manager (flink-worker1) is similar to the
following:

2019-03-12 07:39:42,884 INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor- Could not
resolve ResourceManager address akka.tcp://
flink@10.0.0.6:6123/user/resourcemanager, retrying in 1 ms: Could not
connect to rpc endpoint under address akka.tcp://
flink@10.0.0.6:6123/user/resourcemanager..
2019-03-12 07:39:52,901 INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor- Could not
resolve ResourceManager address akka.tcp://
flink@10.0.0.6:6123/user/resourcemanager, retrying in 1 ms: Could not
connect to rpc endpoint under address akka.tcp://
flink@10.0.0.6:6123/user/resourcemanager..
2019-03-12 07:40:02,925 INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor- Could not
resolve ResourceManager address akka.tcp://
flink@10.0.0.6:6123/user/resourcemanager, retrying in 1 ms: Could not
connect to rpc endpoint under address akka.tcp://
flink@10.0.0.6:6123/user/resourcemanager..
2019-03-12 07:40:12,939 INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor- Could not
resolve ResourceManager address akka.tcp://
flink@10.0.0.6:6123/user/resourcemanager, retrying in 1 ms: Could not
connect to rpc endpoint under address akka.tcp://
flink@10.0.0.6:6123/user/resourcemanager..
2019-03-12 07:40:22,963 INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor- Could not
resolve ResourceManager address akka.tcp://
flink@10.0.0.6:6123/user/resourcemanager, retrying in 1 ms: Could not
connect to rpc endpoint under address akka.tcp://
flink@10.0.0.6:6123/user/resourcemanager..
2019-03-12 07:40:32,978 INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor- Could not
resolve ResourceManager address akka.tcp://
flink@10.0.0.6:6123/user/resourcemanager, retrying in 1 ms: Could not
connect to rpc endpoint under address akka.tcp://
flink@10.0.0.6:6123/user/resourcemanager..


But the job manager seems to start up ok:

2019-03-12 07:38:36,643 INFO
akka.remote.Remoting  - Remoting
started; listening on addresses :[akka.tcp://flink@10.0.0.6:6123]
2019-03-12 07:38:36,659 INFO
org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils - Actor
system started at akka.tcp://flink@10.0.0.6:6123
2019-03-12 07:38:36,690 INFO
org.apache.flink.runtime.blob.BlobServer  - Created
BLOB server storage directory
C:\cygwin64\tmp\blobStore-85b28100-fa08-4488-9f79-d0d712f34733
2019-03-12 07:38:36,690 INFO
org.apache.flink.runtime.blob.BlobServer  - Started
BLOB server at 0.0.0.0:54072 - max concurrent requests: 50 - max backlog:
1000
2019-03-12 07:38:36,705 INFO
org.apache.flink.runtime.metrics.MetricRegistryImpl   - No metrics
reporter configured, no metrics will be exposed/reported.
2019-03-12 07:38:36,721 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Trying to
start actor system at 10.0.0.6:0
2019-03-12 07:38:36,737 INFO
akka.event.slf4j.Slf4jLogger  - Slf4jLogger
started
2019-03-12 07:38:36,752 INFO
akka.remote.Remoting  - Starting
remoting
2019-03-12 07:38:36,768 INFO
akka.remote.Remoting  - Remoting
started; listening on addresses :[akka.tcp://flink-metrics@10.0.0.6:54085]
2019-03-12 07:38:36,768 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Actor
system started at akka.tcp://flink-metrics@10.0.0.6:54085
2019-03-12 07:38:36,784 INFO
org.apache.flink.runtime.dispatcher.FileArchivedExecutionGraphStore  -
Initializing FileArchivedExecutionGraphStore: Storage directory
C:\cygwin64\tmp\executionGraphStore-550bff8d-314e-4a04-b10e-93bdc7af80c6,
expiration time 360, maximum cache size 52428800 bytes.
2019-03-12 07:38:36,815 INFO
org.apache.flink.runtime.blob.TransientBlobCache  - Created
BLOB cache storage directory
C:\cygwin64\tmp\blobStore-608a5134-9f0d-44dd-8e3d-d9fbe4185d21
2019-03-12 07:38:36,830 WARN
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint- Upload
directory
C:\cygwin64\tmp\flink-web-2d9712e2-54cb-428a-a27a-826fa2214dad\flink-web-upload
does not exist, or has been deleted externally. Previously uploaded files
are no longer available.
2019-03-12 07:38:36,830 INFO

Re: K8s job cluster and cancel and resume from a save point ?

2019-03-12 Thread Vijay Bhaskar
Hi Vishal

Save point with cancellation internally uses the /cancel REST API, which
is not a stable API; it always exits with 404. The best way to issue it is:

a) First issue the savepoint REST API
b) Then issue the /yarn-cancel REST API (as described in
http://mail-archives.apache.org/mod_mbox/flink-user/201804.mbox/%3c0ffa63f4-e6ed-42d8-1928-37a7adaaa...@apache.org%3E
)
c) Then, after resuming your job, provide the savepoint path returned by
(a) as an argument to the run-jar REST API
This is the smoother way.

Regards
Bhaskar
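The a/b/c flow above can be sketched as request construction (a hedged sketch: the base URL, job id, and jar id are placeholders, the actual HTTP send is left out, and the endpoints assumed are the Flink 1.7-era /jobs/:jobid/savepoints, /jobs/:jobid/yarn-cancel, and /jars/:jarid/run):

```python
# Sketch of the savepoint -> cancel -> resume flow against the Flink
# REST API. This only builds (method, url, body) tuples; sending them
# and polling the savepoint trigger for completion is left out.
import json

BASE = "https://jobmanager.example.com:8081"  # placeholder address

def trigger_savepoint(job_id, target_dir, cancel=False):
    """Step (a): POST /jobs/:jobid/savepoints."""
    return ("POST",
            f"{BASE}/jobs/{job_id}/savepoints",
            json.dumps({"target-directory": target_dir,
                        "cancel-job": cancel}))

def yarn_cancel(job_id):
    """Step (b): /jobs/:jobid/yarn-cancel (works on non-YARN clusters too)."""
    return ("GET", f"{BASE}/jobs/{job_id}/yarn-cancel", None)

def run_jar(jar_id, savepoint_path):
    """Step (c): resubmit the jar with the savepoint path from step (a)."""
    return ("POST",
            f"{BASE}/jars/{jar_id}/run",
            json.dumps({"savepointPath": savepoint_path}))

if __name__ == "__main__":
    method, url, body = trigger_savepoint("job-id-placeholder",
                                          "hdfs:///tmp/savepoints")
    print(method, url)
```

In practice the savepoint request returns a trigger id, and the savepoint path to pass to run_jar comes from polling the corresponding status endpoint until the savepoint completes.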

On Tue, Mar 12, 2019 at 2:46 AM Vishal Santoshi 
wrote:

> There are some issues I see and would want to get some feedback
>
> 1. On cancellation with savepoint (with a target directory), the k8s job
> does not exit (it is not a deployment). I would assume that on
> cancellation the JVM should exit, after cleanup etc., and thus the pod
> should too. That does not happen, so the job pod remains live. Is that
> expected?
>
> 2. To resume from a savepoint it seems that I have to delete the job id (
> 00 ) from ZooKeeper (this is HA), else it defaults to the
> latest checkpoint no matter what.
>
>
> I am kind of curious as to what the tested process in 1.7.2 is for
> cancelling with a savepoint and resuming, and what the cogent story is
> around the job id (defaults to ..). Note that --job-id does not
> work with 1.7.2, so even though that does not make sense, I still cannot
> provide a new job id.
>
> Regards,
>
> Vishal.
>
>


Re: Expressing Flink array aggregation using Table / SQL API

2019-03-12 Thread Kurt Young
Hi Piyush,

Could you try adding clientId into your aggregate function, tracking the
map of  inside your new aggregate
function, and assembling whatever result you need on emit?
The SQL will look like:

SELECT userId, some_aggregation(clientId, eventType, `timestamp`,
dataField)
FROM my_kafka_stream_table
GROUP BY userId,  HOP(`timestamp`, INTERVAL '1' MINUTE, INTERVAL '1' HOUR)

Kurt
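The effect of the suggested aggregate can be illustrated outside Flink with plain Python (a sketch of the semantics only, with a stand-in sum where the real UDAF would go, and assuming all rows belong to a single window pane):

```python
# Plain-Python sketch of the aggregate Kurt suggests: group by userId
# (and window) only, and let the aggregate itself track a map of
# clientId -> partial result, so each window emits one row per user.
# The "custom aggregation" here is a stand-in (sum of dataField).
from collections import defaultdict

def aggregate(rows):
    """rows: (userId, clientId, eventType, ts, dataField) tuples,
    assumed to belong to a single window pane."""
    per_user = defaultdict(dict)
    for user, client, _event, _ts, data in rows:
        acc = per_user[user]
        acc[client] = acc.get(client, 0) + data  # stand-in for the UDAF
    # one emitted row per userId, carrying the whole per-client map
    return dict(per_user)

if __name__ == "__main__":
    rows = [("u1", "c1", "view", 1, 10),
            ("u1", "c2", "click", 2, 5),
            ("u1", "c1", "view", 3, 7),
            ("u2", "c1", "view", 4, 1)]
    print(aggregate(rows))
```

Because the per-client map lives inside the accumulator, the query needs only one GROUP BY level and keeps emitting insert-only rows per window.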


On Tue, Mar 12, 2019 at 11:03 AM Piyush Narang  wrote:

> Hi folks,
>
>
>
> I’m getting started with Flink and trying to figure out how to express
> aggregating some rows into an array to finally sink data into an
> AppendStreamTableSink.
>
> My data looks something like this:
>
> userId, clientId, eventType, timestamp, dataField
>
>
>
> I need to compute some custom aggregations using a UDAF while grouping by
> userId, clientId over a sliding window (10 mins, triggered every 1 min). My
> first attempt is:
>
> SELECT userId, clientId, my_aggregation(eventType, `timestamp`, dataField)
> as custom_aggregated
>
> FROM my_kafka_stream_table
>
> GROUP BY userId, clientId, HOP(`timestamp`, INTERVAL '1' MINUTE, INTERVAL
> '1' HOUR)
>
>
>
> This query works as I expect it to. In every time window I end up with
> inserts for unique userId + clientId combinations. What I want to do
> though, is generate a single row per userId in each time window and this is
> what I’m struggling with expressing along with the restriction that I want
> to sink this to an AppendStreamTableSink. I was hoping to do something like
> this:
>
>
>
> SELECT userId, COLLECT(client_custom_aggregated)
>
> FROM
>
> (
>
>   SELECT userId, MAP[clientId, my_aggregation(eventType, `timestamp`,
> dataField) as custom_aggregated] as client_custom_aggregated
>
>   FROM my_kafka_stream_table
>
>   GROUP BY userId, clientId, HOP(`timestamp`, INTERVAL '1' MINUTE,
> INTERVAL '1' HOUR)
>
> ) GROUP BY userId
>
>
>
> Unfortunately when I try this (and a few other variants), I run into the
> error, “AppendStreamTableSink requires that Table has only insert changes”.
> Does anyone know if there’s a way for me to compute my collect aggregation
> to produce one row per userId for a given time window?
>
>
>
> Thanks,
>
>
>
> -- Piyush
>
>
>
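Why the outer GROUP BY trips the AppendStreamTableSink can be illustrated with a small plain-Python sketch (semantics only, not Flink API): each inner per-client emission causes the outer per-user row to be re-emitted, so an append-only sink sees every intermediate version, while an upsert sink keyed by userId would keep just the latest.

```python
# Sketch of why the outer GROUP BY breaks an append-only sink: each
# time the inner (per-client) aggregation emits, the outer per-user
# COLLECT re-emits an updated row for that user. An append sink logs
# every intermediate version; an upsert sink keyed by userId keeps
# only the latest one.
def outer_collect(inner_results):
    """inner_results: stream of (userId, clientId, aggregate) emissions."""
    state, append_log, upsert_view = {}, [], {}
    for user, client, agg in inner_results:
        state.setdefault(user, {})[client] = agg
        row = (user, dict(state[user]))
        append_log.append(row)      # what an AppendStreamTableSink sees
        upsert_view[user] = row[1]  # what an upsert-by-key sink keeps
    return append_log, upsert_view

if __name__ == "__main__":
    inner = [("u1", "c1", 17), ("u1", "c2", 5), ("u2", "c1", 1)]
    log, view = outer_collect(inner)
    print(len(log), view["u1"])
```

Three appended rows for two users is exactly the "Table has only insert changes" violation: the second u1 row is an update of the first, not a new insert.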


Re: How to join stream and dimension data in Flink?

2019-03-12 Thread 徐涛
Hi Hequn,
I want to implement a stream-to-dimension join in Flink SQL. I found there
is a new feature named Temporal Tables delivered in Flink 1.7, and I think it
could be used to achieve the join between a stream and a dimension table, but
I am not sure about that. Could anyone help me with it?
Thanks a lot for your help.

Best 
Henry
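The temporal-table semantics Henry is asking about can be illustrated outside Flink with a plain-Python sketch (semantics only, with made-up currency-rate data): each fact row joins the dimension version that was valid at the fact's event time.

```python
# Plain-Python sketch of temporal-table (event-time versioned) join
# semantics: each fact row joins the dimension version that was valid
# at the fact's event time, which is the behavior Flink 1.7's temporal
# table functions aim to provide for stream/dimension joins.
import bisect

def temporal_join(facts, dim_versions):
    """facts: (ts, key, payload); dim_versions: (ts, key, value).
    Returns (ts, payload, dimension value valid at ts)."""
    by_key = {}
    for ts, key, value in sorted(dim_versions):
        by_key.setdefault(key, []).append((ts, value))
    out = []
    for ts, key, payload in facts:
        versions = by_key.get(key, [])
        # find the latest dimension version with version-ts <= fact-ts
        idx = bisect.bisect_right([v[0] for v in versions], ts) - 1
        if idx >= 0:  # a dimension version existed at this event time
            out.append((ts, payload, versions[idx][1]))
    return out

if __name__ == "__main__":
    dims = [(0, "EUR", 1.10), (10, "EUR", 1.20)]   # rate updates over time
    facts = [(5, "EUR", 100), (12, "EUR", 100)]    # orders to enrich
    print(temporal_join(facts, dims))
```

Unlike a regular stream-stream join, only the dimension side needs to be kept (as versions per key), and the result is deterministic under event time regardless of arrival order.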

> On Sep 26, 2018, at 12:16 AM, Hequn Cheng  wrote:
> 
> Hi vino,
> 
> Thanks for sharing the link. It's a great book and I will take a look. 
> There are different kinds of joins, and they have different semantics. From 
> the link, I think it means the time-versioned join. FLINK-9712 
>  enriches joins with 
> Time-Versioned Functions, and the result is deterministic under event time.  
> 
> Best, Hequn
> 
> On Tue, Sep 25, 2018 at 11:05 PM vino yang  > wrote:
> Hi Hequn,
> 
> The book does not draw a right-or-wrong conclusion, but it illustrates this 
> phenomenon: with the same two input streams, replayed and joined at the same 
> time, the join results can differ depending on the order of events, because 
> the two streams may interleave in different ways. This has nothing to do with 
> orderBy; it is inherent to stream-stream joins. Of course, this observation 
> only contrasts them with non-stream joins.
> 
> In addition, I recommend this book, which is very famous on Twitter and 
> Amazon. Because you are also Chinese, there is a good translation here. If I 
> guess it is correct, the main translator is also from your company. This part 
> of what I mentioned is here.[1]
> 
> [1]: 
> https://github.com/Vonng/ddia/blob/master/ch11.md#%E8%BF%9E%E6%8E%A5%E7%9A%84%E6%97%B6%E9%97%B4%E4%BE%9D%E8%B5%96%E6%80%A7
>  
> 
> 
> Thanks, vino.
> 
> Hequn Cheng (chenghe...@gmail.com) wrote on Tue, Sep 25, 2018 at 9:45 PM:
> Hi vino,
> 
> There are no order problems of stream-stream join in Flink. No matter what 
> order the elements come, stream-stream join in Flink will output results 
> which consistent with standard SQL semantics. I haven't read the book you 
> mentioned. For join, it doesn't guarantee output orders. You have to do 
> orderBy if you want to get ordered results.
> 
> Best, Hequn
> 
> On Tue, Sep 25, 2018 at 8:36 PM vino yang  > wrote:
> Hi Fabian,
> 
> I may not have stated it clearly: there is no semantic problem at the Flink 
> implementation level. Rather, there may be "time-dependence" here. [1]
> 
> Yes, my initial answer was not to use this form of join in this scenario, but 
> Henry said he converted the table into a stream table and asked about the 
> feasibility of other methods.
> 
> [1]: 《Designing Data-Intensive Applications》By Martin Kleppmann, Part 3: 
> Derived Data, Chapter 11: Stream Processing , Stream Joins.
> 
> some content :
> If the ordering of events across streams is undetermined, the join becomes 
> nondeter‐ ministic [87], which means you cannot rerun the same job on the 
> same input and necessarily get the same result: the events on the input 
> streams may be interleaved in a different way when you run the job again. 
> 
> 
> 
> Fabian Hueske (fhue...@gmail.com) wrote on Tue, Sep 25, 2018 at 8:08 PM:
> Hi,
> 
> I don't think that using the current join implementation in the Table API / 
> SQL will work.
> The non-windowed join fully materializes *both* input tables in state. This 
> is necessary, because the join needs to be able to process updates on either 
> side.
> While this is not a problem for the fixed sized MySQL table, materializing 
> the append-only table (aka stream) is probably not what you want.
> You can also not limit idle state retention because it would remove the MySQL 
> table from state at some point.
> 
> The only way to make it work is using a user-defined TableFunction that 
> queries the MySQL table via JDBC. 
> However, please note that these calls would be synchronous, blocking calls.
> 
> @Vino: Why do you think that the stream & stream join is not mature and which 
> problems do you see in the semantics? 
> The semantics are correct (standard SQL semantics) and in my opinion the 
> implementation is also mature.
> However, you should not use the non-windowed join if any of the input tables 
> is ever growing because both sides must be hold in state. This is not an 
> issue of the semantics.
> 
> Cheers,
> Fabian
> 
> On Tue, Sep 25, 2018 at 2:00 PM, vino yang wrote:
> Hi Henry,
> 
> 1) I don't strongly recommend this method, but you said that you expect to 
> convert the MySQL table to a stream and then to a Flink table. Under this 
> premise, I said that you can do this by joining two stream tables. But as you 
> know, this join depends on the time period for which the state is retained. 
> To make it