[jira] [Commented] (FLINK-4278) Unclosed FSDataOutputStream in multiple files in the project

2016-08-02 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15405275#comment-15405275
 ] 

Ted Yu commented on FLINK-4278:
---

I was out of town.

IOUtils.closeQuietly() should be fine.
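
For illustration, a minimal sketch of the close-on-all-paths pattern under discussion, assuming Commons IO's IOUtils.closeQuietly(Closeable); the class and method names are illustrative, not the actual fix:

{code}
import org.apache.commons.io.IOUtils;
import org.apache.flink.core.fs.FSDataOutputStream;

import java.io.IOException;

public class StateWriteSketch {

    // Hedged sketch: the stream is closed on every code path, even when
    // write() or flush() throws; closeQuietly() swallows only the close
    // exception, so the original failure still propagates.
    public static void putState(FSDataOutputStream outStream, byte[] data) throws IOException {
        try {
            outStream.write(data);
            outStream.flush();
        } finally {
            IOUtils.closeQuietly(outStream);
        }
    }
}
{code}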

> Unclosed FSDataOutputStream in multiple files in the project
> 
>
> Key: FLINK-4278
> URL: https://issues.apache.org/jira/browse/FLINK-4278
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.0.3
>Reporter: Neelesh Srinivas Salian
>Assignee: Neelesh Srinivas Salian
>
> After FLINK-4259, I did a check and found that the following files don't close 
> the FSDataOutputStream.
> The following are the files and the corresponding methods missing the close():
> 1) FSDataOutputStream.java - adding a close() method in the abstract class
> 2) FSStateBackend flush() and write() - closing the FSDataOutputStream
> 3) StringWriter.java write()
> 4) FileSystemStateStore putState() - closing the FSDataOutputStream
> 5) HadoopDataOutputStream.java - not sure whether this needs closing.
> 6) FileSystemStateStorageHelper.java store() - needs closing for both the outStream 
> and the ObjectOutputStream
> The options to consider would be either an explicit close() or IOUtils.closeQuietly().
> Any thoughts?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3801) Upgrade Joda-Time library to 2.9.3

2016-08-02 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-3801:
--
Description: 
Currently joda-time 2.5 is used, which is very old.

We should upgrade to 2.9.3.

  was:
Currently joda-time 2.5 is used, which is very old.


We should upgrade to 2.9.3.


> Upgrade Joda-Time library to 2.9.3
> --
>
> Key: FLINK-3801
> URL: https://issues.apache.org/jira/browse/FLINK-3801
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Priority: Minor
>
> Currently joda-time 2.5 is used, which is very old.
> We should upgrade to 2.9.3.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3940) Add support for ORDER BY OFFSET FETCH

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15405206#comment-15405206
 ] 

ASF GitHub Bot commented on FLINK-3940:
---

Github user gallenvara commented on a diff in the pull request:

https://github.com/apache/flink/pull/2282#discussion_r73272805
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
 ---
@@ -71,11 +78,57 @@ class DataSetSort(
     partitionedDs = partitionedDs.sortPartition(fieldCollation._1, fieldCollation._2)
   }
 
+    val offsetAndFetchDS = if (offset != null) {
+      val offsetIndex = RexLiteral.intValue(offset)
+      val fetchIndex = if (fetch != null) {
+        RexLiteral.intValue(fetch) + offsetIndex
+      } else {
+        Int.MaxValue
+      }
+      if (currentParallelism != 1) {
+        val partitionCount = partitionedDs.mapPartition(
+          new MapPartitionFunction[Any, Int] {
+            override def mapPartition(value: lang.Iterable[Any], out: Collector[Int]): Unit = {
+              val iterator = value.iterator()
+              var elementCount = 0
+              while (iterator.hasNext) {
+                elementCount += 1
+                iterator -> iterator.next()
+              }
+              out.collect(elementCount)
+            }
+          }).collect().asScala
--- End diff --

For the case where parallelism != 1, we use range partitioning and sort 
partitioning to get the global order. The number of elements in every 
partition (range) is not always the same, so counting within each partition is 
necessary here, IMO.
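
As a rough Java sketch of that counting step (the Scala diff above is the actual change; the class name and type parameters here are illustrative): each partition only emits its size, which a later step can use to translate the global OFFSET/FETCH range into per-partition bounds.

{code}
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.util.Collector;

// Emits one count per partition; since range partitions are uneven,
// the counts are needed to locate the global offset boundary.
public class PartitionCounter<T> implements MapPartitionFunction<T, Integer> {

    @Override
    public void mapPartition(Iterable<T> values, Collector<Integer> out) {
        int elementCount = 0;
        for (T ignored : values) {
            elementCount++; // only the count matters, the element is discarded
        }
        out.collect(elementCount);
    }
}
{code}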


> Add support for ORDER BY OFFSET FETCH
> -
>
> Key: FLINK-3940
> URL: https://issues.apache.org/jira/browse/FLINK-3940
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: GaoLun
>Priority: Minor
>
> Currently, only ORDER BY without OFFSET and FETCH is supported.
> This issue tracks the effort to add support for OFFSET and FETCH and involves:
> - Implementing the execution strategy in `DataSetSort`
> - adapting the `DataSetSortRule` to support OFFSET and FETCH
> - extending the Table API and validation to support OFFSET and FETCH and 
> generate a corresponding RelNode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2282: [FLINK-3940] [table] Add support for ORDER BY OFFS...

2016-08-02 Thread gallenvara
Github user gallenvara commented on a diff in the pull request:

https://github.com/apache/flink/pull/2282#discussion_r73272805
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
 ---
@@ -71,11 +78,57 @@ class DataSetSort(
     partitionedDs = partitionedDs.sortPartition(fieldCollation._1, fieldCollation._2)
   }
 
+    val offsetAndFetchDS = if (offset != null) {
+      val offsetIndex = RexLiteral.intValue(offset)
+      val fetchIndex = if (fetch != null) {
+        RexLiteral.intValue(fetch) + offsetIndex
+      } else {
+        Int.MaxValue
+      }
+      if (currentParallelism != 1) {
+        val partitionCount = partitionedDs.mapPartition(
+          new MapPartitionFunction[Any, Int] {
+            override def mapPartition(value: lang.Iterable[Any], out: Collector[Int]): Unit = {
+              val iterator = value.iterator()
+              var elementCount = 0
+              while (iterator.hasNext) {
+                elementCount += 1
+                iterator -> iterator.next()
+              }
+              out.collect(elementCount)
+            }
+          }).collect().asScala
--- End diff --

For the case where parallelism != 1, we use range partitioning and sort 
partitioning to get the global order. The number of elements in every 
partition (range) is not always the same, so counting within each partition is 
necessary here, IMO.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-4224) Exception after successful execution of a job submitted through the web interface.

2016-08-02 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-4224.
---
Resolution: Not A Problem

> Exception after successful execution of a job submitted through the web 
> interface.
> 
>
> Key: FLINK-4224
> URL: https://issues.apache.org/jira/browse/FLINK-4224
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.1.0
>Reporter: Kostas Kloudas
>Assignee: Aljoscha Krettek
>
> When submitting the job below through the web interface to a local flink 
> cluster:
> {code}
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>     TextInputFormat inputFormat =
>         new TextInputFormat(new Path("file:///YOUR_TEXT_FILE"));
>     DataStream<Tuple2<String, Integer>> input = env
>         .readFile(inputFormat, inputFormat.getFilePath().toString())
>         .flatMap(new Tokenizer())
>         .keyBy(0)
>         .sum(1);
>     input.print();
>     env.execute("WebClient Testing Example.");
> }
>
> public static final class Tokenizer
>         implements FlatMapFunction<String, Tuple2<String, Integer>> {
>     private static final long serialVersionUID = 1L;
>
>     @Override
>     public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
>             throws Exception {
>         // normalize and split the line
>         String[] tokens = value.toLowerCase().split("\\W+");
>         // emit the pairs
>         for (String token : tokens) {
>             if (token.length() > 0) {
>                 out.collect(new Tuple2<>(token, 1));
>             }
>         }
>     }
> }
> {code}
> The job succeeds, but in the logs there is an NPE: 
> {code} 2016-07-16 16:56:18,197 WARN  
> org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler - 
>  Error while handling request java.lang.RuntimeException: 
>  Couldn't deserialize ExecutionConfig.
> ...
> Caused by: java.lang.NullPointerException at 
> org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:55)
>  at 
> org.apache.flink.runtime.webmonitor.handlers.JobConfigHandler.handleRequest(JobConfigHandler.java:50)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4271) There is no way to set parallelism of operators produced by CoGroupedStreams

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15405156#comment-15405156
 ] 

ASF GitHub Bot commented on FLINK-4271:
---

Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/2305
  
I agree with @aljoscha. Can we reduce the checker's sensitivity to pass 
this change? What should we do when we need to break compatibility?
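
For context, a sketch of the API shape this issue is about (the types, key selectors, and window assigner below are illustrative placeholders): apply() hands back a plain DataStream, so neither the final window operator nor the internally generated map/keyBy operators expose a setParallelism() handle.

{code}
// Hedged sketch of the limitation; MyCoGroupFunction and the key getters
// are placeholders, not part of the actual PR.
DataStream<String> result = streamA
    .coGroup(streamB)
    .where(a -> a.getKey())
    .equalTo(b -> b.getKey())
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    .apply(new MyCoGroupFunction()); // returns DataStream, not an operator handle
{code}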


> There is no way to set parallelism of operators produced by CoGroupedStreams
> 
>
> Key: FLINK-4271
> URL: https://issues.apache.org/jira/browse/FLINK-4271
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Wenlong Lyu
>Assignee: Jark Wu
>
> Currently, CoGroupedStreams packages the map/keyBy/window operators behind a 
> human-friendly interface, like 
> dataStreamA.cogroup(streamB).where(...).equalTo().window().apply(). Both the 
> intermediate operators and the final window operators cannot be accessed by 
> users, and we cannot set attributes of the operators, which makes co-group 
> hard to use in a production environment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2305: [FLINK-4271] [DataStreamAPI] Enable CoGroupedStreams and ...

2016-08-02 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/2305
  
I agree with @aljoscha. Can we reduce the checker's sensitivity to pass 
this change? What should we do when we need to break compatibility?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-4309) Potential null pointer dereference in DelegatingConfiguration#keySet()

2016-08-02 Thread Ted Yu (JIRA)
Ted Yu created FLINK-4309:
-

 Summary: Potential null pointer dereference in 
DelegatingConfiguration#keySet()
 Key: FLINK-4309
 URL: https://issues.apache.org/jira/browse/FLINK-4309
 Project: Flink
  Issue Type: Bug
Reporter: Ted Yu
Priority: Minor


{code}
final int prefixLen = this.prefix == null ? 0 : this.prefix.length();

for (String key : this.backingConfig.keySet()) {
  if (key.startsWith(this.prefix)) {
{code}
If this.prefix is null, we would get an NPE in startsWith():
{code}
public boolean startsWith(String prefix, int toffset) {
char ta[] = value;
int to = toffset;
char pa[] = prefix.value;
{code}
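A minimal sketch of one possible guard, reusing the names from the snippet above (the surrounding result set is illustrative):
{code}
// Treat a null prefix as "match everything" so that startsWith() is never
// called with a null argument.
final int prefixLen = this.prefix == null ? 0 : this.prefix.length();
final Set<String> result = new HashSet<>(); // java.util.Set / java.util.HashSet

for (String key : this.backingConfig.keySet()) {
    if (this.prefix == null || key.startsWith(this.prefix)) {
        result.add(key.substring(prefixLen)); // strip the delegation prefix
    }
}
{code}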



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15405073#comment-15405073
 ] 

ASF GitHub Bot commented on FLINK-3929:
---

Github user vijikarthi commented on the issue:

https://github.com/apache/flink/pull/2275
  
@mxm - I have addressed most of the feedback and pushed the changes. Please 
take a look when you get a chance. 

Regarding the secure test cases: 
- HDFS and Yarn are handled through the @BeforeClass and @AfterClass style 
and do not use a custom JRunner implementation. As you suggested, I 
could keep just one or two tests for each of the modules to cut down the 
running time, if that's okay with you.
- Kafka tests are handled with a custom JRunner, and if we need to move them to 
@BeforeClass and @AfterClass, then we may have to duplicate the code used in 
the base classes, which would not look good. For example, have a look at 
Kafka09ProducerSecuredITCase, which extends Kafka09ProducerITCase, which in turn 
has two levels of parent classes (KafkaProducerTestBase & KafkaTestBase). If we 
write a separate test case for the secure cluster, then I may have to duplicate 
some of this base class code, which can be avoided if we use the custom JRunner. We 
could still limit the number of test cases to just one or two to minimize the 
running time. Please let me know your thoughts.
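
For reference, a bare sketch of the @BeforeClass/@AfterClass style mentioned above; SecureTestEnvironment and its methods are hypothetical placeholders, not actual Flink test utilities:

{code}
import org.junit.AfterClass;
import org.junit.BeforeClass;

public class YarnSecuredITCase {

    @BeforeClass
    public static void startSecureContext() throws Exception {
        // set up Kerberos credentials (e.g. a MiniKDC) once per test class
        // SecureTestEnvironment.prepare(...);  // hypothetical helper
    }

    @AfterClass
    public static void stopSecureContext() throws Exception {
        // tear the secure context down so later test classes are unaffected
        // SecureTestEnvironment.cleanup();  // hypothetical helper
    }
}
{code}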


> Support for Kerberos Authentication with Keytab Credential
> --
>
> Key: FLINK-3929
> URL: https://issues.apache.org/jira/browse/FLINK-3929
> Project: Flink
>  Issue Type: New Feature
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: kerberos, security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Add support for a keytab credential to be associated with the Flink cluster, 
> to facilitate:
> - Kerberos-authenticated data access for connectors
> - Kerberos-authenticated ZooKeeper access
> Support both the standalone and YARN deployment modes.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2275: FLINK-3929 Support for Kerberos Authentication with Keyta...

2016-08-02 Thread vijikarthi
Github user vijikarthi commented on the issue:

https://github.com/apache/flink/pull/2275
  
@mxm - I have addressed most of the feedback and pushed the changes. Please 
take a look when you get a chance. 

Regarding the secure test cases: 
- HDFS and Yarn are handled through the @BeforeClass and @AfterClass style 
and do not use a custom JRunner implementation. As you suggested, I 
could keep just one or two tests for each of the modules to cut down the 
running time, if that's okay with you.
- Kafka tests are handled with a custom JRunner, and if we need to move them to 
@BeforeClass and @AfterClass, then we may have to duplicate the code used in 
the base classes, which would not look good. For example, have a look at 
Kafka09ProducerSecuredITCase, which extends Kafka09ProducerITCase, which in turn 
has two levels of parent classes (KafkaProducerTestBase & KafkaTestBase). If we 
write a separate test case for the secure cluster, then I may have to duplicate 
some of this base class code, which can be avoided if we use the custom JRunner. We 
could still limit the number of test cases to just one or two to minimize the 
running time. Please let me know your thoughts.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Comment Edited] (FLINK-4308) Allow uploaded jar directory to be configurable

2016-08-02 Thread Zhenzhong Xu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404889#comment-15404889
 ] 

Zhenzhong Xu edited comment on FLINK-4308 at 8/2/16 9:57 PM:
-

The proposal is to introduce a new optional config "jobmanager.web.uploaddir". 
If specified, the job jar will be uploaded to and read from the predefined directory. 
If not specified, it falls back to the old behavior, which is to create a 
dynamic directory under the specified "jobmanager.web.tmpdir" (or the default 
tmp dir if that is not specified either).

The change is simple; let me know if this sounds good and I can send a PR.


was (Author: zhenzhongxu):
The proposal is to introduce a new optional config "jobmanager.web.uploaddir". 
If specified job jar will be uploaded and read from the predefined directory. 
If not specified, it falls back to the old behavior which is to create a 
dynamic directory under whatever specified "jobmanager.web.tmpdir" (or default 
tmp if this is not specified either).

Change is simple, let me know if this sounds good I an send a PR in.

> Allow uploaded jar directory to be configurable 
> 
>
> Key: FLINK-4308
> URL: https://issues.apache.org/jira/browse/FLINK-4308
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: Zhenzhong Xu
>Priority: Minor
>
> I notice sometimes it's preferable to have uploaded jars put into a 
> configurable directory location instead of only having it determined at runtime. In 
> this case we can pre-load the directory with jars in a docker image, which allows 
> us to leverage the jobmanager restful interface to start/kill jobs.
> WebRuntimeMonitor.java
> String uploadDirName = "flink-web-upload-" + UUID.randomUUID();
> this.uploadDir = new File(getBaseDir(config), uploadDirName);
>   



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4308) Allow uploaded jar directory to be configurable

2016-08-02 Thread Zhenzhong Xu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404889#comment-15404889
 ] 

Zhenzhong Xu commented on FLINK-4308:
-

The proposal is to introduce a new optional config "jobmanager.web.uploaddir". 
If specified, the job jar will be uploaded to and read from the predefined directory. 
If not specified, it falls back to the old behavior, which is to create a 
dynamic directory under the specified "jobmanager.web.tmpdir" (or the default 
tmp dir if that is not specified either).

The change is simple; let me know if this sounds good and I can send a PR.
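
A minimal sketch of the proposed fallback logic; the config key comes from the proposal, while getBaseDir mirrors the existing WebRuntimeMonitor helper and is assumed here:

{code}
import java.io.File;
import java.util.UUID;
import org.apache.flink.configuration.Configuration;

class UploadDirResolver {
    static File resolveUploadDir(Configuration config, File baseDir) {
        String configured = config.getString("jobmanager.web.uploaddir", null);
        if (configured != null) {
            // fixed, pre-loadable location, e.g. baked into a docker image
            return new File(configured);
        }
        // old behavior: dynamic directory under jobmanager.web.tmpdir (or java.io.tmpdir)
        return new File(baseDir, "flink-web-upload-" + UUID.randomUUID());
    }
}
{code}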

> Allow uploaded jar directory to be configurable 
> 
>
> Key: FLINK-4308
> URL: https://issues.apache.org/jira/browse/FLINK-4308
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: Zhenzhong Xu
>Priority: Minor
>
> I notice sometimes it's preferable to have uploaded jars put into a 
> configurable directory location instead of only having it determined at runtime. In 
> this case we can pre-load the directory with jars in a docker image, which allows 
> us to leverage the jobmanager restful interface to start/kill jobs.
> WebRuntimeMonitor.java
> String uploadDirName = "flink-web-upload-" + UUID.randomUUID();
> this.uploadDir = new File(getBaseDir(config), uploadDirName);
>   



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404880#comment-15404880
 ] 

ASF GitHub Bot commented on FLINK-3929:
---

Github user vijikarthi commented on a diff in the pull request:

https://github.com/apache/flink/pull/2275#discussion_r73245570
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---
@@ -1016,6 +1016,23 @@
 	/** The environment variable name which contains the location of the lib folder */
 	public static final String ENV_FLINK_LIB_DIR = "FLINK_LIB_DIR";
 
+	// --------------------------- Security ---------------------------
+
+	/**
+	 * The config parameter defining security credentials required
+	 * for securing Flink cluster.
+	 */
+
+	/** Keytab file key name to be used in flink configuration file */
+	public static final String SECURITY_KEYTAB_KEY = "security.keytab";
+
+	/** Kerberos security principal key name to be used in flink configuration file */
+	public static final String SECURITY_PRINCIPAL_KEY = "security.principal";
+
+	/** Keytab file name populated in YARN container */
+	public static final String KEYTAB_FILE_NAME = "krb5.keytab";
--- End diff --

Okay, will move it to flink-yarn module


> Support for Kerberos Authentication with Keytab Credential
> --
>
> Key: FLINK-3929
> URL: https://issues.apache.org/jira/browse/FLINK-3929
> Project: Flink
>  Issue Type: New Feature
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: kerberos, security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Add support for a keytab credential to be associated with the Flink cluster, 
> to facilitate:
> - Kerberos-authenticated data access for connectors
> - Kerberos-authenticated ZooKeeper access
> Support both the standalone and YARN deployment modes.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2275: FLINK-3929 Support for Kerberos Authentication wit...

2016-08-02 Thread vijikarthi
Github user vijikarthi commented on a diff in the pull request:

https://github.com/apache/flink/pull/2275#discussion_r73245570
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---
@@ -1016,6 +1016,23 @@
 	/** The environment variable name which contains the location of the lib folder */
 	public static final String ENV_FLINK_LIB_DIR = "FLINK_LIB_DIR";
 
+	// --------------------------- Security ---------------------------
+
+	/**
+	 * The config parameter defining security credentials required
+	 * for securing Flink cluster.
+	 */
+
+	/** Keytab file key name to be used in flink configuration file */
+	public static final String SECURITY_KEYTAB_KEY = "security.keytab";
+
+	/** Kerberos security principal key name to be used in flink configuration file */
+	public static final String SECURITY_PRINCIPAL_KEY = "security.principal";
+
+	/** Keytab file name populated in YARN container */
+	public static final String KEYTAB_FILE_NAME = "krb5.keytab";
--- End diff --

Okay, will move it to flink-yarn module


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4262) Consider null handling during sorting

2016-08-02 Thread Ivan Mushketyk (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404861#comment-15404861
 ] 

Ivan Mushketyk commented on FLINK-4262:
---

I would like to implement this.

> Consider null handling during sorting
> -
>
> Key: FLINK-4262
> URL: https://issues.apache.org/jira/browse/FLINK-4262
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Priority: Minor
>
> Calcite's SQL parser allows to specify how to handle NULLs during sorting.
> {code}
> orderItem:
>   expression [ ASC | DESC ] [ NULLS FIRST | NULLS LAST ]
> {code}
> Currently, the NULLS FIRST/NULLS LAST clause is completely ignored but might be 
> helpful.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-4262) Consider null handling during sorting

2016-08-02 Thread Ivan Mushketyk (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Mushketyk reassigned FLINK-4262:
-

Assignee: Ivan Mushketyk

> Consider null handling during sorting
> -
>
> Key: FLINK-4262
> URL: https://issues.apache.org/jira/browse/FLINK-4262
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Ivan Mushketyk
>Priority: Minor
>
> Calcite's SQL parser allows to specify how to handle NULLs during sorting.
> {code}
> orderItem:
>   expression [ ASC | DESC ] [ NULLS FIRST | NULLS LAST ]
> {code}
> Currently, the NULLS FIRST/NULLS LAST clause is completely ignored but might be 
> helpful.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3414) Add Scala API for CEP's pattern definition

2016-08-02 Thread Ivan Mushketyk (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404852#comment-15404852
 ] 

Ivan Mushketyk commented on FLINK-3414:
---


I would like to describe my proposal on a case-by-case basis. Feel free to 
point out any gaps and inconsistencies in this design.
Also, I am not sure what the best medium for this kind of discussion is. I will 
start it here, but if you find it inconvenient or sub-optimal, I can move it to 
some other place.

Let's start with the simplest pattern, one that expects a single event:

{code:java}
Pattern.begin("start").where(evt -> evt.getId() == 42)
{code}

This can be replaced with a much shorter Scala version

{code:java}
pattern "start" _.getId() == 42
{code}

A single function call, *pattern*, can replace the *Pattern.begin* call, and the 
*where* condition can be added as an argument to this function call.

A pattern that expects a specific type, like this one:

{code:java}
Pattern.begin("start").where(evt -> evt.getId() == 42)
   .subtype(SubEvent.class)
{code}

Can be replaced with the following Scala code:

{code:java}
pattern[SubEvent] "start" _.id == 42
{code}

The type of the expected object can be added as a type parameter to the 
*pattern* function.

The *next* function can be replaced with the *->* operator:

{code:java}
Pattern.begin("start").where(evt -> evt.getId() == 42)
   .subtype(SubEvent.class)
   .next("next").where(evt -> evt.getId() > 42)
   .subtype(SubEvent.class)
{code}

as in this code snippet:

{code:java}
pattern[SubEvent] "start" _.id == 42 ->[SubEvent] _.getId() > 42
{code}

or with a function call:

{code:java}
pattern[SubEvent] "start" _.id == 42 next[SubEvent] _.getId() > 42
{code}

The *followedBy* call can be replaced with the *->>* operator:

{code:java}
Pattern.begin("start").where(evt -> evt.getId() == 42)
.next("middle").subtype(SubEvent.class).where(subEvt -> subEvt.getVolume() 
>= 10.0)
.followedBy("end").where(evt -> evt.getName().equals("end"));
{code}

as in the following example:

{code:java}
pattern "start" _.getId() == 42
 ->[SubEvent] "middle" _.getVolume() >= 10.0
 ->> "end" _.getName() == "end"
{code}

The *within* function call can be replaced with an *in* function that expects an 
instance of the FiniteDuration class, which can provide more readable code:

{code:java}
pattern "start" -> "middle" _.getName() == "error" ->> "end" _.getName() == 
"critical" in 10 seconds
{code}

h3. Additional operators

As far as I understand, the *or* and *not* operators are not yet implemented, but 
here are a few examples for these operators:

{code:java}
// Pattern that has an event that is not SubEvent
pattern "start" ! is[SubEvent]
{code}

This example also shows an alternative way of specifying the type of an element 
with the *is* function.

{code:java}
// Pattern that has type SubEvent or has id 42
pattern "start" is[SubEvent] || is _.getId() == 42
{code}

> Add Scala API for CEP's pattern definition
> --
>
> Key: FLINK-3414
> URL: https://issues.apache.org/jira/browse/FLINK-3414
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.0.0
>Reporter: Till Rohrmann
>Assignee: Ivan Mushketyk
>Priority: Minor
>
> Currently, the CEP library only supports a Java API to specify complex event 
> patterns. In order to make it a bit less verbose for Scala users, it would be 
> nice to also add a Scala API for the CEP library. 
> A Scala API would also allow passing Scala's anonymous functions as filter 
> conditions or as a select function, for example, or using partial functions 
> to distinguish between different events.
> Furthermore, the Scala API could be designed to feel a bit more like a DSL:
> {code}
> begin "start" where _.id >= 42 -> "middle_1" as classOf[Subclass] || 
> "middle_2" where _.name equals "foobar" -> "end" where x => x.id <= x.volume
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3298) Streaming connector for ActiveMQ

2016-08-02 Thread Ivan Mushketyk (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404730#comment-15404730
 ] 

Ivan Mushketyk commented on FLINK-3298:
---

Hi,

Thank you for reviewing my changes.

I don't think I can elaborate much on whether this particular feature will be 
used by many users. I saw that there are a few issues in JIRA related to ActiveMQ 
support, so I assumed this feature is in demand.

The only non-test dependency I added is activemq-client. Here is 
a list of packages it depends on with their licenses: 
http://activemq.apache.org/maven/5.8.0/activemq-client/dependencies.html

> Streaming connector for ActiveMQ
> 
>
> Key: FLINK-3298
> URL: https://issues.apache.org/jira/browse/FLINK-3298
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Mohit Sethi
>Assignee: Ivan Mushketyk
>Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-4307) Broken user-facing API for ListState

2016-08-02 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-4307.
-
   Resolution: Fixed
Fix Version/s: 1.2.0

Fixed in
  - 1.1.0 via c71a0c73448f82e20cdc87bcc7e94220a472e586
  - 1.2.0 via d5a06b4d634a54079a2a484a0c619e02cbaf2912
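
As a rough illustration of the restored contract (a sketch only; the wrapper name and shape are not necessarily those of the actual fix), user code must always receive an iterable from get(), never null:

{code}
import java.util.Collections;
import org.apache.flink.api.common.state.ListState;

// Sketch: shields users from a runtime-internal "null means empty" signal.
class UserFacingListState<T> {
    private final ListState<T> internal;

    UserFacingListState(ListState<T> internal) {
        this.internal = internal;
    }

    public Iterable<T> get() throws Exception {
        Iterable<T> fromBackend = internal.get();
        return fromBackend != null ? fromBackend : Collections.<T>emptyList();
    }
}
{code}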

> Broken user-facing API for ListState
> 
>
> Key: FLINK-4307
> URL: https://issues.apache.org/jira/browse/FLINK-4307
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.1.0, 1.2.0
>
>
> The user-facing {{ListState}} is supposed to return an empty list when no 
> element is contained in the state.
> A previous change altered that behavior in order to make it observable in the 
> runtime classes whether a ListState is empty.
> To not break the user-facing API, we need to restore the behavior for 
> ListState exposed to the users via the {{RuntimeContext}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-4307) Broken user-facing API for ListState

2016-08-02 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-4307.
---

> Broken user-facing API for ListState
> 
>
> Key: FLINK-4307
> URL: https://issues.apache.org/jira/browse/FLINK-4307
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.1.0, 1.2.0
>
>
> The user-facing {{ListState}} is supposed to return an empty list when no 
> element is contained in the state.
> A previous change altered that behavior in order to make it observable in the 
> runtime classes whether a ListState is empty.
> To not break the user-facing API, we need to restore the behavior for 
> ListState exposed to the users via the {{RuntimeContext}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3677) FileInputFormat: Allow to specify include/exclude file name patterns

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404582#comment-15404582
 ] 

ASF GitHub Bot commented on FLINK-3677:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2109
  
@mxm Thank you for your detailed review! I'll update it according to your 
feedback and revert the breaking changes.


> FileInputFormat: Allow to specify include/exclude file name patterns
> 
>
> Key: FLINK-3677
> URL: https://issues.apache.org/jira/browse/FLINK-3677
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Maximilian Michels
>Assignee: Ivan Mushketyk
>Priority: Minor
>  Labels: starter
>
> It would be nice to be able to specify a regular expression to filter files.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

2016-08-02 Thread mushketyk
Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2109
  
@mxm Thank you for your detailed review! I'll update it according to your 
feedback and revert the breaking changes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3677) FileInputFormat: Allow to specify include/exclude file name patterns

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404579#comment-15404579
 ] 

ASF GitHub Bot commented on FLINK-3677:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r73213488
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
 ---
@@ -257,41 +257,22 @@ public void testFileInputSplit() {
 	public void testIgnoredUnderscoreFiles() {
 		try {
 			final String contents = "CONTENTS";
-			
+
 			// create some accepted, some ignored files
-			
-			File tempDir = new File(System.getProperty("java.io.tmpdir"));
-			File f;
-			do {
-				f = new File(tempDir, TestFileUtils.randomFileName(""));
-			}
-			while (f.exists());
 
-			assertTrue(f.mkdirs());
-			f.deleteOnExit();
-			
-			File child1 = new File(f, "dataFile1.txt");
-			File child2 = new File(f, "another_file.bin");
-			File luigiFile = new File(f, "_luigi");
-			File success = new File(f, "_SUCCESS");
-			
-			File[] files = { child1, child2, luigiFile, success };
-			
-			for (File child : files) {
-				child.deleteOnExit();
-				
-				BufferedWriter out = new BufferedWriter(new FileWriter(child));
-				try { 
-					out.write(contents);
-				} finally {
-					out.close();
-				}
-			}
+			File tempDirectory = createTempDirectory();
 
+			File child1 = new File(tempDirectory, "dataFile1.txt");
+			File child2 = new File(tempDirectory, "another_file.bin");
+			File luigiFile = new File(tempDirectory, "_luigi");
+			File success = new File(tempDirectory, "_SUCCESS");
+
+			createTempFiles(contents.getBytes(), child1, child2, luigiFile, success);
--- End diff --

Sure, I'll update this.


> FileInputFormat: Allow to specify include/exclude file name patterns
> 
>
> Key: FLINK-3677
> URL: https://issues.apache.org/jira/browse/FLINK-3677
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Maximilian Michels
>Assignee: Ivan Mushketyk
>Priority: Minor
>  Labels: starter
>
> It would be nice to be able to specify a regular expression to filter files.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3677) FileInputFormat: Allow to specify include/exclude file name patterns

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404577#comment-15404577
 ] 

ASF GitHub Bot commented on FLINK-3677:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r73213456
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.io;
+
+import org.apache.flink.core.fs.Path;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class GlobFilePathFilterTest {
--- End diff --

"glob" is the simplified regex syntax that is used in Linux shell, 
.gitignore files and many other cases. 
https://en.wikipedia.org/wiki/Glob_(programming)

I thought that it should be used instead of regexes because it should cover 
most of the practical use cases and is much simpler to use.
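
As a hedged illustration of that trade-off, using the JDK's built-in glob support from java.nio.file (Flink's GlobFilePathFilter is a separate implementation; this only demonstrates the syntax):

{code}
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

public class GlobVsRegexDemo {
    public static void main(String[] args) {
        PathMatcher glob = FileSystems.getDefault().getPathMatcher("glob:**/*.txt");
        PathMatcher regex = FileSystems.getDefault().getPathMatcher("regex:.*/[^/]*\\.txt");

        // both match, but the glob form is far easier to read and write
        System.out.println(glob.matches(Paths.get("data/dataFile1.txt")));  // true
        System.out.println(regex.matches(Paths.get("data/dataFile1.txt"))); // true
    }
}
{code}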


> FileInputFormat: Allow to specify include/exclude file name patterns
> 
>
> Key: FLINK-3677
> URL: https://issues.apache.org/jira/browse/FLINK-3677
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Maximilian Michels
>Assignee: Ivan Mushketyk
>Priority: Minor
>  Labels: starter
>
> It would be nice to be able to specify a regular expression to filter files.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

2016-08-02 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r73213456
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.io;
+
+import org.apache.flink.core.fs.Path;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class GlobFilePathFilterTest {
--- End diff --

"glob" is the simplified regex syntax that is used in Linux shell, 
.gitignore files and many other cases. 
https://en.wikipedia.org/wiki/Glob_(programming)

I thought that it should be used instead of regexes because it should cover 
most of the practical use cases and is much simpler to use.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

2016-08-02 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r73213488
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
 ---
@@ -257,41 +257,22 @@ public void testFileInputSplit() {
 	public void testIgnoredUnderscoreFiles() {
 		try {
 			final String contents = "CONTENTS";
-			
+
 			// create some accepted, some ignored files
-			
-			File tempDir = new File(System.getProperty("java.io.tmpdir"));
-			File f;
-			do {
-				f = new File(tempDir, TestFileUtils.randomFileName(""));
-			}
-			while (f.exists());
 
-			assertTrue(f.mkdirs());
-			f.deleteOnExit();
-			
-			File child1 = new File(f, "dataFile1.txt");
-			File child2 = new File(f, "another_file.bin");
-			File luigiFile = new File(f, "_luigi");
-			File success = new File(f, "_SUCCESS");
-			
-			File[] files = { child1, child2, luigiFile, success };
-			
-			for (File child : files) {
-				child.deleteOnExit();
-				
-				BufferedWriter out = new BufferedWriter(new FileWriter(child));
-				try { 
-					out.write(contents);
-				} finally {
-					out.close();
-				}
-			}
+			File tempDirectory = createTempDirectory();
 
+			File child1 = new File(tempDirectory, "dataFile1.txt");
+			File child2 = new File(tempDirectory, "another_file.bin");
+			File luigiFile = new File(tempDirectory, "_luigi");
+			File success = new File(tempDirectory, "_SUCCESS");
+
+			createTempFiles(contents.getBytes(), child1, child2, luigiFile, success);
--- End diff --

Sure, I'll update this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

2016-08-02 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r73212835
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 ---
@@ -995,8 +993,7 @@ public TimeCharacteristic getStreamTimeCharacteristic() {
 	public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
 							String filePath,
 							FileProcessingMode watchType,
-							long interval,
-							FilePathFilter filter) {
+							long interval) {
--- End diff --

Good catch. I'll revert this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3677) FileInputFormat: Allow to specify include/exclude file name patterns

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404571#comment-15404571
 ] 

ASF GitHub Bot commented on FLINK-3677:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r73212835
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 ---
@@ -995,8 +993,7 @@ public TimeCharacteristic getStreamTimeCharacteristic() {
 	public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
 							String filePath,
 							FileProcessingMode watchType,
-							long interval,
-							FilePathFilter filter) {
+							long interval) {
--- End diff --

Good catch. I'll revert this.


> FileInputFormat: Allow to specify include/exclude file name patterns
> 
>
> Key: FLINK-3677
> URL: https://issues.apache.org/jira/browse/FLINK-3677
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Maximilian Michels
>Assignee: Ivan Mushketyk
>Priority: Minor
>  Labels: starter
>
> It would be nice to be able to specify a regular expression to filter files.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3779) Add support for queryable state

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404491#comment-15404491
 ] 

ASF GitHub Bot commented on FLINK-3779:
---

Github user rehevkor5 commented on the issue:

https://github.com/apache/flink/pull/2051
  
Hi, it's great to see that someone is working on this stuff!

I just wanted to put in my two cents, to provide a different perspective 
that might change how you are thinking about this.

On my project, we are interested in incorporating pre-computed historical 
time-series data into the values within a time window. Those values would need 
to be loaded from a distributed database such as Cassandra or DynamoDB. Also, 
we would like for newly computed time-series data points (produced by a Flink 
window pane) to be persisted externally, side-by-side with the historical data 
(in Cassandra/DynamoDB).

In contrast with your approach, which enables querying of state from within 
Flink, we are more interested in querying that state from the external 
database. This allows the Flink job to produce time series data which can be 
queried ad-hoc in the database, while also allowing the Flink job to produce 
pre-calculated aggregates from that time series.

I believe others have mentioned in this thread the need, therefore, to 
allow the State Store to choose the serialization approach. While serializing 
to byte[] works well for Memory and RocksDB State Stores, inserting into a 
NoSQL database requires creation of an INSERT command with data that includes 
primary/partition key, secondary/range key, and arbitrarily structured data 
(one column of byte[], or perhaps more complex based on the particular type of 
value). In particular, we need the timestamp of the time series point to be a 
top-level value in the INSERT, so that time range queries can be efficient. The 
interface is also important when it comes to Flink loading pre-existing data, 
because Flink or an integration layer will need to know how to query for the 
particular keys it is looking for.

I hope that makes sense & gives some perspective on what some people are 
thinking about with regard to "queryable state".
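
To make that suggestion concrete, a purely illustrative interface sketch (not a Flink API) of a state store that owns its own layout instead of receiving an opaque byte[]:

{code}
// Illustrative only: a store backed by Cassandra/DynamoDB could map the key
// to a partition key and the timestamp to a range/clustering key, keeping
// time-range queries efficient and the data queryable outside Flink.
public interface ExternalStateStore<K, V> {

    /** Persist one time-series point, side by side with historical data. */
    void insert(K key, long timestamp, V value) throws Exception;

    /** Load pre-existing points for a key and time range, e.g. to seed a window. */
    Iterable<V> query(K key, long fromTimestamp, long toTimestamp) throws Exception;
}
{code}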


> Add support for queryable state
> ---
>
> Key: FLINK-3779
> URL: https://issues.apache.org/jira/browse/FLINK-3779
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> Flink offers state abstractions for user functions in order to guarantee 
> fault-tolerant processing of streams. Users can work with both 
> non-partitioned (Checkpointed interface) and partitioned state 
> (getRuntimeContext().getState(ValueStateDescriptor) and other variants).
> The partitioned state interface provides access to different types of state 
> that are all scoped to the key of the current input element. This type of 
> state can only be used on a KeyedStream, which is created via stream.keyBy().
> Currently, all of this state is internal to Flink and used in order to 
> provide processing guarantees in failure cases (e.g. exactly-once processing).
> The goal of Queryable State is to expose this state outside of Flink by 
> supporting queries against the partitioned key value state.
> This will help to eliminate the need for distributed operations/transactions 
> with external systems such as key-value stores which are often the bottleneck 
> in practice. Exposing the local state to the outside moves a good part of the 
> database work into the stream processor, allowing both high throughput 
> queries and immediate access to the computed state.
> This is the initial design doc for the feature: 
> https://docs.google.com/document/d/1NkQuhIKYmcprIU5Vjp04db1HgmYSsZtCMxgDi_iTN-g.
>  Feel free to comment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2051: [FLINK-3779] Add support for queryable state

2016-08-02 Thread rehevkor5
Github user rehevkor5 commented on the issue:

https://github.com/apache/flink/pull/2051
  
Hi, it's great to see that someone is working on this stuff!

I just wanted to put in my two cents, to provide a different perspective 
that might change how you are thinking about this.

On my project, we are interested in incorporating pre-computed historical 
time-series data into the values within a time window. Those values would need 
to be loaded from a distributed database such as Cassandra or DynamoDB. Also, 
we would like for newly computed time-series data points (produced by a Flink 
window pane) to be persisted externally, side-by-side with the historical data 
(in Cassandra/DynamoDB).

In contrast with your approach, which enables querying of state from within 
Flink, we are more interested in querying that state from the external 
database. This allows the Flink job to produce time series data which can be 
queried ad-hoc in the database, while also allowing the Flink job to produce 
pre-calculated aggregates from that time series.

I believe others have mentioned in this thread the need, therefore, to 
allow the State Store to choose the serialization approach. While serializing 
to byte[] works well for Memory and RocksDB State Stores, inserting into a 
NoSQL database requires creation of an INSERT command with data that includes 
primary/partition key, secondary/range key, and arbitrarily structured data 
(one column of byte[], or perhaps more complex based on the particular type of 
value). In particular, we need the timestamp of the time series point to be a 
top-level value in the INSERT, so that time range queries can be efficient. The 
interface is also important when it comes to Flink loading pre-existing data, 
because Flink or an integration layer will need to know how to query for the 
particular keys it is looking for.

I hope that makes sense & gives some perspective on what some people are 
thinking about with regard to "queryable state".


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-4308) Allow uploaded jar directory to be configurable

2016-08-02 Thread Zhenzhong Xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenzhong Xu updated FLINK-4308:

Description: 
I notice sometimes it's preferable to have uploaded jars put into a 
configurable directory location instead of only having it determined at runtime. In 
this case we can pre-load the directory with jars in a docker image, which allows 
us to leverage the jobmanager restful interface to start/kill jobs.

WebRuntimeMonitor.java

String uploadDirName = "flink-web-upload-" + UUID.randomUUID();
this.uploadDir = new File(getBaseDir(config), uploadDirName);


  was:
I notice sometimes it's preferable to have uploaded jars put into a 
configurable directory location instead of only having it determined at runtime. In 
this case we can pre-load the directory with jars, which allows us to leverage the 
jobmanager restful interface to start/kill jobs.

WebRuntimeMonitor.java

String uploadDirName = "flink-web-upload-" + UUID.randomUUID();
this.uploadDir = new File(getBaseDir(config), uploadDirName);



> Allow uploaded jar directory to be configurable 
> 
>
> Key: FLINK-4308
> URL: https://issues.apache.org/jira/browse/FLINK-4308
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: Zhenzhong Xu
>Priority: Minor
>
> I notice sometimes it's preferable to have uploaded jars put into a 
> configurable directory location instead of only having it determined at runtime. In 
> this case we can pre-load the directory with jars in a docker image, which allows 
> us to leverage the jobmanager restful interface to start/kill jobs.
> WebRuntimeMonitor.java
> String uploadDirName = "flink-web-upload-" + UUID.randomUUID();
> this.uploadDir = new File(getBaseDir(config), uploadDirName);
>   



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4308) Allow uploaded jar directory to be configurable

2016-08-02 Thread Zhenzhong Xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenzhong Xu updated FLINK-4308:

Description: 
I notice sometimes it's preferable to have uploaded jars put into a 
configurable directory location instead of only having it determined at runtime. In 
this case we can pre-load the directory with jars, which allows us to leverage the 
jobmanager restful interface to start/kill jobs.

WebRuntimeMonitor.java

String uploadDirName = "flink-web-upload-" + UUID.randomUUID();
this.uploadDir = new File(getBaseDir(config), uploadDirName);


  was:
I notice sometimes it's preferable to have uploaded jars put into a 
configurable directory location instead of only having it determined at runtime.

WebRuntimeMonitor.java

String uploadDirName = "flink-web-upload-" + UUID.randomUUID();
this.uploadDir = new File(getBaseDir(config), uploadDirName);



> Allow uploaded jar directory to be configurable 
> 
>
> Key: FLINK-4308
> URL: https://issues.apache.org/jira/browse/FLINK-4308
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: Zhenzhong Xu
>Priority: Minor
>
> I notice sometimes it's preferable to have uploaded jars put into a 
> configurable directory location instead of only having it determined at runtime. In 
> this case we can pre-load the directory with jars, which allows us to leverage 
> the jobmanager restful interface to start/kill jobs.
> WebRuntimeMonitor.java
> String uploadDirName = "flink-web-upload-" + UUID.randomUUID();
> this.uploadDir = new File(getBaseDir(config), uploadDirName);
>   



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4308) Allow uploaded jar directory to be configurable

2016-08-02 Thread Zhenzhong Xu (JIRA)
Zhenzhong Xu created FLINK-4308:
---

 Summary: Allow uploaded jar directory to be configurable 
 Key: FLINK-4308
 URL: https://issues.apache.org/jira/browse/FLINK-4308
 Project: Flink
  Issue Type: Improvement
  Components: Webfrontend
Reporter: Zhenzhong Xu
Priority: Minor


I notice sometimes it's preferable to have uploaded jars put into a 
configurable directory location instead of only having it determined at runtime.

WebRuntimeMonitor.java

String uploadDirName = "flink-web-upload-" + UUID.randomUUID();
this.uploadDir = new File(getBaseDir(config), uploadDirName);




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2324: Bump version to 1.2-SNAPSHOT

2016-08-02 Thread mbalassi
Github user mbalassi commented on the issue:

https://github.com/apache/flink/pull/2324
  
Good catch, looking into that.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4094) Off heap memory deallocation might not properly work

2016-08-02 Thread ramkrishna.s.vasudevan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404384#comment-15404384
 ] 

ramkrishna.s.vasudevan commented on FLINK-4094:
---

I can do this log update and documentation update tomorrow and open a PR for the 
same.

> Off heap memory deallocation might not properly work
> 
>
> Key: FLINK-4094
> URL: https://issues.apache.org/jira/browse/FLINK-4094
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: ramkrishna.s.vasudevan
>Priority: Critical
> Fix For: 1.1.0
>
>
> A user reported that off-heap memory is not properly deallocated when setting 
> {{taskmanager.memory.preallocate:false}} (the default) [1]. This can cause 
> the TaskManager process to be killed by the OS.
> It should be possible to execute multiple batch jobs with preallocation 
> turned off. No-longer-used direct memory buffers should be properly garbage 
> collected so that the JVM process does not exceed its maximum memory bounds.
> [1] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/offheap-memory-allocation-and-memory-leak-bug-td12154.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4094) Off heap memory deallocation might not properly work

2016-08-02 Thread ramkrishna.s.vasudevan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404378#comment-15404378
 ] 

ramkrishna.s.vasudevan commented on FLINK-4094:
---

bq. We cannot really manually release the memory when freeing the segment, 
because the ByteBuffer wrapper object may still exist. 
Ideally, when we are going to pool, we won't try to free the memory - so the 
ByteBuffer wrapper will exist and that is what we will pool. I think once we do 
this we won't call segment.free() on that buffer and we will allow the address 
to stay valid - if I am not wrong.
Just a question: in the case of { preallocation = true }, what happens if the 
number of requests is more than the initial size? Do we consume all the buffers 
in the pool, so that new requests won't be served?
bq. What we can do now, is to discourage the use of off-heap memory with 
preallocation set to false. For example, print a prominent warning and add a 
hint to the documentation.
Maybe for now we can do that. 
bq. I think before we change memory allocation behavior, we should discuss that 
on the Flink mailing list.
Ok, sounds like a plan. So once we discuss it, I think we can go with the lazy 
allocation pooling model, which should be beneficial: the current pooling 
already uses an unbounded queue, and it can be done the same way here. 
One thing to note is that even with pooling, if MaxDirectMemory is not 
configured right we will not be able to work with off-heap buffers; the only 
difference is that we won't grow infinitely. 
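
To make the lazy-allocation pooling model concrete, here is a minimal sketch (not Flink's MemoryManager; the class and method names are made up) of pooling direct buffers in an unbounded queue, so the ByteBuffer wrappers stay alive and their addresses stay valid:

{code}
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentLinkedQueue;

// Illustrative only: allocate lazily on demand, recycle instead of freeing.
class LazyDirectBufferPool {
    private final int segmentSize;
    private final ConcurrentLinkedQueue<ByteBuffer> pool = new ConcurrentLinkedQueue<>();

    LazyDirectBufferPool(int segmentSize) {
        this.segmentSize = segmentSize;
    }

    ByteBuffer request() {
        ByteBuffer buf = pool.poll();
        // first use: allocate; later uses: reuse, so free() is never called on pooled buffers
        return buf != null ? buf : ByteBuffer.allocateDirect(segmentSize);
    }

    void release(ByteBuffer buf) {
        buf.clear();
        pool.offer(buf); // unbounded queue, mirroring the current heap pooling
    }
}
{code}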

> Off heap memory deallocation might not properly work
> 
>
> Key: FLINK-4094
> URL: https://issues.apache.org/jira/browse/FLINK-4094
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: ramkrishna.s.vasudevan
>Priority: Critical
> Fix For: 1.1.0
>
>
> A user reported that off-heap memory is not properly deallocated when setting 
> {{taskmanager.memory.preallocate:false}} (per default) [1]. This can cause 
> the TaskManager process being killed by the OS.
> It should be possible to execute multiple batch jobs with preallocation 
> turned off. No longer used direct memory buffers should be properly garbage 
> collected so that the JVM process does not exceed its maximum memory bounds.
> [1] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/offheap-memory-allocation-and-memory-leak-bug-td12154.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4224) Exception after successful execution of job submitted through the web interface.

2016-08-02 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404370#comment-15404370
 ] 

Kostas Kloudas commented on FLINK-4224:
---

No. As I also said to Max, this does not appear any more.




> Exception after successful execution of job submitted through the web 
> interface.
> 
>
> Key: FLINK-4224
> URL: https://issues.apache.org/jira/browse/FLINK-4224
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.1.0
>Reporter: Kostas Kloudas
>Assignee: Aljoscha Krettek
>
> When submitting the job below through the web interface to a local flink 
> cluster:
> {code}
>   public static void main(String[] args) throws Exception {
>       StreamExecutionEnvironment env =
>           StreamExecutionEnvironment.getExecutionEnvironment();
>       TextInputFormat inputFormat =
>           new TextInputFormat(new Path("file:///YOUR_TEXT_FILE"));
>       DataStream<Tuple2<String, Integer>> input = env.
>           readFile(inputFormat, inputFormat.getFilePath().toString()).
>           flatMap(new Tokenizer()).
>           keyBy(0).
>           sum(1);
>       input.print();
>       env.execute("WebClient Testing Example.");
>   }
>
>   public static final class Tokenizer implements
>           FlatMapFunction<String, Tuple2<String, Integer>> {
>       private static final long serialVersionUID = 1L;
>
>       @Override
>       public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
>               throws Exception {
>           // normalize and split the line
>           String[] tokens = value.toLowerCase().split("\\W+");
>           // emit the pairs
>           for (String token : tokens) {
>               if (token.length() > 0) {
>                   out.collect(new Tuple2<>(token, 1));
>               }
>           }
>       }
>   }
> {code}
> The job succeeds, but in the logs there is an NPE: 
> {code} 2016-07-16 16:56:18,197 WARN  
> org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler - 
>  Error while handling request java.lang.RuntimeException: 
>  Couldn't deserialize ExecutionConfig.
> ...
> Caused by: java.lang.NullPointerException at 
> org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:55)
>  at 
> org.apache.flink.runtime.webmonitor.handlers.JobConfigHandler.handleRequest(JobConfigHandler.java:50)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-4306) Fix Flink and Storm dependencies in flink-storm and flink-storm-examples

2016-08-02 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-4306.
---

> Fix Flink and Storm dependencies in flink-storm and flink-storm-examples
> 
>
> Key: FLINK-4306
> URL: https://issues.apache.org/jira/browse/FLINK-4306
> Project: Flink
>  Issue Type: Improvement
>  Components: Storm Compatibility
>Affects Versions: 1.1.0, 1.0.3
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.2.0
>
>
>   - Flink dependencies should be in scope {{provided}}, like in the other 
> libraries.
>   - {{flink-storm-examples}} should not draw {{storm-core}} directly, but 
> only via {{flink-storm}}, so it gets the proper transitive dependency 
> exclusions
>   - {{flink-storm-examples}} should have the clojure jar repository as an 
> additional maven repository



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-4306) Fix Flink and Storm dependencies in flink-storm and flink-storm-examples

2016-08-02 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-4306.
-
   Resolution: Fixed
Fix Version/s: (was: 1.1.1)

Fixed via 8775189c2df6e30971e365e304fc25d1613c9cb8

https://github.com/apache/flink/commit/8775189c2df6e30971e365e304fc25d1613c9cb8

> Fix Flink and Storm dependencies in flink-storm and flink-storm-examples
> 
>
> Key: FLINK-4306
> URL: https://issues.apache.org/jira/browse/FLINK-4306
> Project: Flink
>  Issue Type: Improvement
>  Components: Storm Compatibility
>Affects Versions: 1.1.0, 1.0.3
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.2.0
>
>
>   - Flink dependencies should be in scope {{provided}}, like in the other 
> libraries.
>   - {{flink-storm-examples}} should not draw {{storm-core}} directly, but 
> only via {{flink-storm}}, so it gets the proper transitive dependency 
> exclusions
>   - {{flink-storm-examples}} should have the clojure jar repository as an 
> additional maven repository
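
For the first point, the change presumably amounts to something like the following in the library POM (a sketch; the artifact id is illustrative):

{code}
<!-- Flink dependencies in provided scope, like in the other libraries -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.10</artifactId>
  <version>${project.version}</version>
  <scope>provided</scope>
</dependency>
{code}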



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4307) Broken user-facing API for ListState

2016-08-02 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-4307:
---

 Summary: Broken user-facing API for ListState
 Key: FLINK-4307
 URL: https://issues.apache.org/jira/browse/FLINK-4307
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Reporter: Stephan Ewen
Assignee: Stephan Ewen
Priority: Blocker
 Fix For: 1.1.0


The user-facing {{ListState}} is supposed to return an empty list when no 
element is contained in the state.

A previous change altered that behavior so that the runtime classes can check 
whether a ListState is empty.

To not break the user-facing API, we need to restore the behavior for ListState 
exposed to the users via the {{RuntimeContext}}.
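
A self-contained sketch of the contract to restore (shape assumed; the real change lives in the state wrappers behind the {{RuntimeContext}}):

{code}
import java.util.Collections;

// The user-facing wrapper maps the runtime's "null means empty" convention
// back to an empty Iterable, so user code can iterate without null checks.
final class UserFacingListState<T> {

    interface RawListState<E> {   // stand-in for the runtime-internal state
        Iterable<E> get();
    }

    private final RawListState<T> internal;

    UserFacingListState(RawListState<T> internal) {
        this.internal = internal;
    }

    Iterable<T> get() {
        Iterable<T> original = internal.get();
        return original != null ? original : Collections.<T>emptyList();
    }
}
{code}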



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4094) Off heap memory deallocation might not properly work

2016-08-02 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404270#comment-15404270
 ] 

Stephan Ewen commented on FLINK-4094:
-

True, it is not documented that {{preallocate == false}} returns the memory, but 
it is quite a desirable behavior, actually.

> Off heap memory deallocation might not properly work
> 
>
> Key: FLINK-4094
> URL: https://issues.apache.org/jira/browse/FLINK-4094
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: ramkrishna.s.vasudevan
>Priority: Critical
> Fix For: 1.1.0
>
>
> A user reported that off-heap memory is not properly deallocated when setting 
> {{taskmanager.memory.preallocate:false}} (per default) [1]. This can cause 
> the TaskManager process being killed by the OS.
> It should be possible to execute multiple batch jobs with preallocation 
> turned off. No longer used direct memory buffers should be properly garbage 
> collected so that the JVM process does not exceed its maximum memory bounds.
> [1] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/offheap-memory-allocation-and-memory-leak-bug-td12154.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4304) Jar names that contain whitespace cause problems in web client

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404230#comment-15404230
 ] 

ASF GitHub Bot commented on FLINK-4304:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2327
  
+1 LGTM


> Jar names that contain whitespace cause problems in web client
> --
>
> Key: FLINK-4304
> URL: https://issues.apache.org/jira/browse/FLINK-4304
> Project: Flink
>  Issue Type: Bug
>  Components: Web Client
>Affects Versions: 1.1.0
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> If the Jar file name contains whitespaces the web client can not start or 
> delete a job:
> {code}
> org.apache.flink.client.program.ProgramInvocationException: JAR file does not 
> exist 
> '/var/folders/w8/k702f8s1017dfbfx_qlv2p24gn/T/flink-web-upload-4c52b922-8307-4098-b196-58b971864c51/980cad63-304c-48bb-a403-4756aea26ab4_Word%20Count.jar'
>   at 
> org.apache.flink.client.program.PackagedProgram.checkJarFile(PackagedProgram.java:755)
>   at 
> org.apache.flink.client.program.PackagedProgram.(PackagedProgram.java:181)
>   at 
> org.apache.flink.client.program.PackagedProgram.(PackagedProgram.java:147)
>   at 
> org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:91)
>   at 
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:50)
> {code}
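
The encoded name in the path above ({{Word%20Count.jar}}) suggests the handler resolves the file under its URL-encoded name. A plausible direction for the fix (an assumption, not the actual patch) is to decode the name before resolving it on disk:

{code}
import java.io.File;
import java.net.URLDecoder;

// Hypothetical helper; the real fix in JarActionHandler may look different.
final class JarNames {
    static File resolveUploadedJar(File uploadDir, String requestedName) throws Exception {
        // "..._Word%20Count.jar" -> "..._Word Count.jar"
        String decoded = URLDecoder.decode(requestedName, "UTF-8");
        return new File(uploadDir, decoded);
    }
}
{code}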



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2327: [FLINK-4304] [runtime-web] Jar names that contain whitesp...

2016-08-02 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2327
  
+1 LGTM


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4271) There is no way to set parallelism of operators produced by CoGroupedStreams

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404228#comment-15404228
 ] 

ASF GitHub Bot commented on FLINK-4271:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2305
  
I think it breaks binary compatibility, not source compatibility.
The specific return type is part of the method signature used by the JVM 
for linking.


> There is no way to set parallelism of operators produced by CoGroupedStreams
> 
>
> Key: FLINK-4271
> URL: https://issues.apache.org/jira/browse/FLINK-4271
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Wenlong Lyu
>Assignee: Jark Wu
>
> Currently, CoGroupedStreams packages the map/keyBy/window operators behind a 
> human-friendly interface, like 
> dataStreamA.cogroup(streamB).where(...).equalsTo().window().apply(). Both the 
> intermediate operators and the final window operator cannot be accessed by 
> users, and we cannot set attributes of the operators, which makes co-group 
> hard to use in production environments.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2305: [FLINK-4271] [DataStreamAPI] Enable CoGroupedStreams and ...

2016-08-02 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2305
  
I think it breaks binary compatibility, not source compatibility.
The specific return type is part of the method signature used by the JVM 
for linking.
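
A quick self-contained illustration of that point (not Flink code): narrowing the return type is source-compatible, but it changes the descriptor the JVM links against.

{code}
class DataStream {}
class SingleOutputStreamOperator extends DataStream {}

class CoGroupedApi {
    // v1 declared:  DataStream apply()
    // v2 declares the narrowed type below. Sources recompile fine, but callers
    // compiled against v1 look up the descriptor apply()LDataStream; at link
    // time and fail with a NoSuchMethodError.
    SingleOutputStreamOperator apply() {
        return new SingleOutputStreamOperator();
    }
}
{code}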


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2282: [FLINK-3940] [table] Add support for ORDER BY OFFS...

2016-08-02 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2282#discussion_r73181770
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
 ---
@@ -71,11 +78,57 @@ class DataSetSort(
   partitionedDs = partitionedDs.sortPartition(fieldCollation._1, 
fieldCollation._2)
 }
 
+val offsetAndFetchDS = if (offset != null) {
+  val offsetIndex = RexLiteral.intValue(offset)
+  val fetchIndex = if (fetch != null) {
+RexLiteral.intValue(fetch) + offsetIndex
+  } else {
+Int.MaxValue
+  }
+  if (currentParallelism != 1) {
+val partitionCount = partitionedDs.mapPartition(
+  new MapPartitionFunction[Any, Int] {
+override def mapPartition(value: lang.Iterable[Any], out: 
Collector[Int]): Unit = {
+  val iterator = value.iterator()
+  var elementCount = 0
+  while (iterator.hasNext) {
+elementCount += 1
+iterator -> iterator.next()
+  }
+  out.collect(elementCount)
+}
+  }).collect().asScala
--- End diff --

This triggers multiple jobs. I don't know if we want that. Isn't there a 
better way to do it?
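
One way to avoid the extra jobs (a sketch, assuming the per-partition counts can stay inside the plan) is to broadcast the counts instead of collect()-ing them, so everything runs in a single job. The Java API and element type {{Long}} are used here only for brevity:

{code}
import java.util.List;

import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Assumes `sorted` is range-partitioned and sorted, so partition i holds the
// i-th slice of the global order. `offset` and `end` bound the global indexes.
final class OffsetFetchSketch {

    static DataSet<Long> offsetFetch(DataSet<Long> sorted, final long offset, final long end) {
        // pass 1 (same job): per-partition element counts
        DataSet<Tuple2<Integer, Long>> counts = sorted.mapPartition(
            new RichMapPartitionFunction<Long, Tuple2<Integer, Long>>() {
                @Override
                public void mapPartition(Iterable<Long> values,
                                         Collector<Tuple2<Integer, Long>> out) {
                    long n = 0;
                    for (Long ignored : values) { n++; }
                    out.collect(new Tuple2<>(getRuntimeContext().getIndexOfThisSubtask(), n));
                }
            });

        // pass 2 (same job): each partition derives its global start from the broadcast
        return sorted.mapPartition(
            new RichMapPartitionFunction<Long, Long>() {
                private long globalStart;

                @Override
                public void open(Configuration parameters) {
                    List<Tuple2<Integer, Long>> all =
                        getRuntimeContext().getBroadcastVariable("counts");
                    int me = getRuntimeContext().getIndexOfThisSubtask();
                    for (Tuple2<Integer, Long> c : all) {
                        if (c.f0 < me) { globalStart += c.f1; }
                    }
                }

                @Override
                public void mapPartition(Iterable<Long> values, Collector<Long> out) {
                    long idx = globalStart;
                    for (Long v : values) {
                        if (idx >= offset && idx < end) { out.collect(v); }
                        idx++;
                    }
                }
            }).withBroadcastSet(counts, "counts");
    }
}
{code}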


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3940) Add support for ORDER BY OFFSET FETCH

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404225#comment-15404225
 ] 

ASF GitHub Bot commented on FLINK-3940:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2282#discussion_r73181770
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
 ---
@@ -71,11 +78,57 @@ class DataSetSort(
   partitionedDs = partitionedDs.sortPartition(fieldCollation._1, 
fieldCollation._2)
 }
 
+val offsetAndFetchDS = if (offset != null) {
+  val offsetIndex = RexLiteral.intValue(offset)
+  val fetchIndex = if (fetch != null) {
+RexLiteral.intValue(fetch) + offsetIndex
+  } else {
+Int.MaxValue
+  }
+  if (currentParallelism != 1) {
+val partitionCount = partitionedDs.mapPartition(
+  new MapPartitionFunction[Any, Int] {
+override def mapPartition(value: lang.Iterable[Any], out: 
Collector[Int]): Unit = {
+  val iterator = value.iterator()
+  var elementCount = 0
+  while (iterator.hasNext) {
+elementCount += 1
+iterator -> iterator.next()
+  }
+  out.collect(elementCount)
+}
+  }).collect().asScala
--- End diff --

This triggers multiple jobs. I don't know if we want that. Isn't there a 
better way to do it?


> Add support for ORDER BY OFFSET FETCH
> -
>
> Key: FLINK-3940
> URL: https://issues.apache.org/jira/browse/FLINK-3940
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: GaoLun
>Priority: Minor
>
> Currently only ORDER BY without OFFSET and FETCH are supported.
> This issue tracks the effort to add support for OFFSET and FETCH and involves:
> - Implementing the execution strategy in `DataSetSort`
> - adapting the `DataSetSortRule` to support OFFSET and FETCH
> - extending the Table API and validation to support OFFSET and FETCH and 
> generate a corresponding RelNode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3940) Add support for ORDER BY OFFSET FETCH

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404216#comment-15404216
 ] 

ASF GitHub Bot commented on FLINK-3940:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2282#discussion_r73180693
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
 ---
@@ -71,11 +78,57 @@ class DataSetSort(
   partitionedDs = partitionedDs.sortPartition(fieldCollation._1, 
fieldCollation._2)
 }
 
+val offsetAndFetchDS = if (offset != null) {
+  val offsetIndex = RexLiteral.intValue(offset)
+  val fetchIndex = if (fetch != null) {
+RexLiteral.intValue(fetch) + offsetIndex
+  } else {
+Int.MaxValue
+  }
+  if (currentParallelism != 1) {
+val partitionCount = partitionedDs.mapPartition(
+  new MapPartitionFunction[Any, Int] {
+override def mapPartition(value: lang.Iterable[Any], out: 
Collector[Int]): Unit = {
+  val iterator = value.iterator()
+  var elementCount = 0
+  while (iterator.hasNext) {
+elementCount += 1
+iterator -> iterator.next()
--- End diff --

I think `->` is unintended, right?


> Add support for ORDER BY OFFSET FETCH
> -
>
> Key: FLINK-3940
> URL: https://issues.apache.org/jira/browse/FLINK-3940
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: GaoLun
>Priority: Minor
>
> Currently only ORDER BY without OFFSET and FETCH are supported.
> This issue tracks the effort to add support for OFFSET and FETCH and involves:
> - Implementing the execution strategy in `DataSetSort`
> - adapting the `DataSetSortRule` to support OFFSET and FETCH
> - extending the Table API and validation to support OFFSET and FETCH and 
> generate a corresponding RelNode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2282: [FLINK-3940] [table] Add support for ORDER BY OFFS...

2016-08-02 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2282#discussion_r73180693
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
 ---
@@ -71,11 +78,57 @@ class DataSetSort(
   partitionedDs = partitionedDs.sortPartition(fieldCollation._1, 
fieldCollation._2)
 }
 
+val offsetAndFetchDS = if (offset != null) {
+  val offsetIndex = RexLiteral.intValue(offset)
+  val fetchIndex = if (fetch != null) {
+RexLiteral.intValue(fetch) + offsetIndex
+  } else {
+Int.MaxValue
+  }
+  if (currentParallelism != 1) {
+val partitionCount = partitionedDs.mapPartition(
+  new MapPartitionFunction[Any, Int] {
+override def mapPartition(value: lang.Iterable[Any], out: 
Collector[Int]): Unit = {
+  val iterator = value.iterator()
+  var elementCount = 0
+  while (iterator.hasNext) {
+elementCount += 1
+iterator -> iterator.next()
--- End diff --

I think `->` is unintended, right?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3940) Add support for ORDER BY OFFSET FETCH

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404196#comment-15404196
 ] 

ASF GitHub Bot commented on FLINK-3940:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2282#discussion_r73179240
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
 ---
@@ -71,11 +78,57 @@ class DataSetSort(
   partitionedDs = partitionedDs.sortPartition(fieldCollation._1, 
fieldCollation._2)
 }
 
+val offsetAndFetchDS = if (offset != null) {
+  val offsetIndex = RexLiteral.intValue(offset)
+  val fetchIndex = if (fetch != null) {
+RexLiteral.intValue(fetch) + offsetIndex
+  } else {
+Int.MaxValue
+  }
+  if (currentParallelism != 1) {
+val partitionCount = partitionedDs.mapPartition(
+  new MapPartitionFunction[Any, Int] {
--- End diff --

Can you move all runtime functions into package `table.runtime`?


> Add support for ORDER BY OFFSET FETCH
> -
>
> Key: FLINK-3940
> URL: https://issues.apache.org/jira/browse/FLINK-3940
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: GaoLun
>Priority: Minor
>
> Currently only ORDER BY without OFFSET and FETCH are supported.
> This issue tracks the effort to add support for OFFSET and FETCH and involves:
> - Implementing the execution strategy in `DataSetSort`
> - adapting the `DataSetSortRule` to support OFFSET and FETCH
> - extending the Table API and validation to support OFFSET and FETCH and 
> generate a corresponding RelNode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2282: [FLINK-3940] [table] Add support for ORDER BY OFFS...

2016-08-02 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2282#discussion_r73179240
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
 ---
@@ -71,11 +78,57 @@ class DataSetSort(
   partitionedDs = partitionedDs.sortPartition(fieldCollation._1, 
fieldCollation._2)
 }
 
+val offsetAndFetchDS = if (offset != null) {
+  val offsetIndex = RexLiteral.intValue(offset)
+  val fetchIndex = if (fetch != null) {
+RexLiteral.intValue(fetch) + offsetIndex
+  } else {
+Int.MaxValue
+  }
+  if (currentParallelism != 1) {
+val partitionCount = partitionedDs.mapPartition(
+  new MapPartitionFunction[Any, Int] {
--- End diff --

Can you move all runtime functions into package `table.runtime`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4271) There is no way to set parallelism of operators produced by CoGroupedStreams

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404193#comment-15404193
 ] 

ASF GitHub Bot commented on FLINK-4271:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2305
  
@StephanEwen I think technically it doesn't break the API because 
`SingleOutputStreamOperator` is a subclass of `DataStream`, right? (It might 
break binary compatibility though, because the signature of the method changes.) 
It might only be the checker that is too sensitive.


> There is no way to set parallelism of operators produced by CoGroupedStreams
> 
>
> Key: FLINK-4271
> URL: https://issues.apache.org/jira/browse/FLINK-4271
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Wenlong Lyu
>Assignee: Jark Wu
>
> Currently, CoGroupedStreams packages the map/keyBy/window operators behind a 
> human-friendly interface, like 
> dataStreamA.cogroup(streamB).where(...).equalsTo().window().apply(). Both the 
> intermediate operators and the final window operator cannot be accessed by 
> users, and we cannot set attributes of the operators, which makes co-group 
> hard to use in production environments.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2305: [FLINK-4271] [DataStreamAPI] Enable CoGroupedStreams and ...

2016-08-02 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2305
  
@StephanEwen I think technically it doesn't break the API because 
`SingleOutputStreamOperator` is a subclass of `DataStream`, right? (It might 
break binary compatibility though, because the signature of the method changes.) 
It might only be the checker that is too sensitive.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3940) Add support for ORDER BY OFFSET FETCH

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404188#comment-15404188
 ] 

ASF GitHub Bot commented on FLINK-3940:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2282#discussion_r73178324
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
 ---
@@ -71,11 +78,57 @@ class DataSetSort(
   partitionedDs = partitionedDs.sortPartition(fieldCollation._1, 
fieldCollation._2)
 }
 
+val offsetAndFetchDS = if (offset != null) {
+  val offsetIndex = RexLiteral.intValue(offset)
+  val fetchIndex = if (fetch != null) {
+RexLiteral.intValue(fetch) + offsetIndex
+  } else {
+Int.MaxValue
+  }
+  if (currentParallelism != 1) {
--- End diff --

Can you swap the branches? In the parallelism check above we first checked 
for `== 1`; otherwise it might be confusing.


> Add support for ORDER BY OFFSET FETCH
> -
>
> Key: FLINK-3940
> URL: https://issues.apache.org/jira/browse/FLINK-3940
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: GaoLun
>Priority: Minor
>
> Currently only ORDER BY without OFFSET and FETCH are supported.
> This issue tracks the effort to add support for OFFSET and FETCH and involves:
> - Implementing the execution strategy in `DataSetSort`
> - adapting the `DataSetSortRule` to support OFFSET and FETCH
> - extending the Table API and validation to support OFFSET and FETCH and 
> generate a corresponding RelNode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4275) Generic Folding, Reducing and List states behave differently from other state backends

2016-08-02 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404186#comment-15404186
 ] 

Aljoscha Krettek commented on FLINK-4275:
-

This was changed to make it possible for the {{WindowOperator}} to check whether 
a state is empty.

> Generic Folding, Reducing and List states behave differently from other state 
> backends
> --
>
> Key: FLINK-4275
> URL: https://issues.apache.org/jira/browse/FLINK-4275
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing, Streaming
>Reporter: Gyula Fora
>Priority: Critical
>
> In 
> https://github.com/apache/flink/commit/12bf7c1a0b81d199085fe874c64763c51a93b3bf
>  the expected behaviour of Folding/Reducing/List states have been changed to 
> return null instead of empty collections/default values.
> This was adapted for the included state backends (Memory, FS, Rocks) but not 
> for the Generic state wrappers. As there are no tests for the Generic backend 
> using the StateBackendTestBase, this didn't show.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2282: [FLINK-3940] [table] Add support for ORDER BY OFFS...

2016-08-02 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2282#discussion_r73178324
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
 ---
@@ -71,11 +78,57 @@ class DataSetSort(
   partitionedDs = partitionedDs.sortPartition(fieldCollation._1, 
fieldCollation._2)
 }
 
+val offsetAndFetchDS = if (offset != null) {
+  val offsetIndex = RexLiteral.intValue(offset)
+  val fetchIndex = if (fetch != null) {
+RexLiteral.intValue(fetch) + offsetIndex
+  } else {
+Int.MaxValue
+  }
+  if (currentParallelism != 1) {
--- End diff --

Can you swap the branches? In the parallelism check above we first checked 
for `== 1`; otherwise it might be confusing.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3940) Add support for ORDER BY OFFSET FETCH

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404181#comment-15404181
 ] 

ASF GitHub Bot commented on FLINK-3940:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2282#discussion_r73177719
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
 ---
@@ -71,11 +78,57 @@ class DataSetSort(
   partitionedDs = partitionedDs.sortPartition(fieldCollation._1, 
fieldCollation._2)
 }
 
+val offsetAndFetchDS = if (offset != null) {
+  val offsetIndex = RexLiteral.intValue(offset)
+  val fetchIndex = if (fetch != null) {
+RexLiteral.intValue(fetch) + offsetIndex
--- End diff --

Is it guaranteed that `fetch` can always be converted to an integer, especially 
if it is coming from the SQL API?
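
Beyond literal extraction, the addition itself can overflow; a tiny standalone illustration of the hazard in computing {{fetch + offset}}:

{code}
// Not Flink code; just the int-overflow hazard in fetchIndex = fetch + offset.
public class OverflowDemo {
    public static void main(String[] args) {
        int offset = Integer.MAX_VALUE - 1;
        int fetch  = 10;

        int wrapped = fetch + offset;        // wraps around: prints -2147483640
        long safe   = (long) fetch + offset; // widen first:   prints 2147483656

        System.out.println(wrapped);
        System.out.println(safe);
    }
}
{code}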


> Add support for ORDER BY OFFSET FETCH
> -
>
> Key: FLINK-3940
> URL: https://issues.apache.org/jira/browse/FLINK-3940
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: GaoLun
>Priority: Minor
>
> Currently only ORDER BY without OFFSET and FETCH are supported.
> This issue tracks the effort to add support for OFFSET and FETCH and involves:
> - Implementing the execution strategy in `DataSetSort`
> - adapting the `DataSetSortRule` to support OFFSET and FETCH
> - extending the Table API and validation to support OFFSET and FETCH and 
> generate a corresponding RelNode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2282: [FLINK-3940] [table] Add support for ORDER BY OFFS...

2016-08-02 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2282#discussion_r73177719
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
 ---
@@ -71,11 +78,57 @@ class DataSetSort(
   partitionedDs = partitionedDs.sortPartition(fieldCollation._1, 
fieldCollation._2)
 }
 
+val offsetAndFetchDS = if (offset != null) {
+  val offsetIndex = RexLiteral.intValue(offset)
+  val fetchIndex = if (fetch != null) {
+RexLiteral.intValue(fetch) + offsetIndex
--- End diff --

Is it guaranteed that `fetch` can always be converted to an integer, especially 
if it is coming from the SQL API?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4081) FieldParsers should support empty strings

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404167#comment-15404167
 ] 

ASF GitHub Bot commented on FLINK-4081:
---

Github user fpompermaier commented on the issue:

https://github.com/apache/flink/pull/2297
  
For our use cases it is important to know whether the value of a cell is empty 
or 0.
The EMPTY_STRING error could be intercepted by the CSV parser in order to 
generate Row objects with null values instead of 0 when using the Table API.
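
The distinction in question, as a standalone sketch (plain Java, not the actual FieldParser patch): an empty cell must come out as null, while "0" stays a real zero.

{code}
// Illustrative only: a null-aware field is different from a zero field.
final class NullableFields {
    static Double parseNullableDouble(String cell) {
        if (cell == null || cell.isEmpty()) {
            return null;                 // empty cell -> null in the resulting Row
        }
        return Double.parseDouble(cell); // "0" -> 0.0, a genuine value
    }
}
{code}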


> FieldParsers should support empty strings
> -
>
> Key: FLINK-4081
> URL: https://issues.apache.org/jira/browse/FLINK-4081
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Reporter: Flavio Pompermaier
>Assignee: Timo Walther
>  Labels: csvparser, table-api
>
> In order to parse CSV files using the new Table API that converts rows to Row 
> objects (that support null values), FieldParser implementations should 
> support empty strings by setting the parser state to 
> ParseErrorState.EMPTY_STRING (for example, FloatParser and DoubleParser 
> don't respect this constraint)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2297: [FLINK-4081] [core] [table] FieldParsers should support e...

2016-08-02 Thread fpompermaier
Github user fpompermaier commented on the issue:

https://github.com/apache/flink/pull/2297
  
For our use cases it is important to know whether the value of a cell is empty 
or 0.
The EMPTY_STRING error could be intercepted by the CSV parser in order to 
generate Row objects with null values instead of 0 when using the Table API.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2282: [FLINK-3940] [table] Add support for ORDER BY OFFS...

2016-08-02 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2282#discussion_r73172351
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -150,6 +150,47 @@ case class Sort(order: Seq[Ordering], child: 
LogicalNode) extends UnaryNode {
   }
 }
 
+case class Offset(offset: Int, child: LogicalNode) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+
+  override protected[logical] def construct(relBuilder: RelBuilder): 
RelBuilder = {
+child.construct(relBuilder)
+relBuilder.limit(offset, -1)
+  }
+
+  override def validate(tableEnv: TableEnvironment): LogicalNode = {
+if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
+  throw new TableException(s"Offset on stream tables is currently not 
supported.")
+}
+if (!child.validate(tableEnv).isInstanceOf[Sort]) {
--- End diff --

You don't need to call `validate` here. You can check the Expression itself.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4224) Exception after successful execution of job submitted through the web interface.

2016-08-02 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404157#comment-15404157
 ] 

Aljoscha Krettek commented on FLINK-4224:
-

[~kkl0u] is this still an issue on the new 1.1 RC?

> Exception after successful execution of job submitted through the web 
> interface.
> 
>
> Key: FLINK-4224
> URL: https://issues.apache.org/jira/browse/FLINK-4224
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.1.0
>Reporter: Kostas Kloudas
>Assignee: Aljoscha Krettek
>
> When submitting the job below through the web interface to a local flink 
> cluster:
> {code}
>   public static void main(String[] args) throws Exception {
>       StreamExecutionEnvironment env =
>           StreamExecutionEnvironment.getExecutionEnvironment();
>       TextInputFormat inputFormat =
>           new TextInputFormat(new Path("file:///YOUR_TEXT_FILE"));
>       DataStream<Tuple2<String, Integer>> input = env.
>           readFile(inputFormat, inputFormat.getFilePath().toString()).
>           flatMap(new Tokenizer()).
>           keyBy(0).
>           sum(1);
>       input.print();
>       env.execute("WebClient Testing Example.");
>   }
>
>   public static final class Tokenizer implements
>           FlatMapFunction<String, Tuple2<String, Integer>> {
>       private static final long serialVersionUID = 1L;
>
>       @Override
>       public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
>               throws Exception {
>           // normalize and split the line
>           String[] tokens = value.toLowerCase().split("\\W+");
>           // emit the pairs
>           for (String token : tokens) {
>               if (token.length() > 0) {
>                   out.collect(new Tuple2<>(token, 1));
>               }
>           }
>       }
>   }
> {code}
> The job succeeds, but in the logs there is an NPE: 
> {code} 2016-07-16 16:56:18,197 WARN  
> org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler - 
>  Error while handling request java.lang.RuntimeException: 
>  Couldn't deserialize ExecutionConfig.
> ...
> Caused by: java.lang.NullPointerException at 
> org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:55)
>  at 
> org.apache.flink.runtime.webmonitor.handlers.JobConfigHandler.handleRequest(JobConfigHandler.java:50)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-4246) Allow Specifying Multiple Metrics Reporters

2016-08-02 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek resolved FLINK-4246.
-
Resolution: Fixed

This was merged a while back.

> Allow Specifying Multiple Metrics Reporters
> ---
>
> Key: FLINK-4246
> URL: https://issues.apache.org/jira/browse/FLINK-4246
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.1.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.1.0
>
>
> We should allow specifying multiple reporters. A rough sketch of how the 
> configuration should look like is this:
> {code}
> metrics.reporters = foo,bar
> metrics.reporter.foo.class = JMXReporter.class
> metrics.reporter.foo.port = 42-117
> metrics.reporter.bar.class = GangliaReporter.class
> metrics.reporter.bar.port = 512
> metrics.reporter.bar.whatever = 2
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4271) There is no way to set parallelism of operators produced by CoGroupedStreams

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404149#comment-15404149
 ] 

ASF GitHub Bot commented on FLINK-4271:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2305
  
@wuchong The CI fails because this change breaks backwards compatibility. I 
understand, though, that this is a pretty critical change, and it may actually 
warrant breaking backwards compatibility.

Let me think about what we can do there...


> There is no way to set parallelism of operators produced by CoGroupedStreams
> 
>
> Key: FLINK-4271
> URL: https://issues.apache.org/jira/browse/FLINK-4271
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Wenlong Lyu
>Assignee: Jark Wu
>
> Currently, CoGroupedStreams packages the map/keyBy/window operators behind a 
> human-friendly interface, like 
> dataStreamA.cogroup(streamB).where(...).equalsTo().window().apply(). Both the 
> intermediate operators and the final window operator cannot be accessed by 
> users, and we cannot set attributes of the operators, which makes co-group 
> hard to use in production environments.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2305: [FLINK-4271] [DataStreamAPI] Enable CoGroupedStreams and ...

2016-08-02 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2305
  
@wuchong The CI fails because this change breaks backwards compatibility. I 
understand, though, that this is a pretty critical change, and it may actually 
warrant breaking backwards compatibility.

Let me think about what we can do there...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2282: [FLINK-3940] [table] Add support for ORDER BY OFFS...

2016-08-02 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2282#discussion_r73172521
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -150,6 +150,47 @@ case class Sort(order: Seq[Ordering], child: 
LogicalNode) extends UnaryNode {
   }
 }
 
+case class Offset(offset: Int, child: LogicalNode) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+
+  override protected[logical] def construct(relBuilder: RelBuilder): 
RelBuilder = {
+child.construct(relBuilder)
+relBuilder.limit(offset, -1)
+  }
+
+  override def validate(tableEnv: TableEnvironment): LogicalNode = {
+if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
+  throw new TableException(s"Offset on stream tables is currently not 
supported.")
+}
+if (!child.validate(tableEnv).isInstanceOf[Sort]) {
+  throw new TableException(s"Offset operator must follow behind 
orderBy clause.")
+}
+super.validate(tableEnv)
+  }
+}
+
+case class Fetch(fetch: Int, child: LogicalNode) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+
+  override protected[logical] def construct(relBuilder: RelBuilder): 
RelBuilder = {
+
+val newChild = child.asInstanceOf[Offset].child
+newChild.construct(relBuilder)
+val relNode = child.toRelNode(relBuilder).asInstanceOf[LogicalSort]
+relBuilder.limit(RexLiteral.intValue(relNode.offset), fetch)
+  }
+
+  override def validate(tableEnv: TableEnvironment): LogicalNode = {
+if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
+  throw new TableException(s"Fetch on stream tables is currently not 
supported.")
+}
+if (!child.validate(tableEnv).isInstanceOf[Offset]) {
--- End diff --

Same comment as above.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3940) Add support for ORDER BY OFFSET FETCH

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404137#comment-15404137
 ] 

ASF GitHub Bot commented on FLINK-3940:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2282#discussion_r73172351
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -150,6 +150,47 @@ case class Sort(order: Seq[Ordering], child: 
LogicalNode) extends UnaryNode {
   }
 }
 
+case class Offset(offset: Int, child: LogicalNode) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+
+  override protected[logical] def construct(relBuilder: RelBuilder): 
RelBuilder = {
+child.construct(relBuilder)
+relBuilder.limit(offset, -1)
+  }
+
+  override def validate(tableEnv: TableEnvironment): LogicalNode = {
+if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
+  throw new TableException(s"Offset on stream tables is currently not 
supported.")
+}
+if (!child.validate(tableEnv).isInstanceOf[Sort]) {
--- End diff --

You don't need to call `validate` here. You can check the Expression itself.


> Add support for ORDER BY OFFSET FETCH
> -
>
> Key: FLINK-3940
> URL: https://issues.apache.org/jira/browse/FLINK-3940
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: GaoLun
>Priority: Minor
>
> Currently only ORDER BY without OFFSET and FETCH are supported.
> This issue tracks the effort to add support for OFFSET and FETCH and involves:
> - Implementing the execution strategy in `DataSetSort`
> - adapting the `DataSetSortRule` to support OFFSET and FETCH
> - extending the Table API and validation to support OFFSET and FETCH and 
> generate a corresponding RelNode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3940) Add support for ORDER BY OFFSET FETCH

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404141#comment-15404141
 ] 

ASF GitHub Bot commented on FLINK-3940:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2282#discussion_r73172521
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -150,6 +150,47 @@ case class Sort(order: Seq[Ordering], child: 
LogicalNode) extends UnaryNode {
   }
 }
 
+case class Offset(offset: Int, child: LogicalNode) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+
+  override protected[logical] def construct(relBuilder: RelBuilder): 
RelBuilder = {
+child.construct(relBuilder)
+relBuilder.limit(offset, -1)
+  }
+
+  override def validate(tableEnv: TableEnvironment): LogicalNode = {
+if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
+  throw new TableException(s"Offset on stream tables is currently not 
supported.")
+}
+if (!child.validate(tableEnv).isInstanceOf[Sort]) {
+  throw new TableException(s"Offset operator must follow behind 
orderBy clause.")
+}
+super.validate(tableEnv)
+  }
+}
+
+case class Fetch(fetch: Int, child: LogicalNode) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+
+  override protected[logical] def construct(relBuilder: RelBuilder): 
RelBuilder = {
+
+val newChild = child.asInstanceOf[Offset].child
+newChild.construct(relBuilder)
+val relNode = child.toRelNode(relBuilder).asInstanceOf[LogicalSort]
+relBuilder.limit(RexLiteral.intValue(relNode.offset), fetch)
+  }
+
+  override def validate(tableEnv: TableEnvironment): LogicalNode = {
+if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
+  throw new TableException(s"Fetch on stream tables is currently not 
supported.")
+}
+if (!child.validate(tableEnv).isInstanceOf[Offset]) {
--- End diff --

Same comment as above.



> Add support for ORDER BY OFFSET FETCH
> -
>
> Key: FLINK-3940
> URL: https://issues.apache.org/jira/browse/FLINK-3940
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: GaoLun
>Priority: Minor
>
> Currently only ORDER BY without OFFSET and FETCH are supported.
> This issue tracks the effort to add support for OFFSET and FETCH and involves:
> - Implementing the execution strategy in `DataSetSort`
> - adapting the `DataSetSortRule` to support OFFSET and FETCH
> - extending the Table API and validation to support OFFSET and FETCH and 
> generate a corresponding RelNode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2282: [FLINK-3940] [table] Add support for ORDER BY OFFS...

2016-08-02 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2282#discussion_r73171110
  
--- Diff: docs/apis/table.md ---
@@ -606,6 +606,28 @@ Table result = in.orderBy("a.asc");
   
 
 
+
+  Offset
+  
+Similar to a SQL OFFSET clause. Returns rows from offset 
position. It is technically part of the ORDER BY clause.
+{% highlight java %}
+Table in = tableEnv.fromDataSet(ds, "a, b, c");
+Table result = in.orderBy("a.asc").offset(3);
+{% endhighlight %}
+  
+
+
+
+  Fetch
+  
+Similar to a SQL FETCH clause. Returns a set number of rows. 
FETCH can't be used by itself; it is used in conjunction with OFFSET.
--- End diff --

If `fetch` can only be used after an `offset`, wouldn't it be better to 
overload `offset`? We could also call it `limit` (with one and two parameters), 
similar to how `RelBuilder` does it.
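
Side by side, the two API shapes under discussion read like this (hedged: `offset`/`fetch` follow the PR, while the `limit` overloads are only the reviewer's suggestion):

{code}
Table in = tableEnv.fromDataSet(ds, "a, b, c");

// shape in the PR: separate calls
Table r1 = in.orderBy("a.asc").offset(3).fetch(5);  // skip 3 rows, return the next 5

// suggested alternative: overloaded limit(), as in Calcite's RelBuilder
Table r2 = in.orderBy("a.asc").limit(3);            // skip 3 rows, return the rest
Table r3 = in.orderBy("a.asc").limit(3, 5);         // skip 3 rows, return the next 5
{code}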


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3940) Add support for ORDER BY OFFSET FETCH

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404133#comment-15404133
 ] 

ASF GitHub Bot commented on FLINK-3940:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2282#discussion_r73171110
  
--- Diff: docs/apis/table.md ---
@@ -606,6 +606,28 @@ Table result = in.orderBy("a.asc");
   
 
 
+
+  Offset
+  
+Similar to a SQL OFFSET clause. Returns rows from offset 
position. It is technically part of the ORDER BY clause.
+{% highlight java %}
+Table in = tableEnv.fromDataSet(ds, "a, b, c");
+Table result = in.orderBy("a.asc").offset(3);
+{% endhighlight %}
+  
+
+
+
+  Fetch
+  
+Similar to a SQL FETCH clause. Returns a set number of rows. 
FETCH can't be used by itself; it is used in conjunction with OFFSET.
--- End diff --

If `fetch` can only be used after an `offset`, wouldn't it be better to 
overload `offset`? We could also call it `limit` (with one and two parameters), 
similar to how `RelBuilder` does it.


> Add support for ORDER BY OFFSET FETCH
> -
>
> Key: FLINK-3940
> URL: https://issues.apache.org/jira/browse/FLINK-3940
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: GaoLun
>Priority: Minor
>
> Currently only ORDER BY without OFFSET and FETCH are supported.
> This issue tracks the effort to add support for OFFSET and FETCH and involves:
> - Implementing the execution strategy in `DataSetSort`
> - adapting the `DataSetSortRule` to support OFFSET and FETCH
> - extending the Table API and validation to support OFFSET and FETCH and 
> generate a corresponding RelNode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4094) Off heap memory deallocation might not properly work

2016-08-02 Thread Maximilian Michels (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404120#comment-15404120
 ] 

Maximilian Michels commented on FLINK-4094:
---

Yes, it was undocumented. Initially, I thought preallocation:false would just 
pool lazily when, in fact, it actually returned the memory.
this memory was never freed because the garbage collection of offheap memory 
only kicks in when the direct memory size limit is reached (or when a full 
garbage collection occurs in the meantime). For the regular heap case, it 
actually frees memory during normal garbage collection cycles.
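
The effect is easy to reproduce outside Flink; a minimal demonstration (run with, e.g., {{-XX:MaxDirectMemorySize=64m}}):

{code}
import java.nio.ByteBuffer;

// Direct buffers become unreachable immediately, but their native memory is
// only reclaimed once the direct-memory limit forces a collection (or a full
// GC happens for some other reason).
public class DirectMemoryDemo {
    public static void main(String[] args) {
        for (int i = 0; i < 1024; i++) {
            ByteBuffer.allocateDirect(1 << 20); // 1 MiB per iteration
        }
        // Heap byte[] of the same size would go away in ordinary young GCs; the
        // off-heap memory behind these buffers lingers, which is how a process
        // can exceed its budget and be killed by the OS.
    }
}
{code}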

> Off heap memory deallocation might not properly work
> 
>
> Key: FLINK-4094
> URL: https://issues.apache.org/jira/browse/FLINK-4094
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: ramkrishna.s.vasudevan
>Priority: Critical
> Fix For: 1.1.0
>
>
> A user reported that off-heap memory is not properly deallocated when setting 
> {{taskmanager.memory.preallocate:false}} (per default) [1]. This can cause 
> the TaskManager process being killed by the OS.
> It should be possible to execute multiple batch jobs with preallocation 
> turned off. No longer used direct memory buffers should be properly garbage 
> collected so that the JVM process does not exceed its maximum memory bounds.
> [1] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/offheap-memory-allocation-and-memory-leak-bug-td12154.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-4203) Improve Table API documentation

2016-08-02 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-4203.
-
   Resolution: Fixed
Fix Version/s: 1.2.0

Fixed in 3f1e8b91574d66a8e9d8b74a6be1227535064405.

> Improve Table API documentation
> ---
>
> Key: FLINK-4203
> URL: https://issues.apache.org/jira/browse/FLINK-4203
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Timo Walther
>Assignee: Timo Walther
> Fix For: 1.2.0
>
>
> Some ideas:
> - Add a list of all supported scalar functions and a description
> - Add a more advanced example
> - Describe supported data types



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2293: [FLINK-4203] [table] [docs] Improve Table API docu...

2016-08-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2293


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4203) Improve Table API documentation

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404108#comment-15404108
 ] 

ASF GitHub Bot commented on FLINK-4203:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2293


> Improve Table API documentation
> ---
>
> Key: FLINK-4203
> URL: https://issues.apache.org/jira/browse/FLINK-4203
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Some ideas:
> - Add a list of all supported scalar functions and a description
> - Add a more advanced example
> - Describe supported data types



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4081) FieldParsers should support empty strings

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404107#comment-15404107
 ] 

ASF GitHub Bot commented on FLINK-4081:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2297
  
Using an error state for an empty string seems a bit unorthodox.
I take it returning `0` does not work for some reason?


> FieldParsers should support empty strings
> -
>
> Key: FLINK-4081
> URL: https://issues.apache.org/jira/browse/FLINK-4081
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Reporter: Flavio Pompermaier
>Assignee: Timo Walther
>  Labels: csvparser, table-api
>
> In order to parse CSV files using the new Table API that converts rows to Row 
> objects (which support null values), FieldParser implementations should 
> support empty strings by setting the parser state to 
> ParseErrorState.EMPTY_STRING (for example, FloatParser and DoubleParser 
> currently don't respect this constraint).
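
For illustration, here is a minimal standalone sketch of the approach under
discussion: flagging an empty field through an explicit error state so that a
Row-producing caller can emit null instead of a fabricated value. This is a
hypothetical parser, not Flink's actual `FieldParser` API (which works on
byte[] slices and has its own error-state enum).

    import java.util.Optional;

    // Hypothetical parser, for illustration only.
    public class NullableDoubleParser {

        public enum ParseError { NONE, EMPTY_STRING, NUMERIC_VALUE_FORMAT_ERROR }

        private ParseError errorState = ParseError.NONE;

        public Optional<Double> parse(String field) {
            errorState = ParseError.NONE;
            if (field == null || field.isEmpty()) {
                // Returning 0.0 would be indistinguishable from a real zero;
                // an explicit EMPTY_STRING state lets the caller emit null.
                errorState = ParseError.EMPTY_STRING;
                return Optional.empty();
            }
            try {
                return Optional.of(Double.parseDouble(field));
            } catch (NumberFormatException e) {
                errorState = ParseError.NUMERIC_VALUE_FORMAT_ERROR;
                return Optional.empty();
            }
        }

        public ParseError getErrorState() {
            return errorState;
        }
    }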



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2297: [FLINK-4081] [core] [table] FieldParsers should support e...

2016-08-02 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2297
  
Using an error state for an empty string seems a bit unorthodox.
I take it returning `0` does not work for some reason?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4247) CsvTableSource.getDataSet() expects Java ExecutionEnvironment

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404102#comment-15404102
 ] 

ASF GitHub Bot commented on FLINK-4247:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2298
  
Would be great to reflect the "do not use" with proper visibility. A 
JavaDoc comment usually does not help at all.

Also, the dependency on the java API classes in the Scala classes does not 
seem right. Usually the Scala classes hide the Java API streams and 
environments.

Both of these things are usually strong indicators that there are some 
abstraction issues.
Would be good to get that right, while the API is fresh and malleable.
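
As a hedged illustration of the visibility point (a hypothetical class, not
the actual CsvTableSource): restricting access makes misuse fail at compile
time, where a JavaDoc warning would not.

    // Hypothetical sketch: two ways to mark an internal accessor.
    public class TableSourceSketch {

        /** Do not use! (Nothing stops a caller from ignoring this JavaDoc.) */
        public Object getDataSetDocumentedOnly() {
            return null;
        }

        // Package-private: invisible outside the package, so "do not use"
        // is enforced by the compiler rather than by documentation.
        Object getDataSetInternal() {
            return null;
        }
    }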


> CsvTableSource.getDataSet() expects Java ExecutionEnvironment
> -
>
> Key: FLINK-4247
> URL: https://issues.apache.org/jira/browse/FLINK-4247
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Priority: Minor
>
> The Table API offers the {{CsvTableSource}} which can be used with the Java 
> and Scala API. However, if used with the Scala API, where one has obtained a 
> {{scala.api.ExecutionEnvironment}} there is a problem with the 
> {{CsvTableSource.getDataSet}} method. The method expects a 
> {{java.api.ExecutionEnvironment}} to extract the underlying {{DataSet}}. 
> Additionally it returns a {{java.api.DataSet}} instead of a 
> {{scala.api.DataSet}}. I think we should also offer a Scala API specific 
> CsvTableSource which works with the respective Scala counterparts.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2298: [FLINK-4247] [table] CsvTableSource.getDataSet() expects ...

2016-08-02 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2298
  
Would be great to reflect the "do not use" with proper visibility. A 
JavaDoc comment usually does not help at all.

Also, the dependency on the java API classes in the Scala classes does not 
seem right. Usually the Scala classes hide the Java API streams and 
environments.

Both of these things are usually strong indicators that there are some 
abstraction issues.
Would be good to get that right, while the API is fresh and malleable.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

2016-08-02 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2109
  
Just had a brief look. Looks nice. Actually, I like how you moved the file 
filter to the `FileInputFormat` instead of having it as a special case of the 
file monitoring source. 

I think you made at least one API breaking change which we will have to 
revert on `StreamExecutionEnvironment` after the next release (1.1.0). I think 
nobody will use `GlobFilePathFilter` unless it is documented. Could you revert 
the method changes on the environment and add some documentation?

I would keep the default filter as it is. A lot of users wouldn't expect 
the default filter to evaluate regular expressions.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3677) FileInputFormat: Allow to specify include/exclude file name patterns

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404097#comment-15404097
 ] 

ASF GitHub Bot commented on FLINK-3677:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2109
  
Just had a brief look. Looks nice. Actually, I like how you moved the file 
filter to the `FileInputFormat` instead of having it as a special case of the 
file monitoring source. 

I think you made at least one API breaking change which we will have to 
revert on `StreamExecutionEnvironment` after the next release (1.1.0). I think 
nobody will use `GlobFilePathFilter` unless it is documented. Could you revert 
the method changes on the environment and add some documentation?

I would keep the default filter as it is. A lot of users wouldn't expect 
the default filter to evaluate regular expressions.


> FileInputFormat: Allow to specify include/exclude file name patterns
> 
>
> Key: FLINK-3677
> URL: https://issues.apache.org/jira/browse/FLINK-3677
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Maximilian Michels
>Assignee: Ivan Mushketyk
>Priority: Minor
>  Labels: starter
>
> It would be nice to be able to specify a regular expression to filter files.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4203) Improve Table API documentation

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404096#comment-15404096
 ] 

ASF GitHub Bot commented on FLINK-4203:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/2293
  
I will merge this now. The doc is still not perfect but we can further 
improve it in the future.


> Improve Table API documentation
> ---
>
> Key: FLINK-4203
> URL: https://issues.apache.org/jira/browse/FLINK-4203
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Some ideas:
> - Add a list of all supported scalar functions and a description
> - Add a more advanced example
> - Describe supported data types



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2293: [FLINK-4203] [table] [docs] Improve Table API documentati...

2016-08-02 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/2293
  
I will merge this now. The doc is still not perfect but we can further 
improve it in the future.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4094) Off heap memory deallocation might not properly work

2016-08-02 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404095#comment-15404095
 ] 

Till Rohrmann commented on FLINK-4094:
--

Is {{preallocate == false}} really supposed to return the memory eventually? 
The documentation simply says that memory is allocated lazily, but says 
nothing about releasing it. Was this undocumented behaviour?
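
For illustration, a minimal Java sketch (not Flink's MemoryManager code) of
why lazily allocated direct buffers can linger: the native memory behind a
direct `ByteBuffer` is released only when the small heap object that owns it
is garbage collected.

    import java.nio.ByteBuffer;

    public class DirectBufferLifecycle {

        public static void main(String[] args) {
            // Allocates 64 MB of native (off-heap) memory, owned by a tiny
            // object on the Java heap.
            ByteBuffer buffer = ByteBuffer.allocateDirect(64 * 1024 * 1024);
            buffer.putLong(0, 42L);

            // Dropping the reference does NOT free the native memory; it is
            // reclaimed only when the GC collects the buffer object.
            buffer = null;

            // With little heap pressure the GC may not run for a long time,
            // so the native allocation can outlive its usefulness and push
            // the process over the OS memory limit. System.gc() is only a hint.
            System.gc();
        }
    }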

> Off heap memory deallocation might not properly work
> 
>
> Key: FLINK-4094
> URL: https://issues.apache.org/jira/browse/FLINK-4094
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: ramkrishna.s.vasudevan
>Priority: Critical
> Fix For: 1.1.0
>
>
> A user reported that off-heap memory is not properly deallocated when setting 
> {{taskmanager.memory.preallocate:false}} (the default) [1]. This can cause 
> the TaskManager process to be killed by the OS.
> It should be possible to execute multiple batch jobs with preallocation 
> turned off. Direct memory buffers that are no longer used should be properly 
> garbage collected so that the JVM process does not exceed its maximum memory 
> bounds.
> [1] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/offheap-memory-allocation-and-memory-leak-bug-td12154.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-4181) Add a basic streaming Table API example

2016-08-02 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-4181.
-
   Resolution: Fixed
Fix Version/s: 1.2.0

Fixed in 123c637e804bfdd6569051cf705ec73b5cb95352.

> Add a basic streaming Table API example
> ---
>
> Key: FLINK-4181
> URL: https://issues.apache.org/jira/browse/FLINK-4181
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Jark Wu
>  Labels: starter
> Fix For: 1.2.0
>
>
> Although the Table API does not offer much streaming features yet, there 
> should be a runnable example showing how to convert, union, filter and 
> project streams with the Table API.
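
A rough sketch of what such an example might look like, assuming 1.1-era
Table API names (`fromDataStream`, `unionAll`, `toDataStream`); treat the
exact class and method names as assumptions that may differ from what was
merged.

    import org.apache.flink.api.java.table.StreamTableEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.table.Row;
    import org.apache.flink.api.table.Table;
    import org.apache.flink.api.table.TableEnvironment;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class StreamTableSketch {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

            DataStream<Tuple2<String, Long>> streamA =
                    env.fromElements(Tuple2.of("a", 1L), Tuple2.of("b", 5L));
            DataStream<Tuple2<String, Long>> streamB =
                    env.fromElements(Tuple2.of("c", 3L));

            // convert, union, filter and project, as listed in the issue
            Table tableA = tEnv.fromDataStream(streamA, "word, num");
            Table tableB = tEnv.fromDataStream(streamB, "word, num");
            Table result = tableA.unionAll(tableB).filter("num > 2").select("word");

            tEnv.toDataStream(result, Row.class).print();
            env.execute();
        }
    }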



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-4180) Create a batch SQL example

2016-08-02 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-4180.
-
   Resolution: Fixed
Fix Version/s: 1.2.0

Fixed in 123c637e804bfdd6569051cf705ec73b5cb95352.

> Create a batch SQL example
> --
>
> Key: FLINK-4180
> URL: https://issues.apache.org/jira/browse/FLINK-4180
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Jark Wu
>  Labels: starter
> Fix For: 1.2.0
>
>
> Currently there is no runnable code example in `flink-table` showing a 
> working batch SQL query with the Table API.
> A Scala and Java example should be added.
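
A sketch of such an example, assuming the 1.1-era flink-table API
(`registerDataSet`, `sql`, `toDataSet`); the exact class and method names are
assumptions, not the final merged code.

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.table.BatchTableEnvironment;
    import org.apache.flink.api.table.Table;
    import org.apache.flink.api.table.TableEnvironment;

    public class WordCountSQLSketch {

        // Simple POJO so the SQL result can be mapped back to a DataSet.
        public static class WC {
            public String word;
            public long frequency;

            public WC() {}

            public WC(String word, long frequency) {
                this.word = word;
                this.frequency = frequency;
            }

            @Override
            public String toString() {
                return word + " " + frequency;
            }
        }

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

            DataSet<WC> input = env.fromElements(
                    new WC("hello", 1), new WC("ciao", 1), new WC("hello", 1));

            // Register the DataSet as a table and run a plain SQL query over it.
            tEnv.registerDataSet("WordCount", input, "word, frequency");
            Table table = tEnv.sql(
                    "SELECT word, SUM(frequency) AS frequency FROM WordCount GROUP BY word");

            tEnv.toDataSet(table, WC.class).print();
        }
    }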



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4180) Create a batch SQL example

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4180?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404074#comment-15404074
 ] 

ASF GitHub Bot commented on FLINK-4180:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2274


> Create a batch SQL example
> --
>
> Key: FLINK-4180
> URL: https://issues.apache.org/jira/browse/FLINK-4180
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Jark Wu
>  Labels: starter
>
> Currently there is no runnable code example in `flink-table` showing a 
> working batch SQL query with the Table API.
> A Scala and Java example should be added.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2274: [FLINK-4180] [FLINK-4181] [table] add Batch SQL an...

2016-08-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2274


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3677) FileInputFormat: Allow to specify include/exclude file name patterns

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404060#comment-15404060
 ] 

ASF GitHub Bot commented on FLINK-3677:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r73162978
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
 ---
@@ -257,41 +257,22 @@ public void testFileInputSplit() {
public void testIgnoredUnderscoreFiles() {
try {
final String contents = "CONTENTS";
-   
+
// create some accepted, some ignored files
-   
-   File tempDir = new 
File(System.getProperty("java.io.tmpdir"));
-   File f;
-   do {
-   f = new File(tempDir, 
TestFileUtils.randomFileName(""));
-   }
-   while (f.exists());
 
-   assertTrue(f.mkdirs());
-   f.deleteOnExit();
-   
-   File child1 = new File(f, "dataFile1.txt");
-   File child2 = new File(f, "another_file.bin");
-   File luigiFile = new File(f, "_luigi");
-   File success = new File(f, "_SUCCESS");
-   
-   File[] files = { child1, child2, luigiFile, success };
-   
-   for (File child : files) {
-   child.deleteOnExit();
-   
-   BufferedWriter out = new BufferedWriter(new 
FileWriter(child));
-   try { 
-   out.write(contents);
-   } finally {
-   out.close();
-   }
-   }
+   File tempDirectory = createTempDirectory();

+   File child1 = new File(tempDirectory, "dataFile1.txt");
+   File child2 = new File(tempDirectory, 
"another_file.bin");
+   File luigiFile = new File(tempDirectory, "_luigi");
+   File success = new File(tempDirectory, "_SUCCESS");
+
+   createTempFiles(contents.getBytes(), child1, child2, 
luigiFile, success);
--- End diff --

You could actually use `TemporaryFolder` here to create the root. It would 
be cleaned up automatically by junit.
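
For reference, a minimal sketch of that suggestion: with a JUnit
`TemporaryFolder` rule the test needs no manual `deleteOnExit()` bookkeeping.

    import org.junit.Rule;
    import org.junit.Test;
    import org.junit.rules.TemporaryFolder;

    import java.io.File;
    import java.nio.file.Files;

    public class TempFolderSketch {

        // JUnit creates the directory before each test and deletes it
        // (recursively) afterwards, even if the test fails.
        @Rule
        public TemporaryFolder tempFolder = new TemporaryFolder();

        @Test
        public void testIgnoredUnderscoreFiles() throws Exception {
            File child1 = tempFolder.newFile("dataFile1.txt");
            File luigiFile = tempFolder.newFile("_luigi");

            Files.write(child1.toPath(), "CONTENTS".getBytes());
            Files.write(luigiFile.toPath(), "CONTENTS".getBytes());
            // ... run the FileInputFormat against tempFolder.getRoot() ...
        }
    }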


> FileInputFormat: Allow to specify include/exclude file name patterns
> 
>
> Key: FLINK-3677
> URL: https://issues.apache.org/jira/browse/FLINK-3677
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Maximilian Michels
>Assignee: Ivan Mushketyk
>Priority: Minor
>  Labels: starter
>
> It would be nice to be able to specify a regular expression to filter files.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

2016-08-02 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r73162978
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
 ---
@@ -257,41 +257,22 @@ public void testFileInputSplit() {
public void testIgnoredUnderscoreFiles() {
try {
final String contents = "CONTENTS";
-   
+
// create some accepted, some ignored files
-   
-   File tempDir = new 
File(System.getProperty("java.io.tmpdir"));
-   File f;
-   do {
-   f = new File(tempDir, 
TestFileUtils.randomFileName(""));
-   }
-   while (f.exists());
 
-   assertTrue(f.mkdirs());
-   f.deleteOnExit();
-   
-   File child1 = new File(f, "dataFile1.txt");
-   File child2 = new File(f, "another_file.bin");
-   File luigiFile = new File(f, "_luigi");
-   File success = new File(f, "_SUCCESS");
-   
-   File[] files = { child1, child2, luigiFile, success };
-   
-   for (File child : files) {
-   child.deleteOnExit();
-   
-   BufferedWriter out = new BufferedWriter(new 
FileWriter(child));
-   try { 
-   out.write(contents);
-   } finally {
-   out.close();
-   }
-   }
+   File tempDirectory = createTempDirectory();

+   File child1 = new File(tempDirectory, "dataFile1.txt");
+   File child2 = new File(tempDirectory, 
"another_file.bin");
+   File luigiFile = new File(tempDirectory, "_luigi");
+   File success = new File(tempDirectory, "_SUCCESS");
+
+   createTempFiles(contents.getBytes(), child1, child2, 
luigiFile, success);
--- End diff --

You could actually use `TemporaryFolder` here to create the root. It would 
be cleaned up automatically by junit.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3677) FileInputFormat: Allow to specify include/exclude file name patterns

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404050#comment-15404050
 ] 

ASF GitHub Bot commented on FLINK-3677:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r73162167
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.io;
+
+import org.apache.flink.core.fs.Path;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class GlobFilePathFilterTest {
--- End diff --

What is "Glob"?
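
For context: a "glob" is a shell-style wildcard pattern (`*`, `?`, `**`), as
opposed to a full regular expression. A short, self-contained illustration
using the standard `java.nio` PathMatcher API (not Flink code):

    import java.nio.file.FileSystems;
    import java.nio.file.Path;
    import java.nio.file.PathMatcher;
    import java.nio.file.Paths;

    public class GlobDemo {

        public static void main(String[] args) {
            // "glob:" selects shell-style matching; "regex:" would select
            // full regular expressions instead.
            PathMatcher matcher =
                    FileSystems.getDefault().getPathMatcher("glob:**/*.txt");

            Path accepted = Paths.get("data/2016/dataFile1.txt");
            Path rejected = Paths.get("data/2016/another_file.bin");

            System.out.println(matcher.matches(accepted)); // true
            System.out.println(matcher.matches(rejected)); // false
        }
    }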


> FileInputFormat: Allow to specify include/exclude file name patterns
> 
>
> Key: FLINK-3677
> URL: https://issues.apache.org/jira/browse/FLINK-3677
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Maximilian Michels
>Assignee: Ivan Mushketyk
>Priority: Minor
>  Labels: starter
>
> It would be nice to be able to specify a regular expression to filter files.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

2016-08-02 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r73162125
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
 ---
@@ -274,7 +272,7 @@ public int compare(Tuple2 o1, Tuple2

[jira] [Commented] (FLINK-3677) FileInputFormat: Allow to specify include/exclude file name patterns

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404049#comment-15404049
 ] 

ASF GitHub Bot commented on FLINK-3677:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r73162125
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
 ---
@@ -274,7 +272,7 @@ public int compare(Tuple2 o1, Tuple2

> FileInputFormat: Allow to specify include/exclude file name patterns
> 
>
> Key: FLINK-3677
> URL: https://issues.apache.org/jira/browse/FLINK-3677
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Maximilian Michels
>Assignee: Ivan Mushketyk
>Priority: Minor
>  Labels: starter
>
> It would be nice to be able to specify a regular expression to filter files.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

2016-08-02 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r73162167
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.io;
+
+import org.apache.flink.core.fs.Path;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class GlobFilePathFilterTest {
--- End diff --

What is "Glob"?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3677) FileInputFormat: Allow to specify include/exclude file name patterns

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404048#comment-15404048
 ] 

ASF GitHub Bot commented on FLINK-3677:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r73162004
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
 ---
@@ -274,7 +272,7 @@ public int compare(Tuple2 o1, Tuple2

> FileInputFormat: Allow to specify include/exclude file name patterns
> 
>
> Key: FLINK-3677
> URL: https://issues.apache.org/jira/browse/FLINK-3677
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Maximilian Michels
>Assignee: Ivan Mushketyk
>Priority: Minor
>  Labels: starter
>
> It would be nice to be able to specify a regular expression to filter files.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

2016-08-02 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r73162004
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
 ---
@@ -274,7 +272,7 @@ public int compare(Tuple2 o1, Tuple2

[jira] [Commented] (FLINK-3677) FileInputFormat: Allow to specify include/exclude file name patterns

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404047#comment-15404047
 ] 

ASF GitHub Bot commented on FLINK-3677:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r73161791
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 ---
@@ -995,8 +993,7 @@ public TimeCharacteristic getStreamTimeCharacteristic() 
{
public  DataStreamSource readFile(FileInputFormat 
inputFormat,

String filePath,

FileProcessingMode watchType,
-   
long interval,
-   
FilePathFilter filter) {
+   
long interval) {
--- End diff --

This is a breaking API change. `StreamExecutionEnvironment` is 
annotated `@Public`.
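
A common way to evolve a `@Public` API without breaking it is to keep the old
signature as a deprecated overload that delegates to the new one; a
hypothetical sketch (not the actual Flink signatures):

    // Hypothetical class; parameter types simplified for illustration.
    public class ReadFileApiSketch {

        public void readFile(String filePath, long interval) {
            // new, narrower signature; the filter is now configured on the
            // input format instead of being passed here
        }

        /**
         * @deprecated Use {@link #readFile(String, long)} and set the file
         *             path filter on the {@code FileInputFormat} instead.
         */
        @Deprecated
        public void readFile(String filePath, long interval, Object filter) {
            readFile(filePath, interval);
        }
    }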


> FileInputFormat: Allow to specify include/exclude file name patterns
> 
>
> Key: FLINK-3677
> URL: https://issues.apache.org/jira/browse/FLINK-3677
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Maximilian Michels
>Assignee: Ivan Mushketyk
>Priority: Minor
>  Labels: starter
>
> It would be nice to be able to specify a regular expression to filter files.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

2016-08-02 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r73161791
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 ---
@@ -995,8 +993,7 @@ public TimeCharacteristic getStreamTimeCharacteristic() 
{
public  DataStreamSource readFile(FileInputFormat 
inputFormat,

String filePath,

FileProcessingMode watchType,
-   
long interval,
-   
FilePathFilter filter) {
+   
long interval) {
--- End diff --

This is a breaking API change. `StreamExecutionEnvironment` is 
annotated `@Public`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3298) Streaming connector for ActiveMQ

2016-08-02 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404033#comment-15404033
 ] 

Stephan Ewen commented on FLINK-3298:
-

Another option we have been discussing a lot is to have separate projects 
for certain contributions, which makes the entry barrier much lower:
  - A separate {{flink-contrib}} repository for contributions that we are not 
fully committed to
  - A separate {{flink-connectors}} repository where we could collect 
connectors until they have enough backing among the core committers to 
maintain them long term in the core Flink repository

> Streaming connector for ActiveMQ
> 
>
> Key: FLINK-3298
> URL: https://issues.apache.org/jira/browse/FLINK-3298
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Mohit Sethi
>Assignee: Ivan Mushketyk
>Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404021#comment-15404021
 ] 

ASF GitHub Bot commented on FLINK-3929:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2275#discussion_r73160331
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
 ---
@@ -60,18 +64,33 @@
private static Properties standardProps;
private static ForkableFlinkMiniCluster flink;
 
+   @ClassRule
+   public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+   protected static Properties secureProps = new Properties();
+
@BeforeClass
public static void prepare() throws IOException, ClassNotFoundException 
{

LOG.info("-");
LOG.info("Starting KafkaShortRetentionTestBase ");

LOG.info("-");
 
+   Configuration flinkConfig = new Configuration();
+
// dynamically load the implementation for the test
Class clazz = 
Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");
kafkaServer = (KafkaTestEnvironment) 
InstantiationUtil.instantiate(clazz);
 
LOG.info("Starting KafkaTestBase.prepare() for Kafka " + 
kafkaServer.getVersion());
 
+   LOG.info("Runtype: {}", RunTypeHolder.get());
+   if(RunTypeHolder.get().equals(RunTypeHolder.RunType.SECURE)
+   && kafkaServer.isSecureRunSupported()) {
+   SecureTestEnvironment.prepare(tempFolder);
+   
SecureTestEnvironment.getSecurityEnabledFlinkConfiguration(flinkConfig);
--- End diff --

The name of the method doesn't suggest that though.

`SecureTestEnvironment.writeSecurityConfiguration(flinkConfig);` would make 
it more explicit.
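
A tiny illustration of that naming point (both method names here are
hypothetical): a method that mutates its argument should say so in its name.

    import java.util.Properties;

    public class SecurityConfigSketch {

        // Misleading: "get" suggests the method returns something, but it
        // actually writes into the configuration it is handed.
        public static void getSecurityEnabledFlinkConfiguration(Properties config) {
            config.setProperty("security.keytab", "/path/to/krb5.keytab");
        }

        // Clearer: the verb makes the side effect on the argument explicit.
        public static void writeSecurityConfiguration(Properties config) {
            config.setProperty("security.keytab", "/path/to/krb5.keytab");
        }
    }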


> Support for Kerberos Authentication with Keytab Credential
> --
>
> Key: FLINK-3929
> URL: https://issues.apache.org/jira/browse/FLINK-3929
> Project: Flink
>  Issue Type: New Feature
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: kerberos, security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Add support for a keytab credential to be associated with the Flink cluster, 
> to facilitate:
> - Kerberos-authenticated data access for connectors
> - Kerberos-authenticated ZooKeeper access
> Support both the standalone and YARN deployment modes.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential

2016-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404003#comment-15404003
 ] 

ASF GitHub Bot commented on FLINK-3929:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2275#discussion_r73159225
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---
@@ -1016,6 +1016,23 @@
/** The environment variable name which contains the location of the 
lib folder */
public static final String ENV_FLINK_LIB_DIR = "FLINK_LIB_DIR";
 
+   //  Security 
---
+
+   /**
+* The config parameter defining security credentials required
+* for securing Flink cluster.
+*/
+
+   /** Keytab file key name to be used in flink configuration file */
+   public static final String SECURITY_KEYTAB_KEY = "security.keytab";
+
+   /** Kerberos security principal key name to be used in flink 
configuration file */
+   public static final String SECURITY_PRINCIPAL_KEY = 
"security.principal";
+
+   /** Keytab file name populated in YARN container */
+   public static final String KEYTAB_FILE_NAME = "krb5.keytab";
--- End diff --

My point is that this class is declared `@Public` and we will have to stick 
with this config key name. It could also reside in the `flink-yarn` module 
because it doesn't have to be exposed to the user.


> Support for Kerberos Authentication with Keytab Credential
> --
>
> Key: FLINK-3929
> URL: https://issues.apache.org/jira/browse/FLINK-3929
> Project: Flink
>  Issue Type: New Feature
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: kerberos, security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Add support for a keytab credential to be associated with the Flink cluster, 
> to facilitate:
> - Kerberos-authenticated data access for connectors
> - Kerberos-authenticated ZooKeeper access
> Support both the standalone and YARN deployment modes.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

