[GitHub] flink pull request #2622: [FLINK-3706] Fix YARN test instability

2016-10-12 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2622




[jira] [Commented] (FLINK-3706) YARNSessionCapacitySchedulerITCase.testNonexistingQueue unstable

2016-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568268#comment-15568268
 ] 

ASF GitHub Bot commented on FLINK-3706:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2622


> YARNSessionCapacitySchedulerITCase.testNonexistingQueue unstable
> 
>
> Key: FLINK-3706
> URL: https://issues.apache.org/jira/browse/FLINK-3706
> Project: Flink
>  Issue Type: Bug
>Reporter: Aljoscha Krettek
>Assignee: Robert Metzger
>Priority: Critical
>  Labels: test-stability
> Attachments: log.txt
>
>
> I encountered a failed test on Travis.





[jira] [Assigned] (FLINK-4802) distinct() implicitly uses 0th field, when called without a parameter

2016-10-12 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reassigned FLINK-4802:
---

Assignee: Chesnay Schepler

> distinct() implicitly uses 0th field, when called without a parameter
> -
>
> Key: FLINK-4802
> URL: https://issues.apache.org/jira/browse/FLINK-4802
> Project: Flink
>  Issue Type: Bug
>  Components: Python API
>Reporter: Yakov Goldberg
>Assignee: Chesnay Schepler
>
> Check this code in DataSet.py
> {code}
> def distinct(self, *fields):
>     f = None
>     if len(fields) == 0:
>         f = lambda x: (x,)
>         fields = (0,)
> {code}
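For illustration, a hedged sketch of the surprising behavior this fallback produces; the data and the get_environment() entry point are assumptions, not taken from the report:

{code}
# Hypothetical illustration of the reported behavior, not a test from the issue.
from flink.plan.Environment import get_environment

env = get_environment()
data = env.from_elements((1, "a"), (1, "b"), (2, "c"))

# With fields silently defaulting to (0,), this behaves like data.distinct(0):
# only one record per value of field 0 survives, so (1, "b") is dropped even
# though the full tuples differ.
deduped = data.distinct()
{code}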





[jira] [Assigned] (FLINK-4805) Stringify() crashes with Python3 (run with pyflink3)

2016-10-12 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reassigned FLINK-4805:
---

Assignee: Chesnay Schepler

> Stringify() crashes with Python3 (run with pyflink3)
> 
>
> Key: FLINK-4805
> URL: https://issues.apache.org/jira/browse/FLINK-4805
> Project: Flink
>  Issue Type: Bug
>  Components: Python API
>Reporter: Yakov Goldberg
>Assignee: Chesnay Schepler
>
> {code}
> Caused by: java.lang.RuntimeException: External process for task MapPartition 
> (PythonMap) terminated prematurely due to an error.
> Traceback (most recent call last):
>   File 
> "/tmp/flink-dist-cache-299804c6-813a-44de-9f62-c5f4cf415990/1527bd1cc45d6f67695c180762c614ef/flink/plan.py",
>  line 548, in <module>
> env.execute(local=True)
>   File 
> "/tmp/flink-dist-cache-299804c6-813a-44de-9f62-c5f4cf415990/1527bd1cc45d6f67695c180762c614ef/flink/flink/plan/Environment.py",
>  line 181, in execute
> operator._go()
>   File 
> "/tmp/flink-dist-cache-299804c6-813a-44de-9f62-c5f4cf415990/1527bd1cc45d6f67695c180762c614ef/flink/flink/functions/Function.py",
>  line 64, in _go
> self._run()
>   File 
> "/tmp/flink-dist-cache-299804c6-813a-44de-9f62-c5f4cf415990/1527bd1cc45d6f67695c180762c614ef/flink/flink/functions/MapFunction.py",
>  line 29, in _run
> collector.collect(function(value))
>   File 
> "/tmp/flink-dist-cache-299804c6-813a-44de-9f62-c5f4cf415990/1527bd1cc45d6f67695c180762c614ef/flink/flink/plan/DataSet.py",
>  line 38, in map
> return "(" + b", ".join([self.map(x) for x in value]) + ")"
>   File 
> "/tmp/flink-dist-cache-299804c6-813a-44de-9f62-c5f4cf415990/1527bd1cc45d6f67695c180762c614ef/flink/flink/plan/DataSet.py",
>  line 38, in <listcomp>
> return "(" + b", ".join([self.map(x) for x in value]) + ")"
>   File 
> "/tmp/flink-dist-cache-299804c6-813a-44de-9f62-c5f4cf415990/1527bd1cc45d6f67695c180762c614ef/flink/flink/plan/DataSet.py",
>  line 38, in map
> return "(" + b", ".join([self.map(x) for x in value]) + ")"
> TypeError: sequence item 0: expected bytes, bytearray, or an object with the 
> buffer interface, str found
>   at 
> org.apache.flink.python.api.streaming.data.PythonStreamer.streamBufferWithoutGroups(PythonStreamer.java:268)
>   at 
> org.apache.flink.python.api.functions.PythonMapPartition.mapPartition(PythonMapPartition.java:54)
>   at 
> org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:98)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>   at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
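The root cause is visible in the innermost frames: a bytes separator (b", ") is used to join str elements, which Python 2 tolerates (str is bytes there) but Python 3 rejects with exactly this TypeError. A minimal standalone sketch of the failure and one possible fix (a str separator); it is independent of Flink and only illustrates the str/bytes mismatch:

{code}
# Standalone illustration of the Python 3 failure mode seen above.
parts = [str(x) for x in ("a", True, (1, 5))]

# b", ".join(parts)        # Python 3: TypeError: sequence item 0: expected bytes ...
joined = ", ".join(parts)  # hedged fix sketch: join with a str separator instead
print("(" + joined + ")")  # prints: (a, True, (1, 5))
{code}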





[jira] [Commented] (FLINK-4799) Re-add build-target symlink to project root

2016-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568290#comment-15568290
 ] 

ASF GitHub Bot commented on FLINK-4799:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2620


> Re-add build-target symlink to project root
> ---
>
> Key: FLINK-4799
> URL: https://issues.apache.org/jira/browse/FLINK-4799
> Project: Flink
>  Issue Type: Wish
>  Components: Build System
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Minor
> Fix For: 1.2.0
>
>
> We have previously removed the plugin which created the 'build-target' link 
> to the build target directory. See FLINK-4732. At least one user has 
> requested to re-add the link.





[jira] [Assigned] (FLINK-4794) partition_by_hash() crashes if no parameter is provided

2016-10-12 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reassigned FLINK-4794:
---

Assignee: Chesnay Schepler

> partition_by_hash() crashes if no parameter is provided
> ---
>
> Key: FLINK-4794
> URL: https://issues.apache.org/jira/browse/FLINK-4794
> Project: Flink
>  Issue Type: Bug
>  Components: Python API
>Reporter: Yakov Goldberg
>Assignee: Chesnay Schepler
>
> partition_by_hash() crashes if no parameter is provided.
> It looks like a line of code was missed; compare with distinct():
> {code}
> def distinct(self, *fields):
>     f = None
>     if len(fields) == 0:
>         f = lambda x: (x,)
>         fields = (0,)
> {code}
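For illustration, a hypothetical sketch (not the actual DataSet.py change) of what mirroring distinct()'s no-argument guard in partition_by_hash() could look like; the trailing return is a stand-in for the real implementation:

{code}
# Hypothetical sketch only: reuse the guard that distinct() already has so an
# argument-less call no longer crashes.
def partition_by_hash(self, *fields):
    f = None
    if len(fields) == 0:
        f = lambda x: (x,)   # treat the whole record as the key
        fields = (0,)
    return f, fields         # stand-in for the rest of the real implementation
{code}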





[jira] [Assigned] (FLINK-4795) CsvStringify crashes in case of tuple in tuple, e.g. ("a", True, (1,5))

2016-10-12 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reassigned FLINK-4795:
---

Assignee: Chesnay Schepler

> CsvStringify crashes in case of tuple in tuple, e.g. ("a", True, (1,5))
> ---
>
> Key: FLINK-4795
> URL: https://issues.apache.org/jira/browse/FLINK-4795
> Project: Flink
>  Issue Type: Bug
>  Components: Python API
>Reporter: Yakov Goldberg
>Assignee: Chesnay Schepler
>
> CsvStringify crashes in case of a tuple in a tuple, e.g. ("a", True, (1,5)).
> It looks like a typo in CsvStringify._map():
> {code}
> def _map(self, value):
>     if isinstance(value, (tuple, list)):
>         return "(" + b", ".join([self.map(x) for x in value]) + ")"
>     else:
>         return str(value)
> {code}
> self._map() should be called instead.
> But this will affect write_csv() and read_csv(): write_csv() will work
> automatically, while read_csv() would need to be extended to be able to
> read the Tuple type.
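A hedged sketch of the corrected recursion described above: recursing through self._map() so nested tuples are handled, and joining with a str separator (which also sidesteps the Python 3 bytes/str issue from FLINK-4805):

{code}
# Sketch only, not the committed fix.
def _map(self, value):
    if isinstance(value, (tuple, list)):
        return "(" + ", ".join([self._map(x) for x in value]) + ")"
    else:
        return str(value)

# e.g. self._map(("a", True, (1, 5)))  ->  '(a, True, (1, 5))'
{code}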





[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568348#comment-15568348
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82978898
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
 ---
@@ -256,29 +306,51 @@ public boolean acknowledgeTask(
 * Aborts a checkpoint because it expired (took too long).
 */
public void abortExpired() throws Exception {
--- End diff --

I would like to do this as a follow up


> Add option for persistent checkpoints
> -
>
> Key: FLINK-4512
> URL: https://issues.apache.org/jira/browse/FLINK-4512
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> Allow periodic checkpoints to be persisted by writing out their meta data. 
> This is what we currently do for savepoints, but in the future checkpoints 
> and savepoints are likely to diverge with respect to guarantees they give for 
> updatability, etc.
> This means that the difference between persistent checkpoints and savepoints 
> in the long term will be that persistent checkpoints can only be restored 
> with the same job settings (like parallelism, etc.)
> Regular and persisted checkpoints should behave differently with respect to 
> disposal in *globally* terminal job states (FINISHED, CANCELLED, FAILED): 
> regular checkpoints are cleaned up in all of these cases whereas persistent 
> checkpoints only on FINISHED. Maybe with the option to customize behaviour on 
> CANCELLED or FAILED.





[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...

2016-10-12 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82978898
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
 ---
@@ -256,29 +306,51 @@ public boolean acknowledgeTask(
 * Aborts a checkpoint because it expired (took too long).
 */
public void abortExpired() throws Exception {
--- End diff --

I would like to do this as a follow up




[jira] [Resolved] (FLINK-4793) Using a local method with :: notation in Java 8 causes index out of bounds

2016-10-12 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-4793.
-
   Resolution: Fixed
Fix Version/s: 1.2.0

Fixed in 1dda3ad009667697a620359e997e83a5ba2447dd.

> Using a local method with :: notation in Java 8 causes index out of bounds
> --
>
> Key: FLINK-4793
> URL: https://issues.apache.org/jira/browse/FLINK-4793
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Dunning
>Assignee: Timo Walther
> Fix For: 1.2.0
>
>
> I tried to use the toString method on an object as a map function:
> {code}
> .map(Trade::toString)
> {code}
> This caused an index out of bounds error:
> {code}
> java.lang.ArrayIndexOutOfBoundsException: -1
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:351)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:305)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:120)
>   at 
> org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:506)
>   at 
> com.mapr.aggregate.AggregateTest.testAggregateTrades(AggregateTest.java:81)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:68)
> {code}
> On the other hand, if I use a public static method, like this:
> {code}
> .map(Trade::fromString)
> {code}
> All is good. fromString and toString are defined like this:
> {code}
> public static Trade fromString(String s) throws IOException {
>     return mapper.readValue(s, Trade.class);
> }
>
> @Override
> public String toString() {
>     return String.format("{\"%s\", %d, %d, %.2f}", symbol, time, volume, price);
> }
> {code}
> This might be a viable restriction on what functions I can use, but there 
> certainly should be a better error message, if so.





[GitHub] flink pull request #2599: [FLINK-4746] Make TaskManagerRuntimeInfo an interf...

2016-10-12 Thread tillrohrmann
Github user tillrohrmann closed the pull request at:

https://github.com/apache/flink/pull/2599




[jira] [Commented] (FLINK-4492) Cleanup files from canceled snapshots

2016-10-12 Thread Nikolay Vasilishin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568220#comment-15568220
 ] 

Nikolay Vasilishin commented on FLINK-4492:
---

This issue has been resolved since [FLINK-3761] (commit 
4809f5367b08a9734fc1bd4875be51a9f3bb65aa). 
Cancelling a job now cleans up the checkpoint directory (via the 
FsCheckpointStateOutputStream#close() method).

> Cleanup files from canceled snapshots
> -
>
> Key: FLINK-4492
> URL: https://issues.apache.org/jira/browse/FLINK-4492
> Project: Flink
>  Issue Type: Bug
>Reporter: Stefan Richter
>Assignee: Nikolay Vasilishin
>Priority: Minor
>
> Current checkpointing only closes CheckpointStateOutputStreams on cancel, but 
> incomplete files are not properly deleted from the filesystem.





[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...

2016-10-12 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82970716
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -219,33 +245,9 @@ public CheckpointCoordinator(
 * Shuts down the checkpoint coordinator.
 *
 * After this method has been called, the coordinator does not accept
-* and further messages and cannot trigger any further checkpoints. All
-* checkpoint state is discarded.
-*/
-   public void shutdown() throws Exception {
-   shutdown(true);
-   }
-
-   /**
-* Suspends the checkpoint coordinator.
-*
-* After this method has been called, the coordinator does not accept
 * and further messages and cannot trigger any further checkpoints.
-*
-* The difference to shutdown is that checkpoint state in the store
-* and counter is kept around if possible to recover later.
-*/
-   public void suspend() throws Exception {
-   shutdown(false);
-   }
-
-   /**
-* Shuts down the checkpoint coordinator.
-*
-* @param shutdownStoreAndCounter Depending on this flag the checkpoint
-* state services are shut down or suspended.
 */
-   private void shutdown(boolean shutdownStoreAndCounter) throws Exception 
{
+   public void shutdown(JobStatus jobStatus) throws Exception {
--- End diff --

True, but I thought we kept it `shutdown` for consistency reasons.




[jira] [Created] (FLINK-4812) Export currentLowWatermark metric also for sources

2016-10-12 Thread Robert Metzger (JIRA)
Robert Metzger created FLINK-4812:
-

 Summary: Export currentLowWatermark metric also for sources
 Key: FLINK-4812
 URL: https://issues.apache.org/jira/browse/FLINK-4812
 Project: Flink
  Issue Type: Improvement
  Components: Metrics
Reporter: Robert Metzger


As reported by a user, Flink currently does not export the current low 
watermark for sources
(http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/currentLowWatermark-metric-not-reported-for-all-tasks-td13770.html).

This JIRA is for adding such a metric for the sources as well.





[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...

2016-10-12 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82972407
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
 ---
@@ -64,9 +62,9 @@ public void recover() throws Exception {
 
@Override
public void addCheckpoint(CompletedCheckpoint checkpoint) throws 
Exception {
-   checkpoints.addLast(checkpoint);
+   checkpoints.add(checkpoint);
if (checkpoints.size() > maxNumberOfCheckpointsToRetain) {
-   checkpoints.removeFirst().discardState();
+   checkpoints.remove().subsume();
--- End diff --

Manually triggered savepoints, for example, are not discarded when they are 
subsumed. The CheckpointProperties constructor is package-private (for testing) 
and only the static creator methods (for persistent checkpoints, regular 
checkpoints, and manually triggered savepoints) are publicly accessible. Let me 
add a check to the properties that only allows manual discard if the checkpoint 
is persisted.




[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568316#comment-15568316
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82976634
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 ---
@@ -233,68 +229,90 @@ public int getNumberOfRetainedCheckpoints() {
}
 
@Override
-   public void shutdown() throws Exception {
-   LOG.info("Shutting down");
+   public void shutdown(JobStatus jobStatus) throws Exception {
--- End diff --

Our only option would be to wrap in our own Exception, because Curator is 
throwing the general `Exception`.


> Add option for persistent checkpoints
> -
>
> Key: FLINK-4512
> URL: https://issues.apache.org/jira/browse/FLINK-4512
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> Allow periodic checkpoints to be persisted by writing out their meta data. 
> This is what we currently do for savepoints, but in the future checkpoints 
> and savepoints are likely to diverge with respect to guarantees they give for 
> updatability, etc.
> This means that the difference between persistent checkpoints and savepoints 
> in the long term will be that persistent checkpoints can only be restored 
> with the same job settings (like parallelism, etc.)
> Regular and persisted checkpoints should behave differently with respect to 
> disposal in *globally* terminal job states (FINISHED, CANCELLED, FAILED): 
> regular checkpoints are cleaned up in all of these cases whereas persistent 
> checkpoints only on FINISHED. Maybe with the option to customize behaviour on 
> CANCELLED or FAILED.





[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...

2016-10-12 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82976869
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
 ---
@@ -48,13 +48,12 @@
public static CompletedCheckpoint loadAndValidateSavepoint(
JobID jobId,
Map tasks,
-   SavepointStore savepointStore,
String savepointPath) throws Exception {
 
// (1) load the savepoint
-   Savepoint savepoint = 
savepointStore.loadSavepoint(savepointPath);
+   Savepoint savepoint = 
SavepointStore.loadSavepoint(savepointPath);
final Map taskStates = new 
HashMap<>(savepoint.getTaskStates().size());
-   
+
// (2) validate it (parallelism, etc)
for (TaskState taskState : savepoint.getTaskStates()) {
ExecutionJobVertex executionJobVertex = 
tasks.get(taskState.getJobVertexID());
--- End diff --

Yes, updated.




[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568319#comment-15568319
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82976869
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
 ---
@@ -48,13 +48,12 @@
public static CompletedCheckpoint loadAndValidateSavepoint(
JobID jobId,
Map tasks,
-   SavepointStore savepointStore,
String savepointPath) throws Exception {
 
// (1) load the savepoint
-   Savepoint savepoint = 
savepointStore.loadSavepoint(savepointPath);
+   Savepoint savepoint = 
SavepointStore.loadSavepoint(savepointPath);
final Map taskStates = new 
HashMap<>(savepoint.getTaskStates().size());
-   
+
// (2) validate it (parallelism, etc)
for (TaskState taskState : savepoint.getTaskStates()) {
ExecutionJobVertex executionJobVertex = 
tasks.get(taskState.getJobVertexID());
--- End diff --

Yes, updated.


> Add option for persistent checkpoints
> -
>
> Key: FLINK-4512
> URL: https://issues.apache.org/jira/browse/FLINK-4512
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> Allow periodic checkpoints to be persisted by writing out their meta data. 
> This is what we currently do for savepoints, but in the future checkpoints 
> and savepoints are likely to diverge with respect to guarantees they give for 
> updatability, etc.
> This means that the difference between persistent checkpoints and savepoints 
> in the long term will be that persistent checkpoints can only be restored 
> with the same job settings (like parallelism, etc.)
> Regular and persisted checkpoints should behave differently with respect to 
> disposal in *globally* terminal job states (FINISHED, CANCELLED, FAILED): 
> regular checkpoints are cleaned up in all of these cases whereas persistent 
> checkpoints only on FINISHED. Maybe with the option to customize behaviour on 
> CANCELLED or FAILED.





[jira] [Commented] (FLINK-4717) Naive version of atomic stop signal with savepoint

2016-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568191#comment-15568191
 ] 

ASF GitHub Bot commented on FLINK-4717:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/2609
  
Thanks for this review, too. Good catch with the broken 
`stopCheckpointScheduler`. I will add your proposed fix as a separate commit.


> Naive version of atomic stop signal with savepoint
> --
>
> Key: FLINK-4717
> URL: https://issues.apache.org/jira/browse/FLINK-4717
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Priority: Minor
> Fix For: 1.2.0
>
>
> As a first step towards atomic stopping with savepoints we should implement a 
> cancel command which prior to cancelling takes a savepoint. Additionally, it 
> should turn off the periodic checkpointing so that there won't be checkpoints 
> executed between the savepoint and the cancel command.





[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...

2016-10-12 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82969876
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -637,20 +637,25 @@ protected int savepoint(String[] args) {
"Specify a Job 
ID to trigger a savepoint."));
}
 
-   return triggerSavepoint(options, jobId);
+   String savepointDirectory = null;
+   if (cleanedArgs.length == 2) {
--- End diff --

Changed the check to `>= 2` and printed a message that some arguments are 
unneeded.




[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568207#comment-15568207
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82969876
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -637,20 +637,25 @@ protected int savepoint(String[] args) {
"Specify a Job 
ID to trigger a savepoint."));
}
 
-   return triggerSavepoint(options, jobId);
+   String savepointDirectory = null;
+   if (cleanedArgs.length == 2) {
--- End diff --

Changed the check to `>= 2` and printed a message that some arguments are 
unneeded.


> Add option for persistent checkpoints
> -
>
> Key: FLINK-4512
> URL: https://issues.apache.org/jira/browse/FLINK-4512
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> Allow periodic checkpoints to be persisted by writing out their meta data. 
> This is what we currently do for savepoints, but in the future checkpoints 
> and savepoints are likely to diverge with respect to guarantees they give for 
> updatability, etc.
> This means that the difference between persistent checkpoints and savepoints 
> in the long term will be that persistent checkpoints can only be restored 
> with the same job settings (like parallelism, etc.)
> Regular and persisted checkpoints should behave differently with respect to 
> disposal in *globally* terminal job states (FINISHED, CANCELLED, FAILED): 
> regular checkpoints are cleaned up in all of these cases whereas persistent 
> checkpoints only on FINISHED. Maybe with the option to customize behaviour on 
> CANCELLED or FAILED.





[GitHub] flink pull request #2608: [FLINK-4512] [FLIP-10] Add option to persist perio...

2016-10-12 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82976383
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 ---
@@ -172,7 +168,7 @@ public void recover() throws Exception {
 
for (int i = 0; i < numberOfInitialCheckpoints - 1; 
i++) {
try {
-   
removeFromZooKeeperAndDiscardCheckpoint(initialCheckpoints.get(i));
+   
removeSubsumed(initialCheckpoints.get(i));
--- End diff --

Yes. Even more, I think this is generally dangerous. What if a checkpoint 
is recovered but cannot be restored? Then we will have lost all the others. 
Since we currently only keep a single one anyway, it is not a problem yet.




[jira] [Assigned] (FLINK-4804) Grouping.first() function usage fails

2016-10-12 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reassigned FLINK-4804:
---

Assignee: Chesnay Schepler

> Grouping.first() function usage fails
> -
>
> Key: FLINK-4804
> URL: https://issues.apache.org/jira/browse/FLINK-4804
> Project: Flink
>  Issue Type: Bug
>Reporter: Yakov Goldberg
>Assignee: Chesnay Schepler
>
> Trying to use Grouping.first() in the following example:
> {code}
> dd2 = env.from_elements((1, "data"), (1, "hello"), (1, "z")) 
> dd2 \
>  .group_by(0) \   
>  .sort_group(1, Order.ASCENDING) \
>  .first(2) \
>  .reduce_group(PlainReduce(), combinable=True)
> {code} 
> 1. Is this example correct?
> 2. If so, I got the following error:
> {code}
> 
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error.
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>   at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>   at 
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
> Caused by: java.lang.ClassCastException: 
> org.apache.flink.api.java.operators.SortedGrouping cannot be cast to 
> org.apache.flink.api.java.DataSet
>   at 
> org.apache.flink.python.api.PythonPlanBinder.createFirstOperation(PythonPlanBinder.java:470)
>   at 
> org.apache.flink.python.api.PythonPlanBinder.receiveOperations(PythonPlanBinder.java:325)
>   at 
> org.apache.flink.python.api.PythonPlanBinder.receivePlan(PythonPlanBinder.java:236)
>   at 
> org.apache.flink.python.api.PythonPlanBinder.runPlan(PythonPlanBinder.java:140)
>   at 
> org.apache.flink.python.api.PythonPlanBinder.main(PythonPlanBinder.java:113)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
> {code}





[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568313#comment-15568313
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2608#discussion_r82976383
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 ---
@@ -172,7 +168,7 @@ public void recover() throws Exception {
 
for (int i = 0; i < numberOfInitialCheckpoints - 1; 
i++) {
try {
-   
removeFromZooKeeperAndDiscardCheckpoint(initialCheckpoints.get(i));
+   
removeSubsumed(initialCheckpoints.get(i));
--- End diff --

Yes. Even more, I think this is generally dangerous. What if a checkpoint 
is recovered but cannot be restored? Then we will have lost all the others. 
Since we currently only keep a single one anyway, it is not a problem yet.


> Add option for persistent checkpoints
> -
>
> Key: FLINK-4512
> URL: https://issues.apache.org/jira/browse/FLINK-4512
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> Allow periodic checkpoints to be persisted by writing out their meta data. 
> This is what we currently do for savepoints, but in the future checkpoints 
> and savepoints are likely to diverge with respect to guarantees they give for 
> updatability, etc.
> This means that the difference between persistent checkpoints and savepoints 
> in the long term will be that persistent checkpoints can only be restored 
> with the same job settings (like parallelism, etc.)
> Regular and persisted checkpoints should behave differently with respect to 
> disposal in *globally* terminal job states (FINISHED, CANCELLED, FAILED): 
> regular checkpoints are cleaned up in all of these cases whereas persistent 
> checkpoints only on FINISHED. Maybe with the option to customize behaviour on 
> CANCELLED or FAILED.





[jira] [Updated] (FLINK-4283) ExecutionGraphRestartTest fails

2016-10-12 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-4283:
-
Assignee: Alexander Shoshin

> ExecutionGraphRestartTest fails
> ---
>
> Key: FLINK-4283
> URL: https://issues.apache.org/jira/browse/FLINK-4283
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.1.0
> Environment: Ubuntu 14.04
> W10
>Reporter: Chesnay Schepler
>Assignee: Alexander Shoshin
>  Labels: test-stability
>
> I encounter reliable failures for the following tests:
> testRestartAutomatically(org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest)
>   Time elapsed: 120.089 sec  <<< FAILURE!
> java.lang.AssertionError: expected: but was:
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:743)
>   at org.junit.Assert.assertEquals(Assert.java:118)
>   at org.junit.Assert.assertEquals(Assert.java:144)
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.restartAfterFailure(ExecutionGraphRestartTest.java:680)
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.testRestartAutomatically(ExecutionGraphRestartTest.java:155)
> taskShouldNotFailWhenFailureRateLimitWasNotExceeded(org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest)
>   Time elapsed: 2.055 sec  <<< FAILURE!
> java.lang.AssertionError: expected: but was:
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:743)
>   at org.junit.Assert.assertEquals(Assert.java:118)
>   at org.junit.Assert.assertEquals(Assert.java:144)
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.restartAfterFailure(ExecutionGraphRestartTest.java:680)
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.taskShouldNotFailWhenFailureRateLimitWasNotExceeded(ExecutionGraphRestartTest.java:180)
> testFailingExecutionAfterRestart(org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest)
>   Time elapsed: 120.079 sec  <<< FAILURE!
> java.lang.AssertionError: expected: but was:
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:743)
>   at org.junit.Assert.assertEquals(Assert.java:118)
>   at org.junit.Assert.assertEquals(Assert.java:144)
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.testFailingExecutionAfterRestart(ExecutionGraphRestartTest.java:397)





[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-12 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82988652
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/windows/Session.java
 ---
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.windows;
+
+import org.apache.flink.api.table.SessionWindow;
+
+/**
+ * Helper class for creating a session window. Session windows are ideal 
for cases where the
+ * window boundaries need to adjust to the incoming data. In a session 
window it is possible to
+ * have windows that start at individual points in time for each key and 
that end once there has
+ * been a certain period of inactivity.
+ */
+public class Session {
--- End diff --

During the development I had the problem that I had to use 
`Session$.MODULE$.withGap(10)`. See [1]. But now it is not a companion object 
anymore, so it works. I will change that.

[1] 
http://stackoverflow.com/questions/3282653/how-do-you-call-a-scala-singleton-method-from-java




[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-12 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r83005706
  
--- Diff: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTests.java
 ---
@@ -256,11 +265,11 @@ public int compare(String o1, String o2) {
}
 
@Test
-   public void testFileReadingOperator() throws Exception {
+   public void testFileReadingOperatorWithEventTime() throws Exception {
Set filesCreated = new HashSet<>();
Map expectedFileContents = new HashMap<>();
-   for(int i = 0; i < NO_OF_FILES; i++) {
-   Tuple2 file = 
fillWithData(hdfsURI, "file", i, "This is test line.");
+   for (int i = 0; i < NO_OF_FILES; i++) {
+   Tuple2 file = 
createFileAndFillWithData(hdfsURI, "file", i, "This is test line.");
--- End diff --

This can probably be moved into a `@Before` method.




[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-12 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r83006923
  
--- Diff: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTests.java
 ---
@@ -336,237 +348,294 @@ public int compare(String o1, String o2) {
Assert.assertEquals(expectedFileContents.get(fileIdx), 
cntntStr.toString());
}
 
-   for(org.apache.hadoop.fs.Path file: filesCreated) {
+   for (org.apache.hadoop.fs.Path file: filesCreated) {
hdfs.delete(file, false);
}
}
 
-   private static class PathFilter extends FilePathFilter {
-
-   @Override
-   public boolean filterPath(Path filePath) {
-   return filePath.getName().startsWith("**");
-   }
-   }
+   Monitoring Function Tests   
//
 
@Test
public void testFilePathFiltering() throws Exception {
-   Set uniqFilesFound = new HashSet<>();
Set filesCreated = new HashSet<>();
+   Set filesKept = new TreeSet<>();
 
// create the files to be discarded
for (int i = 0; i < NO_OF_FILES; i++) {
-   Tuple2 file = 
fillWithData(hdfsURI, "**file", i, "This is test line.");
+   Tuple2 file = 
createFileAndFillWithData(hdfsURI, "**file", i, "This is test line.");
filesCreated.add(file.f0);
}
 
// create the files to be kept
for (int i = 0; i < NO_OF_FILES; i++) {
-   Tuple2 file = 
fillWithData(hdfsURI, "file", i, "This is test line.");
+   Tuple2 file =
+   createFileAndFillWithData(hdfsURI, "file", i, 
"This is test line.");
filesCreated.add(file.f0);
+   filesKept.add(file.f0.getName());
}
 
TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
format.setFilesFilter(new PathFilter());
+
ContinuousFileMonitoringFunction monitoringFunction =
new ContinuousFileMonitoringFunction<>(format, hdfsURI,
FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
 
+   final FileVerifyingSourceContext context =
+   new FileVerifyingSourceContext(new OneShotLatch(), 
monitoringFunction, 0, -1);
+
monitoringFunction.open(new Configuration());
-   monitoringFunction.run(new 
TestingSourceContext(monitoringFunction, uniqFilesFound));
+   monitoringFunction.run(context);
 
-   Assert.assertEquals(NO_OF_FILES, uniqFilesFound.size());
-   for(int i = 0; i < NO_OF_FILES; i++) {
-   org.apache.hadoop.fs.Path file = new 
org.apache.hadoop.fs.Path(hdfsURI + "/file" + i);
-   
Assert.assertTrue(uniqFilesFound.contains(file.toString()));
-   }
+   Assert.assertArrayEquals(filesKept.toArray(), 
context.getSeenFiles().toArray());
 
-   for(org.apache.hadoop.fs.Path file: filesCreated) {
+   // finally delete the files created for the test.
+   for (org.apache.hadoop.fs.Path file: filesCreated) {
hdfs.delete(file, false);
--- End diff --

This could be moved into a `@After` method.




[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-12 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r83022754
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
 ---
@@ -257,74 +190,158 @@ public void open(Configuration parameters) throws 
Exception {
long failurePosMax = (long) (0.7 * LINES_PER_FILE);
 
elementsToFailure = (new Random().nextLong() % 
(failurePosMax - failurePosMin)) + failurePosMin;
-
-   if (elementCounter >= NO_OF_FILES * LINES_PER_FILE) {
-   finalCollectedContent = new HashMap<>();
-   for (Map.Entry result: 
collectedContent.entrySet()) {
-   
finalCollectedContent.put(result.getKey(), new ArrayList<>(result.getValue()));
-   }
-   throw new SuccessException();
-   }
-   }
-
-   @Override
-   public void close() {
-   try {
-   super.close();
-   } catch (Exception e) {
-   e.printStackTrace();
-   }
}
 
@Override
public void invoke(String value) throws Exception {
-   int fileIdx = 
Character.getNumericValue(value.charAt(0));
+   int fileIdx = getFileIdx(value);
 
-   Set content = collectedContent.get(fileIdx);
+   Set content = actualContent.get(fileIdx);
if (content == null) {
content = new HashSet<>();
-   collectedContent.put(fileIdx, content);
+   actualContent.put(fileIdx, content);
}
 
+   // detect duplicate lines.
if (!content.add(value + "\n")) {
fail("Duplicate line: " + value);
System.exit(0);
}
 
-
elementCounter++;
+
+   // this is termination
if (elementCounter >= NO_OF_FILES * LINES_PER_FILE) {
-   finalCollectedContent = new HashMap<>();
-   for (Map.Entry result: 
collectedContent.entrySet()) {
-   
finalCollectedContent.put(result.getKey(), new ArrayList<>(result.getValue()));
-   }
+   actualCollectedContent = actualContent;
throw new SuccessException();
}
 
-   count++;
-   if (!hasFailed) {
+   // add some latency so that we have at least one 
checkpoint in
+   if (!hasFailed && successfulCheckpoints == 0) {
Thread.sleep(2);
--- End diff --

The waiting time seems arbitrary. Is it required for the next step (see 
below)?




[jira] [Commented] (FLINK-4604) Add support for standard deviation/variance

2016-10-12 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15569021#comment-15569021
 ] 

Timo Walther commented on FLINK-4604:
-

You cannot use {{!AggregateReduceFunctionsRule.INSTANCE.matches(call)}} as it 
is always false for sums. See 
{{AggregateReduceFunctionsRule.containsAvgStddevVarCall}}.

What do you mean by "something went wrong"? What about this:

{code}
val supported = agg.getAggCallList.map(_.getAggregation.getKind).forall {
  case SqlKind.SUM => true
  case SqlKind.MIN => true
  case SqlKind.MAX => true
  case _ => false
}

!distinctAggs && !groupSets && !agg.indicator && supported
{code}

> Add support for standard deviation/variance
> ---
>
> Key: FLINK-4604
> URL: https://issues.apache.org/jira/browse/FLINK-4604
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Anton Mushin
> Attachments: 1.jpg
>
>
> Calcite's {{AggregateReduceFunctionsRule}} can convert SQL {{AVG, STDDEV_POP, 
> STDDEV_SAMP, VAR_POP, VAR_SAMP}} to sum/count functions. We should add, test 
> and document this rule. 
> Whether we also want to add these aggregates to the Table API is up for discussion.





[GitHub] flink issue #2608: [FLINK-4512] [FLIP-10] Add option to persist periodic che...

2016-10-12 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2608
  
+1 to the general approach and the code

Some suggestions for name polishing:
  - How about renaming `DISCARD_ON_CANCELLATION` to 
`DELETE_ON_CANCELLATION`? That would sound more explicit like "cleanup" and 
actual file deletion.
  - Since all checkpoints are persistent (at least in HA), how about 
calling this `enableExternalizedCheckpoints()` rather than 
`enablePersistentCheckpoints()`?
  - I would suggest to drop the method `enablePersistentCheckpoints()` 
without a cleanup policy parameter. Whoever enables that feature should 
explicitly think about what cleanup policy they want.

For the future, can we get rid of the extra storage location for the 
externalized checkpoint metadata? Simply store it as well in the checkpoint 
directory? That makes it simpler for users to track and clean up checkpoints 
manually, if they want to retain externalized checkpoints across cancellations 
and terminal failures.
  - Both the config value and the location parameter to the 
`enablePersistentCheckpoints()` would be dropped.
  - That would imply that every state backend needs to be able to provide a 
storage location for the checkpoint metadata
  - The memory state backend would hence not work with externalized 
checkpoints, unless one explicitly sets a parameter 
`setExternalizedCheckpointsLocation(uri)`.

Since this is a bigger change, I would suggest a followup pull request for 
that. The only change I would make to this pull request (to make transition to 
the followup smoother) is to remove the path parameter from the 
`enablePersistentCheckpoints()` methods and always use the configuration value 
(which will be replaced by the state backend's storage location).




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-12 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82990268
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/properties.scala
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
+import org.apache.flink.api.table.FlinkRelBuilder.NamedProperty
+import org.apache.flink.api.table.validate.{ValidationFailure, 
ValidationSuccess}
+
+abstract class Property(child: Expression) extends UnaryExpression {
+
+  override def toString = s"Property($child)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode =
+throw new UnsupportedOperationException("Property cannot be 
transformed to RexNode.")
+
+  override private[flink] def validateInput() =
+if (child.isInstanceOf[WindowReference]) {
--- End diff --

I just tried to keep the names short, because the Scala line-length limits are 
pretty strict.
Done.




[jira] [Closed] (FLINK-4806) ResourceManager stop listening JobManager's leader address

2016-10-12 Thread Kurt Young (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-4806.
-
Resolution: Won't Fix

> ResourceManager stop listening JobManager's leader address
> --
>
> Key: FLINK-4806
> URL: https://issues.apache.org/jira/browse/FLINK-4806
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Kurt Young
>
> Currently in the flip-6 branch, when the RM receives a registration from a JM, it
> will verify the JM's leader session id and attach a JobManagerLeaderListener to it
> for monitoring future changes.
> Maybe we can simplify this a little bit: we don't monitor the JM's leadership
> changes at all. Once the verification passes when the JM registers itself, we
> simply record the leader id of the registered JM for future RPC filtering and
> start the heartbeat monitor with the JM.
> If the JM's leadership changes, the new JM will register itself; the RM verifies
> its leadership when it receives the registration and can decide whether to accept
> or reject it. In effect, the JM's information in the RM is replaced only by a new
> JM, not by the RM itself through a leadership-change listener. This simplifies the
> logic inside the RM and avoids any error handling around the leader listener.





[jira] [Commented] (FLINK-4787) Add REST API call for cancel-with-savepoints

2016-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4787?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568599#comment-15568599
 ] 

ASF GitHub Bot commented on FLINK-4787:
---

GitHub user uce opened a pull request:

https://github.com/apache/flink/pull/2626

[FLINK-4787] [runtime-web] Expose cancel-with-savepoint via REST API

A follow-up to #2609, exposing the cancel-with-savepoint command via the REST 
API. The relevant commits are the last two.

The `RequestHandler` now returns a generic `HttpResponse` instead of a 
`String`. This enables handlers to return custom responses (different HTTP 
codes, etc.). Most handlers now extend the `AbstractJsonRequestHandler` for 
default JSON responses (which used to be handled by the generic 
`RequestHandler`).

Adds handlers for triggering and monitoring a job cancellation with 
savepoints. Since this operation can take some time, we do this asynchronously. 
According to various online resources, the way to go for REST APIs in such 
cases is to return HTTP 201 accepted with the location of the in-progress 
operation.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uce/flink 4787-cancel_with_savepoint_rest_api

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2626.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2626


commit 99a9621383e6a223e39e4ec22d60671a205d958d
Author: Ufuk Celebi 
Date:   2016-10-06T14:43:42Z

[FLINK-4512] [FLIP-10] Add option to persist periodic checkpoints

[FLINK-4509] [FLIP-10] Specify savepoint directory per savepoint
[FLINK-4507] [FLIP-10] Deprecate savepoint backend config

commit 25ffc04d7e5d7ef9538447ee5162c0d203e96e89
Author: Ufuk Celebi 
Date:   2016-10-07T09:48:47Z

[FLINK-4717] Add CancelJobWithSavepoint

- Adds CancelJobWithSavepoint message, which triggers a savepoint
  before cancelling the respective job.
- Adds -s [targetDirectory] option to CLI cancel command:
* bin/flink cancel <jobID> (regular cancelling)
* bin/flink cancel -s <jobID> (cancel with savepoint to default dir)
* bin/flink cancel -s <targetDirectory> <jobID> (cancel with savepoint to 
targetDir)

commit bc88dba90a263e80691448e20644e5f126551bb6
Author: Ufuk Celebi 
Date:   2016-10-11T08:08:14Z

[FLINK-4787] [runtime-web] Return generic HttpResponse in RequestHandler

- Let RequestHandler return a generic HttpResponse instead of a String. This
  enables handlers to return custom responses (different HTTP codes, etc.)
- Introduce AbstractJsonRequestHandler for default JSON responses

commit ecbcf46f5a9d874dbdd908d48c7035c1cb338c1a
Author: Ufuk Celebi 
Date:   2016-10-11T08:09:20Z

[FLINK-4787] [runtime-web] Add JobCancellationWithSavepointHandlers

- Add handlers for triggering and monitoring job cancellation with
  savepoints.




> Add REST API call for cancel-with-savepoints
> 
>
> Key: FLINK-4787
> URL: https://issues.apache.org/jira/browse/FLINK-4787
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: Ufuk Celebi
>
> As a follow up to FLINK-4717, expose the cancel-with-savepoint command via 
> the REST API.
> {code}
> /jobs/:jobid/cancel-with-savepoint/
> /jobs/:jobid/cancel-with-savepoint/:targetDirectory
> {code}
> The first command goes to the default savepoint directory, the second one 
> uses the given target directory.
> The calls need to be async, as triggering a savepoint can take some time. For 
> this, the handlers return a {{201 (Accepted)}} response with the location of 
> the status, e.g. {{/jobs/:jobid/cancel-with-savepoint/in-progress/:id}}.
> The user has to check that location until the final savepoint path is 
> returned.
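
As a rough sketch of the intended client-side flow (the endpoint paths follow the 
description above; everything else, including host, port and response handling, is 
an assumption):

{code}
import java.net.HttpURLConnection;
import java.net.URL;

// Hypothetical client-side polling loop, for illustration only.
public class CancelWithSavepointPollingSketch {

    public static void main(String[] args) throws Exception {
        String base = "http://localhost:8081";             // assumed web monitor address
        String jobId = "00000000000000000000000000000000"; // placeholder job id

        // 1) trigger cancel-with-savepoint; the response points to the in-progress resource
        HttpURLConnection trigger = (HttpURLConnection)
                new URL(base + "/jobs/" + jobId + "/cancel-with-savepoint/").openConnection();
        String inProgressLocation = trigger.getHeaderField("Location");

        // 2) poll that location until the final savepoint path is returned
        while (true) {
            HttpURLConnection poll = (HttpURLConnection)
                    new URL(base + inProgressLocation).openConnection();
            if (poll.getResponseCode() == 200) {
                break; // the body now contains the savepoint path
            }
            Thread.sleep(1000);
        }
    }
}
{code}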



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4283) ExecutionGraphRestartTest fails

2016-10-12 Thread Alexander Shoshin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568940#comment-15568940
 ] 

Alexander Shoshin commented on FLINK-4283:
--

It is still relevant. I have just started researching it.
These tests finish successfully when run separately but fail when run 
together with other tests from the same class.
I will write back when I find the cause.

> ExecutionGraphRestartTest fails
> ---
>
> Key: FLINK-4283
> URL: https://issues.apache.org/jira/browse/FLINK-4283
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.1.0
> Environment: Ubuntu 14.04
> W10
>Reporter: Chesnay Schepler
>Assignee: Alexander Shoshin
>  Labels: test-stability
>
> I encounter reliable failures for the following tests:
> testRestartAutomatically(org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest)
>   Time elapsed: 120.089 sec  <<< FAILURE!
> java.lang.AssertionError: expected: but was:
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:743)
>   at org.junit.Assert.assertEquals(Assert.java:118)
>   at org.junit.Assert.assertEquals(Assert.java:144)
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.restartAfterFailure(ExecutionGraphRestartTest.java:680)
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.testRestartAutomatically(ExecutionGraphRestartTest.java:155)
> taskShouldNotFailWhenFailureRateLimitWasNotExceeded(org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest)
>   Time elapsed: 2.055 sec  <<< FAILURE!
> java.lang.AssertionError: expected: but was:
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:743)
>   at org.junit.Assert.assertEquals(Assert.java:118)
>   at org.junit.Assert.assertEquals(Assert.java:144)
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.restartAfterFailure(ExecutionGraphRestartTest.java:680)
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.taskShouldNotFailWhenFailureRateLimitWasNotExceeded(ExecutionGraphRestartTest.java:180)
> testFailingExecutionAfterRestart(org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest)
>   Time elapsed: 120.079 sec  <<< FAILURE!
> java.lang.AssertionError: expected: but was:
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:743)
>   at org.junit.Assert.assertEquals(Assert.java:118)
>   at org.junit.Assert.assertEquals(Assert.java:144)
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.testFailingExecutionAfterRestart(ExecutionGraphRestartTest.java:397)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4803) Job Cancel can hang forever waiting for OutputFormat.close()

2016-10-12 Thread Shannon Carey (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15569025#comment-15569025
 ] 

Shannon Carey commented on FLINK-4803:
--

Yes, that's right. cancel() blocks on close(), and therefore if close() 
misbehaves the thread is never interrupted and cancel() blocks forever.

In the issue description, I suggested your option #2. I think you'll want #1 no 
matter what. However, #2 allows for at least one message and/or exception to be 
logged that tells the user what went wrong (why their job is taking a long time 
to cancel, or why it did not cancel gracefully). I'm not sure what your 
DataSink-specific option would look like. Maybe it is similar to my workaround, 
where I wrapped my HadoopOutputFormat in a subclass that calls super.close() 
from a separate thread with a timeout? That workaround is ok, but I had to 
expend a fair amount of effort to figure out what the problem was, and also 
there was nothing I could do but restart Flink in order to get my job to 
terminate (not a desirable solution). You'll want Flink to function smoothly 
regardless of what data sink the user chooses.
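
For reference, a minimal sketch of that kind of timeout wrapper around a blocking 
close(), using only JDK types (names are illustrative, not the actual workaround code):

{code}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustration only: run a potentially blocking close() with a timeout and
// interrupt the worker thread if it does not finish in time.
public class TimeoutCloseSketch {

    public static void closeWithTimeout(final AutoCloseable format, long timeoutMillis) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<?> closeFuture = executor.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    format.close();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        });
        try {
            closeFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // close() is stuck; interrupt the thread running it
            closeFuture.cancel(true);
        } finally {
            executor.shutdownNow();
        }
    }
}
{code}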

> Job Cancel can hang forever waiting for OutputFormat.close()
> 
>
> Key: FLINK-4803
> URL: https://issues.apache.org/jira/browse/FLINK-4803
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.1.1
>Reporter: Shannon Carey
>
> If the Flink job uses a badly-behaved OutputFormat (in this example, a 
> HadoopOutputFormat containing a CqlBulkOutputFormat), where the close() 
> method blocks forever, it is impossible to cancel the Flink job even though 
> the blocked thread would respond to an interrupt. The stack traces below show 
> the state of the important threads when a job is canceled and the 
> OutputFormat is blocking forever inside of close().
> I suggest that `DataSinkTask.cancel()` method be updated to add a timeout on 
> `this.format.close()`. When the timeout is reached, the Task thread should be 
> interrupted.
> {code}
> "Canceler for DataSink 
> (org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat@13bf17a2) (2/5)" 
> #6422 daemon prio=5 os_prio=0 tid=0x7fb7e42f nid=0x34f3 waiting for 
> monitor entry [0x7fb7be079000]
>java.lang.Thread.State: BLOCKED (on object monitor)
> at 
> org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158)
> - waiting to lock <0x0006bae5f788> (a java.lang.Object)
> at 
> org.apache.flink.runtime.operators.DataSinkTask.cancel(DataSinkTask.java:268)
> at 
> org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1149)
> at java.lang.Thread.run(Thread.java:745)
> "DataSink 
> (org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat@13bf17a2) (2/5)" 
> #6410 daemon prio=5 os_prio=0 tid=0x7fb7e79a4800 nid=0x2ad8 waiting on 
> condition [0x7fb7bdf78000]
>java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x0006c5ab5e20> (a 
> java.util.concurrent.SynchronousQueue$TransferStack)
> at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> at 
> java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
> at 
> java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
> at 
> java.util.concurrent.SynchronousQueue.offer(SynchronousQueue.java:895)
> at 
> org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.put(SSTableSimpleUnsortedWriter.java:194)
> at 
> org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.sync(SSTableSimpleUnsortedWriter.java:180)
> at 
> org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.close(SSTableSimpleUnsortedWriter.java:156)
> at 
> org.apache.cassandra.io.sstable.CQLSSTableWriter.close(CQLSSTableWriter.java:275)
> at 
> org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:133)
> at 
> org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:126)
> at 
> org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158)
> - locked <0x0006bae5f788> (a java.lang.Object)
> at 
> org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:234)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4512) Add option for persistent checkpoints

2016-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15569028#comment-15569028
 ] 

ASF GitHub Bot commented on FLINK-4512:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2608
  
+1 to the general approach and the code

Some suggestions for name polishing:
  - How about renaming `DISCARD_ON_CANCELLATION` to 
`DELETE_ON_CANCELLATION`? That sounds more explicit about the "cleanup", i.e. the 
actual file deletion.
  - Since all checkpoints are persistent (at least in HA), how about 
calling this `enableExternalizedCheckpoints()` rather than 
`enablePersistentCheckpoints()`?
  - I would suggest dropping the method `enablePersistentCheckpoints()` 
without a cleanup policy parameter. Whoever enables that feature should 
explicitly think about what cleanup policy they want.

For the future, can we get rid of the extra storage location for the 
externalized checkpoint metadata? Simply store it as well in the checkpoint 
directory? That makes it simpler for users to track and clean up checkpoints 
manually, if they want to retain externalized checkpoints across cancellations 
and terminal failures.
  - Both the config value and the location parameter to the 
`enablePersistentCheckpoints()` would be dropped.
  - That would imply that every state backend needs to be able to provide a 
storage location for the checkpoint metadata
  - The memory state backend would hence not work with externalized 
checkpoints, unless one explicitly sets a parameter 
`setExternalizedCheckpointsLocation(uri)`.

Since this is a bigger change, I would suggest a followup pull request for 
that. The only change I would make to this pull request (to make transition to 
the followup smoother) is to remove the path parameter from the 
`enablePersistentCheckpoints()` methods and always use the configuration value 
(which will be replaced by the state backend's storage location).
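
If the renaming suggested above were adopted, user-facing usage might end up looking 
roughly like the sketch below; `enableExternalizedCheckpoints()` and 
`ExternalizedCheckpointCleanup` are only the proposed names from this comment, not an 
existing API.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Proposal sketch only -- the externalized-checkpoint methods below do not exist yet.
public class ExternalizedCheckpointsUsageSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60000);

        // externalize periodic checkpoints and be explicit about the cleanup policy
        env.getCheckpointConfig()
                .enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);

        // ... define and execute the job ...
    }
}
```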


> Add option for persistent checkpoints
> -
>
> Key: FLINK-4512
> URL: https://issues.apache.org/jira/browse/FLINK-4512
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> Allow periodic checkpoints to be persisted by writing out their meta data. 
> This is what we currently do for savepoints, but in the future checkpoints 
> and savepoints are likely to diverge with respect to guarantees they give for 
> updatability, etc.
> This means that the difference between persistent checkpoints and savepoints 
> in the long term will be that persistent checkpoints can only be restored 
> with the same job settings (like parallelism, etc.)
> Regular and persisted checkpoints should behave differently with respect to 
> disposal in *globally* terminal job states (FINISHED, CANCELLED, FAILED): 
> regular checkpoints are cleaned up in all of these cases whereas persistent 
> checkpoints only on FINISHED. Maybe with the option to customize behaviour on 
> CANCELLED or FAILED.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2627: Kafka 0.10 follow-up fixes

2016-10-12 Thread rmetzger
GitHub user rmetzger opened a pull request:

https://github.com/apache/flink/pull/2627

Kafka 0.10 follow-up fixes

After merging https://github.com/apache/flink/pull/2369, there was some 
follow-up feedback in the final commit: 
https://github.com/apache/flink/commit/6731ec1e48d0a0092dd2330adda73bcf37fda8d7#commitcomment-19375265,
 which I'm addressing with this pull request:

- I'm adding a `Kafka010FetcherTest`, which is based on the 
`Kafka09FetcherTest`.
- I'm undoing the changes in `DataGenerators` to avoid starting a full 
Flink job to produce some data into a topic.
- I'm removing some commented-out code that has been part of the Kafka code 
for a while.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rmetzger/flink kafka_inline_producer

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2627.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2627


commit 744f8ebb66b2a7288942be139cd7a7e6d1170c80
Author: Robert Metzger 
Date:   2016-10-11T13:48:32Z

[hotfix][kafka] Undo DataGenerators changes (use inline kafka producer again)

commit ce26248ad0a4672eeb556863a061a478987694e9
Author: Robert Metzger 
Date:   2016-10-12T12:03:01Z

[hotfix][kafka] Backport Kafka09FetcherTest for Kafka010




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-4635) Implement Data Transfer Authentication using shared secret configuration

2016-10-12 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-4635:
--
Issue Type: Sub-task  (was: Task)
Parent: FLINK-3930

> Implement Data Transfer Authentication using shared secret configuration
> 
>
> Key: FLINK-4635
> URL: https://issues.apache.org/jira/browse/FLINK-4635
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Vijay Srinivasaraghavan
>Assignee: Vijay Srinivasaraghavan
>
> The data transfer authentication (TM/Netty) requirement was not addressed as 
> part of FLINK-3930 and this JIRA is created to track the issue.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-12 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r83017556
  
--- Diff: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTests.java
 ---
@@ -336,237 +348,294 @@ public int compare(String o1, String o2) {
Assert.assertEquals(expectedFileContents.get(fileIdx), 
cntntStr.toString());
}
 
-   for(org.apache.hadoop.fs.Path file: filesCreated) {
+   for (org.apache.hadoop.fs.Path file: filesCreated) {
hdfs.delete(file, false);
}
}
 
-   private static class PathFilter extends FilePathFilter {
-
-   @Override
-   public boolean filterPath(Path filePath) {
-   return filePath.getName().startsWith("**");
-   }
-   }
+   Monitoring Function Tests   
//
 
@Test
public void testFilePathFiltering() throws Exception {
-   Set uniqFilesFound = new HashSet<>();
Set filesCreated = new HashSet<>();
+   Set filesKept = new TreeSet<>();
 
// create the files to be discarded
for (int i = 0; i < NO_OF_FILES; i++) {
-   Tuple2 file = 
fillWithData(hdfsURI, "**file", i, "This is test line.");
+   Tuple2 file = 
createFileAndFillWithData(hdfsURI, "**file", i, "This is test line.");
filesCreated.add(file.f0);
}
 
// create the files to be kept
for (int i = 0; i < NO_OF_FILES; i++) {
-   Tuple2 file = 
fillWithData(hdfsURI, "file", i, "This is test line.");
+   Tuple2 file =
+   createFileAndFillWithData(hdfsURI, "file", i, 
"This is test line.");
filesCreated.add(file.f0);
+   filesKept.add(file.f0.getName());
}
 
TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
format.setFilesFilter(new PathFilter());
+
ContinuousFileMonitoringFunction monitoringFunction =
new ContinuousFileMonitoringFunction<>(format, hdfsURI,
FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
 
+   final FileVerifyingSourceContext context =
+   new FileVerifyingSourceContext(new OneShotLatch(), 
monitoringFunction, 0, -1);
+
monitoringFunction.open(new Configuration());
-   monitoringFunction.run(new 
TestingSourceContext(monitoringFunction, uniqFilesFound));
+   monitoringFunction.run(context);
 
-   Assert.assertEquals(NO_OF_FILES, uniqFilesFound.size());
-   for(int i = 0; i < NO_OF_FILES; i++) {
-   org.apache.hadoop.fs.Path file = new 
org.apache.hadoop.fs.Path(hdfsURI + "/file" + i);
-   
Assert.assertTrue(uniqFilesFound.contains(file.toString()));
-   }
+   Assert.assertArrayEquals(filesKept.toArray(), 
context.getSeenFiles().toArray());
 
-   for(org.apache.hadoop.fs.Path file: filesCreated) {
+   // finally delete the files created for the test.
+   for (org.apache.hadoop.fs.Path file: filesCreated) {
hdfs.delete(file, false);
}
}
 
+   private static class PathFilter extends FilePathFilter {
+   @Override
+   public boolean filterPath(Path filePath) {
+   return filePath.getName().startsWith("**");
+   }
+   }
+
@Test
-   public void testFileSplitMonitoringReprocessWithAppended() throws 
Exception {
-   final Set uniqFilesFound = new HashSet<>();
+   public void testSortingOnModTime() throws Exception {
+   final long[] modTimes = new long[NO_OF_FILES];
+   final org.apache.hadoop.fs.Path[] filesCreated = new 
org.apache.hadoop.fs.Path[NO_OF_FILES];
+
+   // create some files
+   for (int i = 0; i < NO_OF_FILES; i++) {
+   Tuple2 file =
+   createFileAndFillWithData(hdfsURI, "file", i, 
"This is test line.");
+   Thread.sleep(10);
+
+   filesCreated[i] = file.f0;
+   modTimes[i] = 
hdfs.getFileStatus(file.f0).getModificationTime();
+   }
+
+   TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+   

[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-12 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r83014810
  
--- Diff: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTests.java
 ---
@@ -336,237 +348,294 @@ public int compare(String o1, String o2) {
Assert.assertEquals(expectedFileContents.get(fileIdx), 
cntntStr.toString());
}
 
-   for(org.apache.hadoop.fs.Path file: filesCreated) {
+   for (org.apache.hadoop.fs.Path file: filesCreated) {
hdfs.delete(file, false);
}
}
 
-   private static class PathFilter extends FilePathFilter {
-
-   @Override
-   public boolean filterPath(Path filePath) {
-   return filePath.getName().startsWith("**");
-   }
-   }
+   Monitoring Function Tests   
//
 
@Test
public void testFilePathFiltering() throws Exception {
-   Set uniqFilesFound = new HashSet<>();
Set filesCreated = new HashSet<>();
+   Set filesKept = new TreeSet<>();
 
// create the files to be discarded
for (int i = 0; i < NO_OF_FILES; i++) {
-   Tuple2 file = 
fillWithData(hdfsURI, "**file", i, "This is test line.");
+   Tuple2 file = 
createFileAndFillWithData(hdfsURI, "**file", i, "This is test line.");
filesCreated.add(file.f0);
}
 
// create the files to be kept
for (int i = 0; i < NO_OF_FILES; i++) {
-   Tuple2 file = 
fillWithData(hdfsURI, "file", i, "This is test line.");
+   Tuple2 file =
+   createFileAndFillWithData(hdfsURI, "file", i, 
"This is test line.");
filesCreated.add(file.f0);
+   filesKept.add(file.f0.getName());
}
 
TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
format.setFilesFilter(new PathFilter());
+
ContinuousFileMonitoringFunction monitoringFunction =
new ContinuousFileMonitoringFunction<>(format, hdfsURI,
FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
 
+   final FileVerifyingSourceContext context =
+   new FileVerifyingSourceContext(new OneShotLatch(), 
monitoringFunction, 0, -1);
+
monitoringFunction.open(new Configuration());
-   monitoringFunction.run(new 
TestingSourceContext(monitoringFunction, uniqFilesFound));
+   monitoringFunction.run(context);
 
-   Assert.assertEquals(NO_OF_FILES, uniqFilesFound.size());
-   for(int i = 0; i < NO_OF_FILES; i++) {
-   org.apache.hadoop.fs.Path file = new 
org.apache.hadoop.fs.Path(hdfsURI + "/file" + i);
-   
Assert.assertTrue(uniqFilesFound.contains(file.toString()));
-   }
+   Assert.assertArrayEquals(filesKept.toArray(), 
context.getSeenFiles().toArray());
 
-   for(org.apache.hadoop.fs.Path file: filesCreated) {
+   // finally delete the files created for the test.
+   for (org.apache.hadoop.fs.Path file: filesCreated) {
hdfs.delete(file, false);
}
}
 
+   private static class PathFilter extends FilePathFilter {
+   @Override
+   public boolean filterPath(Path filePath) {
+   return filePath.getName().startsWith("**");
+   }
+   }
+
@Test
-   public void testFileSplitMonitoringReprocessWithAppended() throws 
Exception {
-   final Set uniqFilesFound = new HashSet<>();
+   public void testSortingOnModTime() throws Exception {
+   final long[] modTimes = new long[NO_OF_FILES];
+   final org.apache.hadoop.fs.Path[] filesCreated = new 
org.apache.hadoop.fs.Path[NO_OF_FILES];
+
+   // create some files
+   for (int i = 0; i < NO_OF_FILES; i++) {
+   Tuple2 file =
+   createFileAndFillWithData(hdfsURI, "file", i, 
"This is test line.");
+   Thread.sleep(10);
+
+   filesCreated[i] = file.f0;
+   modTimes[i] = 
hdfs.getFileStatus(file.f0).getModificationTime();
+   }
+
+   TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+   

[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-12 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r83007178
  
--- Diff: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTests.java
 ---
@@ -336,237 +348,294 @@ public int compare(String o1, String o2) {
Assert.assertEquals(expectedFileContents.get(fileIdx), 
cntntStr.toString());
}
 
-   for(org.apache.hadoop.fs.Path file: filesCreated) {
+   for (org.apache.hadoop.fs.Path file: filesCreated) {
hdfs.delete(file, false);
}
}
 
-   private static class PathFilter extends FilePathFilter {
-
-   @Override
-   public boolean filterPath(Path filePath) {
-   return filePath.getName().startsWith("**");
-   }
-   }
+   Monitoring Function Tests   
//
 
@Test
public void testFilePathFiltering() throws Exception {
-   Set uniqFilesFound = new HashSet<>();
Set filesCreated = new HashSet<>();
+   Set filesKept = new TreeSet<>();
 
// create the files to be discarded
for (int i = 0; i < NO_OF_FILES; i++) {
-   Tuple2 file = 
fillWithData(hdfsURI, "**file", i, "This is test line.");
+   Tuple2 file = 
createFileAndFillWithData(hdfsURI, "**file", i, "This is test line.");
filesCreated.add(file.f0);
}
 
// create the files to be kept
for (int i = 0; i < NO_OF_FILES; i++) {
-   Tuple2 file = 
fillWithData(hdfsURI, "file", i, "This is test line.");
+   Tuple2 file =
+   createFileAndFillWithData(hdfsURI, "file", i, 
"This is test line.");
filesCreated.add(file.f0);
+   filesKept.add(file.f0.getName());
}
 
TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
format.setFilesFilter(new PathFilter());
+
ContinuousFileMonitoringFunction monitoringFunction =
new ContinuousFileMonitoringFunction<>(format, hdfsURI,
FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
 
+   final FileVerifyingSourceContext context =
+   new FileVerifyingSourceContext(new OneShotLatch(), 
monitoringFunction, 0, -1);
+
monitoringFunction.open(new Configuration());
-   monitoringFunction.run(new 
TestingSourceContext(monitoringFunction, uniqFilesFound));
+   monitoringFunction.run(context);
 
-   Assert.assertEquals(NO_OF_FILES, uniqFilesFound.size());
-   for(int i = 0; i < NO_OF_FILES; i++) {
-   org.apache.hadoop.fs.Path file = new 
org.apache.hadoop.fs.Path(hdfsURI + "/file" + i);
-   
Assert.assertTrue(uniqFilesFound.contains(file.toString()));
-   }
+   Assert.assertArrayEquals(filesKept.toArray(), 
context.getSeenFiles().toArray());
 
-   for(org.apache.hadoop.fs.Path file: filesCreated) {
+   // finally delete the files created for the test.
+   for (org.apache.hadoop.fs.Path file: filesCreated) {
hdfs.delete(file, false);
}
}
 
+   private static class PathFilter extends FilePathFilter {
--- End diff --

You moved this class, but you're really only using it in one test case and 
could simply make it an anonymous class.
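
Roughly like this (sketch only, reusing the filter logic from the diff above):

```java
// inline the filter instead of keeping a named nested class
format.setFilesFilter(new FilePathFilter() {
    @Override
    public boolean filterPath(Path filePath) {
        return filePath.getName().startsWith("**");
    }
});
```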


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-12 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r83019658
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
 ---
@@ -107,10 +113,10 @@ public ContinuousFileMonitoringFunction(
@Override
@SuppressWarnings("unchecked")
public void open(Configuration parameters) throws Exception {
-   LOG.info("Opening File Monitoring Source.");
-
super.open(parameters);
format.configure(parameters);
+
+   LOG.info("Opening File Monitoring Source for path: " + path + 
".");
--- End diff --

I think this doesn't have to be logged at INFO level.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-12 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r83016929
  
--- Diff: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTests.java
 ---
@@ -336,237 +348,294 @@ public int compare(String o1, String o2) {
Assert.assertEquals(expectedFileContents.get(fileIdx), 
cntntStr.toString());
}
 
-   for(org.apache.hadoop.fs.Path file: filesCreated) {
+   for (org.apache.hadoop.fs.Path file: filesCreated) {
hdfs.delete(file, false);
}
}
 
-   private static class PathFilter extends FilePathFilter {
-
-   @Override
-   public boolean filterPath(Path filePath) {
-   return filePath.getName().startsWith("**");
-   }
-   }
+   Monitoring Function Tests   
//
 
@Test
public void testFilePathFiltering() throws Exception {
-   Set uniqFilesFound = new HashSet<>();
Set filesCreated = new HashSet<>();
+   Set filesKept = new TreeSet<>();
 
// create the files to be discarded
for (int i = 0; i < NO_OF_FILES; i++) {
-   Tuple2 file = 
fillWithData(hdfsURI, "**file", i, "This is test line.");
+   Tuple2 file = 
createFileAndFillWithData(hdfsURI, "**file", i, "This is test line.");
filesCreated.add(file.f0);
}
 
// create the files to be kept
for (int i = 0; i < NO_OF_FILES; i++) {
-   Tuple2 file = 
fillWithData(hdfsURI, "file", i, "This is test line.");
+   Tuple2 file =
+   createFileAndFillWithData(hdfsURI, "file", i, 
"This is test line.");
filesCreated.add(file.f0);
+   filesKept.add(file.f0.getName());
}
 
TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
format.setFilesFilter(new PathFilter());
+
ContinuousFileMonitoringFunction monitoringFunction =
new ContinuousFileMonitoringFunction<>(format, hdfsURI,
FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
 
+   final FileVerifyingSourceContext context =
+   new FileVerifyingSourceContext(new OneShotLatch(), 
monitoringFunction, 0, -1);
+
monitoringFunction.open(new Configuration());
-   monitoringFunction.run(new 
TestingSourceContext(monitoringFunction, uniqFilesFound));
+   monitoringFunction.run(context);
 
-   Assert.assertEquals(NO_OF_FILES, uniqFilesFound.size());
-   for(int i = 0; i < NO_OF_FILES; i++) {
-   org.apache.hadoop.fs.Path file = new 
org.apache.hadoop.fs.Path(hdfsURI + "/file" + i);
-   
Assert.assertTrue(uniqFilesFound.contains(file.toString()));
-   }
+   Assert.assertArrayEquals(filesKept.toArray(), 
context.getSeenFiles().toArray());
 
-   for(org.apache.hadoop.fs.Path file: filesCreated) {
+   // finally delete the files created for the test.
+   for (org.apache.hadoop.fs.Path file: filesCreated) {
hdfs.delete(file, false);
}
}
 
+   private static class PathFilter extends FilePathFilter {
+   @Override
+   public boolean filterPath(Path filePath) {
+   return filePath.getName().startsWith("**");
+   }
+   }
+
@Test
-   public void testFileSplitMonitoringReprocessWithAppended() throws 
Exception {
-   final Set uniqFilesFound = new HashSet<>();
+   public void testSortingOnModTime() throws Exception {
+   final long[] modTimes = new long[NO_OF_FILES];
+   final org.apache.hadoop.fs.Path[] filesCreated = new 
org.apache.hadoop.fs.Path[NO_OF_FILES];
+
+   // create some files
+   for (int i = 0; i < NO_OF_FILES; i++) {
+   Tuple2 file =
+   createFileAndFillWithData(hdfsURI, "file", i, 
"This is test line.");
+   Thread.sleep(10);
+
+   filesCreated[i] = file.f0;
+   modTimes[i] = 
hdfs.getFileStatus(file.f0).getModificationTime();
+   }
+
+   TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+   

[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-12 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r83005668
  
--- Diff: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTests.java
 ---
@@ -111,8 +114,8 @@ public void testFileReadingOperatorWithIngestionTime() 
throws Exception {
Set filesCreated = new HashSet<>();
Map expectedFileContents = new HashMap<>();
 
-   for(int i = 0; i < NO_OF_FILES; i++) {
-   Tuple2 file = 
fillWithData(hdfsURI, "file", i, "This is test line.");
+   for (int i = 0; i < NO_OF_FILES; i++) {
+   Tuple2 file = 
createFileAndFillWithData(hdfsURI, "file", i, "This is test line.");
--- End diff --

This can probably be moved into a `@Before` method.
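
Something along these lines, roughly; a sketch that reuses the helpers already in the 
test class (`createFileAndFillWithData`, `hdfsURI`, `NO_OF_FILES`), so it is not 
standalone:

```java
private org.apache.hadoop.fs.Path[] createdFiles;

@Before
public void createTestFiles() throws Exception {
    createdFiles = new org.apache.hadoop.fs.Path[NO_OF_FILES];
    for (int i = 0; i < NO_OF_FILES; i++) {
        // same helper the tests already use to create and fill a file
        Tuple2<org.apache.hadoop.fs.Path, String> file =
                createFileAndFillWithData(hdfsURI, "file", i, "This is test line.");
        createdFiles[i] = file.f0;
    }
}
```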


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-12 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r83019574
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
 ---
@@ -295,7 +256,7 @@ public void close() throws Exception {
globalModificationTime = Long.MAX_VALUE;
isRunning = false;
}
-   LOG.info("Closed File Monitoring Source.");
+   LOG.info("Closing File Monitoring Source for path: " + path + 
".");
--- End diff --

Should we really log this at INFO level?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-12 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r83016682
  
--- Diff: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTests.java
 ---
@@ -336,237 +348,294 @@ public int compare(String o1, String o2) {
Assert.assertEquals(expectedFileContents.get(fileIdx), 
cntntStr.toString());
}
 
-   for(org.apache.hadoop.fs.Path file: filesCreated) {
+   for (org.apache.hadoop.fs.Path file: filesCreated) {
hdfs.delete(file, false);
}
}
 
-   private static class PathFilter extends FilePathFilter {
-
-   @Override
-   public boolean filterPath(Path filePath) {
-   return filePath.getName().startsWith("**");
-   }
-   }
+   Monitoring Function Tests   
//
 
@Test
public void testFilePathFiltering() throws Exception {
-   Set uniqFilesFound = new HashSet<>();
Set filesCreated = new HashSet<>();
+   Set filesKept = new TreeSet<>();
 
// create the files to be discarded
for (int i = 0; i < NO_OF_FILES; i++) {
-   Tuple2 file = 
fillWithData(hdfsURI, "**file", i, "This is test line.");
+   Tuple2 file = 
createFileAndFillWithData(hdfsURI, "**file", i, "This is test line.");
filesCreated.add(file.f0);
}
 
// create the files to be kept
for (int i = 0; i < NO_OF_FILES; i++) {
-   Tuple2 file = 
fillWithData(hdfsURI, "file", i, "This is test line.");
+   Tuple2 file =
+   createFileAndFillWithData(hdfsURI, "file", i, 
"This is test line.");
filesCreated.add(file.f0);
+   filesKept.add(file.f0.getName());
}
 
TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
format.setFilesFilter(new PathFilter());
+
ContinuousFileMonitoringFunction monitoringFunction =
new ContinuousFileMonitoringFunction<>(format, hdfsURI,
FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
 
+   final FileVerifyingSourceContext context =
+   new FileVerifyingSourceContext(new OneShotLatch(), 
monitoringFunction, 0, -1);
+
monitoringFunction.open(new Configuration());
-   monitoringFunction.run(new 
TestingSourceContext(monitoringFunction, uniqFilesFound));
+   monitoringFunction.run(context);
 
-   Assert.assertEquals(NO_OF_FILES, uniqFilesFound.size());
-   for(int i = 0; i < NO_OF_FILES; i++) {
-   org.apache.hadoop.fs.Path file = new 
org.apache.hadoop.fs.Path(hdfsURI + "/file" + i);
-   
Assert.assertTrue(uniqFilesFound.contains(file.toString()));
-   }
+   Assert.assertArrayEquals(filesKept.toArray(), 
context.getSeenFiles().toArray());
 
-   for(org.apache.hadoop.fs.Path file: filesCreated) {
+   // finally delete the files created for the test.
+   for (org.apache.hadoop.fs.Path file: filesCreated) {
hdfs.delete(file, false);
}
}
 
+   private static class PathFilter extends FilePathFilter {
+   @Override
+   public boolean filterPath(Path filePath) {
+   return filePath.getName().startsWith("**");
+   }
+   }
+
@Test
-   public void testFileSplitMonitoringReprocessWithAppended() throws 
Exception {
-   final Set uniqFilesFound = new HashSet<>();
+   public void testSortingOnModTime() throws Exception {
+   final long[] modTimes = new long[NO_OF_FILES];
+   final org.apache.hadoop.fs.Path[] filesCreated = new 
org.apache.hadoop.fs.Path[NO_OF_FILES];
+
+   // create some files
+   for (int i = 0; i < NO_OF_FILES; i++) {
+   Tuple2 file =
+   createFileAndFillWithData(hdfsURI, "file", i, 
"This is test line.");
+   Thread.sleep(10);
+
+   filesCreated[i] = file.f0;
+   modTimes[i] = 
hdfs.getFileStatus(file.f0).getModificationTime();
+   }
+
+   TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+   

[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-12 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r83020529
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
 ---
@@ -60,26 +63,27 @@
private static final Logger LOG = 
LoggerFactory.getLogger(ContinuousFileMonitoringFunction.class);
 
/**
-* The minimum interval allowed between consecutive path scans. This is 
applicable if the
-* {@code watchType} is set to {@code PROCESS_CONTINUOUSLY}.
+* The minimum interval allowed between consecutive path scans.
+* NOTE: Only applicable to the {@code PROCESS_CONTINUOUSLY} 
mode.
 */
-   public static final long MIN_MONITORING_INTERVAL = 100l;
+   public static final long MIN_MONITORING_INTERVAL = 1l;
 
/** The path to monitor. */
private final String path;
 
-   /** The default parallelism for the job, as this is going to be the 
parallelism of the downstream readers. */
+   /** The parallelism of the downstream readers. */
private final int readerParallelism;
 
/** The {@link FileInputFormat} to be read. */
private FileInputFormat format;
 
-   /** How often to monitor the state of the directory for new data. */
+   /** The interval between consecutive path scans. */
private final long interval;
 
/** Which new data to process (see {@link FileProcessingMode}. */
private final FileProcessingMode watchType;
 
+   /** The maximum file modification time seen so far. */
private Long globalModificationTime;
--- End diff --

I wonder, should this be `volatile` because it is accessed by the 
checkpoint thread?
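
If it stays non-volatile, the usual alternative is to read it only under the 
checkpoint lock as well, e.g. (sketch, not the actual snapshot code):

```java
// writes already happen while holding checkpointLock (see the assert in
// monitorDirAndForwardSplits); a snapshot could read under the same lock
// instead of relying on volatile
synchronized (checkpointLock) {
    Long modTimeToCheckpoint = globalModificationTime;
    // ... put modTimeToCheckpoint into the snapshot state ...
}
```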


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-12 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r83019134
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
 ---
@@ -146,102 +152,57 @@ public void 
run(SourceFunction.SourceContext context) throws Exc
}
}
 
-   private void monitorDirAndForwardSplits(FileSystem fs, 
SourceContext context) throws IOException, JobException {
+   private void monitorDirAndForwardSplits(FileSystem fs, 
SourceContext context) throws IOException {
assert (Thread.holdsLock(checkpointLock));
 
-   List> splitsByModTime = 
getInputSplitSortedOnModTime(fs);
-
-   Iterator> it = 
splitsByModTime.iterator();
-   while (it.hasNext()) {
-   forwardSplits(it.next(), context);
-   it.remove();
-   }
-   }
-
-   private void forwardSplits(Tuple2 
splitsToFwd, SourceContext context) {
-   assert (Thread.holdsLock(checkpointLock));
-
-   Long modTime = splitsToFwd.f0;
-   List splits = splitsToFwd.f1;
-
-   Iterator it = splits.iterator();
-   while (it.hasNext()) {
-   FileInputSplit split = it.next();
-   processSplit(split, context);
-   it.remove();
-   }
+   List eligibleFiles = listEligibleFiles(fs);
+   Map splitsSortedByModTime = 
getInputSplitsSortedByModTime(eligibleFiles);
 
-   // update the global modification time
-   if (modTime >= globalModificationTime) {
-   globalModificationTime = modTime;
-   }
-   }
-
-   private void processSplit(FileInputSplit split, 
SourceContext context) {
-   LOG.info("Forwarding split: " + split);
-   context.collect(split);
-   }
-
-   private List> 
getInputSplitSortedOnModTime(FileSystem fileSystem) throws IOException {
-   List eligibleFiles = listEligibleFiles(fileSystem);
-   if (eligibleFiles.isEmpty()) {
-   return new ArrayList<>();
-   }
-
-   Map splitsToForward = 
getInputSplits(eligibleFiles);
-   List> sortedSplitsToForward 
= new ArrayList<>();
-
-   for (Map.Entry entry : 
splitsToForward.entrySet()) {
-   sortedSplitsToForward.add(new Tuple2<>(entry.getKey(), 
entry.getValue()));
-   }
-
-   Collections.sort(sortedSplitsToForward, new 
Comparator>() {
-   @Override
-   public int compare(Tuple2 
o1, Tuple2 o2) {
-   return (int) (o1.f0 - o2.f0);
+   for (Map.Entry splits: 
splitsSortedByModTime.entrySet()) {
+   long modificationTime = splits.getKey();
+   for (FileInputSplit split: splits.getValue()) {
+   LOG.info("Forwarding split: " + split);
+   context.collect(split);
}
-   });
-
-   return sortedSplitsToForward;
+   // update the global modification time
+   globalModificationTime = 
Math.max(globalModificationTime, modificationTime);
+   }
}
 
/**
-* Creates the input splits for the path to be forwarded to the 
downstream tasks of the
-* {@link ContinuousFileReaderOperator}. Those tasks are going to read 
their contents for further
-* processing. Splits belonging to files in the {@code eligibleFiles} 
list are the ones
-* that are shipped for further processing.
+* Creates the input splits to be forwarded to the downstream tasks of 
the
+* {@link ContinuousFileReaderOperator}. Splits are sorted by 
modification time before
+* being forwarded and only splits belonging to files in the {@code 
eligibleFiles}
+* list will be processed.
 * @param eligibleFiles The files to process.
 */
-   private Map getInputSplits(List 
eligibleFiles) throws IOException {
+   private Map 
getInputSplitsSortedByModTime(List eligibleFiles) throws 
IOException {
+   Map splitsByModTime = new 
TreeMap<>();
if (eligibleFiles.isEmpty()) {
-   return new HashMap<>();
+   return splitsByModTime;
}
 
-   FileInputSplit[] inputSplits = 
format.createInputSplits(readerParallelism);

[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-12 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r83004834
  
--- Diff: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java
 ---
@@ -111,174 +115,191 @@ protected void testProgram() throws Exception {
* reader.
* */
 
-   FileCreator fileCreator = new FileCreator(INTERVAL);
-   Thread t = new Thread(fileCreator);
-   t.start();
-
TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
format.setFilePath(hdfsURI);
-
-   try {
-   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-   env.setParallelism(4);
-
-   
format.setFilesFilter(FilePathFilter.createDefaultFilter());
-   ContinuousFileMonitoringFunction 
monitoringFunction =
-   new ContinuousFileMonitoringFunction<>(format, 
hdfsURI,
-   FileProcessingMode.PROCESS_CONTINUOUSLY,
-   env.getParallelism(), INTERVAL);
-
-   TypeInformation typeInfo = 
TypeExtractor.getInputFormatTypes(format);
-   ContinuousFileReaderOperator reader = new 
ContinuousFileReaderOperator<>(format);
-   TestingSinkFunction sink = new TestingSinkFunction();
-
-   DataStream splits = 
env.addSource(monitoringFunction);
-   splits.transform("FileSplitReader", typeInfo, 
reader).addSink(sink).setParallelism(1);
-   env.execute();
-
-   } catch (Exception e) {
-   Throwable th = e;
-   int depth = 0;
-
-   for (; depth < 20; depth++) {
-   if (th instanceof SuccessException) {
-   try {
-   postSubmit();
-   } catch (Exception e1) {
-   e1.printStackTrace();
+   format.setFilesFilter(FilePathFilter.createDefaultFilter());
+
+   // create the stream execution environment with a parallelism > 
1 to test
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(PARALLELISM);
+
+   ContinuousFileMonitoringFunction monitoringFunction =
+   new ContinuousFileMonitoringFunction<>(format, hdfsURI,
+   FileProcessingMode.PROCESS_CONTINUOUSLY,
+   env.getParallelism(), INTERVAL);
+
+   // the monitor has always DOP 1
+   DataStream splits = 
env.addSource(monitoringFunction);
+   Assert.assertEquals(1, splits.getParallelism());
+
+   ContinuousFileReaderOperator reader = new 
ContinuousFileReaderOperator<>(format);
+   TypeInformation typeInfo = 
TypeExtractor.getInputFormatTypes(format);
+
+   // the readers can be multiple
+   DataStream content = 
splits.transform("FileSplitReader", typeInfo, reader);
+   Assert.assertEquals(PARALLELISM, content.getParallelism());
+
+   // finally for the sink we set the parallelism to 1 so that we 
can verify the output
+   TestingSinkFunction sink = new TestingSinkFunction();
+   content.addSink(sink).setParallelism(1);
+
+   Thread job = new Thread() {
+
+   @Override
+   public void run() {
+   try {
+   
env.execute("ContinuousFileProcessingITCase Job.");
+   } catch (Exception e) {
+   Throwable th = e;
+   int depth = 0;
+
+   for (; depth < 20; depth++) {
--- End diff --

Why not the following:

```java
for (int depth = 0; depth < 20; depth++) {
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-12 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r83014794
  
--- Diff: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTests.java
 ---
@@ -578,18 +647,28 @@ public void emitWatermark(Watermark mark) {
 
@Override
public Object getCheckpointLock() {
-   return lock;
+   return new Object();
--- End diff --

Why always a new object?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-12 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r83015686
  
--- Diff: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTests.java
 ---
@@ -578,18 +647,28 @@ public void emitWatermark(Watermark mark) {
 
@Override
public Object getCheckpointLock() {
-   return lock;
+   return new Object();
}
 
@Override
public void close() {
}
}
 
+   /   Auxiliary Methods   
/
+
+   private int getLineNo(String line) {
+   String[] tkns = line.split("\\s");
+   Assert.assertEquals(6, tkns.length);
+   return Integer.parseInt(tkns[tkns.length - 1]);
+   }
+
/**
-* Fill the file with content.
+* Create a file with pre-determined content.
--- End diff --

What is the format of the content?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-12 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r83022623
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
 ---
@@ -257,74 +190,158 @@ public void open(Configuration parameters) throws 
Exception {
long failurePosMax = (long) (0.7 * LINES_PER_FILE);
 
elementsToFailure = (new Random().nextLong() % 
(failurePosMax - failurePosMin)) + failurePosMin;
-
-   if (elementCounter >= NO_OF_FILES * LINES_PER_FILE) {
-   finalCollectedContent = new HashMap<>();
-   for (Map.Entry result: 
collectedContent.entrySet()) {
-   
finalCollectedContent.put(result.getKey(), new ArrayList<>(result.getValue()));
-   }
-   throw new SuccessException();
-   }
-   }
-
-   @Override
-   public void close() {
-   try {
-   super.close();
-   } catch (Exception e) {
-   e.printStackTrace();
-   }
}
 
@Override
public void invoke(String value) throws Exception {
-   int fileIdx = 
Character.getNumericValue(value.charAt(0));
+   int fileIdx = getFileIdx(value);
 
-   Set content = collectedContent.get(fileIdx);
+   Set content = actualContent.get(fileIdx);
if (content == null) {
content = new HashSet<>();
-   collectedContent.put(fileIdx, content);
+   actualContent.put(fileIdx, content);
}
 
+   // detect duplicate lines.
if (!content.add(value + "\n")) {
fail("Duplicate line: " + value);
System.exit(0);
}
 
-
elementCounter++;
+
+   // this is termination
if (elementCounter >= NO_OF_FILES * LINES_PER_FILE) {
-   finalCollectedContent = new HashMap<>();
-   for (Map.Entry result: 
collectedContent.entrySet()) {
-   
finalCollectedContent.put(result.getKey(), new ArrayList<>(result.getValue()));
-   }
+   actualCollectedContent = actualContent;
throw new SuccessException();
}
 
-   count++;
-   if (!hasFailed) {
+   // add some latency so that we have at least one 
checkpoint in
+   if (!hasFailed && successfulCheckpoints == 0) {
Thread.sleep(2);
-   if (numSuccessfulCheckpoints >= 1 && count >= 
elementsToFailure) {
-   hasFailed = true;
-   throw new Exception("Task Failure");
-   }
+   }
+
+   // simulate a node failure
+   if (!hasFailed && successfulCheckpoints > 0 && 
elementCounter >= elementsToFailure) {
--- End diff --

Is it assured that a checkpoint has taken place? Or is the exception 
simply skipped here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-12 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r83005541
  
--- Diff: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTests.java
 ---
@@ -54,8 +55,10 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
-public class ContinuousFileMonitoringTest {
+public class ContinuousFileProcessingTests {
--- End diff --

By convention, unit tests are named `*Test` and integration tests 
`*ITCase`. This actually affects when they are executed, as integration tests 
run in Maven's `verify` phase.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-12 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82990475
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
 ---
@@ -27,48 +27,65 @@ import scala.collection.mutable.ListBuffer
 object RexNodeTranslator {
--- End diff --

Good point. I renamed it to `ProjectionTranslator`.
Done.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4691) Add group-windows for streaming tables

2016-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568534#comment-15568534
 ] 

ASF GitHub Bot commented on FLINK-4691:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82990475
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
 ---
@@ -27,48 +27,65 @@ import scala.collection.mutable.ListBuffer
 object RexNodeTranslator {
--- End diff --

Good point. I renamed it to `ProjectionTranslator`.
Done.


> Add group-windows for streaming tables
> ---
>
> Key: FLINK-4691
> URL: https://issues.apache.org/jira/browse/FLINK-4691
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Add Tumble, Slide, Session group-windows for streaming tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>  
> Implementation of group-windows on streaming tables. This includes 
> implementing the API of group-windows, the logical validation for 
> group-windows, and the definition of the “rowtime” and “systemtime” keywords. 
> Group-windows on batch tables won’t be initially supported and will throw an 
> exception.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-12 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82999720
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowIntervalTypeInfo.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.typeutils
+
+import java.util.Objects
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.base.{LongComparator, 
LongSerializer}
+import org.apache.flink.api.common.typeutils.{TypeComparator, 
TypeSerializer}
+import 
org.apache.flink.api.table.typeutils.RowIntervalTypeInfo.instantiateComparator
+import org.apache.flink.util.Preconditions._
+
+/**
+  * TypeInformation for row intervals.
+  */
+@SerialVersionUID(-1306179424364925258L)
+class RowIntervalTypeInfo[T](
--- End diff --

OK, I understand the issue with the instance checks (although I would think 
these checks are not correct if they fail in such cases). 

However, providing a full implementation for each internal type does not 
seem right. How about we create a special abstract TypeInfo for types that are 
not required at execution time and implement all irrelevant methods (arity, 
serializer, comparator, etc.) with `UnsupportedOperationException`. 
`RowIntervalTypeInfo` and `TimeIntervalTypeInfo` would extend that and just 
provide the type.
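
A rough Java sketch of that idea (the actual classes under discussion are Scala, and 
the class name here is made up):

```java
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;

// Sketch only: a base TypeInformation for purely logical types that never
// appear at execution time, so all runtime-related methods fail fast.
public abstract class InternalTypeInfoSketch<T> extends TypeInformation<T> {

    private final Class<T> clazz;

    protected InternalTypeInfoSketch(Class<T> clazz) {
        this.clazz = clazz;
    }

    @Override
    public Class<T> getTypeClass() { return clazz; }

    @Override
    public boolean isBasicType() { return false; }

    @Override
    public boolean isTupleType() { return false; }

    @Override
    public boolean isKeyType() { return false; }

    @Override
    public int getArity() { throw new UnsupportedOperationException("Not used at execution time."); }

    @Override
    public int getTotalFields() { throw new UnsupportedOperationException("Not used at execution time."); }

    @Override
    public TypeSerializer<T> createSerializer(ExecutionConfig config) {
        throw new UnsupportedOperationException("Not used at execution time.");
    }

    @Override
    public String toString() { return clazz.getSimpleName(); }

    @Override
    public boolean equals(Object obj) {
        return obj instanceof InternalTypeInfoSketch && ((InternalTypeInfoSketch<?>) obj).clazz == clazz;
    }

    @Override
    public int hashCode() { return clazz.hashCode(); }

    @Override
    public boolean canEqual(Object obj) { return obj instanceof InternalTypeInfoSketch; }
}
```

`RowIntervalTypeInfo` and `TimeIntervalTypeInfo` would then only pass their type class 
to the constructor.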


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4691) Add group-windows for streaming tables

2016-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568653#comment-15568653
 ] 

ASF GitHub Bot commented on FLINK-4691:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82999720
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowIntervalTypeInfo.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.typeutils
+
+import java.util.Objects
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.base.{LongComparator, 
LongSerializer}
+import org.apache.flink.api.common.typeutils.{TypeComparator, 
TypeSerializer}
+import 
org.apache.flink.api.table.typeutils.RowIntervalTypeInfo.instantiateComparator
+import org.apache.flink.util.Preconditions._
+
+/**
+  * TypeInformation for row intervals.
+  */
+@SerialVersionUID(-1306179424364925258L)
+class RowIntervalTypeInfo[T](
--- End diff --

OK, I understand the issue with the instance checks (although I would think 
these checks are not correct if they fail in such cases). 

However, providing a full implementation for each internal type does not 
seem right. How about we create a special abstract TypeInfo for types that are 
not required at execution time and implement all irrelevant methods (arity, 
serializer, comparator, etc.) with `UnsupportedOperationException`. 
`RowIntervalTypeInfo` and `TimeIntervalTypeInfo` would extend that and just 
provide the type.


> Add group-windows for streaming tables
> ---
>
> Key: FLINK-4691
> URL: https://issues.apache.org/jira/browse/FLINK-4691
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Add Tumble, Slide, Session group-windows for streaming tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>  
> Implementation of group-windows on streaming tables. This includes 
> implementing the API of group-windows, the logical validation for 
> group-windows, and the definition of the “rowtime” and “systemtime” keywords. 
> Group-windows on batch tables won’t be initially supported and will throw an 
> exception.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2619: [FLINK-4108] [scala] Consider ResultTypeQueryable for inp...

2016-10-12 Thread albertoRamon
Github user albertoRamon commented on the issue:

https://github.com/apache/flink/pull/2619
  
Works OK  ¡¡
[Capture 
Output](http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/jdbc-JDBCInputFormat-tp9393p9489.html)

Thanks, Alb
 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4108) NPE in Row.productArity

2016-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568712#comment-15568712
 ] 

ASF GitHub Bot commented on FLINK-4108:
---

Github user albertoRamon commented on the issue:

https://github.com/apache/flink/pull/2619
  
Works OK  ¡¡
[Capture 
Output](http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/jdbc-JDBCInputFormat-tp9393p9489.html)

Thanks, Alb
 


> NPE in Row.productArity
> ---
>
> Key: FLINK-4108
> URL: https://issues.apache.org/jira/browse/FLINK-4108
> Project: Flink
>  Issue Type: Bug
>  Components: Batch Connectors and Input/Output Formats, Type 
> Serialization System
>Affects Versions: 1.1.0
>Reporter: Martin Scholl
>Assignee: Timo Walther
>
> [this is my first issue report here, so please excuse me if something is 
> missing]
>  JDBCInputFormat of flink 1.1-SNAPSHOT fails with an NPE in Row.productArity:
> {quote}
> java.io.IOException: Couldn't access resultSet
> at 
> org.apache.flink.api.java.io.jdbc.JDBCInputFormat.nextRecord(JDBCInputFormat.java:288)
> at 
> org.apache.flink.api.java.io.jdbc.JDBCInputFormat.nextRecord(JDBCInputFormat.java:98)
> at 
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
> at org.apache.flink.api.table.Row.productArity(Row.scala:28)
> at 
> org.apache.flink.api.java.io.jdbc.JDBCInputFormat.nextRecord(JDBCInputFormat.java:279)
> ... 4 more
> {quote}
> The code reproduce this can be found in this gist: 
> https://gist.github.com/zeitgeist/b91a60460661618ca4585e082895c616
> The reason for the NPE, I believe, is the way through which Flink creates Row 
> instances through Kryo. As Row expects the number of fields to allocate as a 
> parameter, which Kryo does not provide, the ‘fields’ member of Row ends up 
> being null. As I’m neither a reflection nor a Kryo expert, I rather leave a 
> true analysis to more knowledgable programmers.
> Part of the aforementioned example is a not very elegant workaround through a 
> custom type and a cast (function {{jdbcNoIssue}} + custom Row type {{MyRow}}), 
> which serves as a further hint towards my theory.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2439: [FLINK-4450]update storm verion to 1.0.0 in flink-storm a...

2016-10-12 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2439
  
Okay, then we should re-add all the dependency exclusions that were removed 
from the `flink-storm/pom.xml`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4813) Having flink-test-utils as a dependency outside Flink fails the build

2016-10-12 Thread Maximilian Michels (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568657#comment-15568657
 ] 

Maximilian Michels commented on FLINK-4813:
---

+1 That sounds like a good solution.

> Having flink-test-utils as a dependency outside Flink fails the build
> -
>
> Key: FLINK-4813
> URL: https://issues.apache.org/jira/browse/FLINK-4813
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>
> The {{flink-test-utils}} depend on {{hadoop-minikdc}}, which has a 
> dependency, which is only resolvable, if the {{maven-bundle-plugin}} is 
> loaded.
> This is the error message
> {code}
> [ERROR] Failed to execute goal on project quickstart-1.2-tests: Could not 
> resolve dependencies for project 
> com.dataartisans:quickstart-1.2-tests:jar:1.0-SNAPSHOT: Failure to find 
> org.apache.directory.jdbm:apacheds-jdbm1:bundle:2.0.0-M2 in 
> https://repo.maven.apache.org/maven2 was cached in the local repository, 
> resolution will not be reattempted until the update interval of central has 
> elapsed or updates are forced -> [Help 1]
> {code}
> {{flink-parent}} loads that plugin, so all "internal" dependencies to the 
> test utils can resolve the plugin.
> Right now, users have to use the maven bundle plugin to use our test utils 
> externally.
> By making the hadoop minikdc dependency optional, we can probably resolve the 
> issues. Then, only users who want to use the security-related tools in the 
> test utils need to manually add the hadoop minikdc dependency + the plugin.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2608) Arrays.asList(..) does not work with CollectionInputFormat

2016-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568798#comment-15568798
 ] 

ASF GitHub Bot commented on FLINK-2608:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2623
  
As far as I read it, Kryo 3.x is not strictly serialization compatible with 
2.x, hence the major version number bump.

If the interfaces are still stable, then it should be fine to bump the 
chill dependency version, exclude any kryo dependency, and add our own 2.x kryo 
dependency. I would prefer that approach.


> Arrays.asList(..) does not work with CollectionInputFormat
> --
>
> Key: FLINK-2608
> URL: https://issues.apache.org/jira/browse/FLINK-2608
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Affects Versions: 0.9, 0.10.0
>Reporter: Maximilian Michels
>Priority: Minor
> Fix For: 1.0.0
>
>
> When using Arrays.asList(..) as input for a CollectionInputFormat, the 
> serialization/deserialization fails when deploying the task.
> See the following program:
> {code:java}
> public class WordCountExample {
> public static void main(String[] args) throws Exception {
> final ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
> DataSet<String> text = env.fromElements(
> "Who's there?",
> "I think I hear them. Stand, ho! Who's there?");
> // DOES NOT WORK
> List<Integer> elements = Arrays.asList(0, 0, 0);
> // The following works:
> //List<Integer> elements = new ArrayList<>(new int[] {0,0,0});
> DataSet<TestClass> set = env.fromElements(new TestClass(elements));
> DataSet<Tuple2<String, Integer>> wordCounts = text
> .flatMap(new LineSplitter())
> .withBroadcastSet(set, "set")
> .groupBy(0)
> .sum(1);
> wordCounts.print();
> }
> public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
> @Override
> public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
> for (String word : line.split(" ")) {
> out.collect(new Tuple2<String, Integer>(word, 1));
> }
> }
> }
> public static class TestClass implements Serializable {
> private static final long serialVersionUID = -2932037991574118651L;
> List<Integer> integerList;
> public TestClass(List<Integer> integerList){
> this.integerList=integerList;
> }
> }
> {code}
> {noformat}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 
> 'DataSource (at main(Test.java:32) 
> (org.apache.flink.api.java.io.CollectionInputFormat))': Deserializing the 
> InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at 
> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at 
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
> at 
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> at 
> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> at 
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> 

[jira] [Commented] (FLINK-4691) Add group-windows for streaming tables

2016-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568544#comment-15568544
 ] 

ASF GitHub Bot commented on FLINK-4691:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82991170
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
 ---
@@ -23,6 +23,8 @@ import org.apache.flink.api.table.Row
 import java.math.BigDecimal
 import java.math.BigInteger
 
+import org.apache.flink.streaming.api.windowing.windows.Window
--- End diff --

Done.


> Add group-windows for streaming tables
> ---
>
> Key: FLINK-4691
> URL: https://issues.apache.org/jira/browse/FLINK-4691
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Add Tumble, Slide, Session group-windows for streaming tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>  
> Implementation of group-windows on streaming tables. This includes 
> implementing the API of group-windows, the logical validation for 
> group-windows, and the definition of the “rowtime” and “systemtime” keywords. 
> Group-windows on batch tables won’t be initially supported and will throw an 
> exception.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-12 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82993938
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/PropertyRead.scala
 ---
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.flink.streaming.api.windowing.windows.Window
+
+/**
+  * Base class for reading a window property. The property will be 
extracted once and
+  * can be read multiple times.
+  */
+trait PropertyRead[T] extends Serializable {
+
+  def extract(window: Window): Unit
--- End diff --

In general I agree with your solution, but right now we support having the 
same operation multiple times in a query, e.g. 

```
.window(Session withGap 3.milli on 'rowtime as 'w)
.select('string, 'w.end, 'w.end)
```

Your code would fail. Let's leave the aggregation as it is for now and 
rework it again later. The aggregations have to be reworked anyway for 
efficiency.
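
As an aside, a rough sketch of how a concrete `PropertyRead` could cache the extracted value so that repeated reads (like the double `'w.end` above) stay cheap; the class name and the `get` accessor are hypothetical, not part of this PR:

```scala
// Sketch only: extracts the window end once and serves any number of reads.
import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window}

class WindowEndRead extends PropertyRead[Long] {

  private var end: Long = _

  override def extract(window: Window): Unit = window match {
    case t: TimeWindow => end = t.getEnd  // extracted once per window
    case _ => throw new UnsupportedOperationException("Window has no end timestamp.")
  }

  def get: Long = end  // hypothetical accessor, read as often as needed
}
```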


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4450) update storm version to 1.0.0

2016-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568782#comment-15568782
 ] 

ASF GitHub Bot commented on FLINK-4450:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2439
  
Okay, then we should re-add all the dependency exclusions that were removed 
from the `flink-storm/pom.xml`.


> update storm version to 1.0.0
> -
>
> Key: FLINK-4450
> URL: https://issues.apache.org/jira/browse/FLINK-4450
> Project: Flink
>  Issue Type: Improvement
>  Components: flink-contrib
>Reporter: yuzhongliu
> Fix For: 2.0.0
>
>
> The storm package path was changed in the new version.
> Storm old version package:
> backtype.storm.*
> Storm new version package:
> org.apache.storm.*
> Shall we update the flink/flink-storm code to the new Storm version?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2623: [FLINK-2608] Updated Twitter Chill version.

2016-10-12 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2623
  
As far as I read it, Kryo 3.x is not strictly serialization compatible with 
2.x, hence the major version number bump.

If the interfaces are still stable, then it should be fine to bump the 
chill dependency version, exclude any kryo dependency, and add our own 2.x kryo 
dependency. I would prefer that approach.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4691) Add group-windows for streaming tables

2016-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568532#comment-15568532
 ] 

ASF GitHub Bot commented on FLINK-4691:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82990268
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/properties.scala
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
+import org.apache.flink.api.table.FlinkRelBuilder.NamedProperty
+import org.apache.flink.api.table.validate.{ValidationFailure, 
ValidationSuccess}
+
+abstract class Property(child: Expression) extends UnaryExpression {
+
+  override def toString = s"Property($child)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode =
+throw new UnsupportedOperationException("Property cannot be 
transformed to RexNode.")
+
+  override private[flink] def validateInput() =
+if (child.isInstanceOf[WindowReference]) {
--- End diff --

I just tried to keep the names short, because the Scala line length limits 
are pretty strict.
Done.


> Add group-windows for streaming tables
> ---
>
> Key: FLINK-4691
> URL: https://issues.apache.org/jira/browse/FLINK-4691
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Add Tumble, Slide, Session group-windows for streaming tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>  
> Implementation of group-windows on streaming tables. This includes 
> implementing the API of group-windows, the logical validation for 
> group-windows, and the definition of the “rowtime” and “systemtime” keywords. 
> Group-windows on batch tables won’t be initially supported and will throw an 
> exception.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3398) Flink Kafka consumer should support auto-commit opt-outs

2016-10-12 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568564#comment-15568564
 ] 

Robert Metzger commented on FLINK-3398:
---

Maybe we should close this JIRA and have the discussion only in FLINK-4280.

> Flink Kafka consumer should support auto-commit opt-outs
> 
>
> Key: FLINK-3398
> URL: https://issues.apache.org/jira/browse/FLINK-3398
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Shikhar Bhushan
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.2.0
>
>
> Currently the Kafka source will commit consumer offsets to Zookeeper, either 
> upon a checkpoint if checkpointing is enabled, or periodically based 
> on {{auto.commit.interval.ms}}.
> It should be possible to opt-out of committing consumer offsets to Zookeeper. 
> Kafka has this config as {{auto.commit.enable}} (0.8) and 
> {{enable.auto.commit}} (0.9).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4604) Add support for standard deviation/variance

2016-10-12 Thread Anton Mushin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568664#comment-15568664
 ] 

Anton Mushin commented on FLINK-4604:
-

I tried to check the function in 
{{org.apache.flink.api.table.plan.rules.dataSet.DataSetAggregateRule#matches}}, 
but something went wrong :)
I did the following:
{code:title=org.apache.flink.api.table.plan.rules.dataSet.DataSetAggregateRule}
override def matches(call: RelOptRuleCall): Boolean = {
val agg: LogicalAggregate = call.rel(0).asInstanceOf[LogicalAggregate]

// check if we have distinct aggregates
val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
if (distinctAggs) {
  throw new TableException("DISTINCT aggregates are currently not 
supported.")
}

// check if we have grouping sets
val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != 
agg.getGroupSet
if (groupSets || agg.indicator) {
  throw new TableException("GROUPING SETS are currently not supported.")
}

(!distinctAggs && !groupSets && !agg.indicator) && 
!AggregateReduceFunctionsRule.INSTANCE.matches(call)
  }
{code}
And I got the following plan and exception:
{noformat}
DataSetCalc(select=[CAST(/(-(CASE(=($f1, 0), null, $f0), /(*(CASE(=($f3, 0), 
null, $f2), CASE(=($f3, 0), null, $f2)), $f3)), CASE(=($f3, 1), null, -($f3, 
1 AS $f0, CAST(/(-(CASE(=($f5, 0), null, $f4), /(*(CASE(=($f7, 0), null, 
$f6), CASE(=($f7, 0), null, $f6)), $f7)), CASE(=($f7, 1), null, -($f7, 1 AS 
$f1, CAST(/(-(CASE(=($f9, 0), null, $f8), /(*(CASE(=($f11, 0), null, $f10), 
CASE(=($f11, 0), null, $f10)), $f11)), CASE(=($f11, 1), null, -($f11, 1 AS 
$f2, CAST(/(-(CASE(=($f13, 0), null, $f12), /(*(CASE(=($f15, 0), null, $f14), 
CASE(=($f15, 0), null, $f14)), $f15)), CASE(=($f15, 1), null, -($f15, 1 AS 
$f3, CAST(/(-(CASE(=($f17, 0), null, $f16), /(*(CASE(=($f19, 0), null, $f18), 
CASE(=($f19, 0), null, $f18)), $f19)), CASE(=($f19, 1), null, -($f19, 1 AS 
$f4, CAST(/(-(CASE(=($f21, 0), null, $f20), /(*(CASE(=($f23, 0), null, $f22), 
CASE(=($f23, 0), null, $f22)), $f23)), CASE(=($f23, 1), null, -($f23, 1 AS 
$f5])
  DataSetAggregate(select=[$SUM0($f6) AS $f0, COUNT($f6) AS $f1, $SUM0(_1) AS 
$f2, COUNT(_1) AS $f3, $SUM0($f7) AS $f4, COUNT($f7) AS $f5, $SUM0(_2) AS $f6, 
COUNT(_2) AS $f7, $SUM0($f8) AS $f8, COUNT($f8) AS $f9, $SUM0(_3) AS $f10, 
COUNT(_3) AS $f11, $SUM0($f9) AS $f12, COUNT($f9) AS $f13, $SUM0(_4) AS $f14, 
COUNT(_4) AS $f15, $SUM0($f10) AS $f16, COUNT($f10) AS $f17, $SUM0(_5) AS $f18, 
COUNT(_5) AS $f19, $SUM0($f11) AS $f20, COUNT($f11) AS $f21, $SUM0(_6) AS $f22, 
COUNT(_6) AS $f23])
DataSetCalc(select=[_1, _2, _3, _4, _5, _6])
  DataSetScan(table=[[_DataSetTable_0]])
{noformat}
{noformat}
org.apache.flink.api.table.TableException: Type NULL is not supported. Null 
values must have a supported type.

at 
org.apache.flink.api.table.FlinkTypeFactory$.toTypeInfo(FlinkTypeFactory.scala:128)
at 
org.apache.flink.api.table.codegen.CodeGenerator.visitLiteral(CodeGenerator.scala:553)
at 
org.apache.flink.api.table.codegen.CodeGenerator.visitLiteral(CodeGenerator.scala:56)
at org.apache.calcite.rex.RexLiteral.accept(RexLiteral.java:658)
at 
org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$14.apply(CodeGenerator.scala:675)
at 
org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$14.apply(CodeGenerator.scala:675)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at 
org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:675)
at 
org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:56)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:108)
at 
org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$14.apply(CodeGenerator.scala:675)
at 
org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$14.apply(CodeGenerator.scala:675)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 

[jira] [Updated] (FLINK-3037) Make the behavior of the Kafka consumer configurable if the offsets to restore from are not available

2016-10-12 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-3037:
--
Description: 
Currently, if the {{FlinkKafkaConsumer}} is restoring a checkpoint and the 
offset is not available anymore in Kafka, it restores according to 
{{auto.offset.reset}}.
This leads to inconsistent behavior (not exactly-once anymore) because the 
operators will not receive data in sync with the checkpoint.

With this pull request, I would like to make the behavior controllable, using a 
flag. The simplest approach would be to let the consumer fail in that case.

  was:
Currently, if the {{FlinkKafkaConsumer}} is restoring a checkpoint and the 
offset is not available anymore in Kafka, it restores according to 
{{auto.offset.reset}}.
This leads to inconsistent behavior (not exactly-once anymore) because the 
operators will not receive data in sync with the checkpoint.

With this pull request, I would like to make the behavior controllable, using a 
flag.


> Make the behavior of the Kafka consumer configurable if the offsets to 
> restore from are not available
> -
>
> Key: FLINK-3037
> URL: https://issues.apache.org/jira/browse/FLINK-3037
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Robert Metzger
>
> Currently, if the {{FlinkKafkaConsumer}} is restoring a checkpoint and the 
> offset is not available anymore in Kafka, it restores according to 
> {{auto.offset.reset}}.
> This leads to inconsistent behavior (not exactly-once anymore) because the 
> operators will not receive data in sync with the checkpoint.
> With this pull request, I would like to make the behavior controllable, using 
> a flag. The simplest approach would be to let the consumer fail in that case.
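
To make the proposal concrete, a small sketch of what such a flag could look like (all names here are illustrative, not the actual FlinkKafkaConsumer API):

{code}
// Sketch only: a switch for what to do when the checkpointed offset is gone.
sealed trait MissingOffsetBehavior extends Serializable
case object FailJob extends MissingOffsetBehavior            // simplest option, keeps exactly-once semantics
case object UseAutoOffsetReset extends MissingOffsetBehavior  // current behavior, may skip or re-read records

class RestoreConfig(val onMissingOffset: MissingOffsetBehavior = FailJob) extends Serializable
{code}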



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2016-10-12 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r83003348
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
 ---
@@ -345,16 +343,19 @@ protected static void 
validateZooKeeperConfig(Properties props) {
}
}
 
-   private static long getInvalidOffsetBehavior(Properties config) {
+   /**
+* Check for invalid "auto.offset.reset" values. Should be called in 
constructor for eager checking before submitting
+* the job. Note that 'none' is also considered invalid, as we don't 
want to deliberately throw an exception
--- End diff --

Thank you for the pointer.
We are discussing this issue here 
https://issues.apache.org/jira/browse/FLINK-4280 and here 
https://issues.apache.org/jira/browse/FLINK-3037


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-12 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r83008188
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowIntervalTypeInfo.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.typeutils
+
+import java.util.Objects
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.base.{LongComparator, 
LongSerializer}
+import org.apache.flink.api.common.typeutils.{TypeComparator, 
TypeSerializer}
+import 
org.apache.flink.api.table.typeutils.RowIntervalTypeInfo.instantiateComparator
+import org.apache.flink.util.Preconditions._
+
+/**
+  * TypeInformation for row intervals.
+  */
+@SerialVersionUID(-1306179424364925258L)
+class RowIntervalTypeInfo[T](
--- End diff --

That's a good idea. I will do that.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-12 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r83009444
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/PropertyRead.scala
 ---
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.flink.streaming.api.windowing.windows.Window
+
+/**
+  * Base class for reading a window property. The property will be 
extracted once and
+  * can be read multiple times.
+  */
+trait PropertyRead[T] extends Serializable {
+
+  def extract(window: Window): Unit
--- End diff --

Yes, every aggregation should only happen once. We should definitely do 
that. Btw. we can also get rid of `AvgAggregate` once 
`AggregateReduceFunctionsRule` is enabled. So there are many open issues with 
the current aggregate implementation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2608) Arrays.asList(..) does not work with CollectionInputFormat

2016-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568745#comment-15568745
 ] 

ASF GitHub Bot commented on FLINK-2608:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2623
  
This change adds Kryo-shaded to our dependency tree:

```
[INFO] |  +- com.twitter:chill_2.10:jar:0.8.1:compile
[INFO] |  |  +- com.twitter:chill-java:jar:0.8.1:compile
[INFO] |  |  \- com.esotericsoftware:kryo-shaded:jar:3.0.3:compile
[INFO] |  | \- com.esotericsoftware:minlog:jar:1.3.0:compile
```
I suspect that Maven is not recognizing this because chill seems to depend 
on `kryo-shaded`.
Apparently, `kryo-shaded` has a shaded ASM version included, but it does not 
relocate the regular Kryo classes, so we would end up with two Kryo versions 
in our classpath.
If we want to upgrade Kryo, we therefore need to do it explicitly.

Another issue we need to consider is the serialization compatibility. 
Savepoints in Flink could potentially contain data serialized with Kryo 2.24. 
If we want to provide savepoint compatibility between Flink 1.1 and 1.2, we 
need to consider that.
According to the Kryo documentation, 2.24 to 3.0.0 is serialization 
compatible (I hope the same holds true for chill): 
https://github.com/EsotericSoftware/kryo/blob/master/CHANGES.md#2240---300-2014-10-04
I would like to hear @StephanEwen and @uce's opinion on this.


> Arrays.asList(..) does not work with CollectionInputFormat
> --
>
> Key: FLINK-2608
> URL: https://issues.apache.org/jira/browse/FLINK-2608
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Affects Versions: 0.9, 0.10.0
>Reporter: Maximilian Michels
>Priority: Minor
> Fix For: 1.0.0
>
>
> When using Arrays.asList(..) as input for a CollectionInputFormat, the 
> serialization/deserialization fails when deploying the task.
> See the following program:
> {code:java}
> public class WordCountExample {
> public static void main(String[] args) throws Exception {
> final ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
> DataSet<String> text = env.fromElements(
> "Who's there?",
> "I think I hear them. Stand, ho! Who's there?");
> // DOES NOT WORK
> List<Integer> elements = Arrays.asList(0, 0, 0);
> // The following works:
> //List<Integer> elements = new ArrayList<>(new int[] {0,0,0});
> DataSet<TestClass> set = env.fromElements(new TestClass(elements));
> DataSet<Tuple2<String, Integer>> wordCounts = text
> .flatMap(new LineSplitter())
> .withBroadcastSet(set, "set")
> .groupBy(0)
> .sum(1);
> wordCounts.print();
> }
> public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
> @Override
> public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
> for (String word : line.split(" ")) {
> out.collect(new Tuple2<String, Integer>(word, 1));
> }
> }
> }
> public static class TestClass implements Serializable {
> private static final long serialVersionUID = -2932037991574118651L;
> List<Integer> integerList;
> public TestClass(List<Integer> integerList){
> this.integerList=integerList;
> }
> }
> {code}
> {noformat}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 
> 'DataSource (at main(Test.java:32) 
> (org.apache.flink.api.java.io.CollectionInputFormat))': Deserializing the 
> InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at 
> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
> at 
> 

[jira] [Commented] (FLINK-4691) Add group-windows for streaming tables

2016-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568768#comment-15568768
 ] 

ASF GitHub Bot commented on FLINK-4691:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r83008188
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowIntervalTypeInfo.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.typeutils
+
+import java.util.Objects
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.base.{LongComparator, 
LongSerializer}
+import org.apache.flink.api.common.typeutils.{TypeComparator, 
TypeSerializer}
+import 
org.apache.flink.api.table.typeutils.RowIntervalTypeInfo.instantiateComparator
+import org.apache.flink.util.Preconditions._
+
+/**
+  * TypeInformation for row intervals.
+  */
+@SerialVersionUID(-1306179424364925258L)
+class RowIntervalTypeInfo[T](
--- End diff --

That's a good idea. I will do that.


> Add group-windows for streaming tables
> ---
>
> Key: FLINK-4691
> URL: https://issues.apache.org/jira/browse/FLINK-4691
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Add Tumble, Slide, Session group-windows for streaming tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>  
> Implementation of group-windows on streaming tables. This includes 
> implementing the API of group-windows, the logical validation for 
> group-windows, and the definition of the “rowtime” and “systemtime” keywords. 
> Group-windows on batch tables won’t be initially supported and will throw an 
> exception.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4691) Add group-windows for streaming tables

2016-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568789#comment-15568789
 ] 

ASF GitHub Bot commented on FLINK-4691:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r83009444
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/PropertyRead.scala
 ---
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.flink.streaming.api.windowing.windows.Window
+
+/**
+  * Base class for reading a window property. The property will be 
extracted once and
+  * can be read multiple times.
+  */
+trait PropertyRead[T] extends Serializable {
+
+  def extract(window: Window): Unit
--- End diff --

Yes, every aggregation should only happen once. We should definitely do 
that. Btw. we can also get rid of `AvgAggregate` once 
`AggregateReduceFunctionsRule` is enabled. So there are many open issues with 
the current aggregate implementation.


> Add group-windows for streaming tables
> ---
>
> Key: FLINK-4691
> URL: https://issues.apache.org/jira/browse/FLINK-4691
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Add Tumble, Slide, Session group-windows for streaming tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>  
> Implementation of group-windows on streaming tables. This includes 
> implementing the API of group-windows, the logical validation for 
> group-windows, and the definition of the “rowtime” and “systemtime” keywords. 
> Group-windows on batch tables won’t be initially supported and will throw an 
> exception.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-12 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r83000595
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/windows/Session.java
 ---
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.windows;
+
+import org.apache.flink.api.table.SessionWindow;
+
+/**
+ * Helper class for creating a session window. Session windows are ideal 
for cases where the
+ * window boundaries need to adjust to the incoming data. In a session 
window it is possible to
+ * have windows that start at individual points in time for each key and 
that end once there has
+ * been a certain period of inactivity.
+ */
+public class Session {
--- End diff --

I see, that makes sense. 

I think we should convert the Java classes into Scala classes because 
they are in the Scala source folder. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4691) Add group-windows for streaming tables

2016-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568669#comment-15568669
 ] 

ASF GitHub Bot commented on FLINK-4691:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r83000595
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/windows/Session.java
 ---
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.windows;
+
+import org.apache.flink.api.table.SessionWindow;
+
+/**
+ * Helper class for creating a session window. Session windows are ideal 
for cases where the
+ * window boundaries need to adjust to the incoming data. In a session 
window it is possible to
+ * have windows that start at individual points in time for each key and 
that end once there has
+ * been a certain period of inactivity.
+ */
+public class Session {
--- End diff --

I see, that makes sense. 

I think we should convert the Java classes into Scala classes because 
they are in the Scala source folder. 


> Add group-windows for streaming tables
> ---
>
> Key: FLINK-4691
> URL: https://issues.apache.org/jira/browse/FLINK-4691
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Add Tumble, Slide, Session group-windows for streaming tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>  
> Implementation of group-windows on streaming tables. This includes 
> implementing the API of group-windows, the logical validation for 
> group-windows, and the definition of the “rowtime” and “systemtime” keywords. 
> Group-windows on batch tables won’t be initially supported and will throw an 
> exception.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4283) ExecutionGraphRestartTest fails

2016-10-12 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568819#comment-15568819
 ] 

Stephan Ewen commented on FLINK-4283:
-

Is this issue still relevant, or has this instability been resolved?
[~AlexanderShoshin] what is your approach to fix this?

> ExecutionGraphRestartTest fails
> ---
>
> Key: FLINK-4283
> URL: https://issues.apache.org/jira/browse/FLINK-4283
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.1.0
> Environment: Ubuntu 14.04
> W10
>Reporter: Chesnay Schepler
>Assignee: Alexander Shoshin
>  Labels: test-stability
>
> I encounter reliable failures for the following tests:
> testRestartAutomatically(org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest)
>   Time elapsed: 120.089 sec  <<< FAILURE!
> java.lang.AssertionError: expected: but was:
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:743)
>   at org.junit.Assert.assertEquals(Assert.java:118)
>   at org.junit.Assert.assertEquals(Assert.java:144)
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.restartAfterFailure(ExecutionGraphRestartTest.java:680)
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.testRestartAutomatically(ExecutionGraphRestartTest.java:155)
> taskShouldNotFailWhenFailureRateLimitWasNotExceeded(org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest)
>   Time elapsed: 2.055 sec  <<< FAILURE!
> java.lang.AssertionError: expected: but was:
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:743)
>   at org.junit.Assert.assertEquals(Assert.java:118)
>   at org.junit.Assert.assertEquals(Assert.java:144)
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.restartAfterFailure(ExecutionGraphRestartTest.java:680)
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.taskShouldNotFailWhenFailureRateLimitWasNotExceeded(ExecutionGraphRestartTest.java:180)
> testFailingExecutionAfterRestart(org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest)
>   Time elapsed: 120.079 sec  <<< FAILURE!
> java.lang.AssertionError: expected: but was:
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:743)
>   at org.junit.Assert.assertEquals(Assert.java:118)
>   at org.junit.Assert.assertEquals(Assert.java:144)
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.testFailingExecutionAfterRestart(ExecutionGraphRestartTest.java:397)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4806) ResourceManager stop listening JobManager's leader address

2016-10-12 Thread Kurt Young (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568527#comment-15568527
 ] 

Kurt Young commented on FLINK-4806:
---

Yes, you are right about this. [~mxm]

> ResourceManager stop listening JobManager's leader address
> --
>
> Key: FLINK-4806
> URL: https://issues.apache.org/jira/browse/FLINK-4806
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Kurt Young
>
> Currently, in the flip-6 branch, when the RM receives a registration from a JM, 
> it verifies the leader session id of the JM and attaches a JobManagerLeaderListener 
> to it for monitoring future changes.
> Maybe we can simplify this a little bit: instead of monitoring the JM's leadership 
> changes, once the verification passes when the JM registers itself, we simply 
> record the leader id of the registered JM for future RPC filtering and start the 
> heartbeat monitor with that JM.
> If the JM's leadership changes, the new JM will register itself, the RM will verify 
> its leadership when it receives the registration, and the RM can decide whether to 
> accept or reject it. In other words, the JM's information in the RM is replaced 
> only by a new JM, not by the RM itself via a leadership change listener. By doing 
> this, we can simplify the logic inside the RM and avoid any error handling for the 
> leader listener.
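
A condensed sketch of the simplified registration path described above (all names and types are placeholders, not the flip-6 code):

{code}
// Sketch only: the RM verifies the JM leader id once at registration, remembers it
// for RPC filtering, and relies on re-registration instead of a leader listener.
import java.util.UUID

sealed trait RegistrationResponse
case object Accepted extends RegistrationResponse
case object Rejected extends RegistrationResponse

class ResourceManagerSketch(isCurrentJobManagerLeader: UUID => Boolean) {

  @volatile private var knownJobManagerLeaderId: Option[UUID] = None

  def registerJobManager(jmLeaderId: UUID): RegistrationResponse =
    if (isCurrentJobManagerLeader(jmLeaderId)) {
      knownJobManagerLeaderId = Some(jmLeaderId)  // remembered for filtering stale RPCs
      // the heartbeat monitor for this JM would be started here
      Accepted
    } else {
      Rejected  // a newly elected JM has to register itself again
    }

  def acceptsRpcFrom(senderLeaderId: UUID): Boolean =
    knownJobManagerLeaderId.exists(_ == senderLeaderId)
}
{code}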



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-12 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82991170
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
 ---
@@ -23,6 +23,8 @@ import org.apache.flink.api.table.Row
 import java.math.BigDecimal
 import java.math.BigInteger
 
+import org.apache.flink.streaming.api.windowing.windows.Window
--- End diff --

Done.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4691) Add group-windows for streaming tables

2016-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568586#comment-15568586
 ] 

ASF GitHub Bot commented on FLINK-4691:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82993938
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/PropertyRead.scala
 ---
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.flink.streaming.api.windowing.windows.Window
+
+/**
+  * Base class for reading a window property. The property will be 
extracted once and
+  * can be read multiple times.
+  */
+trait PropertyRead[T] extends Serializable {
+
+  def extract(window: Window): Unit
--- End diff --

In general I agree with your solution, but right now we support having the 
same operation multiple times in a query, e.g. 

```
.window(Session withGap 3.milli on 'rowtime as 'w)
.select('string, 'w.end, 'w.end)
```

Your code would fail in that case. Let's leave the aggregation as it is for now 
and rework it later; the aggregations have to be reworked for efficiency 
anyway.


> Add group-windows for streaming tables
> ---
>
> Key: FLINK-4691
> URL: https://issues.apache.org/jira/browse/FLINK-4691
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Add Tumble, Slide, Session group-windows for streaming tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>  
> Implementation of group-windows on streaming tables. This includes 
> implementing the API of group-windows, the logical validation for 
> group-windows, and the definition of the “rowtime” and “systemtime” keywords. 
> Group-windows on batch tables won’t be initially supported and will throw an 
> exception.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4813) Having flink-test-utils as a dependency outside Flink fails the build

2016-10-12 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-4813:
--
Description: 
The {{flink-test-utils}} depend on {{hadoop-minikdc}}, which has a dependency 
that is only resolvable if the {{maven-bundle-plugin}} is loaded.

This is the error message
{code}
[ERROR] Failed to execute goal on project quickstart-1.2-tests: Could not 
resolve dependencies for project 
com.dataartisans:quickstart-1.2-tests:jar:1.0-SNAPSHOT: Failure to find 
org.apache.directory.jdbm:apacheds-jdbm1:bundle:2.0.0-M2 in 
https://repo.maven.apache.org/maven2 was cached in the local repository, 
resolution will not be reattempted until the update interval of central has 
elapsed or updates are forced -> [Help 1]
{code}

{{flink-parent}} loads that plugin, so all "internal" dependencies to the test 
utils can resolve the plugin.
Right now, users have to use the maven bundle plugin to use our test utils 
externally.

By making the hadoop minikdc dependency optional, we can probably resolve the 
issues. Then, only users who want to use the security-related tools in the test 
utils need to manually add the hadoop minikdc dependency + the plugin.

  was:
The {{flink-test-utils}} depend on {{hadoop-minikdc}}, which has a dependency 
that is only resolvable if the {{maven-bundle-plugin}} is loaded.

{{flink-parent}} loads that plugin, so all "internal" dependencies to the test 
utils can resolve the plugin.
Right now, users have to use the maven bundle plugin to use our test utils 
externally.

By making the hadoop minikdc dependency optional, we can probably resolve the 
issues. Then, only users who want to use the security-related tools in the test 
utils need to manually add the hadoop minikdc dependency + the plugin.


> Having flink-test-utils as a dependency outside Flink fails the build
> -
>
> Key: FLINK-4813
> URL: https://issues.apache.org/jira/browse/FLINK-4813
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>
> The {{flink-test-utils}} depend on {{hadoop-minikdc}}, which has a 
> dependency that is only resolvable if the {{maven-bundle-plugin}} is 
> loaded.
> This is the error message
> {code}
> [ERROR] Failed to execute goal on project quickstart-1.2-tests: Could not 
> resolve dependencies for project 
> com.dataartisans:quickstart-1.2-tests:jar:1.0-SNAPSHOT: Failure to find 
> org.apache.directory.jdbm:apacheds-jdbm1:bundle:2.0.0-M2 in 
> https://repo.maven.apache.org/maven2 was cached in the local repository, 
> resolution will not be reattempted until the update interval of central has 
> elapsed or updates are forced -> [Help 1]
> {code}
> {{flink-parent}} loads that plugin, so all "internal" dependencies to the 
> test utils can resolve the plugin.
> Right now, users have to use the maven bundle plugin to use our test utils 
> externally.
> By making the hadoop minikdc dependency optional, we can probably resolve the 
> issues. Then, only users who want to use the security-related tools in the 
> test utils need to manually add the hadoop minikdc dependency + the plugin.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4280) New Flink-specific option to set starting position of Kafka consumer without respecting external offsets in ZK / Broker

2016-10-12 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568701#comment-15568701
 ] 

Robert Metzger commented on FLINK-4280:
---

This issue is related: https://issues.apache.org/jira/browse/FLINK-3037


> New Flink-specific option to set starting position of Kafka consumer without 
> respecting external offsets in ZK / Broker
> ---
>
> Key: FLINK-4280
> URL: https://issues.apache.org/jira/browse/FLINK-4280
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0
>
>
> Currently, to start reading from the "earliest" and "latest" position in 
> topics for the Flink Kafka consumer, users set the Kafka config 
> {{auto.offset.reset}} in the provided properties configuration.
> However, the way this config actually works might be a bit misleading if 
> users were trying to find a way to "read topics from a starting position". 
> The way the {{auto.offset.reset}} config works in the Flink Kafka consumer 
> resembles Kafka's original intent for the setting: first, existing external 
> offsets committed to the ZK / brokers will be checked; if none exists, then 
> will {{auto.offset.reset}} be respected.
> I propose to add Flink-specific ways to define the starting position, without 
> taking into account the external offsets. The original behaviour (reference 
> external offsets first) can be changed to be a user option, so that the 
> behaviour can be retained for frequent Kafka users that may need some 
> collaboration with existing non-Flink Kafka consumer applications.
> How users will interact with the Flink Kafka consumer after this is added, 
> with a newly introduced {{flink.starting-position}} config:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "earliest/latest");
> props.setProperty("auto.offset.reset", "..."); // this will be ignored (log a 
> warning)
> props.setProperty("group.id", "...") // this won't have effect on the 
> starting position anymore (may still be used in external offset committing)
> ...
> {code}
> Or, reference external offsets in ZK / broker:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "external-offsets");
> props.setProperty("auto.offset.reset", "earliest/latest"); // default will be 
> latest
> props.setProperty("group.id", "..."); // will be used to lookup external 
> offsets in ZK / broker on startup
> ...
> {code}
> A thing we would need to decide on is what would the default value be for 
> {{flink.starting-position}}.
> Two merits I see in adding this:
> 1. This matches the way users generally interpret "read from a starting 
> position". As the Flink Kafka connector is essentially a 
> "high-level" Kafka consumer for Flink users, I think it is reasonable to add 
> Flink-specific functionality that users will find useful, even though it wasn't 
> part of Kafka's original consumer design.
> 2. By adding this, the idea that "the Kafka offset store (ZK / brokers) is 
> used only to expose progress to the outside world, and not used to manipulate 
> how Kafka topics are read in Flink (unless users opt to do so)" becomes even 
> more definite. There was some discussion on this aspect in this PR 
> (https://github.com/apache/flink/pull/1690, FLINK-3398). I think adding this 
> further decouples Flink's internal offset checkpointing from Kafka's external 
> offset store.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4280) New Flink-specific option to set starting position of Kafka consumer without respecting external offsets in ZK / Broker

2016-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568703#comment-15568703
 ] 

ASF GitHub Bot commented on FLINK-4280:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r83003348
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
 ---
@@ -345,16 +343,19 @@ protected static void 
validateZooKeeperConfig(Properties props) {
}
}
 
-   private static long getInvalidOffsetBehavior(Properties config) {
+   /**
+* Check for invalid "auto.offset.reset" values. Should be called in 
constructor for eager checking before submitting
+* the job. Note that 'none' is also considered invalid, as we don't 
want to deliberately throw an exception
--- End diff --

Thank you for the pointer.
We are discussing this issue here 
https://issues.apache.org/jira/browse/FLINK-4280 and here 
https://issues.apache.org/jira/browse/FLINK-3037


> New Flink-specific option to set starting position of Kafka consumer without 
> respecting external offsets in ZK / Broker
> ---
>
> Key: FLINK-4280
> URL: https://issues.apache.org/jira/browse/FLINK-4280
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0
>
>
> Currently, to start reading from the "earliest" and "latest" position in 
> topics for the Flink Kafka consumer, users set the Kafka config 
> {{auto.offset.reset}} in the provided properties configuration.
> However, the way this config actually works might be a bit misleading if 
> users were trying to find a way to "read topics from a starting position". 
> The way the {{auto.offset.reset}} config works in the Flink Kafka consumer 
> resembles Kafka's original intent for the setting: first, existing external 
> offsets committed to the ZK / brokers will be checked; if none exists, then 
> will {{auto.offset.reset}} be respected.
> I propose to add Flink-specific ways to define the starting position, without 
> taking into account the external offsets. The original behaviour (reference 
> external offsets first) can be changed to be a user option, so that the 
> behaviour can be retained for frequent Kafka users that may need some 
> collaboration with existing non-Flink Kafka consumer applications.
> How users will interact with the Flink Kafka consumer after this is added, 
> with a newly introduced {{flink.starting-position}} config:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "earliest/latest");
> props.setProperty("auto.offset.reset", "..."); // this will be ignored (log a 
> warning)
> props.setProperty("group.id", "...") // this won't have effect on the 
> starting position anymore (may still be used in external offset committing)
> ...
> {code}
> Or, reference external offsets in ZK / broker:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "external-offsets");
> props.setProperty("auto.offset.reset", "earliest/latest"); // default will be 
> latest
> props.setProperty("group.id", "..."); // will be used to lookup external 
> offsets in ZK / broker on startup
> ...
> {code}
> A thing we would need to decide on is what would the default value be for 
> {{flink.starting-position}}.
> Two merits I see in adding this:
> 1. This matches the way users generally interpret "read from a starting 
> position". As the Flink Kafka connector is essentially a 
> "high-level" Kafka consumer for Flink users, I think it is reasonable to add 
> Flink-specific functionality that users will find useful, even though it wasn't 
> part of Kafka's original consumer design.
> 2. By adding this, the idea that "the Kafka offset store (ZK / brokers) is 
> used only to expose progress to the outside world, and not used to manipulate 
> how Kafka topics are read in Flink (unless users opt to do so)" becomes even 
> more definite. There was some discussion on this aspect in this PR 
> (https://github.com/apache/flink/pull/1690, FLINK-3398). I think adding this 
> further decouples Flink's internal offset checkpointing from Kafka's external 
> offset store.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2623: [FLINK-2608] Updated Twitter Chill version.

2016-10-12 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2623
  
This change adds Kryo-shaded to our dependency tree:

```
[INFO] |  +- com.twitter:chill_2.10:jar:0.8.1:compile
[INFO] |  |  +- com.twitter:chill-java:jar:0.8.1:compile
[INFO] |  |  \- com.esotericsoftware:kryo-shaded:jar:3.0.3:compile
[INFO] |  | \- com.esotericsoftware:minlog:jar:1.3.0:compile
```
I suspect Maven does not flag this as a conflict because chill depends 
on `kryo-shaded`, which is a different artifact than the regular `kryo`.
Apparently, `kryo-shaded` includes a shaded ASM version, but it does not 
relocate the regular Kryo classes, so we would end up with two Kryo versions 
in our classpath.
If we want to upgrade Kryo, we therefore need to do it explicitly to avoid 
having both versions on the classpath.

Another issue we need to consider is the serialization compatibility. 
Savepoints in Flink could potentially contain data serialized with Kryo 2.24. 
If we want to provide savepoint compatibility between Flink 1.1 and 1.2, we 
need to consider that.
According to the Kryo documentation, 2.24 to 3.0.0 is serialization 
compatible (I hope the same holds true for chill): 
https://github.com/EsotericSoftware/kryo/blob/master/CHANGES.md#2240---300-2014-10-04
I would like to hear @StephanEwen and @uce's opinion on this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4813) Having flink-test-utils as a dependency outside Flink fails the build

2016-10-12 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568803#comment-15568803
 ] 

Stephan Ewen commented on FLINK-4813:
-

+1

> Having flink-test-utils as a dependency outside Flink fails the build
> -
>
> Key: FLINK-4813
> URL: https://issues.apache.org/jira/browse/FLINK-4813
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>
> The {{flink-test-utils}} depend on {{hadoop-minikdc}}, which has a 
> dependency that is only resolvable if the {{maven-bundle-plugin}} is 
> loaded.
> This is the error message
> {code}
> [ERROR] Failed to execute goal on project quickstart-1.2-tests: Could not 
> resolve dependencies for project 
> com.dataartisans:quickstart-1.2-tests:jar:1.0-SNAPSHOT: Failure to find 
> org.apache.directory.jdbm:apacheds-jdbm1:bundle:2.0.0-M2 in 
> https://repo.maven.apache.org/maven2 was cached in the local repository, 
> resolution will not be reattempted until the update interval of central has 
> elapsed or updates are forced -> [Help 1]
> {code}
> {{flink-parent}} loads that plugin, so all "internal" dependencies to the 
> test utils can resolve the plugin.
> Right now, users have to use the maven bundle plugin to use our test utils 
> externally.
> By making the hadoop minikdc dependency optional, we can probably resolve the 
> issues. Then, only users who want to use the security-related tools in the 
> test utils need to manually add the hadoop minikdc dependency + the plugin.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4717) Naive version of atomic stop signal with savepoint

2016-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568548#comment-15568548
 ] 

ASF GitHub Bot commented on FLINK-4717:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/2609
  
Addressed the issue and found another bug in #2608 that I've fixed in 
f769e8e. If Travis gives the green light, I will rebase on #2608 and merge this 
later today.


> Naive version of atomic stop signal with savepoint
> --
>
> Key: FLINK-4717
> URL: https://issues.apache.org/jira/browse/FLINK-4717
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Priority: Minor
> Fix For: 1.2.0
>
>
> As a first step towards atomic stopping with savepoints we should implement a 
> cancel command which prior to cancelling takes a savepoint. Additionally, it 
> should turn off the periodic checkpointing so that there won't be checkpoints 
> executed between the savepoint and the cancel command.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2626: [FLINK-4787] [runtime-web] Expose cancel-with-save...

2016-10-12 Thread uce
GitHub user uce opened a pull request:

https://github.com/apache/flink/pull/2626

[FLINK-4787] [runtime-web] Expose cancel-with-savepoint via REST API

Follow up to #2609, exposing the cancel-with-savepoint command via the REST 
API. The relevant commits are the last two ones.

The `RequestHandler` now returns a generic `HttpResponse` instead of a 
`String`. This enables handlers to return custom responses (different HTTP 
codes, etc.). Most handlers now extend the `AbstractJsonRequestHandler` for 
default JSON responses (which used to be handled by the generic 
`RequestHandler`).

This also adds handlers for triggering and monitoring a job cancellation with 
savepoints. Since this operation can take some time, we do it asynchronously. 
According to various online resources, the way to go for REST APIs in such 
cases is to return HTTP 202 Accepted with the location of the in-progress 
operation.
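
As a rough illustration of the generic-response idea (not the actual handler code in this PR; the Netty calls are standard, but the class name and URL layout are made up for this sketch):

```
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;

// Hypothetical sketch of a handler that starts an async operation and points the
// client at a monitoring URL instead of returning a plain JSON string.
public class InProgressOperationHandler {

    public FullHttpResponse handleTrigger(String jobId, long requestId) {
        FullHttpResponse response = new DefaultFullHttpResponse(
                HttpVersion.HTTP_1_1, HttpResponseStatus.ACCEPTED);
        // tell the client where to poll for the result of the in-progress operation
        response.headers().set("Location",
                "/jobs/" + jobId + "/cancel-with-savepoint/in-progress/" + requestId);
        return response;
    }
}
```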

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uce/flink 4787-cancel_with_savepoint_rest_api

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2626.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2626


commit 99a9621383e6a223e39e4ec22d60671a205d958d
Author: Ufuk Celebi 
Date:   2016-10-06T14:43:42Z

[FLINK-4512] [FLIP-10] Add option to persist periodic checkpoints

[FLINK-4509] [FLIP-10] Specify savepoint directory per savepoint
[FLINK-4507] [FLIP-10] Deprecate savepoint backend config

commit 25ffc04d7e5d7ef9538447ee5162c0d203e96e89
Author: Ufuk Celebi 
Date:   2016-10-07T09:48:47Z

[FLINK-4717] Add CancelJobWithSavepoint

- Adds CancelJobWithSavepoint message, which triggers a savepoint
  before cancelling the respective job.
- Adds -s [targetDirectory] option to CLI cancel command:
* bin/flink cancel  (regular cancelling)
* bin/flink cancel -s  (cancel with savepoint to default dir)
* bin/flink cancel -s   (cancel with savepoint to 
targetDir)

commit bc88dba90a263e80691448e20644e5f126551bb6
Author: Ufuk Celebi 
Date:   2016-10-11T08:08:14Z

[FLINK-4787] [runtime-web] Return generic HttpResponse in RequestHandler

- Let RequestHandler return a generic HttpResponse instead of a String. This
  enables handlers to return custom responses (different HTTP codes, etc.)
- Introduce AbstractJsonRequestHandler for default JSON responses

commit ecbcf46f5a9d874dbdd908d48c7035c1cb338c1a
Author: Ufuk Celebi 
Date:   2016-10-11T08:09:20Z

[FLINK-4787] [runtime-web] Add JobCancellationWithSavepointHandlers

- Add handlers for triggering and monitoring job cancellation with
  savepoints.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3706) YARNSessionCapacitySchedulerITCase.testNonexistingQueue unstable

2016-10-12 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568635#comment-15568635
 ] 

Robert Metzger commented on FLINK-3706:
---

Fix for release-1.1: 
http://git-wip-us.apache.org/repos/asf/flink/commit/c9433bf6

> YARNSessionCapacitySchedulerITCase.testNonexistingQueue unstable
> 
>
> Key: FLINK-3706
> URL: https://issues.apache.org/jira/browse/FLINK-3706
> Project: Flink
>  Issue Type: Bug
>  Components: YARN Client
>Reporter: Aljoscha Krettek
>Assignee: Robert Metzger
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.2.0
>
> Attachments: log.txt
>
>
> I encountered a failed test on travis.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4813) Having flink-test-utils as a dependency outside Flink fails the build

2016-10-12 Thread Robert Metzger (JIRA)
Robert Metzger created FLINK-4813:
-

 Summary: Having flink-test-utils as a dependency outside Flink 
fails the build
 Key: FLINK-4813
 URL: https://issues.apache.org/jira/browse/FLINK-4813
 Project: Flink
  Issue Type: Bug
  Components: Build System
Affects Versions: 1.2.0
Reporter: Robert Metzger


The {{flink-test-utils}} depend on {{hadoop-minikdc}}, which has a dependency 
that is only resolvable if the {{maven-bundle-plugin}} is loaded.

{{flink-parent}} loads that plugin, so all "internal" dependencies to the test 
utils can resolve the plugin.
Right now, users have to use the maven bundle plugin to use our test utils 
externally.

By making the hadoop minikdc dependency optional, we can probably resolve the 
issues. Then, only users who want to use the security-related tools in the test 
utils need to manually add the hadoop minikdc dependency + the plugin.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2627: Kafka 0.10 follow-up fixes

2016-10-12 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2627
  
Thanks a lot for fixing this.
+1 from my side


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4669) scala api createLocalEnvironment() function add default Configuration parameter

2016-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15567867#comment-15567867
 ] 

ASF GitHub Bot commented on FLINK-4669:
---

Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/2541
  
@StephanEwen  I've gone with a new function name, `createCustomLocalEnv`. Sorry for 
the late ack.


> scala api createLocalEnvironment() function add default Configuration 
> parameter
> ---
>
> Key: FLINK-4669
> URL: https://issues.apache.org/jira/browse/FLINK-4669
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: shijinkui
>
> A Scala program can't directly use createLocalEnvironment with a custom 
> Configuration object.
> For example, to start the web server in local mode I currently have to do:
> ```
> // set up execution environment
> val conf = new Configuration
> conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
> conf.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 
> ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT)
> val env = new org.apache.flink.streaming.api.scala.StreamExecutionEnvironment(
>   
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironment(2,
>  conf)
> )
> ```
> so we need the createLocalEnvironment function to have a config parameter. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2541: [FLINK-4669] scala api createLocalEnvironment() function ...

2016-10-12 Thread shijinkui
Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/2541
  
@StephanEwen  I've gone with a new function name, `createCustomLocalEnv`. Sorry for 
the late ack.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-4814) Remove extra storage location for externalized checkpoint metadata

2016-10-12 Thread Ufuk Celebi (JIRA)
Ufuk Celebi created FLINK-4814:
--

 Summary: Remove extra storage location for externalized checkpoint 
metadata
 Key: FLINK-4814
 URL: https://issues.apache.org/jira/browse/FLINK-4814
 Project: Flink
  Issue Type: Improvement
Reporter: Ufuk Celebi


Follow up for FLINK-4512.

Store the checkpoint metadata in the checkpoint directory. That makes it simpler for 
users to track and clean up checkpoints manually if they want to retain 
externalized checkpoints across cancellations and terminal failures.

Every state backend needs to be able to provide a storage location for the 
checkpoint metadata. The memory state backend would hence not work with 
externalized checkpoints, unless one explicitly sets a parameter such as 
`setExternalizedCheckpointsLocation(uri)`.
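
For context, a rough sketch of how a user enables externalized checkpoints with the API introduced by FLINK-4512 (the directory/metadata handling is exactly what this issue proposes to change, so treat the details as illustrative):

{code}
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Rough sketch: enable externalized checkpoints so they are retained on cancellation.
// With this issue, the checkpoint metadata would live in the checkpoint directory itself.
public class ExternalizedCheckpointExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // checkpoint every 10 seconds
        env.enableCheckpointing(10000);

        // keep the checkpoint (and its metadata) when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // job definition and env.execute() omitted
    }
}
{code}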




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4816) Executions from "DEPLOYING" should retain restored checkpoint information

2016-10-12 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-4816:
---

 Summary: Executions from "DEPLOYING" should retain restored 
checkpoint information
 Key: FLINK-4816
 URL: https://issues.apache.org/jira/browse/FLINK-4816
 Project: Flink
  Issue Type: Sub-task
  Components: Distributed Coordination
Reporter: Stephan Ewen


When an execution fails from state {{DEPLOYING}}, it should wrap the failure to 
better report the failure cause:
  - If no checkpoint was restored, it should wrap the exception in a 
{{DeployTaskException}}
  - If a checkpoint was restored, it should wrap the exception in a 
{{RestoreTaskException}} and record the id of the checkpoint that was attempted 
to be restored.
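
A minimal sketch of the proposed wrapping ({{DeployTaskException}} and {{RestoreTaskException}} are the classes proposed above and do not exist yet; everything else is illustrative):

{code}
// Hypothetical sketch of the proposed failure wrapping; DeployTaskException and
// RestoreTaskException are the classes proposed in this issue, not existing Flink classes.
public class DeployingFailureWrapper {

    public static Throwable wrapDeployingFailure(Throwable cause, Long restoredCheckpointId) {
        if (restoredCheckpointId == null) {
            // no checkpoint was restored: report a plain deployment failure
            return new DeployTaskException("Task failed during deployment", cause);
        } else {
            // a checkpoint restore was attempted: record which checkpoint it was
            return new RestoreTaskException(
                "Task failed while restoring checkpoint " + restoredCheckpointId, cause);
        }
    }

    static class DeployTaskException extends Exception {
        DeployTaskException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    static class RestoreTaskException extends Exception {
        RestoreTaskException(String message, Throwable cause) {
            super(message, cause);
        }
    }
}
{code}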



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-3844) Checkpoint failures should not always lead to job failures

2016-10-12 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-3844.
---

> Checkpoint failures should not always lead to job failures
> --
>
> Key: FLINK-3844
> URL: https://issues.apache.org/jira/browse/FLINK-3844
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Gyula Fora
>
> Currently when a checkpoint fails the job crashes immediately. This is not 
> the desired behaviour in many cases. It would probably be better to log the 
> failed checkpoint attempt and only fail the job after so many subsequent 
> failed attempts.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-3844) Checkpoint failures should not always lead to job failures

2016-10-12 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-3844.
-
Resolution: Duplicate

Duplicate of FLINK-4809

The other issue already has fine-grained subtasks and a more detailed 
description

> Checkpoint failures should not always lead to job failures
> --
>
> Key: FLINK-3844
> URL: https://issues.apache.org/jira/browse/FLINK-3844
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Gyula Fora
>
> Currently when a checkpoint fails the job crashes immediately. This is not 
> the desired behaviour in many cases. It would probably be better to log the 
> failed checkpoint attempt and only fail the job after so many subsequent 
> failed attempts.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-3594) StreamTask may fail when checkpoint is concurrent to regular termination

2016-10-12 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3594?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-3594.
---

> StreamTask may fail when checkpoint is concurrent to regular termination
> 
>
> Key: FLINK-3594
> URL: https://issues.apache.org/jira/browse/FLINK-3594
> Project: Flink
>  Issue Type: Bug
>Reporter: Chesnay Schepler
>Assignee: Stephan Ewen
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.1.0
>
>
> Some tests in the KafkaConsumerTestBase rely on throwing a SuccessException 
> to stop the streaming job once the test condition is fulfilled.
> The job then fails, and it is checked whether the cause was a 
> SuccessException. If so, the test is marked as a success, otherwise as a 
> failure.
> However, if this exception is thrown while a checkpoint is being 
> triggered, the exception that stops the job is not the SuccessException but a 
> CancelTaskException.
> This should affect every test that uses the SuccessException.
> Observed here: https://travis-ci.org/apache/flink/jobs/114523189
> The problem is that the exception causes the StreamTask to enter the finally 
> block inside invoke(), which sets isRunning to false. Within 
> triggerCheckpoint(), isRunning is then checked for being false, and if so a 
> CancelTaskException is thrown.
> This seems like a runtime issue; I observed other tests failing 
> without giving a good cause, since the CancelTaskException masks it.
> I was wondering whether triggerCheckpoint() could return false instead of 
> throwing an exception, and simply assume that an exception will be thrown 
> within invoke().
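
A simplified sketch of the suggestion in the last paragraph, i.e. declining the checkpoint instead of throwing (not the actual StreamTask code):

{code}
// Simplified illustration of the suggestion above, not the actual StreamTask implementation.
public class CheckpointTriggerSketch {

    private final Object lock = new Object();
    private volatile boolean isRunning;

    public boolean triggerCheckpoint(long checkpointId, long timestamp) {
        synchronized (lock) {
            if (!isRunning) {
                // Instead of throwing a CancelTaskException (which masks the real
                // failure cause), decline the checkpoint and let invoke() surface
                // whatever exception actually stopped the task.
                return false;
            }
            // ... perform the actual checkpoint for (checkpointId, timestamp) here ...
            return true;
        }
    }
}
{code}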



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

