[jira] [Closed] (FLINK-7468) Implement sender backlog logic for credit-based

2018-01-16 Thread zhijiang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang closed FLINK-7468.
---
Resolution: Fixed

> Implement sender backlog logic for credit-based
> ---
>
> Key: FLINK-7468
> URL: https://issues.apache.org/jira/browse/FLINK-7468
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
> Fix For: 1.5.0
>
>
> This is a part of the work for credit-based network flow control.
> Receivers should know how many buffers are backlogged on the sender side 
> (the backlog) and use this information to decide how to distribute floating 
> buffers.
> The {{ResultSubpartition}} maintains the backlog, which counts only the 
> buffers in this subpartition, not the events. The backlog is increased when 
> a buffer is added to the subpartition and decreased when a buffer is polled 
> from it.
> The sender attaches the backlog to each {{BufferResponse}} as an absolute 
> value after the buffer is transferred.
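
For illustration, a minimal sketch of this bookkeeping, using a simplified 
stand-in for {{ResultSubpartition}} (all names below are illustrative, not 
the actual Flink classes):

    import java.util.ArrayDeque;

    // Hypothetical, simplified subpartition illustrating the backlog
    // bookkeeping described above.
    class SimpleSubpartition {
        // A queue entry that is either a data buffer or an event.
        static final class Entry {
            final boolean isBuffer;
            Entry(boolean isBuffer) { this.isBuffer = isBuffer; }
        }

        private final ArrayDeque<Entry> queue = new ArrayDeque<>();
        private int buffersInBacklog; // counts data buffers only, not events

        synchronized void add(Entry entry) {
            queue.add(entry);
            if (entry.isBuffer) {
                buffersInBacklog++; // increased when a buffer is added
            }
        }

        synchronized Entry poll() {
            Entry next = queue.poll();
            if (next != null && next.isBuffer) {
                buffersInBacklog--; // decreased when a buffer is polled
            }
            return next;
        }

        // The absolute value the sender would attach to each BufferResponse.
        synchronized int getBuffersInBacklog() {
            return buffersInBacklog;
        }
    }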





[jira] [Reopened] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-16 Thread zhijiang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang reopened FLINK-7456:
-

> Implement Netty sender incoming pipeline for credit-based
> -
>
> Key: FLINK-7456
> URL: https://issues.apache.org/jira/browse/FLINK-7456
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
> Fix For: 1.5.0
>
>
> This is a part of the work for credit-based network flow control.
> On the sender side, each subpartition view maintains an atomic integer 
> {{currentCredit}} fed by the receiver. Upon receiving {{PartitionRequest}} 
> and {{AddCredit}} messages, {{currentCredit}} is incremented by the 
> announced delta.
> Each view also maintains an atomic boolean field that marks it as registered 
> available for transfer, ensuring it is enqueued in the handler only once. If 
> {{currentCredit}} increases from zero and there are buffers available in the 
> subpartition, the corresponding view is enqueued for transferring data.
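
A minimal sketch of this credit bookkeeping, using a simplified stand-in for 
the subpartition view (all names below are illustrative):

    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicInteger;

    // Hypothetical, simplified view illustrating the credit accounting
    // described above.
    class CreditedView {
        private final AtomicInteger currentCredit = new AtomicInteger(0);
        // Guards against enqueueing the view in the handler more than once.
        private final AtomicBoolean registeredAvailable = new AtomicBoolean(false);

        // Called for both PartitionRequest (initial credit) and AddCredit.
        void addCredit(int delta) {
            int newCredit = currentCredit.addAndGet(delta);
            // Enqueue only when credit became positive, data is waiting,
            // and the view is not already registered.
            if (newCredit > 0 && hasBuffersAvailable()
                    && registeredAvailable.compareAndSet(false, true)) {
                enqueueForTransfer();
            }
        }

        boolean hasBuffersAvailable() {
            return true; // placeholder for the real buffer check
        }

        void enqueueForTransfer() {
            // Hand the view to the Netty handler, which writes
            // BufferResponses while credit remains.
        }
    }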





[jira] [Commented] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16328393#comment-16328393
 ] 

ASF GitHub Bot commented on FLINK-7456:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4552
  
@NicoK, I have submitted the switch that keeps both the old mode and the new 
credit-based mode.


> Implement Netty sender incoming pipeline for credit-based
> -
>
> Key: FLINK-7456
> URL: https://issues.apache.org/jira/browse/FLINK-7456
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
> Fix For: 1.5.0
>
>
> This is a part of the work for credit-based network flow control.
> On the sender side, each subpartition view maintains an atomic integer 
> {{currentCredit}} fed by the receiver. Upon receiving {{PartitionRequest}} 
> and {{AddCredit}} messages, {{currentCredit}} is incremented by the 
> announced delta.
> Each view also maintains an atomic boolean field that marks it as registered 
> available for transfer, ensuring it is enqueued in the handler only once. If 
> {{currentCredit}} increases from zero and there are buffers available in the 
> subpartition, the corresponding view is enqueued for transferring data.







[jira] [Closed] (FLINK-7456) Implement Netty sender incoming pipeline for credit-based

2018-01-16 Thread zhijiang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang closed FLINK-7456.
---
Resolution: Fixed

> Implement Netty sender incoming pipeline for credit-based
> -
>
> Key: FLINK-7456
> URL: https://issues.apache.org/jira/browse/FLINK-7456
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
> Fix For: 1.5.0
>
>
> This is a part of the work for credit-based network flow control.
> On the sender side, each subpartition view maintains an atomic integer 
> {{currentCredit}} fed by the receiver. Upon receiving {{PartitionRequest}} 
> and {{AddCredit}} messages, {{currentCredit}} is incremented by the 
> announced delta.
> Each view also maintains an atomic boolean field that marks it as registered 
> available for transfer, ensuring it is enqueued in the handler only once. If 
> {{currentCredit}} increases from zero and there are buffers available in the 
> subpartition, the corresponding view is enqueued for transferring data.





[jira] [Commented] (FLINK-8431) Allow to specify # GPUs for TaskManager in Mesos

2018-01-16 Thread Dongwon Kim (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16328386#comment-16328386
 ] 

Dongwon Kim commented on FLINK-8431:


[~eronwright] I'm testing my implementation by launching a standalone Flink 
cluster using {{./bin/mesos-appmaster.sh}}. I tested the following scenarios 
with Mesos configured with {{--filter_gpu_resources}}:
 * *When {{mesos.resourcemanager.tasks.gpus}} is not specified or is set to 0.0*
 ** {{LaunchCoordinator}} is not given any offers, because 
{{MesosFlinkResourceManager}} does not enable the {{GPU_RESOURCES}} capability 
in that case.
 * *When {{mesos.resourcemanager.tasks.gpus}} is smaller than or equal to the 
available GPUs on a node*
 ** Given offers, {{LaunchCoordinator}} aggregates offers of different roles 
from the same node and passes the aggregated offers to Fenzo, which schedules 
resources over nodes. When Fenzo reports a successful scheduling, 
{{LaunchCoordinator}} allocates resources of different roles to tasks, then 
populates {{Protos.TaskInfo}} with the allocated resources and sends it to the 
Mesos master.
 * *When {{mesos.resourcemanager.tasks.gpus}} is bigger than the available GPUs 
on a node*
 ** Given offers, {{LaunchCoordinator}} aggregates offers of different roles 
from the same node and passes the aggregated offers to Fenzo. However, Fenzo 
notifies {{LaunchCoordinator}} of a scheduling failure with the following 
message:
     AssignmentFailure {resource=Other, asking=3.0, used=0.0, available=2.0, 
message=gpus}

> Allow to specify # GPUs for TaskManager in Mesos
> 
>
> Key: FLINK-8431
> URL: https://issues.apache.org/jira/browse/FLINK-8431
> Project: Flink
>  Issue Type: Improvement
>  Components: Cluster Management, Mesos
>Reporter: Dongwon Kim
>Assignee: Dongwon Kim
>Priority: Minor
>
> Mesos provides first-class support for Nvidia GPUs [1], but Flink does not 
> exploit it when scheduling TaskManagers. If Mesos agents are configured to 
> isolate GPUs as shown in [2], TaskManagers that do not explicitly request 
> GPUs cannot see any GPUs at all.
> We therefore need to introduce a new configuration property named 
> "mesos.resourcemanager.tasks.gpus" to let users specify the number of GPUs 
> for each TaskManager process in Mesos.
> [1] http://mesos.apache.org/documentation/latest/gpu-support/
> [2] http://mesos.apache.org/documentation/latest/gpu-support/#agent-flags
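
For illustration, enabling the proposed property could look as follows in 
flink-conf.yaml; the GPU, CPU, and memory values are arbitrary examples, and 
only mesos.resourcemanager.tasks.gpus is new:

    # example flink-conf.yaml fragment (values are illustrative)
    mesos.resourcemanager.tasks.gpus: 2.0
    # existing task settings, shown for context
    mesos.resourcemanager.tasks.cpus: 4.0
    mesos.resourcemanager.tasks.mem: 4096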





[jira] [Commented] (FLINK-7938) support addAll() in ListState

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327907#comment-16327907
 ] 

ASF GitHub Bot commented on FLINK-7938:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5281#discussion_r161905498
  
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java ---
@@ -167,24 +167,48 @@ public void update(List<V> values) throws Exception {
 			try {
 				writeCurrentKeyWithGroupAndNamespace();
 				byte[] key = keySerializationStream.toByteArray();
-				DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
 
-				List<byte[]> bytes = new ArrayList<>(values.size());
-				for (V value : values) {
-					keySerializationStream.reset();
-					valueSerializer.serialize(value, out);
-					bytes.add(keySerializationStream.toByteArray());
+				byte[] premerge = getPreMergedValue(values);
+				if (premerge != null) {
+					backend.db.put(columnFamily, writeOptions, key, premerge);
+				} else {
+					throw new IOException("Failed pre-merge values in update()");
 				}
+			} catch (IOException | RocksDBException e) {
+				throw new RuntimeException("Error while updating data to RocksDB", e);
+			}
+		}
+	}
+
+	@Override
+	public void addAll(List<V> values) throws Exception {
+		if (values != null && !values.isEmpty()) {
+			try {
+				writeCurrentKeyWithGroupAndNamespace();
+				byte[] key = keySerializationStream.toByteArray();
 
-				byte[] premerge = MergeUtils.merge(bytes);
+				byte[] premerge = getPreMergedValue(values);
 				if (premerge != null) {
-					backend.db.put(columnFamily, writeOptions, key, premerge);
+					backend.db.merge(columnFamily, writeOptions, key, premerge);
 				} else {
-					throw new IOException("Failed pre-merge values");
+					throw new IOException("Failed pre-merge values in addAll()");
 				}
 			} catch (IOException | RocksDBException e) {
 				throw new RuntimeException("Error while updating data to RocksDB", e);
 			}
 		}
 	}
+
+	private byte[] getPreMergedValue(List<V> values) throws IOException {
+		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
+
+		List<byte[]> bytes = new ArrayList<>(values.size());
--- End diff --

@StefanRRichter  I actually tried it before, but it didn't work out very 
well. I can give it another try.

I don't think this PR should be addressing this issue, because that code is 
already there and this PR only moves it into its own method. Besides, it 
would be great if we could get the new API in before I announce it at our 
Flink meetup in Seattle on Wednesday evening :) (Thanks in advance if that 
takes extra work on your end!)

I opened FLINK-8441 and will work on it shortly.


> support addAll() in ListState
> -
>
> Key: FLINK-7938
> URL: https://issues.apache.org/jira/browse/FLINK-7938
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> Support {{addAll()}} in {{ListState}} so that Flink can add elements to 
> {{ListState}} in batch more efficiently. This should give much better 
> performance, especially for {{ListState}} backed by RocksDB.
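
A hedged usage sketch of the proposed API, assuming a {{ListState<Long>}} 
that was initialized in {{open()}} via a {{ListStateDescriptor}}:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.flink.api.common.state.ListState;

    class AddAllSketch {
        ListState<Long> listState; // obtained from the runtime context in open()

        void addBatch() throws Exception {
            List<Long> batch = Arrays.asList(1L, 2L, 3L);
            // One state-backend operation for the whole batch...
            listState.addAll(batch);
            // ...instead of one per element:
            // for (Long v : batch) { listState.add(v); }
        }
    }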







[jira] [Updated] (FLINK-8441) serialize values and value separator directly to stream in RocksDBListState

2018-01-16 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-8441:

Affects Version/s: 1.5.0

> serialize values and value separator directly to stream in RocksDBListState
> ---
>
> Key: FLINK-8441
> URL: https://issues.apache.org/jira/browse/FLINK-8441
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> In {{RocksDBListState#getPreMergedValue}}, we could probably serialize 
> values and the value separator directly into {{keySerializationStream}}.
> We tried once and it didn't work out; let's try one more time.





[jira] [Updated] (FLINK-8441) serialize values and value separator directly to stream in RocksDBListState

2018-01-16 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-8441:

Fix Version/s: 1.5.0

> serialize values and value separator directly to stream in RocksDBListState
> ---
>
> Key: FLINK-8441
> URL: https://issues.apache.org/jira/browse/FLINK-8441
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> In {{RocksDBListState#getPreMergedValue}}, we could probably serialize 
> values and the value separator directly into {{keySerializationStream}}.
> We tried once and it didn't work out; let's try one more time.





[jira] [Created] (FLINK-8441) serialize values and value separator directly to stream in RocksDBListState

2018-01-16 Thread Bowen Li (JIRA)
Bowen Li created FLINK-8441:
---

 Summary: serialize values and value separator directly to stream 
in RocksDBListState
 Key: FLINK-8441
 URL: https://issues.apache.org/jira/browse/FLINK-8441
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Reporter: Bowen Li
Assignee: Bowen Li


In {{RocksDBListState#getPreMergedValue}}, we could probably serialize values 
and the value separator directly into {{keySerializationStream}}.

We tried once and it didn't work out; let's try one more time.
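
A minimal sketch of the idea, using a plain ByteArrayOutputStream as a 
stand-in for {{keySerializationStream}} and a hypothetical Serializer 
interface (names are illustrative; the actual RocksDB state internals differ):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.List;

    class PreMergeSketch<V> {
        interface Serializer<V> {
            void serialize(V value, ByteArrayOutputStream out) throws IOException;
        }

        private static final byte DELIMITER = ',';

        // Serialize each value plus a separator straight into one stream,
        // instead of collecting per-value byte[] arrays and merging them.
        byte[] preMerge(List<V> values, Serializer<V> serializer) throws IOException {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            boolean first = true;
            for (V value : values) {
                if (!first) {
                    out.write(DELIMITER); // value separator between entries
                }
                first = false;
                serializer.serialize(value, out);
            }
            return out.toByteArray();
        }
    }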





[jira] [Commented] (FLINK-8275) Flink YARN deployment with Kerberos enabled not working

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327389#comment-16327389
 ] 

ASF GitHub Bot commented on FLINK-8275:
---

Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/5172#discussion_r161822214
  
--- Diff: flink-yarn/src/test/resources/flink-conf.yaml ---
@@ -0,0 +1,23 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+
+#
+# This is a test configuration for validation of YarnTaskManagerRunner.
+#
+
+taskmanager.tmp.dirs: /tmp
--- End diff --

This seems unrelated.


> Flink YARN deployment with Kerberos enabled not working 
> 
>
> Key: FLINK-8275
> URL: https://issues.apache.org/jira/browse/FLINK-8275
> Project: Flink
>  Issue Type: Bug
>  Components: Security
>Affects Versions: 1.4.0
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
>
> The local keytab path in YarnTaskManagerRunner is incorrectly set to the 
> ApplicationMaster's local keytab path. This causes jobs to fail because the 
> TaskManager can't read the keytab.







[jira] [Commented] (FLINK-8275) Flink YARN deployment with Kerberos enabled not working

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327388#comment-16327388
 ] 

ASF GitHub Bot commented on FLINK-8275:
---

Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/5172#discussion_r161822100
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java ---
@@ -142,19 +134,22 @@ public static void runYarnTaskManager(String[] args, final Class<? extends YarnTaskManager> taskManager
-		new Callable<Integer>() {
-			@Override
-			public Integer call() {
-				try {
-					TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, taskManager);
-				}
-				catch (Throwable t) {
-					LOG.error("Error while starting the TaskManager", t);
-					System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
+		if (mainRunner == null) {
--- End diff --

If we need to separate out the 'secured' code block, can we do that in all 
scenarios instead of the conditional logic here? For example, move the block 
on lines 138-147 to a new method and pass a method reference to 
`runTaskManager` as the callable, as sketched below.
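
A hedged sketch of that suggestion with illustrative names (not the actual 
YarnTaskManagerRunner code):

    import java.util.concurrent.Callable;

    class RunnerSketch {
        // The start-up block previously inlined in the anonymous Callable
        // (lines 138-147), extracted into its own method.
        static Integer startTaskManager() {
            // start the TaskManager, handle Throwable, return exit code
            return 0;
        }

        static void runTaskManager(Callable<Integer> runner) throws Exception {
            runner.call();
        }

        public static void main(String[] args) throws Exception {
            // Same code path in all scenarios, secured or not.
            runTaskManager(RunnerSketch::startTaskManager);
        }
    }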


> Flink YARN deployment with Kerberos enabled not working 
> 
>
> Key: FLINK-8275
> URL: https://issues.apache.org/jira/browse/FLINK-8275
> Project: Flink
>  Issue Type: Bug
>  Components: Security
>Affects Versions: 1.4.0
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
>
> The local keytab path in YarnTaskManagerRunner is incorrectly set to the 
> ApplicationMaster's local keytab path. This causes jobs to fail because the 
> TaskManager can't read the keytab.







[jira] [Assigned] (FLINK-8435) Adding BloomFilter/HyperLogLog state as Managed ones

2018-01-16 Thread Aegeaner (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aegeaner reassigned FLINK-8435:
---

Assignee: Aegeaner

> Adding BloomFilter/HyperLogLog state as Managed ones
> 
>
> Key: FLINK-8435
> URL: https://issues.apache.org/jira/browse/FLINK-8435
> Project: Flink
>  Issue Type: New Feature
>Reporter: Moein Hosseini
>Assignee: Aegeaner
>Priority: Major
>
> Doing something approximately is often enough for users (such as counting). 
> I think we should implement Bloom filter and HyperLogLog sketches as managed 
> keyed state. With these, users would be able to filter or process many 
> things approximately.
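
Until such first-class state exists, the idea can be approximated with a 
serializable sketch in a ValueState. A hedged example using Guava's 
BloomFilter (class, state, and parameter names are illustrative, and a Guava 
dependency is assumed):

    import com.google.common.base.Charsets;
    import com.google.common.hash.BloomFilter;
    import com.google.common.hash.Funnels;
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    // Per-key approximate deduplication: emit an element at most once.
    class ApproxDedup extends RichFlatMapFunction<String, String> {
        private transient ValueState<BloomFilter<String>> seen;

        @Override
        @SuppressWarnings("unchecked")
        public void open(Configuration parameters) {
            seen = getRuntimeContext().getState(new ValueStateDescriptor<>(
                "seen", (Class<BloomFilter<String>>) (Class<?>) BloomFilter.class));
        }

        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
            BloomFilter<String> filter = seen.value();
            if (filter == null) {
                filter = BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8), 1_000_000);
            }
            if (!filter.mightContain(value)) {
                out.collect(value);
                filter.put(value);
                seen.update(filter); // re-serializes the whole filter; a managed
                                     // state type would avoid this cost
            }
        }
    }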





[jira] [Commented] (FLINK-8275) Flink YARN deployment with Kerberos enabled not working

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327379#comment-16327379
 ] 

ASF GitHub Bot commented on FLINK-8275:
---

Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/5172#discussion_r161821092
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java ---
@@ -126,12 +124,6 @@ public static void runYarnTaskManager(String[] args, final Class

> Flink YARN deployment with Kerberos enabled not working 
> 
>
> Key: FLINK-8275
> URL: https://issues.apache.org/jira/browse/FLINK-8275
> Project: Flink
>  Issue Type: Bug
>  Components: Security
>Affects Versions: 1.4.0
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
>
> The local keytab path in YarnTaskManagerRunner is incorrectly set to the 
> ApplicationMaster's local keytab path. This causes jobs to fail because the 
> TaskManager can't read the keytab.






[jira] [Assigned] (FLINK-8354) Flink Kafka connector ignores Kafka message headers

2018-01-16 Thread Aegeaner (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aegeaner reassigned FLINK-8354:
---

Assignee: Aegeaner

> Flink Kafka connector ignores Kafka message headers 
> -
>
> Key: FLINK-8354
> URL: https://issues.apache.org/jira/browse/FLINK-8354
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
> Environment: Kafka 0.11.0.0
> Flink 1.4.0
> flink-connector-kafka-0.11_2.11 
>Reporter: Mohammad Abareghi
>Assignee: Aegeaner
>Priority: Major
>
> Kafka introduced the notion of message headers in version 0.11.0.0 
> (https://issues.apache.org/jira/browse/KAFKA-4208), but 
> flink-connector-kafka-0.11_2.11, which supports Kafka 0.11.0.0, ignores 
> headers when consuming Kafka messages.
> It would be useful in some scenarios, such as distributed log tracing, to 
> support message headers in FlinkKafkaConsumer011 and FlinkKafkaProducer011. 
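
For reference, a hedged sketch of the header API that the plain Kafka 0.11 
client exposes and that the Flink connector currently drops; topic, key, and 
header names are illustrative:

    import java.nio.charset.StandardCharsets;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.header.Header;

    class HeaderSketch {
        static ProducerRecord<String, String> withTraceId(String traceId) {
            ProducerRecord<String, String> record =
                new ProducerRecord<>("events", "key", "value");
            // Attach a header, e.g. for distributed log tracing.
            record.headers().add("traceId", traceId.getBytes(StandardCharsets.UTF_8));
            return record;
        }

        static void printHeaders(ConsumerRecord<String, String> record) {
            // Headers are available on consumed records, but the Flink
            // deserialization schema is never handed them.
            for (Header header : record.headers()) {
                System.out.println(header.key() + " = "
                    + new String(header.value(), StandardCharsets.UTF_8));
            }
        }
    }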





[jira] [Created] (FLINK-8440) Create Gitter Channel for Flink users

2018-01-16 Thread Josh Lemer (JIRA)
Josh Lemer created FLINK-8440:
-

 Summary: Create Gitter Channel for Flink users
 Key: FLINK-8440
 URL: https://issues.apache.org/jira/browse/FLINK-8440
 Project: Flink
  Issue Type: Task
Reporter: Josh Lemer


Could we get a [www.gitter.im|http://www.gitter.im/] channel for Flink users 
(and/or contributors) to ask questions in? Many people in many projects get a 
lot out of having a chat room, and the IRC channel is pretty dead and has a 
pretty terrible user experience (no history, etc.).

Thanks!





[jira] [Commented] (FLINK-8275) Flink YARN deployment with Kerberos enabled not working

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327369#comment-16327369
 ] 

ASF GitHub Bot commented on FLINK-8275:
---

Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/5172#discussion_r161818655
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java ---
@@ -48,7 +48,7 @@ public static SecurityContext getInstalledContext() {
 	}
 
 	@VisibleForTesting
--- End diff --

(minor) Please remove the `@VisibleForTesting` annotation.


> Flink YARN deployment with Kerberos enabled not working 
> 
>
> Key: FLINK-8275
> URL: https://issues.apache.org/jira/browse/FLINK-8275
> Project: Flink
>  Issue Type: Bug
>  Components: Security
>Affects Versions: 1.4.0
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
>
> The local keytab path in YarnTaskManagerRunner is incorrectly set to the 
> ApplicationMaster's local keytab path. This causes jobs to fail because the 
> TaskManager can't read the keytab.







[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327322#comment-16327322
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user dyanarose commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r161807828
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TypedEventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on the timestamp of the
+ * elements. Windows cannot overlap.
+ *
+ * <p>For example, in order to window into windows with a dynamic time gap:
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
+ *   keyed.window(DynamicEventTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor }));
+ * } </pre>
+ *
+ * @param <T> The type of the input elements
+ */
+public class DynamicEventTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
+	private static final long serialVersionUID = 1L;
--- End diff --

The Typed requirement comes from the desire to allow the 
SessionWindowTimeGapExtractor to accept a correctly typed element.

To do that, the assigner itself needs to be typed, which means the trigger 
needs to be typed, and so on.

If the SessionWindowTimeGapExtractor extract method instead took `Object`, 
requiring the implementer to cast it, then the new Typed classes wouldn't be 
necessary.

I don't find that to be the most user-friendly interface, though, when the 
type information is available. But, yeah, I'm not happy with having to 
implement these exact-copy classes either...


> Session Window Assigner with Dynamic Gaps
> -
>
> Key: FLINK-8384
> URL: https://issues.apache.org/jira/browse/FLINK-8384
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Dyana Rose
>Priority: Minor
>
> *Reason for Improvement*
> Currently both Session Window assigners only allow a static inactivity gap. 
> Given the following scenario, this is too restrictive:
> * Given a stream of IoT events from many device types
> * Assume each device type could have a different inactivity gap
> * Assume each device type gap could change while sessions are in flight
> By allowing dynamic inactivity gaps, the correct gap can be determined in the 
> [assignWindows 
> function|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java#L59]
>  by passing the element currently under consideration, the timestamp, and the 
> context to a user-defined function. This eliminates the need to create 
> unwieldy workarounds when only static gaps are available.
> Dynamic Session Window gaps should be available for both Event Time and 
> Processing Time streams.
> (short preliminary discussion: 
> https://lists.apache.org/thread.html/08a011c0039e826343e9be0b1bb4ecfc2e12976bc65f8a43ee88@%3Cdev.flink.apache.org%3E)
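
A hedged sketch of the per-device-type gap extraction described above, 
written against a locally defined interface that mirrors the one proposed in 
the PR (all names are illustrative):

    import java.io.Serializable;

    interface GapExtractor<T> extends Serializable {
        long extract(T element); // gap in milliseconds
    }

    class DeviceGapExtractor implements GapExtractor<DeviceGapExtractor.IoTEvent> {
        static class IoTEvent {
            String deviceType;
        }

        @Override
        public long extract(IoTEvent element) {
            // Inactivity gap varies by device type; values are examples.
            switch (element.deviceType) {
                case "thermostat": return 60_000L;   // 1 minute
                case "vehicle":    return 600_000L;  // 10 minutes
                default:           return 300_000L;  // 5 minutes
            }
        }
    }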





[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327323#comment-16327323
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user dyanarose commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r161807837
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TypedEventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on the timestamp of the
+ * elements. Windows cannot overlap.
+ *
+ * <p>For example, in order to window into windows with a dynamic time gap:
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
+ *   keyed.window(DynamicEventTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor }));
+ * } </pre>
+ *
+ * @param <T> The type of the input elements
+ */
+public class DynamicEventTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
+	private static final long serialVersionUID = 1L;
+
+	protected SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor;
+
+	protected DynamicEventTimeSessionWindows(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
+		this.sessionWindowTimeGapExtractor = sessionWindowTimeGapExtractor;
+	}
+
+	@Override
+	public Collection<TimeWindow> assignWindows(T element, long timestamp, WindowAssignerContext context) {
+		long sessionTimeout = sessionWindowTimeGapExtractor.extract(element, timestamp, context);
+		if (sessionTimeout <= 0) {
+			throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
+		}
+		return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout));
+	}
+
+	@Override
+	public Trigger<T, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
+		return TypedEventTimeTrigger.create();
--- End diff --

as above


> Session Window Assigner with Dynamic Gaps
> -
>
> Key: FLINK-8384
> URL: https://issues.apache.org/jira/browse/FLINK-8384
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Dyana Rose
>Priority: Minor
>
> *Reason for Improvement*
> Currently both Session Window assigners only allow a static inactivity gap. 
> Given the following scenario, this is too restrictive:
> * Given a stream of IoT events from many device types
> * Assume each device type could have a different inactivity gap
> * Assume each device type gap could change while sessions are in flight
> By allowing dynamic inactivity gaps, the correct gap can be determined in the 
> [assignWindows 
> function|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java#L59]
>  by passing the element currently under consideration, the timestamp, and the 
> context to a user-defined function. This eliminates the need to create 
> unwieldy workarounds when only static gaps are available.
> Dynamic Session Window gaps should be available for both Event Time and 
> Processing Time streams.
> (short preliminary discussion: 
> https://lists.apache.org/thread.html/08a011c0039e826343e9be0b1bb4ecfc2e12976bc65f8a43ee88@%3Cdev.flink.apache.org%3E)

[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327325#comment-16327325
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user dyanarose commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r161807869
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicProcessingTimeSessionWindows.java ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TypedProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on the current processing
+ * time. Windows cannot overlap.
+ *
+ * <p>For example, in order to window into windows with a dynamic time gap:
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
+ *   keyed.window(DynamicProcessingTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor }));
+ * } </pre>
+ *
+ * @param <T> The type of the input elements
+ */
+public class DynamicProcessingTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
+	private static final long serialVersionUID = 1L;
+
+	protected SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor;
+
+	protected DynamicProcessingTimeSessionWindows(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
+		this.sessionWindowTimeGapExtractor = sessionWindowTimeGapExtractor;
+	}
+
+	@Override
+	public Collection<TimeWindow> assignWindows(T element, long timestamp, WindowAssignerContext context) {
+		long currentProcessingTime = context.getCurrentProcessingTime();
+		long sessionTimeout = sessionWindowTimeGapExtractor.extract(element, timestamp, context);
+		if (sessionTimeout <= 0) {
+			throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
+		}
+		return Collections.singletonList(new TimeWindow(currentProcessingTime, currentProcessingTime + sessionTimeout));
+	}
+
+	@Override
+	public Trigger<T, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
+		return TypedProcessingTimeTrigger.create();
--- End diff --

as above


> Session Window Assigner with Dynamic Gaps
> -
>
> Key: FLINK-8384
> URL: https://issues.apache.org/jira/browse/FLINK-8384
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Dyana Rose
>Priority: Minor
>
> *Reason for Improvement*
> Currently both Session Window assigners only allow a static inactivity gap. 
> Given the following scenario, this is too restrictive:
> * Given a stream of IoT events from many device types
> * Assume each device type could have a different inactivity gap
> * Assume each device type gap could change while sessions are in flight
> By allowing dynamic inactivity gaps, the correct gap can be determined in the 
> [assignWindows 
> function|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java#L59]
>  by passing the element currently under consideration, the timestamp, and the 
> context to a user-defined function. This eliminates the need to create 
> unwieldy workarounds when only static gaps are available.
> Dynamic Session Window gaps should be available for both Event Time and 
> Processing Time streams.
> (short preliminary discussion: 
> https://lists.apache.org/thread.html/08a011c0039e826343e9be0b1bb4ecfc2e12976bc65f8a43ee88@%3Cdev.flink.apache.org%3E)

[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327326#comment-16327326
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user dyanarose commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r161807900
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SessionWindowTimeGapExtractor.java ---
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import java.io.Serializable;
+
+/**
+ * A {@code SessionWindowTimeGapExtractor} extracts session time gaps for Dynamic Session Window Assigners.
+ *
+ * @param <T> The type of elements that this {@code SessionWindowTimeGapExtractor} can extract session time gaps from.
+ */
+public interface SessionWindowTimeGapExtractor<T> extends Serializable {
+	long extract(T element, long timestamp, WindowAssigner.WindowAssignerContext context);
--- End diff --

Can do.

Do you think it should return a Time instead of a long, to prevent anyone 
from returning seconds/minutes/whatever instead of milliseconds?
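
A hedged sketch contrasting the two signatures under discussion (interface 
names are illustrative):

    import org.apache.flink.streaming.api.windowing.time.Time;

    interface LongGapExtractor<T> {
        long extract(T element); // caller must remember: milliseconds
    }

    interface TimeGapExtractor<T> {
        Time extract(T element); // e.g. Time.seconds(30); the unit is explicit
    }

    class GapDemo {
        static <T> long gapMillis(TimeGapExtractor<T> extractor, T element) {
            return extractor.extract(element).toMilliseconds();
        }
    }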


> Session Window Assigner with Dynamic Gaps
> -
>
> Key: FLINK-8384
> URL: https://issues.apache.org/jira/browse/FLINK-8384
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Dyana Rose
>Priority: Minor
>
> *Reason for Improvement*
> Currently both Session Window assigners only allow a static inactivity gap. 
> Given the following scenario, this is too restrictive:
> * Given a stream of IoT events from many device types
> * Assume each device type could have a different inactivity gap
> * Assume each device type gap could change while sessions are in flight
> By allowing dynamic inactivity gaps, the correct gap can be determined in the 
> [assignWindows 
> function|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java#L59]
>  by passing the element currently under consideration, the timestamp, and the 
> context to a user-defined function. This eliminates the need to create 
> unwieldy workarounds when only static gaps are available.
> Dynamic Session Window gaps should be available for both Event Time and 
> Processing Time streams.
> (short preliminary discussion: 
> https://lists.apache.org/thread.html/08a011c0039e826343e9be0b1bb4ecfc2e12976bc65f8a43ee88@%3Cdev.flink.apache.org%3E)





[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327324#comment-16327324
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user dyanarose commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r161807850
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicProcessingTimeSessionWindows.java ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TypedProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on the current processing
+ * time. Windows cannot overlap.
+ *
+ * <p>For example, in order to window into windows with a dynamic time gap:
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
+ *   keyed.window(DynamicProcessingTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor }));
+ * } </pre>
+ *
+ * @param <T> The type of the input elements
+ */
+public class DynamicProcessingTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
+	private static final long serialVersionUID = 1L;
--- End diff --

as above


> Session Window Assigner with Dynamic Gaps
> -
>
> Key: FLINK-8384
> URL: https://issues.apache.org/jira/browse/FLINK-8384
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Dyana Rose
>Priority: Minor
>
> *Reason for Improvement*
> Currently both Session Window assigners only allow a static inactivity gap. 
> Given the following scenario, this is too restrictive:
> * Given a stream of IoT events from many device types
> * Assume each device type could have a different inactivity gap
> * Assume each device type gap could change while sessions are in flight
> By allowing dynamic inactivity gaps, the correct gap can be determined in the 
> [assignWindows 
> function|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java#L59]
>  by passing the element currently under consideration, the timestamp, and the 
> context to a user-defined function. This eliminates the need to create 
> unwieldy workarounds when only static gaps are available.
> Dynamic Session Window gaps should be available for both Event Time and 
> Processing Time streams.
> (short preliminary discussion: 
> https://lists.apache.org/thread.html/08a011c0039e826343e9be0b1bb4ecfc2e12976bc65f8a43ee88@%3Cdev.flink.apache.org%3E)





[GitHub] flink pull request #5295: [FLINK-8384] [streaming] Session Window Assigner w...

2018-01-16 Thread dyanarose
Github user dyanarose commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r161807869
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicProcessingTimeSessionWindows.java
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import 
org.apache.flink.streaming.api.windowing.triggers.TypedProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on 
the current processing
+ * time. Windows cannot overlap.
+ *
+ * For example, in order to window into windows with a dynamic time gap:
+ *  {@code
+ * DataStream> in = ...;
+ * KeyedStream> keyed = in.keyBy(...);
+ * WindowedStream, String, TimeWindows> windowed =
+ *   
keyed.window(DynamicProcessingTimeSessionWindows.withDynamicGap({@link 
SessionWindowTimeGapExtractor }));
+ * } 
+ *
+ * @param  The type of the input elements
+ */
+public class DynamicProcessingTimeSessionWindows extends 
MergingWindowAssigner {
+   private static final long serialVersionUID = 1L;
+
+   protected SessionWindowTimeGapExtractor 
sessionWindowTimeGapExtractor;
+
+   protected 
DynamicProcessingTimeSessionWindows(SessionWindowTimeGapExtractor 
sessionWindowTimeGapExtractor) {
+   this.sessionWindowTimeGapExtractor = 
sessionWindowTimeGapExtractor;
+   }
+
+   @Override
+   public Collection assignWindows(T element, long timestamp, 
WindowAssignerContext context) {
+   long currentProcessingTime = context.getCurrentProcessingTime();
+   long sessionTimeout = 
sessionWindowTimeGapExtractor.extract(element, timestamp, context);
+   if (sessionTimeout <= 0) {
+   throw new IllegalArgumentException("Dynamic session 
time gap must satisfy 0 < gap");
+   }
+   return Collections.singletonList(new 
TimeWindow(currentProcessingTime, currentProcessingTime + sessionTimeout));
+   }
+
+   @Override
+   public Trigger 
getDefaultTrigger(StreamExecutionEnvironment env) {
+   return TypedProcessingTimeTrigger.create();
--- End diff --

as above


---


[GitHub] flink pull request #5295: [FLINK-8384] [streaming] Session Window Assigner w...

2018-01-16 Thread dyanarose
Github user dyanarose commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r161807837
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TypedEventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on the timestamp of the
+ * elements. Windows cannot overlap.
+ *
+ * <p>For example, in order to window into windows with a dynamic time gap:
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
+ *   keyed.window(DynamicEventTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor}));
+ * } </pre>
+ *
+ * @param <T> The type of the input elements
+ */
+public class DynamicEventTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
+	private static final long serialVersionUID = 1L;
+
+	protected SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor;
+
+	protected DynamicEventTimeSessionWindows(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
+		this.sessionWindowTimeGapExtractor = sessionWindowTimeGapExtractor;
+	}
+
+	@Override
+	public Collection<TimeWindow> assignWindows(T element, long timestamp, WindowAssignerContext context) {
+		long sessionTimeout = sessionWindowTimeGapExtractor.extract(element, timestamp, context);
+		if (sessionTimeout <= 0) {
+			throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
+		}
+		return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout));
+	}
+
+	@Override
+	public Trigger<T, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
+		return TypedEventTimeTrigger.create();
--- End diff --

as above


---


[GitHub] flink pull request #5295: [FLINK-8384] [streaming] Session Window Assigner w...

2018-01-16 Thread dyanarose
Github user dyanarose commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r161807850
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicProcessingTimeSessionWindows.java
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TypedProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on the current processing
+ * time. Windows cannot overlap.
+ *
+ * <p>For example, in order to window into windows with a dynamic time gap:
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
+ *   keyed.window(DynamicProcessingTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor}));
+ * } </pre>
+ *
+ * @param <T> The type of the input elements
+ */
+public class DynamicProcessingTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
+	private static final long serialVersionUID = 1L;
--- End diff --

as above


---


[GitHub] flink pull request #5295: [FLINK-8384] [streaming] Session Window Assigner w...

2018-01-16 Thread dyanarose
Github user dyanarose commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r161807828
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TypedEventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on the timestamp of the
+ * elements. Windows cannot overlap.
+ *
+ * <p>For example, in order to window into windows with a dynamic time gap:
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
+ *   keyed.window(DynamicEventTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor}));
+ * } </pre>
+ *
+ * @param <T> The type of the input elements
+ */
+public class DynamicEventTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
+	private static final long serialVersionUID = 1L;
--- End diff --

The Typed requirement comes from the desire to allow the `SessionWindowTimeGapExtractor` to accept a correctly typed element.

To do that, the assigner itself needs to be typed, which means that the trigger needs to be typed, and so on.

If the `SessionWindowTimeGapExtractor#extract` method instead took `Object`, requiring that the implementer cast it, then the new Typed classes wouldn't be necessary (see the sketch below for the two shapes side by side).

I don't find that to be the most user-friendly interface though, when the type information is available. But, yeah, I'm not happy with having to implement these exact-copy classes either...
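For reference, a minimal, self-contained sketch of the two extractor shapes being weighed here. The untyped variant is hypothetical, and the PR's three-argument `extract` is collapsed to one argument for brevity:

```java
// Sketch only: interface names mirror the thread; UntypedGapExtractor is hypothetical.
import java.io.Serializable;

interface SessionWindowTimeGapExtractor<T> extends Serializable {
    long extract(T element); // the PR version also passes timestamp and context
}

interface UntypedGapExtractor extends Serializable {
    long extract(Object element); // implementer must cast
}

class Event {
    final String deviceType;
    Event(String deviceType) { this.deviceType = deviceType; }
}

public class GapExtractorShapes {
    public static void main(String[] args) {
        // Typed: the element arrives with its real type, no cast needed.
        SessionWindowTimeGapExtractor<Event> typed =
                e -> e.deviceType.equals("sensor") ? 5_000L : 60_000L;

        // Untyped: a cast is required, but no parallel Typed* trigger/assigner
        // classes are needed further up the hierarchy.
        UntypedGapExtractor untyped =
                e -> ((Event) e).deviceType.equals("sensor") ? 5_000L : 60_000L;

        System.out.println(typed.extract(new Event("sensor")));  // 5000
        System.out.println(untyped.extract(new Event("other"))); // 60000
    }
}
```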


---


[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327307#comment-16327307
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r161799438
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicProcessingTimeSessionWindows.java
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TypedProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on the current processing
+ * time. Windows cannot overlap.
+ *
+ * <p>For example, in order to window into windows with a dynamic time gap:
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
+ *   keyed.window(DynamicProcessingTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor}));
+ * } </pre>
+ *
+ * @param <T> The type of the input elements
+ */
+public class DynamicProcessingTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
+	private static final long serialVersionUID = 1L;
+
+	protected SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor;
+
+	protected DynamicProcessingTimeSessionWindows(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
+		this.sessionWindowTimeGapExtractor = sessionWindowTimeGapExtractor;
+	}
+
+	@Override
+	public Collection<TimeWindow> assignWindows(T element, long timestamp, WindowAssignerContext context) {
+		long currentProcessingTime = context.getCurrentProcessingTime();
+		long sessionTimeout = sessionWindowTimeGapExtractor.extract(element, timestamp, context);
+		if (sessionTimeout <= 0) {
+			throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
+		}
+		return Collections.singletonList(new TimeWindow(currentProcessingTime, currentProcessingTime + sessionTimeout));
+	}
+
+	@Override
+	public Trigger<T, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
+		return TypedProcessingTimeTrigger.create();
--- End diff --

DynamicProcessingTimeSessionWindows.


> Session Window Assigner with Dynamic Gaps
> -
>
> Key: FLINK-8384
> URL: https://issues.apache.org/jira/browse/FLINK-8384
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Dyana Rose
>Priority: Minor
>
> *Reason for Improvement*
> Currently both Session Window assigners only allow a static inactivity gap. 
> Given the following scenario, this is too restrictive:
> * Given a stream of IoT events from many device types
> * Assume each device type could have a different inactivity gap
> * Assume each device type gap could change while sessions are in flight
> By allowing dynamic inactivity gaps, the correct gap can be determined in the 
> [assignWindows 
> 

[GitHub] flink pull request #5295: [FLINK-8384] [streaming] Session Window Assigner w...

2018-01-16 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r161798960
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TypedEventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on the timestamp of the
+ * elements. Windows cannot overlap.
+ *
+ * <p>For example, in order to window into windows with a dynamic time gap:
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
+ *   keyed.window(DynamicEventTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor}));
+ * } </pre>
+ *
+ * @param <T> The type of the input elements
+ */
+public class DynamicEventTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
+	private static final long serialVersionUID = 1L;
--- End diff --

Can we change `MergingWindowAssigner<T, TimeWindow>` to `MergingWindowAssigner<Object, TimeWindow>`? If so, we can reuse the `EventTimeTrigger`. (A sketch of why the widened type parameter allows this follows below.)
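A minimal sketch, using simplified stand-ins rather than the real Flink classes, of why widening the assigner's element type to `Object` lets the untyped trigger be reused:

```java
// Simplified stand-ins; assumption: EventTimeTrigger never inspects the element,
// which is why it can be defined once as Trigger<Object, TimeWindow>.
abstract class Trigger<T, W> { }

class TimeWindow { }

class EventTimeTrigger extends Trigger<Object, TimeWindow> {
    static EventTimeTrigger create() { return new EventTimeTrigger(); }
}

abstract class MergingWindowAssigner<T, W> {
    abstract Trigger<T, W> getDefaultTrigger();
}

// Declared over Object, the assigner's contract is satisfied by the existing
// trigger, so no Typed* copy is required:
class SessionsOverObject extends MergingWindowAssigner<Object, TimeWindow> {
    @Override
    Trigger<Object, TimeWindow> getDefaultTrigger() {
        return EventTimeTrigger.create();
    }
}
```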


---


[GitHub] flink pull request #5295: [FLINK-8384] [streaming] Session Window Assigner w...

2018-01-16 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r161800866
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SessionWindowTimeGapExtractor.java
 ---
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import java.io.Serializable;
+
+/**
+ * A {@code SessionWindowTimeGapExtractor} extracts session time gaps for Dynamic Session Window Assigners.
+ *
+ * @param <T> The type of elements that this {@code SessionWindowTimeGapExtractor} can extract session time gaps from.
+ */
+public interface SessionWindowTimeGapExtractor<T> extends Serializable {
+	long extract(T element, long timestamp, WindowAssigner.WindowAssignerContext context);
--- End diff --

Can we remove the `timestamp` and `WindowAssigner.WindowAssignerContext context` parameters? (A sketch of the slimmed-down interface follows below.)
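What the slimmed-down interface might look like if both parameters were dropped, as a sketch of the proposal rather than the merged API:

```java
import java.io.Serializable;

// Hypothetical shape after removing timestamp and context:
// the session gap is derived from the element alone.
public interface SessionWindowTimeGapExtractor<T> extends Serializable {
    long extract(T element);
}
```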


---


[GitHub] flink pull request #5295: [FLINK-8384] [streaming] Session Window Assigner w...

2018-01-16 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r161799107
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TypedEventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on the timestamp of the
+ * elements. Windows cannot overlap.
+ *
+ * <p>For example, in order to window into windows with a dynamic time gap:
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
+ *   keyed.window(DynamicEventTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor}));
+ * } </pre>
+ *
+ * @param <T> The type of the input elements
+ */
+public class DynamicEventTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
+	private static final long serialVersionUID = 1L;
+
+	protected SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor;
+
+	protected DynamicEventTimeSessionWindows(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
+		this.sessionWindowTimeGapExtractor = sessionWindowTimeGapExtractor;
+	}
+
+	@Override
+	public Collection<TimeWindow> assignWindows(T element, long timestamp, WindowAssignerContext context) {
+		long sessionTimeout = sessionWindowTimeGapExtractor.extract(element, timestamp, context);
+		if (sessionTimeout <= 0) {
+			throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
+		}
+		return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout));
+	}
+
+	@Override
+	public Trigger<T, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
+		return TypedEventTimeTrigger.create();
--- End diff --

can we reuse the EventTimeTrigger?


---


[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327308#comment-16327308
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r161800866
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SessionWindowTimeGapExtractor.java
 ---
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import java.io.Serializable;
+
+/**
+ * A {@code SessionWindowTimeGapExtractor} extracts session time gaps for Dynamic Session Window Assigners.
+ *
+ * @param <T> The type of elements that this {@code SessionWindowTimeGapExtractor} can extract session time gaps from.
+ */
+public interface SessionWindowTimeGapExtractor<T> extends Serializable {
+	long extract(T element, long timestamp, WindowAssigner.WindowAssignerContext context);
--- End diff --

Can we remove the `timestamp` and `WindowAssigner.WindowAssignerContext context` parameters?


> Session Window Assigner with Dynamic Gaps
> -
>
> Key: FLINK-8384
> URL: https://issues.apache.org/jira/browse/FLINK-8384
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Dyana Rose
>Priority: Minor
>
> *Reason for Improvement*
> Currently both Session Window assigners only allow a static inactivity gap. 
> Given the following scenario, this is too restrictive:
> * Given a stream of IoT events from many device types
> * Assume each device type could have a different inactivity gap
> * Assume each device type gap could change while sessions are in flight
> By allowing dynamic inactivity gaps, the correct gap can be determined in the 
> [assignWindows 
> function|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java#L59]
>  by passing the element currently under consideration, the timestamp, and the 
> context to a user defined function. This eliminates the need to create 
> unwieldy work arounds if you only have static gaps.
> Dynamic Session Window gaps should be available for both Event Time and 
> Processing Time streams.
> (short preliminary discussion: 
> https://lists.apache.org/thread.html/08a011c0039e826343e9be0b1bb4ecfc2e12976bc65f8a43ee88@%3Cdev.flink.apache.org%3E)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327310#comment-16327310
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r161798960
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TypedEventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on the timestamp of the
+ * elements. Windows cannot overlap.
+ *
+ * <p>For example, in order to window into windows with a dynamic time gap:
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
+ *   keyed.window(DynamicEventTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor}));
+ * } </pre>
+ *
+ * @param <T> The type of the input elements
+ */
+public class DynamicEventTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
+	private static final long serialVersionUID = 1L;
--- End diff --

Can we change `MergingWindowAssigner<T, TimeWindow>` to `MergingWindowAssigner<Object, TimeWindow>`? If so, we can reuse the `EventTimeTrigger`.





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327309#comment-16327309
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r161799107
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TypedEventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on the timestamp of the
+ * elements. Windows cannot overlap.
+ *
+ * <p>For example, in order to window into windows with a dynamic time gap:
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
+ *   keyed.window(DynamicEventTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor}));
+ * } </pre>
+ *
+ * @param <T> The type of the input elements
+ */
+public class DynamicEventTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
+	private static final long serialVersionUID = 1L;
+
+	protected SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor;
+
+	protected DynamicEventTimeSessionWindows(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
+		this.sessionWindowTimeGapExtractor = sessionWindowTimeGapExtractor;
+	}
+
+	@Override
+	public Collection<TimeWindow> assignWindows(T element, long timestamp, WindowAssignerContext context) {
+		long sessionTimeout = sessionWindowTimeGapExtractor.extract(element, timestamp, context);
+		if (sessionTimeout <= 0) {
+			throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
+		}
+		return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout));
+	}
+
+	@Override
+	public Trigger<T, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
+		return TypedEventTimeTrigger.create();
--- End diff --

can we reuse the EventTimeTrigger?



[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327306#comment-16327306
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r161799387
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicProcessingTimeSessionWindows.java
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TypedProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on the current processing
+ * time. Windows cannot overlap.
+ *
+ * <p>For example, in order to window into windows with a dynamic time gap:
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
+ *   keyed.window(DynamicProcessingTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor}));
+ * } </pre>
+ *
+ * @param <T> The type of the input elements
+ */
+public class DynamicProcessingTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
+	private static final long serialVersionUID = 1L;
--- End diff --

Same as `DynamicEventTimeSessionWindows` comments.





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5295: [FLINK-8384] [streaming] Session Window Assigner w...

2018-01-16 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r161799438
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicProcessingTimeSessionWindows.java
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TypedProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on the current processing
+ * time. Windows cannot overlap.
+ *
+ * <p>For example, in order to window into windows with a dynamic time gap:
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
+ *   keyed.window(DynamicProcessingTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor}));
+ * } </pre>
+ *
+ * @param <T> The type of the input elements
+ */
+public class DynamicProcessingTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
+	private static final long serialVersionUID = 1L;
+
+	protected SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor;
+
+	protected DynamicProcessingTimeSessionWindows(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
+		this.sessionWindowTimeGapExtractor = sessionWindowTimeGapExtractor;
+	}
+
+	@Override
+	public Collection<TimeWindow> assignWindows(T element, long timestamp, WindowAssignerContext context) {
+		long currentProcessingTime = context.getCurrentProcessingTime();
+		long sessionTimeout = sessionWindowTimeGapExtractor.extract(element, timestamp, context);
+		if (sessionTimeout <= 0) {
+			throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
+		}
+		return Collections.singletonList(new TimeWindow(currentProcessingTime, currentProcessingTime + sessionTimeout));
+	}
+
+	@Override
+	public Trigger<T, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
+		return TypedProcessingTimeTrigger.create();
--- End diff --

DynamicProcessingTimeSessionWindows.


---


[GitHub] flink pull request #5295: [FLINK-8384] [streaming] Session Window Assigner w...

2018-01-16 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r161799387
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicProcessingTimeSessionWindows.java
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TypedProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on the current processing
+ * time. Windows cannot overlap.
+ *
+ * <p>For example, in order to window into windows with a dynamic time gap:
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
+ *   keyed.window(DynamicProcessingTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor}));
+ * } </pre>
+ *
+ * @param <T> The type of the input elements
+ */
+public class DynamicProcessingTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
+	private static final long serialVersionUID = 1L;
--- End diff --

Same as `DynamicEventTimeSessionWindows` comments.


---


[GitHub] flink pull request #4649: [FLINK-6116] Watermarks don't work when unioning w...

2018-01-16 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4649#discussion_r161791264
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 ---
@@ -411,6 +413,13 @@ private void addEdgeInternal(Integer upStreamVertexID,
		StreamNode upstreamNode = getStreamNode(upStreamVertexID);
		StreamNode downstreamNode = getStreamNode(downStreamVertexID);

+		Tuple2<Integer, Integer> edgePair = new Tuple2<>(upstreamNode.getId(), downstreamNode.getId());
+		if (!uniqueEdgeMap.containsKey(edgePair)) {
--- End diff --

```
uniqueEdgeMap.put(edgePair, uniqueEdgeMap.getOrDefault(edgePair, 0) + 1)
```



---


[GitHub] flink pull request #4649: [FLINK-6116] Watermarks don't work when unioning w...

2018-01-16 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4649#discussion_r161797181
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 ---
@@ -411,6 +413,13 @@ private void addEdgeInternal(Integer upStreamVertexID,
		StreamNode upstreamNode = getStreamNode(upStreamVertexID);
		StreamNode downstreamNode = getStreamNode(downStreamVertexID);

+		Tuple2<Integer, Integer> edgePair = new Tuple2<>(upstreamNode.getId(), downstreamNode.getId());
+		if (!uniqueEdgeMap.containsKey(edgePair)) {
+			uniqueEdgeMap.put(edgePair, 1);
+		} else {
+			uniqueEdgeMap.put(edgePair, uniqueEdgeMap.get(edgePair) + 1);
--- End diff --

maybe wrap all of this new code into a function:
```
int edgeSubId = generateUniqueEdgeSubId(edgePair);
(...)
StreamEdge edge = new StreamEdge(..., edgeSubId);
```
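
Spelled out, the suggested helper could look like this. A standalone sketch: `generateUniqueEdgeSubId` is the name proposed above, and the node-id pair is packed into a `long` key here only to keep the snippet self-contained:

```java
import java.util.HashMap;
import java.util.Map;

class EdgeSubIdGenerator {
    private final Map<Long, Integer> uniqueEdgeMap = new HashMap<>();

    // Returns 1 for the first edge between (upstreamId, downstreamId),
    // 2 for the second parallel edge, and so on.
    int generateUniqueEdgeSubId(int upstreamId, int downstreamId) {
        long key = (((long) upstreamId) << 32) | (downstreamId & 0xFFFFFFFFL);
        int subId = uniqueEdgeMap.getOrDefault(key, 0) + 1;
        uniqueEdgeMap.put(key, subId);
        return subId;
    }
}
```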


---


[jira] [Commented] (FLINK-6116) Watermarks don't work when unioning with same DataStream

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327282#comment-16327282
 ] 

ASF GitHub Bot commented on FLINK-6116:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4649#discussion_r161799381
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 ---
@@ -624,6 +633,10 @@ public long getLoopTimeout(Integer vertexID) {
return iterationSourceSinkPairs;
}
 
+	public Map<Tuple2<Integer, Integer>, Integer> getUniqueEdgeMap() {
--- End diff --

This shouldn't be exposed


> Watermarks don't work when unioning with same DataStream
> 
>
> Key: FLINK-6116
> URL: https://issues.apache.org/jira/browse/FLINK-6116
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Aljoscha Krettek
>Priority: Critical
>
> In this example job we don't get any watermarks in the {{WatermarkObserver}}:
> {code}
> public class WatermarkTest {
> 	public static void main(String[] args) throws Exception {
> 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> 		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
> 		env.getConfig().setAutoWatermarkInterval(1000);
> 		env.setParallelism(1);
> 		DataStreamSource<String> input = env.addSource(new SourceFunction<String>() {
> 			@Override
> 			public void run(SourceContext<String> ctx) throws Exception {
> 				while (true) {
> 					ctx.collect("hello!");
> 					Thread.sleep(800);
> 				}
> 			}
>
> 			@Override
> 			public void cancel() {
> 			}
> 		});
>
> 		input.union(input)
> 			.flatMap(new IdentityFlatMap())
> 			.transform("WatermarkOp", BasicTypeInfo.STRING_TYPE_INFO, new WatermarkObserver());
>
> 		env.execute();
> 	}
>
> 	public static class WatermarkObserver
> 			extends AbstractStreamOperator<String>
> 			implements OneInputStreamOperator<String, String> {
> 		@Override
> 		public void processElement(StreamRecord<String> element) throws Exception {
> 			System.out.println("GOT ELEMENT: " + element);
> 		}
>
> 		@Override
> 		public void processWatermark(Watermark mark) throws Exception {
> 			super.processWatermark(mark);
> 			System.out.println("GOT WATERMARK: " + mark);
> 		}
> 	}
>
> 	private static class IdentityFlatMap
> 			extends RichFlatMapFunction<String, String> {
> 		@Override
> 		public void flatMap(String value, Collector<String> out) throws Exception {
> 			out.collect(value);
> 		}
> 	}
> }
> {code}
> When commenting out the `union` it works.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #4649: [FLINK-6116] Watermarks don't work when unioning w...

2018-01-16 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4649#discussion_r161793609
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
 ---
@@ -127,7 +127,8 @@ protected void initializeInputs() throws IOException, InterruptedException {
				1,
				new LinkedList<String>(),
				new BroadcastPartitioner<Object>(),
-				null /* output tag */);
+				null /* output tag */,
+				1);
--- End diff --

nit: formatting


---


[jira] [Commented] (FLINK-6116) Watermarks don't work when unioning with same DataStream

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327285#comment-16327285
 ] 

ASF GitHub Bot commented on FLINK-6116:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4649#discussion_r161793648
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
 ---
@@ -143,7 +144,8 @@ protected void initializeInputs() throws IOException, InterruptedException {
				2,
				new LinkedList<String>(),
				new BroadcastPartitioner<Object>(),
-				null /* output tag */);
+				null /* output tag */,
+				1);
--- End diff --

nit: formatting





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6116) Watermarks don't work when unioning with same DataStream

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327284#comment-16327284
 ] 

ASF GitHub Bot commented on FLINK-6116:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4649#discussion_r161797751
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
 ---
@@ -60,17 +60,23 @@
	 */
	private StreamPartitioner<?> outputPartitioner;

+	/**
+	 * The unique id for differentiating edges between the same source and target.
+	 */
+	private final int edgeSubId;
+
	public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int typeNumber,
-			List<String> selectedNames, StreamPartitioner<?> outputPartitioner, OutputTag outputTag) {
+			List<String> selectedNames, StreamPartitioner<?> outputPartitioner, OutputTag outputTag, int edgeSubId) {
		this.sourceVertex = sourceVertex;
		this.targetVertex = targetVertex;
		this.typeNumber = typeNumber;
		this.selectedNames = selectedNames;
		this.outputPartitioner = outputPartitioner;
		this.outputTag = outputTag;
+		this.edgeSubId = edgeSubId;
--- End diff --

rename `edgeSubId` to `uniqueId`? Since it's not a "sub" id of the 
`edgeId`, but rather a component of it.





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6116) Watermarks don't work when unioning with same DataStream

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327279#comment-16327279
 ] 

ASF GitHub Bot commented on FLINK-6116:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4649#discussion_r161797181
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 ---
@@ -411,6 +413,13 @@ private void addEdgeInternal(Integer upStreamVertexID,
		StreamNode upstreamNode = getStreamNode(upStreamVertexID);
		StreamNode downstreamNode = getStreamNode(downStreamVertexID);

+		Tuple2<Integer, Integer> edgePair = new Tuple2<>(upstreamNode.getId(), downstreamNode.getId());
+		if (!uniqueEdgeMap.containsKey(edgePair)) {
+			uniqueEdgeMap.put(edgePair, 1);
+		} else {
+			uniqueEdgeMap.put(edgePair, uniqueEdgeMap.get(edgePair) + 1);
--- End diff --

maybe wrap all of this new code into function 
```
int edgeSubId = generateUniqueEdgeSubId(edgePair);
(...)
StreamEdge edge = new StreamEdge(..., edgeSubId);
```





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #4649: [FLINK-6116] Watermarks don't work when unioning w...

2018-01-16 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4649#discussion_r161799248
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 ---
@@ -183,6 +183,14 @@ public Long map(Long value) throws Exception {
 
// verify self union

		assertTrue(streamGraph.getStreamNode(selfUnion.getId()).getInEdges().size() == 2);
+		assertTrue(streamGraph.getUniqueEdgeMap().size() == 12);
--- End diff --

Can not it be tested otherwise? This test shouldn't have an access to 
`getUniqueEdgeMap()` since that should be a part of the private implementation. 
If one chooses to implement this feature differently, this test would brake. 

Couldn't this test just check that 
`org.apache.flink.streaming.api.graph.StreamGraph#getStreamEdges(selfUnion.getId(),
 selfUnion.getId())` contains two unique edges?
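
For illustration, a minimal sketch of such an assertion (assuming `getStreamEdges(int, int)` returns the `List` of edges between the two node ids, and using `java.util.HashSet` plus JUnit's `assertEquals` to count unique edges):
```
List<StreamEdge> selfUnionEdges =
	streamGraph.getStreamEdges(selfUnion.getId(), selfUnion.getId());
assertEquals(2, new HashSet<>(selfUnionEdges).size());
```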

Side note: shouldn't `getStreamEdges()` return a `Set` instead of a `List`?


---


[jira] [Commented] (FLINK-6116) Watermarks don't work when unioning with same DataStream

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327283#comment-16327283
 ] 

ASF GitHub Bot commented on FLINK-6116:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4649#discussion_r161799248
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 ---
@@ -183,6 +183,14 @@ public Long map(Long value) throws Exception {
 
// verify self union

assertTrue(streamGraph.getStreamNode(selfUnion.getId()).getInEdges().size() == 
2);
+   assertTrue(streamGraph.getUniqueEdgeMap().size() == 12);
--- End diff --

Can't it be tested otherwise? This test shouldn't have access to 
`getUniqueEdgeMap()`, since that should be part of the private implementation. 
If one chose to implement this feature differently, this test would break. 

Couldn't this test just check that 
`org.apache.flink.streaming.api.graph.StreamGraph#getStreamEdges(selfUnion.getId(),
 selfUnion.getId())` contains two unique edges?

Side note: shouldn't `getStreamEdges()` return a `Set` instead of a `List`?


> Watermarks don't work when unioning with same DataStream
> 
>
> Key: FLINK-6116
> URL: https://issues.apache.org/jira/browse/FLINK-6116
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Aljoscha Krettek
>Priority: Critical
>
> In this example job we don't get any watermarks in the {{WatermarkObserver}}:
> {code}
> public class WatermarkTest {
>   public static void main(String[] args) throws Exception {
>     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>     env.getConfig().setAutoWatermarkInterval(1000);
>     env.setParallelism(1);
>     DataStreamSource<String> input = env.addSource(new SourceFunction<String>() {
>       @Override
>       public void run(SourceContext<String> ctx) throws Exception {
>         while (true) {
>           ctx.collect("hello!");
>           Thread.sleep(800);
>         }
>       }
>
>       @Override
>       public void cancel() {
>       }
>     });
>     input.union(input)
>       .flatMap(new IdentityFlatMap())
>       .transform("WatermarkOp", BasicTypeInfo.STRING_TYPE_INFO, new WatermarkObserver());
>     env.execute();
>   }
>
>   public static class WatermarkObserver
>       extends AbstractStreamOperator<String>
>       implements OneInputStreamOperator<String, String> {
>     @Override
>     public void processElement(StreamRecord<String> element) throws Exception {
>       System.out.println("GOT ELEMENT: " + element);
>     }
>
>     @Override
>     public void processWatermark(Watermark mark) throws Exception {
>       super.processWatermark(mark);
>       System.out.println("GOT WATERMARK: " + mark);
>     }
>   }
>
>   private static class IdentityFlatMap extends RichFlatMapFunction<String, String> {
>     @Override
>     public void flatMap(String value, Collector<String> out) throws Exception {
>       out.collect(value);
>     }
>   }
> }
> {code}
> When commenting out the `union` it works.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6116) Watermarks don't work when unioning with same DataStream

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327281#comment-16327281
 ] 

ASF GitHub Bot commented on FLINK-6116:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4649#discussion_r161793609
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
 ---
@@ -127,7 +127,8 @@ protected void initializeInputs() throws IOException, 
InterruptedException {
1,
new 
LinkedList(),
new 
BroadcastPartitioner(),
-   null /* output tag */);
+   null /* output tag */,
+   1);
--- End diff --

nit: formatting


> Watermarks don't work when unioning with same DataStream
> 
>
> Key: FLINK-6116
> URL: https://issues.apache.org/jira/browse/FLINK-6116
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Aljoscha Krettek
>Priority: Critical
>
> In this example job we don't get any watermarks in the {{WatermarkObserver}}:
> {code}
> public class WatermarkTest {
>   public static void main(String[] args) throws Exception {
>     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>     env.getConfig().setAutoWatermarkInterval(1000);
>     env.setParallelism(1);
>     DataStreamSource<String> input = env.addSource(new SourceFunction<String>() {
>       @Override
>       public void run(SourceContext<String> ctx) throws Exception {
>         while (true) {
>           ctx.collect("hello!");
>           Thread.sleep(800);
>         }
>       }
>
>       @Override
>       public void cancel() {
>       }
>     });
>     input.union(input)
>       .flatMap(new IdentityFlatMap())
>       .transform("WatermarkOp", BasicTypeInfo.STRING_TYPE_INFO, new WatermarkObserver());
>     env.execute();
>   }
>
>   public static class WatermarkObserver
>       extends AbstractStreamOperator<String>
>       implements OneInputStreamOperator<String, String> {
>     @Override
>     public void processElement(StreamRecord<String> element) throws Exception {
>       System.out.println("GOT ELEMENT: " + element);
>     }
>
>     @Override
>     public void processWatermark(Watermark mark) throws Exception {
>       super.processWatermark(mark);
>       System.out.println("GOT WATERMARK: " + mark);
>     }
>   }
>
>   private static class IdentityFlatMap extends RichFlatMapFunction<String, String> {
>     @Override
>     public void flatMap(String value, Collector<String> out) throws Exception {
>       out.collect(value);
>     }
>   }
> }
> {code}
> When commenting out the `union` it works.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6116) Watermarks don't work when unioning with same DataStream

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327280#comment-16327280
 ] 

ASF GitHub Bot commented on FLINK-6116:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4649#discussion_r161795511
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 ---
@@ -183,6 +183,14 @@ public Long map(Long value) throws Exception {
 
// verify self union

assertTrue(streamGraph.getStreamNode(selfUnion.getId()).getInEdges().size() == 
2);
+   assertTrue(streamGraph.getUniqueEdgeMap().size() == 12);
+   int selfUnionCount = 0;
+   for (Integer value : streamGraph.getUniqueEdgeMap().values()) {
+   if (value == 2) {
--- End diff --

Why the constant value of `2`? Where does it come from? Shouldn't it be 
something like `selfUnion.getId()`?


> Watermarks don't work when unioning with same DataStream
> 
>
> Key: FLINK-6116
> URL: https://issues.apache.org/jira/browse/FLINK-6116
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Aljoscha Krettek
>Priority: Critical
>
> In this example job we don't get any watermarks in the {{WatermarkObserver}}:
> {code}
> public class WatermarkTest {
>   public static void main(String[] args) throws Exception {
>     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>     env.getConfig().setAutoWatermarkInterval(1000);
>     env.setParallelism(1);
>     DataStreamSource<String> input = env.addSource(new SourceFunction<String>() {
>       @Override
>       public void run(SourceContext<String> ctx) throws Exception {
>         while (true) {
>           ctx.collect("hello!");
>           Thread.sleep(800);
>         }
>       }
>
>       @Override
>       public void cancel() {
>       }
>     });
>     input.union(input)
>       .flatMap(new IdentityFlatMap())
>       .transform("WatermarkOp", BasicTypeInfo.STRING_TYPE_INFO, new WatermarkObserver());
>     env.execute();
>   }
>
>   public static class WatermarkObserver
>       extends AbstractStreamOperator<String>
>       implements OneInputStreamOperator<String, String> {
>     @Override
>     public void processElement(StreamRecord<String> element) throws Exception {
>       System.out.println("GOT ELEMENT: " + element);
>     }
>
>     @Override
>     public void processWatermark(Watermark mark) throws Exception {
>       super.processWatermark(mark);
>       System.out.println("GOT WATERMARK: " + mark);
>     }
>   }
>
>   private static class IdentityFlatMap extends RichFlatMapFunction<String, String> {
>     @Override
>     public void flatMap(String value, Collector<String> out) throws Exception {
>       out.collect(value);
>     }
>   }
> }
> {code}
> When commenting out the `union` it works.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6116) Watermarks don't work when unioning with same DataStream

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327278#comment-16327278
 ] 

ASF GitHub Bot commented on FLINK-6116:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4649#discussion_r161791264
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 ---
@@ -411,6 +413,13 @@ private void addEdgeInternal(Integer upStreamVertexID,
StreamNode upstreamNode = 
getStreamNode(upStreamVertexID);
StreamNode downstreamNode = 
getStreamNode(downStreamVertexID);
 
+   Tuple2<Integer, Integer> edgePair = new Tuple2<>(upstreamNode.getId(), downstreamNode.getId());
+   if (!uniqueEdgeMap.containsKey(edgePair)) {
--- End diff --

```
uniqueEdgeMap.put(edgePair, uniqueEdgeMap.getOrDefault(edgePair, 0) + 1)
```
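
An equivalent one-liner, using the standard `java.util.Map#merge` API from Java 8, would also work:
```
uniqueEdgeMap.merge(edgePair, 1, Integer::sum);
```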



> Watermarks don't work when unioning with same DataStream
> 
>
> Key: FLINK-6116
> URL: https://issues.apache.org/jira/browse/FLINK-6116
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Aljoscha Krettek
>Priority: Critical
>
> In this example job we don't get any watermarks in the {{WatermarkObserver}}:
> {code}
> public class WatermarkTest {
>   public static void main(String[] args) throws Exception {
>     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>     env.getConfig().setAutoWatermarkInterval(1000);
>     env.setParallelism(1);
>     DataStreamSource<String> input = env.addSource(new SourceFunction<String>() {
>       @Override
>       public void run(SourceContext<String> ctx) throws Exception {
>         while (true) {
>           ctx.collect("hello!");
>           Thread.sleep(800);
>         }
>       }
>
>       @Override
>       public void cancel() {
>       }
>     });
>     input.union(input)
>       .flatMap(new IdentityFlatMap())
>       .transform("WatermarkOp", BasicTypeInfo.STRING_TYPE_INFO, new WatermarkObserver());
>     env.execute();
>   }
>
>   public static class WatermarkObserver
>       extends AbstractStreamOperator<String>
>       implements OneInputStreamOperator<String, String> {
>     @Override
>     public void processElement(StreamRecord<String> element) throws Exception {
>       System.out.println("GOT ELEMENT: " + element);
>     }
>
>     @Override
>     public void processWatermark(Watermark mark) throws Exception {
>       super.processWatermark(mark);
>       System.out.println("GOT WATERMARK: " + mark);
>     }
>   }
>
>   private static class IdentityFlatMap extends RichFlatMapFunction<String, String> {
>     @Override
>     public void flatMap(String value, Collector<String> out) throws Exception {
>       out.collect(value);
>     }
>   }
> }
> {code}
> When commenting out the `union` it works.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #4649: [FLINK-6116] Watermarks don't work when unioning w...

2018-01-16 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4649#discussion_r161793648
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
 ---
@@ -143,7 +144,8 @@ protected void initializeInputs() throws IOException, 
InterruptedException {
2,
new 
LinkedList(),
new 
BroadcastPartitioner(),
-   null /* output tag */);
+   null /* output tag */,
+   1);
--- End diff --

nit: formatting


---


[GitHub] flink pull request #4649: [FLINK-6116] Watermarks don't work when unioning w...

2018-01-16 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4649#discussion_r161795511
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 ---
@@ -183,6 +183,14 @@ public Long map(Long value) throws Exception {
 
// verify self union

assertTrue(streamGraph.getStreamNode(selfUnion.getId()).getInEdges().size() == 
2);
+   assertTrue(streamGraph.getUniqueEdgeMap().size() == 12);
+   int selfUnionCount = 0;
+   for (Integer value : streamGraph.getUniqueEdgeMap().values()) {
+   if (value == 2) {
--- End diff --

Why the constant value of `2`? Where does it come from? Shouldn't it be 
something like `selfUnion.getId()`?


---


[GitHub] flink pull request #4649: [FLINK-6116] Watermarks don't work when unioning w...

2018-01-16 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4649#discussion_r161799381
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 ---
@@ -624,6 +633,10 @@ public long getLoopTimeout(Integer vertexID) {
return iterationSourceSinkPairs;
}
 
+   public Map<Tuple2<Integer, Integer>, Integer> getUniqueEdgeMap() {
--- End diff --

This shouldn't be exposed
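
If tests really need to look at it, one possible middle ground (a sketch only, assuming Flink's `org.apache.flink.annotation.VisibleForTesting` annotation) would be a package-private accessor instead of a public one:
```
// Sketch only: visible to tests in the same package, not part of the public API.
@VisibleForTesting
Map<Tuple2<Integer, Integer>, Integer> getUniqueEdgeMap() {
	return uniqueEdgeMap;
}
```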


---


[GitHub] flink pull request #4649: [FLINK-6116] Watermarks don't work when unioning w...

2018-01-16 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4649#discussion_r161797751
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
 ---
@@ -60,17 +60,23 @@
 */
private StreamPartitioner<?> outputPartitioner;
 
+   /**
+* The unique id for differentiating edges between the same source and 
target.
+*/
+   private final int edgeSubId;
+
public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int typeNumber,
-   List<String> selectedNames, StreamPartitioner<?> outputPartitioner, OutputTag outputTag) {
+   List<String> selectedNames, StreamPartitioner<?> outputPartitioner, OutputTag outputTag, int edgeSubId) {
this.sourceVertex = sourceVertex;
this.targetVertex = targetVertex;
this.typeNumber = typeNumber;
this.selectedNames = selectedNames;
this.outputPartitioner = outputPartitioner;
this.outputTag = outputTag;
+   this.edgeSubId = edgeSubId;
--- End diff --

rename `edgeSubId` to `uniqueId`? Since it's not a "sub" id of the 
`edgeId`, but rather a component of it.


---


[GitHub] flink issue #5303: [hotfix][docs] Adjust RocksDb dependency docs

2018-01-16 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5303
  
I would suggest starting to think about the dependencies in the following way:

  - There are pure user-code projects where the Flink runtime is "provided" 
and they are started using an existing Flink setup (`bin/flink run` or REST 
entry point). This is the **Framework Style**.

  - In the future, we will have "Flink as a Library" deployments, where 
users add something like `flink-dist` as a library to their program and then 
simply dockerize that Java application.

  - Code can be run in the IDE or in other, similarly embedded forms. This 
is in some sense also a "Flink as a Library" deployment, but with selective 
(fewer) dependencies. The RocksDB issue applies only to this case.

To make this simpler for the users, it would be great to have not N 
different models that we talk about, but ideally only two: **Framework Style** 
and **Library Style**. We could for example start to advocate and document that 
users should always use `flink-dist` as their standard dependency - "provided" 
in the framework style deployment, "compile" in the library style deployment. 
That might be a really easy way to work with that. The only problem for the 
time being is that `flink-dist` is quite big and contains for example also 
optional dependencies like `flink-table`, which makes it more heavyweight for 
quickstarts. Maybe we can accept that as a tradeoff for dependency simplicity.


---


[jira] [Created] (FLINK-8439) Document using a custom AWS Credentials Provider with flink-s3-fs-hadoop

2018-01-16 Thread Dyana Rose (JIRA)
Dyana Rose created FLINK-8439:
-

 Summary: Document using a custom AWS Credentials Provider with 
flink-s3-fs-hadoop
 Key: FLINK-8439
 URL: https://issues.apache.org/jira/browse/FLINK-8439
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Dyana Rose


This came up when using S3 as the file system backend and running under 
ECS.

With no credentials in the container, hadoop-aws will default to EC2 instance 
level credentials when accessing S3. However, when running under ECS, you will 
generally want to default to the task definition's IAM role.

In this case you need to set the Hadoop property
{code:java}
fs.s3a.aws.credentials.provider{code}
to one or more fully qualified class names; see the [hadoop-aws 
docs|https://github.com/apache/hadoop/blob/1ba491ff907fc5d2618add980734a3534e2be098/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md]

This works as expected when you add this setting to flink-conf.yaml, but there 
is a further 'gotcha.' Because the AWS SDK is shaded, the actual full class 
name for, in this case, the ContainerCredentialsProvider is
{code:java}
org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.auth.ContainerCredentialsProvider{code}
 

meaning the full setting is:
{code:java}
fs.s3a.aws.credentials.provider: 
org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.auth.ContainerCredentialsProvider{code}
If you instead set it to the unshaded class name, you will see a very confusing 
error stating that the ContainerCredentialsProvider doesn't implement 
AWSCredentialsProvider (which it most certainly does).

Adding this information (how to specify alternate Credential Providers, and the 
namespace gotcha) to the [AWS deployment 
docs|https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/aws.html]
 would be useful to anyone else using S3.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8422) Checkstyle for org.apache.flink.api.java.tuple

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327217#comment-16327217
 ] 

ASF GitHub Bot commented on FLINK-8422:
---

Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/5292
  
Looks good to me. +1


> Checkstyle for org.apache.flink.api.java.tuple
> --
>
> Key: FLINK-8422
> URL: https://issues.apache.org/jira/browse/FLINK-8422
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.5.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Trivial
>
> Update {{TupleGenerator}} for Flink's checkstyle and rebuild {{Tuple}} and 
> {{TupleBuilder}} classes.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5292: [FLINK-8422] [core] Checkstyle for org.apache.flink.api.j...

2018-01-16 Thread sunjincheng121
Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/5292
  
Looks good to me. +1


---


[jira] [Reopened] (FLINK-7364) Log exceptions from user code in streaming jobs

2018-01-16 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reopened FLINK-7364:
-

Just ran into this on master, will investigate in which circumstances this 
happens. Neither the TM nor the JM logs contain any exception, only the state 
transitions.

> Log exceptions from user code in streaming jobs
> ---
>
> Key: FLINK-7364
> URL: https://issues.apache.org/jira/browse/FLINK-7364
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.3.1
>Reporter: Elias Levy
>Priority: Major
>
> Currently, if an exception arises in user supplied code within an operator in 
> a streaming job, Flink terminates the job, but it fails to record the reason 
> for the termination.  The logs do not record that there was an exception at 
> all, much less recording the type of exception and where it occurred.  This 
> makes it difficult to debug jobs without implementing exception recording 
> code on all user supplied operators. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-7604) Add OpenTSDB metrics reporter

2018-01-16 Thread Hai Zhou UTC+8 (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hai Zhou UTC+8 closed FLINK-7604.
-
   Resolution: Won't Do
Fix Version/s: (was: 1.5.0)

> Add OpenTSDB metrics reporter
> -
>
> Key: FLINK-7604
> URL: https://issues.apache.org/jira/browse/FLINK-7604
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-4940) Add support for broadcast state

2018-01-16 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327201#comment-16327201
 ] 

Kostas Kloudas edited comment on FLINK-4940 at 1/16/18 3:01 PM:


"We therefore only have to checkpoint the state of one arbitrary instance, for 
example instance 0." 

I tend to disagree with this. The reason being that this way we can create a 
hotspot upon restoring. Imagine we have parallelism of 1000 and FS as our 
backend and upon restoring, every task has to read from the same disk(s). 

In addition, I think that the broadcast state should be in the 
OperatorStateBackend. Broadcast state is by definition not partitionable and 
therefore not a good fit for the KeyedStateBackend.


was (Author: kkl0u):
"We therefore only have to checkpoint the state of one arbitrary instance, for 
example instance 0." 

I tend to disagree with this. The reason being that this way we can create a 
hotspot upon restoring. Imagine we have parallelism of 1000 and FS as our 
backend and upon restoring, every task has to read from the same disk(s). 

In addition, I think that the broadcast state should be in the 
OperatorStateBackend. Broadcast state is by definition not partitionable and 
therefore not a good fit for the KeyedStateBackend.

> Add support for broadcast state
> ---
>
> Key: FLINK-4940
> URL: https://issues.apache.org/jira/browse/FLINK-4940
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>Assignee: Kostas Kloudas
>Priority: Major
>
> As mentioned in 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
>  we need broadcast state to support job patterns where one (or several) 
> inputs are broadcast to all operator instances and where we keep state that 
> is mutated only based on input from broadcast inputs. This special 
> restriction ensures that the broadcast state is the same on all parallel 
> operator instances when checkpointing (except when using at-least-once mode). 
> We therefore only have to checkpoint the state of one arbitrary instance, for 
> example instance 0.
> For the different types of side inputs we need different types of state, 
> luckily, the side input types align with these state types we currently have 
> for keyed state:
>  - {{ValueState}}
>  - {{ListState}}
>  - {{MapState}}
> We can therefore reuse keyed state backends for our purposes but need to put 
> a more restricting API in front of it: mutation of broadcast state must only 
> be allowed when actually processing broadcast input. If we don't have this 
> check users can (by mistake) modify broadcast state. This would lead to 
> incorrect results which are very hard to notice, much less debug.
> With the way the Flink state API works (users can get a {{State}} in 
> {{open()}} and work with state by calling methods on that) we have to add 
> special wrapping state classes that only allow modification of state when 
> processing a broadcast element.
> For the API, I propose to add a new interface `InternalStateAccessor`:
> {code}
> /**
>  * Interface for accessing persistent state.
>  */
> @PublicEvolving
> public interface InternalStateAccessor<S extends State> {
>    <N> S state(
>       N namespace,
>       TypeSerializer<N> namespaceSerializer,
>       StateDescriptor<S, ?> stateDescriptor);
> }
> {code}
> this is the same as `KeyedStateBackend.getPartitionedState()` but allows us 
> to abstract away the special nature of broadcast state. This is also meant as 
> an external interface and is not to be exposed to user functions. Only 
> operators should deal with this.
> {{AbstractStreamOperator}} would get a new method 
> `getBroadcastStateAccessor()` that returns an implementation of this 
> interface. The implementation would have a {{KeyedStateBackend}} but wrap the 
> state in special wrappers that only allow modification when processing 
> broadcast elements (as mentioned above). 
> On the lower implementation levels, we have to add a new entry for our state 
> to `OperatorSnapshotResult`. For example:
> {code}
> private RunnableFuture broadcastStateManagedFuture;
> {code}
> Also the {{CheckpointCoordinator}} and savepoint/checkpoint serialisation 
> logic will have to be adapted to support this new kind of state. With the 
> ongoing changes in supporting incremental snapshotting and other new features 
> for `KeyedStateBackend` this should be coordinated with [~StephanEwen] and/or 
> [~stefanrichte...@gmail.com] and/or [~xiaogang.shi]. We also have to be very 
> careful about maintaining compatibility with savepoints from older versions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-4940) Add support for broadcast state

2018-01-16 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327201#comment-16327201
 ] 

Kostas Kloudas edited comment on FLINK-4940 at 1/16/18 2:59 PM:


"We therefore only have to checkpoint the state of one arbitrary instance, for 
example instance 0." 

I tend to disagree with this. The reason being that this way we can create a 
hotspot upon restoring. Imagine we have parallelism of 1000 and FS as our 
backend and upon restoring, every task has to read from the same disk(s). 

In addition, I think that the broadcast state should be in the 
OperatorStateBackend. Broadcast state is by definition not partitionable and 
therefore not a good fit for the KeyedStateBackend.


was (Author: kkl0u):
"We therefore only have to checkpoint the state of one arbitrary instance, for 
example instance 0." 

I tend to disagree with this. The reason being that this way we can create a 
hotspot upon restoring. Imagine we have parallelism of 1000 and FS as our 
backend and upon restoring, every task has to read from the same disk(s).

 

In addition, I think that the broadcast state should be in the 
OperatorStateBackend. Broadcast state is by definition not partitionable and 
therefore not a good fit for the KeyedStateBackend.

> Add support for broadcast state
> ---
>
> Key: FLINK-4940
> URL: https://issues.apache.org/jira/browse/FLINK-4940
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>Assignee: Kostas Kloudas
>Priority: Major
>
> As mentioned in 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
>  we need broadcast state to support job patterns where one (or several) 
> inputs are broadcast to all operator instances and where we keep state that 
> is mutated only based on input from broadcast inputs. This special 
> restriction ensures that the broadcast state is the same on all parallel 
> operator instances when checkpointing (except when using at-least-once mode). 
> We therefore only have to checkpoint the state of one arbitrary instance, for 
> example instance 0.
> For the different types of side inputs we need different types of state, 
> luckily, the side input types align with these state types we currently have 
> for keyed state:
>  - {{ValueState}}
>  - {{ListState}}
>  - {{MapState}}
> We can therefore reuse keyed state backends for our purposes but need to put 
> a more restricting API in front of it: mutation of broadcast state must only 
> be allowed when actually processing broadcast input. If we don't have this 
> check users can (by mistake) modify broadcast state. This would lead to 
> incorrect results which are very hard to notice, much less debug.
> With the way the Flink state API works (users can get a {{State}} in 
> {{open()}} and work with state by calling methods on that) we have to add 
> special wrapping state classes that only allow modification of state when 
> processing a broadcast element.
> For the API, I propose to add a new interface `InternalStateAccessor`:
> {code}
> /**
>  * Interface for accessing persistent state.
>  */
> @PublicEvolving
> public interface InternalStateAccessor<S extends State> {
>    <N> S state(
>       N namespace,
>       TypeSerializer<N> namespaceSerializer,
>       StateDescriptor<S, ?> stateDescriptor);
> }
> {code}
> this is the same as `KeyedStateBackend.getPartitionedState()` but allows us 
> to abstract away the special nature of broadcast state. This is also meant as 
> an external interface and is not to be exposed to user functions. Only 
> operators should deal with this.
> {{AbstractStreamOperator}} would get a new method 
> `getBroadcastStateAccessor()` that returns an implementation of this 
> interface. The implementation would have a {{KeyedStateBackend}} but wrap the 
> state in special wrappers that only allow modification when processing 
> broadcast elements (as mentioned above). 
> On the lower implementation levels, we have to add a new entry for our state 
> to `OperatorSnapshotResult`. For example:
> {code}
> private RunnableFuture broadcastStateManagedFuture;
> {code}
> Also the {{CheckpointCoordinator}} and savepoint/checkpoint serialisation 
> logic will have to be adapted to support this new kind of state. With the 
> ongoing changes in supporting incremental snapshotting and other new features 
> for `KeyedStateBackend` this should be coordinated with [~StephanEwen] and/or 
> [~stefanrichte...@gmail.com] and/or [~xiaogang.shi]. We also have to be very 
> careful about maintaining compatibility with savepoints from older versions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-4940) Add support for broadcast state

2018-01-16 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327201#comment-16327201
 ] 

Kostas Kloudas edited comment on FLINK-4940 at 1/16/18 2:58 PM:


"We therefore only have to checkpoint the state of one arbitrary instance, for 
example instance 0." 

I tend to disagree with this. The reason being that this way we can create a 
hotspot upon restoring. Imagine we have parallelism of 1000 and FS as our 
backend and upon restoring, every task has to read from the same disk(s).

 

In addition, I think that the broadcast state should be in the 
OperatorStateBackend. Broadcast state is by definition not partitionable and 
therefore not a good fit for the KeyedStateBackend.


was (Author: kkl0u):
"We therefore only have to checkpoint the state of one arbitrary instance, for 
example instance 0." 

I tend to disagree with this. The reason being that this way we can create a 
hotspot upon restoring. Imagine we have parallelism of 1000 and FS as our 
backend and upon restoring, every task has to read from the same disk(s).

> Add support for broadcast state
> ---
>
> Key: FLINK-4940
> URL: https://issues.apache.org/jira/browse/FLINK-4940
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>Assignee: Kostas Kloudas
>Priority: Major
>
> As mentioned in 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
>  we need broadcast state to support job patterns where one (or several) 
> inputs are broadcast to all operator instances and where we keep state that 
> is mutated only based on input from broadcast inputs. This special 
> restriction ensures that the broadcast state is the same on all parallel 
> operator instances when checkpointing (except when using at-least-once mode). 
> We therefore only have to checkpoint the state of one arbitrary instance, for 
> example instance 0.
> For the different types of side inputs we need different types of state, 
> luckily, the side input types align with these state types we currently have 
> for keyed state:
>  - {{ValueState}}
>  - {{ListState}}
>  - {{MapState}}
> We can therefore reuse keyed state backends for our purposes but need to put 
> a more restricting API in front of it: mutation of broadcast state must only 
> be allowed when actually processing broadcast input. If we don't have this 
> check users can (by mistake) modify broadcast state. This would lead to 
> incorrect results which are very hard to notice, much less debug.
> With the way the Flink state API works (users can get a {{State}} in 
> {{open()}} and work with state by calling methods on that) we have to add 
> special wrapping state classes that only allow modification of state when 
> processing a broadcast element.
> For the API, I propose to add a new interface `InternalStateAccessor`:
> {code}
> /**
>  * Interface for accessing persistent state.
>  */
> @PublicEvolving
> public interface InternalStateAccessor<S extends State> {
>    <N> S state(
>       N namespace,
>       TypeSerializer<N> namespaceSerializer,
>       StateDescriptor<S, ?> stateDescriptor);
> }
> {code}
> this is the same as `KeyedStateBackend.getPartitionedState()` but allows us 
> to abstract away the special nature of broadcast state. This is also meant as 
> an external interface and is not to be exposed to user functions. Only 
> operators should deal with this.
> {{AbstractStreamOperator}} would get a new method 
> `getBroadcastStateAccessor()` that returns an implementation of this 
> interface. The implementation would have a {{KeyedStateBackend}} but wrap the 
> state in special wrappers that only allow modification when processing 
> broadcast elements (as mentioned above). 
> On the lower implementation levels, we have to add a new entry for our state 
> to `OperatorSnapshotResult`. For example:
> {code}
> private RunnableFuture broadcastStateManagedFuture;
> {code}
> Also the {{CheckpointCoordinator}} and savepoint/checkpoint serialisation 
> logic will have to be adapted to support this new kind of state. With the 
> ongoing changes in supporting incremental snapshotting and other new features 
> for `KeyedStateBackend` this should be coordinated with [~StephanEwen] and/or 
> [~stefanrichte...@gmail.com] and/or [~xiaogang.shi]. We also have to be very 
> careful about maintaining compatibility with savepoints from older versions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-4940) Add support for broadcast state

2018-01-16 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327201#comment-16327201
 ] 

Kostas Kloudas commented on FLINK-4940:
---

"We therefore only have to checkpoint the state of one arbitrary instance, for 
example instance 0." 

I tend to disagree with this. The reason being that this way we can create a 
hotspot upon restoring. Imagine we have parallelism of 1000 and FS as our 
backend and upon restoring, every task has to read from the same disk(s).

> Add support for broadcast state
> ---
>
> Key: FLINK-4940
> URL: https://issues.apache.org/jira/browse/FLINK-4940
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>Assignee: Kostas Kloudas
>Priority: Major
>
> As mentioned in 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
>  we need broadcast state to support job patterns where one (or several) 
> inputs are broadcast to all operator instances and where we keep state that 
> is mutated only based on input from broadcast inputs. This special 
> restriction ensures that the broadcast state is the same on all parallel 
> operator instances when checkpointing (except when using at-least-once mode). 
> We therefore only have to checkpoint the state of one arbitrary instance, for 
> example instance 0.
> For the different types of side inputs we need different types of state, 
> luckily, the side input types align with these state types we currently have 
> for keyed state:
>  - {{ValueState}}
>  - {{ListState}}
>  - {{MapState}}
> We can therefore reuse keyed state backends for our purposes but need to put 
> a more restricting API in front of it: mutation of broadcast state must only 
> be allowed when actually processing broadcast input. If we don't have this 
> check users can (by mistake) modify broadcast state. This would lead to 
> incorrect results which are very hard to notice, much less debug.
> With the way the Flink state API works (users can get a {{State}} in 
> {{open()}} and work with state by calling methods on that) we have to add 
> special wrapping state classes that only allow modification of state when 
> processing a broadcast element.
> For the API, I propose to add a new interface `InternalStateAccessor`:
> {code}
> /**
>  * Interface for accessing persistent state.
>  */
> @PublicEvolving
> public interface InternalStateAccessor<S extends State> {
>    <N> S state(
>       N namespace,
>       TypeSerializer<N> namespaceSerializer,
>       StateDescriptor<S, ?> stateDescriptor);
> }
> {code}
> this is the same as `KeyedStateBackend.getPartitionedState()` but allows us 
> to abstract away the special nature of broadcast state. This is also meant as 
> an external interface and is not to be exposed to user functions. Only 
> operators should deal with this.
> {{AbstractStreamOperator}} would get a new method 
> `getBroadcastStateAccessor()` that returns an implementation of this 
> interface. The implementation would have a {{KeyedStateBackend}} but wrap the 
> state in special wrappers that only allow modification when processing 
> broadcast elements (as mentioned above). 
> On the lower implementation levels, we have to add a new entry for our state 
> to `OperatorSnapshotResult`. For example:
> {code}
> private RunnableFuture broadcastStateManagedFuture;
> {code}
> Also the {{CheckpointCoordinator}} and savepoint/checkpoint serialisation 
> logic will have to be adapted to support this new kind of state. With the 
> ongoing changes in supporting incremental snapshotting and other new features 
> for `KeyedStateBackend` this should be coordinated with [~StephanEwen] and/or 
> [~stefanrichte...@gmail.com] and/or [~xiaogang.shi]. We also have to be very 
> careful about maintaining compatibility with savepoints from older versions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8438) Ensure that implicit Scala TypeInformation works in Eclipse IDE

2018-01-16 Thread David Anderson (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327197#comment-16327197
 ] 

David Anderson commented on FLINK-8438:
---

This code (adapted from the Flink Scala quickstart)
{code:java}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.typeutils.TupleTypeInfo
import org.apache.flink.api.common.typeinfo.Types
 
val text = env.socketTextStream(hostName, port)
val foo = (x: String) => (x, 1)
val tuples = text.map(foo)
{code}
produces these errors when compiled by Eclipse:

 
{noformat}
could not find implicit value for evidence parameter of type 
org.apache.flink.api.common.typeinfo.TypeInformation[(String, Int)]
not enough arguments for method map: (implicit evidence$7: 
org.apache.flink.api.common.typeinfo.TypeInformation[(String, 
Int)])org.apache.flink.streaming.api.scala.DataStream[(String, Int)]. 
Unspecified value parameter evidence$7.
{noformat}
 

This code can be made to compile by using TypeInformation.of(), but the 
resulting tuples are then typed as GenericTypeInfo and don't really work: 
TypeInformation.of() doesn't work properly with Scala classes.

> Ensure that implicit Scala TypeInformation works in Eclipse IDE
> ---
>
> Key: FLINK-8438
> URL: https://issues.apache.org/jira/browse/FLINK-8438
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> It seems that Scala macros do not work properly in the Eclipse IDE. We need 
> to investigate this further and maybe need to update the Scala macros.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-4940) Add support for broadcast state

2018-01-16 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas reassigned FLINK-4940:
-

Assignee: Kostas Kloudas

> Add support for broadcast state
> ---
>
> Key: FLINK-4940
> URL: https://issues.apache.org/jira/browse/FLINK-4940
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>Assignee: Kostas Kloudas
>Priority: Major
>
> As mentioned in 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
>  we need broadcast state to support job patterns where one (or several) 
> inputs are broadcast to all operator instances and where we keep state that 
> is mutated only based on input from broadcast inputs. This special 
> restriction ensures that the broadcast state is the same on all parallel 
> operator instances when checkpointing (except when using at-least-once mode). 
> We therefore only have to checkpoint the state of one arbitrary instance, for 
> example instance 0.
> For the different types of side inputs we need different types of state, 
> luckily, the side input types align with these state types we currently have 
> for keyed state:
>  - {{ValueState}}
>  - {{ListState}}
>  - {{MapState}}
> We can therefore reuse keyed state backends for our purposes but need to put 
> a more restricting API in front of it: mutation of broadcast state must only 
> be allowed when actually processing broadcast input. If we don't have this 
> check users can (by mistake) modify broadcast state. This would lead to 
> incorrect results which are very hard to notice, much less debug.
> With the way the Flink state API works (users can get a {{State}} in 
> {{open()}} and work with state by calling methods on that) we have to add 
> special wrapping state classes that only allow modification of state when 
> processing a broadcast element.
> For the API, I propose to add a new interface `InternalStateAccessor`:
> {code}
> /**
>  * Interface for accessing persistent state.
>  */
> @PublicEvolving
> public interface InternalStateAccessor<S extends State> {
>    <N> S state(
>       N namespace,
>       TypeSerializer<N> namespaceSerializer,
>       StateDescriptor<S, ?> stateDescriptor);
> }
> {code}
> this is the same as `KeyedStateBackend.getPartitionedState()` but allows us 
> to abstract away the special nature of broadcast state. This is also meant as 
> an external interface and is not to be exposed to user functions. Only 
> operators should deal with this.
> {{AbstractStreamOperator}} would get a new method 
> `getBroadcastStateAccessor()` that returns an implementation of this 
> interface. The implementation would have a {{KeyedStateBackend}} but wrap the 
> state in special wrappers that only allow modification when processing 
> broadcast elements (as mentioned above). 
> On the lower implementation levels, we have to add a new entry for our state 
> to `OperatorSnapshotResult`. For example:
> {code}
> private RunnableFuture broadcastStateManagedFuture;
> {code}
> Also the {{CheckpointCoordinator}} and savepoint/checkpoint serialisation 
> logic will have to be adapted to support this new kind of state. With the 
> ongoing changes in supporting incremental snapshotting and other new features 
> for `KeyedStateBackend` this should be coordinated with [~StephanEwen] and/or 
> [~stefanrichte...@gmail.com] and/or [~xiaogang.shi]. We also have to be very 
> careful about maintaining compatibility with savepoints from older versions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8033) Build Flink with JDK 9

2018-01-16 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327153#comment-16327153
 ] 

Stephan Ewen commented on FLINK-8033:
-

IIRC, we need to bump Netty to a newer version, because our current Netty 
version is not Java 9 compatible.

> Build Flink with JDK 9
> --
>
> Key: FLINK-8033
> URL: https://issues.apache.org/jira/browse/FLINK-8033
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.4.0
>Reporter: Hai Zhou UTC+8
>Priority: Major
> Fix For: 1.5.0
>
>
> This is a JIRA to track all issues that found to make Flink compatible with 
> Java 9.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8438) Ensure that implicit Scala TypeInformation works in Eclipse IDE

2018-01-16 Thread Timo Walther (JIRA)
Timo Walther created FLINK-8438:
---

 Summary: Ensure that implicit Scala TypeInformation works in 
Eclipse IDE
 Key: FLINK-8438
 URL: https://issues.apache.org/jira/browse/FLINK-8438
 Project: Flink
  Issue Type: Bug
  Components: Type Serialization System
Reporter: Timo Walther
Assignee: Timo Walther


It seems that Scala macros do not work properly in the Eclipse IDE. We need to 
investigate this further and maybe need to update the Scala macros.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8290) Modify clientId to groupId in flink-connector-kafka-0.8

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327130#comment-16327130
 ] 

ASF GitHub Bot commented on FLINK-8290:
---

GitHub user maqingxiang opened a pull request:

https://github.com/apache/flink/pull/5304

[FLINK-8290]Modify clientId to groupId in flink-connector-kafka-0.8

Currently the clientId used for consuming all topics is the constant 
"flink-kafka-consumer-legacy-" + broker.id(), which makes it hard to trace 
consumers in Kafka's logs, so I recommend changing it to the groupId.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/maqingxiang/flink FLINK-8290

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5304.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5304


commit e8c19520e9b7500548ad36afb8bd698a35d64053
Author: maqingxiang-it 
Date:   2018-01-16T13:42:45Z

Modify clientId to groupId in flink-connector-kafka-0.8




> Modify clientId to groupId in flink-connector-kafka-0.8
> ---
>
> Key: FLINK-8290
> URL: https://issues.apache.org/jira/browse/FLINK-8290
> Project: Flink
>  Issue Type: Improvement
>Reporter: xymaqingxiang
>Priority: Major
>
> Currently the clientId used for consuming all topics is the constant 
> "flink-kafka-consumer-legacy-" + broker.id(), which makes it hard to trace 
> consumers in Kafka's logs, so I recommend changing it to the groupId.
> We can modify the SimpleConsumerThread.java file, as shown below:
> {code:java}
> private final String clientId;
> ...
> this.clientId = config.getProperty("group.id", "flink-kafka-consumer-legacy-" 
> + broker.id());
> ...
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5304: [FLINK-8290]Modify clientId to groupId in flink-co...

2018-01-16 Thread maqingxiang
GitHub user maqingxiang opened a pull request:

https://github.com/apache/flink/pull/5304

[FLINK-8290]Modify clientId to groupId in flink-connector-kafka-0.8

Currently the clientId used for consuming all topics is the constant 
"flink-kafka-consumer-legacy-" + broker.id(), which makes it hard to trace 
consumers in Kafka's logs, so I recommend changing it to the groupId.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/maqingxiang/flink FLINK-8290

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5304.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5304


commit e8c19520e9b7500548ad36afb8bd698a35d64053
Author: maqingxiang-it 
Date:   2018-01-16T13:42:45Z

Modify clientId to groupId in flink-connector-kafka-0.8




---


[jira] [Comment Edited] (FLINK-8290) Modify clientId to groupId in flink-connector-kafka-0.8

2018-01-16 Thread xymaqingxiang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327118#comment-16327118
 ] 

xymaqingxiang edited comment on FLINK-8290 at 1/16/18 1:33 PM:
---

The description of “client.id” is “An id string to pass to the server when 
making requests. The purpose of this is to be able to track the source of 
requests beyond just ip/port by allowing a logical application name to be 
included in server-side request logging.” in the [Kafka 
documentation](https://kafka.apache.org/documentation/#consumerconfigs).


was (Author: xymaqingxiang):
The description of “client.id” is “An id string to pass to the server when 
making requests. The purpose of this is to be able to track the source of 
requests beyond just ip/port by allowing a logical application name to be 
included in server-side request logging.” in the [Kafka 
documentation](https://kafka.apache.org/documentation/#consumerconfigs).

> Modify clientId to groupId in flink-connector-kafka-0.8
> ---
>
> Key: FLINK-8290
> URL: https://issues.apache.org/jira/browse/FLINK-8290
> Project: Flink
>  Issue Type: Improvement
>Reporter: xymaqingxiang
>Priority: Major
>
> Currently the clientId used for consuming all topics is the constant 
> "flink-kafka-consumer-legacy-" + broker.id(), which makes it hard to trace 
> consumers in Kafka's logs, so I recommend changing it to the groupId.
> We can modify the SimpleConsumerThread.java file, as shown below:
> {code:java}
> private final String clientId;
> ...
> this.clientId = config.getProperty("group.id", "flink-kafka-consumer-legacy-" 
> + broker.id());
> ...
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8290) Modify clientId to groupId in flink-connector-kafka-0.8

2018-01-16 Thread xymaqingxiang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327118#comment-16327118
 ] 

xymaqingxiang commented on FLINK-8290:
--

The description of “client.id” is “An id string to pass to the server when 
making requests. The purpose of this is to be able to track the source of 
requests beyond just ip/port by allowing a logical application name to be 
included in server-side request logging.” in the [Kafka 
documentation](https://kafka.apache.org/documentation/#consumerconfigs).

> Modify clientId to groupId in flink-connector-kafka-0.8
> ---
>
> Key: FLINK-8290
> URL: https://issues.apache.org/jira/browse/FLINK-8290
> Project: Flink
>  Issue Type: Improvement
>Reporter: xymaqingxiang
>Priority: Major
>
> Currently the clientId used for consuming all topics is the constant 
> "flink-kafka-consumer-legacy-" + broker.id(), which makes it hard to trace 
> consumers in Kafka's logs, so I recommend changing it to the groupId.
> We can modify the SimpleConsumerThread.java file, as shown below:
> {code:java}
> private final String clientId;
> ...
> this.clientId = config.getProperty("group.id", "flink-kafka-consumer-legacy-" 
> + broker.id());
> ...
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8290) Modify clientId to groupId in flink-connector-kafka-0.8

2018-01-16 Thread xymaqingxiang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xymaqingxiang updated FLINK-8290:
-
Description: 
Currently the clientId used for consuming all topics is the constant 
"flink-kafka-consumer-legacy-" + broker.id(), which makes it hard to trace 
consumers in Kafka's logs, so I recommend changing it to the groupId.

We can modify the SimpleConsumerThread.java file, as shown below:
{code:java}
private final String clientId;
...
this.clientId = config.getProperty("group.id", "flink-kafka-consumer-legacy-" + 
broker.id());
...
{code}

  was:
Currently the clientId used for consuming all topics is the constant 
"flink-kafka-consumer-legacy-" + broker.id(), which makes it hard to trace 
consumers in Kafka's logs, so I recommend changing it to the groupId.

We can modify the SimpleConsumerThread.java file, as shown below:

{code:java}
private final String clientIdFormGroup;
...
this.clientIdFormGroup = config.getProperty("group.id", 
"flink-kafka-consumer-legacy-" + broker.id());
...
final String clientId = clientIdFormGroup;
{code}



> Modify clientId to groupId in flink-connector-kafka-0.8
> ---
>
> Key: FLINK-8290
> URL: https://issues.apache.org/jira/browse/FLINK-8290
> Project: Flink
>  Issue Type: Improvement
>Reporter: xymaqingxiang
>Priority: Major
>
> Currently the clientId used for consuming all topics is the constant 
> "flink-kafka-consumer-legacy-" + broker.id(), which makes it hard to trace 
> consumers in Kafka's logs, so I recommend changing it to the groupId.
> We can modify the SimpleConsumerThread.java file, as shown below:
> {code:java}
> private final String clientId;
> ...
> this.clientId = config.getProperty("group.id", "flink-kafka-consumer-legacy-" 
> + broker.id());
> ...
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8437) SideOutput() API is ambiguous

2018-01-16 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-8437:

Description: 
The current API for retrieving Side Outputs is a bit ambiguous. Consider the 
program below:
{code:java}
sideOutput = stream
    .process(...)
    .filter(...)
    .getSideOutput(tag)
{code}

This may be the sideOutput of the process function that is passed through the 
API for convenience, or the sideOutput of the filter function (which would 
always be empty).

Given that only process functions can have sideOutputs, we may want to change 
the return type so that getSideOutput can only be called after a process 
function.


  was:
The current API for retrieving Side Outputs is a bit ambiguous. Consider the 
program below:
{code:java}
sideOutput = stream
    .process(...)
    .filter(...)
    .getSideOutput(tag)
{code}

This may be the sideOutput of the process function that is passed through the 
API for convenience, or the sideOutput of the filter function (which would 
always be empty).



> SideOutput() API is ambiguous
> -
>
> Key: FLINK-8437
> URL: https://issues.apache.org/jira/browse/FLINK-8437
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Priority: Minor
>
> The current API for retrieving Side Outputs is a bit ambiguous. Consider the 
> program below:
> {code:java}
> sideOutput = stream
>     .process(...)
>     .filter(...)
>     .getSideOutput(tag)
> {code}
> This may be the sideOutput of the process function that is passed through the 
> API for convenience, or the sideOutput of the filter function (which would 
> always be empty).
> Given that only process functions can have sideOutputs, we may want to change 
> the return type so that getSideOutput can only be called after a process 
> function.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8437) SideOutput() API is ambiguous

2018-01-16 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-8437:

Priority: Minor  (was: Major)

> SideOutput() API is ambiguous
> -
>
> Key: FLINK-8437
> URL: https://issues.apache.org/jira/browse/FLINK-8437
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Priority: Minor
>
> The current API for retrieving Side Outputs is a bit ambiguous. Consider the 
> program below:
> {code:java}
> sideOutput = stream
>     .process(...)
>     .filter(...)
>     .getSideOutput(tag)
> {code}
> This may be the sideOutput of the process function that is passed through the 
> API for convenience, or the sideOutput of the filter function (which would 
> always be empty).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8437) SideOutput() API is ambiguous

2018-01-16 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-8437:

Issue Type: Improvement  (was: Bug)

> SideOutput() API is ambiguous
> -
>
> Key: FLINK-8437
> URL: https://issues.apache.org/jira/browse/FLINK-8437
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Priority: Major
>
> The current API for retrieving Side Outputs is a bit ambiguous. Consider the 
> program below:
> {code:java}
> sideOutput = stream
>     .process(...)
>     .filter(...)
>     .getSideOutput(tag)
> {code}
> This may be the sideOutput of the process function that is passed through the 
> API for convenience, or the sideOutput of the filter function (which would 
> always be empty).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8437) SideOutput() API is ambiguous

2018-01-16 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-8437:
---

 Summary: SideOutput() API is ambiguous
 Key: FLINK-8437
 URL: https://issues.apache.org/jira/browse/FLINK-8437
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.5.0
Reporter: Chesnay Schepler


The current API for retrieving Side Outputs is a bit ambiguous. Consider the 
program below:
{code:java}
sideOutput = stream
    .process(...)
    .filter(...)
    .getSideOutput(tag)
{code}

This may be the sideOutput of the process function that is passed through the 
API for convenience, or the sideOutput of the filter function (which would 
always be empty).
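
For reference, a minimal sketch (assuming a {{DataStream<Integer>}} named 
{{stream}}, a hypothetical {{MyProcessFunction}} that emits {{String}} records 
to an {{OutputTag<String>}} named {{tag}}) of the unambiguous pattern available 
today: capture the side output directly on the result of {{process()}} before 
chaining anything else.
{code:java}
// Capture the side output on the operator that produced it, so there is
// no question which function the tag refers to.
SingleOutputStreamOperator<Integer> processed = stream.process(new MyProcessFunction());
DataStream<String> sideOutput = processed.getSideOutput(tag);   // clearly from process()
DataStream<Integer> mainOutput = processed.filter(v -> v > 0);  // main flow continues
{code}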




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5303: [hotfix][docs] Adjust RocksDb dependency docs

2018-01-16 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5303

[hotfix][docs] Adjust RocksDb dependency docs

Small adjustment as per 
https://github.com/apache/flink/commit/5623ac66bd145d52f3488ac2fff9dbc762d0bda1#commitcomment-26867793.

@StephanEwen @rmetzger 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink hotfix_rocks_docs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5303.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5303


commit 2a2aaced539073d823d5ab87027cd96cdac09ad3
Author: zentol 
Date:   2018-01-16T12:32:36Z

[hotfix][docs] Adjust RocksDb dependency docs




---


[jira] [Commented] (FLINK-7777) Bump japicmp to 0.11.0

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327072#comment-16327072
 ] 

ASF GitHub Bot commented on FLINK-:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5302


> Bump japicmp to 0.11.0
> --
>
> Key: FLINK-
> URL: https://issues.apache.org/jira/browse/FLINK-
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.5.0
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
>Priority: Minor
> Fix For: 1.5.0
>
>
> Currently, Flink uses japicmp-maven-plugin version 0.7.0, and I'm getting 
> these warnings from the maven plugin during a *mvn clean verify*:
> {code:java}
> [INFO] Written file '.../target/japicmp/japicmp.diff'.
> [INFO] Written file '.../target/japicmp/japicmp.xml'.
> [INFO] Written file '.../target/japicmp/japicmp.html'.
> Warning:  org.apache.xerces.jaxp.SAXParserImpl$JAXPSAXParser: Property 
> 'http://www.oracle.com/xml/jaxp/properties/entityExpansionLimit' is not 
> recognized.
> Compiler warnings:
>   WARNING:  'org.apache.xerces.jaxp.SAXParserImpl: Property 
> 'http://javax.xml.XMLConstants/property/accessExternalDTD' is not recognized.'
> Warning:  org.apache.xerces.parsers.SAXParser: Feature 
> 'http://javax.xml.XMLConstants/feature/secure-processing' is not recognized.
> Warning:  org.apache.xerces.parsers.SAXParser: Property 
> 'http://javax.xml.XMLConstants/property/accessExternalDTD' is not recognized.
> Warning:  org.apache.xerces.parsers.SAXParser: Property 
> 'http://www.oracle.com/xml/jaxp/properties/entityExpansionLimit' is not 
> recognized.
> {code}
> japicmp fixed this in version 0.7.1: _Excluded xerces from maven-reporting 
> dependency in order to prevent warnings from SAXParserImpl._
> The current stable version is 0.11.0, so we can consider upgrading to this 
> version.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-7777) Bump japicmp to 0.11.0

2018-01-16 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-.
---
Resolution: Fixed

master: 988bdde17fec8896aae4bc041a10d5e30a4cb702

> Bump japicmp to 0.11.0
> --
>
> Key: FLINK-
> URL: https://issues.apache.org/jira/browse/FLINK-
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.5.0
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
>Priority: Minor
> Fix For: 1.5.0
>
>
> Currently, Flink uses japicmp-maven-plugin version 0.7.0, and I'm getting 
> these warnings from the maven plugin during a *mvn clean verify*:
> {code:java}
> [INFO] Written file '.../target/japicmp/japicmp.diff'.
> [INFO] Written file '.../target/japicmp/japicmp.xml'.
> [INFO] Written file '.../target/japicmp/japicmp.html'.
> Warning:  org.apache.xerces.jaxp.SAXParserImpl$JAXPSAXParser: Property 
> 'http://www.oracle.com/xml/jaxp/properties/entityExpansionLimit' is not 
> recognized.
> Compiler warnings:
>   WARNING:  'org.apache.xerces.jaxp.SAXParserImpl: Property 
> 'http://javax.xml.XMLConstants/property/accessExternalDTD' is not recognized.'
> Warning:  org.apache.xerces.parsers.SAXParser: Feature 
> 'http://javax.xml.XMLConstants/feature/secure-processing' is not recognized.
> Warning:  org.apache.xerces.parsers.SAXParser: Property 
> 'http://javax.xml.XMLConstants/property/accessExternalDTD' is not recognized.
> Warning:  org.apache.xerces.parsers.SAXParser: Property 
> 'http://www.oracle.com/xml/jaxp/properties/entityExpansionLimit' is not 
> recognized.
> {code}
> japicmp fixed this in version 0.7.1: _Excluded xerces from maven-reporting 
> dependency in order to prevent warnings from SAXParserImpl._
> The current stable version is 0.11.0, so we can consider upgrading to this 
> version.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5302: [FLINK-7777][build] Bump japicmp to 0.11.0

2018-01-16 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5302


---


[jira] [Commented] (FLINK-7777) Bump japicmp to 0.11.0

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327066#comment-16327066
 ] 

ASF GitHub Bot commented on FLINK-:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5302
  
merging.


> Bump japicmp to 0.11.0
> --
>
> Key: FLINK-
> URL: https://issues.apache.org/jira/browse/FLINK-
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.5.0
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
>Priority: Minor
> Fix For: 1.5.0
>
>
> Currently, Flink uses japicmp-maven-plugin version 0.7.0, and I'm getting 
> these warnings from the maven plugin during a *mvn clean verify*:
> {code:java}
> [INFO] Written file '.../target/japicmp/japicmp.diff'.
> [INFO] Written file '.../target/japicmp/japicmp.xml'.
> [INFO] Written file '.../target/japicmp/japicmp.html'.
> Warning:  org.apache.xerces.jaxp.SAXParserImpl$JAXPSAXParser: Property 
> 'http://www.oracle.com/xml/jaxp/properties/entityExpansionLimit' is not 
> recognized.
> Compiler warnings:
>   WARNING:  'org.apache.xerces.jaxp.SAXParserImpl: Property 
> 'http://javax.xml.XMLConstants/property/accessExternalDTD' is not recognized.'
> Warning:  org.apache.xerces.parsers.SAXParser: Feature 
> 'http://javax.xml.XMLConstants/feature/secure-processing' is not recognized.
> Warning:  org.apache.xerces.parsers.SAXParser: Property 
> 'http://javax.xml.XMLConstants/property/accessExternalDTD' is not recognized.
> Warning:  org.apache.xerces.parsers.SAXParser: Property 
> 'http://www.oracle.com/xml/jaxp/properties/entityExpansionLimit' is not 
> recognized.
> {code}
> japicmp fixed this in version 0.7.1: _Excluded xerces from maven-reporting 
> dependency in order to prevent warnings from SAXParserImpl._
> The current stable version is 0.11.0, so we can consider upgrading to this 
> version.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5302: [FLINK-7777][build] Bump japicmp to 0.11.0

2018-01-16 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5302
  
merging.


---


[jira] [Closed] (FLINK-8414) Gelly performance seriously decreases when using the suggested parallelism configuration

2018-01-16 Thread flora karniav (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

flora karniav closed FLINK-8414.

Resolution: Feedback Received

> Gelly performance seriously decreases when using the suggested parallelism 
> configuration
> 
>
> Key: FLINK-8414
> URL: https://issues.apache.org/jira/browse/FLINK-8414
> Project: Flink
>  Issue Type: Bug
>  Components: Configuration, Documentation, Gelly
>Reporter: flora karniav
>Priority: Minor
>
> I am running Gelly examples with different datasets in a cluster of 5 
> machines (1 Jobmanager and 4 Taskmanagers) of 32 cores each.
> The number of Slots parameter is set to 32 (as suggested) and the parallelism 
> to 128 (32 cores*4 taskmanagers).
> I observe a vast performance degradation with these suggested settings compared 
> to setting parallelism.default to 16, for example, where the same job completes 
> in ~60 seconds vs ~140 seconds in the 128-parallelism case.
> Is there something wrong in my configuration? Should I decrease parallelism 
> and - if so - will this inevitably decrease CPU utilization?
> Another matter that may be related to this is the number of partitions of the 
> data. Is this somehow related to parallelism? How many partitions are created 
> in the case of parallelism.default=128? 
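
As a side note, parallelism can also be overridden per job, which makes such 
comparisons possible without touching the cluster configuration; a minimal 
sketch (using the DataSet API entry point that the Gelly examples build on, 
with values mirroring the experiment above):
{code:java}
// Override parallelism.default for a single run to compare settings.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(16); // rerun with env.setParallelism(128) to compare
{code}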



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8187) Web client does not print errors

2018-01-16 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8187?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-8187:

Description: 
When submitting a jar with no defined Main class, the web client stops 
responding instead of printing the REST error:
{code:java}
java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: 
Could not run the jar.
at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:90)
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.util.FlinkException: Could not run the jar.
... 9 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Neither 
a 'Main-Class', nor a 'program-class' entry was found in the jar file.
at 
org.apache.flink.client.program.PackagedProgram.getEntryPointClassNameFromJar(PackagedProgram.java:592)
at 
org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:188)
at 
org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:147)
at 
org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:72)
at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:69)
... 8 more
{code}

  was:
When submitting a jar with no defined Main class, the web client does not 
respond anymore and instead of printing the REST error:

{code}
java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: 
Could not run the jar.
at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:90)
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.util.FlinkException: Could not run the jar.
... 9 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Neither 
a 'Main-Class', nor a 'program-class' entry was found in the jar file.
at 
org.apache.flink.client.program.PackagedProgram.getEntryPointClassNameFromJar(PackagedProgram.java:592)
at 
org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:188)
at 
org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:147)
at 
org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:72)
at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:69)
... 8 more
{code} 


> Web client does not print errors
> 
>
> Key: FLINK-8187
> URL: https://issues.apache.org/jira/browse/FLINK-8187
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Reporter: Timo Walther
>Priority: Major
>
> When submitting a jar with no defined Main class, the web client stops 
> responding instead of printing the REST error:
> {code:java}
> java.util.concurrent.CompletionException: 
> org.apache.flink.util.FlinkException: Could not run the jar.
>   at 
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:90)
>   at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at 

[GitHub] flink pull request #:

2018-01-16 Thread zentol
Github user zentol commented on the pull request:


https://github.com/apache/flink/commit/907361d862c77a70ff60d27e7fcc13647eac0e6d#commitcomment-26885301
  
In flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/CollectSink.java on line 34:
What benefit is there in referencing a module that doesn't exist?


---


[jira] [Closed] (FLINK-8050) RestServer#shutdown() ignores exceptions thrown when shutting down netty.

2018-01-16 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas closed FLINK-8050.
-
   Resolution: Fixed
Fix Version/s: 1.5.0

Merged with 642e11a9cd31c83fbbabe860871c714f3d4ca981

> RestServer#shutdown() ignores exceptions thrown when shutting down netty.
> -
>
> Key: FLINK-8050
> URL: https://issues.apache.org/jira/browse/FLINK-8050
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8049) RestClient#shutdown() ignores exceptions thrown when shutting down netty.

2018-01-16 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas closed FLINK-8049.
-
   Resolution: Fixed
Fix Version/s: 1.5.0

Merged with 7da32d19f9623ca98c8a4ba76e7c406bf9318d4d

> RestClient#shutdown() ignores exceptions thrown when shutting down netty.
> -
>
> Key: FLINK-8049
> URL: https://issues.apache.org/jira/browse/FLINK-8049
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Critical
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8049) RestClient#shutdown() ignores exceptions thrown when shutting down netty.

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326974#comment-16326974
 ] 

ASF GitHub Bot commented on FLINK-8049:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5057


> RestClient#shutdown() ignores exceptions thrown when shutting down netty.
> -
>
> Key: FLINK-8049
> URL: https://issues.apache.org/jira/browse/FLINK-8049
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Critical
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5057: [FLINK-8049] [FLINK-8050] REST client/server repor...

2018-01-16 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5057


---


[jira] [Updated] (FLINK-8436) Allow implementing both RetractTableSink and BatchTableSink in one class

2018-01-16 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8436?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-8436:

Description: When implementing a table sink that should have the 
functionality of both a {{RetractStreamTableSink}} and {{BatchTableSink}}, the 
current interface design has clashes in {{TableSink.getOutputType}}. We should 
clean/rework the interfaces there. Additionally, the return type should be 
{{TypeInformation<Tuple2<Boolean, Row>>}} instead of 
{{TupleTypeInfo<Tuple2<Boolean, Row>>}} to be more generic.  (was: When 
implementing a table sink that should have the functionality of both a 
{{RetractStreamTableSink}} and {{BatchTableSink}}, the current interface design 
has clashes in {{TableSink.getOutputType}}. We should clean/rework the 
interfaces there.)

> Allow implementing both RetractTableSink and BatchTableSink in one class
> 
>
> Key: FLINK-8436
> URL: https://issues.apache.org/jira/browse/FLINK-8436
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> When implementing a table sink that should have the functionality of both a 
> {{RetractStreamTableSink}} and {{BatchTableSink}}, the current interface 
> design has clashes in {{TableSink.getOutputType}}. We should clean/rework the 
> interfaces there. Additionally, the return type should be 
> {{TypeInformation<Tuple2<Boolean, Row>>}} instead of 
> {{TupleTypeInfo<Tuple2<Boolean, Row>>}} to be more generic.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8436) Allow implementing both RetractTableSink and BatchTableSink in one class

2018-01-16 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326963#comment-16326963
 ] 

Fabian Hueske commented on FLINK-8436:
--

+1

> Allow implementing both RetractTableSink and BatchTableSink in one class
> 
>
> Key: FLINK-8436
> URL: https://issues.apache.org/jira/browse/FLINK-8436
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> When implementing a table sink that should have the functionality of both a 
> {{RetractStreamTableSink}} and {{BatchTableSink}}, the current interface 
> design has clashes in {{TableSink.getOutputType}}. We should clean/rework the 
> interfaces there. Additionally, the return type should be 
> {{TypeInformation<Tuple2<Boolean, Row>>}} instead of 
> {{TupleTypeInfo<Tuple2<Boolean, Row>>}} to be more generic.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8436) Allow implementing both RetractTableSink and BatchTableSink in one class

2018-01-16 Thread Timo Walther (JIRA)
Timo Walther created FLINK-8436:
---

 Summary: Allow implementing both RetractTableSink and 
BatchTableSink in one class
 Key: FLINK-8436
 URL: https://issues.apache.org/jira/browse/FLINK-8436
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Reporter: Timo Walther
Assignee: Timo Walther


When implementing a table sink that should have the functionality of both a 
{{RetractStreamTableSink}} and {{BatchTableSink}}, the current interface design 
has clashes in {{TableSink.getOutputType}}. We should clean/rework the 
interfaces there.
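
To make the clash concrete, here is a self-contained sketch with simplified, 
hypothetical stand-ins for the interfaces (not the real Flink API): both sink 
interfaces fix the type parameter of the same generic supertype, and Java 
rejects a class that inherits that supertype with two different type arguments.
{code:java}
// SinkBase stands in for TableSink<T>; the sub-interfaces stand in for
// BatchTableSink<Row> and RetractStreamTableSink<Row> respectively.
interface SinkBase<T> {
    T currentRecord(); // depends on T, like TableSink#getOutputType
}

interface BatchLike extends SinkBase<String> {}    // emits plain rows
interface RetractLike extends SinkBase<Boolean> {} // emits (retract flag, row) pairs

public class ClashDemo {
    // Rejected by javac: "SinkBase cannot be inherited with different
    // arguments: <String> and <Boolean>".
    // static class Both implements BatchLike, RetractLike {}
    public static void main(String[] args) {
        System.out.println("Uncomment 'Both' above to reproduce the clash.");
    }
}
{code}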



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7938) support addAll() in ListState

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326911#comment-16326911
 ] 

ASF GitHub Bot commented on FLINK-7938:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5281#discussion_r161694419
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
 ---
@@ -167,24 +167,48 @@ public void update(List<V> values) throws Exception {
			try {
				writeCurrentKeyWithGroupAndNamespace();
				byte[] key = keySerializationStream.toByteArray();
-				DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);

-				List<byte[]> bytes = new ArrayList<>(values.size());
-				for (V value : values) {
-					keySerializationStream.reset();
-					valueSerializer.serialize(value, out);
-					bytes.add(keySerializationStream.toByteArray());
+				byte[] premerge = getPreMergedValue(values);
+				if (premerge != null) {
+					backend.db.put(columnFamily, writeOptions, key, premerge);
+				} else {
+					throw new IOException("Failed pre-merge values in update()");
				}
+			} catch (IOException | RocksDBException e) {
+				throw new RuntimeException("Error while updating data to RocksDB", e);
+			}
+		}
+	}
+
+	@Override
+	public void addAll(List<V> values) throws Exception {
+		if (values != null && !values.isEmpty()) {
+			try {
+				writeCurrentKeyWithGroupAndNamespace();
+				byte[] key = keySerializationStream.toByteArray();

-				byte[] premerge = MergeUtils.merge(bytes);
+				byte[] premerge = getPreMergedValue(values);
				if (premerge != null) {
-					backend.db.put(columnFamily, writeOptions, key, premerge);
+					backend.db.merge(columnFamily, writeOptions, key, premerge);
				} else {
-					throw new IOException("Failed pre-merge values");
+					throw new IOException("Failed pre-merge values in addAll()");
				}
			} catch (IOException | RocksDBException e) {
				throw new RuntimeException("Error while updating data to RocksDB", e);
			}
		}
	}
+
+	private byte[] getPreMergedValue(List<V> values) throws IOException {
+		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
+
+		List<byte[]> bytes = new ArrayList<>(values.size());
--- End diff --

Why not just serialize all objects in the stream and write the RocksDB 
separator byte between all object bytes? This could improve performance (fewer 
temporary copies and objects) and might be equally or more readable?

From a performance point of view, even `#toByteArray()` results in an 
unnecessary copy - we could use the internal array, offset + len for our insert 
to RocksDB.

What do you think?
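
For illustration, a sketch of that suggestion (assumptions: {{DELIMITER}} is the 
separator byte understood by the configured RocksDB merge operator, and 
{{keySerializationStream}}/{{valueSerializer}} are the existing fields of 
{{RocksDBListState}}); the final {{toByteArray()}} copy could additionally be 
avoided by handing RocksDB the stream's internal buffer with offset and length.
{code:java}
private byte[] getPreMergedValue(List<V> values) throws IOException {
	keySerializationStream.reset();
	DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
	boolean first = true;
	for (V value : values) {
		if (!first) {
			keySerializationStream.write(DELIMITER); // assumed merge-operator separator byte
		}
		first = false;
		valueSerializer.serialize(value, out); // serialize directly into the shared stream
	}
	return keySerializationStream.toByteArray(); // one copy total instead of one per element
}
{code}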


> support addAll() in ListState
> -
>
> Key: FLINK-7938
> URL: https://issues.apache.org/jira/browse/FLINK-7938
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> support {{addAll()}} in {{ListState}}, so Flink can be more efficient in 
> adding elements to {{ListState}} in batch. This should give us much better 
> performance, especially for {{ListState}} backed by RocksDB.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5281: [FLINK-7938] [State Backend] support addAll() in L...

2018-01-16 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5281#discussion_r161694419
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
 ---
@@ -167,24 +167,48 @@ public void update(List<V> values) throws Exception {
			try {
				writeCurrentKeyWithGroupAndNamespace();
				byte[] key = keySerializationStream.toByteArray();
-				DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);

-				List<byte[]> bytes = new ArrayList<>(values.size());
-				for (V value : values) {
-					keySerializationStream.reset();
-					valueSerializer.serialize(value, out);
-					bytes.add(keySerializationStream.toByteArray());
+				byte[] premerge = getPreMergedValue(values);
+				if (premerge != null) {
+					backend.db.put(columnFamily, writeOptions, key, premerge);
+				} else {
+					throw new IOException("Failed pre-merge values in update()");
				}
+			} catch (IOException | RocksDBException e) {
+				throw new RuntimeException("Error while updating data to RocksDB", e);
+			}
+		}
+	}
+
+	@Override
+	public void addAll(List<V> values) throws Exception {
+		if (values != null && !values.isEmpty()) {
+			try {
+				writeCurrentKeyWithGroupAndNamespace();
+				byte[] key = keySerializationStream.toByteArray();

-				byte[] premerge = MergeUtils.merge(bytes);
+				byte[] premerge = getPreMergedValue(values);
				if (premerge != null) {
-					backend.db.put(columnFamily, writeOptions, key, premerge);
+					backend.db.merge(columnFamily, writeOptions, key, premerge);
				} else {
-					throw new IOException("Failed pre-merge values");
+					throw new IOException("Failed pre-merge values in addAll()");
				}
			} catch (IOException | RocksDBException e) {
				throw new RuntimeException("Error while updating data to RocksDB", e);
			}
		}
	}
+
+	private byte[] getPreMergedValue(List<V> values) throws IOException {
+		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
+
+		List<byte[]> bytes = new ArrayList<>(values.size());
--- End diff --

Why not just serialize all objects in the stream and write the RocksDB 
separator byte between all object bytes? This could improve performance (fewer 
temporary copies and objects) and might be equally or more readable?

From a performance point of view, even `#toByteArray()` results in an 
unnecessary copy - we could use the internal array, offset + len for our insert 
to RocksDB.

What do you think?


---


[jira] [Commented] (FLINK-7938) support addAll() in ListState

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326905#comment-16326905
 ] 

ASF GitHub Bot commented on FLINK-7938:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5281#discussion_r161693211
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
 ---
@@ -756,6 +756,11 @@ public boolean isClearCalled() {
public void update(List<T> values) throws Exception {
clear();
 
+   addAll(values);
+   }
+
+   @Override
+   public void addAll(List<T> values) throws Exception {
if (values != null && !values.isEmpty()) {
--- End diff --

I think the check for `!values.isEmpty()` is not required and just 
boilerplate - the following line also works for empty lists.


> support addAll() in ListState
> -
>
> Key: FLINK-7938
> URL: https://issues.apache.org/jira/browse/FLINK-7938
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> support {{addAll()}} in {{ListState}}, so Flink can be more efficient in 
> adding elements to {{ListState}} in batch. This should give us much better 
> performance, especially for {{ListState}} backed by RocksDB.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

