How do I initialize the window state on first run?

2018-10-11 Thread bupt_ljy
Hi,
 I’d like to start a new Flink program with some pre-initialized window state.
 As far as I can tell there is no official way to do this, right? I’ve tried the bravo
project, but it doesn’t support FsStateBackend, and adding a new StateBackend to it
would take too much work.
 Any good ideas about this?






Best,
Jiayi Liao

Re: No data issued by flink window after a few hours

2018-10-11 Thread vino yang
Hi gongsen,

Since the job runs well locally, it should not be a configuration issue.
You can check the Flink UI to see whether your checkpoints are delayed.
Please follow the instructions in the documentation [1] and
provide some screenshots; that will help the community locate the
problem.
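
(For reference, a minimal sketch of a bounded out-of-orderness watermark assigner —
this assumes Flink 1.6, event-time semantics, and a hypothetical Event POJO with a
getEventTime() accessor, so adjust it to your own types:)

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// Watermarks trail the highest event time seen so far by 10 seconds; if they stop
// advancing in the UI, some source subtask has stopped emitting (or sits idle).
DataStream<Event> withTimestamps = events.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
        @Override
        public long extractTimestamp(Event event) {
            return event.getEventTime(); // hypothetical accessor
        }
    });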

Thanks, vino.

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/debugging_event_time.html#monitoring-current-event-time

潘 功森 wrote on Fri, Oct 12, 2018 at 11:10 AM:

> Please look at the mail below; the earlier ones are out of date because of the bad
> network.
>
>
>
> Yours,
>
> September
>
>
> --
> *From:* 潘 功森 
> *Sent:* Friday, October 12, 2018 11:05:49 AM
> *To:* vino yang
> *Cc:* user
> *Subject:* Re: No data issued by flink window after a few hours
>
>
> Hi,
>
> I think the pictures may have been too big and the network here is not so good, so
> the mail I wrote last night was not sent successfully.
>
> Yes, I used event time.
>
> I found that watermarks advanced normally when the job started, but they stopped
> and no longer changed after running for hours.
>
> And I changed to the fs state backend; I configured it in flink-conf.yaml as below:
>
> state.backend: filesystem
>
> state.backend.fs.checkpointdir: file:///xxx/checkpoints
>
>
>
> I found there are many checkpoints saved in that directory.
>
> But the watermarks still stop advancing.
>
>
>
> And there are two environments.
>
> The local one runs well, and the problem occurs on site.
>
> There are two differences:
>
> 1. The data volume is very small locally and very large on site.
>
> 2. On site configurations:
>
> akka.ask.timeout: 2min
>
> akka.lookup.timeout: 2min
>
> akka.client.timeout: 3min
>
> akka.log.lifecycle.events: on
>
> akka.tcp.timeout: 60s
>
>
>
> But these are not configured locally.
>
> So do they have any influence?
>
> Please help me…
>
>
>
>
>
> Yours,
>
> September
>
>
>
> *From:* vino yang 
> *Sent:* October 11, 2018 14:56
> *To:* pangong...@hotmail.com
> *Cc:* user 
> *Subject:* Re: No data issued by flink window after a few hours
>
>
>
> Hi gongsen,
>
>
>
> Have you used event time as time semantics? If so, then the possible
> problem is related to watermark.
>
> Since I don't know the details of your program, it's hard to make a
> conclusion. You can check if your watermark is firing normally.
>
>
>
> Thanks, vino.
>
>
>
> 潘 功森 wrote on Thu, Oct 11, 2018 at 12:12 PM:
>
> Hi,
>
> I changed to the configurations below, and it looks fine when the job starts.
>
> But no results are issued when the window ends after running for about six
> hours, and there are no errors or exceptions.
>
> How can I locate the problem?
>
>
>
> Yours,
>
> September
>
>
>
> *From:* 潘 功森 
> *Sent:* Wednesday, October 10, 2018 2:44:48 PM
> *To:* vino yang
> *Cc:* user
> *Subject:* Re: No data issued by flink window after a few hours
>
>
>
> Hi,
>
>
>
> Because the default state size is too small for a one-hour window, and the max window
> size is 24 hours, I used 500M.
>
>
>
> MemoryStateBackend stateBackend = new
> MemoryStateBackend(MAX_STATE_SIZE); // 500M
> env.setStateBackend(stateBackend);
>
>
>
> And I found that, irrespective of the configured maximal state size, the state
> cannot be larger than the akka frame size.
>
> So I added a config in flink-conf.yaml:
>
> akka.framesize: 524288000b
>
>
>
> What else do I have to pay attention to?
>
>
>
> Yours,
>
> September
>
>
>
> *From:* vino yang 
> *Sent:* Wednesday, October 10, 2018 11:45:31 AM
> *To:* pangong...@hotmail.com
> *Cc:* user
> *Subject:* Re: No data issued by flink window after a few hours
>
>
>
> Hi,
>
>
>
> I saw the exception image you provided. Based on the exception message, it
> seems you used the default max state size (5MB).
>
>
>
> You can specify the max state size to override the default value. Try :
>
>
>
> MemoryStateBackend stateBackend = new MemoryStateBackend(theSizeOfBytes);
>
>
>
> Please note that you need to reserve enough memory for Flink.
>
>
>
> Thanks, vino.
>
>
>
> 潘 功森 wrote on Wed, Oct 10, 2018 at 11:36 AM:
>
> Please have a look at my last mail.
>
>
>
> What happens when the cached window data is too large?
>
>
>
> Yours,
>
> September
>
>
>
> *From:* vino yang 
> *Sent:* Wednesday, October 10, 2018 11:33:48 AM
> *To:* pangong...@hotmail.com
> *Cc:* user
> *Subject:* Re: No data issued by flink window after a few hours
>
>
>
> Hi,
>
>
>
> Did you mean "computer memory", referring to the MemoryStateBackend?
>
> The Flink window mechanism is internally based on State, and this is done
> for fault tolerance.
>
> If you introduce external storage, it will break its design and bring
> other problems.
>
>
>
> Thanks, vino.
>
>
>
> 潘 功森 wrote on Wed, Oct 10, 2018 at 11:02 AM:
>
> Hi,
>
> "ram to cache the distinct data about sliding window" means I used
> computer momery not the third part db to cache the data need used in window.
>
> “the data needed in the window” means: for example, if the sliding window is 1
> hour and I need to count the distinct users, I need to cache the user ids for
> about one hour.
>
> Also, there are no related errors.
>
> Yours,
>
> September
>
>

Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

2018-10-11 Thread Zhang, Xuefu
Hi Taher,

Thank you for your input. I think you emphasized two important points:

1. Hive metastore could be used for storing Flink metadata
2. There are some usability issues around Flink SQL configuration

I think we all agree on #1. #2 may well be true and the usability should be 
improved. However, I'm afraid that this is orthogonal to Hive integration and 
the proposed solution might be just one of the possible solutions. On the 
surface, the extensions you proposed seem to go beyond the syntax and semantics 
of the SQL language in general.

I don't disagree on the value of your proposal. I guess it's better to solve #1 
first and leave #2 for follow-up discussions. How does this sound to you?

Thanks,
Xuefu


--
Sender:Taher Koitawala 
Sent at:2018 Oct 12 (Fri) 10:06
Recipient:Xuefu 
Cc:Rong Rong ; Timo Walther ; dev 
; jornfranke ; vino yang 
; Fabian Hueske ; user 

Subject:Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

One other thought on the same lines was to use hive tables to store kafka 
information to process streaming tables. Something like 

"create table streaming_table (
bootstrapServers string,
topic string, keySerialiser string, ValueSerialiser string)"

Insert into streaming_table 
values("10.17.1.1:9092,10.17.2.2:9092,10.17.3.3:9092", "KafkaTopicName", 
"SimpleStringSchema", "SimpleSchemaString");

Create table processingtable(
//Enter fields here which match the kafka records schema);

Now we make a custom clause called something like "using"

The way we use this is:

Using streaming_table as configuration select count(*) from processingtable as 
streaming;


This way users can now pass Flink SQL info easily and get rid of the Flink SQL 
configuration file all together. This is simple and easy to understand and I 
think most users would follow this.

Thanks, 
Taher Koitawala 
On Fri 12 Oct, 2018, 7:24 AM Taher Koitawala,  wrote:

I think integrating Flink with Hive would be an amazing option and also to get 
Flink's SQL up to pace would be amazing. 

Current Flink SQL syntax to prepare and process a table is too verbose; users 
manually need to retype table definitions and that's a pain. Hive metastore 
integration should be done, though: many users are okay defining their table 
schemas in Hive as it is easy to maintain, change or even migrate. 

Also we could make choosing between batch and stream simple with something like 
a "process as" clause. 

select count(*) from flink_mailing_list process as stream;

select count(*) from flink_mailing_list process as batch;

This way we could completely get rid of Flink SQL configuration files. 

Thanks,
Taher Koitawala 

Integrating 
On Fri 12 Oct, 2018, 2:35 AM Zhang, Xuefu,  wrote:
Hi Rong,

Thanks for your feedback. Some of my earlier comments might have addressed some 
of your points, so here I'd like to cover some specifics.

1. Yes, I expect that table stats stored in Hive will be used in Flink plan 
optimization, but it's not part of compatibility concern (yet).
2. Both implementing Hive UDFs in Flink natively and making Hive UDFs work in 
Flink are considered.
3. I am aware of FLIP-24, but here the proposal is to make remote server 
compatible with HiveServer2. They are not mutually exclusive either.
4. The JDBC/ODBC driver in question is for the remote server that Flink 
provides. It's usually the servicer owner who provides drivers to their 
services. We weren't talking about JDBC/ODBC driver to external DB systems.

Let me know if you have further questions.

Thanks,
Xuefu

--
Sender:Rong Rong 
Sent at:2018 Oct 12 (Fri) 01:52
Recipient:Timo Walther 
Cc:dev ; jornfranke ; Xuefu 
; vino yang ; Fabian Hueske 
; user 
Subject:Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

Hi Xuefu, 

Thanks for putting together the overview. I would like to add some more on top 
of Timo's comments.
1,2. I agree with Timo that a proper catalog support should also address the 
metadata compatibility issues. I was actually wondering if you are referring to 
something like utilizing table stats for plan optimization?
4. If the key is to have users integrate Hive UDF without code changes to Flink 
UDF, it shouldn't be a problem as Timo mentioned. Is your concern mostly on the 
support of Hive UDFs that should be implemented in Flink-table natively?
7,8. Correct me if I am wrong, but I feel like some of the related components 
might have already been discussed in the longer term road map of FLIP-24 [1]?
9. per Jorn's comment to stay clear from a tight dependency on Hive and treat 
it as one "connector" system. Should we also consider treating JDBC/ODBC driver 
as part of the component from the connector system instead of having Flink to 
provide them?

Thanks,
Rong

[1]. https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client
On Thu, Oct 11, 2018 at 12:46 AM Timo Walther  wrote:
Hi Xuefu,

 tha

Re: When does Trigger.clear() get called?

2018-10-11 Thread Hequn Cheng
Hi Andrew,

Do you use a CountWindow? You can switch to a TimeWindow to test.
I'm not very familiar with windows, but I checked the code and found that
clear() is called only when the cleanup timer is triggered, i.e., at the end of
the time window.
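
(For illustration only, a minimal custom trigger sketch — assuming the Flink 1.3 Trigger
API and a TimeWindow; the point is that clear() is invoked by the WindowOperator when the
window itself is disposed of, e.g. when its cleanup timer fires, not each time onElement()
returns a PURGE result:)

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class FireAndPurgeTrigger<T> extends Trigger<T, TimeWindow> {

    @Override
    public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) {
        // Register the end-of-window timer; when it fires, the WindowOperator
        // cleans up the window and eventually calls clear().
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        // Called when the window is removed (e.g. after allowed lateness expires),
        // not each time a PURGE result is returned.
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}
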
Hope this helps.

Best, Hequn

On Fri, Oct 12, 2018 at 6:23 AM Andrew Danks  wrote:

> Hello,
>
> I see that the clear() function is implemented for various types of
> Triggers in the Flink API. For example:
>
> https://github.com/apache/flink/blob/release-1.3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java#L87
>
> I am working on a custom Trigger for my application and have implemented
> clear() in a similar way.
>
> However, having put a breakpoint in this function it doesn’t seem to get
> called when I expect. The source code says that it is called "when a window is
> purged”[1] but when my Trigger emits a PURGE this function never seems to
> get called. I am on Flink 1.3.
>
> Hoping someone can shed more light on the purpose of clear() and how/when
> it gets called
>
> Thanks!
> Andrew
>
>
> [1]
> https://github.com/apache/flink/blob/release-1.3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java#L111
>
>


Re: No data issued by flink window after a few hours

2018-10-11 Thread 潘 功森
Hi,
I think the pictures may have been too big and the network here is not so good, so the mail I 
wrote last night was not sent successfully.
Yes, I used event time.
I found that watermarks advanced normally when the job started, but they stopped and no 
longer changed after running for hours.
And I changed to the fs state backend; I configured it in flink-conf.yaml as below:
state.backend: filesystem
state.backend.fs.checkpointdir: file:///xxx/checkpoints
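
(For reference, a rough programmatic equivalent — just a sketch, assuming Flink 1.3+;
the checkpoint path is the same placeholder used above:)

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Working state stays on the TaskManager heap; checkpoints are written to the
// filesystem path, so they are not limited by the akka frame size the way the
// MemoryStateBackend is.
env.setStateBackend(new FsStateBackend("file:///xxx/checkpoints"));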

I found there are many checkpoints saved in that directory.
But the watermarks still stop advancing.

And there are two environments.
The local one runs well, and the problem occurs on site.
There are two differences:

1. The data volume is very small locally and very large on site.

2. On site configurations:

akka.ask.timeout: 2min

akka.lookup.timeout: 2min

akka.client.timeout: 3min

akka.log.lifecycle.events: on

akka.tcp.timeout: 60s



But these are not configured locally.

So do they have any influence?

Please help me…


Yours,
September

From: vino yang
Sent: October 11, 2018 14:56
To: pangong...@hotmail.com
Cc: user
Subject: Re: No data issued by flink window after a few hours

Hi gongsen,

Have you used event time as time semantics? If so, then the possible problem is 
related to watermark.
Since I don't know the details of your program, it's hard to make a conclusion. 
You can check if your watermark is firing normally.

Thanks, vino.

潘 功森 <pangong...@hotmail.com> wrote on Thu, Oct 11, 2018 at 12:12 PM:
Hi,
I changed to the configurations below, and it looks fine when the job starts.
But no results are issued when the window ends after running for about six hours, 
and there are no errors or exceptions.
How can I locate the problem?

Yours,
September


From: 潘 功森 <pangong...@hotmail.com>
Sent: Wednesday, October 10, 2018 2:44:48 PM
To: vino yang
Cc: user
Subject: Re: No data issued by flink window after a few hours

Hi,

Because the default state size is too small for a one-hour window, and the max window size is 24 
hours, I used 500M.


MemoryStateBackend stateBackend = new MemoryStateBackend(MAX_STATE_SIZE);//500M
env.setStateBackend(stateBackend);

And I found that, irrespective of the configured maximal state size, the state cannot 
be larger than the akka frame size.
So I added a config in flink-conf.yaml:
akka.framesize: 524288000b

What else do I have to pay attention to?

Yours,
September


From: vino yang <yanghua1...@gmail.com>
Sent: Wednesday, October 10, 2018 11:45:31 AM
To: pangong...@hotmail.com
Cc: user
Subject: Re: No data issued by flink window after a few hours

Hi,

I saw the exception image you provided. Based on the exception message, it 
seems you used the default max state size (5MB).

You can specify the max state size to override the default value. Try :

MemoryStateBackend stateBackend = new MemoryStateBackend(theSizeOfBytes);

Please note that you need to reserve enough memory for Flink.

Thanks, vino.

潘 功森 <pangong...@hotmail.com> wrote on Wed, Oct 10, 2018 at 11:36 AM:
Please have a look at my last mail.

What happens when the cached window data is too large?

Yours,
September


From: vino yang <yanghua1...@gmail.com>
Sent: Wednesday, October 10, 2018 11:33:48 AM
To: pangong...@hotmail.com
Cc: user
Subject: Re: No data issued by flink window after a few hours

Hi,

Did you mean "computer memory", referring to the MemoryStateBackend?
The Flink window mechanism is internally based on State, and this is done for 
fault tolerance.
If you introduce external storage, it will break its design and bring other 
problems.

Thanks, vino.

潘 功森 <pangong...@hotmail.com> wrote on Wed, Oct 10, 2018 at 11:02 AM:
Hi,
"ram to cache the distinct data about sliding window" means I used computer 
momery not the third part db to cache the data need used in window.
“the data need used in window” means :such as the sliding window is 1 hour, and 
I need to count the distinct users, I need to cache the user id about one hour.
Also, there are no related errors.
Yours,
September


From: vino yang <yanghua1...@gmail.com>
Sent: Wednesday, October 10, 2018 10:49:43 AM
Cc: user
Subject: Re: No data issued by flink window after a few hours

Hi,

Can you explain what "ram to cache the distinct data about sliding window" means?
The information you provided is too limited for others to help you 
analyze the problem and provide advice.

In addition, for questions about using Flink, please send mail only 
to the user mailing list.
The dev mailing list is mainly used to discuss development-related issues.

Thanks vino.

潘 功森 <pangong...@hotmail.com> wrote on Wed, Oct 10, 2018 at 10:37 AM:
Hi all,
I used a flink window, and when the job begins we can get the results of the 
window. But no results are issued after a few hours.
I found the job is still running with no errors, and the data that does not go 
through a window can all still be emitted.
By the way, I used Flink 1.3.2 and RAM to cache the distinct data about 
sliding

Re: No data issued by flink window after a few hours

2018-10-11 Thread 潘 功森
Please look at the mail below; the earlier ones are out of date because of the bad 
network.

Yours,
September


From: 潘 功森 
Sent: Friday, October 12, 2018 11:05:49 AM
To: vino yang
Cc: user
Subject: Re: No data issued by flink window after a few hours

Hi,
I think the pictures may have been too big and the network here is not so good, so the mail I 
wrote last night was not sent successfully.
Yes, I used event time.
I found that watermarks advanced normally when the job started, but they stopped and no 
longer changed after running for hours.
And I changed to the fs state backend; I configured it in flink-conf.yaml as below:
state.backend: filesystem
state.backend.fs.checkpointdir: file:///xxx/checkpoints

I found there are many checkpoints saved in that directory.
But the watermarks still stop advancing.

And there are two environments.
The local one runs well, and the problem occurs on site.
There are two differences:

1. The data volume is very small locally and very large on site.

2. On site configurations:

akka.ask.timeout: 2min

akka.lookup.timeout: 2min

akka.client.timeout: 3min

akka.log.lifecycle.events: on

akka.tcp.timeout: 60s



But these are not configured locally.

So do they have any influence?

Please help me…


Yours,
September

From: vino yang
Sent: October 11, 2018 14:56
To: pangong...@hotmail.com
Cc: user
Subject: Re: No data issued by flink window after a few hours

Hi gongsen,

Have you used event time as time semantics? If so, then the possible problem is 
related to watermark.
Since I don't know the details of your program, it's hard to make a conclusion. 
You can check if your watermark is firing normally.

Thanks, vino.

潘 功森 <pangong...@hotmail.com> wrote on Thu, Oct 11, 2018 at 12:12 PM:
Hi,
I changed to the configurations below, and it looks fine when the job starts.
But no results are issued when the window ends after running for about six hours, 
and there are no errors or exceptions.
How can I locate the problem?

Yours,
September


From: 潘 功森 <pangong...@hotmail.com>
Sent: Wednesday, October 10, 2018 2:44:48 PM
To: vino yang
Cc: user
Subject: Re: No data issued by flink window after a few hours

Hi,

Because the default state size is too small for a one-hour window, and the max window size is 24 
hours, I used 500M.


MemoryStateBackend stateBackend = new MemoryStateBackend(MAX_STATE_SIZE);//500M
env.setStateBackend(stateBackend);

And I found that, irrespective of the configured maximal state size, the state cannot 
be larger than the akka frame size.
So I added a config in flink-conf.yaml:
akka.framesize: 524288000b

What else do I have to pay attention to?

Yours,
September


From: vino yang <yanghua1...@gmail.com>
Sent: Wednesday, October 10, 2018 11:45:31 AM
To: pangong...@hotmail.com
Cc: user
Subject: Re: No data issued by flink window after a few hours

Hi,

I saw the exception image you provided. Based on the exception message, it 
seems you used the default max state size (5MB).

You can specify the max state size to override the default value. Try :

MemoryStateBackend stateBackend = new MemoryStateBackend(theSizeOfBytes);

Please note that you need to reserve enough memory for Flink.

Thanks, vino.

潘 功森 <pangong...@hotmail.com> wrote on Wed, Oct 10, 2018 at 11:36 AM:
Please have a look at my last mail.

What happens when the cached window data is too large?

Yours,
September


From: vino yang <yanghua1...@gmail.com>
Sent: Wednesday, October 10, 2018 11:33:48 AM
To: pangong...@hotmail.com
Cc: user
Subject: Re: No data issued by flink window after a few hours

Hi,

Did you mean "computer memory", referring to the MemoryStateBackend?
The Flink window mechanism is internally based on State, and this is done for 
fault tolerance.
If you introduce external storage, it will break its design and bring other 
problems.

Thanks, vino.

潘 功森 <pangong...@hotmail.com> wrote on Wed, Oct 10, 2018 at 11:02 AM:
Hi,
"ram to cache the distinct data about sliding window" means I used computer 
momery not the third part db to cache the data need used in window.
“the data need used in window” means :such as the sliding window is 1 hour, and 
I need to count the distinct users, I need to cache the user id about one hour.
Also, there are no related errors.
Yours,
September


From: vino yang <yanghua1...@gmail.com>
Sent: Wednesday, October 10, 2018 10:49:43 AM
Cc: user
Subject: Re: No data issued by flink window after a few hours

Hi,

Can you explain what "ram to cache the distinct data about sliding window" means?
The information you provided is too limited for others to help you 
analyze the problem and provide advice.

In addition, for questions about using Flink, please send mail only 
to the user mailing list.
The dev mailing list is mainly used to discuss development-related issues.

Thanks vino.

潘 功森 <pangong...@hotmail.com> wrote on Wed, Oct 10, 2018 at 10:37 AM:
Hi all,
I used flink window, and when the job begins, 

Re: Re: No data issued by flink window after a few hours

2018-10-11 Thread 潘 功森
The second question looks fine.
[inline screenshot omitted]

Yours,
September

From: Dawid Wysakowicz
Sent: October 11, 2018 15:13
To: 潘 功森; vino yang
Cc: user
Subject: Re: Re: No data issued by flink window after a few hours


Hi,

I agree with Vino that you should check whether the watermark is progressing for 
all subtasks, if you are using event-time semantics. If that is not the problem, 
it would help if you could share the code of your job. By the way, have you 
tried reproducing the problem with a collection source?

Best,

Dawid

On 10/10/18 08:44, 潘 功森 wrote:
Hi,

Because the default state size is too small for a one-hour window, and the max window size is 24 
hours, I used 500M.


MemoryStateBackend stateBackend = new MemoryStateBackend(MAX_STATE_SIZE);//500M

env.setStateBackend(stateBackend);

And I found that, irrespective of the configured maximal state size, the state cannot 
be larger than the akka frame size.
So I added a config in flink-conf.yaml:
akka.framesize: 524288000b

What else do I have to pay attention to?

Yours,
September


From: vino yang 
Sent: Wednesday, October 10, 2018 11:45:31 AM
To: pangong...@hotmail.com
Cc: user
Subject: Re: No data issued by flink window after a few hours

Hi,

I saw the exception image you provided. Based on the exception message, it 
seems you used the default max state size (5MB).

You can specify the max state size to override the default value. Try :

MemoryStateBackend stateBackend = new MemoryStateBackend(theSizeOfBytes);

Please note that you need to reserve enough memory for Flink.

Thanks, vino.

潘 功森 <pangong...@hotmail.com> wrote on Wed, Oct 10, 2018 at 11:36 AM:
Please have a look at my last mail.

What happens when the cached window data is too large?

Yours,
September


From: vino yang <yanghua1...@gmail.com>
Sent: Wednesday, October 10, 2018 11:33:48 AM
To: pangong...@hotmail.com
Cc: user
Subject: Re: No data issued by flink window after a few hours

Hi,

Did you mean "computer memory", referring to the MemoryStateBackend?
The Flink window mechanism is internally based on State, and this is done for 
fault tolerance.
If you introduce external storage, it will break its design and bring other 
problems.

Thanks, vino.

潘 功森 <pangong...@hotmail.com> wrote on Wed, Oct 10, 2018 at 11:02 AM:
Hi,
"ram to cache the distinct data about sliding window" means I used computer 
momery not the third part db to cache the data need used in window.
“the data need used in window” means :such as the sliding window is 1 hour, and 
I need to count the distinct users, I need to cache the user id about one hour.
Also, there are no related errors.
Yours,
September


From: vino yang <yanghua1...@gmail.com>
Sent: Wednesday, October 10, 2018 10:49:43 AM
Cc: user
Subject: Re: No data issued by flink window after a few hours

Hi,

Can you explain what "ram to cache the distinct data about sliding window" means?
The information you provided is too limited for others to help you 
analyze the problem and provide advice.

In addition, for questions about using Flink, please send mail only 
to the user mailing list.
The dev mailing list is mainly used to discuss development-related issues.

Thanks vino.

潘 功森 <pangong...@hotmail.com> wrote on Wed, Oct 10, 2018 at 10:37 AM:
Hi all,
I used a flink window, and when the job begins we can get the results of the 
window. But no results are issued after a few hours.
I found the job is still running with no errors, and the data that does not go 
through a window can all still be emitted.
By the way, I used Flink 1.3.2 and RAM to cache the distinct data about the 
sliding window.

Yours,
September




Re: User jar is present in the flink job manager's class path

2018-10-11 Thread yinhua.dai
Hi Gary,

Yes, you are right, we are using attach mode.
I will try to put my jar into flink/lib to get around the issue.
Thanks.

I will open a JIRA for the discrepancy between Flink 1.3 and 1.5, thanks a lot.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

2018-10-11 Thread Taher Koitawala
One other thought on the same lines was to use hive tables to store kafka
information to process streaming tables. Something like

"create table streaming_table (
bootstrapServers string,
topic string, keySerialiser string, ValueSerialiser string)"

Insert into streaming_table values("10.17.1.1:9092,10.17.2.2:9092,
10.17.3.3:9092", "KafkaTopicName", "SimpleStringSchema",
"SimpleSchemaString");

Create table processingtable(
//Enter fields here which match the kafka records schema);

Now we make a custom clause called something like "using"

The way we use this is:

Using streaming_table as configuration select count(*) from processingtable
as streaming;


This way users can now pass Flink SQL info easily and get rid of the Flink
SQL configuration file all together. This is simple and easy to understand
and I think most users would follow this.
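
(For comparison only — a sketch of what already exists today: the descriptor-based Table
API in Flink 1.6 carries roughly the same information. This assumes the
flink-connector-kafka-0.11 and flink-json modules; the single "message" column is a
made-up schema, and the brokers and topic are the placeholders from above:)

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(
    StreamExecutionEnvironment.getExecutionEnvironment());

// The connector/format/schema trio carries what the proposed streaming_table rows
// would carry: brokers, topic and (de)serialization.
tableEnv.connect(new Kafka()
        .version("0.11")
        .topic("KafkaTopicName")
        .property("bootstrap.servers", "10.17.1.1:9092,10.17.2.2:9092,10.17.3.3:9092"))
    .withFormat(new Json().deriveSchema())
    .withSchema(new Schema().field("message", Types.STRING()))
    .inAppendMode()
    .registerTableSource("processingtable");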

Thanks,
Taher Koitawala

On Fri 12 Oct, 2018, 7:24 AM Taher Koitawala, 
wrote:

> I think integrating Flink with Hive would be an amazing option and also to
> get Flink's SQL up to pace would be amazing.
>
> Current Flink SQL syntax to prepare and process a table is too verbose;
> users manually need to retype table definitions and that's a pain. Hive
> metastore integration should be done, though: many users are okay defining
> their table schemas in Hive as it is easy to maintain, change or even migrate.
>
> Also we could make choosing between batch and stream simple with something
> like a "process as" clause.
>
> select count(*) from flink_mailing_list process as stream;
>
> select count(*) from flink_mailing_list process as batch;
>
> This way we could completely get rid of Flink SQL configuration files.
>
> Thanks,
> Taher Koitawala
>
> Integrating
> On Fri 12 Oct, 2018, 2:35 AM Zhang, Xuefu, 
> wrote:
>
>> Hi Rong,
>>
>> Thanks for your feedback. Some of my earlier comments might have
>> addressed some of your points, so here I'd like to cover some specifics.
>>
>> 1. Yes, I expect that table stats stored in Hive will be used in Flink
>> plan optimization, but it's not part of compatibility concern (yet).
>> 2. Both implementing Hive UDFs in Flink natively and making Hive UDFs
>> work in Flink are considered.
>> 3. I am aware of FLIP-24, but here the proposal is to make remote server
>> compatible with HiveServer2. They are not mutually exclusive either.
>> 4. The JDBC/ODBC driver in question is for the remote server that Flink
>> provides. It's usually the servicer owner who provides drivers to their
>> services. We weren't talking about JDBC/ODBC driver to external DB systems.
>>
>> Let me know if you have further questions.
>>
>> Thanks,
>> Xuefu
>>
>> --
>> Sender:Rong Rong 
>> Sent at:2018 Oct 12 (Fri) 01:52
>> Recipient:Timo Walther 
>> Cc:dev ; jornfranke ; Xuefu <
>> xuef...@alibaba-inc.com>; vino yang ; Fabian
>> Hueske ; user 
>> Subject:Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem
>>
>> Hi Xuefu,
>>
>> Thanks for putting together the overview. I would like to add some more
>> on top of Timo's comments.
>> 1,2. I agree with Timo that a proper catalog support should also address
>> the metadata compatibility issues. I was actually wondering if you are
>> referring to something like utilizing table stats for plan optimization?
>> 4. If the key is to have users integrate Hive UDF without code changes to
>> Flink UDF, it shouldn't be a problem as Timo mentioned. Is your concern
>> mostly on the support of Hive UDFs that should be implemented in
>> Flink-table natively?
>> 7,8. Correct me if I am wrong, but I feel like some of the related
>> components might have already been discussed in the longer term road map of
>> FLIP-24 [1]?
>> 9. per Jorn's comment to stay clear from a tight dependency on Hive and
>> treat it as one "connector" system. Should we also consider treating
>> JDBC/ODBC driver as part of the component from the connector system instead
>> of having Flink to provide them?
>>
>> Thanks,
>> Rong
>>
>> [1].
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client
>>
>> On Thu, Oct 11, 2018 at 12:46 AM Timo Walther  wrote:
>> Hi Xuefu,
>>
>> thanks for your proposal, it is a nice summary. Here are my thoughts to
>> your list:
>>
>> 1. I think this is also on our current mid-term roadmap. Flink lacks a
>> proper catalog support for a very long time. Before we can connect
>> catalogs we need to define how to map all the information from a catalog
>> to Flink's representation. This is why the work on the unified connector
>> API [1] is going on for quite some time as it is the first approach to
>> discuss and represent the pure characteristics of connectors.
>> 2. It would be helpful to figure out what is missing in [1] to ensure
>> this point. I guess we will need a new design document just for a proper
>> Hive catalog integration.
>> 3. This is already work in progress. ORC has been merged, Parquet is on
>> its way [1].
>> 4. This should be easy. The

Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

2018-10-11 Thread Taher Koitawala
I think integrating Flink with Hive would be an amazing option and also to
get Flink's SQL up to pace would be amazing.

Current Flink SQL syntax to prepare and process a table is too verbose;
users manually need to retype table definitions and that's a pain. Hive
metastore integration should be done, though: many users are okay defining
their table schemas in Hive as it is easy to maintain, change or even migrate.

Also we could make choosing between batch and stream simple with something
like a "process as" clause.

select count(*) from flink_mailing_list process as stream;

select count(*) from flink_mailing_list process as batch;

This way we could completely get rid of Flink SQL configuration files.
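
(For context on what a "process as" clause would replace: today the batch/stream choice
is made when the table environment is created rather than per query — a rough sketch,
assuming Flink 1.6's flink-table module and a table named flink_mailing_list that is
already registered in each environment:)

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

// Streaming: the same query runs continuously over an unbounded table.
StreamTableEnvironment streamTableEnv =
    TableEnvironment.getTableEnvironment(StreamExecutionEnvironment.getExecutionEnvironment());
Table streamingCount = streamTableEnv.sqlQuery("SELECT COUNT(*) FROM flink_mailing_list");

// Batch: the same query runs once over a bounded table.
BatchTableEnvironment batchTableEnv =
    TableEnvironment.getTableEnvironment(ExecutionEnvironment.getExecutionEnvironment());
Table batchCount = batchTableEnv.sqlQuery("SELECT COUNT(*) FROM flink_mailing_list");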

Thanks,
Taher Koitawala

Integrating
On Fri 12 Oct, 2018, 2:35 AM Zhang, Xuefu,  wrote:

> Hi Rong,
>
> Thanks for your feedback. Some of my earlier comments might have addressed
> some of your points, so here I'd like to cover some specifics.
>
> 1. Yes, I expect that table stats stored in Hive will be used in Flink
> plan optimization, but it's not part of compatibility concern (yet).
> 2. Both implementing Hive UDFs in Flink natively and making Hive UDFs work
> in Flink are considered.
> 3. I am aware of FLIP-24, but here the proposal is to make remote server
> compatible with HiveServer2. They are not mutually exclusive either.
> 4. The JDBC/ODBC driver in question is for the remote server that Flink
> provides. It's usually the servicer owner who provides drivers to their
> services. We weren't talking about JDBC/ODBC driver to external DB systems.
>
> Let me know if you have further questions.
>
> Thanks,
> Xuefu
>
> --
> Sender:Rong Rong 
> Sent at:2018 Oct 12 (Fri) 01:52
> Recipient:Timo Walther 
> Cc:dev ; jornfranke ; Xuefu <
> xuef...@alibaba-inc.com>; vino yang ; Fabian
> Hueske ; user 
> Subject:Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem
>
> Hi Xuefu,
>
> Thanks for putting together the overview. I would like to add some more on
> top of Timo's comments.
> 1,2. I agree with Timo that a proper catalog support should also address
> the metadata compatibility issues. I was actually wondering if you are
> referring to something like utilizing table stats for plan optimization?
> 4. If the key is to have users integrate Hive UDF without code changes to
> Flink UDF, it shouldn't be a problem as Timo mentioned. Is your concern
> mostly on the support of Hive UDFs that should be implemented in
> Flink-table natively?
> 7,8. Correct me if I am wrong, but I feel like some of the related
> components might have already been discussed in the longer term road map of
> FLIP-24 [1]?
> 9. per Jorn's comment to stay clear from a tight dependency on Hive and
> treat it as one "connector" system. Should we also consider treating
> JDBC/ODBC driver as part of the component from the connector system instead
> of having Flink to provide them?
>
> Thanks,
> Rong
>
> [1].
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client
>
> On Thu, Oct 11, 2018 at 12:46 AM Timo Walther  wrote:
> Hi Xuefu,
>
> thanks for your proposal, it is a nice summary. Here are my thoughts to
> your list:
>
> 1. I think this is also on our current mid-term roadmap. Flink lacks a
> proper catalog support for a very long time. Before we can connect
> catalogs we need to define how to map all the information from a catalog
> to Flink's representation. This is why the work on the unified connector
> API [1] is going on for quite some time as it is the first approach to
> discuss and represent the pure characteristics of connectors.
> 2. It would be helpful to figure out what is missing in [1] to ensure
> this point. I guess we will need a new design document just for a proper
> Hive catalog integration.
> 3. This is already work in progress. ORC has been merged, Parquet is on
> its way [1].
> 4. This should be easy. There was a PR in past that I reviewed but was
> not maintained anymore.
> 5. The type system of Flink SQL is very flexible. Only UNION type is
> missing.
> 6. A Flink SQL DDL is on the roadmap soon once we are done with [1].
> Support for Hive syntax also needs cooperation with Apache Calcite.
> 7-11. Long-term goals.
>
> I would also propose to start with a smaller scope where also current
> Flink SQL users can profit: 1, 2, 5, 3. This would allow to grow the
> Flink SQL ecosystem. After that we can aim to be fully compatible
> including syntax and UDFs (4, 6 etc.). Once the core is ready, we can
> work on the tooling (7, 8, 9) and performance (10, 11).
>
> @Jörn: Yes, we should not have a tight dependency on Hive. It should be
> treated as one "connector" system out of many.
>
> Thanks,
> Timo
>
> [1]
>
> https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?ts=5bb62df4#
> [2] https://github.com/apache/flink/pull/6483
>
> Am 11.10.18 um 07:54 schrieb Jörn Franke:
> > Would it maybe make sense to provide 

When does Trigger.clear() get called?

2018-10-11 Thread Andrew Danks
Hello,

I see that the clear() function is implemented for various types of Triggers in 
the Flink API. For example:
https://github.com/apache/flink/blob/release-1.3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java#L87
 


I am working on a custom Trigger for my application and have implemented 
clear() in a similar way.

However, having put a breakpoint in this function it doesn’t seem to get called 
when I expect. The source code says that it is called "when a window is purged”[1] 
but when my Trigger emits a PURGE this function never seems to get called. I am 
on Flink 1.3.

Hoping someone can shed more light on the purpose of clear() and how/when it 
gets called

Thanks!
Andrew


[1] 
https://github.com/apache/flink/blob/release-1.3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java#L111
 




Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

2018-10-11 Thread Zhang, Xuefu
Hi Rong,

Thanks for your feedback. Some of my earlier comments might have addressed some 
of your points, so here I'd like to cover some specifics.

1. Yes, I expect that table stats stored in Hive will be used in Flink plan 
optimization, but it's not part of compatibility concern (yet).
2. Both implementing Hive UDFs in Flink natively and making Hive UDFs work in 
Flink are considered.
3. I am aware of FLIP-24, but here the proposal is to make remote server 
compatible with HiveServer2. They are not mutually exclusive either.
4. The JDBC/ODBC driver in question is for the remote server that Flink 
provides. It's usually the servicer owner who provides drivers to their 
services. We weren't talking about JDBC/ODBC driver to external DB systems.

Let me know if you have further questions.

Thanks,
Xuefu


--
Sender:Rong Rong 
Sent at:2018 Oct 12 (Fri) 01:52
Recipient:Timo Walther 
Cc:dev ; jornfranke ; Xuefu 
; vino yang ; Fabian Hueske 
; user 
Subject:Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

Hi Xuefu, 

Thanks for putting together the overview. I would like to add some more on top 
of Timo's comments.
1,2. I agree with Timo that a proper catalog support should also address the 
metadata compatibility issues. I was actually wondering if you are referring to 
something like utilizing table stats for plan optimization?
4. If the key is to have users integrate Hive UDF without code changes to Flink 
UDF, it shouldn't be a problem as Timo mentioned. Is your concern mostly on the 
support of Hive UDFs that should be implemented in Flink-table natively?
7,8. Correct me if I am wrong, but I feel like some of the related components 
might have already been discussed in the longer term road map of FLIP-24 [1]?
9. per Jorn's comment to stay clear from a tight dependency on Hive and treat 
it as one "connector" system. Should we also consider treating JDBC/ODBC driver 
as part of the component from the connector system instead of having Flink to 
provide them?

Thanks,
Rong

[1]. https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client
On Thu, Oct 11, 2018 at 12:46 AM Timo Walther  wrote:
Hi Xuefu,

 thanks for your proposal, it is a nice summary. Here are my thoughts to 
 your list:

 1. I think this is also on our current mid-term roadmap. Flink lacks a 
 proper catalog support for a very long time. Before we can connect 
 catalogs we need to define how to map all the information from a catalog 
 to Flink's representation. This is why the work on the unified connector 
 API [1] is going on for quite some time as it is the first approach to 
 discuss and represent the pure characteristics of connectors.
 2. It would be helpful to figure out what is missing in [1] to ensure 
 this point. I guess we will need a new design document just for a proper 
 Hive catalog integration.
 3. This is already work in progress. ORC has been merged, Parquet is on 
 its way [1].
 4. This should be easy. There was a PR in past that I reviewed but was 
 not maintained anymore.
 5. The type system of Flink SQL is very flexible. Only UNION type is 
 missing.
 6. A Flink SQL DDL is on the roadmap soon once we are done with [1]. 
 Support for Hive syntax also needs cooperation with Apache Calcite.
 7-11. Long-term goals.

 I would also propose to start with a smaller scope where also current 
 Flink SQL users can profit: 1, 2, 5, 3. This would allow to grow the 
 Flink SQL ecosystem. After that we can aim to be fully compatible 
 including syntax and UDFs (4, 6 etc.). Once the core is ready, we can 
 work on the tooling (7, 8, 9) and performance (10, 11).

 @Jörn: Yes, we should not have a tight dependency on Hive. It should be 
 treated as one "connector" system out of many.

 Thanks,
 Timo

 [1] 
https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?ts=5bb62df4#
 [2] https://github.com/apache/flink/pull/6483

 Am 11.10.18 um 07:54 schrieb Jörn Franke:
 > Would it maybe make sense to provide Flink as an engine on Hive 
 > („flink-on-Hive“)? Eg to address 4,5,6,8,9,10. this could be more loosely 
 > coupled than integrating hive in all possible flink core modules and thus 
 > introducing a very tight dependency to Hive in the core.
 > 1,2,3 could be achieved via a connector based on the Flink Table API.
 > Just as a proposal to start this Endeavour as independent projects (hive 
 > engine, connector) to avoid too tight coupling with Flink. Maybe in a more 
 > distant future if the Hive integration is heavily demanded one could then 
 > integrate it more tightly if needed.
 >
 > What is meant by 11?
 >> Am 11.10.2018 um 05:01 schrieb Zhang, Xuefu :
 >>
 >> Hi Fabian/Vino,
 >>
 >> Thank you very much for your encouragement and inquiry. Sorry that I didn't see 
 >> Fabian's email until I read Vino's response just now. (Somehow Fabian's 
 >> went to the spam folder.)
 >>
 >> My proposal contains long-term and short-term

Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

2018-10-11 Thread Zhang, Xuefu
Hi Timo,

Thank you for your input. It's exciting to see that the community has already 
initiated some of the topics. We'd certainly like to leverage the current and 
previous work and make progress in phases. Here I'd like to comment on a few 
things on top of your feedback.

1. I think there are two aspects #1 and #2 with regard to Hive metastore: a) as 
a backend storage for Flink's metadata (currently in memory), and b) an 
external catalog (just like a JDBC catalog) that Flink can interact with. While 
it may be possible and would be nice if we can achieve both in a single design, 
our focus has been on the latter. We will consider both cases in our design.

2. Re #5, I agree that Flink seems having the majority of data types. However, 
supporting some of them (such as struct) at SQL layer needs work on the parser 
(Calcite).

3. Similarly for #6, work needs to be done on parsing side. We can certainly 
ask Calcite community to provide Hive dialect parsing. This can be challenging 
and time-consuming. At the same time, we can also explore the possibilities of 
solving the problem in Flink, such as using Calcite's official extension 
mechanism. We will open the discussion when we get there.

Yes, I agree with you that we should start with a small scope while keeping a 
forward thinking. Specifically, we will first look at the metadata and data 
compatibilities, data types, DDL/DML, Query, UDFs, and so on. I think we align 
well on this.

Please let me know if you have further thoughts or comments.

Thanks,
Xuefu


--
Sender:Timo Walther 
Sent at:2018 Oct 11 (Thu) 15:46
Recipient:dev ; "Jörn Franke" ; 
Xuefu 
Cc:vino yang ; Fabian Hueske ; user 

Subject:Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

Hi Xuefu,

thanks for your proposal, it is a nice summary. Here are my thoughts to 
your list:

1. I think this is also on our current mid-term roadmap. Flink lacks a 
proper catalog support for a very long time. Before we can connect 
catalogs we need to define how to map all the information from a catalog 
to Flink's representation. This is why the work on the unified connector 
API [1] is going on for quite some time as it is the first approach to 
discuss and represent the pure characteristics of connectors.
2. It would be helpful to figure out what is missing in [1] to ensure 
this point. I guess we will need a new design document just for a proper 
Hive catalog integration.
3. This is already work in progress. ORC has been merged, Parquet is on 
its way [1].
4. This should be easy. There was a PR in past that I reviewed but was 
not maintained anymore.
5. The type system of Flink SQL is very flexible. Only UNION type is 
missing.
6. A Flink SQL DDL is on the roadmap soon once we are done with [1]. 
Support for Hive syntax also needs cooperation with Apache Calcite.
7-11. Long-term goals.

I would also propose to start with a smaller scope where also current 
Flink SQL users can profit: 1, 2, 5, 3. This would allow to grow the 
Flink SQL ecosystem. After that we can aim to be fully compatible 
including syntax and UDFs (4, 6 etc.). Once the core is ready, we can 
work on the tooling (7, 8, 9) and performance (10, 11).

@Jörn: Yes, we should not have a tight dependency on Hive. It should be 
treated as one "connector" system out of many.

Thanks,
Timo

[1] 
https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?ts=5bb62df4#
[2] https://github.com/apache/flink/pull/6483

Am 11.10.18 um 07:54 schrieb Jörn Franke:
> Would it maybe make sense to provide Flink as an engine on Hive 
> („flink-on-Hive“)? Eg to address 4,5,6,8,9,10. this could be more loosely 
> coupled than integrating hive in all possible flink core modules and thus 
> introducing a very tight dependency to Hive in the core.
> 1,2,3 could be achieved via a connector based on the Flink Table API.
> Just as a proposal to start this Endeavour as independent projects (hive 
> engine, connector) to avoid too tight coupling with Flink. Maybe in a more 
> distant future if the Hive integration is heavily demanded one could then 
> integrate it more tightly if needed.
>
> What is meant by 11?
>> Am 11.10.2018 um 05:01 schrieb Zhang, Xuefu :
>>
 >> Hi Fabian/Vino,
>>
 >> Thank you very much for your encouragement and inquiry. Sorry that I didn't see 
>> Fabian's email until I read Vino's response just now. (Somehow Fabian's went 
>> to the spam folder.)
>>
>> My proposal contains long-term and short-terms goals. Nevertheless, the 
>> effort will focus on the following areas, including Fabian's list:
>>
>> 1. Hive metastore connectivity - This covers both read/write access, which 
>> means Flink can make full use of Hive's metastore as its catalog (at least 
>> for the batch but can extend for streaming as well).
>> 2. Metadata compatibility - Objects (databases, tables, partitions, etc) 
>> created by Hive can be understood by Flink and the reverse 

Re: Taskmanager times out continuously for registration with Jobmanager

2018-10-11 Thread Abdul Qadeer
Hi Till,

I didn't try with newer versions as it is not possible to update the Flink
version atm.
If you could give any pointers for debugging that would be great.

On Thu, Oct 11, 2018 at 2:44 AM Till Rohrmann  wrote:

> Hi Abdul,
>
> have you tried whether this problem also occurs with newer Flink versions
> (1.5.4 or 1.6.1)?
>
> Cheers,
> Till
>
> On Thu, Oct 11, 2018 at 9:24 AM Dawid Wysakowicz 
> wrote:
>
>> Hi Abdul,
>>
>> I've added Till and Gary to cc, who might be able to help you.
>>
>> Best,
>>
>> Dawid
>>
>> On 11/10/18 03:05, Abdul Qadeer wrote:
>>
>> Hi,
>>
>>
>> We are facing an issue in standalone HA mode in Flink 1.4.0 where
>> Taskmanager restarts and is not able to register with the Jobmanager. It
>> times out awaiting *AcknowledgeRegistration/AlreadyRegistered* message
>> from Jobmanager Actor and keeps sending *RegisterTaskManager *message.
>> The logs at Jobmanager don’t show anything about registration
>> failure/request. It doesn’t print *log*.debug(*s"RegisterTaskManager: $*
>> msg*"*) (from JobManager.scala) either. The network connection between
>> taskmanager and jobmanager seems fine; tcpdump shows message sent to
>> jobmanager and TCP ACK received from jobmanager. Note that the
>> communication is happening between docker containers.
>>
>>
>> Following are the logs from Taskmanager:
>>
>>
>>
>> {"timeMillis":1539189572438,"thread":"flink-akka.actor.default-dispatcher-2","level":"INFO","loggerName":"org.apache.flink.runtime.taskmanager.TaskManager","message":"Trying
>> to register at JobManager akka.tcp://
>> flink@192.168.83.51:6123/user/jobmanager (attempt 1400, timeout: 3
>> milliseconds)","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":48,"threadPriority":5}
>>
>> {"timeMillis":1539189580229,"thread":"Curator-Framework-0-SendThread(zookeeper.maglev-system.svc.cluster.local:2181)","level":"DEBUG","loggerName":"org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn","message":"Got
>> ping response for sessionid: 0x1260ea5002d after
>> 0ms","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":101,"threadPriority":5}
>>
>> {"timeMillis":1539189600247,"thread":"Curator-Framework-0-SendThread(zookeeper.maglev-system.svc.cluster.local:2181)","level":"DEBUG","loggerName":"org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn","message":"Got
>> ping response for sessionid: 0x1260ea5002d after
>> 0ms","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":101,"threadPriority":5}
>>
>> {"timeMillis":1539189602458,"thread":"flink-akka.actor.default-dispatcher-2","level":"INFO","loggerName":"org.apache.flink.runtime.taskmanager.TaskManager","message":"Trying
>> to register at JobManager akka.tcp://
>> flink@192.168.83.51:6123/user/jobmanager (attempt 1401, timeout: 3
>> milliseconds)","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":48,"threadPriority":5}
>>
>> {"timeMillis":1539189620251,"thread":"Curator-Framework-0-SendThread(zookeeper.maglev-system.svc.cluster.local:2181)","level":"DEBUG","loggerName":"org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn","message":"Got
>> ping response for sessionid: 0x1260ea5002d after
>> 0ms","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":101,"threadPriority":5}
>>
>> {"timeMillis":1539189632478,"thread":"flink-akka.actor.default-dispatcher-2","level":"INFO","loggerName":"org.apache.flink.runtime.taskmanager.TaskManager","message":"Trying
>> to register at JobManager akka.tcp://
>> flink@192.168.83.51:6123/user/jobmanager (attempt 1402, timeout: 3
>> milliseconds)","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":48,"threadPriority":5}
>>
>>
>>


Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

2018-10-11 Thread Rong Rong
Hi Xuefu,

Thanks for putting together the overview. I would like to add some more on
top of Timo's comments.
1,2. I agree with Timo that a proper catalog support should also address
the metadata compatibility issues. I was actually wondering if you are
referring to something like utilizing table stats for plan optimization?
4. If the key is to have users integrate Hive UDF without code changes to
Flink UDF, it shouldn't be a problem as Timo mentioned. Is your concern
mostly on the support of Hive UDFs that should be implemented in
Flink-table natively?
7,8. Correct me if I am wrong, but I feel like some of the related
components might have already been discussed in the longer term road map of
FLIP-24 [1]?
9. per Jorn's comment to stay clear from a tight dependency on Hive and
treat it as one "connector" system. Should we also consider treating
JDBC/ODBC driver as part of the component from the connector system instead
of having Flink to provide them?

Thanks,
Rong

[1]. https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client

On Thu, Oct 11, 2018 at 12:46 AM Timo Walther  wrote:

> Hi Xuefu,
>
> thanks for your proposal, it is a nice summary. Here are my thoughts to
> your list:
>
> 1. I think this is also on our current mid-term roadmap. Flink lacks a
> proper catalog support for a very long time. Before we can connect
> catalogs we need to define how to map all the information from a catalog
> to Flink's representation. This is why the work on the unified connector
> API [1] is going on for quite some time as it is the first approach to
> discuss and represent the pure characteristics of connectors.
> 2. It would be helpful to figure out what is missing in [1] to ensure
> this point. I guess we will need a new design document just for a proper
> Hive catalog integration.
> 3. This is already work in progress. ORC has been merged, Parquet is on
> its way [1].
> 4. This should be easy. There was a PR in past that I reviewed but was
> not maintained anymore.
> 5. The type system of Flink SQL is very flexible. Only UNION type is
> missing.
> 6. A Flink SQL DDL is on the roadmap soon once we are done with [1].
> Support for Hive syntax also needs cooperation with Apache Calcite.
> 7-11. Long-term goals.
>
> I would also propose to start with a smaller scope where also current
> Flink SQL users can profit: 1, 2, 5, 3. This would allow to grow the
> Flink SQL ecosystem. After that we can aim to be fully compatible
> including syntax and UDFs (4, 6 etc.). Once the core is ready, we can
> work on the tooling (7, 8, 9) and performance (10, 11).
>
> @Jörn: Yes, we should not have a tight dependency on Hive. It should be
> treated as one "connector" system out of many.
>
> Thanks,
> Timo
>
> [1]
>
> https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?ts=5bb62df4#
> [2] https://github.com/apache/flink/pull/6483
>
> Am 11.10.18 um 07:54 schrieb Jörn Franke:
> > Would it maybe make sense to provide Flink as an engine on Hive
> („flink-on-Hive“)? Eg to address 4,5,6,8,9,10. this could be more loosely
> coupled than integrating hive in all possible flink core modules and thus
> introducing a very tight dependency to Hive in the core.
> > 1,2,3 could be achieved via a connector based on the Flink Table API.
> > Just as a proposal to start this Endeavour as independent projects (hive
> engine, connector) to avoid too tight coupling with Flink. Maybe in a more
> distant future if the Hive integration is heavily demanded one could then
> integrate it more tightly if needed.
> >
> > What is meant by 11?
> >> Am 11.10.2018 um 05:01 schrieb Zhang, Xuefu :
> >>
> >> Hi Fabian/Vino,
> >>
> >> Thank you very much for your encouragement and inquiry. Sorry that I didn't
> see Fabian's email until I read Vino's response just now. (Somehow Fabian's
> went to the spam folder.)
> >>
> >> My proposal contains long-term and short-terms goals. Nevertheless, the
> effort will focus on the following areas, including Fabian's list:
> >>
> >> 1. Hive metastore connectivity - This covers both read/write access,
> which means Flink can make full use of Hive's metastore as its catalog (at
> least for the batch but can extend for streaming as well).
> >> 2. Metadata compatibility - Objects (databases, tables, partitions,
> etc) created by Hive can be understood by Flink and the reverse direction
> is true also.
> >> 3. Data compatibility - Similar to #2, data produced by Hive can be
> consumed by Flink and vise versa.
> >> 4. Support Hive UDFs - For all Hive's native udfs, Flink either
> provides its own implementation or make Hive's implementation work in
> Flink. Further, for user created UDFs in Hive, Flink SQL should provide a
> mechanism allowing user to import them into Flink without any code change
> required.
> >> 5. Data types -  Flink SQL should support all data types that are
> available in Hive.
> >> 6. SQL Language - Flink SQL should support SQL standard (such as
> SQL2003) with extension 

RE: flink memory management / temp-io dir question

2018-10-11 Thread anand.gopinath
It's ok – I see the relevant docs now…

"The RocksDBStateBackend holds in-flight data in a RocksDB database that is 
(per default) stored in the TaskManager data directories."

Thanks for your help
Anand

From: Gopinath, Anand
Sent: 11 October 2018 18:40
To: 'Till Rohrmann'; Kostas Kloudas
Cc: user; Till Rohrmann
Subject: RE: flink memory management / temp-io dir question

Hi Till,
Thanks for the reply.

I don’t use batch, so I assume  what I am seeing is streaming related. I 
thought rocksdb writes to a different dir though ( as defined by  
checkpoint.data.uri)?
Regards,
Anand

From: Till Rohrmann [mailto:trohrm...@apache.org]
Sent: 08 October 2018 13:39
To: Kostas Kloudas
Cc: Gopinath, Anand; user; Till Rohrmann
Subject: Re: flink memory management / temp-io dir question

Hi Anand,

spilling using the io directories is only relevant for Flink's batch 
processing. This happens, for example if you enable blocking data exchange 
where the produced data cannot be kept in memory. Moreover, it is used by many 
of Flink's out-of-core data structures to enable exactly this feature (e.g. 
users are the MutableHashTable, the MergeIterator to combine sorted data which 
has been spilled or the SorterMerger to actually spill data).

In streaming Flink uses the RocksDB state backend to spill very large state 
gracefully to disk. Thus, you would need to configure RocksDB in order to 
control the spilling behaviour.

Cheers,
Till

On Mon, Oct 8, 2018 at 2:18 PM Kostas Kloudas 
mailto:k.klou...@data-artisans.com>> wrote:
Sorry, I forgot to cc’ Till.

On Oct 8, 2018, at 2:17 PM, Kostas Kloudas 
mailto:k.klou...@data-artisans.com>> wrote:

Hi Anand,

I think that Till is the best person to answer your question.

Cheers,
Kostas

On Oct 5, 2018, at 3:44 PM, 
anand.gopin...@ubs.com wrote:

Hi ,
I had a question with respect flink memory management / overspill to /tmp.

In the docs 
(https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/config.html#configuring-temporary-io-directories)
 it says: Although Flink aims to process as much data in main memory as 
possible, it is not uncommon that more data needs to be processed than memory 
is available. Flink’s runtime is designed to write temporary data to disk to 
handle these situations

In a  flink job  that processes a couple streams of 1M events in a  windowed co 
group function with parallelism 8 - we see 8 dirs created in /tmp with 100s of 
Meg of data, the name of each dir seems aligned to the data for each parallel 
thread windowing against the co-group  operator

e.g.
bash-4.2$ du -sh *
0   flink-dist-cache-a4a69215-665a-4c3c-8d90-416cbe192f26
352Mflink-io-9033517c-ac92-4baa-9e59-79bc80c72a9e
4.0KlocalState
7.2Mrocksdb-lib-03d9460b15e6bf6af4f3d9b0ff7980c3

bash-4.2$ du -sh flink-io-9033517c-ac92-4baa-9e59-79bc80c72a9e/*
...
36M 
flink-io-9033517c-ac92-4baa-9e59-79bc80c72a9e/job_cf2dca7843dd6b6296aa1a9d15a1d435_op_WindowOperator_014556c228cb5344d41861769d2bbbc1__1_8__uuid_93307150-4f62-4b06-a71e-0230360f7d86
36M 
flink-io-9033517c-ac92-4baa-9e59-79bc80c72a9e/job_cf2dca7843dd6b6296aa1a9d15a1d435_op_WindowOperator_014556c228cb5344d41861769d2bbbc1__2_8__uuid_7b2f8957-7044-4bb3-869e-28843bd737a1
36M 
flink-io-9033517c-ac92-4baa-9e59-79bc80c72a9e/job_cf2dca7843dd6b6296aa1a9d15a1d435_op_WindowOperator_014556c228cb5344d41861769d2bbbc1__3_8__uuid_54306a44-7e06-45ae-ba0e-4649887bca7e
...

I was wondering can / should this 'over spill' be avoided by increasing the 
heap of the task manager or another config or should I not worry about it?
Is there more information/docs on how this data is used/ cleaned up & what is 
the cost of this overspill to latency/ checkpointing? Any impact I should be 
aware of?

thanks
Anand


RE: flink memory management / temp-io dir question

2018-10-11 Thread anand.gopinath
Hi Till,
Thanks for the reply.

I don't use batch, so I assume what I am seeing is streaming related. I
thought RocksDB writes to a different dir though (as defined by
checkpoint.data.uri)?
Regards,
Anand

From: Till Rohrmann [mailto:trohrm...@apache.org]
Sent: 08 October 2018 13:39
To: Kostas Kloudas
Cc: Gopinath, Anand; user; Till Rohrmann
Subject: Re: flink memory management / temp-io dir question

Hi Anand,

spilling using the io directories is only relevant for Flink's batch 
processing. This happens, for example if you enable blocking data exchange 
where the produced data cannot be kept in memory. Moreover, it is used by many 
of Flink's out-of-core data structures to enable exactly this feature (e.g. 
users are the MutableHashTable, the MergeIterator to combine sorted data which 
has been spilled or the SorterMerger to actually spill data).

In streaming Flink uses the RocksDB state backend to spill very large state 
gracefully to disk. Thus, you would need to configure RocksDB in order to 
control the spilling behaviour.

Cheers,
Till

On Mon, Oct 8, 2018 at 2:18 PM Kostas Kloudas 
mailto:k.klou...@data-artisans.com>> wrote:
Sorry, I forgot to cc’ Till.


On Oct 8, 2018, at 2:17 PM, Kostas Kloudas 
mailto:k.klou...@data-artisans.com>> wrote:

Hi Anand,

I think that Till is the best person to answer your question.

Cheers,
Kostas


On Oct 5, 2018, at 3:44 PM, 
anand.gopin...@ubs.com wrote:

Hi ,
I had a question with respect flink memory management / overspill to /tmp.

In the docs 
(https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/config.html#configuring-temporary-io-directories)
 it says: Although Flink aims to process as much data in main memory as 
possible, it is not uncommon that more data needs to be processed than memory 
is available. Flink’s runtime is designed to write temporary data to disk to 
handle these situations

In a  flink job  that processes a couple streams of 1M events in a  windowed co 
group function with parallelism 8 - we see 8 dirs created in /tmp with 100s of 
Meg of data, the name of each dir seems aligned to the data for each parallel 
thread windowing against the co-group  operator

e.g.
bash-4.2$ du -sh *
0   flink-dist-cache-a4a69215-665a-4c3c-8d90-416cbe192f26
352Mflink-io-9033517c-ac92-4baa-9e59-79bc80c72a9e
4.0KlocalState
7.2Mrocksdb-lib-03d9460b15e6bf6af4f3d9b0ff7980c3

bash-4.2$ du -sh flink-io-9033517c-ac92-4baa-9e59-79bc80c72a9e/*
...
36M 
flink-io-9033517c-ac92-4baa-9e59-79bc80c72a9e/job_cf2dca7843dd6b6296aa1a9d15a1d435_op_WindowOperator_014556c228cb5344d41861769d2bbbc1__1_8__uuid_93307150-4f62-4b06-a71e-0230360f7d86
36M 
flink-io-9033517c-ac92-4baa-9e59-79bc80c72a9e/job_cf2dca7843dd6b6296aa1a9d15a1d435_op_WindowOperator_014556c228cb5344d41861769d2bbbc1__2_8__uuid_7b2f8957-7044-4bb3-869e-28843bd737a1
36M 
flink-io-9033517c-ac92-4baa-9e59-79bc80c72a9e/job_cf2dca7843dd6b6296aa1a9d15a1d435_op_WindowOperator_014556c228cb5344d41861769d2bbbc1__3_8__uuid_54306a44-7e06-45ae-ba0e-4649887bca7e
...

I was wondering can / should this 'over spill' be avoided by increasing the 
heap of the task manager or another config or should I not worry about it?
Is there more information/docs on how this data is used/ cleaned up & what is 
the cost of this overspill to latency/ checkpointing? Any impact I should be 
aware of?

thanks
Anand


Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

2018-10-11 Thread Zhang, Xuefu
Hi Jörn,

Thanks for your feedback. Yes, I think Hive on Flink makes sense and in fact it 
is one of the two approaches that I named in the beginning of the thread. As 
also pointed out there, this isn't mutually exclusive with the work we proposed 
inside Flink; they target different user groups and use cases. Further, 
what we proposed to do in Flink should be a good showcase that demonstrates 
Flink's capabilities in batch processing and convinces the Hive community of the 
worth of a new engine. As you might know, the idea encountered some doubt and 
resistance. Nevertheless, we do have a solid plan for Hive on Flink, which we 
will execute once Flink SQL is in good shape.

I also agree with you that Flink SQL shouldn't be closely coupled with Hive. 
While we mentioned Hive in many of the proposed items, most of them are coupled 
only in concepts and functionality rather than code or libraries. We are taking 
advantage of the connector framework in Flink. The only exception might be 
support for Hive built-in UDFs, which we may not make work out of the box in 
order to avoid the coupling. We could, for example, require users to bring in 
the Hive library and register the functions themselves. This is subject to further discussion.
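
As a rough illustration of the registration mechanism such an import could build on, here is how a user registers an ordinary scalar function in the Table API today (the function name and class are made up; this is plain Flink code, not a Hive import):

import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical user function standing in for an imported Hive UDF.
public class MyUpper extends ScalarFunction {
    public String eval(String s) {
        return s == null ? null : s.toUpperCase();
    }
}

// Given a StreamTableEnvironment 'tableEnv', registration is a single call;
// a Hive UDF import would need to do something equivalent under the hood.
tableEnv.registerFunction("my_upper", new MyUpper());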

#11 is about Flink runtime enhancements that are meant to make task failures more 
tolerable (so that the job doesn't have to start from the beginning in case of 
task failures) and to make task scheduling more resource-efficient. Flink's 
current design in those two aspects leans more towards stream processing, which may 
not be good enough for batch processing. We will provide a more detailed design 
when we get to them.

Please let me know if you have further thoughts or feedback.

Thanks,
Xuefu



--
Sender:Jörn Franke 
Sent at:2018 Oct 11 (Thu) 13:54
Recipient:Xuefu 
Cc:vino yang ; Fabian Hueske ; dev 
; user 
Subject:Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem


Would it maybe make sense to provide Flink as an engine on Hive 
(„flink-on-Hive“)? E.g. to address 4, 5, 6, 8, 9, 10. This could be more loosely 
coupled than integrating Hive in all possible Flink core modules and thus 
introducing a very tight dependency on Hive in the core.
1, 2, 3 could be achieved via a connector based on the Flink Table API.
Just as a proposal: start this endeavour as independent projects (Hive 
engine, connector) to avoid too tight a coupling with Flink. Maybe in a more 
distant future, if the Hive integration is heavily demanded, one could then 
integrate it more tightly if needed. 

What is meant by 11?
On 11.10.2018 at 05:01, Zhang, Xuefu  wrote:


Hi Fabian/Vino,

Thank you very much for your encouragement and inquiry. Sorry that I didn't see 
Fabian's email until I read Vino's response just now. (Somehow Fabian's went to 
the spam folder.)

My proposal contains long-term and short-terms goals. Nevertheless, the effort 
will focus on the following areas, including Fabian's list:

1. Hive metastore connectivity - This covers both read/write access, which 
means Flink can make full use of Hive's metastore as its catalog (at least for 
the batch but can extend for streaming as well).
2. Metadata compatibility - Objects (databases, tables, partitions, etc) 
created by Hive can be understood by Flink and the reverse direction is true 
also.
3. Data compatibility - Similar to #2, data produced by Hive can be consumed by 
Flink and vice versa.
4. Support Hive UDFs - For all of Hive's native UDFs, Flink either provides its 
own implementation or makes Hive's implementation work in Flink. Further, for 
user-created UDFs in Hive, Flink SQL should provide a mechanism allowing users 
to import them into Flink without any code change required.
5. Data types -  Flink SQL should support all data types that are available in 
Hive.
6. SQL Language - Flink SQL should support SQL standard (such as SQL2003) with 
extension to support Hive's syntax and language features, around DDL, DML, and 
SELECT queries.
7. SQL CLI - this is currently being developed in Flink but more effort is needed.
8. Server - provide a server that's compatible with Hive's HiveServer2 in its 
Thrift APIs, such that HiveServer2 users can reuse their existing clients (such 
as beeline) but connect to Flink's Thrift server instead.
9. JDBC/ODBC drivers - Flink may provide its own JDBC/ODBC drivers for other 
applications to use to connect to its Thrift server.
10. Support other users' customizations in Hive, such as Hive SerDes, storage 
handlers, etc.
11. Better task failure tolerance and task scheduling at Flink runtime.

As you can see, achieving all of those requires significant effort across all 
layers in Flink. However, a short-term goal could include only core areas 
(such as 1, 2, 4, 5, 6, 7) or start at a smaller scope (such as #3, #6).

Please share your further thoughts. If we generally agree that this is the 
right direction, I could come up with a formal proposal quickly and then we can

Are savepoints / checkpoints co-ordinated?

2018-10-11 Thread anand.gopinath
Hi,

I had a couple questions about savepoints / checkpoints

When I issue "Cancel Job with Savepoint", how is that instruction co-ordinated 
with checkpoints? Can I be certain the savepoint will be the last operation (i.e. 
no more checkpoints)?

I have a Kafka source -> operation -> Kafka sink task in Flink, and it looks like on 
restart from the savepoint there are duplicates written to the sink topic in 
Kafka. The dupes overlap with the last few events prior to the savepoint, and I am 
trying to work out what could have happened.
My FlinkKafkaProducer011 is set to Semantic.AT_LEAST_ONCE, but 
env.enableCheckpointing(parameters.getInt("checkpoint.interval"), 
CheckpointingMode.EXACTLY_ONCE).
I thought at-least-once still implies that flushes to Kafka only occur on a 
checkpoint.

One theory is that a further checkpoint occurred after/during the savepoint, 
which would have flushed to Kafka events that are not in my savepoint.

Any pointers to schoolboy errors I may have made would be appreciated.
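
For reference, a minimal sketch of the wiring being described (topic, schema and properties are placeholders, not taken from the actual job):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
        "output-topic",                                           // placeholder topic
        new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
        kafkaProps,                                               // assumed Properties
        FlinkKafkaProducer011.Semantic.AT_LEAST_ONCE);            // or EXACTLY_ONCE

stream.addSink(producer);

With AT_LEAST_ONCE the producer is flushed on checkpoints and savepoints, but records replayed after a restore are written again, so duplicates around the restore point are expected unless EXACTLY_ONCE (Kafka transactions) is used.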

-
Also, am I right in thinking that if I have managed state with the RocksDB backend 
that is using 1G on disk, but substantially less keyed state in memory, a savepoint 
needs to save the full 1G to complete?

Regards
Anand


Re: User jar is present in the flink job manager's class path

2018-10-11 Thread Gary Yao
Hi,

Could it be that you are submitting the job in attached mode, i.e., without the
-d parameter? In the "job cluster attached mode", we actually start a Flink
session cluster (and stop it again from the CLI) [1]. Therefore, in attached
mode, the config option "yarn.per-job-cluster.include-user-jar" is effectively
ignored. If you submit with -d, a "true job cluster" is started, and the user
jar should be added to the system classpath. Alternatively, if the detached
mode is not an option for you, you could add a jar with your custom logger
implementation to the Flink /lib directory.
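
For example, a detached per-job submission (with ship files, if needed) would look something like this, with placeholder paths and jar name:

./bin/flink run -m yarn-cluster -d -yt /path/to/shipdir ./my-uber-job.jar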

If the behavior in Flink 1.3 is indeed different, then I would consider
this a
regression. Can you open a jira issue for that?

Best,
Gary

[1]
https://github.com/apache/flink/blob/d13015e23c805e1aaefdc7d8037f2fa87ea74830/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java#L261

On Thu, Oct 11, 2018 at 3:54 PM Timo Walther  wrote:

> Yes, you are right. I was not aware that the resolution order depends on
> the cluster deployment. I will loop in Gary (in CC) that might know
> about such a YARN setup.
>
> Regards,
> Timo
>
> Am 11.10.18 um 15:47 schrieb yinhua.dai:
> > Hi Timo,
> >
> > I didn't tried to configure the classloader order, according to the
> > document, it should only be needed for yarn-session mode, right?
> >
> > I can see the ship files(-yt /path/dir/) is present in job manager's
> class
> > path, so maybe I should put my uber jar in the -yt path so that it will
> be
> > shipped and add to class path in flink 1.5?
> >
> >
> >
> > --
> > Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>
>


Re: [BucketingSink] notify on moving into pending/ final state

2018-10-11 Thread Dawid Wysakowicz
Hi Rinat,
I haven't checked your PR but we introduced a new connector in Flink 1.6
called StreamingFileSink that is supposed to replace BucketingSink in the
long term. I think it might solve a few of your problems. Have you checked
it by chance?

Best,
Dawid
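
For reference, a minimal row-format sketch of StreamingFileSink (the output path is a placeholder and 'stream' is assumed to be a DataStream<String>):

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path("hdfs:///data/out"), new SimpleStringEncoder<String>("UTF-8"))
        .build();

stream.addSink(sink);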

On Thu, 11 Oct 2018, 14:10 Rinat,  wrote:

> Hi Piotr, during the migration to the latest Flink version, we’ve decided
> to try to contribute this functionality to the master branch.
>
> PR is available here https://github.com/apache/flink/pull/6824
> More details about hooking the state changes in BucketingSink are
> available in https://issues.apache.org/jira/browse/FLINK-9592
>
> Thx !
>
> On 14 Jun 2018, at 23:29, Rinat  wrote:
>
> Hi Piotr, I’ve create an issue
> https://issues.apache.org/jira/browse/FLINK-9592
>
> The third proposal looks great, may I try to contribute this issue ?
>
> On 14 Jun 2018, at 12:29, Piotr Nowojski  wrote:
>
> Hi,
>
> Couple of things:
>
> 1. Please create a Jira ticket with this proposal, so we can move
> discussion from user mailing list.
>
> I haven’t thought it through, so take my comments with a grain of salt,
> however:
>
> 2. If we were to go with such callback, I would prefer to have one
> BucketStateChangeCallback, with methods like `onInProgressToPending(…)`,
> `onPendingToFinal`, `onPendingToCancelled(…)`, etc, in oppose to having one
> interface passed three times/four times for different purposes.
>
> 3. Other thing that I had in mind is that BucketingSink could be rewritten
> to extend TwoPhaseCommitSinkFunction. In that case, with
>
> public class BucketingSink2 extends TwoPhaseCommitSinkFunction
>
> user could add his own hooks by overriding following methods
>
> BucketingSink2#beginTransaction,
> BucketingSink2#preCommit, BucketingSink2#commit, BucketingSink2#abort. For
> example:
>
> public class MyBucketingSink extends BucketingSink2 {
>   @Override
>   protected void  commit(??? txn) {
> super.commit(txn);
> // My hook on moving file from pending to commit state
>   };
> }
>
> Alternatively, we could implement before mentioned callbacks support in
> TwoPhaseCommitSinkFunction and provide such feature to
> Kafka/Pravega/BucketingSink at once.
>
> Piotrek
>
>
> Sincerely yours,
> *Rinat Sharipov*
> Software Engineer at 1DMP CORE Team
>
> email: r.shari...@cleverdata.ru 
> mobile: +7 (925) 416-37-26
>
> CleverDATA
> make your data clever
>
>
> Sincerely yours,
> *Rinat Sharipov*
> Software Engineer at 1DMP CORE Team
>
> email: r.shari...@cleverdata.ru 
> mobile: +7 (925) 416-37-26
>
> CleverDATA
> make your data clever
>
>


Re: User jar is present in the flink job manager's class path

2018-10-11 Thread yinhua.dai
Meanwhile, I can see below code in flink 1.5

public static final ConfigOption<String> CLASSPATH_INCLUDE_USER_JAR =
        key("yarn.per-job-cluster.include-user-jar")
                .defaultValue("ORDER")
                .withDescription("Defines whether user-jars are included in the system class path " +
                        "for per-job-clusters as well as their positioning in the path. They can be " +
                        "positioned at the beginning (\"FIRST\"), at the end (\"LAST\"), or be positioned " +
                        "based on their name (\"ORDER\"). Setting this parameter to \"DISABLED\" causes " +
                        "the jar to be included in the user class path instead.");

Does this mean the user jar should always be included in class path?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: User jar is present in the flink job manager's class path

2018-10-11 Thread Timo Walther
Yes, you are right. I was not aware that the resolution order depends on 
the cluster deployment. I will loop in Gary (in CC) that might know 
about such a YARN setup.


Regards,
Timo

On 11.10.18 at 15:47, yinhua.dai wrote:

Hi Timo,

I didn't tried to configure the classloader order, according to the
document, it should only be needed for yarn-session mode, right?

I can see the ship files(-yt /path/dir/) is present in job manager's class
path, so maybe I should put my uber jar in the -yt path so that it will be
shipped and add to class path in flink 1.5?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: User jar is present in the flink job manager's class path

2018-10-11 Thread yinhua.dai
Hi Timo,

I didn't tried to configure the classloader order, according to the
document, it should only be needed for yarn-session mode, right?

I can see the ship files(-yt /path/dir/) is present in job manager's class
path, so maybe I should put my uber jar in the -yt path so that it will be
shipped and add to class path in flink 1.5?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: User jar is present in the flink job manager's class path

2018-10-11 Thread Timo Walther

Hi,

did you try to change the classloading strategy? Maybe this problem 
could be fixed by configuring the ClassLoader resolution order [1].


Regards,
Timo

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/debugging_classloading.html


On 11.10.18 at 10:49, yinhua.dai wrote:

We have some customized log4j layout implementation so we need flink job
manager/task manager be able to load the logger implementation which is
packaged in the uber jar.

However, we noticed that in flink 1.3, the user jar is put at the beginning
of job manager, when we do the same again in flink 1.5, the user jar is not
there any more.
Is this expected?

I saw this is the document:
*When submitting a Flink job/application directly to YARN (via bin/flink run
-m yarn-cluster ...), dedicated TaskManagers and JobManagers are started for
that job. Those JVMs have both Flink framework classes and user code classes
in the Java classpath. That means that there is no dynamic classloading
involved in that case.*

And we are using flink on yarn with per-job mode.
So confused by what we experiencing for now.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: getRuntimeContext(): The runtime context has not been initialized.

2018-10-11 Thread Ahmad Hassan
Hi,

Yes, we can replace the FoldFunction with an AggregateFunction, that is not an
issue. But the problem remains the same: how to use MapState to store and update
the state of each product instead of keeping the whole HashMap of products in
heap memory. We are running Flink 1.6.0.

Yes, we can see up to 24 million products in a 24 hr window. The composite key
will cause millions of windows in 24 hrs for 24 million products for just one
tenant. That is why we chose tenant as the key and then use a map to store
product metrics for incoming events.

Any known design for how to deal with this in Flink, please?

Thanks.
Best regards
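
For what it's worth, a rough sketch of the composite-key direction discussed in the reply quoted below, assuming Event is a POJO with tenant, productId and price fields and PriceStats is a small per-product accumulator type (both assumptions, not from this thread):

import java.util.Objects;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

inStream
    .filter(Objects::nonNull)
    .keyBy("tenant", "productId")                     // composite key, assumes POJO fields
    .window(SlidingProcessingTimeWindows.of(Time.minutes(1440), Time.minutes(5)))
    .aggregate(new AggregateFunction<Event, PriceStats, PriceStats>() {
        @Override
        public PriceStats createAccumulator() {
            return new PriceStats();                  // assumed accumulator type
        }

        @Override
        public PriceStats add(Event event, PriceStats acc) {
            acc.updatePrice(event.getPrice());        // assumed methods on PriceStats/Event
            return acc;
        }

        @Override
        public PriceStats getResult(PriceStats acc) {
            return acc;
        }

        @Override
        public PriceStats merge(PriceStats a, PriceStats b) {
            return a.mergeWith(b);                    // assumed merge helper
        }
    });

Each key then only keeps one small accumulator per window, instead of a single tenant key holding a map of millions of products.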



On Thu, 11 Oct 2018 at 12:14, Dawid Wysakowicz 
wrote:

> Hi Ahmad,
>
> Few comments from my side:
>
> 1. FoldFunction is deprecated because of many problems, e.g. no
> possibility to merge contents of windows. Therefore you should at least use
> the AggregateFunction.
>
> 2. I am not sure if you need to store this in RocksDB, do you expect
> 24millions product per each tenant in a single window?
>
> 3. I think what you could do is first compute stats for composite key
>  and then aggregate them in subsequent operation(if you
> need to). This way you could distribute the workload to more parallel
> instances.
>
> Best,
>
> Dawid
>
> On 11/10/18 11:33, Ahmad Hassan wrote:
>
> Hi All,
>
> Thanks for the replies. Here is the code snippet of what we want to
> achieve:
>
> We have sliding windows of 24hrs with 5 minutes apart.
>
> inStream
>  .filter(Objects::nonNull)
>  .keyBy("tenant")
>  .window(SlidingProcessingTimeWindows.of(Time.minutes(1440),
> Time.minutes(5)))
>  .fold(new DefaultVector(), new CalculationFold(), new
> MetricCalculationApply());
>
> public class CalculationFold implements FoldFunction
> {
> private final MapState products;
> private transient MapStateDescriptor
> descr;
>
> @Override
> public DefaultVector fold(DefaultVector stats, Event event)
> {
> if (products.contains(event.getProductId))
> {
> DefaultProductMetricVector product = products.get(event.getProductId);
> product.updatePrice(event.getPrice);
> products.put(event.getProductId, product);
> }
> else
> {
> DefaultProductMetricVector product = new DefaultProductMetricVector();
> product.updatePrice(event.getPrice);
> products.put(event.getProductId, product);
> }
> return stats;
> }
>
> *// Fold function do not allow the open method and
> this.getRuntimeContext*
> //public void open(Configuration parameters) throws Exception
> //{
> // descr = new MapStateDescriptor<>("product", String.class,
> DefaultProductMetricVector.class);
> // products = this.getRuntimeContext().getMapState(descr);
> //}
> }
>
>
> We expect millions of unique products in 24 hour window so that is the
> reason we want to store state on rocksdb of each product class
> DefaultProductMetricVector instance. Otherwise, my understanding is that is
> that if i instantiate a java hashmap of products within DefaultVector fold
> accumulator then for each incoming event the full set of products will be
> deserialised and stored on heap which will eventually cause heap overflow
> error.
>
> Please can you tell us how to solve this problem.
>
> Thanks.
>
> Best Regards,
>
>
> On Wed, 10 Oct 2018 at 10:21, Fabian Hueske  wrote:
>
>> Yes, it would be good to post your code.
>> Are you using a FoldFunction in a window (if yes, what window) or as a
>> running aggregate?
>>
>> In general, collecting state in a FoldFunction is usually not something
>> that you should do. Did you consider using an AggregateFunction?
>>
>> Fabian
>>
>> Am Mi., 10. Okt. 2018 um 11:08 Uhr schrieb Chesnay Schepler <
>> ches...@apache.org>:
>>
>>> In which method are you calling getRuntimeContext()? This method can
>>> only be used after open() has been called.
>>>
>>> On 09.10.2018 17:09, Ahmad Hassan wrote:
>>>
>>> Hi,
>>>
>>> We want to use MapState inside fold function to keep the map of all
>>> products that we see in 24 hour window to store huge state in rocksdb
>>> rather than overflowing heap. However, I don't seem to initialise mapstate
>>> within foldfunction or any class that is extending RichMapFunction
>>>
>>> private transient MapStateDescriptor descr = new
>>> MapStateDescriptor<>("mymap", String.class, String.class);
>>> this.getRuntimeContext().getMapState(descr);
>>>
>>> I get error
>>>
>>> java.lang.IllegalStateException: The runtime context has not been
>>> initialized.
>>> at
>>> org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)
>>>
>>>
>>> Any clues how to get the runtime context please?
>>>
>>> Thanks.
>>>
>>> Best regards
>>>
>>>
>>>
>


Re: [BucketingSink] notify on moving into pending/ final state

2018-10-11 Thread Rinat
Hi Piotr, during the migration to the latest Flink version, we’ve decided to 
try to contribute this functionality to the master branch.

PR is available here https://github.com/apache/flink/pull/6824 
More details about hooking the state changes in BucketingSink are available in 
https://issues.apache.org/jira/browse/FLINK-9592 

Thx !

> On 14 Jun 2018, at 23:29, Rinat  wrote:
> 
> Hi Piotr, I’ve create an issue 
> https://issues.apache.org/jira/browse/FLINK-9592 
> 
> 
> The third proposal looks great, may I try to contribute this issue ?
> 
>> On 14 Jun 2018, at 12:29, Piotr Nowojski > > wrote:
>> 
>> Hi,
>> 
>> Couple of things:
>> 
>> 1. Please create a Jira ticket with this proposal, so we can move discussion 
>> from user mailing list.
>> 
>> I haven’t thought it through, so take my comments with a grain of salt, 
>> however:
>> 
>> 2. If we were to go with such callback, I would prefer to have one 
>> BucketStateChangeCallback, with methods like `onInProgressToPending(…)`, 
>> `onPendingToFinal`, `onPendingToCancelled(…)`, etc, in oppose to having one 
>> interface passed three times/four times for different purposes.
>> 
>> 3. Other thing that I had in mind is that BucketingSink could be rewritten 
>> to extend TwoPhaseCommitSinkFunction. In that case, with 
>> 
>> public class BucketingSink2 extends TwoPhaseCommitSinkFunction
>> 
>> user could add his own hooks by overriding following methods
>> 
>> BucketingSink2#beginTransaction, BucketingSink2#preCommit, 
>> BucketingSink2#commit, BucketingSink2#abort. For example:
>> 
>> public class MyBucketingSink extends BucketingSink2 {
>>   @Override
>>   protected void  commit(??? txn) {
>> super.commit(txn);
>> // My hook on moving file from pending to commit state
>>   };
>> }
>> 
>> Alternatively, we could implement before mentioned callbacks support in 
>> TwoPhaseCommitSinkFunction and provide such feature to 
>> Kafka/Pravega/BucketingSink at once.
>> 
>> Piotrek
> 
> Sincerely yours,
> Rinat Sharipov
> Software Engineer at 1DMP CORE Team
> 
> email: r.shari...@cleverdata.ru 
> mobile: +7 (925) 416-37-26
> 
> CleverDATA
> make your data clever
> 

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru 
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever



Re: Small checkpoint data takes too much time

2018-10-11 Thread 徐涛
Hi Zhijiang,
Thanks for your response.
I added the checkpointAlignmentTime metric; the data shows that the
checkpointDuration is about 150s, and the checkpointAlignmentTime is about 4s.
There is a big gap between them.

Best
Henry
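
For context, a sketch of the kind of setup under discussion, with illustrative values (URI and intervals are placeholders):

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Asynchronous snapshots to HDFS. Note: the end-to-end checkpoint duration is
// measured from trigger time, so time the barrier spends queued behind
// backpressured data counts towards it even when the alignment metric is small.
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints", true));
env.enableCheckpointing(60_000);

// Give slow checkpoints room to finish and avoid back-to-back attempts (example values).
env.getCheckpointConfig().setCheckpointTimeout(600_000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);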

> On 2018-10-10, at 1:26 PM, Zhijiang(wangzhijiang999) wrote:
> 
> The checkpoint duration includes the processes of barrier alignment and state 
> snapshot. Every task has to receive all the barriers from all the channels, 
> then trigger the state snapshot.
> I guess the barrier alignment may take long time for your case, and it is 
> specially critical during backpressure. You can check the metric of 
> "checkpointAlignmentTime" for confirmation.
> 
> Best,
> Zhijiang
> --
> From: 徐涛 
> Sent: 2018-10-10 (Wed) 13:13
> To: user 
> Subject: Small checkpoint data takes too much time
> 
> Hi 
>  I recently encounter a problem in production. I found checkpoint takes too 
> much time, although it doesn`t affect the job execution.
>  I am using FsStateBackend, writing the data to a HDFS checkpointDataUri, and 
> asynchronousSnapshots, I print the metric data “lastCheckpointDuration” and 
> “lastCheckpointSize”. It shows the “lastCheckpointSize” is about 80KB, but 
> the “lastCheckpointDuration” is about 160s! Because checkpoint data is small 
> , I think it should not take that long time. I do not know why and which 
> condition may influent the checkpoint time. Does anyone has encounter such 
> problem?
>  Thanks a lot.
> 
> Best
> Henry
> 



Re: getRuntimeContext(): The runtime context has not been initialized.

2018-10-11 Thread Dawid Wysakowicz
Hi Ahmad,

Few comments from my side:

    1. FoldFunction is deprecated because of many problems, e.g. no
possibility to merge contents of windows. Therefore you should at least
use the AggregateFunction.

    2. I am not sure if you need to store this in RocksDB; do you expect
24 million products per tenant in a single window?

    3. I think what you could do is first compute stats for the composite
key and then aggregate them in a subsequent operation (if
you need to). This way you could distribute the workload to more
parallel instances.

Best,

Dawid


On 11/10/18 11:33, Ahmad Hassan wrote:
> Hi All,
>
> Thanks for the replies. Here is the code snippet of what we want to
> achieve:
>
> We have sliding windows of 24hrs with 5 minutes apart.
>
> inStream
>  .filter(Objects::nonNull)
>  .keyBy("tenant")
>  .window(SlidingProcessingTimeWindows.of(Time.minutes(1440),
> Time.minutes(5)))
>  .fold(new DefaultVector(), new CalculationFold(), new
> MetricCalculationApply());
>
> public class CalculationFold implements FoldFunction
> {
> private final MapState products;
> private transient MapStateDescriptor DefaultProductMetricVector> descr;
>
> @Override
> public DefaultVector fold(DefaultVector stats, Event event)
> {
> if (products.contains(event.getProductId))
> {
> DefaultProductMetricVector product = products.get(event.getProductId);
> product.updatePrice(event.getPrice);
> products.put(event.getProductId, product);
> }
> else
> {
> DefaultProductMetricVector product = new DefaultProductMetricVector();
> product.updatePrice(event.getPrice);
> products.put(event.getProductId, product);
> }
> return stats;
> }
>
> *        // Fold function do not allow the open method and
> this.getRuntimeContext*
> //public void open(Configuration parameters) throws Exception
> //{
> // descr = new MapStateDescriptor<>("product", String.class,
> DefaultProductMetricVector.class);
> // products = this.getRuntimeContext().getMapState(descr);
> //}
> }
>
>
> We expect millions of unique products in 24 hour window so that is the
> reason we want to store state on rocksdb of each product class
> DefaultProductMetricVector instance. Otherwise, my understanding is
> that is that if i instantiate a java hashmap of products within
> DefaultVector fold accumulator then for each incoming event the full
> set of products will be deserialised and stored on heap which will
> eventually cause heap overflow error.
>
> Please can you tell us how to solve this problem.
>
> Thanks.
>
> Best Regards,
>
>
> On Wed, 10 Oct 2018 at 10:21, Fabian Hueske  > wrote:
>
> Yes, it would be good to post your code.
> Are you using a FoldFunction in a window (if yes, what window) or
> as a running aggregate?
>
> In general, collecting state in a FoldFunction is usually not
> something that you should do. Did you consider using an
> AggregateFunction?
>
> Fabian
>
> Am Mi., 10. Okt. 2018 um 11:08 Uhr schrieb Chesnay Schepler
> mailto:ches...@apache.org>>:
>
> In which method are you calling getRuntimeContext()? This
> method can only be used after open() has been called.
>
> On 09.10.2018 17:09, Ahmad Hassan wrote:
>> Hi,
>>
>> We want to use MapState inside fold function to keep the map
>> of all products that we see in 24 hour window to store huge
>> state in rocksdb rather than overflowing heap. However, I
>> don't seem to initialise mapstate within foldfunction or any
>> class that is extending RichMapFunction
>>
>> private transient MapStateDescriptor descr =
>> new MapStateDescriptor<>("mymap", String.class, String.class);
>> this.getRuntimeContext().getMapState(descr);
>>
>> I get error
>>
>> java.lang.IllegalStateException: The runtime context has not
>> been initialized.
>> at
>> 
>> org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)
>>
>>
>> Any clues how to get the runtime context please?
>>
>> Thanks.
>>
>> Best regards
>
>





Re: Identifying missing events in keyed streams

2018-10-11 Thread Fabian Hueske
I'd go with 2) because the logic is simple and it is (IMO) much easier to
understand what is going on and what state is kept.
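
A bare-bones sketch of approach (2), re-registering a per-key event-time timer on every event and deleting the previous one (the 2-minute gap and the Event and Alert types are assumptions; it also assumes timestamps/watermarks are assigned upstream):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class MissingEventDetector extends KeyedProcessFunction<String, Event, Alert> {

    private ValueState<Long> timerState;   // timestamp of the currently registered timer

    @Override
    public void open(Configuration parameters) {
        timerState = getRuntimeContext().getState(new ValueStateDescriptor<>("timer", Long.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Alert> out) throws Exception {
        Long oldTimer = timerState.value();
        if (oldTimer != null) {
            ctx.timerService().deleteEventTimeTimer(oldTimer);       // drop the previous timer
        }
        long newTimer = ctx.timestamp() + 2 * 60 * 1000;             // assumed 2-minute gap
        ctx.timerService().registerEventTimeTimer(newTimer);
        timerState.update(newTimer);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) throws Exception {
        // No event arrived for this key within the gap: emit an alert and clean up.
        out.collect(new Alert(ctx.getCurrentKey()));
        timerState.clear();
    }
}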

Am Do., 11. Okt. 2018 um 12:42 Uhr schrieb Averell :

> Hi Fabian,
>
> Thanks for the suggestion.
> I will try with that support of removing timers.
>
> I have also tried approach (3) - using session windows, and it works: I set
> session gap to 2 minutes, and use an aggregation window function to keep
> the
> amount of in-memory data for each keyed stream to the minimum level.
>
> Could you please explain why (2) is better?
>
> Thanks and best regards,
> Averell
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Apache Flink: Kafka connector in Python streaming API, “Cannot load user class”

2018-10-11 Thread Dawid Wysakowicz
Hi Kostas,

As far as I know you cannot just use Java classes from within the Python
API. I think the Python API does not provide a wrapper for the Kafka connector. I
am adding Chesnay to cc to correct me if I am wrong.

Best,

Dawid
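
For comparison, a plain Java sketch of the same pipeline (broker, group and topic values copied from the script quoted below; everything else is assumed):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "flink_test");   // note: the Kafka property name is "group.id"

env.addSource(new FlinkKafkaConsumer09<>("TopicCategory-TopicName", new SimpleStringSchema(), props))
   .print();

env.execute("read-from-kafka");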


On 11/10/18 12:18, Kostas Evangelou wrote:
> Hey all, 
>
> Thank you so much for your efforts. I've already posted this question
> on stack overflow, but thought I should ask here as well.
>
> I am trying out Flink's new Python streaming API and attempting to run
> my script with |./flink-1.6.1/bin/pyflink-stream.sh
> examples/read_from_kafka.py|. The python script is fairly
> straightforward, I am just trying to consume from an existing topic
> and send everything to stdout (or the *.out file in the log directory
> where the output method emits data by default).
>
> import glob
>
> import os
>
> import sys
>
> from java.util import Properties
>
> from org.apache.flink.streaming.api.functions.source import SourceFunction
>
> from org.apache.flink.streaming.api.collector.selector import
> OutputSelector
>
> from org.apache.flink.api.common.serialization import SimpleStringSchema
>
>
> directories=['/home/user/flink/flink-1.6.1/lib']
>
> for directory in directories:
>
>     for jar in glob.glob(os.path.join(directory,'*.jar')):
>
>                 sys.path.append(jar)
>
>
> from org.apache.flink.streaming.connectors.kafka import
> FlinkKafkaConsumer09
>
>
> props = Properties()
>
> config = {"bootstrap_servers": "localhost:9092",
>
>           "group_id": "flink_test",
>
>           "topics": ["TopicCategory-TopicName"]}
>
> props.setProperty("bootstrap.servers", config['bootstrap_servers'])
>
> props.setProperty("group_id", config['group_id'])
>
> props.setProperty("zookeeper.connect", "localhost:2181")
>
>
> def main(factory):
>
>     consumer = FlinkKafkaConsumer09([config["topics"]],
> SimpleStringSchema(), props)
>
>
>     env = factory.get_execution_environment()
>
>     env.add_java_source(consumer) \
>
>         .output()
>
>     env.execute()
>
>
> I grabbed a handful of jar files from the maven repos,
> namely |flink-connector-kafka-0.9_2.11-1.6.1.jar|, 
> |flink-connector-kafka-base_2.11-1.6.1.jar| and |kafka-clients-0.9.0.1.jar|and
> copied them in Flink's |lib| directory. Unless I misunderstood the
> documentation, this should suffice for Flink to load the kafka
> connector. Indeed, if I remove any of these jars the import fails, but
> this doesn't seem to be enough to actually invoke the plan. Adding a
> for loop to dynamically add these to |sys.path| didn't work either.
> Here's what gets printed in the console:
>
> Starting execution of program
>
> Failed to run plan: null
>
> Traceback (most recent call last):
>
>   File "", line 1, in 
>
>   File
> "/tmp/flink_streaming_plan_9cfed4d9-0288-429c-99ac-df02c86922ec/read_from_kafka.py",
> line 32, in main
>
>     at
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:267)
>
>     at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:486)
>
>     at
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>
>     at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
>
>     at
> org.apache.flink.streaming.python.api.environment.PythonStreamExecutionEnvironment.execute(PythonStreamExecutionEnvironment.java:245)
>
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
>     at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
>     at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
>     at java.lang.reflect.Method.invoke(Method.java:498)
>
>
> org.apache.flink.client.program.ProgramInvocationException:
> org.apache.flink.client.program.ProgramInvocationException: Job
> failed. (JobID: bbcc0cb2c4fe6e3012d228b06b270eba)
>
>
> The program didn't contain a Flink job. Perhaps you forgot to call
> execute() on the execution environment.
>
>
> This is what I see in the logs:
>
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
> load user class:   
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
>
> ClassLoader info: URL ClassLoader:
>
>     file:
> '/tmp/blobStore-9f6930fa-f1cf-4851-a0bf-2e620391596f/job_ca486746e7feb42d2d162026b74e9935/blob_p-9321896d165fec27a617d44ad50e3ef09c3211d9-405ccc9b490fa1e1348f0a76b1a48887'
> (valid JAR)
>
> Class not resolvable through given classloader.
>
>     at
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:236)
>
>     at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:104)
>
>     at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
>
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>
>     at java.lang.Thread.run(Thread.java:748)
>
>
> Is there a way to fix this and make the connector available to Python?

Re: Identifying missing events in keyed streams

2018-10-11 Thread Averell
Hi Fabian,

Thanks for the suggestion.
I will try with that support of removing timers.

I have also tried approach (3) - using session windows, and it works: I set
session gap to 2 minutes, and use an aggregation window function to keep the
amount of in-memory data for each keyed stream to the minimum level.

Could you please explain why (2) is better?

Thanks and best regards,
Averell 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Apache Flink: Kafka connector in Python streaming API, “Cannot load user class”

2018-10-11 Thread Kostas Evangelou
Hey all,

Thank you so much for your efforts. I've already posted this question on
stack overflow, but thought I should ask here as well.

I am trying out Flink's new Python streaming API and attempting to run my
script with ./flink-1.6.1/bin/pyflink-stream.sh examples/read_from_kafka.py.
The python script is fairly straightforward, I am just trying to consume
from an existing topic and send everything to stdout (or the *.out file in
the log directory where the output method emits data by default).

import glob

import os

import sys

from java.util import Properties

from org.apache.flink.streaming.api.functions.source import SourceFunction

from org.apache.flink.streaming.api.collector.selector import OutputSelector

from org.apache.flink.api.common.serialization import SimpleStringSchema


directories=['/home/user/flink/flink-1.6.1/lib']

for directory in directories:

for jar in glob.glob(os.path.join(directory,'*.jar')):

sys.path.append(jar)


from org.apache.flink.streaming.connectors.kafka import FlinkKafkaConsumer09


props = Properties()

config = {"bootstrap_servers": "localhost:9092",

  "group_id": "flink_test",

  "topics": ["TopicCategory-TopicName"]}

props.setProperty("bootstrap.servers", config['bootstrap_servers'])

props.setProperty("group_id", config['group_id'])

props.setProperty("zookeeper.connect", "localhost:2181")


def main(factory):

consumer = FlinkKafkaConsumer09([config["topics"]],
SimpleStringSchema(), props)


env = factory.get_execution_environment()

env.add_java_source(consumer) \

.output()

env.execute()

I grabbed a handful of jar files from the maven repos, namely
flink-connector-kafka-0.9_2.11-1.6.1.jar,
flink-connector-kafka-base_2.11-1.6.1.jar and kafka-clients-0.9.0.1.jar, and
copied them in Flink's lib directory. Unless I misunderstood the
documentation, this should suffice for Flink to load the kafka connector.
Indeed, if I remove any of these jars the import fails, but this doesn't
seem to be enough to actually invoke the plan. Adding a for loop to
dynamically add these to sys.path didn't work either. Here's what gets
printed in the console:

Starting execution of program

Failed to run plan: null

Traceback (most recent call last):

  File "", line 1, in 

  File
"/tmp/flink_streaming_plan_9cfed4d9-0288-429c-99ac-df02c86922ec/read_from_kafka.py",
line 32, in main

at
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:267)

at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:486)

at
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)

at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)

at
org.apache.flink.streaming.python.api.environment.PythonStreamExecutionEnvironment.execute(PythonStreamExecutionEnvironment.java:245)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)


org.apache.flink.client.program.ProgramInvocationException:
org.apache.flink.client.program.ProgramInvocationException: Job failed.
(JobID: bbcc0cb2c4fe6e3012d228b06b270eba)


The program didn't contain a Flink job. Perhaps you forgot to call
execute() on the execution environment.

This is what I see in the logs:

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load
user class:
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09

ClassLoader info: URL ClassLoader:

file:
'/tmp/blobStore-9f6930fa-f1cf-4851-a0bf-2e620391596f/job_ca486746e7feb42d2d162026b74e9935/blob_p-9321896d165fec27a617d44ad50e3ef09c3211d9-405ccc9b490fa1e1348f0a76b1a48887'
(valid JAR)

Class not resolvable through given classloader.

at
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:236)

at
org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:104)

at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)

at java.lang.Thread.run(Thread.java:748)

Is there a way to fix this and make the connector available to Python?

Many thanks,
Kostas


Re: Taskmanager times out continuously for registration with Jobmanager

2018-10-11 Thread Till Rohrmann
Hi Abdul,

have you tried whether this problem also occurs with newer Flink versions
(1.5.4 or 1.6.1)?

Cheers,
Till

On Thu, Oct 11, 2018 at 9:24 AM Dawid Wysakowicz 
wrote:

> Hi Abdul,
>
> I've added Till and Gary to cc, who might be able to help you.
>
> Best,
>
> Dawid
>
> On 11/10/18 03:05, Abdul Qadeer wrote:
>
> Hi,
>
>
> We are facing an issue in standalone HA mode in Flink 1.4.0 where
> Taskmanager restarts and is not able to register with the Jobmanager. It
> times out awaiting *AcknowledgeRegistration/AlreadyRegistered* message
> from Jobmanager Actor and keeps sending *RegisterTaskManager *message.
> The logs at Jobmanager don’t show anything about registration
> failure/request. It doesn’t print *log*.debug(*s"RegisterTaskManager: $*
> msg*"*) (from JobManager.scala) either. The network connection between
> taskmanager and jobmanager seems fine; tcpdump shows message sent to
> jobmanager and TCP ACK received from jobmanager. Note that the
> communication is happening between docker containers.
>
>
> Following are the logs from Taskmanager:
>
>
>
> {"timeMillis":1539189572438,"thread":"flink-akka.actor.default-dispatcher-2","level":"INFO","loggerName":"org.apache.flink.runtime.taskmanager.TaskManager","message":"Trying
> to register at JobManager akka.tcp://
> flink@192.168.83.51:6123/user/jobmanager (attempt 1400, timeout: 3
> milliseconds)","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":48,"threadPriority":5}
>
> {"timeMillis":1539189580229,"thread":"Curator-Framework-0-SendThread(zookeeper.maglev-system.svc.cluster.local:2181)","level":"DEBUG","loggerName":"org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn","message":"Got
> ping response for sessionid: 0x1260ea5002d after
> 0ms","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":101,"threadPriority":5}
>
> {"timeMillis":1539189600247,"thread":"Curator-Framework-0-SendThread(zookeeper.maglev-system.svc.cluster.local:2181)","level":"DEBUG","loggerName":"org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn","message":"Got
> ping response for sessionid: 0x1260ea5002d after
> 0ms","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":101,"threadPriority":5}
>
> {"timeMillis":1539189602458,"thread":"flink-akka.actor.default-dispatcher-2","level":"INFO","loggerName":"org.apache.flink.runtime.taskmanager.TaskManager","message":"Trying
> to register at JobManager akka.tcp://
> flink@192.168.83.51:6123/user/jobmanager (attempt 1401, timeout: 3
> milliseconds)","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":48,"threadPriority":5}
>
> {"timeMillis":1539189620251,"thread":"Curator-Framework-0-SendThread(zookeeper.maglev-system.svc.cluster.local:2181)","level":"DEBUG","loggerName":"org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn","message":"Got
> ping response for sessionid: 0x1260ea5002d after
> 0ms","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":101,"threadPriority":5}
>
> {"timeMillis":1539189632478,"thread":"flink-akka.actor.default-dispatcher-2","level":"INFO","loggerName":"org.apache.flink.runtime.taskmanager.TaskManager","message":"Trying
> to register at JobManager akka.tcp://
> flink@192.168.83.51:6123/user/jobmanager (attempt 1402, timeout: 3
> milliseconds)","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":48,"threadPriority":5}
>
>
>


Re: getRuntimeContext(): The runtime context has not been initialized.

2018-10-11 Thread Ahmad Hassan
Hi All,

Thanks for the replies. Here is the code snippet of what we want to achieve:

We have sliding windows of 24hrs with 5 minutes apart.

inStream
 .filter(Objects::nonNull)
 .keyBy("tenant")
 .window(SlidingProcessingTimeWindows.of(Time.minutes(1440),
Time.minutes(5)))
 .fold(new DefaultVector(), new CalculationFold(), new
MetricCalculationApply());

public class CalculationFold implements FoldFunction<Event, DefaultVector>
{
    private final MapState<String, DefaultProductMetricVector> products;
    private transient MapStateDescriptor<String, DefaultProductMetricVector> descr;

    @Override
    public DefaultVector fold(DefaultVector stats, Event event) throws Exception
    {
        if (products.contains(event.getProductId()))
        {
            DefaultProductMetricVector product = products.get(event.getProductId());
            product.updatePrice(event.getPrice());
            products.put(event.getProductId(), product);
        }
        else
        {
            DefaultProductMetricVector product = new DefaultProductMetricVector();
            product.updatePrice(event.getPrice());
            products.put(event.getProductId(), product);
        }
        return stats;
    }

    // Fold functions do not allow the open method and this.getRuntimeContext
    //public void open(Configuration parameters) throws Exception
    //{
    //    descr = new MapStateDescriptor<>("product", String.class, DefaultProductMetricVector.class);
    //    products = this.getRuntimeContext().getMapState(descr);
    //}
}


We expect millions of unique products in a 24 hour window, so that is the
reason we want to store the state of each product (a DefaultProductMetricVector
instance) in RocksDB. Otherwise, my understanding is that if I instantiate a
Java HashMap of products within the DefaultVector fold accumulator, then for
each incoming event the full set of products will be deserialised and stored
on the heap, which will eventually cause a heap overflow error.

Please can you tell us how to solve this problem.

Thanks.

Best Regards,


On Wed, 10 Oct 2018 at 10:21, Fabian Hueske  wrote:

> Yes, it would be good to post your code.
> Are you using a FoldFunction in a window (if yes, what window) or as a
> running aggregate?
>
> In general, collecting state in a FoldFunction is usually not something
> that you should do. Did you consider using an AggregateFunction?
>
> Fabian
>
> Am Mi., 10. Okt. 2018 um 11:08 Uhr schrieb Chesnay Schepler <
> ches...@apache.org>:
>
>> In which method are you calling getRuntimeContext()? This method can only
>> be used after open() has been called.
>>
>> On 09.10.2018 17:09, Ahmad Hassan wrote:
>>
>> Hi,
>>
>> We want to use MapState inside fold function to keep the map of all
>> products that we see in 24 hour window to store huge state in rocksdb
>> rather than overflowing heap. However, I don't seem to initialise mapstate
>> within foldfunction or any class that is extending RichMapFunction
>>
>> private transient MapStateDescriptor descr = new
>> MapStateDescriptor<>("mymap", String.class, String.class);
>> this.getRuntimeContext().getMapState(descr);
>>
>> I get error
>>
>> java.lang.IllegalStateException: The runtime context has not been
>> initialized.
>> at
>> org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)
>>
>>
>> Any clues how to get the runtime context please?
>>
>> Thanks.
>>
>> Best regards
>>
>>
>>


Re: What are channels mapped to?

2018-10-11 Thread Zhijiang(wangzhijiang999)
The channels are mapped to subpartition indexes, each of which is consumed by one 
parallel instance of the downstream task.

For example, if the reduce task runs with a parallelism of three, every map task 
generates three subpartitions. If a record is hashed to the first channel, 
it will be consumed by the first reduce task.
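
Roughly speaking, hash partitioning picks the channel like this (a simplified
sketch for illustration, not the actual OutputEmitter code):

public class SimpleHashChannelSelector {
    // maps a record's key onto one of the downstream subpartitions (= channels);
    // numberOfChannels equals the parallelism of the consuming task
    public int selectChannel(Object key, int numberOfChannels) {
        return (key.hashCode() & Integer.MAX_VALUE) % numberOfChannels;
    }
}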

Best,
Zhijiang
--
From: Chris Miller 
Sent: 2018-10-11 (Thursday) 16:54
To: user 
Subject: What are channels mapped to?

Hi,
in the OutputEmitter, the output channel can be selected in different ways, 
e.g. OutputEmitter#hashPartitionDefault().
What are the channels mapped to? Do they map to one IP Address or Port?
Thanks.
Chris



What are channels mapped to?

2018-10-11 Thread Chris Miller
 

Hi, 

in the OutputEmitter, the output channel can be selected in different
ways, 

e.g. OutputEmitter#hashPartitionDefault(). 

What are the channels mapped to? Do they map to one IP Address or Port? 

Thanks. 

Chris 

 

User jar is present in the flink job manager's class path

2018-10-11 Thread yinhua.dai
We have a customized log4j layout implementation, so we need the Flink job
manager/task manager to be able to load the logger implementation, which is
packaged in the uber jar.

However, we noticed that in Flink 1.3 the user jar is put at the beginning
of the job manager's classpath, but when we do the same in Flink 1.5 the user
jar is not there any more.
Is this expected?

I saw this in the documentation:
*When submitting a Flink job/application directly to YARN (via bin/flink run
-m yarn-cluster ...), dedicated TaskManagers and JobManagers are started for
that job. Those JVMs have both Flink framework classes and user code classes
in the Java classpath. That means that there is no dynamic classloading
involved in that case.*

And we are using Flink on YARN in per-job mode,
so we are confused by what we are experiencing now.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink leaves a lot of RocksDB sst files in tmp directory

2018-10-11 Thread Sayat Satybaldiyev
Thank you Piotr for the reply! We didn't run this job on a previous
version of Flink. Unfortunately, I don't have a log file from the JM, only the
TM logs.

https://drive.google.com/file/d/14QSVeS4c0EETT6ibK3m_TMgdLUwD6H1m/view?usp=sharing

On Wed, Oct 10, 2018 at 10:08 AM Piotr Nowojski 
wrote:

> Hi,
>
> Was this happening in older Flink version? Could you post in what
> circumstances the job has been moved to a new TM (full job manager logs and
> task manager logs would be helpful)? I’m suspecting that those leftover
> files might have something to do with local recovery.
>
> Piotrek
>
> On 9 Oct 2018, at 15:28, Sayat Satybaldiyev  wrote:
>
> After digging more into the log, I think it's more likely a bug. I've grepped the log
> by job id and found that under normal circumstances the TM is supposed to delete the
> flink-io files. For some reason, it doesn't delete the files that are listed
> above.
>
> 2018-10-08 22:10:25,865 INFO
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  -
> Deleting existing instance base directory
> /tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_92266bd138cd7d51ac7a63beeb86d5f5__1_1__uuid_bf69685b-78d3-431c-88be-b3f26db05566.
> 2018-10-08 22:10:25,867 INFO
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  -
> Deleting existing instance base directory
> /tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_14630a50145935222dbee3f1bcfdc2a6__1_1__uuid_47cd6e95-144a-4c52-a905-52966a5e9381.
> 2018-10-08 22:10:25,874 INFO
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  -
> Deleting existing instance base directory
> /tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_7185aa35d035b12c70cf490077378540__1_1__uuid_7c539a96-a247-4299-b1a0-01df713c3c34.
> 2018-10-08 22:17:38,680 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Close
> JobManager connection for job a5b223c7aee89845f9aed24012e46b7e.
> org.apache.flink.util.FlinkException: JobManager responsible for
> a5b223c7aee89845f9aed24012e46b7e lost the leadership.
> org.apache.flink.util.FlinkException: JobManager responsible for
> a5b223c7aee89845f9aed24012e46b7e lost the leadership.
> 2018-10-08 22:17:38,686 INFO
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  -
> Deleting existing instance base directory
> /tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_7185aa35d035b12c70cf490077378540__1_1__uuid_2e88c56a-2fc2-41f2-a1b9-3b0594f660fb.
> org.apache.flink.util.FlinkException: JobManager responsible for
> a5b223c7aee89845f9aed24012e46b7e lost the leadership.
> 2018-10-08 22:17:38,691 INFO
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  -
> Deleting existing instance base directory
> /tmp/flink-io-5eb5cae3-b194-40b2-820e-01f8f39b5bf6/job_a5b223c7aee89845f9aed24012e46b7e_op_StreamSink_92266bd138cd7d51ac7a63beeb86d5f5__1_1__uuid_b44aecb7-ba16-4aa4-b709-31dae7f58de9.
> org.apache.flink.util.FlinkException: JobManager responsible for
> a5b223c7aee89845f9aed24012e46b7e lost the leadership.
> org.apache.flink.util.FlinkException: JobManager responsible for
> a5b223c7aee89845f9aed24012e46b7e lost the leadership.
> org.apache.flink.util.FlinkException: JobManager responsible for
> a5b223c7aee89845f9aed24012e46b7e lost the leadership.
>
>
> On Tue, Oct 9, 2018 at 2:33 PM Sayat Satybaldiyev 
> wrote:
>
>> Dear all,
>>
>> While running Flink 1.6.1 with RocksDB as a backend and HDFS as the
>> checkpoint FS, I've noticed that after a job has moved to a different host
>> it leaves quite a huge amount of state in the temp folder (1.2TB in total). The
>> files are not used, as the TM is no longer running the job on the current host.
>>
>> The job a5b223c7aee89845f9aed24012e46b7e had been running on the host but
>> then it was moved to a different TM. I'm wondering whether this is intended
>> behavior or a possible bug?
>>
>> I've attached a screenshot of the files that are left behind and not used by any job.
>>
>
>


Re: Streaming to Parquet Files in HDFS

2018-10-11 Thread Averell
Hi Kostas,

Thanks for the info. That error was caused by building your code against an
out-of-date baseline. I rebased my branch and rebuilt, and the issue no longer
occurs.
I've been testing, and so far have the following questions/issues:

1. I'm not able to write to S3 with the following URI format: s3://, and
had to use s3a://. Is this behaviour expected? (I am running
Flink on AWS EMR, and I thought that EMR provides a wrapper for HDFS over S3
with something called EMRFS.)

2. Occasionally/randomly I get the message below (see the attached
parquet_error1.log). I'm using the ParquetAvroWriters.forReflectRecord() method
to write Scala case classes; the sink wiring is sketched just after this list.
Re-running the job doesn't hit that error at the same data location, so I don't
think there is an issue with the data.
 java.lang.ArrayIndexOutOfBoundsException: at
org.apache.parquet.column.values.dictionary.DictionaryValuesWriter$PlainBinaryDictionaryValuesWriter.fallBackDictionaryEncodedData.

3. Sometimes I get the error below when I use a parallelism of 8 for the
sink (see the attached parquet_error2.log).
Reducing it to 2 solves the issue, but is it possible to increase the pool
size? I could not find any place where I can change the
fs.s3.maxconnections parameter.
 java.io.InterruptedIOException: initiate MultiPartUpload on
Test/output/dt=2018-09-20/part-7-5:
org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable
to execute HTTP request: Timeout waiting for connection from pool

4. Where is the temporary folder in which you store the parquet file before
uploading it to S3?
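
For reference, this is roughly how I wire up the sink (a simplified sketch in
Java for brevity; "MyRecord" and the bucket path are placeholders for my actual
case class and S3 location, not the real job code):

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// inputStream is a DataStream<MyRecord>
StreamingFileSink<MyRecord> sink = StreamingFileSink
    .forBulkFormat(new Path("s3a://my-bucket/Test/output"),
                   ParquetAvroWriters.forReflectRecord(MyRecord.class))
    .build();

inputStream.addSink(sink).setParallelism(8);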

Thanks a lot for your help.

Best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

2018-10-11 Thread Timo Walther

Hi Xuefu,

thanks for your proposal, it is a nice summary. Here are my thoughts on 
your list:


1. I think this is also on our current mid-term roadmap. Flink has lacked 
proper catalog support for a very long time. Before we can connect 
catalogs we need to define how to map all the information from a catalog 
to Flink's representation. This is why the work on the unified connector 
API [1] has been going on for quite some time, as it is the first approach to 
discuss and represent the pure characteristics of connectors.
2. It would be helpful to figure out what is missing in [1] to ensure 
this point. I guess we will need a new design document just for a proper 
Hive catalog integration.
3. This is already work in progress. ORC has been merged, Parquet is on 
its way [2].
4. This should be easy. There was a PR in the past that I reviewed, but it was 
not maintained anymore.
5. The type system of Flink SQL is very flexible. Only the UNION type is 
missing.
6. A Flink SQL DDL is on the roadmap once we are done with [1]. 
Support for Hive syntax also needs cooperation with Apache Calcite.

7-11. Long-term goals.

I would also propose to start with a smaller scope from which current 
Flink SQL users can also profit: 1, 2, 5, 3. This would allow us to grow the 
Flink SQL ecosystem. After that we can aim to be fully compatible, 
including syntax and UDFs (4, 6, etc.). Once the core is ready, we can 
work on the tooling (7, 8, 9) and performance (10, 11).


@Jörn: Yes, we should not have a tight dependency on Hive. It should be 
treated as one "connector" system out of many.


Thanks,
Timo

[1] 
https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?ts=5bb62df4#

[2] https://github.com/apache/flink/pull/6483

On 11.10.18 at 07:54, Jörn Franke wrote:

Would it maybe make sense to provide Flink as an engine on Hive 
(„flink-on-Hive“)? E.g. to address 4, 5, 6, 8, 9, 10. This could be more loosely 
coupled than integrating Hive into all possible Flink core modules and thus 
introducing a very tight dependency on Hive in the core.
1, 2, 3 could be achieved via a connector based on the Flink Table API.
Just a proposal to start this endeavour as independent projects (Hive 
engine, connector) to avoid too tight a coupling with Flink. Maybe in a more 
distant future, if the Hive integration is heavily demanded, one could then 
integrate it more tightly if needed.

What is meant by 11?

On 11.10.2018 at 05:01, Zhang, Xuefu  wrote:

Hi Fabian/Vino,

Thank you very much for your encouraging inquiry. Sorry that I didn't see 
Fabian's email until I read Vino's response just now. (Somehow Fabian's went to 
the spam folder.)

My proposal contains long-term and short-term goals. Nevertheless, the effort 
will focus on the following areas, including Fabian's list:

1. Hive metastore connectivity - This covers both read/write access, which 
means Flink can make full use of Hive's metastore as its catalog (at least for 
the batch but can extend for streaming as well).
2. Metadata compatibility - Objects (databases, tables, partitions, etc) 
created by Hive can be understood by Flink and the reverse direction is true 
also.
3. Data compatibility - Similar to #2, data produced by Hive can be consumed by 
Flink and vice versa.
4. Support Hive UDFs - For all of Hive's native UDFs, Flink either provides its 
own implementation or makes Hive's implementation work in Flink. Further, for 
user-created UDFs in Hive, Flink SQL should provide a mechanism allowing users 
to import them into Flink without any code change required.
5. Data types -  Flink SQL should support all data types that are available in 
Hive.
6. SQL Language - Flink SQL should support SQL standard (such as SQL2003) with 
extension to support Hive's syntax and language features, around DDL, DML, and 
SELECT queries.
7.  SQL CLI - this is currently developing in Flink but more effort is needed.
8. Server - provide a server that's compatible with Hive's HiveServer2 in its 
thrift APIs, such that HiveServer2 users can reuse their existing clients (such 
as beeline) but connect to Flink's thrift server instead.
9. JDBC/ODBC drivers - Flink may provide its own JDBC/ODBC drivers for other 
applications to use to connect to its thrift server.
10. Support other user's customizations in Hive, such as Hive Serdes, storage 
handlers, etc.
11. Better task failure tolerance and task scheduling at Flink runtime.

As you can see, achieving all of those requires significant effort across all 
layers in Flink. However, a short-term goal could include only the core areas 
(such as 1, 2, 4, 5, 6, 7) or start at a smaller scope (such as #3, #6).

Please share your further thoughts. If we generally agree that this is the 
right direction, I could come up with a formal proposal quickly and then we can 
follow up with broader discussions.

Thanks,
Xuefu



--
Sender:vino yang 
Sent at:2018 Oct 11 (Thu) 09:45
Recipient:Fabian Hueske 
Cc:dev ; Xu

Re: Partitions vs. Subpartitions

2018-10-11 Thread Fabian Hueske
Hi Chris,

The terminology in the docs and code is not always consistent; it depends
on the context.
Both terms could also mean the same thing when they are used in different places.

Can you point to the place(s) that refer to partition and subpartition?

Fabian

Am Do., 11. Okt. 2018 um 04:50 Uhr schrieb Kurt Young :

> Hi,
>
> A partition is the output of a JobVertex, which you can simply think of as
> containing an operator. In the real world, a JobVertex runs in parallel, and
> each parallel instance
> will output some data, which is conceptually called a subpartition.
>
> Best,
> Kurt
>
>
> On Thu, Oct 11, 2018 at 10:27 AM Renjie Liu 
> wrote:
>
>> Hi, Chris:
>>
>> Where are these words from? Are they from flink source code?
>>
>> On Wed, Oct 10, 2018 at 10:18 PM Chris Miller  wrote:
>>
>>> Hi,
>>>
>>> what's the difference between partitions and subpartitions?
>>>
>>>
>>>
>>> Thanks.
>>>
>>>
>>>
>>> CM
>>>
>>>
>>
>>
>> --
>> Renjie Liu
>> Software Engineer, MVAD
>>
>


Re: Taskmanager times out continuously for registration with Jobmanager

2018-10-11 Thread Dawid Wysakowicz
Hi Abdul,

I've added Till and Gary to cc, who might be able to help you.

Best,

Dawid


On 11/10/18 03:05, Abdul Qadeer wrote:
>
> Hi,
>
>
> We are facing an issue in standalone HA mode in Flink 1.4.0 where the
> TaskManager restarts and is not able to register with the JobManager.
> It times out awaiting an AcknowledgeRegistration/AlreadyRegistered
> message from the JobManager actor and keeps sending RegisterTaskManager
> messages. The logs at the JobManager don't show anything about a
> registration failure/request. It doesn't print
> log.debug(s"RegisterTaskManager: $msg") (from JobManager.scala)
> either. The network connection between the TaskManager and JobManager
> seems fine; tcpdump shows the message sent to the JobManager and a TCP ACK
> received from the JobManager. Note that the communication is happening
> between docker containers.
>
>
> Following are the logs from Taskmanager:
>
>
>
> {"timeMillis":1539189572438,"thread":"flink-akka.actor.default-dispatcher-2","level":"INFO","loggerName":"org.apache.flink.runtime.taskmanager.TaskManager","message":"Trying
> to register at JobManager
> akka.tcp://flink@192.168.83.51:6123/user/jobmanager
>  (attempt 1400,
> timeout: 3
> milliseconds)","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":48,"threadPriority":5}
>
> {"timeMillis":1539189580229,"thread":"Curator-Framework-0-SendThread(zookeeper.maglev-system.svc.cluster.local:2181)","level":"DEBUG","loggerName":"org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn","message":"Got
> ping response for sessionid: 0x1260ea5002d after
> 0ms","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":101,"threadPriority":5}
>
> {"timeMillis":1539189600247,"thread":"Curator-Framework-0-SendThread(zookeeper.maglev-system.svc.cluster.local:2181)","level":"DEBUG","loggerName":"org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn","message":"Got
> ping response for sessionid: 0x1260ea5002d after
> 0ms","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":101,"threadPriority":5}
>
> {"timeMillis":1539189602458,"thread":"flink-akka.actor.default-dispatcher-2","level":"INFO","loggerName":"org.apache.flink.runtime.taskmanager.TaskManager","message":"Trying
> to register at JobManager
> akka.tcp://flink@192.168.83.51:6123/user/jobmanager
>  (attempt 1401,
> timeout: 3
> milliseconds)","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":48,"threadPriority":5}
>
> {"timeMillis":1539189620251,"thread":"Curator-Framework-0-SendThread(zookeeper.maglev-system.svc.cluster.local:2181)","level":"DEBUG","loggerName":"org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn","message":"Got
> ping response for sessionid: 0x1260ea5002d after
> 0ms","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":101,"threadPriority":5}
>
> {"timeMillis":1539189632478,"thread":"flink-akka.actor.default-dispatcher-2","level":"INFO","loggerName":"org.apache.flink.runtime.taskmanager.TaskManager","message":"Trying
> to register at JobManager
> akka.tcp://flink@192.168.83.51:6123/user/jobmanager
>  (attempt 1402,
> timeout: 3
> milliseconds)","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":48,"threadPriority":5}
>
>





Re: Re: No data issued by flink window after a few hours

2018-10-11 Thread Dawid Wysakowicz
Hi,

I agree with Vino that you should check whether the watermark is progressing
for all subtasks if you are using event time semantics. If this is not
the problem, it would help if you could share the code of your job. By
the way, have you tried reproducing the problem with a collection source?
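
Just to make the collection-source suggestion concrete, something along these
lines usually makes it easy to observe the watermark behaviour locally (a rough
sketch; the Event class and its userId/timestamp fields are placeholders for
your own types):

import java.util.Arrays;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

env.fromCollection(Arrays.asList(new Event("u1", 1000L), new Event("u2", 2000L)))
   // watermark lags 5 seconds behind the max seen event timestamp
   .assignTimestampsAndWatermarks(
       new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(5)) {
           @Override
           public long extractTimestamp(Event event) {
               return event.getTimestamp();
           }
       })
   .keyBy("userId")
   .timeWindow(Time.hours(1), Time.minutes(5))
   .max("timestamp")
   .print();

env.execute("watermark-test");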

Best,

Dawid

On 10/10/18 08:44, 潘 功森 wrote:

> Hi,
>
>  
>
> Because the default state size is too small for a one hour window, and the max window
> size is 24 hours, I used 500M.
>
>  
>
> MemoryStateBackend stateBackend = new MemoryStateBackend(MAX_STATE_SIZE); // 500M
> env.setStateBackend(stateBackend);
>
>  
>
> And I found that, irrespective of the configured maximal state size, the
> state cannot be larger than the akka frame size.
>
> So I added a config in flink-conf.yaml:
>
> akka.framesize: 524288000b
>
>  
>
> What else do I have to pay attention to?
>
>  
>
> Yours,
>
> September
>
>  
>
> 
> *From:* vino yang 
> *Sent:* Wednesday, October 10, 2018 11:45:31 AM
> *To:* pangong...@hotmail.com
> *Cc:* user
> *Subject:* Re: No data issued by flink window after a few hours
>  
> Hi,
>
> I saw the exception image you provided. Based on the exception
> message, it seems you used the default max state size (5MB).
>
> You can specify the max state size to override the default value. Try:
>
> MemoryStateBackend stateBackend = new MemoryStateBackend(theSizeOfBytes);
>
> Please note that you need to reserve enough memory for Flink.
>
> Thanks, vino.
>
> 潘 功森 mailto:pangong...@hotmail.com>>
> wrote on Wednesday, Oct 10, 2018 at 11:36 AM:
>
> Please have a look at my last mail.
>
>  
>
> When the cached window data is too large, what should we do?
>
>  
>
> Yours,
>
> September
>
>  
>
> 
> *From:* vino yang  >
> *Sent:* Wednesday, October 10, 2018 11:33:48 AM
> *To:* pangong...@hotmail.com 
> *Cc:* user
> *Subject:* Re: No data issued by flink window after a few hours
>  
> Hi,
>
> Did you mean that "computer memory" refers to the MemoryStateBackend? 
> The Flink window mechanism is internally based on State, and this
> is done for fault tolerance. 
> If you introduce external storage, it will break this design and
> bring other problems.
>
> Thanks, vino.
>
> 潘 功森 mailto:pangong...@hotmail.com>>
> wrote on Wednesday, Oct 10, 2018 at 11:02 AM:
>
> Hi,
>
> "ram to cache the distinct data about sliding window" means I
> used computer momery not the third part db to cache the data
> need used in window.
>
> "the data need used in window" means: for example, if the sliding
> window is 1 hour and I need to count the distinct users, I
> need to cache the user ids for about one hour.
>
> And there are no related errors.
>
> Yours,
>
> September
>
>  
>
> 
> 
> *From:* vino yang  >
> *Sent:* Wednesday, October 10, 2018 10:49:43 AM
> *Cc:* user
> *Subject:* Re: No data issued by flink window after a few hours
>  
> Hi,
>
> Can you explain what "ram to cache the distinct data about
> sliding window" means? 
> The information you provide is too limited and will not enable
> others to help you analyze the problem and provide advice.
>
> In addition, for questions about using Flink,
> please only send mail to the user mailing list. 
> The dev mailing list is mainly used to discuss
> development-related issues.
>
> Thanks vino.
>
> 潘 功森 mailto:pangong...@hotmail.com>>
> wrote on Wednesday, Oct 10, 2018 at 10:37 AM:
>
> Hi all,
>     I used a Flink window, and when the job begins we can
> get the results of the window, but no results are issued
> after a few hours.
>     I found the job is still running with no errors, and
> the data that does not go through a window is still emitted.
>     By the way, I used Flink 1.3.2 and RAM to cache the
> distinct data for the sliding window.
>
> Yours,
>     September
>


