Re: Flink CLI properties with HA

2018-07-17 Thread Sampath Bhat
Hi vino

Should the Flink CLI have access to the path mentioned in
*high-availability.storageDir*?
If my Flink cluster is on a set of machines and I submit my job from the
Flink CLI on another, independent machine by giving the necessary details,
will the CLI try to access the *high-availability.storageDir* path?

I'm aware that the Flink client will connect to ZooKeeper to get the
leader address and the information necessary for job submission, but my
confusion is with *high-availability.storageDir* and its necessity in the
Flink CLI configuration.

On Mon, Jul 16, 2018 at 2:44 PM, vino yang  wrote:

> Hi Sampath,
>
> The Flink CLI needs to retrieve the JobManager leader address, so it needs
> to access the HA-specific configuration: if HA is implemented on top of
> ZooKeeper, the leader address information is fetched from ZooKeeper.
>
> The main use of the config item *high-availability.storageDir* is storage
> (job graph, checkpoints, and so on). The real data used for recovery is
> stored under this path; ZooKeeper just stores a state handle.
>
> ---
> Thanks.
> vino.
>
>
> 2018-07-16 15:28 GMT+08:00 Sampath Bhat :
>
>>
>> -- Forwarded message --
>> From: Sampath Bhat 
>> Date: Fri, Jul 13, 2018 at 3:18 PM
>> Subject: Flink CLI properties with HA
>> To: user 
>>
>>
>> Hello
>>
>> When HA is enabled in the Flink cluster and I have to submit a job via the
>> Flink CLI, then the flink-conf.yaml of the Flink CLI should contain these
>> properties -
>> high-availability: zookeeper
>> high-availability.cluster-id: flink
>> high-availability.zookeeper.path.root: flink
>> high-availability.storageDir: 
>> high-availability.zookeeper.quorum: 
>>
>> What is the need for high-availability.storageDir in the Flink CLI? Does
>> this mean that even the Flink client should be able to access the
>> mentioned path, or is it just some check being done on the property name?
>>
>> Without these properties, the Flink CLI will not be able to submit a job
>> to the Flink cluster when HA is enabled.
>>
>>
>


[jira] [Created] (FLINK-9870) Support field mapping and time attributes for table sinks

2018-07-17 Thread Timo Walther (JIRA)
Timo Walther created FLINK-9870:
---

 Summary: Support field mapping and time attributes for table sinks
 Key: FLINK-9870
 URL: https://issues.apache.org/jira/browse/FLINK-9870
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Reporter: Timo Walther
Assignee: Timo Walther


FLINK-7548 reworked the table source design and implemented the interfaces 
{{DefinedFieldMapping}}, {{DefinedProctimeAttribute}}, and 
{{DefinedRowtimeAttributes}}.

However, these interfaces need to be implemented by table sinks as well, in 
order to map a table back into a sink, similar to how sources do it for 
reading input data.

The current unified sink design assumes that this is possible.
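
A hedged sketch of what this could look like on the sink side (the concrete 
sink base interface and the field names are assumptions for illustration; the 
mapping method follows the existing source-side interface):
{code:java}
import java.util.Collections;
import java.util.Map;

import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sources.DefinedFieldMapping;
import org.apache.flink.types.Row;

public abstract class MyTableSink implements AppendStreamTableSink<Row>, DefinedFieldMapping {

	@Override
	public Map<String, String> getFieldMapping() {
		// logical table field -> physical field of the external system
		return Collections.singletonMap("user_id", "uid");
	}
}
{code}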



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9871) Use Description class for ConfigOptions with rich formatting

2018-07-17 Thread Dawid Wysakowicz (JIRA)
Dawid Wysakowicz created FLINK-9871:
---

 Summary: Use Description class for ConfigOptions with rich 
formatting
 Key: FLINK-9871
 URL: https://issues.apache.org/jira/browse/FLINK-9871
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.6.0
Reporter: Dawid Wysakowicz
Assignee: Dawid Wysakowicz
 Fix For: 1.6.0






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: Flink WindowedStream - Need assistance

2018-07-17 Thread Titus Rakkesh
Friends, any assistance regarding this?


On Mon, Jul 16, 2018 at 3:44 PM, Titus Rakkesh 
wrote:

> Dear All,
>
> We have 2 independent streams which will receive elements at different
> frequencies,
>
> DataStream<Tuple2<String, Double>> splittedActivationTuple;
>
> DataStream<Tuple2<String, Double>> unionReloadsStream;
>
> We have a requirement to keep the "splittedActivationTuple" stream elements
> in a window with an eviction period of 24 hours. So I created a
> "WindowedStream" like below,
>
> WindowedStream<Tuple2<String, Double>, Tuple, GlobalWindow>
> keyedWindowedActStream = splittedActivationTuple
> .assignTimestampsAndWatermarks(new IngestionTimeExtractor<Tuple2<String, Double>>())
> .keyBy(0).window(GlobalWindows.create())
> .evictor(TimeEvictor.of(Time.of(24, TimeUnit.HOURS)));
>
> Our requirements are the following:
>
>    1. When "unionReloadsStream" receives data, we need to check whether the
>    corresponding "String" field matches the "String" field in the
>    WindowedStream, and accumulate the WindowedStream's Double with the
>    "unionReloadsStream" Double. Will this be possible with Flink? I checked
>    CoGroup and CoMap, but I couldn't figure out how to do it since I am new.
>
>    2. CEP functionality to create a new stream from the WindowedStream if
>    the Double value > 100? I went through several of Flink's CEP tutorials,
>    but I couldn't figure out how to do this with a "WindowedStream".
>
> I am very new to Flink. Any assistance would be highly appreciated.
>
> Thanks,
>
> Titus
>


[jira] [Created] (FLINK-9872) SavepointITCase#testSavepointForJobWithIteration does not properly cancel jobs

2018-07-17 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-9872:
---

 Summary: SavepointITCase#testSavepointForJobWithIteration does not 
properly cancel jobs
 Key: FLINK-9872
 URL: https://issues.apache.org/jira/browse/FLINK-9872
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.5.1, 1.6.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.5.2, 1.6.0


The {{SavepointITCase}} attempts to cancel a job by calling {{cancel}} on a 
source instance. However, this instance isn't actually the one executed on the 
cluster, since it is serialized during the submission process.

 

Additionally, we aren't waiting for the cancellation to finish, causing the test 
to log several exceptions when the cluster is shut down.
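
A hedged sketch of the intended pattern (the client calls follow the 1.5 
{{ClusterClient}}; the polling loop and names are assumptions):
{code:java}
// cancel via the cluster client instead of the serialized source instance ...
client.cancel(jobId);

// ... and block until the cancellation has actually taken effect
while (client.getJobStatus(jobId).get() != JobStatus.CANCELED) {
	Thread.sleep(50);
}
{code}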



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9873) Log actual state when aborting checkpoint due to task not running

2018-07-17 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-9873:
---

 Summary: Log actual state when aborting checkpoint due to task not 
running
 Key: FLINK-9873
 URL: https://issues.apache.org/jira/browse/FLINK-9873
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Affects Versions: 1.5.1, 1.6.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.5.2, 1.6.0


Currently, if a checkpoint is triggered while a task is not in a RUNNING state, 
the following message is logged:
{code:java}
Checkpoint triggering task {} of job {} is not being executed at the 
moment.{code}
We can improve this message to include the actual task state to help diagnose 
problems.

This message is also a bit ambiguous, as "being executed" could mean many 
things, from not "RUNNING", to not being "DEPLOYED", or to not existing at all.
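
A hedged sketch of the improved message (exact wording and variable names are 
assumptions):
{code:java}
LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
	vertex.getTaskNameWithSubtaskIndex(), job, ExecutionState.RUNNING, currentState);
{code}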



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: Flink CLI properties with HA

2018-07-17 Thread vino yang
Hi Sampath,

It seems the Flink CLI for standalone mode would not access
*high-availability.storageDir*.

What's the exception stack trace in your environment?

Thanks, vino.

2018-07-17 15:08 GMT+08:00 Sampath Bhat :

> Hi vino
>
> Should the Flink CLI have access to the path mentioned in
> *high-availability.storageDir*?
> If my Flink cluster is on a set of machines and I submit my job from the
> Flink CLI on another, independent machine by giving the necessary details,
> will the CLI try to access the *high-availability.storageDir* path?
>
> I'm aware that the Flink client will connect to ZooKeeper to get the
> leader address and the information necessary for job submission, but my
> confusion is with *high-availability.storageDir* and its necessity in the
> Flink CLI configuration.
>
> On Mon, Jul 16, 2018 at 2:44 PM, vino yang  wrote:
>
>> Hi Sampath,
>>
>> The Flink CLI needs to retrieve the JobManager leader address, so it needs
>> to access the HA-specific configuration: if HA is implemented on top of
>> ZooKeeper, the leader address information is fetched from ZooKeeper.
>>
>> The main use of the config item *high-availability.storageDir* is storage
>> (job graph, checkpoints, and so on). The real data used for recovery is
>> stored under this path; ZooKeeper just stores a state handle.
>>
>> ---
>> Thanks.
>> vino.
>>
>>
>> 2018-07-16 15:28 GMT+08:00 Sampath Bhat :
>>
>>>
>>> -- Forwarded message --
>>> From: Sampath Bhat 
>>> Date: Fri, Jul 13, 2018 at 3:18 PM
>>> Subject: Flink CLI properties with HA
>>> To: user 
>>>
>>>
>>> Hello
>>>
>>> When HA is enabled in the Flink cluster and I have to submit a job via the
>>> Flink CLI, then the flink-conf.yaml of the Flink CLI should contain these
>>> properties -
>>> high-availability: zookeeper
>>> high-availability.cluster-id: flink
>>> high-availability.zookeeper.path.root: flink
>>> high-availability.storageDir: 
>>> high-availability.zookeeper.quorum: 
>>>
>>> What is the need for high-availability.storageDir in the Flink CLI? Does
>>> this mean that even the Flink client should be able to access the
>>> mentioned path, or is it just some check being done on the property name?
>>>
>>> Without these properties, the Flink CLI will not be able to submit a job
>>> to the Flink cluster when HA is enabled.
>>>
>>>
>>
>


Re: Flink WindowedStream - Need assistance

2018-07-17 Thread Timo Walther

Hi Titus,

have you looked into ProcessFunction? ProcessFunction[1] gives you 
access to the two important streaming primitives "time" and "state".


So in your case you can decide flexibly what you want to put into state 
and when you want to set and fire a timer (for clean-up) per key.
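
For example, here is a minimal (untested) sketch along those lines, assuming 
both streams carry Tuple2<String, Double> keyed by the String field (the class 
and state names are made up for illustration):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class ActivationReloadMatcher extends
        CoProcessFunction<Tuple2<String, Double>, Tuple2<String, Double>, Tuple2<String, Double>> {

    private static final long TTL_MS = 24 * 60 * 60 * 1000L; // 24 hours

    private transient ValueState<Double> activationAmount;

    @Override
    public void open(Configuration parameters) {
        activationAmount = getRuntimeContext().getState(
                new ValueStateDescriptor<>("activation", Double.class));
    }

    @Override
    public void processElement1(Tuple2<String, Double> activation, Context ctx,
            Collector<Tuple2<String, Double>> out) throws Exception {
        // store the activation value and schedule its clean-up after 24 hours
        activationAmount.update(activation.f1);
        ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + TTL_MS);
    }

    @Override
    public void processElement2(Tuple2<String, Double> reload, Context ctx,
            Collector<Tuple2<String, Double>> out) throws Exception {
        Double current = activationAmount.value();
        if (current != null) {
            double accumulated = current + reload.f1;
            activationAmount.update(accumulated);
            if (accumulated > 100) {
                // replaces the CEP step: emit once the threshold is exceeded
                out.collect(Tuple2.of(reload.f0, accumulated));
            }
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
            Collector<Tuple2<String, Double>> out) {
        // simplified eviction; a complete version would only act on the latest timer
        activationAmount.clear();
    }
}

You would wire it up with something like:

splittedActivationTuple
    .connect(unionReloadsStream)
    .keyBy(0, 0) // key both streams by the String field
    .process(new ActivationReloadMatcher());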


Regards,
Timo

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/process_function.html



On 17.07.18 at 11:39, Titus Rakkesh wrote:

Friends, any assistance regarding this?


On Mon, Jul 16, 2018 at 3:44 PM, Titus Rakkesh 
wrote:


Dear All,

We have 2 independent streams which will receive elements at different
frequencies,

DataStream<Tuple2<String, Double>> splittedActivationTuple;

DataStream<Tuple2<String, Double>> unionReloadsStream;

We have a requirement to keep the "splittedActivationTuple" stream elements in
a window with an eviction period of 24 hours. So I created a
"WindowedStream" like below,

WindowedStream<Tuple2<String, Double>, Tuple, GlobalWindow>
keyedWindowedActStream = splittedActivationTuple
     .assignTimestampsAndWatermarks(new IngestionTimeExtractor<Tuple2<String, Double>>())
     .keyBy(0).window(GlobalWindows.create())
     .evictor(TimeEvictor.of(Time.of(24, TimeUnit.HOURS)));

Our requirements are the following:

1. When "unionReloadsStream" receives data, we need to check whether the
corresponding "String" field matches the "String" field in the
WindowedStream, and accumulate the WindowedStream's Double with the
"unionReloadsStream" Double. Will this be possible with Flink? I checked
CoGroup and CoMap, but I couldn't figure out how to do it since I am new.

2. CEP functionality to create a new stream from the WindowedStream if the
Double value > 100? I went through several of Flink's CEP tutorials, but I
couldn't figure out how to do this with a "WindowedStream".

I am very new to Flink. Any assistance would be highly appreciated.

Thanks,

Titus





Re: Flink CLI properties with HA

2018-07-17 Thread Till Rohrmann
Hi Sampath,

technically the client does not need to know the
`high-availability.storageDir` to submit a job. However, due to how we
construct the ZooKeeperHaServices, it is still needed. The reason behind
this is that we use the same services for the server and the client. Thus,
the implementation needs to know the storageDir in both cases. The way it
should be done is to split the HighAvailabilityServices up into client and
server services. The former would then not depend on
`high-availability.storageDir`.
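
A rough sketch of what the client side could then be reduced to (the
interface name and method are hypothetical, not existing Flink API):

public interface ClientHighAvailabilityServices {

    // leader retrieval is all the client needs for job submission;
    // no high-availability.storageDir involved
    LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobId);
}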

Cheers,
Till

On Tue, Jul 17, 2018 at 1:31 PM vino yang  wrote:

> Hi Sampath,
>
> It seems the Flink CLI for standalone mode would not access
> *high-availability.storageDir*.
>
> What's the exception stack trace in your environment?
>
> Thanks, vino.
>
> 2018-07-17 15:08 GMT+08:00 Sampath Bhat :
>
>> Hi vino
>>
>> Should the Flink CLI have access to the path mentioned in
>> *high-availability.storageDir*?
>> If my Flink cluster is on a set of machines and I submit my job from the
>> Flink CLI on another, independent machine by giving the necessary details,
>> will the CLI try to access the *high-availability.storageDir* path?
>>
>> I'm aware that the Flink client will connect to ZooKeeper to get the
>> leader address and the information necessary for job submission, but my
>> confusion is with *high-availability.storageDir* and its necessity in the
>> Flink CLI configuration.
>>
>> On Mon, Jul 16, 2018 at 2:44 PM, vino yang  wrote:
>>
>>> Hi Sampath,
>>>
>>> The Flink CLI needs to retrieve the JobManager leader address, so it needs
>>> to access the HA-specific configuration: if HA is implemented on top of
>>> ZooKeeper, the leader address information is fetched from ZooKeeper.
>>>
>>> The main use of the config item *high-availability.storageDir* is storage
>>> (job graph, checkpoints, and so on). The real data used for recovery is
>>> stored under this path; ZooKeeper just stores a state handle.
>>>
>>> ---
>>> Thanks.
>>> vino.
>>>
>>>
>>> 2018-07-16 15:28 GMT+08:00 Sampath Bhat :
>>>

 -- Forwarded message --
 From: Sampath Bhat 
 Date: Fri, Jul 13, 2018 at 3:18 PM
 Subject: Flink CLI properties with HA
 To: user 


 Hello

 When HA is enabled in the Flink cluster and I have to submit a job via
 the Flink CLI, then the flink-conf.yaml of the Flink CLI should contain
 these properties -
 high-availability: zookeeper
 high-availability.cluster-id: flink
 high-availability.zookeeper.path.root: flink
 high-availability.storageDir: 
 high-availability.zookeeper.quorum: 

 What is the need for high-availability.storageDir in the Flink CLI? Does
 this mean that even the Flink client should be able to access the
 mentioned path, or is it just some check being done on the property name?

 Without these properties, the Flink CLI will not be able to submit a job
 to the Flink cluster when HA is enabled.


>>>
>>
>


[ANNOUNCE] Weekly community update #29

2018-07-17 Thread Till Rohrmann
Dear community,

this is the weekly community update thread #29. Please post any news and
updates you want to share with the community to this thread.

# Feature freeze Flink 1.6

The Flink community has cut the release branch for Flink 1.6 [1]. From
now on, the community will concentrate on fixing the last remaining release
blockers before Flink 1.6 is released.

# Flink 1.5.1 released

The Flink community has released the first bug fix release 1.5.1 [2]
containing more than 60 fixes and improvements.

# Improve record serialization

Zhijiang kicked off a discussion about improving the way Flink serializes
records into multiple result subpartitions [3]. His proposal aims at
reducing redundant serialization overhead by sharing buffers across
multiple subpartitions. Please join the discussion if you want to learn
more.

[1]
https://git-wip-us.apache.org/repos/asf?p=flink.git;a=shortlog;h=refs/heads/release-1.6
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-5-1-released-td23300.html
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improve-broadcast-serialization-td23295.html

Cheers,
Till


[jira] [Created] (FLINK-9874) set_conf_ssl in E2E tests fails on macOS

2018-07-17 Thread Florian Schmidt (JIRA)
Florian Schmidt created FLINK-9874:
--

 Summary: set_conf_ssl in E2E tests fails on macOS
 Key: FLINK-9874
 URL: https://issues.apache.org/jira/browse/FLINK-9874
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.6.0
Reporter: Florian Schmidt


Setting up a cluster with SSL support in the end-to-end tests via 
`_set_conf_ssl_` fails under macOS because the command
{code:java}
hostname -I{code}
is used, but '_-I_' is not a supported flag for the hostname command under 
macOS.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9875) Add concurrent creation of execution job vertex

2018-07-17 Thread JIRA
陈梓立 created FLINK-9875:
--

 Summary: Add concurrent creation of execution job vertex
 Key: FLINK-9875
 URL: https://issues.apache.org/jira/browse/FLINK-9875
 Project: Flink
  Issue Type: Improvement
  Components: Local Runtime
Affects Versions: 1.5.1
Reporter: 陈梓立
Assignee: 陈梓立


In some cases, such as InputFormat vertices, the creation of execution job 
vertices is time-consuming. This PR adds concurrent creation of execution job 
vertices to speed it up.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9876) ExecutionGraphCoLocationRestartTest testConstraintsAfterRestart sometimes fails with timeout

2018-07-17 Thread Florian Schmidt (JIRA)
Florian Schmidt created FLINK-9876:
--

 Summary: ExecutionGraphCoLocationRestartTest 
testConstraintsAfterRestart sometimes fails with timeout
 Key: FLINK-9876
 URL: https://issues.apache.org/jira/browse/FLINK-9876
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.6.0
Reporter: Florian Schmidt


 
See https://travis-ci.org/florianschmidt1994/flink/jobs/404871788
{code:java}
Running org.apache.flink.runtime.executiongraph.ExecutionTest Tests run: 2, 
Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 8.326 sec <<< FAILURE! - in 
org.apache.flink.runtime.executiongraph.ExecutionGraphCoLocationRestartTest 
testConstraintsAfterRestart[Scheduler type = 
SLOT_POOL](org.apache.flink.runtime.executiongraph.ExecutionGraphCoLocationRestartTest)
 Time elapsed: 6.545 sec <<< ERROR! java.util.concurrent.TimeoutException: null 
 at 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilJobStatus(ExecutionGraphTestUtils.java:121)
  at 
org.apache.flink.runtime.executiongraph.ExecutionGraphCoLocationRestartTest.testConstraintsAfterRestart(ExecutionGraphCoLocationRestartTest.java:106)
{code}
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9877) Add separate docs page for different join types in DataStream API

2018-07-17 Thread Florian Schmidt (JIRA)
Florian Schmidt created FLINK-9877:
--

 Summary: Add separate docs page for different join types in 
DataStream API
 Key: FLINK-9877
 URL: https://issues.apache.org/jira/browse/FLINK-9877
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Reporter: Florian Schmidt
Assignee: Florian Schmidt
 Fix For: 1.6.0






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9878) IO worker threads BLOCKED on SSL Session Cache while CMS full gc

2018-07-17 Thread Nico Kruber (JIRA)
Nico Kruber created FLINK-9878:
--

 Summary: IO worker threads BLOCKED on SSL Session Cache while CMS 
full gc
 Key: FLINK-9878
 URL: https://issues.apache.org/jira/browse/FLINK-9878
 Project: Flink
  Issue Type: Bug
  Components: Network
Affects Versions: 1.5.1, 1.5.0, 1.6.0
Reporter: Nico Kruber
Assignee: Nico Kruber
 Fix For: 1.5.2, 1.6.0


According to https://github.com/netty/netty/issues/832, there is a JDK issue 
during garbage collection when the SSL session cache is not limited. We should 
allow the user to configure this and further (advanced) SSL parameters for 
fine-tuning, to fix this and similar issues. In particular, the following 
parameters should be configurable (a sketch of the JDK-level knobs follows the 
list):
- SSL session cache size
- SSL session timeout
- SSL handshake timeout
- SSL close notify flush timeout
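
The first two map directly onto the JDK's session context; a hedged sketch of 
where they would be applied (variable names are placeholders, and the two 
timeouts at the end of the list would be Netty-side settings instead):
{code:java}
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSessionContext;

SSLContext sslContext = SSLContext.getInstance("TLS");
// ... initialize the context with key and trust material ...

SSLSessionContext sessionContext = sslContext.getServerSessionContext();
sessionContext.setSessionCacheSize(sessionCacheSize); // 0 means unlimited, the problematic default
sessionContext.setSessionTimeout(sessionTimeoutSeconds); // in seconds
{code}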





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9879) Find sane defaults for (advanced) SSL session parameters

2018-07-17 Thread Nico Kruber (JIRA)
Nico Kruber created FLINK-9879:
--

 Summary: Find sane defaults for (advanced) SSL session parameters
 Key: FLINK-9879
 URL: https://issues.apache.org/jira/browse/FLINK-9879
 Project: Flink
  Issue Type: Improvement
  Components: Network
Affects Versions: 1.5.2, 1.6.0
Reporter: Nico Kruber
 Fix For: 1.6.1


After adding these configuration parameters with 
https://issues.apache.org/jira/browse/FLINK-9878:
- SSL session cache size
- SSL session timeout
- SSL handshake timeout
- SSL close notify flush timeout

We should try to find sane defaults that "just work" :)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[ANNOUNCE] Program for Flink Forward Berlin 2018 has been announced

2018-07-17 Thread Fabian Hueske
Hi everyone,

I'd like to announce the program for Flink Forward Berlin 2018.

The program committee [1] assembled a program of about 50 talks on use
cases, operations, ecosystem, tech deep dives, and research topics.
The conference will host speakers from Airbnb, Amazon, Google, ING, Lyft,
Microsoft, Netflix, Uber, and many other organizations.

https://berlin-2018.flink-forward.org/conference-program

Flink Forward Berlin 2018 will take place on September 3-5.
As previously announced, registration is open, and as a member of the
Flink community you get a 20% discount on your conference pass if you
register with the code *MailingList*.

https://berlin-2018.flink-forward.org/register/

Best regards,
Fabian
(PC Chair for Flink Forward Berlin 2018)

[1] https://berlin-2018.flink-forward.org/program-committee/


Re: [DISCUSS] Improve broadcast serialization

2018-07-17 Thread Piotr Nowojski
Hi

Generally speaking this would be a nice optimisation, however it might be 
tricky to implement. The thing to keep in mind is that the current interface 
allows interleaving broadcasts and normal sends; because of that, at any 
given time some serialisers can have more data than others. For example when we 
have two output channels and we are looping over the following writes:

Write sth to 1. Channel
Broadcast to all channels
Write sth to 1. Channel
Broadcast to all channels
Write sth to 1. Channel
Broadcast to all channels
(…)

Thus the buffers of different channels can fill up at different rates.

> In theory every record can be serialized only once and referenced for all the 
> subpartitions in broadcast mode.

The problem here is that after serialising records, the only unit that can be 
referenced afterwards is a “Buffer”. So that would leave us with a couple of 
options:

1. Create a specialised BroadcastRecordWriter that supports ONLY broadcasting, 
guaranteeing that all channels always receive the same data. Here you could 
serialise records only once, to one BufferBuilder that could be shared and 
referenced by multiple BufferConsumers from different channels. Any non 
broadcast write would have to fail.
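
Roughly, option 1. could look like this (a simplified sketch; the names follow 
the current internal API, but treat the exact calls and signatures as 
assumptions):

public void broadcastEmit(T record) throws IOException, InterruptedException {
    BufferBuilder bufferBuilder = requestNewBufferBuilder();
    try (BufferConsumer consumer = bufferBuilder.createBufferConsumer()) {
        for (int channel = 0; channel < numChannels; channel++) {
            // copy() shares the underlying memory; only the reader index is duplicated
            targetPartition.addBufferConsumer(consumer.copy(), channel);
        }
    }
    // serialise the record only once, into the shared BufferBuilder
    serializer.addRecord(record, bufferBuilder);
}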

2. Similar to the above, but specialised in MOSTLY broadcasting. Operate as in 1. 
for broadcasts, but for any non-broadcast write: finish the current broadcasting 
BufferBuilder, flush all data on all channels, serialise the single record to a 
single channel using a newly created BufferBuilder, and immediately 
finish/flush it as well, so that any subsequent broadcasts will work again as in 1.:

3. Similar to 2, but lazily switch between broadcasting and non-broadcasting 
modes. It would have two modes of operation that could be switched back and 
forth: the same as currently implemented for non-broadcast writes, and the 
optimised broadcast mode:

Broadcast to all channels
Broadcast to all channels
Broadcast to all channels
Broadcast to all channels
Write sth to X Channel // this flushes all channels and clears/finishes 
previous BufferBuilder 
Write sth to Y Channel
Write sth to Y Channel
Write sth to Y Channel
Write sth to X Channel 
Broadcast to all channels // this flushes all channels and clears/finishes 
previous BufferBuilders, 
Broadcast to all channels
Broadcast to all channels
(…)

However, both in 2. and 3. there would be a very big penalty for mixing broadcasts 
with normal writes.

Piotrek

> On 13 Jul 2018, at 09:44, Zhijiang(wangzhijiang999) 
>  wrote:
> 
> Hi all,
> 
> In the current implementation, the RecordSerializer is created separately for 
> each subpartition in the RecordWriter; that means the number of serializers 
> equals the number of subpartitions.
> For the broadcast partitioner, every record will be serialized many times in all 
> the subpartitions, and this may degrade performance to some extent.
> In theory every record can be serialized only once and referenced for all the 
> subpartitions in broadcast mode.
> 
> To do so, I propose the following changes:
> 1. Create and maintain only one serializer in the RecordWriter, and let it 
> serialize the record for all the subpartitions. This makes sense for any 
> partitioner, and the memory overhead can also be decreased, because every 
> serializer maintains some separate byte buffers internally.
> 2. Maybe we can abstract the RecordWriter into a base class used for the other 
> partitioner modes and implement a BroadcastRecordWriter for the 
> BroadcastPartitioner. This new implementation will add buffer references 
> based on the number of subpartitions before adding them into the subpartition 
> queues.
> 3. Maybe we can remove the StreamRecordWriter by migrating its flusher into the 
> RecordWriter; then the uniform RecordWriter can be used for both stream and 
> batch. The above BroadcastRecordWriter can also be uniform for both stream and 
> batch.
> 
> I am not sure whether this improvement has been proposed before. What do you 
> think of it?
> If necessary I can create JIRAs to contribute it, and I may need one committer 
> to cooperate with me.
> 
> Best,
> 
> Zhijiang



[jira] [Created] (FLINK-9880) Incorrect argument order calling BucketerContext#update

2018-07-17 Thread Ted Yu (JIRA)
Ted Yu created FLINK-9880:
-

 Summary: Incorrect argument order calling BucketerContext#update
 Key: FLINK-9880
 URL: https://issues.apache.org/jira/browse/FLINK-9880
 Project: Flink
  Issue Type: Bug
Reporter: Ted Yu


In StreamingFileSink.java :
{code}
bucketerContext.update(context.timestamp(), currentProcessingTime, 
context.currentWatermark());
{code}
However, the method update is declared as :
{code}
void update(@Nullable Long elementTimestamp, long currentWatermark, long 
currentProcessingTime) {
{code}
The second and third parameters seem to be swapped.
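
If so, a sketch of the corrected call (matching the declared parameter order):
{code:java}
bucketerContext.update(context.timestamp(), context.currentWatermark(), currentProcessingTime);
{code}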



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9881) Typo in a function name in table.scala

2018-07-17 Thread Ashwin Sinha (JIRA)
Ashwin Sinha created FLINK-9881:
---

 Summary: Typo in a function name in table.scala
 Key: FLINK-9881
 URL: https://issues.apache.org/jira/browse/FLINK-9881
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Reporter: Ashwin Sinha
Assignee: Ashwin Sinha






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


What is the way to prove performance improvement?

2018-07-17 Thread 陈梓立
Hi all,

Recently I opened 3 PRs with performance improvements [1][2][3]. Unit tests
verify their correctness, and in the real scenario we have benchmark
reports confirming that they do help performance.

I wonder what the formal way to verify a performance improvement is. Is it
to provide a benchmark report, run a standard benchmark, add a
performance test (I don't know how to do that), or something else?

Looking forward to your reply.

Best,
tison.

[1] https://github.com/apache/flink/pull/6339
[2] https://github.com/apache/flink/pull/6345
[3] https://github.com/apache/flink/pull/6353


Re: [DISCUSS] Improve broadcast serialization

2018-07-17 Thread Zhijiang(wangzhijiang999)
Hi Piotr,

Thanks for your replies and professional suggestions!

My initial thought is just as you said in your first suggestion. The current 
RecordWriter will emit a StreamRecord to some subpartition via the ChannelSelector, 
or broadcast events/watermarks to all subpartitions directly.
If the ChannelSelector implementation is the BroadcastPartitioner, then we can 
create a specialized BroadcastRecordWriter to handle 'emit', 
'broadcastEmit', 'broadcastEvent', etc.
To make it less tricky, I want to abstract the RecordWriter as a plugin, 
then implement a BroadcastRecordWriter and a NonBroadcastRecordWriter separately, 
both extending the abstract RecordWriter. That means we divide the RecordWriter by 
ChannelSelector, and we may also remove the current StreamRecordWriter to unify 
the RecordWriter for both stream and batch mode.

Considering the specific implementation, I think one RecordSerializer can work for 
both the BroadcastRecordWriter and the NonBroadcastRecordWriter, but the 
precondition is that the RecordSerializer has no internal state. So we have to 
remove the BufferBuilder variable from the SpanningRecordSerializer and pass it 
in via the addRecord/continueWritingWithNextBufferBuilder methods from the 
RecordWriter. The BroadcastRecordWriter only needs to maintain one BufferBuilder 
for all subpartitions, while the NonBroadcastRecordWriter may need to maintain 
one BufferBuilder per subpartition.
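
In code, the reshaped interface might look like this (the method names follow 
the current RecordSerializer; the extra BufferBuilder parameter is the proposed 
change, so treat the exact signatures as assumptions):

public interface RecordSerializer<T extends IOReadableWritable> {

    // serialize into the serializer's internal byte buffer and start copying
    // into the BufferBuilder supplied by the RecordWriter (no state kept here)
    SerializationResult addRecord(T record, BufferBuilder bufferBuilder) throws IOException;

    // continue copying the remaining serialized bytes into the next
    // BufferBuilder handed over by the caller
    SerializationResult continueWritingWithNextBufferBuilder(BufferBuilder bufferBuilder) throws IOException;
}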

Another issue is whether this improvement is suitable for 
broadcastEmit(watermark) in the NonBroadcastRecordWriter, as you said in 
suggestions 2 and 3. I worry it may decrease buffer utilization when switching 
between broadcast and non-broadcast modes, and it may also be trickier to 
implement. I am still thinking about it.

Maybe we can implement the improvement for the BroadcastPartitioner as a first 
step and make sure there is one RecordSerializer for all subpartitions. That can 
reduce the memory overhead of the RecordSerializer and the time cost in broadcast 
serialization scenarios.

Best,

Zhijiang


--
From: Piotr Nowojski 
Sent: Tuesday, July 17, 2018, 23:31
To: dev ; Zhijiang(wangzhijiang999) 

Subject: Re: [DISCUSS] Improve broadcast serialization

Hi

Generally speaking this would be a nice optimisation, however it might be 
tricky to implement. The thing to keep in mind is that the current interface 
allows interleaving broadcasts and normal sends; because of that, at any 
given time some serialisers can have more data than others. For example when we 
have two output channels and we are looping over the following writes:

Write sth to 1. Channel
Broadcast to all channels
Write sth to 1. Channel
Broadcast to all channels
Write sth to 1. Channel
Broadcast to all channels
(…)

Thus the buffers of different channels can fill up at different rates.

> In theory every record can be serialized only once and referenced for all the 
> subpartitions in broadcast mode.

The problem here is that after serialising records, the only unit that can be 
referenced afterwards is a “Buffer”. So that would leave us with a couple of 
options:

1. Create a specialised BroadcastRecordWriter that supports ONLY broadcasting, 
guaranteeing that all channels always receive the same data. Here you could 
serialise records only once, to one BufferBuilder that could be shared and 
referenced by multiple BufferConsumers from different channels. Any non 
broadcast write would have to fail.

2. Similar to the above, but specialised in MOSTLY broadcasting. Operate as in 1. 
for broadcasts, but for any non-broadcast write: finish the current broadcasting 
BufferBuilder, flush all data on all channels, serialise the single record to a 
single channel using a newly created BufferBuilder, and immediately 
finish/flush it as well, so that any subsequent broadcasts will work again as in 1.:

3. Similar to 2, but lazily switch between broadcasting and non-broadcasting 
modes. It would have two modes of operation that could be switched back and 
forth: the same as currently implemented for non-broadcast writes, and the 
optimised broadcast mode:

Broadcast to all channels
Broadcast to all channels
Broadcast to all channels
Broadcast to all channels
Write sth to X Channel // this flushes all channels and clears/finishes 
previous BufferBuilder 
Write sth to Y Channel
Write sth to Y Channel
Write sth to Y Channel
Write sth to X Channel 
Broadcast to all channels // this flushes all channels and clears/finishes 
previous BufferBuilders, 
Broadcast to all channels
Broadcast to all channels
(…)

However, both in 2. and 3. there would be a very big penalty for mixing broadcasts 
with normal writes.

Piotrek

> On 13 Jul 2018, at 09:44, Zhijiang(wangzhijiang999) 
>  wrote:
> 
> Hi all,
> 
> In the current implementation, the RecordSerializer is created separately for 
> each subpartition in the RecordWriter; that means the number of serializers 
> equals the number of subpartitions.
> For the broadcast partitioner, every record will be serialized many times in all 
> the subpartitions, and this may degrade performance to some extent.

[jira] [Created] (FLINK-9882) A function access can be private in client

2018-07-17 Thread Riley Li (JIRA)
Riley Li created FLINK-9882:
---

 Summary: A function access can be private in client
 Key: FLINK-9882
 URL: https://issues.apache.org/jira/browse/FLINK-9882
 Project: Flink
  Issue Type: Improvement
  Components: Client
Reporter: Riley Li






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Flink 1.5 job distribution over cluster nodes

2018-07-17 Thread Shachar Carmeli
Hi,

We have 4 jobs with parallelism 3 that are running over 3 task managers with
4 slots each; each task manager runs on a different VM.

On Flink 1.3.2 the jobs were evenly distributed across the nodes: each job
took one task slot on each task manager.

After upgrading to Flink 1.5, each job is running on a single task
manager (with a carry-over to another if there are no slots left).

The jobs are not evenly distributed by load, which causes some task managers
to consume more resources (CPU/memory) than other task managers.

Is there a way to return to an even distribution?

Thanks,

Shachar


Re: What is the way to prove performance improvement?

2018-07-17 Thread Till Rohrmann
Hi Tison,

at the moment there is no formal way to verify performance improvements. What
you can do is to provide your measurements by adding the graphs to the PR
thread and specifying the setup. Then others can try to verify these numbers
by running their own benchmarks.
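
For micro-benchmarks, a JMH skeleton is a common starting point; a hedged
sketch (the benchmark class name and body are placeholders, not an existing
Flink test):

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.infra.Blackhole;

@State(Scope.Thread)
public class RecordWriterBenchmark {

    @Setup
    public void setUp() {
        // build the component under test, e.g. a RecordWriter over a mock partition
    }

    @Benchmark
    public void broadcastEmit(Blackhole bh) throws Exception {
        // perform the measured operation and sink its result into the Blackhole
        // so the JIT cannot eliminate it as dead code
    }
}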

Cheers,
Till

On Wed, Jul 18, 2018 at 1:34 AM 陈梓立  wrote:

> Hi all,
>
> Recently I opened 3 PRs with performance improvements [1][2][3]. Unit tests
> verify their correctness, and in the real scenario we have benchmark
> reports confirming that they do help performance.
>
> I wonder what the formal way to verify a performance improvement is. Is it
> to provide a benchmark report, run a standard benchmark, add a
> performance test (I don't know how to do that), or something else?
>
> Looking forward to your reply.
>
> Best,
> tison.
>
> [1] https://github.com/apache/flink/pull/6339
> [2] https://github.com/apache/flink/pull/6345
> [3] https://github.com/apache/flink/pull/6353
>


Re: What is the way to prove performance improvement?

2018-07-17 Thread 陈梓立
Hi Till,

Thanks for your reply! I will try to add some later.

Best,
tison.

Till Rohrmann  wrote on Wednesday, July 18, 2018 at 2:48 PM:

> Hi Tison,
>
> at the moment there is no formal way to verify performance improvements. What
> you can do is to provide your measurements by adding the graphs to the PR
> thread and specifying the setup. Then others can try to verify these numbers
> by running their own benchmarks.
>
> Cheers,
> Till
>
> On Wed, Jul 18, 2018 at 1:34 AM 陈梓立  wrote:
>
> > Hi all,
> >
> > Recently I opened 3 PRs with performance improvements [1][2][3]. Unit tests
> > verify their correctness, and in the real scenario we have benchmark
> > reports confirming that they do help performance.
> >
> > I wonder what the formal way to verify a performance improvement is. Is it
> > to provide a benchmark report, run a standard benchmark, add a
> > performance test (I don't know how to do that), or something else?
> >
> > Looking forward to your reply.
> >
> > Best,
> > tison.
> >
> > [1] https://github.com/apache/flink/pull/6339
> > [2] https://github.com/apache/flink/pull/6345
> > [3] https://github.com/apache/flink/pull/6353
> >
>