Re: [State Processor API] how to convert savepoint back to broadcast state

2020-01-27 Thread Yun Tang
Hi Yi

Glad to know you have already resolved it. The State Processor API will use the
DataStream API instead of the DataSet API in the future [1].

Besides, you could also follow the guide in "the broadcast state pattern" [2]:


// a map descriptor to store the name of the rule (string) and the rule itself.
MapStateDescriptor<String, Rule> stateDescriptor = new MapStateDescriptor<>(
    "RulesBroadcastState",
    BasicTypeInfo.STRING_TYPE_INFO,
    TypeInformation.of(new TypeHint<Rule>() {}));

// broadcast the rules and create the broadcast state
BroadcastStream<Rule> broadcastStream = ruleStream
    .broadcast(stateDescriptor);


colorPartitionedStream
    .connect(broadcastStream)
    .process(
        new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
            // my matching logic
        }
    ).uid("your-uid");

Make sure the uid and the state name are the same as those in your savepoint;
the CoBroadcastWithKeyedOperator will then initialize the broadcast state when
it is opened. [3]


[1] 
https://flink.apache.org/feature/2019/09/13/state-processor-api.html#why-dataset-api
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html#provided-apis
[3] 
https://github.com/apache/flink/blob/53f956fb57dd5601d2e3ca9289207d21796cdc4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java#L101


Best
Yun Tang


From: Jin Yi 
Sent: Monday, January 27, 2020 14:50
To: Yun Tang 
Cc: user ; user...@flink.apache.org 

Subject: Re: [State Processor API] how to convert savepoint back to broadcast 
state

Hi Yun,

After searching around in the documentation, I tried extending
BroadcastProcessFunction and implementing CheckpointedFunction, and I
initialize the broadcast state in the public void
initializeState(FunctionInitializationContext context) method. It seems to be
working fine.

Here is the doc I followed: 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#checkpointedfunction
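
A minimal sketch of what this looks like (class, field and stream types are hypothetical;
the descriptor name mirrors the savepoint code further down in this thread):

import java.util.Map;

import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class SeededBroadcastFunction
        extends BroadcastProcessFunction<String, Tuple2<String, String>, String>
        implements CheckpointedFunction {

    private final MapStateDescriptor<String, String> descriptor = new MapStateDescriptor<>(
            "largeKeySetStateDescription",
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO);

    // e.g. the entries read back from the savepoint via readBroadcastState(...).collect()
    private final Map<String, String> initialEntries;

    public SeededBroadcastFunction(Map<String, String> initialEntries) {
        this.initialEntries = initialEntries;
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        BroadcastState<String, String> state =
                context.getOperatorStateStore().getBroadcastState(descriptor);
        if (!context.isRestored()) {
            // Seed only on a fresh start; a restore from a savepoint already carries the state.
            for (Map.Entry<String, String> e : initialEntries.entrySet()) {
                state.put(e.getKey(), e.getValue());
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) {
        // Broadcast state registered via the descriptor is snapshotted automatically.
    }

    @Override
    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
        String mapped = ctx.getBroadcastState(descriptor).get(value);
        out.collect(mapped != null ? mapped : value);
    }

    @Override
    public void processBroadcastElement(Tuple2<String, String> update, Context ctx, Collector<String> out) throws Exception {
        ctx.getBroadcastState(descriptor).put(update.f0, update.f1);
    }
}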

Thanks a lot for your help!
Eleanore

On Sun, Jan 26, 2020 at 6:53 PM Jin Yi 
mailto:eleanore@gmail.com>> wrote:
Hi Yun,

Thanks for the response. I have checked the official documentation, and I referred
to this example to write the broadcast state to a savepoint.

My question is: I can use the State Processor API to read the savepoint back into a
DataSet, but how can I use that DataSet as the initial value for the broadcast
state in the BroadcastProcessFunction?

Thanks a lot!

Eleanore

On Sun, Jan 26, 2020 at 8:53 AM Yun Tang 
mailto:myas...@live.com>> wrote:
Hi Yi

Does the official doc on writing broadcast state [1] satisfy your request?

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html#broadcast-state-1

Best
Yun Tang

From: Jin Yi mailto:eleanore@gmail.com>>
Sent: Thursday, January 23, 2020 8:12
To: user mailto:user@flink.apache.org>>; 
user...@flink.apache.org 
mailto:user...@flink.apache.org>>
Subject: [State Processor API] how to convert savepoint back to broadcast state

Hi there,

I would like to read the savepoints (for broadcast state) back into the 
broadcast state, how should I do it?


// load the existingSavepoint;
ExistingSavepoint existingSavepoint = Savepoint.load(environment, 
"file:///tmp/new_savepoints", new MemoryStateBackend());

// read state from existing savepoint
dataSet = existingSavepoint.readBroadcastState(OPERATOR_UID, 
"largeKeySetStateDescription", BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO);

// TODO in BroadcastProcessFunction, how can I put the savepoint dataset back 
into BroadcastState?

Thanks!

Eleanore


Flink RocksDB logs filling up disk space

2020-01-27 Thread Ahmad Hassan
Hello,

In our production systems, we see that the Flink RocksDB checkpoint IO logs are
filling up disk space very quickly, on the order of GBs, as the logging
is very verbose. How do we disable or suppress these logs? The
RocksDB file checkpoint.cc is dumping a huge amount of checkpoint log lines like

Log(db_options.info_log, "Hard Linking %s", src_fname.c_str());

Best Regards,


Re: BlinkPlanner limitation related clarification

2020-01-27 Thread Jingsong Li
Hi RKandoji,

You have understood this bug incorrectly; your code will not go wrong.

The bug is:
TableEnv tEnv = TableEnv.create(...);
Table t1 = tEnv.sqlQuery(...);
tEnv.insertInto("sink1", t1);
tEnv.execute("job1");

Table t2 = tEnv.sqlQuery(...);
tEnv.insertInto("sink2", t2);
tEnv.execute("job2");

This will go wrong: job2 will also run the operators of job1.

If your job has just one "execute", it is OK.
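
For the affected versions (1.9.0/1.9.1), a minimal sketch of the workaround is simply
to create a fresh environment per execute (table names are illustrative and assume
the sources/sinks are registered; bsEnv is the StreamExecutionEnvironment):

EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner().inStreamingMode().build();

// first job: its own environment
StreamTableEnvironment tEnv1 = StreamTableEnvironment.create(bsEnv, settings);
tEnv1.sqlUpdate("INSERT INTO sink1 SELECT * FROM source1");
tEnv1.execute("job1");

// second job: a new environment, so no leftover transformations from job1
StreamTableEnvironment tEnv2 = StreamTableEnvironment.create(bsEnv, settings);
tEnv2.sqlUpdate("INSERT INTO sink2 SELECT * FROM source2");
tEnv2.execute("job2");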

Best,
Jingsong Lee

On Mon, Jan 27, 2020 at 12:14 AM RKandoji  wrote:

> Hi Jingsong,
>
> Thanks for the information. Not sure if I'm missing anything, but I have
> been reusing the table env and didn't see anything wrong. I'm worried that I
> may have missed something.
>
> My use case:
> I created a class level StreamTableEnvironment and used it throughout my
> code for creating multiple tables and running multiple SQL queries.
>
> private static StreamTableEnvironment bsTableEnv = 
> StreamTableEnvironment.create(bsEnv, bsSettings);
>
> bsTableEnv.registerDataStream(...)
>
> Table latestTbl1 = bsTableEnv.sqlQuery(...)
>
> bsTableEnv.registerDataStream(...)
>
> Table latestTbl2 = bsTableEnv.sqlQuery(...)
>
> and so on..
>
> Could you please let me know if anything specific I need to look at? I would 
> like to understand what was wrong before changing the code.
>
>
> Thanks,
>
> RK
>
>
>
> On Thu, Jan 23, 2020 at 11:48 PM Jingsong Li 
> wrote:
>
>> Hi RKandoji,
>>
>> IMO, yes, you cannot reuse the table env; you should create a new tEnv after
>> executing. 1.9.1 still has this problem.
>> The related issue is [1], fixed in 1.9.2 and 1.10.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-13708
>>
>> Best,
>> Jingsong Lee
>>
>> On Fri, Jan 24, 2020 at 11:14 AM RKandoji  wrote:
>>
>>> Hi Team,
>>>
>>> I've been using Blink Planner and just came across this page
>>> https://ci.apache.org/projects/flink/flink-docs-stable/release-notes/flink-1.9.html#known-shortcomings-or-limitations-for-new-features
>>>  and
>>> saw below limitation:
>>>
>>> Due to a bug with how transformations are not being cleared on
 execution, TableEnvironment instances should not be reused across
 multiple SQL statements when using the Blink planner.
>>>
>>>
>>> In my code I've created a StreamTableEnvironment (like shown below) and
>>> reusing this instance everywhere for registering data streams, registering
>>> tables and performing multiple SQL queries. So I'm a bit concerned if I
>>> need to change anything? Would above limitation affect 
>>> StreamTableEnvironment
>>> as well?
>>>
>>> private static StreamTableEnvironment bsTableEnv =
>>> StreamTableEnvironment.create(bsEnv, bsSettings);
>>>
>>> Could someone please clarify and provide more details about the
>>> implications.
>>>
>>>
>>> Thanks,
>>> RKandoji
>>>
>>
>>
>> --
>> Best, Jingsong Lee
>>
>

-- 
Best, Jingsong Lee


Re: Flink RocksDB logs filling up disk space

2020-01-27 Thread Chesnay Schepler

Please see https://issues.apache.org/jira/browse/FLINK-15068
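
In the meantime, a possible mitigation is to lower RocksDB's info log level through a
custom OptionsFactory on the RocksDB state backend; a rough sketch (option values are
illustrative), in the spirit of what that ticket proposes as the default:

import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.InfoLogLevel;

public class QuietRocksDBOptionsFactory implements OptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        return currentOptions
                .setInfoLogLevel(InfoLogLevel.HEADER_LEVEL) // suppress routine info logging
                .setStatsDumpPeriodSec(0)                   // no periodic statistics dumps
                .setMaxLogFileSize(10 * 1024 * 1024)        // cap whatever is still written
                .setKeepLogFileNum(2);
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        return currentOptions;
    }
}

// usage (sketch):
// RocksDBStateBackend backend = new RocksDBStateBackend("s3://bucket/checkpoints", true);
// backend.setOptions(new QuietRocksDBOptionsFactory());
// env.setStateBackend(backend);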

On 27/01/2020 12:22, Ahmad Hassan wrote:

Hello,

In our production systems, we see that flink rocksdb checkpoint IO 
logs are filling up disk space very very quickly in the order of GB's 
as the logging is very verbose. How do we disable or suppress these 
logs please ? The rocksdb file checkpoint.cc is dumping huge amount of 
checkpoint logs like


Log(db_options.info_log, "Hard Linking %s", src_fname.c_str());


Best Regards,





Flink and Presto integration

2020-01-27 Thread Flavio Pompermaier
Hi all,
is there any integration between Presto and Flink? I'd like to use Presto
for the UI part (preview and so on) while using Flink for the batch
processing. Do you suggest something else otherwise?

Best,
Flavio


Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for S3a files

2020-01-27 Thread Mark Harris
Hi Piotr,

Thanks for the link to the issue.

Do you know if there's a workaround? I've tried setting the following in my 
core-site.xml:

​fs.s3a.fast.upload.buffer=true

To try and avoid writing the buffer files, but the taskmanager breaks with the 
same problem.

Best regards,

Mark

From: Piotr Nowojski  on behalf of Piotr Nowojski 

Sent: 22 January 2020 13:29
To: Till Rohrmann 
Cc: Mark Harris ; flink-u...@apache.org 
; kkloudas 
Subject: Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for 
S3a files

Hi,

This is probably a known issue of Hadoop [1]. Unfortunately it was only fixed 
in 3.3.0.

Piotrek

[1] https://issues.apache.org/jira/browse/HADOOP-15658

On 22 Jan 2020, at 13:56, Till Rohrmann 
mailto:trohrm...@apache.org>> wrote:

Thanks for reporting this issue, Mark. I'm pulling Klou into this conversation, as he 
knows more about the StreamingFileSink. @Klou, does the StreamingFileSink 
rely on DeleteOnExitHooks to clean up files?

Cheers,
Till

On Tue, Jan 21, 2020 at 3:38 PM Mark Harris 
mailto:mark.har...@hivehome.com>> wrote:
Hi,

We're using Flink 1.7.2 on an EMR cluster (emr-5.22.0), which runs Hadoop 
"Amazon 2.8.5". We've recently noticed that some TaskManagers fail (causing all 
the jobs running on them to fail) with a "java.lang.OutOfMemoryError: GC 
overhead limit exceeded". The TaskManager (and the jobs that should be running on 
it) remains down until manually restarted.

I managed to take and analyze a memory dump from one of the afflicted 
taskmanagers.

It showed that 85% of the heap was made up of the 
java.io.DeleteOnExitHook.files hashset. The majority of the strings in that 
hashset (9041060 out of ~9041100) pointed to files whose names began with 
/tmp/hadoop-yarn/s3a/s3ablock

The problem seems to affect jobs that make use of the StreamingFileSink - all 
of the taskmanager crashes have been on a taskmanager running at least one job 
using this sink, and a cluster running only a single taskmanager / job that 
uses the StreamingFileSink crashed with the GC overhead limit exceeded error.

I've had a look for advice on handling this error more broadly without luck.

Any suggestions or advice gratefully received.

Best regards,

Mark Harris




Re: Flink RocksDB logs filling up disk space

2020-01-27 Thread Ahmad Hassan
Thanks Chesnay!

On Mon, 27 Jan 2020 at 11:29, Chesnay Schepler  wrote:

> Please see https://issues.apache.org/jira/browse/FLINK-15068
>
> On 27/01/2020 12:22, Ahmad Hassan wrote:
>
> Hello,
>
> In our production systems, we see that flink rocksdb checkpoint IO logs
> are filling up disk space very very quickly in the order of GB's as the
> logging is very verbose. How do we disable or suppress these logs please ?
> The rocksdb file checkpoint.cc is dumping huge amount of checkpoint logs
> like
>
> Log(db_options.info_log, "Hard Linking %s", src_fname.c_str());
>
>
> Best Regards,
>
>
>


Re: Flink and Presto integration

2020-01-27 Thread Itamar Syn-Hershko
Hi Flavio,

Presto contributor and Starburst Partners here.

Presto and Flink are solving completely different challenges. Flink is
about processing data streams as they come in; Presto is about ad-hoc /
periodic querying of data sources.

A typical architecture would use Flink to process data streams and write
data and aggregations to some data stores (Redis, MemSQL, SQL databases,
Elasticsearch, etc.) and then use Presto to query those data stores (and
possibly also others, using Query Federation).

What kind of integration will you be looking for?

On Mon, Jan 27, 2020 at 1:44 PM Flavio Pompermaier 
wrote:

> Hi all,
> is there any integration between Presto and Flink? I'd like to use Presto
> for the UI part (preview and so on) while using Flink for the batch
> processing. Do you suggest something else otherwise?
>
> Best,
> Flavio
>


-- 

[image: logo] 
Itamar Syn-Hershko
CTO, Founder
+972-54-2467860
ita...@bigdataboutique.com
https://bigdataboutique.com





Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for S3a files

2020-01-27 Thread David Magalhães
Does StreamingFileSink use core-site.xml? When I was using it, it didn't
load any configuration from core-site.xml.

On Mon, Jan 27, 2020 at 12:08 PM Mark Harris 
wrote:

> Hi Piotr,
>
> Thanks for the link to the issue.
>
> Do you know if there's a workaround? I've tried setting the following in
> my core-site.xml:
>
> ​fs.s3a.fast.upload.buffer=true
>
> To try and avoid writing the buffer files, but the taskmanager breaks with
> the same problem.
>
> Best regards,
>
> Mark
> --
> *From:* Piotr Nowojski  on behalf of Piotr
> Nowojski 
> *Sent:* 22 January 2020 13:29
> *To:* Till Rohrmann 
> *Cc:* Mark Harris ; flink-u...@apache.org <
> flink-u...@apache.org>; kkloudas 
> *Subject:* Re: GC overhead limit exceeded, memory full of DeleteOnExit
> hooks for S3a files
>
> Hi,
>
> This is probably a known issue of Hadoop [1]. Unfortunately it was only
> fixed in 3.3.0.
>
> Piotrek
>
> [1] https://issues.apache.org/jira/browse/HADOOP-15658
>
> On 22 Jan 2020, at 13:56, Till Rohrmann  wrote:
>
> Thanks for reporting this issue Mark. I'm pulling Klou into this
> conversation who knows more about the StreamingFileSink. @Klou does the
> StreamingFileSink relies on DeleteOnExitHooks to clean up files?
>
> Cheers,
> Till
>
> On Tue, Jan 21, 2020 at 3:38 PM Mark Harris 
> wrote:
>
> Hi,
>
> We're using flink 1.7.2 on an EMR cluster v emr-5.22.0, which runs hadoop
> v "Amazon 2.8.5". We've recently noticed that some TaskManagers fail
> (causing all the jobs running on them to fail) with an
> "java.lang.OutOfMemoryError: GC overhead limit exceeded”. The taskmanager
> (and jobs that should be running on it) remain down until manually
> restarted.
>
> I managed to take and analyze a memory dump from one of the afflicted
> taskmanagers.
>
> It showed that 85% of the heap was made up of
> the java.io.DeleteOnExitHook.files hashset. The majority of the strings in
> that hashset (9041060 out of ~9041100) pointed to files that began
> /tmp/hadoop-yarn/s3a/s3ablock
>
> The problem seems to affect jobs that make use of the StreamingFileSink -
> all of the taskmanager crashes have been on the taskmaster running at least
> one job using this sink, and a cluster running only a single taskmanager /
> job that uses the StreamingFileSink crashed with the GC overhead limit
> exceeded error.
>
> I've had a look for advice on handling this error more broadly without
> luck.
>
> Any suggestions or advice gratefully received.
>
> Best regards,
>
> Mark Harris
>
>
>

Re: Flink and Presto integration

2020-01-27 Thread Flavio Pompermaier
Both Presto and Flink make use of a catalog in order to be able to
read/write data from a source/sink.
I don't agree that "Flink is about processing data streams", because Flink
is also competitive for batch workloads (and this will be further
improved in the next releases).
I'd like to register my data sources/sinks in one single catalog (e.g.
Presto) and then be able to reuse it in Flink as well (with a simple
translation).
My idea of integration here is thus more at the catalog level: I would use
Presto for exploring data from the UI and Flink to process it once the
configuration part is finished (since I have many Flink jobs that I don't
want to throw away or rewrite).

On Mon, Jan 27, 2020 at 2:30 PM Itamar Syn-Hershko <
ita...@bigdataboutique.com> wrote:

> Hi Flavio,
>
> Presto contributor and Starburst Partners here.
>
> Presto and Flink are solving completely different challenges. Flink is
> about processing data streams as they come in; Presto is about ad-hoc /
> periodic querying of data sources.
>
> A typical architecture would use Flink to process data streams and write
> data and aggregations to some data stores (Redis, MemSQL, SQLs,
> Elasticsearch, etc) and then using Presto to query those data stores (and
> possible also others using Query Federation).
>
> What kind of integration will you be looking for?
>
> On Mon, Jan 27, 2020 at 1:44 PM Flavio Pompermaier 
> wrote:
>
>> Hi all,
>> is there any integration between Presto and Flink? I'd like to use Presto
>> for the UI part (preview and so on) while using Flink for the batch
>> processing. Do you suggest something else otherwise?
>>
>> Best,
>> Flavio
>>
>
>
> --
>
> [image: logo] 
> Itamar Syn-Hershko
> CTO, Founder
> +972-54-2467860
> ita...@bigdataboutique.com
> https://bigdataboutique.com
> 
> 
> 
>


Re: Flink and Presto integration

2020-01-27 Thread Itamar Syn-Hershko
Yes, Flink does batch processing by "reevaluating a stream", so to speak.
Presto doesn't have sources and sinks, only catalogs (which always
allow reads, and sometimes also writes).

Presto catalogs are a configuration - they are managed on the node
filesystem as a configuration file and nowhere else. Flink sources/sinks
are programmatically configurable and are compiled into your Flink program.
So that is not possible at the moment, and all that's possible to do is get
that info from the API of both products and visualize it. Definitely not
managing them from a single place.

On Mon, Jan 27, 2020 at 3:54 PM Flavio Pompermaier 
wrote:

> Both Presto and Flink make use of a Catalog in order to be able to
> read/write data from a source/sink.
> I don't agree about " Flink is about processing data streams" because
> Flink is competitive also for the batch workloads (and this will be further
> improved in the next releases).
> I'd like to register my data sources/sinks in one single catalog (E.g.
> Presto) and then being able to reuse it also in Flink (with a simple
> translation).
> My idea of integration here is thus more at catalog level since I would
> use Presto for exploring data from UI and Flink to process it because once
> the configuration part has finished (since I have many Flink jobs that I
> don't want to throw away or rewrite).
>
> On Mon, Jan 27, 2020 at 2:30 PM Itamar Syn-Hershko <
> ita...@bigdataboutique.com> wrote:
>
>> Hi Flavio,
>>
>> Presto contributor and Starburst Partners here.
>>
>> Presto and Flink are solving completely different challenges. Flink is
>> about processing data streams as they come in; Presto is about ad-hoc /
>> periodic querying of data sources.
>>
>> A typical architecture would use Flink to process data streams and write
>> data and aggregations to some data stores (Redis, MemSQL, SQLs,
>> Elasticsearch, etc) and then using Presto to query those data stores (and
>> possible also others using Query Federation).
>>
>> What kind of integration will you be looking for?
>>
>> On Mon, Jan 27, 2020 at 1:44 PM Flavio Pompermaier 
>> wrote:
>>
>>> Hi all,
>>> is there any integration between Presto and Flink? I'd like to use
>>> Presto for the UI part (preview and so on) while using Flink for the batch
>>> processing. Do you suggest something else otherwise?
>>>
>>> Best,
>>> Flavio
>>>
>>
>>
>> --
>>
>> [image: logo] 
>> Itamar Syn-Hershko
>> CTO, Founder
>> +972-54-2467860
>> ita...@bigdataboutique.com
>> https://bigdataboutique.com
>> 
>> 
>> 
>>
>
>

-- 

[image: logo] 
Itamar Syn-Hershko
CTO, Founder
+972-54-2467860
ita...@bigdataboutique.com
https://bigdataboutique.com





Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for S3a files

2020-01-27 Thread Cliff Resnick
I know from experience that Flink's shaded S3A FileSystem does not
reference core-site.xml, though I don't remember offhand what file(s) it
does reference. However, since it's shaded, maybe this could be fixed by
building a Flink FS referencing 3.3.0? Last I checked, I think it referenced
3.1.0.

On Mon, Jan 27, 2020, 8:48 AM David Magalhães  wrote:

> Does StreamingFileSink use core-site.xml ? When I was using it, it didn't
> load any configurations from core-site.xml.
>
> On Mon, Jan 27, 2020 at 12:08 PM Mark Harris 
> wrote:
>
>> Hi Piotr,
>>
>> Thanks for the link to the issue.
>>
>> Do you know if there's a workaround? I've tried setting the following in
>> my core-site.xml:
>>
>> ​fs.s3a.fast.upload.buffer=true
>>
>> To try and avoid writing the buffer files, but the taskmanager breaks
>> with the same problem.
>>
>> Best regards,
>>
>> Mark
>> --
>> *From:* Piotr Nowojski  on behalf of Piotr
>> Nowojski 
>> *Sent:* 22 January 2020 13:29
>> *To:* Till Rohrmann 
>> *Cc:* Mark Harris ; flink-u...@apache.org <
>> flink-u...@apache.org>; kkloudas 
>> *Subject:* Re: GC overhead limit exceeded, memory full of DeleteOnExit
>> hooks for S3a files
>>
>> Hi,
>>
>> This is probably a known issue of Hadoop [1]. Unfortunately it was only
>> fixed in 3.3.0.
>>
>> Piotrek
>>
>> [1] https://issues.apache.org/jira/browse/HADOOP-15658
>>
>> On 22 Jan 2020, at 13:56, Till Rohrmann  wrote:
>>
>> Thanks for reporting this issue Mark. I'm pulling Klou into this
>> conversation who knows more about the StreamingFileSink. @Klou does the
>> StreamingFileSink relies on DeleteOnExitHooks to clean up files?
>>
>> Cheers,
>> Till
>>
>> On Tue, Jan 21, 2020 at 3:38 PM Mark Harris 
>> wrote:
>>
>> Hi,
>>
>> We're using flink 1.7.2 on an EMR cluster v emr-5.22.0, which runs hadoop
>> v "Amazon 2.8.5". We've recently noticed that some TaskManagers fail
>> (causing all the jobs running on them to fail) with an
>> "java.lang.OutOfMemoryError: GC overhead limit exceeded”. The taskmanager
>> (and jobs that should be running on it) remain down until manually
>> restarted.
>>
>> I managed to take and analyze a memory dump from one of the afflicted
>> taskmanagers.
>>
>> It showed that 85% of the heap was made up of
>> the java.io.DeleteOnExitHook.files hashset. The majority of the strings in
>> that hashset (9041060 out of ~9041100) pointed to files that began
>> /tmp/hadoop-yarn/s3a/s3ablock
>>
>> The problem seems to affect jobs that make use of the StreamingFileSink
>> - all of the taskmanager crashes have been on the taskmaster running at
>> least one job using this sink, and a cluster running only a single
>> taskmanager / job that uses the StreamingFileSink crashed with the GC
>> overhead limit exceeded error.
>>
>> I've had a look for advice on handling this error more broadly without
>> luck.
>>
>> Any suggestions or advice gratefully received.
>>
>> Best regards,
>>
>> Mark Harris
>>
>>
>>

Re: where does flink store the intermediate results of a join and what is the key?

2020-01-27 Thread Jark Wu
Hi Kant,
Having a custom state backend is very difficult and is not recommended.

Hi Benoît,
Yes, the "Query on the intermediate state is on the roadmap" I mentioned is
referring to integrate Table API & SQL with Queryable State.
We also have an early issue FLINK-6968 to tracks this.
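
For reference, this is roughly what querying running-job state looks like today at the
DataStream level with the Queryable State client (host, port, job id, key and state name
below are placeholders); the roadmap item is about exposing the Table/SQL join state
through this mechanism:

import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.queryablestate.client.QueryableStateClient;

public class QueryStateExample {
    public static void main(String[] args) throws Exception {
        QueryableStateClient client = new QueryableStateClient("taskmanager-host", 9069);

        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("join-count", Types.LONG);

        CompletableFuture<ValueState<Long>> future = client.getKvState(
                JobID.fromHexString("00000000000000000000000000000000"), // the running job's id
                "my-queryable-state",   // name given via .asQueryableState(...) in the job
                "some-key",             // key to look up
                BasicTypeInfo.STRING_TYPE_INFO,
                descriptor);

        System.out.println("value = " + future.get().value());
        client.shutdownAndWait();
    }
}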

Best,
Jark


On Fri, 24 Jan 2020 at 00:26, Benoît Paris <
benoit.pa...@centraliens-lille.org> wrote:

> Hi all!
>
> @Jark, out of curiosity, would you be so kind as to expand a bit on "Query
> on the intermediate state is on the roadmap"?
> Are you referring to working on QueryableStateStream/QueryableStateClient
> [1], or around "FOR SYSTEM_TIME AS OF" [2], or on other APIs/concepts (is
> there a FLIP?)?
>
> Cheers
> Ben
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#querying-state
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html#temporal-table
>
>
> On Thu, Jan 23, 2020 at 6:40 AM kant kodali  wrote:
>
>> Is it a common practice to have a custom state backend? If so, what would
>> be a popular custom backend?
>>
>> Can I use Elasticsearch as a state backend?
>>
>> Thanks!
>>
>> On Wed, Jan 22, 2020 at 1:42 AM Jark Wu  wrote:
>>
>>> Hi Kant,
>>>
>>> 1) A list of rows would also be sufficient in this case. Using a MapState makes it
>>> faster to retract a row, and saves storage.
>>>
>>> 2) The State Processor API is usually used to process savepoints. I'm afraid
>>> the performance is not good enough to use it for querying.
>>> On the other side, AFAIK, the State Processor API requires the uid of the
>>> operator. However, uids of operators are not set in the Table API & SQL,
>>> so I'm not sure whether it works or not.
>>>
>>> 3) You can have a custom state backend by
>>> implementing the org.apache.flink.runtime.state.StateBackend interface, and using it
>>> via `env.setStateBackend(…)`.
>>>
>>> Best,
>>> Jark
>>>
>>> On Wed, 22 Jan 2020 at 14:16, kant kodali  wrote:
>>>
 Hi Jark,

 1) shouldn't it be a col1 to List of row? multiple rows can have the
 same joining key right?

 2) Can I use state processor API
 
 from an external application to query the intermediate results in near
 real-time? I thought querying rocksdb state is a widely requested feature.
 It would be really great to consider this feature for 1.11

 3) Is there any interface where I can implement my own state backend?

 Thanks!


 On Tue, Jan 21, 2020 at 6:08 PM Jark Wu  wrote:

> Hi Kant,
>
> 1) Yes, it will be stored in the rocksdb statebackend.
> 2) In the old planner, the left state has the same structure as the right state.
> It is a 2-level map structure, where the `col1` is the join key,
> which is the first-level key of the state. The key of the MapState is the
> input row,
> and the `count` is the number of occurrences of this row; the expiredTime
> indicates when to clean up this row (to avoid infinite state size). You can
> find the source code here [1].
> In the blink planner, the state structure is more complex and
> is determined by the meta-information of the upstream. You can see the source
> code of the blink planner here [2].
> 3) Currently, the intermediate state is not exposed to users. Usually,
> users should write the query result to an external system (like MySQL) and
> query the external system.
> Querying the intermediate state is on the roadmap, but I guess it
> is not in the 1.11 plan.
>
> Best,
> Jark
>
> [1]:
> http://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala#L61
> [2]:
> https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/JoinRecordStateViews.java#L45
>
>
> 2020年1月21日 18:01,kant kodali  写道:
>
> Hi All,
>
> If I run a query like this
>
> StreamTableEnvironment.sqlQuery("select * from table1 join table2 on
> table1.col1 = table2.col1")
>
> 1) Where will flink store the intermediate result? Imagine
> flink-conf.yaml says state.backend = 'rocksdb'
>
> 2) If the intermediate results are stored in rockdb then what is the
> key and value in this case(given the query above)?
>
> 3) What is the best way to query these intermediate results from an
> external application? while the job is running and while the job is not
> running?
>
> Thanks!
>
>
>
>
> --
> Benoît Paris
> Ingénieur Machine Learning Explicable
> Tél : +33 6 60 74 23 00
> http://benoit.paris
> http://explicable.ml
>


Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks for S3a files

2020-01-27 Thread Piotr Nowojski
Hi,

I think reducing the frequency of the checkpoints and decreasing the parallelism of 
the things using the S3AOutputStream class would help to mitigate the issue.
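
Expressed in code, those two mitigations would look roughly like this (checkpoint
interval, paths and parallelism are illustrative only):

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class MitigationSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10 * 60 * 1000L); // checkpoint every 10 minutes instead of every minute

        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("s3a://my-bucket/output"), new SimpleStringEncoder<String>("UTF-8"))
                .build();

        env.socketTextStream("localhost", 9999)   // stand-in source for the sketch
                .addSink(sink)
                .setParallelism(2)                // fewer concurrent S3A writers per TaskManager
                .name("s3a-streaming-file-sink");

        env.execute("streaming-file-sink-mitigation");
    }
}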

I don't know about other solutions. I would suggest asking this question 
directly to Steve L. in the bug ticket [1], as he is the one who fixed the 
issue. If there is no workaround, maybe it would be possible to put pressure 
on the Hadoop guys to backport the fix to older versions?

Piotrek

[1] https://issues.apache.org/jira/browse/HADOOP-15658 


> On 27 Jan 2020, at 15:41, Cliff Resnick  wrote:
> 
> I know from experience that Flink's shaded S3A FileSystem does not reference 
> core-site.xml, though I don't remember offhand what file (s) it does 
> reference. However since it's shaded, maybe this could be fixed by building a 
> Flink FS referencing 3.3.0? Last I checked I think it referenced 3.1.0.
> 
> On Mon, Jan 27, 2020, 8:48 AM David Magalhães  > wrote:
> Does StreamingFileSink use core-site.xml ? When I was using it, it didn't 
> load any configurations from core-site.xml.
> 
> On Mon, Jan 27, 2020 at 12:08 PM Mark Harris  > wrote:
> Hi Piotr,
> 
> Thanks for the link to the issue.
> 
> Do you know if there's a workaround? I've tried setting the following in my 
> core-site.xml:
> 
> ​fs.s3a.fast.upload.buffer=true
> 
> To try and avoid writing the buffer files, but the taskmanager breaks with 
> the same problem.
> 
> Best regards,
> 
> Mark
> From: Piotr Nowojski  > on behalf of Piotr Nowojski 
> mailto:pi...@ververica.com>>
> Sent: 22 January 2020 13:29
> To: Till Rohrmann mailto:trohrm...@apache.org>>
> Cc: Mark Harris mailto:mark.har...@hivehome.com>>; 
> flink-u...@apache.org   >; kkloudas  >
> Subject: Re: GC overhead limit exceeded, memory full of DeleteOnExit hooks 
> for S3a files
>  
> Hi,
> 
> This is probably a known issue of Hadoop [1]. Unfortunately it was only fixed 
> in 3.3.0.
> 
> Piotrek
> 
> [1] https://issues.apache.org/jira/browse/HADOOP-15658 
> 
> 
>> On 22 Jan 2020, at 13:56, Till Rohrmann > > wrote:
>> 
>> Thanks for reporting this issue Mark. I'm pulling Klou into this 
>> conversation who knows more about the StreamingFileSink. @Klou does the 
>> StreamingFileSink relies on DeleteOnExitHooks to clean up files?
>> 
>> Cheers,
>> Till
>> 
>> On Tue, Jan 21, 2020 at 3:38 PM Mark Harris > > wrote:
>> Hi,
>> 
>> We're using flink 1.7.2 on an EMR cluster v emr-5.22.0, which runs hadoop v 
>> "Amazon 2.8.5". We've recently noticed that some TaskManagers fail (causing 
>> all the jobs running on them to fail) with an "java.lang.OutOfMemoryError: 
>> GC overhead limit exceeded”. The taskmanager (and jobs that should be 
>> running on it) remain down until manually restarted.
>> 
>> I managed to take and analyze a memory dump from one of the afflicted 
>> taskmanagers. 
>> 
>> It showed that 85% of the heap was made up of the 
>> java.io.DeleteOnExitHook.files hashset. The majority of the strings in that 
>> hashset (9041060 out of ~9041100) pointed to files that began 
>> /tmp/hadoop-yarn/s3a/s3ablock
>> 
>> The problem seems to affect jobs that make use of the StreamingFileSink - 
>> all of the taskmanager crashes have been on the taskmaster running at least 
>> one job using this sink, and a cluster running only a single taskmanager / 
>> job that uses the StreamingFileSink crashed with the GC overhead limit 
>> exceeded error.
>> 
>> I've had a look for advice on handling this error more broadly without luck.
>> 
>> Any suggestions or advice gratefully received.
>> 
>> Best regards,
>> 
>> Mark Harris
>> 
>> 
>> 

Re: where does flink store the intermediate results of a join and what is the key?

2020-01-27 Thread Benoît Paris
Dang, what a massive PR: 2,118 files changed, +104,104 −29,161 lines changed.
Thanks for the details, Jark!

On Mon, Jan 27, 2020 at 4:07 PM Jark Wu  wrote:

> Hi Kant,
> Having a custom state backend is very difficult and is not recommended.
>
> Hi Benoît,
> Yes, the "Query on the intermediate state is on the roadmap" I
> mentioned is referring to integrate Table API & SQL with Queryable State.
> We also have an early issue FLINK-6968 to tracks this.
>
> Best,
> Jark
>
>
> On Fri, 24 Jan 2020 at 00:26, Benoît Paris <
> benoit.pa...@centraliens-lille.org> wrote:
>
>> Hi all!
>>
>> @Jark, out of curiosity, would you be so kind as to expand a bit on "Query
>> on the intermediate state is on the roadmap"?
>> Are you referring to working on QueryableStateStream/QueryableStateClient
>> [1], or around "FOR SYSTEM_TIME AS OF" [2], or on other APIs/concepts (is
>> there a FLIP?)?
>>
>> Cheers
>> Ben
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#querying-state
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html#temporal-table
>>
>>
>> On Thu, Jan 23, 2020 at 6:40 AM kant kodali  wrote:
>>
>>> Is it a common practice to have a custom state backend? if so, what
>>> would be a popular custom backend?
>>>
>>> Can I do Elasticseatch as a state backend?
>>>
>>> Thanks!
>>>
>>> On Wed, Jan 22, 2020 at 1:42 AM Jark Wu  wrote:
>>>
 Hi Kant,

 1) List of row is also sufficient in this case. Using a MapState is in
 order to retract a row faster, and save the storage size.

 2) State Process API is usually used to process save point. I’m afraid
 the performance is not good to use it for querying.
 On the other side, AFAIK, State Process API requires the uid of
 operator. However, uid of operators is not set in Table API & SQL.
 So I’m not sure whether it works or not.

 3)You can have a custom statebackend by
 implement org.apache.flink.runtime.state.StateBackend interface, and use it
 via `env.setStateBackend(…)`.

 Best,
 Jark

 On Wed, 22 Jan 2020 at 14:16, kant kodali  wrote:

> Hi Jark,
>
> 1) shouldn't it be a col1 to List of row? multiple rows can have the
> same joining key right?
>
> 2) Can I use state processor API
> 
> from an external application to query the intermediate results in near
> real-time? I thought querying rocksdb state is a widely requested feature.
> It would be really great to consider this feature for 1.11
>
> 3) Is there any interface where I can implement my own state backend?
>
> Thanks!
>
>
> On Tue, Jan 21, 2020 at 6:08 PM Jark Wu  wrote:
>
>> Hi Kant,
>>
>> 1) Yes, it will be stored in rocksdb statebackend.
>> 2) In the old planner, the left state has the same structure as the right state.
>> It is a 2-level map structure, where the `col1` is the join key,
>> it is the first-level key of the state. The key of the MapState is the
>> input row,
>> and the `count` is the number of this row, the expiredTime
>> indicates when to cleanup this row (avoid infinite state size). You can
>> find the source code here[1].
>> In blink planner, the state structure will be more complex which
>> is determined by the meta-information of upstream. You can see the source
>> code of blink planner here [2].
>> 3) Currently, the intermediate state is not exposed to users.
>> Usually, users should write the query result to an external system (like
>> Mysql) and query the external system.
>> Query on the intermediate state is on the roadmap, but I guess it
>> is not in 1.11 plan.
>>
>> Best,
>> Jark
>>
>> [1]:
>> http://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala#L61
>> [2]:
>> https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/JoinRecordStateViews.java#L45
>>
>>
>> 2020年1月21日 18:01,kant kodali  写道:
>>
>> Hi All,
>>
>> If I run a query like this
>>
>> StreamTableEnvironment.sqlQuery("select * from table1 join table2 on
>> table1.col1 = table2.col1")
>>
>> 1) Where will flink store the intermediate result? Imagine
>> flink-conf.yaml says state.backend = 'rocksdb'
>>
>> 2) If the intermediate results are stored in rockdb then what is the
>> key and value in this case(given the query above)?
>>
>> 3) What is the best way to query these intermediate results from an
>> external application? while the job is running and while the job is not
>> running?
>>
>> Thanks!
>>

Re: [State Processor API] how to convert savepoint back to broadcast state

2020-01-27 Thread Jin Yi
Hi Yun,

Thanks for the suggestion!

Best
Eleanore

On Mon, Jan 27, 2020 at 1:54 AM Yun Tang  wrote:

> Hi Yi
>
> Glad to know you have already resolved it. The State Processor API will use
> the DataStream API instead of the DataSet API in the future [1].
>
> Besides, you could also follow the guide in "the broadcast state
> pattern" [2]:
>
> // a map descriptor to store the name of the rule (string) and the rule itself.
> MapStateDescriptor<String, Rule> stateDescriptor = new MapStateDescriptor<>(
>   "RulesBroadcastState",
>   BasicTypeInfo.STRING_TYPE_INFO,
>   TypeInformation.of(new TypeHint<Rule>() {}));
>
> // broadcast the rules and create the broadcast state
> BroadcastStream<Rule> broadcastStream = ruleStream
>     .broadcast(stateDescriptor);
>
> colorPartitionedStream
>     .connect(broadcastStream)
>     .process(
>         new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
>             // my matching logic
>         }
>     ).uid("your-uid");
>
> Make sure the uid and the state name are the same as those in your
> savepoint; the CoBroadcastWithKeyedOperator will then initialize the broadcast
> state when it is opened. [3]
>
>
> [1]
> https://flink.apache.org/feature/2019/09/13/state-processor-api.html#why-dataset-api
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html#provided-apis
> [3]
> https://github.com/apache/flink/blob/53f956fb57dd5601d2e3ca9289207d21796cdc4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java#L101
>
>
> Best
> Yun Tang
>
> --
> *From:* Jin Yi 
> *Sent:* Monday, January 27, 2020 14:50
> *To:* Yun Tang 
> *Cc:* user ; user...@flink.apache.org <
> user...@flink.apache.org>
> *Subject:* Re: [State Processor API] how to convert savepoint back to
> broadcast state
>
> Hi Yun,
>
> After searching around in the documentation, I tried extending
> BroadcastProcessFunction and implementing CheckpointedFunction, and I
> initialize the broadcast state in the public void
> initializeState(FunctionInitializationContext
> context) method. It seems to be working fine.
>
> Here is the doc I followed:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#checkpointedfunction
>
> Thanks a lot for your help!
> Eleanore
>
> On Sun, Jan 26, 2020 at 6:53 PM Jin Yi  wrote:
>
> Hi Yun,
>
> Thanks for the response, I have checked official document, and I have
> referred this example to write the broadcast state to a savepoint.
>
> My question is: I can use state processor api to read back the savepoint
> into a dataSet, but how can I use the dataSet as the initial value for the
> broadcast state in the BroadcastProcessFunction.
>
> Thanks a lot!
>
> Eleanore
>
> On Sun, Jan 26, 2020 at 8:53 AM Yun Tang  wrote:
>
> Hi Yi
>
> Does the official doc on writing broadcast state [1] satisfy your
> request?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html#broadcast-state-1
>
> Best
> Yun Tang
> --
> *From:* Jin Yi 
> *Sent:* Thursday, January 23, 2020 8:12
> *To:* user ; user...@flink.apache.org <
> user...@flink.apache.org>
> *Subject:* [State Processor API] how to convert savepoint back to
> broadcast state
>
> Hi there,
>
> I would like to read the savepoints (for broadcast state) back into the
> broadcast state, how should I do it?
>
> // load the existingSavepoint;
> ExistingSavepoint existingSavepoint = Savepoint.load(environment, 
> "file:///tmp/new_savepoints", new MemoryStateBackend());
>
> // read state from existing savepoint
> dataSet = existingSavepoint.readBroadcastState(OPERATOR_UID, 
> "largeKeySetStateDescription", BasicTypeInfo.STRING_TYPE_INFO, 
> BasicTypeInfo.STRING_TYPE_INFO);
>
> // TODO in BroadcastProcessFunction, how can I put the savepoint dataset back 
> into BroadcastState?
>
> Thanks!
>
> Eleanore
>
>


Re: BlinkPlanner limitation related clarification

2020-01-27 Thread RKandoji
Hi Jingsong,

Thanks for the clarification!
The limitation description was a bit confusing to me, but it became clear after
seeing the example you posted above.

Regards,
RK.



On Mon, Jan 27, 2020 at 6:25 AM Jingsong Li  wrote:

> Hi RKandoji,
>
> You have understood this bug incorrectly; your code will not go wrong.
>
> The bug is:
> TableEnv tEnv = TableEnv.create(...);
> Table t1 = tEnv.sqlQuery(...);
> tEnv.insertInto("sink1", t1);
> tEnv.execute("job1");
>
> Table t2 = tEnv.sqlQuery(...);
> tEnv.insertInto("sink2", t2);
> tEnv.execute("job2");
>
> This will go wrong: job2 will also run the operators of job1.
>
> If your job has just one "execute", it is OK.
>
> Best,
> Jingsong Lee
>
> On Mon, Jan 27, 2020 at 12:14 AM RKandoji  wrote:
>
>> Hi Jingsong,
>>
>> Thanks for the information. Not sure if I'm missing anything, but I have
>> been reusing the table env and didn't see anything wrong. I'm worried that I
>> may have missed something.
>>
>> My use case:
>> I created a class level StreamTableEnvironment and used it throughout my
>> code for creating multiple tables and running multiple SQL queries.
>>
>> private static StreamTableEnvironment bsTableEnv = 
>> StreamTableEnvironment.create(bsEnv, bsSettings);
>>
>> bsTableEnv.registerDataStream(...)
>>
>> Table latestTbl1 = bsTableEnv.sqlQuery(...)
>>
>> bsTableEnv.registerDataStream(...)
>>
>> Table latestTbl2 = bsTableEnv.sqlQuery(...)
>>
>> and so on..
>>
>> Could you please let me know if anything specific I need to look at? I would 
>> like to understand what was wrong before changing the code.
>>
>>
>> Thanks,
>>
>> RK
>>
>>
>>
>> On Thu, Jan 23, 2020 at 11:48 PM Jingsong Li 
>> wrote:
>>
>>> Hi RKandoji,
>>>
>>> IMO, yes, you can not reuse table env, you should create a new tEnv
>>> after executing, 1.9.1 still has this problem.
>>> Related issue is [1], fixed in 1.9.2 and 1.10.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-13708
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Fri, Jan 24, 2020 at 11:14 AM RKandoji  wrote:
>>>
 Hi Team,

 I've been using Blink Planner and just came across this page
 https://ci.apache.org/projects/flink/flink-docs-stable/release-notes/flink-1.9.html#known-shortcomings-or-limitations-for-new-features
  and
 saw below limitation:

 Due to a bug with how transformations are not being cleared on
> execution, TableEnvironment instances should not be reused across
> multiple SQL statements when using the Blink planner.


 In my code I've created a StreamTableEnvironment (like shown below) and
 reusing this instance everywhere for registering data streams, registering
 tables and performing multiple SQL queries. So I'm a bit concerned if I
 need to change anything? Would above limitation affect 
 StreamTableEnvironment
 as well?

 private static StreamTableEnvironment bsTableEnv =
 StreamTableEnvironment.create(bsEnv, bsSettings);

 Could someone please clarify and provide more details about the
 implications.


 Thanks,
 RKandoji

>>>
>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>
>
> --
> Best, Jingsong Lee
>


Re: batch job OOM

2020-01-27 Thread Fanbin Bu
I can build Flink 1.10 and install it onto EMR
(flink-dist_2.11-1.10.0.jar), but what about the other dependencies in my
project build.gradle, i.e. flink-scala_2.11, flink-json, flink-jdbc... do I
continue to use 1.9.0 since there is no 1.10 release available?

Thanks,
Fanbin

On Fri, Jan 24, 2020 at 11:39 PM Bowen Li  wrote:

> Hi Fanbin,
>
> You can install your own Flink build in AWS EMR, and it frees you from
> EMR's release cycles.
>
> On Thu, Jan 23, 2020 at 03:36 Jingsong Li  wrote:
>
>> Fanbin,
>>
>> I have no idea right now; could you create a JIRA to track it? You can describe the
>> complete SQL and some information about the data.
>>
>> Best,
>> Jingsong Lee
>>
>> On Thu, Jan 23, 2020 at 4:13 PM Fanbin Bu  wrote:
>>
>>> Jingsong,
>>>
>>> Do you have any suggestions to debug the above mentioned
>>> IndexOutOfBoundsException error?
>>> Thanks,
>>>
>>> Fanbin
>>>
>>> On Wed, Jan 22, 2020 at 11:07 PM Fanbin Bu 
>>> wrote:
>>>
 I got the following error when running another job. any suggestions?

 Caused by: java.lang.IndexOutOfBoundsException
 at
 org.apache.flink.core.memory.MemorySegment.getInt(MemorySegment.java:701)
 at
 org.apache.flink.table.dataformat.BinaryRow.getInt(BinaryRow.java:264)
 at HashWinAggWithKeys$538.endInput(Unknown Source)
 at
 org.apache.flink.streaming.runtime.tasks.OperatorChain.endInput(OperatorChain.java:276)
 at
 org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.checkFinished(StreamOneInputProcessor.java:151)
 at
 org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:138)
 at
 org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
 at
 org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
 at
 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
 at java.lang.Thread.run(Thread.java:748)

 On Wed, Jan 22, 2020 at 8:57 PM Fanbin Bu 
 wrote:

> Jingsong,
>
> I set the config value to be too large. After I changed it to a
> smaller number it works now!
> thanks you for the help. really appreciate it!
>
> Fanbin
>
> On Wed, Jan 22, 2020 at 8:50 PM Jingsong Li 
> wrote:
>
>> Fanbin,
>>
>> Looks like your config is wrong, can you show your config code?
>>
>> Best,
>> Jingsong Lee
>>
>> On Thu, Jan 23, 2020 at 12:41 PM Fanbin Bu 
>> wrote:
>>
>>> Jingsong,
>>>
>>> Great, now i got a different error:
>>>
>>> java.lang.NullPointerException: Initial Segment may not be null
>>> at 
>>> org.apache.flink.runtime.memory.AbstractPagedOutputView.(AbstractPagedOutputView.java:65)
>>> at 
>>> org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.(SimpleCollectingOutputView.java:49)
>>> at 
>>> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap$RecordArea.(BytesHashMap.java:522)
>>> at 
>>> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.(BytesHashMap.java:190)
>>> at 
>>> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.(BytesHashMap.java:149)
>>> at LocalHashWinAggWithKeys$292.open(Unknown Source)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
>>> at 
>>> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>> at java.lang.Thread.run(Thread.java:748)
>>>
>>>
>>> is there any other config i should add?
>>>
>>> thanks,
>>>
>>> Fanbin
>>>
>>>
>>> On Wed, Jan 22, 2020 at 8:07 PM Fanbin Bu 
>>> wrote:
>>>
 you beat me to it.
 let's me try that.

 On Wed, Jan 22, 2020 at 7:57 PM Jingsong Li 
 wrote:

> Fanbin,
>
> Document is here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html
> NOTE: you need configure this into TableConfig.
>
> Best,
> Jingsong Lee
>
> On Thu, Jan 23, 2020 at 11:54 AM Fanbin Bu 
> wrote:
>
>> Jingsong,
>>
>> Thank you for the response.
>> Since I'm using Flink on EMR and the latest version there is 1.9 now,
>> the second option is ruled out, but I will keep that in mind for a future
>> upgrade.
>>
>> I'm going to try the first option. It's probably a good idea to
>> add that in the doc for example:
>

Re: is Flink supporting pre-loading of a compacted (reference) topic for a join ?

2020-01-27 Thread Tzu-Li Tai
Hi Dominique,

FLIP-17 (Side Inputs) is not yet implemented, AFAIK.

One possible way to overcome this right now, if your reference data is static
and not continuously changing, is to use the State Processor API to
bootstrap a savepoint with the reference data.
Have you looked into that to see whether it would work for you?
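
A rough sketch of that bootstrapping step with the State Processor API (operator uid,
state name, paths and data below are placeholders; the job later started from this
savepoint must use the same uid and state descriptor name):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.BroadcastStateBootstrapFunction;

public class BootstrapReferenceData {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // stand-in for the reference data read from the compacted topic
        DataSet<Tuple2<String, String>> referenceData = env.fromElements(
                Tuple2.of("key-1", "value-1"),
                Tuple2.of("key-2", "value-2"));

        MapStateDescriptor<String, String> descriptor = new MapStateDescriptor<>(
                "reference-data",
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO);

        BootstrapTransformation<Tuple2<String, String>> transformation = OperatorTransformation
                .bootstrapWith(referenceData)
                .transform(new BroadcastStateBootstrapFunction<Tuple2<String, String>>() {
                    @Override
                    public void processElement(Tuple2<String, String> value, Context ctx) throws Exception {
                        ctx.getBroadcastState(descriptor).put(value.f0, value.f1);
                    }
                });

        Savepoint
                .create(new MemoryStateBackend(), 128)                 // max parallelism of the target job
                .withOperator("reference-operator-uid", transformation)
                .write("file:///tmp/bootstrapped-savepoint");

        env.execute("bootstrap-reference-data");                       // runs the DataSet job that writes the savepoint
    }
}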

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink and Presto integration

2020-01-27 Thread Jingsong Li
Hi Flavio,

Your requirement, as I understand it, is to use blink batch to read the tables in Presto?
I'm not familiar with Presto's catalog. Is it like the Hive Metastore?

If so, what needs to be done is similar to the Hive connector:
you need to implement a Presto catalog for Flink, which translates Presto
tables into Flink tables. You may need to deal with partitions, statistics,
and so on.
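
To make the comparison concrete, this is how an existing external catalog (the
HiveCatalog) is plugged into the Table API today; a hypothetical PrestoCatalog would be
registered the same way once implemented (paths, versions and names are illustrative):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class CatalogRegistration {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner().inBatchMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // Existing example: the Hive connector exposes Hive Metastore tables to Flink.
        Catalog hive = new HiveCatalog("myhive", "default", "/etc/hive/conf", "2.3.4");
        tEnv.registerCatalog("myhive", hive);
        tEnv.useCatalog("myhive");

        // A PrestoCatalog (not existing today) would implement the same Catalog interface,
        // translating Presto table metadata into Flink tables, partitions and statistics.
        tEnv.sqlQuery("SELECT * FROM some_table"); // resolved through the registered catalog
    }
}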

Best,
Jingsong Lee

On Mon, Jan 27, 2020 at 9:58 PM Itamar Syn-Hershko <
ita...@bigdataboutique.com> wrote:

> Yes, Flink does batch processing by "reevaluating a stream" so to speak.
> Presto doesn't have sources and sinks, only catalogs (which are always
> allowing reads, and sometimes also writes).
>
> Presto catalogs are a configuration - they are managed on the node
> filesystem as a configuration file and nowhere else. Flink sources/sinks
> are programmatically configurable and are compiled into your Flink program.
> So that is not possible at the moment, and all that's possible to do is get
> that info form the API of both products and visualize that. Definitely not
> managing them from a single place.
>
> On Mon, Jan 27, 2020 at 3:54 PM Flavio Pompermaier 
> wrote:
>
>> Both Presto and Flink make use of a Catalog in order to be able to
>> read/write data from a source/sink.
>> I don't agree that "Flink is about processing data streams", because
>> Flink is also competitive for batch workloads (and this will be further
>> improved in the next releases).
>> I'd like to register my data sources/sinks in one single catalog (e.g.
>> Presto) and then be able to reuse it also in Flink (with a simple
>> translation).
>> My idea of integration here is thus more at the catalog level: I would
>> use Presto for exploring data from the UI and Flink to process it once
>> the configuration part has finished (since I have many Flink jobs that I
>> don't want to throw away or rewrite).
>>
>> On Mon, Jan 27, 2020 at 2:30 PM Itamar Syn-Hershko <
>> ita...@bigdataboutique.com> wrote:
>>
>>> Hi Flavio,
>>>
>>> Presto contributor and Starburst Partners here.
>>>
>>> Presto and Flink are solving completely different challenges. Flink is
>>> about processing data streams as they come in; Presto is about ad-hoc /
>>> periodic querying of data sources.
>>>
>>> A typical architecture would use Flink to process data streams and write
>>> data and aggregations to some data stores (Redis, MemSQL, SQLs,
>>> Elasticsearch, etc) and then using Presto to query those data stores (and
>>> possible also others using Query Federation).
>>>
>>> What kind of integration will you be looking for?
>>>
>>> On Mon, Jan 27, 2020 at 1:44 PM Flavio Pompermaier 
>>> wrote:
>>>
 Hi all,
 is there any integration between Presto and Flink? I'd like to use
 Presto for the UI part (preview and so on) while using Flink for the batch
 processing. Do you suggest something else otherwise?

 Best,
 Flavio

>>>
>>>
>>> --
>>>
>>> [image: logo] 
>>> Itamar Syn-Hershko
>>> CTO, Founder
>>> +972-54-2467860
>>> ita...@bigdataboutique.com
>>> https://bigdataboutique.com
>>> 
>>> 
>>> 
>>>
>>
>>
>
> --
>
> [image: logo] 
> Itamar Syn-Hershko
> CTO, Founder
> +972-54-2467860
> ita...@bigdataboutique.com
> https://bigdataboutique.com
> 
> 
> 
>


-- 
Best, Jingsong Lee


Re: Is there anything strictly special about sink functions?

2020-01-27 Thread Konstantin Knauf
Hi Andrew,

as far as I know, there is nothing particularly special about a sink in
terms of how it handles state or time. You cannot leave the pipeline
"unfinished", though; only sinks trigger the execution of the whole pipeline.

Cheers,

Konstantin



On Fri, Jan 24, 2020 at 5:59 PM Andrew Roberts  wrote:

> Hello,
>
> I’m trying to push some behavior that we’ve currently got in a large,
> stateful SinkFunction implementation into Flink’s windowing system. The
> task at hand is similar to what StreamingFileSink provides, but more
> flexible. I don’t want to re-implement that sink, because it uses the
> StreamingRuntimeContext.getProcessingTimeService() via a cast - that class
> is marked as internal, and I’d like to avoid the exposure to an interface
> that could change. Extending it similarly introduces complexity I would
> rather not add to our codebase.
>
> WindowedStream.process() provides more or less the pieces I need, but the
> stream continues on after a ProcessFunction - there’s no way to process()
> directly into a sink. I could use a ProcessFunction[In, Unit, Key, Window],
> and follow that immediately with a no-op sink that discards the Unit
> values, or I could just leave the stream “unfinished," with no sink.
>
> Is there a downside to either of these approaches? Is there anything
> special about doing sink-like work in a ProcessFunction or FlatMapFunction
> instead of a SinkFunction?
>
> Thanks,
>
> Andrew
>
>
>
> --
> *Confidentiality Notice: The information contained in this e-mail and any
> attachments may be confidential. If you are not an intended recipient, you
> are hereby notified that any dissemination, distribution or copying of this
> e-mail is strictly prohibited. If you have received this e-mail in error,
> please notify the sender and permanently delete the e-mail and any
> attachments immediately. You should not retain, copy or use this e-mail or
> any attachment for any purpose, nor disclose all or any part of the
> contents to any other person. Thank you.*
>


-- 

Konstantin Knauf | Solutions Architect

+49 160 91394525


Follow us @VervericaData Ververica 


--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Tony) Cheng


Re: Blocking KeyedCoProcessFunction.processElement1

2020-01-27 Thread Alexey Trenikhun
Thank you Yun Tang.
My implementation could potentially block for a significant amount of time, 
because I wanted to do RDBMS maintenance (create partitions for new data, purge 
old data, etc.) in-line with writing stream data to a database.


From: Yun Tang 
Sent: Sunday, January 26, 2020 8:42:37 AM
To: Alexey Trenikhun ; user@flink.apache.org 

Subject: Re: Blocking KeyedCoProcessFunction.processElement1

Hi Alexey

Actually, I don't understand why you think 
KeyedCoProcessFunction#processElement1 would block for a significant amount of 
time; it just processes records from the first input stream, which is necessary. 
If you really find it blocking for a long time, I think that's because your 
processing logic has some problem and gets stuck. On the other hand, since 
checkpoint processing and record processing hold the same lock, we cannot process 
a checkpoint while the record-processing logic has not released the lock.

Best
Yun Tang

From: Alexey Trenikhun 
Sent: Thursday, January 23, 2020 13:04
To: user@flink.apache.org 
Subject: Blocking KeyedCoProcessFunction.processElement1


Hello,
If KeyedCoProcessFunction.processElement1 blocks for significant amount of 
time, will it prevent checkpoint ?

Thanks,
Alexey


Re: Flink RocksDB logs filling up disk space

2020-01-27 Thread Yun Tang
Hi Ahmad

Apart from setting the logger level of RocksDB, I also wonder why your RocksDB 
checkpoint IO logs are filling up disk space so quickly. How large is the local 
checkpoint state and how long is the checkpoint interval? I suspect the checkpoint 
interval you configured is too short; even if you avoid recording so many logs, 
the current checkpoint configuration does not look appropriate.
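
For the logger level, a minimal sketch using the pre-1.10 OptionsFactory; whether
this fully silences the checkpoint.cc messages should be verified against
FLINK-15068 (linked below), and 'env' is your StreamExecutionEnvironment:

import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.InfoLogLevel;

RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints", true);
backend.setOptions(new OptionsFactory() {
    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        // Raise the RocksDB info log level so the verbose checkpoint logging
        // no longer fills the local disk.
        return currentOptions.setInfoLogLevel(InfoLogLevel.HEADER_LEVEL);
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        return currentOptions;
    }
});
env.setStateBackend(backend);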

Best
Yun Tang

From: Ahmad Hassan 
Sent: Monday, January 27, 2020 20:22
To: user 
Subject: Re: Flink RocksDB logs filling up disk space


Thanks Chesnay!

On Mon, 27 Jan 2020 at 11:29, Chesnay Schepler 
mailto:ches...@apache.org>> wrote:
Please see https://issues.apache.org/jira/browse/FLINK-15068

On 27/01/2020 12:22, Ahmad Hassan wrote:
Hello,

In our production systems, we see that flink rocksdb checkpoint IO logs are 
filling up disk space very very quickly in the order of GB's as the logging is 
very verbose. How do we disable or suppress these logs please ? The rocksdb 
file checkpoint.cc is dumping huge amount of checkpoint logs like

Log(db_options.info_log, "Hard Linking %s", src_fname.c_str());


Best Regards,




Re: Blocking KeyedCoProcessFunction.processElement1

2020-01-27 Thread Yun Tang
Hi Alexey

If possible, I think you could move some RDBMS maintenance operations to the 
#open method of RichFunction to reduce the possibility of blocking processing 
records.
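
A rough sketch of that idea with String inputs for brevity; the JDBC URL and the
createPartitionsAndPurge() helper are illustrative placeholders only:

import java.sql.Connection;
import java.sql.DriverManager;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class DbWritingFunction extends KeyedCoProcessFunction<String, String, String, Void> {

    private transient Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Placeholder connection string.
        connection = DriverManager.getConnection("jdbc:postgresql://db:5432/app");
        // Do the heavy, blocking maintenance once per (re)start here,
        // instead of inside processElement1 where it would delay checkpoints.
        createPartitionsAndPurge(connection);
    }

    @Override
    public void processElement1(String value, Context ctx, Collector<Void> out) throws Exception {
        // keep the per-record path short: just write the row to the database
    }

    @Override
    public void processElement2(String value, Context ctx, Collector<Void> out) throws Exception {
    }

    private void createPartitionsAndPurge(Connection conn) {
        // placeholder for the partition-creation / purge logic
    }
}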

Best
Yun Tang

From: Alexey Trenikhun 
Sent: Tuesday, January 28, 2020 15:15
To: Yun Tang ; user@flink.apache.org 
Subject: Re: Blocking KeyedCoProcessFunction.processElement1

Thank you Yun Tang.
My implementation could potentially block for a significant amount of time, 
because I wanted to do RDBMS maintenance (create partitions for new data, purge 
old data, etc.) in-line with writing stream data to a database.


From: Yun Tang 
Sent: Sunday, January 26, 2020 8:42:37 AM
To: Alexey Trenikhun ; user@flink.apache.org 

Subject: Re: Blocking KeyedCoProcessFunction.processElement1

Hi Alexey

Actually, I don't understand why you think 
KeyedCoProcessFunction#processElement1 would block for a significant amount of 
time; it just processes records from the first input stream, which is necessary. 
If you really find it blocking for a long time, I think that's because your 
processing logic has some problem and gets stuck. On the other hand, since 
checkpoint processing and record processing hold the same lock, we cannot process 
a checkpoint while the record-processing logic has not released the lock.

Best
Yun Tang

From: Alexey Trenikhun 
Sent: Thursday, January 23, 2020 13:04
To: user@flink.apache.org 
Subject: Blocking KeyedCoProcessFunction.processElement1


Hello,
If KeyedCoProcessFunction.processElement1 blocks for significant amount of 
time, will it prevent checkpoint ?

Thanks,
Alexey


Re: debug flink in intelliJ on EMR

2020-01-27 Thread Arvid Heise
Hi Fanbin,

the host should be the external IP of your master node. However, usually
the port 5005 is not open in EMR (you could use any other open, unused port).

Alternatively, you could use SSH port forwarding [1]:

ssh -L <local-port>:<master-host>:5005

And then connect to localhost:<local-port> in your IDE.

[1] https://www.ssh.com/ssh/tunneling/example

On Thu, Jan 23, 2020 at 9:24 AM Fanbin Bu  wrote:

> Hi,
>
> I m following
> https://cwiki.apache.org/confluence/display/FLINK/Remote+Debugging+of+Flink+Clusters
>  to
> debug flink program running on EMR.
>
> how do I specify the host in the `edit configurations` if the terminal on
> emr master is
> hadoop@ip-10-200-46-186
> ?
>
> Thanks,
> Fanbin
>