[GitHub] flink pull request: [FLINK-2098] Ensure checkpoints and element em...

2015-06-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/742


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2098] Ensure checkpoints and element em...

2015-06-04 Thread mbalassi
Github user mbalassi commented on the pull request:

https://github.com/apache/flink/pull/742#issuecomment-108765546
  
Big +1 for the `run(SourceContext)` interface and for experimenting with 
`Thread.holdsLock(obj)`.



[GitHub] flink pull request: [FLINK-2098] Ensure checkpoints and element em...

2015-06-04 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/742#issuecomment-108740723
  
I will run some benchmarks, then we can decide about that.



[GitHub] flink pull request: [FLINK-2098] Ensure checkpoints and element em...

2015-06-03 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/742#issuecomment-108615858
  
I thought about what @StephanEwen said about uncheckpointed sources also 
having the locking object in the signature of the run() method, and also about 
extensibility.

We might have to tweak the source interface a little bit more. What I 
propose is to have this run method:
```
void run(SourceContext context);
```

Then the source context would have methods for retrieving the locking object 
(for checkpointed sources) and for emitting elements. Part of my motivation for 
this is that the context can be extended in the future without breaking existing 
sources. If we introduce proper timestamps at some point, we can extend the 
SourceContext with a method for emitting elements with a timestamp. Then, if we 
want to have watermarks, the context can have methods for activating 
automatically generated watermarks and for emitting watermarks. And so on...
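A minimal sketch of the `run(SourceContext)` shape proposed above. The method names `collect`, `getCheckpointLock` and `collectWithTimestamp` are assumptions for illustration, not a confirmed Flink API, and `ListSourceContext` and `RangeSource` are hypothetical helpers. A Java 8 default method is used only to show how the context could gain a method later (here, emitting with a timestamp) without breaking sources written against the original interface.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical context handed to a source; method names are assumptions.
interface SourceContext<T> {
    // Emit one element downstream.
    void collect(T element);

    // Lock that checkpointed sources hold while emitting and updating state.
    Object getCheckpointLock();

    // Hypothetical later extension: existing sources keep compiling because a
    // default implementation exists (this one simply drops the timestamp).
    default void collectWithTimestamp(T element, long timestamp) {
        collect(element);
    }
}

// The proposed run/cancel source signature.
interface SourceFunction<T> {
    void run(SourceContext<T> ctx) throws Exception;

    void cancel();
}

// Hypothetical context that buffers elements in memory, e.g. for tests.
class ListSourceContext<T> implements SourceContext<T> {
    final List<T> elements = new ArrayList<>();
    private final Object lock = new Object();

    @Override
    public void collect(T element) {
        elements.add(element);
    }

    @Override
    public Object getCheckpointLock() {
        return lock;
    }
}

// A non-checkpointed source: in this sketch it only emits and never takes the lock.
class RangeSource implements SourceFunction<Integer> {
    private final int count;
    private volatile boolean running = true;

    RangeSource(int count) {
        this.count = count;
    }

    @Override
    public void run(SourceContext<Integer> ctx) {
        for (int i = 0; i < count && running; i++) {
            ctx.collect(i);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```

A source written against this interface never calls `collectWithTimestamp`, so adding it does not force any change on `RangeSource`.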

I think we should fix this now, before the release. What do you think?



[GitHub] flink pull request: [FLINK-2098] Ensure checkpoints and element em...

2015-06-03 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/742#issuecomment-108619295
  
Yes, the change can basically be done with a regex, so I also propose merging 
this as early as possible.

By the way, we could ensure that the source is actually holding the monitor 
lock with `Thread.holdsLock(obj)`. Not sure about the performance impact, 
though.
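The `Thread.holdsLock(obj)` check mentioned above could look roughly like the following. `Thread.holdsLock` is a standard static method of `java.lang.Thread`; the `LockCheckingContext` class and its error message are hypothetical. Whether the check is cheap enough for the per-record path is exactly the open question here, so this sketch makes no performance claim.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical source context that rejects emission without the checkpoint lock.
class LockCheckingContext<T> {
    private final Object checkpointLock = new Object();
    private final List<T> emitted = new ArrayList<>();

    Object getCheckpointLock() {
        return checkpointLock;
    }

    void collect(T element) {
        // True only if the calling thread currently holds this object's monitor.
        if (!Thread.holdsLock(checkpointLock)) {
            throw new IllegalStateException(
                    "Elements must be emitted while holding the checkpoint lock");
        }
        emitted.add(element);
    }

    List<T> getEmitted() {
        return emitted;
    }
}
```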



[GitHub] flink pull request: [FLINK-2098] Ensure checkpoints and element em...

2015-06-03 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/742#issuecomment-108618236
  
How about we still merge this now, to make sure we have a good version in 
to start testing? The change you propose is API only, and would not affect 
internals/timings...



[GitHub] flink pull request: [FLINK-2098] Ensure checkpoints and element em...

2015-06-03 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/742#issuecomment-108627853
  
I was thinking the same thing about `Thread.holdsLock(obj)`. That call 
probably costs way more than the lock itself, though. It would be nice to have a 
debug mode in which to activate it.
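One possible shape for such a debug mode is a flag fixed at construction, so that production runs skip `Thread.holdsLock` entirely. This is a sketch under assumptions: the class name and the system property `source.debug.checkLock` are made up for illustration and are not Flink configuration keys. `Boolean.getBoolean` is the standard JDK call that reads a boolean system property.

```java
import java.util.function.Consumer;

// Hypothetical emitter whose lock assertion is only active in debug mode.
class DebugCheckedEmitter<T> {
    // Made-up property name, for illustration only.
    static final String DEBUG_PROPERTY = "source.debug.checkLock";

    private final Object checkpointLock;
    private final boolean checkLock;
    private final Consumer<T> downstream;

    DebugCheckedEmitter(Object checkpointLock, boolean checkLock, Consumer<T> downstream) {
        this.checkpointLock = checkpointLock;
        this.checkLock = checkLock;
        this.downstream = downstream;
    }

    // Enables the check when the JVM runs with -Dsource.debug.checkLock=true.
    static <T> DebugCheckedEmitter<T> fromSystemProperty(Object lock, Consumer<T> downstream) {
        return new DebugCheckedEmitter<>(lock, Boolean.getBoolean(DEBUG_PROPERTY), downstream);
    }

    void collect(T element) {
        // The potentially expensive check only runs when debug mode is on.
        if (checkLock && !Thread.holdsLock(checkpointLock)) {
            throw new IllegalStateException("checkpoint lock not held while emitting");
        }
        downstream.accept(element);
    }
}
```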



[GitHub] flink pull request: [FLINK-2098] Ensure checkpoints and element em...

2015-06-03 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/742#issuecomment-108618014
  
I generally like that idea. Especially the extensibility with respect to 
timestamps and watermark generation is a good point.

Retrieving the lock object from the context is not very obvious, but then 
again, someone who implements a fault-tolerant exactly-once source should read 
the javadocs and have a look at an example.
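Such an example might look roughly like the following sketch of an exactly-once source that replays a fixed list and checkpoints its offset. It assumes a context exposing `collect` and `getCheckpointLock`, plus hypothetical `snapshotState`/`restoreState` hooks that the runtime would call (the snapshot while holding the same lock); these names are illustrative, not Flink's API. The key point is that emitting a record and advancing the offset happen in one `synchronized` block, so a checkpoint never observes one without the other.

```java
import java.util.List;

// Hypothetical exactly-once source: replays a fixed list of records and
// checkpoints the offset of the next record to emit.
class ReplayableListSource {

    // Minimal context interface assumed for this sketch.
    interface Context<T> {
        void collect(T element);

        Object getCheckpointLock();
    }

    private final List<String> records;
    private long offset = 0; // guarded by the checkpoint lock while running
    private volatile boolean running = true;

    ReplayableListSource(List<String> records) {
        this.records = records;
    }

    void run(Context<String> ctx) {
        while (running && offset < records.size()) {
            // Emission and offset update are atomic with respect to snapshots,
            // which (by assumption) are taken under the same lock.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(records.get((int) offset));
                offset++;
            }
        }
    }

    void cancel() {
        running = false;
    }

    // Assumed to be called by the runtime while it holds the checkpoint lock.
    long snapshotState() {
        return offset;
    }

    // Assumed to be called before run() when recovering from a checkpoint.
    void restoreState(long state) {
        offset = state;
    }
}
```

After a restore, the source resumes at the checkpointed offset, so records emitted after the last snapshot are replayed rather than lost.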





[GitHub] flink pull request: [FLINK-2098] Ensure checkpoints and element em...

2015-06-02 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/742#issuecomment-107860292
  
I addressed the problem with the race conditions and re-enabled the Twitter 
source.

Which exceptions are you referring to? I don't think I touched any of the 
exception handling or the general way that the stream tasks work.



[GitHub] flink pull request: [FLINK-2098] Ensure checkpoints and element em...

2015-06-01 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/742#issuecomment-107388477
  
As per the discussion on the mailing list I'm rewriting the Source 
interface to only have the run()/cancel() variant.



[GitHub] flink pull request: [FLINK-2098] Ensure checkpoints and element em...

2015-06-01 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/742#issuecomment-107613541
  
I'll make a pass



[GitHub] flink pull request: [FLINK-2098] Ensure checkpoints and element em...

2015-06-01 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/742#issuecomment-107589378
  
I reworked the sources now. Could someone please have another pass over 
this? I think this is very critical code.



[GitHub] flink pull request: [FLINK-2098] Ensure checkpoints and element em...

2015-06-01 Thread mbalassi
Github user mbalassi commented on a diff in the pull request:

https://github.com/apache/flink/pull/742#discussion_r31461903
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
 ---
@@ -1,322 +1,322 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.twitter;
-
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.twitter.hbc.ClientBuilder;
-import com.twitter.hbc.core.Constants;
-import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
-import com.twitter.hbc.core.processor.StringDelimitedProcessor;
-import com.twitter.hbc.httpclient.BasicClient;
-import com.twitter.hbc.httpclient.auth.Authentication;
-import com.twitter.hbc.httpclient.auth.OAuth1;
-
-/**
- * Implementation of {@link SourceFunction} specialized to emit tweets from
- * Twitter. It can connect to Twitter Streaming API, collect tweets and
- */
-public class TwitterSource extends RichParallelSourceFunction<String> {
-
-   private static final Logger LOG = LoggerFactory.getLogger(TwitterSource.class);
-
-   private static final long serialVersionUID = 1L;
-   private String authPath;
-   private transient BlockingQueue<String> queue;
-   private int queueSize = 1;
-   private transient BasicClient client;
-   private int waitSec = 5;
-
-   private int maxNumberOfTweets;
-   private int currentNumberOfTweets;
-
-   private String nextElement = null;
-
-   private volatile boolean isRunning = false;
-
-   /**
-    * Create {@link TwitterSource} for streaming
-    *
-    * @param authPath
-    *            Location of the properties file containing the required
-    *            authentication information.
-    */
-   public TwitterSource(String authPath) {
-   this.authPath = authPath;
-   maxNumberOfTweets = -1;
-   }
-
-   /**
-    * Create {@link TwitterSource} to collect finite number of tweets
-    *
-    * @param authPath
-    *            Location of the properties file containing the required
-    *            authentication information.
-    * @param numberOfTweets
-    *
-    */
-   public TwitterSource(String authPath, int numberOfTweets) {
-   this.authPath = authPath;
-   this.maxNumberOfTweets = numberOfTweets;
-   }
-
-   @Override
-   public void open(Configuration parameters) throws Exception {
-   initializeConnection();
-   currentNumberOfTweets = 0;
-   }
-
-   /**
-    * Initialize Hosebird Client to be able to consume Twitter's Streaming API
-    */
-   private void initializeConnection() {
-
-   if (LOG.isInfoEnabled()) {
-   LOG.info("Initializing Twitter Streaming API connection");
-   }
-
-   queue = new LinkedBlockingQueue<String>(queueSize);
-
-   StatusesSampleEndpoint endpoint = new StatusesSampleEndpoint();
-   endpoint.stallWarnings(false);
-
-   Authentication auth = authenticate();
-
-   initializeClient(endpoint, auth);
-
-   if (LOG.isInfoEnabled()) {
-   LOG.info("Twitter Streaming API connection established successfully");
-   }
-   }
-
-   private OAuth1 authenticate() 

[GitHub] flink pull request: [FLINK-2098] Ensure checkpoints and element em...

2015-06-01 Thread mbalassi
Github user mbalassi commented on a diff in the pull request:

https://github.com/apache/flink/pull/742#discussion_r31460859
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
 ---
@@ -61,10 +63,14 @@
  *
  * Note that the autocommit feature of Kafka needs to be disabled for using this source.
  */
-public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT> implements
+public class PersistentKafkaSource<OUT> extends RichSourceFunction<OUT> implements
    ResultTypeQueryable<OUT>,
    CheckpointCommitter,
+   ParallelSourceFunction<OUT>,
    CheckpointedAsynchronously<long[]> {
--- End diff --

Please have a RichParallelSourceFunction instead.
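The suggestion relies on `RichParallelSourceFunction` already bundling the rich-function lifecycle with the parallel-source marker. The sketch below uses simplified, hypothetical stand-ins named after the Flink types and modeled on how this sketch assumes they relate; the real classes have more members, and the stand-ins are nested to avoid clashing with them.

```java
// Simplified stand-ins modeled on Flink's source types (hypothetical, not the real classes).
class SourceTypesSketch {
    interface Function {}

    interface RichFunction extends Function {
        void open() throws Exception;

        void close() throws Exception;
    }

    interface SourceFunction<OUT> extends Function {}

    // Marker: the source may run with parallelism greater than one.
    interface ParallelSourceFunction<OUT> extends SourceFunction<OUT> {}

    abstract static class AbstractRichFunction implements RichFunction {
        @Override
        public void open() throws Exception {}

        @Override
        public void close() throws Exception {}
    }

    abstract static class RichSourceFunction<OUT> extends AbstractRichFunction
            implements SourceFunction<OUT> {}

    abstract static class RichParallelSourceFunction<OUT> extends AbstractRichFunction
            implements ParallelSourceFunction<OUT> {}

    // Variant from the diff: a rich source plus an extra parallel marker.
    static class ViaMarker<OUT> extends RichSourceFunction<OUT>
            implements ParallelSourceFunction<OUT> {}

    // Variant the review asks for: one base class that already bundles both.
    static class ViaBaseClass<OUT> extends RichParallelSourceFunction<OUT> {}
}
```

In this model both variants are rich functions and parallel sources; only the second is also a `RichParallelSourceFunction`, so a check keyed on that base class would accept just the second.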



[GitHub] flink pull request: [FLINK-2098] Ensure checkpoints and element em...

2015-06-01 Thread mbalassi
Github user mbalassi commented on a diff in the pull request:

https://github.com/apache/flink/pull/742#discussion_r31460503
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
 ---
@@ -19,66 +19,81 @@
 package org.apache.flink.streaming.api.functions.source;
 
 import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.util.Collector;
 
 import java.io.Serializable;
 
 /**
  * Base interface for all stream data sources in Flink. The contract of a stream source
- * is similar to an iterator - it is consumed as in the following pseudo code:
--- End diff --

Maybe "streaming data sources" instead of "stream data sources", so as not 
to confuse them with file streams or intermediate results.



[GitHub] flink pull request: [FLINK-2098] Ensure checkpoints and element em...

2015-06-01 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/742#issuecomment-107751723
  
Looks good all in all. I am preparing a follow-up pull request that cleans 
up a few things, adds comments, and addresses Marton's comments.

One thing I noticed is that all the non-checkpointed sources have a 
checkpoint lock in the signature as well.

Should we offer two source interfaces: `SourceFunction` and 
`CheckpointedSourceFunction` ?
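A rough sketch of the two-interface split being asked about, under assumptions: the interface names follow the comment, `java.util.function.Consumer` stands in for Flink's output collector, and the `invoke` dispatcher is a hypothetical stand-in for the source task. Only `CheckpointedSourceFunction` sees the lock. As a conservative choice made for this sketch, the dispatcher takes the lock around each emission of a plain source so its records still never interleave with a checkpoint.

```java
import java.util.function.Consumer;

// Hypothetical two-interface variant; checked exceptions omitted for brevity.
class TwoSourceInterfacesSketch {

    // Plain source: no checkpoint lock in the signature.
    interface SourceFunction<T> {
        void run(Consumer<T> out);

        void cancel();
    }

    // Checkpointed source: must emit and update its state under the given lock.
    interface CheckpointedSourceFunction<T> {
        void run(Consumer<T> out, Object checkpointLock);

        void cancel();
    }

    // Hypothetical dispatch, standing in for what a source task would do.
    static <T> void invoke(Object source, Consumer<T> out, Object lock) {
        if (source instanceof CheckpointedSourceFunction) {
            @SuppressWarnings("unchecked")
            CheckpointedSourceFunction<T> checkpointed = (CheckpointedSourceFunction<T>) source;
            checkpointed.run(out, lock);
        } else if (source instanceof SourceFunction) {
            @SuppressWarnings("unchecked")
            SourceFunction<T> plain = (SourceFunction<T>) source;
            // Assumption: the task serializes each emission with checkpoints
            // on behalf of the plain source.
            plain.run(element -> {
                synchronized (lock) {
                    out.accept(element);
                }
            });
        } else {
            throw new IllegalArgumentException("Not a source: " + source);
        }
    }
}
```

The cost of this design is the duplication raised in the reply: each rich and parallel variant would need a checkpointed counterpart.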



[GitHub] flink pull request: [FLINK-2098] Ensure checkpoints and element em...

2015-06-01 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/742#issuecomment-107768981
  
Some important comments:

  - Exceptions should always propagate, and exceptions during cancelling 
can be thrown. The `Task` class filters out exceptions that come after the 
transition to the canceling state. Don't try to be super smart there, 
propagate your exceptions, and the context will decide whether they should be 
logged.

  - The RabbitMQ source is not doing any proper failure handling 
(critical!). I opened a separate JIRA issue for that.

  - Just commenting out the Twitter sources is bad style, in my opinion. 
There is actually no chance of getting this pull request in, and this one here 
is a release blocker, while the Twitter Source one is not a release blocker.

  - All functions set their `running` flag to true at the beginning of the 
`run()` method. If invoking and canceling the source race, the flag can be set 
to false in the `cancel()` method and then back to true in the `run()` method, 
resulting in a source that is never canceled. I fixed some cases in my 
follow-up pull request, but many are still in. Please make another careful 
pass with respect to that.

  - In general, the streaming sources are extremely badly commented. This 
needs big improvements!
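The `running` flag race from the fourth point can be reproduced deterministically by calling `cancel()` before `run()`. In this sketch (hypothetical class names, with a bounded loop so the buggy version terminates) the first class resets the flag in `run()` and loses the cancellation; the second initializes the `volatile` flag once at construction and never sets it back to `true`, which is one common way to avoid the race.

```java
import java.util.List;

// Buggy pattern: run() overwrites a cancel() that happened before it started.
class RacySource {
    private volatile boolean running;

    void run(int maxRecords, List<Integer> out) {
        running = true; // may undo an earlier cancel()
        for (int i = 0; running && i < maxRecords; i++) {
            out.add(i);
        }
    }

    void cancel() {
        running = false;
    }
}

// Fixed pattern: the flag starts as true and only ever transitions to false.
class CancelSafeSource {
    private volatile boolean running = true;

    void run(int maxRecords, List<Integer> out) {
        for (int i = 0; running && i < maxRecords; i++) {
            out.add(i);
        }
    }

    void cancel() {
        running = false;
    }
}
```

In this sketch a cancelled `CancelSafeSource` instance stays cancelled; restarting would mean creating a new instance.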



[GitHub] flink pull request: [FLINK-2098] Ensure checkpoints and element em...

2015-06-01 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/742#issuecomment-107811407
  
I will fix the remaining cancel() races. I just commented out the Twitter 
stuff because I assumed that the new TwitterSource could be merged right away 
and I was waiting for that.

As for two Source interfaces: we can certainly do that. The reason I didn't 
do it is that there would be a lot of duplication, because we have 
SourceFunction, ParallelSourceFunction, RichSourceFunction and 
RichParallelSourceFunction. With the new Source interface this would go up to 8 
interfaces for the sources. (That's also the reason why I didn't have Kafka 
derive from RichParallelSourceFunction; I thought that maybe we could get 
rid of the special interfaces for parallel sources.)

Also, I realize there are many more problems. I just can't address them all 
in a single PR. :sweat_smile: 



[GitHub] flink pull request: [FLINK-2098] Ensure checkpoints and element em...

2015-05-28 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/742

[FLINK-2098] Ensure checkpoints and element emission are in order

Before, it could happen that a streaming source would keep emitting
elements while a checkpoint is being performed. This can lead to
inconsistencies in the checkpoints with other operators.

This also adds a test that checks whether only one checkpoint is
executed at a time and the serial behaviour of checkpointing and
emission.
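The ordering guarantee described here can be illustrated with a toy model; it is not Flink's `StreamTask` code, and all names are hypothetical. One thread emits elements and advances the source state, another takes checkpoints, and both synchronize on the same lock. Because each snapshot is taken under the lock, the recorded state always matches the number of elements emitted before it, and a single checkpointing thread means only one checkpoint runs at a time.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model: emission and checkpoints serialized through one lock.
class OrderedCheckpointDemo {
    private final Object lock = new Object();
    private final List<Long> emitted = new ArrayList<>(); // stands in for the output
    private final List<long[]> snapshots = new ArrayList<>();
    private long sourceState = 0; // e.g. a read offset

    // Source side: emit one element and advance the state atomically.
    void emitOne() {
        synchronized (lock) {
            emitted.add(sourceState);
            sourceState++;
        }
    }

    // Checkpoint side: records {state, elements emitted so far} together.
    void checkpoint() {
        synchronized (lock) {
            snapshots.add(new long[] {sourceState, emitted.size()});
        }
    }

    List<long[]> run(int elements, int checkpoints) {
        Thread source = new Thread(() -> {
            for (int i = 0; i < elements; i++) {
                emitOne();
            }
        });
        Thread checkpointer = new Thread(() -> {
            for (int i = 0; i < checkpoints; i++) {
                checkpoint();
            }
        });
        source.start();
        checkpointer.start();
        try {
            source.join();
            checkpointer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return snapshots;
    }
}
```

Without the shared lock, a snapshot could fall between `emitted.add` and `sourceState++` and record mismatched values (and the unsynchronized `ArrayList` access would be a data race as well).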

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink checkpoint-hardening

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/742.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #742


commit 5df7410f44c640f3a183b525f697bbc11a69c69b
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Date:   2015-05-28T08:24:37Z

[FLINK-2098] Ensure checkpoints and element emission are in order

Before, it could happen that a streaming source would keep emitting
elements while a checkpoint is being performed. This can lead to
inconsistencies in the checkpoints with other operators.

This also adds a test that checks whether only one checkpoint is
executed at a time and the serial behaviour of checkpointing and
emission.





[GitHub] flink pull request: [FLINK-2098] Ensure checkpoints and element em...

2015-05-28 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/742#issuecomment-106471093
  
This also enables the exactly-once checkpointing test added earlier by 
@StephanEwen.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2098] Ensure checkpoints and element em...

2015-05-28 Thread mbalassi
Github user mbalassi commented on a diff in the pull request:

https://github.com/apache/flink/pull/742#discussion_r3125
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestBase.java
 ---
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.tasks;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
--- End diff --

Trivial: two licenses.

