[jira] [Commented] (FLINK-7669) org.apache.flink.api.common.ExecutionConfig cannot be cast to org.apache.flink.api.common.ExecutionConfig

2017-09-22 Thread Raymond Tay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16177433#comment-16177433
 ] 

Raymond Tay commented on FLINK-7669:


The fat JAR was initially built against 1.4-SNAPSHOT, and when it gave those 
error messages I downloaded 1.3.2 and ran the same fat JAR against 1.3.2, where 
it was alright. I suspect some commit in the last two days caused it.

> org.apache.flink.api.common.ExecutionConfig cannot be cast to 
> org.apache.flink.api.common.ExecutionConfig
> -
>
> Key: FLINK-7669
> URL: https://issues.apache.org/jira/browse/FLINK-7669
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.4.0
> Environment: - OS: macOS Sierra 
> - Oracle JDK 1.8
> - Scala 2.11.11
> - sbt 0.13.16
> - Build from trunk code at commit hash 
> {{42cc3a2a9c41dda7cf338db36b45131db9150674}}
> -- started a local flink node 
>Reporter: Raymond Tay
>
> Latest code pulled from trunk threw errors at runtime when I ran a job 
> against it; but when I ran the JAR against the stable version {{1.3.2}} it 
> was OK. Here is the stacktrace. 
> An exception is being thrown :
> {noformat}
> Cluster configuration: Standalone cluster with JobManager at 
> localhost/127.0.0.1:6123
> Using address localhost:6123 to connect to JobManager.
> JobManager web interface address http://localhost:8081
> Starting execution of program
> Submitting job with JobID: 05dd8e60c6fda3b96fc22ef6cf389a23. Waiting for job 
> completion.
> Connected to JobManager at 
> Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-234825544] with leader 
> session id ----.
> 
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Failed to submit job 05dd8e60c6fda3b96fc22ef6cf389a23 
> (Flink Streaming Job)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:479)
>   at 
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:443)
>   at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1501)
>   at 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:629)
>   at 
> org.example.streams.split.SimpleSplitStreams$.main(04splitstreams.scala:53)
>   at 
> org.example.streams.split.SimpleSplitStreams.main(04splitstreams.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:383)
>   at 
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:840)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:285)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1088)
>   at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1135)
>   at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1132)
>   at 
> org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:44)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
>   at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1132)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to 
> submit job 05dd8e60c6fda3b96fc22ef6cf389a23 (Flink Streaming Job)
>   at 
> 

[jira] [Commented] (FLINK-7632) Better documentation and examples on C* sink usage for Pojo and Tuples data types

2017-09-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16177387#comment-16177387
 ] 

ASF GitHub Bot commented on FLINK-7632:
---

Github user mcfongtw commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r140620313
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -78,76 +96,195 @@ Note that that enabling this feature will have an 
adverse impact on latency.
 
 Note: 
The write-ahead log functionality is currently experimental. In many cases it 
is sufficient to use the connector without enabling it. Please report problems 
to the development mailing list.
 
+### Checkpointing and Fault Tolerance
+With checkpointing enabled, the Cassandra Sink guarantees at-least-once 
delivery of action requests to the C* instance.
 
- Example
+Note: However, the current Cassandra Sink implementation 
does not flush the pending mutations before the checkpoint is triggered. 
Thus, some in-flight mutations might not be replayed when the job recovers.
+
+More details are in the [checkpointing docs]({{ site.baseurl 
}}/dev/stream/state/checkpointing.html) and the [fault tolerance guarantees docs]({{ 
site.baseurl }}/dev/connectors/guarantees.html).
+
+To enable this fault-tolerance guarantee, checkpointing of the topology needs to 
be enabled at the execution environment:
 
 
 
 {% highlight java %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-@Override
-public Cluster buildCluster(Cluster.Builder builder) {
-  return builder.addContactPoint("127.0.0.1").build();
-}
-  })
-  .build();
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
 {% endhighlight %}
 
 
 {% highlight scala %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-override def buildCluster(builder: Cluster.Builder): Cluster = {
-  builder.addContactPoint("127.0.0.1").build()
-}
-  })
-  .build()
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+{% endhighlight %}
+
+
+
+## Examples
+
+The Cassandra sinks currently support both Java Tuple and POJO data types, 
and Flink automatically detects which type of input is used. For general use 
case of those streaming data type, please refer to [Supported Data Types]({{ 
site.baseurl }}/dev/api_concepts.html). We show two implementations based on 
[SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java),
 for Pojo and Java Tuple data types respectively.
+
+In all these examples, we assumed the associated Keyspace `example` and 
Table `wordcount` have been created.
+
+
+
+{% highlight sql %}
+CREATE KEYSPACE IF NOT EXISTS example
+WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 
'1'};
+CREATE TABLE IF NOT EXISTS example.wordcount (
+word text,
+count bigint,
+PRIMARY KEY(word)
+);
+{% endhighlight %}
+
+
+
+### Cassandra Sink Example for Streaming Java Tuple Data Type
+While storing results of the Java Tuple data type to a Cassandra sink, it 
is required to set a CQL upsert statement (via setQuery('stmt')) to persist 
each record back to the database. With the upsert query cached as a 
`PreparedStatement`, the Cassandra connector internally converts each Tuple 
element into a parameter of the statement.
+
+For details about `PreparedStatement` and `BoundStatement`, please visit the 
[DataStax Java Driver 
manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/).
+
+Please note that if the upsert query is not set, an 
`IllegalArgumentException` is thrown with the following error message: 
`Query must not be null or empty.`
+
+
+
+{% highlight java %}
+// get the execution environment
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// get input data by connecting to the socket
+DataStream<String> text = env.socketTextStream(hostname, port, "\n");
+
+// parse the data, group it, window it, and aggregate the counts
+DataStream<Tuple2<String, Long>> result = text
+
+.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
+@Override
+public void flatMap(String 

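For readers skimming this hunk in the mailing list rather than the rendered docs, below is a minimal, self-contained sketch of the Tuple variant under discussion. It is not the exact text of the pull request: the class name, socket host/port, and local contact point are illustrative. It simply combines the word-count pipeline above with the builder pattern (setQuery plus setClusterBuilder) visible in the removed lines, writing Tuple2<String, Long> records into the example.wordcount table.

{code}
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import org.apache.flink.util.Collector;

import com.datastax.driver.core.Cluster;

public class CassandraTupleSinkSketch {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000); // checkpoint every 5000 msecs

        // "localhost"/9999 are placeholders for the socket source used by SocketWindowWordCount
        DataStream<String> text = env.socketTextStream("localhost", 9999, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<Tuple2<String, Long>> result = text
            .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                @Override
                public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
                    for (String word : value.toLowerCase().split("\\W+")) {
                        out.collect(new Tuple2<>(word, 1L));
                    }
                }
            })
            .keyBy(0)
            .timeWindow(Time.seconds(5))
            .sum(1);

        // upsert each Tuple2 into example.wordcount; the query is mandatory for Tuple input
        CassandraSink.addSink(result)
            .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
            .setClusterBuilder(new ClusterBuilder() {
                @Override
                public Cluster buildCluster(Cluster.Builder builder) {
                    return builder.addContactPoint("127.0.0.1").build();
                }
            })
            .build();

        env.execute("Socket Window WordCount to Cassandra (sketch)");
    }
}
{code}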
[jira] [Commented] (FLINK-7632) Better documentation and examples on C* sink usage for Pojo and Tuples data types

2017-09-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16177386#comment-16177386
 ] 

ASF GitHub Bot commented on FLINK-7632:
---

Github user mcfongtw commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r140620296
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -78,76 +96,195 @@ Note that that enabling this feature will have an 
adverse impact on latency.
 
 Note: 
The write-ahead log functionality is currently experimental. In many cases it 
is sufficient to use the connector without enabling it. Please report problems 
to the development mailing list.
 
+### Checkpointing and Fault Tolerance
+With checkpointing enabled, the Cassandra Sink guarantees at-least-once 
delivery of action requests to the C* instance.
 
- Example
+Note: However, the current Cassandra Sink implementation 
does not flush the pending mutations before the checkpoint is triggered. 
Thus, some in-flight mutations might not be replayed when the job recovers.
+
+More details are in the [checkpointing docs]({{ site.baseurl 
}}/dev/stream/state/checkpointing.html) and the [fault tolerance guarantees docs]({{ 
site.baseurl }}/dev/connectors/guarantees.html).
+
+To enable this fault-tolerance guarantee, checkpointing of the topology needs to 
be enabled at the execution environment:
 
 
 
 {% highlight java %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-@Override
-public Cluster buildCluster(Cluster.Builder builder) {
-  return builder.addContactPoint("127.0.0.1").build();
-}
-  })
-  .build();
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
 {% endhighlight %}
 
 
 {% highlight scala %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-override def buildCluster(builder: Cluster.Builder): Cluster = {
-  builder.addContactPoint("127.0.0.1").build()
-}
-  })
-  .build()
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+{% endhighlight %}
+
+
+
+## Examples
+
+The Cassandra sinks currently support both Java Tuple and POJO data types, 
and Flink automatically detects which type of input is used. For general use 
case of those streaming data type, please refer to [Supported Data Types]({{ 
site.baseurl }}/dev/api_concepts.html). We show two implementations based on 
[SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java),
 for Pojo and Java Tuple data types respectively.
+
+In all these examples, we assumed the associated Keyspace `example` and 
Table `wordcount` have been created.
+
+
+
+{% highlight sql %}
+CREATE KEYSPACE IF NOT EXISTS example
+WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 
'1'};
+CREATE TABLE IF NOT EXISTS example.wordcount (
+word text,
+count bigint,
+PRIMARY KEY(word)
+);
+{% endhighlight %}
+
+
+
+### Cassandra Sink Example for Streaming Java Tuple Data Type
+While storing results of the Java Tuple data type to a Cassandra sink, it 
is required to set a CQL upsert statement (via setQuery('stmt')) to persist 
each record back to the database. With the upsert query cached as a 
`PreparedStatement`, the Cassandra connector internally converts each Tuple 
element into a parameter of the statement.
+
+For details about `PreparedStatement` and `BoundStatement`, please visit the 
[DataStax Java Driver 
manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/).
+
+Please note that if the upsert query is not set, an 
`IllegalArgumentException` is thrown with the following error message: 
`Query must not be null or empty.`
+
+
+
+{% highlight java %}
+// get the execution environment
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// get input data by connecting to the socket
+DataStream<String> text = env.socketTextStream(hostname, port, "\n");
+
+// parse the data, group it, window it, and aggregate the counts
+DataStream<Tuple2<String, Long>> result = text
+
+.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
+@Override
+public void flatMap(String 

[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

2017-09-22 Thread mcfongtw
Github user mcfongtw commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r140620313
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -78,76 +96,195 @@ Note that that enabling this feature will have an 
adverse impact on latency.
 
 Note: 
The write-ahead log functionality is currently experimental. In many cases it 
is sufficient to use the connector without enabling it. Please report problems 
to the development mailing list.
 
+### Checkpointing and Fault Tolerance
+With checkpointing enabled, the Cassandra Sink guarantees at-least-once 
delivery of action requests to the C* instance.
 
- Example
+Note: However, the current Cassandra Sink implementation 
does not flush the pending mutations before the checkpoint is triggered. 
Thus, some in-flight mutations might not be replayed when the job recovers.
+
+More details are in the [checkpointing docs]({{ site.baseurl 
}}/dev/stream/state/checkpointing.html) and the [fault tolerance guarantees docs]({{ 
site.baseurl }}/dev/connectors/guarantees.html).
+
+To enable this fault-tolerance guarantee, checkpointing of the topology needs to 
be enabled at the execution environment:
 
 
 
 {% highlight java %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-@Override
-public Cluster buildCluster(Cluster.Builder builder) {
-  return builder.addContactPoint("127.0.0.1").build();
-}
-  })
-  .build();
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
 {% endhighlight %}
 
 
 {% highlight scala %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-override def buildCluster(builder: Cluster.Builder): Cluster = {
-  builder.addContactPoint("127.0.0.1").build()
-}
-  })
-  .build()
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+{% endhighlight %}
+
+
+
+## Examples
+
+The Cassandra sinks currently support both Java Tuple and POJO data types, 
and Flink automatically detects which type of input is used. For general use 
case of those streaming data type, please refer to [Supported Data Types]({{ 
site.baseurl }}/dev/api_concepts.html). We show two implementations based on 
[SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java),
 for Pojo and Java Tuple data types respectively.
+
+In all these examples, we assumed the associated Keyspace `example` and 
Table `wordcount` have been created.
+
+
+
+{% highlight sql %}
+CREATE KEYSPACE IF NOT EXISTS example
+WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 
'1'};
+CREATE TABLE IF NOT EXISTS example.wordcount (
+word text,
+count bigint,
+PRIMARY KEY(word)
+);
+{% endhighlight %}
+
+
+
+### Cassandra Sink Example for Streaming Java Tuple Data Type
+While storing results of the Java Tuple data type to a Cassandra sink, it 
is required to set a CQL upsert statement (via setQuery('stmt')) to persist 
each record back to the database. With the upsert query cached as a 
`PreparedStatement`, the Cassandra connector internally converts each Tuple 
element into a parameter of the statement.
+
+For details about `PreparedStatement` and `BoundStatement`, please visit the 
[DataStax Java Driver 
manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/).
+
+Please note that if the upsert query is not set, an 
`IllegalArgumentException` is thrown with the following error message: 
`Query must not be null or empty.`
+
+
+
+{% highlight java %}
+// get the execution environment
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// get input data by connecting to the socket
+DataStream<String> text = env.socketTextStream(hostname, port, "\n");
+
+// parse the data, group it, window it, and aggregate the counts
+DataStream<Tuple2<String, Long>> result = text
+
+.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
+@Override
+public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
+// normalize and split the line
+String[] words = value.toLowerCase().split("\\W+");
+
+// emit the pairs
+for 

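The Pojo variant mentioned in the hunk ("for Pojo and Java Tuple data types respectively") is not quoted in these review comments. As a rough sketch only, assuming the DataStax object-mapper annotations that the connector's POJO sink relies on and the same example.wordcount table, the entity could look like the class below; a stream of such objects would then be passed to CassandraSink.addSink(...).setClusterBuilder(...).build() without setQuery.

{code}
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.Table;

// Maps to the example.wordcount table created by the CQL shown in the hunk.
@Table(keyspace = "example", name = "wordcount")
public class WordCount {

    @Column(name = "word")
    private String word = "";

    @Column(name = "count")
    private long count = 0;

    // Flink POJO rules: public no-argument constructor plus getters/setters
    public WordCount() {
    }

    public WordCount(String word, long count) {
        this.word = word;
        this.count = count;
    }

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public long getCount() {
        return count;
    }

    public void setCount(long count) {
        this.count = count;
    }
}
{code}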
[jira] [Commented] (FLINK-7632) Better documentation and examples on C* sink usage for Pojo and Tuples data types

2017-09-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16177385#comment-16177385
 ] 

ASF GitHub Bot commented on FLINK-7632:
---

Github user mcfongtw commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r140620261
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -78,76 +96,195 @@ Note that that enabling this feature will have an 
adverse impact on latency.
 
 Note: 
The write-ahead log functionality is currently experimental. In many cases it 
is sufficient to use the connector without enabling it. Please report problems 
to the development mailing list.
 
+### Checkpointing and Fault Tolerance
+With checkpointing enabled, the Cassandra Sink guarantees at-least-once 
delivery of action requests to the C* instance.
 
- Example
+Note: However, the current Cassandra Sink implementation 
does not flush the pending mutations before the checkpoint is triggered. 
Thus, some in-flight mutations might not be replayed when the job recovers.
+
+More details are in the [checkpointing docs]({{ site.baseurl 
}}/dev/stream/state/checkpointing.html) and the [fault tolerance guarantees docs]({{ 
site.baseurl }}/dev/connectors/guarantees.html).
+
+To enable this fault-tolerance guarantee, checkpointing of the topology needs to 
be enabled at the execution environment:
--- End diff --

Very true. Thanks.


> Better documentation and examples on C* sink usage for Pojo and Tuples data 
> types
> -
>
> Key: FLINK-7632
> URL: https://issues.apache.org/jira/browse/FLINK-7632
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cassandra Connector, Documentation
>Reporter: Michael Fong
>Assignee: Michael Fong
>
> Cassandra sink supports Pojo and Java Tuple data types. We should improve 
> documentation on its usage as well as some concrete / meaningful examples for 
> both cases. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

2017-09-22 Thread mcfongtw
Github user mcfongtw commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r140620261
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -78,76 +96,195 @@ Note that that enabling this feature will have an 
adverse impact on latency.
 
 Note: 
The write-ahead log functionality is currently experimental. In many cases it 
is sufficient to use the connector without enabling it. Please report problems 
to the development mailing list.
 
+### Checkpointing and Fault Tolerance
+With checkpointing enabled, the Cassandra Sink guarantees at-least-once 
delivery of action requests to the C* instance.
 
- Example
+Note: However, the current Cassandra Sink implementation 
does not flush the pending mutations before the checkpoint is triggered. 
Thus, some in-flight mutations might not be replayed when the job recovers.
+
+More details are in the [checkpointing docs]({{ site.baseurl 
}}/dev/stream/state/checkpointing.html) and the [fault tolerance guarantees docs]({{ 
site.baseurl }}/dev/connectors/guarantees.html).
+
+To enable this fault-tolerance guarantee, checkpointing of the topology needs to 
be enabled at the execution environment:
--- End diff --

Very true. Thanks.


---


[jira] [Commented] (FLINK-7632) Better documentation and examples on C* sink usage for Pojo and Tuples data types

2017-09-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16177384#comment-16177384
 ] 

ASF GitHub Bot commented on FLINK-7632:
---

Github user mcfongtw commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r140620229
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -78,76 +96,195 @@ Note that that enabling this feature will have an 
adverse impact on latency.
 
 Note: 
The write-ahead log functionality is currently experimental. In many cases it 
is sufficient to use the connector without enabling it. Please report problems 
to the development mailing list.
 
+### Checkpointing and Fault Tolerance
+With checkpointing enabled, the Cassandra Sink guarantees at-least-once 
delivery of action requests to the C* instance.
 
- Example
+Note: However, the current Cassandra Sink implementation 
does not flush the pending mutations before the checkpoint is triggered. 
Thus, some in-flight mutations might not be replayed when the job recovers.

--- End diff --

Good eye. Thanks


> Better documentation and examples on C* sink usage for Pojo and Tuples data 
> types
> -
>
> Key: FLINK-7632
> URL: https://issues.apache.org/jira/browse/FLINK-7632
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cassandra Connector, Documentation
>Reporter: Michael Fong
>Assignee: Michael Fong
>
> Cassandra sink supports Pojo and Java Tuple data types. We should improve 
> documentation on its usage as well as some concrete / meaningful examples for 
> both cases. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

2017-09-22 Thread mcfongtw
Github user mcfongtw commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r140620296
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -78,76 +96,195 @@ Note that that enabling this feature will have an 
adverse impact on latency.
 
 Note: 
The write-ahead log functionality is currently experimental. In many cases it 
is sufficient to use the connector without enabling it. Please report problems 
to the development mailing list.
 
+### Checkpointing and Fault Tolerance
+With checkpointing enabled, the Cassandra Sink guarantees at-least-once 
delivery of action requests to the C* instance.
 
- Example
+Note: However, the current Cassandra Sink implementation 
does not flush the pending mutations before the checkpoint is triggered. 
Thus, some in-flight mutations might not be replayed when the job recovers.
+
+More details are in the [checkpointing docs]({{ site.baseurl 
}}/dev/stream/state/checkpointing.html) and the [fault tolerance guarantees docs]({{ 
site.baseurl }}/dev/connectors/guarantees.html).
+
+To enable this fault-tolerance guarantee, checkpointing of the topology needs to 
be enabled at the execution environment:
 
 
 
 {% highlight java %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-@Override
-public Cluster buildCluster(Cluster.Builder builder) {
-  return builder.addContactPoint("127.0.0.1").build();
-}
-  })
-  .build();
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
 {% endhighlight %}
 
 
 {% highlight scala %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-override def buildCluster(builder: Cluster.Builder): Cluster = {
-  builder.addContactPoint("127.0.0.1").build()
-}
-  })
-  .build()
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+{% endhighlight %}
+
+
+
+## Examples
+
+The Cassandra sinks currently support both Java Tuple and POJO data types, 
and Flink automatically detects which type of input is used. For general use 
case of those streaming data type, please refer to [Supported Data Types]({{ 
site.baseurl }}/dev/api_concepts.html). We show two implementations based on 
[SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java),
 for Pojo and Java Tuple data types respectively.
+
+In all these examples, we assumed the associated Keyspace `example` and 
Table `wordcount` have been created.
+
+
+
+{% highlight sql %}
+CREATE KEYSPACE IF NOT EXISTS example
+WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 
'1'};
+CREATE TABLE IF NOT EXISTS example.wordcount (
+word text,
+count bigint,
+PRIMARY KEY(word)
+);
+{% endhighlight %}
+
+
+
+### Cassandra Sink Example for Streaming Java Tuple Data Type
+While storing results of the Java Tuple data type to a Cassandra sink, it 
is required to set a CQL upsert statement (via setQuery('stmt')) to persist 
each record back to the database. With the upsert query cached as a 
`PreparedStatement`, the Cassandra connector internally converts each Tuple 
element into a parameter of the statement.
+
+For details about `PreparedStatement` and `BoundStatement`, please visit the 
[DataStax Java Driver 
manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/).
+
+Please note that if the upsert query is not set, an 
`IllegalArgumentException` is thrown with the following error message: 
`Query must not be null or empty.`
+
+
+
+{% highlight java %}
+// get the execution environment
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// get input data by connecting to the socket
+DataStream<String> text = env.socketTextStream(hostname, port, "\n");
+
+// parse the data, group it, window it, and aggregate the counts
+DataStream<Tuple2<String, Long>> result = text
+
+.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
+@Override
+public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
+// normalize and split the line
+String[] words = value.toLowerCase().split("\\W+");
+
+// emit the pairs
+for 

[jira] [Commented] (FLINK-7632) Better documentation and examples on C* sink usage for Pojo and Tuples data types

2017-09-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16177383#comment-16177383
 ] 

ASF GitHub Bot commented on FLINK-7632:
---

Github user mcfongtw commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r140620211
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -78,76 +96,195 @@ Note that that enabling this feature will have an 
adverse impact on latency.
 
 Note: 
The write-ahead log functionality is currently experimental. In many cases it 
is sufficient to use the connector without enabling it. Please report problems 
to the development mailing list.
 
+### Checkpointing and Fault Tolerance
+With checkpointing enabled, the Cassandra Sink guarantees at-least-once 
delivery of action requests to the C* instance.
 
- Example
+Note: However, the current Cassandra Sink implementation 
does not flush the pending mutations before the checkpoint is triggered. 
Thus, some in-flight mutations might not be replayed when the job recovers.
+
+More details are in the [checkpointing docs]({{ site.baseurl 
}}/dev/stream/state/checkpointing.html) and the [fault tolerance guarantees docs]({{ 
site.baseurl }}/dev/connectors/guarantees.html).
+
+To enable this fault-tolerance guarantee, checkpointing of the topology needs to 
be enabled at the execution environment:
 
 
 
 {% highlight java %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-@Override
-public Cluster buildCluster(Cluster.Builder builder) {
-  return builder.addContactPoint("127.0.0.1").build();
-}
-  })
-  .build();
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
 {% endhighlight %}
 
 
 {% highlight scala %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-override def buildCluster(builder: Cluster.Builder): Cluster = {
-  builder.addContactPoint("127.0.0.1").build()
-}
-  })
-  .build()
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+{% endhighlight %}
+
+
+
+## Examples
+
+The Cassandra sinks currently support both Java Tuple and POJO data types, 
and Flink automatically detects which type of input is used. For general use 
case of those streaming data type, please refer to [Supported Data Types]({{ 
site.baseurl }}/dev/api_concepts.html). We show two implementations based on 
[SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java),
 for Pojo and Java Tuple data types respectively.
+
+In all these examples, we assumed the associated Keyspace `example` and 
Table `wordcount` have been created.
+
+
+
+{% highlight sql %}
+CREATE KEYSPACE IF NOT EXISTS example
+WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 
'1'};
+CREATE TABLE IF NOT EXISTS example.wordcount (
+word text,
+count bigint,
+PRIMARY KEY(word)
+);
+{% endhighlight %}
+
+
+
+### Cassandra Sink Example for Streaming Java Tuple Data Type
+While storing results of the Java Tuple data type to a Cassandra sink, it 
is required to set a CQL upsert statement (via setQuery('stmt')) to persist 
each record back to the database. With the upsert query cached as a 
`PreparedStatement`, the Cassandra connector internally converts each Tuple 
element into a parameter of the statement.
+
+For details about `PreparedStatement` and `BoundStatement`, please visit the 
[DataStax Java Driver 
manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/).
+
+Please note that if the upsert query is not set, an 
`IllegalArgumentException` is thrown with the following error message: 
`Query must not be null or empty.`
+
+
+
+{% highlight java %}
+// get the execution environment
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// get input data by connecting to the socket
+DataStream<String> text = env.socketTextStream(hostname, port, "\n");
+
+// parse the data, group it, window it, and aggregate the counts
+DataStream<Tuple2<String, Long>> result = text
+
+.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
+@Override
+public void flatMap(String 

[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

2017-09-22 Thread mcfongtw
Github user mcfongtw commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r140620211
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -78,76 +96,195 @@ Note that that enabling this feature will have an 
adverse impact on latency.
 
 Note: 
The write-ahead log functionality is currently experimental. In many cases it 
is sufficient to use the connector without enabling it. Please report problems 
to the development mailing list.
 
+### Checkpointing and Fault Tolerance
+With checkpointing enabled, the Cassandra Sink guarantees at-least-once 
delivery of action requests to the C* instance.
 
- Example
+Note: However, the current Cassandra Sink implementation 
does not flush the pending mutations before the checkpoint is triggered. 
Thus, some in-flight mutations might not be replayed when the job recovers.
+
+More details are in the [checkpointing docs]({{ site.baseurl 
}}/dev/stream/state/checkpointing.html) and the [fault tolerance guarantees docs]({{ 
site.baseurl }}/dev/connectors/guarantees.html).
+
+To enable this fault-tolerance guarantee, checkpointing of the topology needs to 
be enabled at the execution environment:
 
 
 
 {% highlight java %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-@Override
-public Cluster buildCluster(Cluster.Builder builder) {
-  return builder.addContactPoint("127.0.0.1").build();
-}
-  })
-  .build();
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
 {% endhighlight %}
 
 
 {% highlight scala %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-override def buildCluster(builder: Cluster.Builder): Cluster = {
-  builder.addContactPoint("127.0.0.1").build()
-}
-  })
-  .build()
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+{% endhighlight %}
+
+
+
+## Examples
+
+The Cassandra sinks currently support both Java Tuple and POJO data types, 
and Flink automatically detects which type of input is used. For general use 
case of those streaming data type, please refer to [Supported Data Types]({{ 
site.baseurl }}/dev/api_concepts.html). We show two implementations based on 
[SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java),
 for Pojo and Java Tuple data types respectively.
+
+In all these examples, we assumed the associated Keyspace `example` and 
Table `wordcount` have been created.
+
+
+
+{% highlight sql %}
+CREATE KEYSPACE IF NOT EXISTS example
+WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 
'1'};
+CREATE TABLE IF NOT EXISTS example.wordcount (
+word text,
+count bigint,
+PRIMARY KEY(word)
+);
+{% endhighlight %}
+
+
+
+### Cassandra Sink Example for Streaming Java Tuple Data Type
+While storing results of the Java Tuple data type to a Cassandra sink, it 
is required to set a CQL upsert statement (via setQuery('stmt')) to persist 
each record back to the database. With the upsert query cached as a 
`PreparedStatement`, the Cassandra connector internally converts each Tuple 
element into a parameter of the statement.
+
+For details about `PreparedStatement` and `BoundStatement`, please visit the 
[DataStax Java Driver 
manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/).
+
+Please note that if the upsert query is not set, an 
`IllegalArgumentException` is thrown with the following error message: 
`Query must not be null or empty.`
+
+
+
+{% highlight java %}
+// get the execution environment
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// get input data by connecting to the socket
+DataStream<String> text = env.socketTextStream(hostname, port, "\n");
+
+// parse the data, group it, window it, and aggregate the counts
+DataStream<Tuple2<String, Long>> result = text
+
+.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
+@Override
+public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
+// normalize and split the line
+String[] words = value.toLowerCase().split("\\W+");
+
+// emit the pairs
+for 

[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

2017-09-22 Thread mcfongtw
Github user mcfongtw commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r140620229
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -78,76 +96,195 @@ Note that that enabling this feature will have an 
adverse impact on latency.
 
 Note: 
The write-ahead log functionality is currently experimental. In many cases it 
is sufficient to use the connector without enabling it. Please report problems 
to the development mailing list.
 
+### Checkpointing and Fault Tolerance
+With checkpointing enabled, the Cassandra Sink guarantees at-least-once 
delivery of action requests to the C* instance.
 
- Example
+Note: However, the current Cassandra Sink implementation 
does not flush the pending mutations before the checkpoint is triggered. 
Thus, some in-flight mutations might not be replayed when the job recovers.

--- End diff --

Good eye. Thanks


---


[jira] [Commented] (FLINK-7358) Add implicitly converts support for User-defined function

2017-09-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16177372#comment-16177372
 ] 

ASF GitHub Bot commented on FLINK-7358:
---

Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/4534
  
@fhueske Because of the 11.11, I did not update this PR in time; sorry about 
that. Now I have updated the PR and the PR description. Please look at the PR. :)

Thanks, jincheng


> Add  implicitly converts support for User-defined function
> --
>
> Key: FLINK-7358
> URL: https://issues.apache.org/jira/browse/FLINK-7358
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Currently, if a user defines a UDF as follows:
> {code}
> object Func extends ScalarFunction {
>   def eval(a: Int, b: Long): String = {
> ...
>   }
> }
> {code}
> And if the table schema is (a: Int, b: Int, c: String), then we cannot call 
> the UDF `Func('a, 'b)`. So
> I want to add implicit conversion when we call a UDF. The implicit conversion 
> rule is:
> BYTE_TYPE_INFO -> SHORT_TYPE_INFO -> INT_TYPE_INFO -> LONG_TYPE_INFO -> 
> FLOAT_TYPE_INFO -> DOUBLE_TYPE_INFO
> *Note:
> This JIRA covers only the Table API; SQL will be fixed in 
> https://issues.apache.org/jira/browse/CALCITE-1908.*
> What do you think? [~fhueske]
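A self-contained Java sketch of the gap described above (class and table names are illustrative, not from the issue): the UDF declares eval(int, long), so calling it with two INT columns does not resolve today, while the proposed widening rule BYTE_TYPE_INFO -> SHORT_TYPE_INFO -> INT_TYPE_INFO -> LONG_TYPE_INFO -> FLOAT_TYPE_INFO -> DOUBLE_TYPE_INFO would convert the second argument automatically.

{code}
import org.apache.flink.table.functions.ScalarFunction;

// Java equivalent of the Scala UDF from the issue description.
public class Func extends ScalarFunction {
    public String eval(int a, long b) {
        return a + "-" + b;
    }
}

// With a registered table t of schema (a: Int, b: Int, c: String):
//   tableEnv.registerFunction("Func", new Func());
//   Table result = t.select("Func(a, b)");
// Today the call cannot resolve eval(int, long) because column b is an INT;
// with the proposed implicit conversion, the INT argument would be widened to LONG.
{code}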



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4534: [FLINK-7358][table]Add implicitly converts support for Us...

2017-09-22 Thread sunjincheng121
Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/4534
  
@fhueske Because of the 11.11, I did not update this PR in time; sorry about 
that. Now I have updated the PR and the PR description. Please look at the PR. :)

Thanks, jincheng


---


[GitHub] flink issue #4701: [FLINK-7664] Replace FlinkFutureException by java.util.co...

2017-09-22 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4701
  
LGTM


---


[jira] [Commented] (FLINK-7664) Replace FlinkFutureException by CompletionException

2017-09-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16177319#comment-16177319
 ] 

ASF GitHub Bot commented on FLINK-7664:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4701
  
LGTM


> Replace FlinkFutureException by CompletionException
> ---
>
> Key: FLINK-7664
> URL: https://issues.apache.org/jira/browse/FLINK-7664
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{FlinkFutureException}} was introduced to fail in a 
> {{CompletableFuture}} callback. This can, however, better be done via the 
> {{CompletionException}}. Therefore, we should remove the 
> {{FlinkFutureException}} and replace it with the {{CompletionException}} 
> instead.
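As a small illustration of the pattern described above (not the actual Flink handler code; the class and values are hypothetical), a callback can fail its future by wrapping the cause in the JDK's {{CompletionException}}:

{code}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class CompletionExceptionSketch {

    public static void main(String[] args) {
        CompletableFuture<Integer> future = CompletableFuture
            .completedFuture("42x")
            .thenApply(value -> {
                try {
                    return Integer.parseInt(value); // throws NumberFormatException
                } catch (Exception e) {
                    // fail the future with the JDK exception instead of a FlinkFutureException
                    throw new CompletionException(e);
                }
            });

        // prints: result = null, error = java.util.concurrent.CompletionException: ...
        future.whenComplete((result, error) ->
            System.out.println("result = " + result + ", error = " + error));
    }
}
{code}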



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7673) Flink Kinesis connector - remove dependency on the ASL code

2017-09-22 Thread Radoslaw Gruchalski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Radoslaw Gruchalski updated FLINK-7673:
---
Description: 
Currently the Flink Kinesis connector depends on two artifacts which are 
available under Amazon Software License code. The artifacts are:

{noformat}
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-producer</artifactId>
    <version>${aws.kinesis-kpl.version}</version>
</dependency>
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>${aws.kinesis-kcl.version}</version>
    <exclusions>
        <exclusion>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-dynamodb</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-cloudwatch</artifactId>
        </exclusion>
    </exclusions>
</dependency>
{noformat}

This prevents the connector being published to Maven Central. I would like to 
contribute to Flink Kinesis connector by replacing the ASL licensed code with 
an implementation using AWS SDK for Java. For that, I'd like to understand two 
things:

- what's the correct process for making the contribution (contributor 
agreement, etc.)
- what's the minimum functionality the alternative implementation would have to 
provide in order to get accepted

There are three classes in the connector's source requiring modification:

- org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer
- org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil
- org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer

Thank you.

  was:
Currently the Flink Kinesis connector depends on two artifacts which are 
available under Amazon Software License code. The artifacts are:

{noformat}
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-producer</artifactId>
    <version>${aws.kinesis-kpl.version}</version>
</dependency>
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>${aws.kinesis-kcl.version}</version>
    <exclusions>
        <exclusion>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-dynamodb</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-cloudwatch</artifactId>
        </exclusion>
    </exclusions>
</dependency>
{noformat}

This prevents the connector being published to Maven Central. I would like to 
contribute to Flink Kinesis connector by replacing the ASL licensed code with 
an implementation using AWS SDK for Java. For that, I'd like to understand two 
things:

- what's the correct process for making the contribution (contributor 
agreement, etc.)
- what's the minimum functionality the alternative implementation would have to 
provide in order to get accepted

Thank you.


> Flink Kinesis connector - remove dependency on the ASL code
> ---
>
> Key: FLINK-7673
> URL: https://issues.apache.org/jira/browse/FLINK-7673
> Project: Flink
>  Issue Type: Improvement
>Reporter: Radoslaw Gruchalski
>
> Currently the Flink Kinesis connector depends on two artifacts which are 
> available under Amazon Software License code. The artifacts are:
> {noformat}
> <dependency>
>     <groupId>com.amazonaws</groupId>
>     <artifactId>amazon-kinesis-producer</artifactId>
>     <version>${aws.kinesis-kpl.version}</version>
> </dependency>
> <dependency>
>     <groupId>com.amazonaws</groupId>
>     <artifactId>amazon-kinesis-client</artifactId>
>     <version>${aws.kinesis-kcl.version}</version>
>     <exclusions>
>         <exclusion>
>             <groupId>com.amazonaws</groupId>
>             <artifactId>aws-java-sdk-dynamodb</artifactId>
>         </exclusion>
>         <exclusion>
>             <groupId>com.amazonaws</groupId>
>             <artifactId>aws-java-sdk-cloudwatch</artifactId>
>         </exclusion>
>     </exclusions>
> </dependency>
> {noformat}
> This prevents the connector being published to Maven Central. I would like to 
> contribute to Flink Kinesis connector by replacing the ASL licensed code with 
> an implementation using AWS SDK for Java. For that, I'd like to understand 
> two things:
> - what's the correct process for making the contribution (contributor 
> agreement, etc.)
> - what's the minimum functionality 

[jira] [Commented] (FLINK-7664) Replace FlinkFutureException by CompletionException

2017-09-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16177318#comment-16177318
 ] 

ASF GitHub Bot commented on FLINK-7664:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4701#discussion_r140614369
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
 ---
@@ -65,7 +65,7 @@ public JarPlanHandler(Executor executor, File 
jarDirectory) {
return writer.toString();
}
catch (Exception e) {
-   throw new FlinkFutureException(e);
+   throw new CompletionException(e);
--- End diff --

Why it's not wrapping a `FlinkException` here?


> Replace FlinkFutureException by CompletionException
> ---
>
> Key: FLINK-7664
> URL: https://issues.apache.org/jira/browse/FLINK-7664
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{FlinkFutureException}} was introduced to fail in a 
> {{CompletableFuture}} callback. This can, however, better be done via the 
> {{CompletionException}}. Therefore, we should remove the 
> {{FlinkFutureException}} and replace it with the {{CompletionException}} 
> instead.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4701: [FLINK-7664] Replace FlinkFutureException by java....

2017-09-22 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4701#discussion_r140614369
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
 ---
@@ -65,7 +65,7 @@ public JarPlanHandler(Executor executor, File 
jarDirectory) {
return writer.toString();
}
catch (Exception e) {
-   throw new FlinkFutureException(e);
+   throw new CompletionException(e);
--- End diff --

Why it's not wrapping a `FlinkException` here?


---


[jira] [Commented] (FLINK-7672) Publish Kinesis connector to Maven Central?

2017-09-22 Thread Radoslaw Gruchalski (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16177315#comment-16177315
 ] 

Radoslaw Gruchalski commented on FLINK-7672:


https://issues.apache.org/jira/browse/FLINK-7673

> Publish Kinesis connector to Maven Central?
> ---
>
> Key: FLINK-7672
> URL: https://issues.apache.org/jira/browse/FLINK-7672
> Project: Flink
>  Issue Type: Improvement
>Reporter: Radoslaw Gruchalski
>
> Hi there, just started working with Flink, second time in the last couple of 
> years. Second time I hit a major roadblock - the connector I try to work with 
> is not available off the shelf. I wonder, what's the reason for this 
> connector: 
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/pom.xml
>  not being on Maven Central? It's the 21st century, why should I install the jar 
> manually on every machine where I need to work with the code? Off the top of 
> my head: my local box, Jenkins, the customer's deployment. Really?
> I do understand this is coming across rather rude, but come on. There's an 
> awesome org.apache.flink organization on Central, so why not every connector 
> in there?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7673) Flink Kinesis connector - remove dependency on the ASL code

2017-09-22 Thread Radoslaw Gruchalski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Radoslaw Gruchalski updated FLINK-7673:
---
Description: 
Currently the Flink Kinesis connector depends on two artifacts which are 
available under Amazon Software License code. The artifacts are:

{noformat}
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-producer</artifactId>
    <version>${aws.kinesis-kpl.version}</version>
</dependency>
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>${aws.kinesis-kcl.version}</version>
    <exclusions>
        <exclusion>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-dynamodb</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-cloudwatch</artifactId>
        </exclusion>
    </exclusions>
</dependency>
{noformat}

This prevents the connector being published to Maven Central. I would like to 
contribute to Flink Kinesis connector by replacing the ASL licensed code with 
an implementation using AWS SDK for Java. For that, I'd like to understand two 
things:

- what's the correct process for making the contribution (contributor 
agreement, etc.)
- what's the minimum functionality the alternative implementation would have to 
provide in order to get accepted

Thank you.

  was:
Currently the Flink Kinesis connector depends on two artifacts which are 
available under Amazon Software License code. The artifacts are:

{noformat}
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-producer</artifactId>
    <version>${aws.kinesis-kpl.version}</version>
</dependency>
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>${aws.kinesis-kcl.version}</version>
    <exclusions>
        <exclusion>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-dynamodb</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-cloudwatch</artifactId>
        </exclusion>
    </exclusions>
</dependency>
{/noformat}

This prevents the connector being published to Maven Central. I would like to 
contribute to Flink Kinesis connector by replacing the ASL licensed code with 
an implementation using AWS SDK for Java. For that, I'd like to understand two 
things:

- what's the correct process for making the contribution (contributor 
agreement, etc.)
- what's the minimum functionality the alternative implementation would have to 
provide in order to get accepted

Thank you.


> Flink Kinesis connector - remove dependency on the ASL code
> ---
>
> Key: FLINK-7673
> URL: https://issues.apache.org/jira/browse/FLINK-7673
> Project: Flink
>  Issue Type: Improvement
>Reporter: Radoslaw Gruchalski
>
> Currently the Flink Kinesis connector depends on two artifacts which are 
> available under Amazon Software License code. The artifacts are:
> {noformat}
> <dependency>
>     <groupId>com.amazonaws</groupId>
>     <artifactId>amazon-kinesis-producer</artifactId>
>     <version>${aws.kinesis-kpl.version}</version>
> </dependency>
> <dependency>
>     <groupId>com.amazonaws</groupId>
>     <artifactId>amazon-kinesis-client</artifactId>
>     <version>${aws.kinesis-kcl.version}</version>
>     <exclusions>
>         <exclusion>
>             <groupId>com.amazonaws</groupId>
>             <artifactId>aws-java-sdk-dynamodb</artifactId>
>         </exclusion>
>         <exclusion>
>             <groupId>com.amazonaws</groupId>
>             <artifactId>aws-java-sdk-cloudwatch</artifactId>
>         </exclusion>
>     </exclusions>
> </dependency>
> {noformat}
> This prevents the connector being published to Maven Central. I would like to 
> contribute to Flink Kinesis connector by replacing the ASL licensed code with 
> an implementation using AWS SDK for Java. For that, I'd like to understand 
> two things:
> - what's the correct process for making the contribution (contributor 
> agreement, etc.)
> - what's the minimum functionality the alternative implementation would have 
> to provide in order to get accepted
> Thank you.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7673) Flink Kinesis connector - remove dependency on the ASL code

2017-09-22 Thread Radoslaw Gruchalski (JIRA)
Radoslaw Gruchalski created FLINK-7673:
--

 Summary: Flink Kinesis connector - remove dependency on the ASL 
code
 Key: FLINK-7673
 URL: https://issues.apache.org/jira/browse/FLINK-7673
 Project: Flink
  Issue Type: Improvement
Reporter: Radoslaw Gruchalski


Currently the Flink Kinesis connector depends on two artifacts which are 
available under Amazon Software License code. The artifacts are:

{noformat}
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-producer</artifactId>
    <version>${aws.kinesis-kpl.version}</version>
</dependency>
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>${aws.kinesis-kcl.version}</version>
    <exclusions>
        <exclusion>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-dynamodb</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-cloudwatch</artifactId>
        </exclusion>
    </exclusions>
</dependency>
{/noformat}

This prevents the connector being published to Maven Central. I would like to 
contribute to Flink Kinesis connector by replacing the ASL licensed code with 
an implementation using AWS SDK for Java. For that, I'd like to understand two 
things:

- what's the correct process for making the contribution (contributor 
agreement, etc.)
- what's the minimum functionality the alternative implementation would have to 
provide in order to get accepted

Thank you.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7393) Move unit tests of KinesisConfigUtil from FlinkKinesisConsumerTest to KinesisConfigUtilTest

2017-09-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16177311#comment-16177311
 ] 

ASF GitHub Bot commented on FLINK-7393:
---

GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/4708

[FLINK-7393][kinesis connector] Move unit tests that should belong to 
KinesisConfigUtil from FlinkKinesisConsumerTest to KinesisConfigUtilTest

## What is the purpose of the change

Right now, `FlinkKinesisConsumerTest` has lots of unit tests that actually 
should belong to `KinesisConfigUtil`, e.g. all the `validateXxxConfiguration()`

We need to move those tests out to a new file `KinesisConfigUtilTest`

## Brief change log

- Move unit tests that should belong to KinesisConfigUtil from 
FlinkKinesisConsumerTest to KinesisConfigUtilTest


## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

This change is already covered by existing tests, such as *(please describe 
tests)*.

## Does this pull request potentially affect one of the following parts:

none

## Documentation

none


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-7223

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4708.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4708


commit 8ba44053ea96220fd74a357ff612bebf65553cb7
Author: Bowen Li 
Date:   2017-09-22T23:10:56Z

FLINK-7393 Move unit tests of KinesisConfigUtil from 
FlinkKinesisConsumerTest to KinesisConfigUtilTest




> Move unit tests of KinesisConfigUtil from FlinkKinesisConsumerTest to 
> KinesisConfigUtilTest
> ---
>
> Key: FLINK-7393
> URL: https://issues.apache.org/jira/browse/FLINK-7393
> Project: Flink
>  Issue Type: Test
>  Components: Kinesis Connector
>Affects Versions: 1.3.2
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Minor
> Fix For: 1.4.0
>
>
> Right now, 
> [{{FlinkKinesisConsumerTest}}|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java]
>  has lots of tests that actually should belong to {{KinesisConfigUtil}}, e.g. 
> all the {{validateXxxConfiguration()}}
> We need to move those tests out to a new file {{KinesisConfigUtilTest}}
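As a rough sketch of what one relocated test could look like (the test name and property values are illustrative; it assumes the existing {{KinesisConfigUtil.validateConsumerConfiguration(Properties)}} utility and its AWS region validation):

{code}
import java.util.Properties;

import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;

import org.junit.Test;

public class KinesisConfigUtilTest {

    // Example of a validation test that targets KinesisConfigUtil directly
    // instead of living in FlinkKinesisConsumerTest.
    @Test(expected = IllegalArgumentException.class)
    public void testUnparsableRegionShouldFailValidation() {
        Properties config = new Properties();
        config.setProperty(AWSConfigConstants.AWS_REGION, "not-a-valid-region");
        config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKey");
        config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");

        KinesisConfigUtil.validateConsumerConfiguration(config);
    }
}
{code}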



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4708: [FLINK-7393][kinesis connector] Move unit tests th...

2017-09-22 Thread bowenli86
GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/4708

[FLINK-7393][kinesis connector] Move unit tests that should belong to 
KinesisConfigUtil from FlinkKinesisConsumerTest to KinesisConfigUtilTest

## What is the purpose of the change

Right now, `FlinkKinesisConsumerTest` has lots of unit tests that actually 
should belong to `KinesisConfigUtil`, e.g. all the `validateXxxConfiguration()`

We need to move those tests out to a new file `KinesisConfigUtilTest`

## Brief change log

- Move unit tests that should belong to KinesisConfigUtil from 
FlinkKinesisConsumerTest to KinesisConfigUtilTest


## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

This change is already covered by existing tests, such as *(please describe 
tests)*.

## Does this pull request potentially affect one of the following parts:

none

## Documentation

none


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-7223

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4708.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4708


commit 8ba44053ea96220fd74a357ff612bebf65553cb7
Author: Bowen Li 
Date:   2017-09-22T23:10:56Z

FLINK-7393 Move unit tests of KinesisConfigUtil from 
FlinkKinesisConsumerTest to KinesisConfigUtilTest




---


[jira] [Updated] (FLINK-7393) Move unit tests of KinesisConfigUtil from FlinkKinesisConsumerTest to KinesisConfigUtilTest

2017-09-22 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-7393:

Summary: Move unit tests of KinesisConfigUtil from FlinkKinesisConsumerTest 
to KinesisConfigUtilTest  (was: Move  FlinkKinesisConsumerTest)

> Move unit tests of KinesisConfigUtil from FlinkKinesisConsumerTest to 
> KinesisConfigUtilTest
> ---
>
> Key: FLINK-7393
> URL: https://issues.apache.org/jira/browse/FLINK-7393
> Project: Flink
>  Issue Type: Test
>  Components: Kinesis Connector
>Affects Versions: 1.3.2
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Minor
> Fix For: 1.4.0
>
>
> Right now, 
> [{{FlinkKinesisConsumerTest}}|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java]
>  has lots of tests that actually should belong to {{KinesisConfigUtil}}, e.g. 
> all the {{validateXxxConfiguration()}}
> We need to move those tests out to a new file {{KinesisConfigUtilTest}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7393) Move FlinkKinesisConsumerTest

2017-09-22 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-7393:

Summary: Move  FlinkKinesisConsumerTest  (was: Refactor 
FlinkKinesisConsumerTest)

> Move  FlinkKinesisConsumerTest
> --
>
> Key: FLINK-7393
> URL: https://issues.apache.org/jira/browse/FLINK-7393
> Project: Flink
>  Issue Type: Test
>  Components: Kinesis Connector
>Affects Versions: 1.3.2
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Minor
> Fix For: 1.4.0
>
>
> Right now, 
> [{{FlinkKinesisConsumerTest}}|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java]
>  has lots of tests that actually should belong to {{KinesisConfigUtil}}, e.g. 
> all the {{validateXxxConfiguration()}}
> We need to move those tests out to a new file {{KinesisConfigUtilTest}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7672) Publish Kinesis connector to Maven Central?

2017-09-22 Thread Radoslaw Gruchalski (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16177247#comment-16177247
 ] 

Radoslaw Gruchalski commented on FLINK-7672:


I see, the problem is here: https://github.com/awslabs/amazon-kinesis-client 
and here: https://github.com/awslabs/amazon-kinesis-producer. I guess the 
correct approach would be to not use these two artifacts in the Kinesis 
connector.

> Publish Kinesis connector to Maven Central?
> ---
>
> Key: FLINK-7672
> URL: https://issues.apache.org/jira/browse/FLINK-7672
> Project: Flink
>  Issue Type: Improvement
>Reporter: Radoslaw Gruchalski
>
> Hi there, just started working with Flink, second time in the last couple of 
> years. Second time I hit a major roadblock - the connector I try to work with 
> is not available off the shelf. I wonder, what's the reason for this 
> connector: 
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/pom.xml
>  not being Maven Central? It's 21st century, why should I install the jar 
> manually on every machine where I need to work with the code? Off the top of 
> my head, my local box, jenkins, customer's deployment. Really?
> I do understand this is coming across rather rude, but come on. There's 
> awesome org.apache.flink organization on Central, so why not every connector 
> in there?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7672) Publish Kinesis connector to Maven Central?

2017-09-22 Thread Radoslaw Gruchalski (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16177231#comment-16177231
 ] 

Radoslaw Gruchalski commented on FLINK-7672:


But the AWS SDK for Java is licensed under the Apache 2.0 license:
- https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk
- https://github.com/aws/aws-sdk-java/blob/master/LICENSE.txt

> Publish Kinesis connector to Maven Central?
> ---
>
> Key: FLINK-7672
> URL: https://issues.apache.org/jira/browse/FLINK-7672
> Project: Flink
>  Issue Type: Improvement
>Reporter: Radoslaw Gruchalski
>
> Hi there, just started working with Flink, second time in the last couple of 
> years. Second time I hit a major roadblock - the connector I try to work with 
> is not available off the shelf. I wonder, what's the reason for this 
> connector: 
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/pom.xml
>  not being Maven Central? It's 21st century, why should I install the jar 
> manually on every machine where I need to work with the code? Off the top of 
> my head, my local box, jenkins, customer's deployment. Really?
> I do understand this is coming across rather rude, but come on. There's 
> awesome org.apache.flink organization on Central, so why not every connector 
> in there?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7656) Switch to user ClassLoader when invoking initializeOnMaster finalizeOnMaster

2017-09-22 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7656?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-7656.

   Resolution: Fixed
Fix Version/s: 1.3.3
   1.4.0

Fixed for 1.3.3 with 9a8b515c03d21023df4939af02a9ae87c9439284
Fixed for 1.4.0 with 4afca4b3a13b61c2754bc839c77ba4d4eb1d2da2

> Switch to user ClassLoader when invoking initializeOnMaster finalizeOnMaster
> 
>
> Key: FLINK-7656
> URL: https://issues.apache.org/jira/browse/FLINK-7656
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.3.2
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
> Fix For: 1.4.0, 1.3.3
>
>
> The contract that Flink provides to usercode is that the usercode 
> classloader is the context classloader whenever usercode is called.
> In {{OutputFormatVertex}}, usercode is called in the {{initializeOnMaster()}} 
> and {{finalizeOnMaster()}} methods, but the context classloader is not set to 
> the usercode classloader.
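
For readers following along, the fix described here amounts to the standard pattern of 
temporarily swapping the thread context classloader around the usercode hooks. A minimal 
sketch of that pattern (an illustration, not the commit that was merged):

{code:java}
import org.apache.flink.api.common.io.InitializeOnMaster;

public final class UserCodeClassLoaderUtil {

	private UserCodeClassLoaderUtil() {}

	/** Runs the usercode hook with the usercode classloader set as the context classloader. */
	public static void initializeOnMaster(
			InitializeOnMaster outputFormat,
			ClassLoader userCodeClassLoader,
			int parallelism) throws Exception {

		final ClassLoader previous = Thread.currentThread().getContextClassLoader();
		try {
			Thread.currentThread().setContextClassLoader(userCodeClassLoader);
			// usercode now sees its own classloader, as the contract requires
			outputFormat.initializeGlobal(parallelism);
		} finally {
			// always restore the previous context classloader
			Thread.currentThread().setContextClassLoader(previous);
		}
	}
}
{code}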



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7656) Switch to user ClassLoader when invoking initializeOnMaster finalizeOnMaster

2017-09-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16177217#comment-16177217
 ] 

ASF GitHub Bot commented on FLINK-7656:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4690


> Switch to user ClassLoader when invoking initializeOnMaster finalizeOnMaster
> 
>
> Key: FLINK-7656
> URL: https://issues.apache.org/jira/browse/FLINK-7656
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.3.2
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>
> The contract that Flink provides to usercode is that the usercode 
> classloader is the context classloader whenever usercode is called.
> In {{OutputFormatVertex}}, usercode is called in the {{initializeOnMaster()}} 
> and {{finalizeOnMaster()}} methods, but the context classloader is not set to 
> the usercode classloader.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4690: [FLINK-7656] [runtime] Switch to user classloader ...

2017-09-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4690


---


[jira] [Commented] (FLINK-7672) Publish Kinesis connector to Maven Central?

2017-09-22 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16177215#comment-16177215
 ] 

Fabian Hueske commented on FLINK-7672:
--

AFAIK, calling the API might be considered linking, but I'm not a lawyer. 
Have you seen other Apache projects publishing Kinesis connector artifacts?

> Publish Kinesis connector to Maven Central?
> ---
>
> Key: FLINK-7672
> URL: https://issues.apache.org/jira/browse/FLINK-7672
> Project: Flink
>  Issue Type: Improvement
>Reporter: Radoslaw Gruchalski
>
> Hi there, just started working with Flink, second time in the last couple of 
> years. Second time I hit a major roadblock - the connector I try to work with 
> is not available off the shelf. I wonder, what's the reason for this 
> connector: 
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/pom.xml
>  not being Maven Central? It's 21st century, why should I install the jar 
> manually on every machine where I need to work with the code? Off the top of 
> my head, my local box, jenkins, customer's deployment. Really?
> I do understand this is coming across rather rude, but come on. There's 
> awesome org.apache.flink organization on Central, so why not every connector 
> in there?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7672) Publish Kinesis connector to Maven Central?

2017-09-22 Thread Radoslaw Gruchalski (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16177188#comment-16177188
 ] 

Radoslaw Gruchalski commented on FLINK-7672:


I'm not sure; the artifact uses the SDK but does not include it, and thus does 
not distribute it.

> Publish Kinesis connector to Maven Central?
> ---
>
> Key: FLINK-7672
> URL: https://issues.apache.org/jira/browse/FLINK-7672
> Project: Flink
>  Issue Type: Improvement
>Reporter: Radoslaw Gruchalski
>
> Hi there, just started working with Flink, second time in the last couple of 
> years. Second time I hit a major roadblock - the connector I try to work with 
> is not available off the shelf. I wonder, what's the reason for this 
> connector: 
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/pom.xml
>  not being Maven Central? It's 21st century, why should I install the jar 
> manually on every machine where I need to work with the code? Off the top of 
> my head, my local box, jenkins, customer's deployment. Really?
> I do understand this is coming across rather rude, but come on. There's 
> awesome org.apache.flink organization on Central, so why not every connector 
> in there?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (FLINK-7672) Publish Kinesis connector to Maven Central?

2017-09-22 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske resolved FLINK-7672.
--
Resolution: Unresolved

Unfortunately, the Amazon Software License is Category X 
(https://www.apache.org/legal/resolved.html#category-x) and cannot be part of a 
distributed artifact.

I know this is annoying, but it cannot be resolved from our side.

> Publish Kinesis connector to Maven Central?
> ---
>
> Key: FLINK-7672
> URL: https://issues.apache.org/jira/browse/FLINK-7672
> Project: Flink
>  Issue Type: Improvement
>Reporter: Radoslaw Gruchalski
>
> Hi there, just started working with Flink, second time in the last couple of 
> years. Second time I hit a major roadblock - the connector I try to work with 
> is not available off the shelf. I wonder, what's the reason for this 
> connector: 
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/pom.xml
>  not being Maven Central? It's 21st century, why should I install the jar 
> manually on every machine where I need to work with the code? Off the top of 
> my head, my local box, jenkins, customer's deployment. Really?
> I do understand this is coming across rather rude, but come on. There's 
> awesome org.apache.flink organization on Central, so why not every connector 
> in there?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7672) Publish Kinesis connector to Maven Central?

2017-09-22 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16177172#comment-16177172
 ] 

Fabian Hueske commented on FLINK-7672:
--

The Kinesis connector is not published to Maven Central because of a licensing 
issue with the Amazon Software License.
To quote the documentation:

bq. The flink-connector-kinesis_2.10 has a dependency on code licensed under 
the Amazon Software License (ASL). Linking to the flink-connector-kinesis will 
include ASL licensed code into your application. 
bq. The flink-connector-kinesis_2.10 artifact is not deployed to Maven central 
as part of Flink releases because of the licensing issue. Therefore, you need 
to build the connector yourself from the source.

> Publish Kinesis connector to Maven Central?
> ---
>
> Key: FLINK-7672
> URL: https://issues.apache.org/jira/browse/FLINK-7672
> Project: Flink
>  Issue Type: Improvement
>Reporter: Radoslaw Gruchalski
>
> Hi there, just started working with Flink, second time in the last couple of 
> years. Second time I hit a major roadblock - the connector I try to work with 
> is not available off the shelf. I wonder, what's the reason for this 
> connector: 
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/pom.xml
>  not being Maven Central? It's 21st century, why should I install the jar 
> manually on every machine where I need to work with the code? Off the top of 
> my head, my local box, jenkins, customer's deployment. Really?
> I do understand this is coming across rather rude, but come on. There's 
> awesome org.apache.flink organization on Central, so why not every connector 
> in there?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7672) Publish Kinesis connector to Maven Central?

2017-09-22 Thread Radoslaw Gruchalski (JIRA)
Radoslaw Gruchalski created FLINK-7672:
--

 Summary: Publish Kinesis connector to Maven Central?
 Key: FLINK-7672
 URL: https://issues.apache.org/jira/browse/FLINK-7672
 Project: Flink
  Issue Type: Improvement
Reporter: Radoslaw Gruchalski


Hi there, I just started working with Flink, for the second time in the last couple of 
years. For the second time I hit a major roadblock - the connector I try to work with 
is not available off the shelf. I wonder, what's the reason for this connector: 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/pom.xml
 not being on Maven Central? It's the 21st century, why should I install the jar 
manually on every machine where I need to work with the code? Off the top of my 
head: my local box, Jenkins, the customer's deployment. Really?
I do understand this comes across as rather rude, but come on. There's an awesome 
org.apache.flink organization on Central, so why not put every connector in there?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7671) JobManagerCleanupITCase failed in build

2017-09-22 Thread Bowen Li (JIRA)
Bowen Li created FLINK-7671:
---

 Summary: JobManagerCleanupITCase failed in build
 Key: FLINK-7671
 URL: https://issues.apache.org/jira/browse/FLINK-7671
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.4.0
Reporter: Bowen Li
 Fix For: 1.4.0



{code:java}
org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase
testBlobServerCleanupCancelledJob(org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase)
  Time elapsed: 0.27 sec  <<< FAILURE!
java.lang.AssertionError: assertion failed: expected interface 
org.apache.flink.runtime.messages.JobManagerMessages$CancellationResponse, 
found class 
org.apache.flink.runtime.messages.JobManagerMessages$JobResultFailure
at scala.Predef$.assert(Predef.scala:179)
at 
akka.testkit.TestKitBase$class.expectMsgClass_internal(TestKit.scala:424)
at akka.testkit.TestKitBase$class.expectMsgClass(TestKit.scala:410)
at akka.testkit.TestKit.expectMsgClass(TestKit.scala:718)
at akka.testkit.JavaTestKit.expectMsgClass(JavaTestKit.java:397)
at 
org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase$1$1.run(JobManagerCleanupITCase.java:224)
at akka.testkit.JavaTestKit$Within$1.apply(JavaTestKit.java:232)
at akka.testkit.TestKitBase$class.within(TestKit.scala:296)
at akka.testkit.TestKit.within(TestKit.scala:718)
at akka.testkit.TestKitBase$class.within(TestKit.scala:310)
at akka.testkit.TestKit.within(TestKit.scala:718)
at akka.testkit.JavaTestKit$Within.<init>(JavaTestKit.java:230)
at 
org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase$1$1.<init>(JobManagerCleanupITCase.java:134)
at 
org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase$1.<init>(JobManagerCleanupITCase.java:134)
at 
org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase.testBlobServerCleanup(JobManagerCleanupITCase.java:133)
at 
org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase.testBlobServerCleanupCancelledJob(JobManagerCleanupITCase.java:108)
{code}

in https://travis-ci.org/apache/flink/jobs/278740892




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7635) Support sideOutput in ProcessWindowFunciton

2017-09-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176945#comment-16176945
 ] 

ASF GitHub Bot commented on FLINK-7635:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4702
  
Thank you @aljoscha ! 

I moved unrelated code changes to PR 
https://github.com/apache/flink/pull/4707

Just curious: why does Flink's Scala API have its own `OutputTag` class 
rather than using `org.apache.flink.util.OutputTag`?


> Support sideOutput in ProcessWindowFunciton
> ---
>
> Key: FLINK-7635
> URL: https://issues.apache.org/jira/browse/FLINK-7635
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Scala API
>Reporter: Chen Qin
>Assignee: Bowen Li
>Priority: Minor
> Fix For: 1.4.0, 1.3.3
>
>
> [FLINK-4460|https://issues.apache.org/jira/browse/FLINK-4460] only 
> implemented output to ProcessFunction Context. It would be nice to add 
> support to ProcessWindow and ProcessAllWindow functions as well. [email 
> threads|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/question-on-sideoutput-from-ProcessWindow-function-td15500.html]
> [~aljoscha] I thought this is good warm up task for ppl to learn how window 
> function works in general. Otherwise feel free to assign back to me.
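
For context, using the {{Context#output(OutputTag, value)}} method that the linked PR 
adds would look roughly like the sketch below (the input type, the key extraction and 
the tag name are invented for illustration):

{code:java}
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class WindowSideOutputExample {

	/** 'input' is assumed to be an existing DataStream of words. */
	public static DataStream<String> countWithDebugSideOutput(DataStream<String> input) {
		final OutputTag<String> debugTag = new OutputTag<String>("window-debug") {};

		SingleOutputStreamOperator<Long> counts = input
			.keyBy(new KeySelector<String, String>() {
				@Override
				public String getKey(String word) {
					return word;
				}
			})
			.timeWindow(Time.minutes(1))
			.process(new ProcessWindowFunction<String, Long, String, TimeWindow>() {
				@Override
				public void process(String key, Context ctx, Iterable<String> elements, Collector<Long> out) {
					long count = 0;
					for (String ignored : elements) {
						count++;
					}
					out.collect(count);
					// the new capability: emit a record to a side output from the window context
					ctx.output(debugTag, key + " @ " + ctx.window() + " -> " + count);
				}
			});

		// the side-output stream is obtained from the main windowed result
		return counts.getSideOutput(debugTag);
	}
}
{code}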



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4702: [FLINK-7635][DataStream API][Scala API] Support sideOutpu...

2017-09-22 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4702
  
Thank you @aljoscha ! 

I moved unrelated code changes to PR 
https://github.com/apache/flink/pull/4707

Just curious: why does Flink's Scala API have its own `OutputTag` class 
rather than using `org.apache.flink.util.OutputTag`?


---


[jira] [Commented] (FLINK-7635) Support sideOutput in ProcessWindowFunciton

2017-09-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176933#comment-16176933
 ] 

ASF GitHub Bot commented on FLINK-7635:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4702#discussion_r140571380
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -774,6 +774,14 @@ public KeyedStateStore windowState() {
public KeyedStateStore globalState() {
return WindowOperator.this.getKeyedStateStore();
}
+
+   public <X> void output(OutputTag<X> outputTag, X value) {
+   if (outputTag == null) {
+   throw new IllegalArgumentException("OutputTag must not be null.");
+   }
+   output.collect(outputTag, new StreamRecord<>(value, cleanupTime(window)));
--- End diff --

Makes sense. Fixed


> Support sideOutput in ProcessWindowFunciton
> ---
>
> Key: FLINK-7635
> URL: https://issues.apache.org/jira/browse/FLINK-7635
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Scala API
>Reporter: Chen Qin
>Assignee: Bowen Li
>Priority: Minor
> Fix For: 1.4.0, 1.3.3
>
>
> [FLINK-4460|https://issues.apache.org/jira/browse/FLINK-4460] only 
> implemented output to ProcessFunction Context. It would be nice to add 
> support to ProcessWindow and ProcessAllWindow functions as well. [email 
> threads|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/question-on-sideoutput-from-ProcessWindow-function-td15500.html]
> [~aljoscha] I thought this is good warm up task for ppl to learn how window 
> function works in general. Otherwise feel free to assign back to me.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4702: [FLINK-7635][DataStream API][Scala API] Support si...

2017-09-22 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4702#discussion_r140571380
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -774,6 +774,14 @@ public KeyedStateStore windowState() {
public KeyedStateStore globalState() {
return WindowOperator.this.getKeyedStateStore();
}
+
+   public <X> void output(OutputTag<X> outputTag, X value) {
+   if (outputTag == null) {
+   throw new IllegalArgumentException("OutputTag must not be null.");
+   }
+   output.collect(outputTag, new StreamRecord<>(value, cleanupTime(window)));
--- End diff --

Makes sense. Fixed


---


[GitHub] flink pull request #4707: [hotfix] Add doc for window operators, fix typos a...

2017-09-22 Thread bowenli86
GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/4707

[hotfix] Add doc for window operators, fix typos and format code

## What is the purpose of the change

Add documentation for window operators, fix typos, and format code. This is separated 
from https://github.com/apache/flink/pull/4702 so that the original PR contains only 
feature-related changes.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Documentation

  - Does this pull request introduce a new feature? (no)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink hotfix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4707.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4707


commit fda53bdecfdc203bad009ceca5e3e8123ef3d525
Author: Bowen Li 
Date:   2017-09-22T18:36:18Z

[hotfix] Fix typos and other small things




---


[GitHub] flink issue #4665: [Flink-7611]add metrics to measure the num of data droppe...

2017-09-22 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4665
  
@aljoscha sorry for being late to this CR. I left a couple of comments


---


[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...

2017-09-22 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r140564400
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
 ---
@@ -231,6 +231,8 @@ public void merge(W mergeResult,
// windowAssigner is event time and current timestamp + allowed 
lateness no less than element timestamp
if (isSkippedElement && lateDataOutputTag != null && 
isElementLate(element)) {
--- End diff --

the if-else conditions are duplicated and inefficient, and can be further 
combined as

    if (isSkippedElement && isElementLate(element)) {
        if (lateDataOutputTag != null) {
            sideOutput(element);
        }
        this.lostDataCount.inc();
    }


---


[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...

2017-09-22 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r140564281
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -405,6 +411,8 @@ public void merge(W mergeResult,
// windowAssigner is event time and current timestamp + allowed 
lateness no less than element timestamp
if (isSkippedElement && lateDataOutputTag != null && 
isElementLate(element)) {
--- End diff --

the if-else conditions are duplicated and inefficient, and can be further 
combined as

```java
if (isSkippedElement && isElementLate(element)) {
    if (lateDataOutputTag != null) {
        sideOutput(element);
    }
    this.lostDataCount.inc();
}
```


---


[jira] [Commented] (FLINK-7656) Switch to user ClassLoader when invoking initializeOnMaster finalizeOnMaster

2017-09-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176659#comment-16176659
 ] 

ASF GitHub Bot commented on FLINK-7656:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4690
  
Thanks for the review @aljoscha.
I'll change that and merge the fix.


> Switch to user ClassLoader when invoking initializeOnMaster finalizeOnMaster
> 
>
> Key: FLINK-7656
> URL: https://issues.apache.org/jira/browse/FLINK-7656
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.3.2
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>
> The contract that Flink provides to usercode is that the usercode 
> classloader is the context classloader whenever usercode is called.
> In {{OutputFormatVertex}}, usercode is called in the {{initializeOnMaster()}} 
> and {{finalizeOnMaster()}} methods, but the context classloader is not set to 
> the usercode classloader.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4690: [FLINK-7656] [runtime] Switch to user classloader before ...

2017-09-22 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4690
  
Thanks for the review @aljoscha.
I'll change that and merge the fix.


---


[jira] [Commented] (FLINK-7606) CEP operator leaks state

2017-09-22 Thread Paolo Rendano (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176654#comment-16176654
 ] 

Paolo Rendano commented on FLINK-7606:
--

Hi [~kkl0u],
sorry, this week was hard for me as well. Next week I'll definitely have some time to 
check your suggestions and try them out. Regarding my open issue FLINK-7606, 
I see that [~matteoferrario29] is also losing events in CEP. I'll dig deeper next 
week.



> CEP operator leaks state
> 
>
> Key: FLINK-7606
> URL: https://issues.apache.org/jira/browse/FLINK-7606
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1
>Reporter: Matteo Ferrario
> Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png
>
>
> The NestedMapsStateTable grows continuously without freeing the heap memory.
> We created a simple job that processes a stream of messages and uses CEP to 
> generate an outcome message when a specific pattern is identified.
> The messages coming from the stream are grouped by a key defined in a 
> specific field of the message.
> We've also added the "within" clause (set as 5 minutes), indicating that two 
> incoming messages match the pattern only if they come in a certain time 
> window.
> What we've seen is that for every key present in the message, an NFA object 
> is instantiated in the NestedMapsStateTable and it is never deallocated.
> Also, the "within" clause didn't help: we've seen that if we send messages 
> that don't match the pattern, the memory grows (I suppose the state 
> of the NFA is updated), but it is not cleaned up even after the 5-minute time 
> window defined in the "within" clause.
> If you need, I can provide more details about the job we've implemented and 
> also the screenshots about the memory leak.
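
For anyone trying to reproduce this, a stripped-down sketch of the kind of job described 
above (the {{Event}} POJO, its key field and the match conditions are invented for 
illustration):

{code:java}
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

public class CepStateGrowthSketch {

	/** Minimal stand-in for the reporter's message type. */
	public static class Event {
		public String key;   // the field the stream is grouped by
		public String type;  // used by the pattern conditions below
		public Event() {}
	}

	/** 'events' is assumed to be an existing DataStream<Event> with timestamps/watermarks assigned. */
	public static PatternStream<Event> matchWithinFiveMinutes(DataStream<Event> events) {
		Pattern<Event, ?> pattern = Pattern.<Event>begin("first")
			.where(new SimpleCondition<Event>() {
				@Override
				public boolean filter(Event e) {
					return "REQUEST".equals(e.type);
				}
			})
			.followedBy("second")
			.where(new SimpleCondition<Event>() {
				@Override
				public boolean filter(Event e) {
					return "RESPONSE".equals(e.type);
				}
			})
			.within(Time.minutes(5)); // the "within" clause mentioned in the report

		// one NFA is kept in keyed state for every distinct key seen on the stream
		return CEP.pattern(events.keyBy("key"), pattern);
	}
}
{code}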



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7635) Support sideOutput in ProcessWindowFunciton

2017-09-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176606#comment-16176606
 ] 

ASF GitHub Bot commented on FLINK-7635:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4702#discussion_r140529325
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -774,6 +774,14 @@ public KeyedStateStore windowState() {
public KeyedStateStore globalState() {
return WindowOperator.this.getKeyedStateStore();
}
+
+   public <X> void output(OutputTag<X> outputTag, X value) {
+   if (outputTag == null) {
+   throw new IllegalArgumentException("OutputTag must not be null.");
+   }
+   output.collect(outputTag, new StreamRecord<>(value, cleanupTime(window)));
--- End diff --

I think this should be `window.maxTimestamp()` to match 
`emitWindowContents()`.


> Support sideOutput in ProcessWindowFunciton
> ---
>
> Key: FLINK-7635
> URL: https://issues.apache.org/jira/browse/FLINK-7635
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Scala API
>Reporter: Chen Qin
>Assignee: Bowen Li
>Priority: Minor
> Fix For: 1.4.0, 1.3.3
>
>
> [FLINK-4460|https://issues.apache.org/jira/browse/FLINK-4460] only 
> implemented output to ProcessFunction Context. It would be nice to add 
> support to ProcessWindow and ProcessAllWindow functions as well. [email 
> threads|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/question-on-sideoutput-from-ProcessWindow-function-td15500.html]
> [~aljoscha] I thought this is good warm up task for ppl to learn how window 
> function works in general. Otherwise feel free to assign back to me.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7635) Support sideOutput in ProcessWindowFunciton

2017-09-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176605#comment-16176605
 ] 

ASF GitHub Bot commented on FLINK-7635:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4702#discussion_r140528890
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
 ---
@@ -212,6 +213,11 @@ public KeyedStateStore windowState() {
public KeyedStateStore globalState() {
return globalState;
}
+
+   @Override
+   public <X> void output(OutputTag<X> outputTag, X value) {
+   throw new UnsupportedOperationException("current watermark is not supported in this context");
--- End diff --

Still has the old message about watermarks.


> Support sideOutput in ProcessWindowFunciton
> ---
>
> Key: FLINK-7635
> URL: https://issues.apache.org/jira/browse/FLINK-7635
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Scala API
>Reporter: Chen Qin
>Assignee: Bowen Li
>Priority: Minor
> Fix For: 1.4.0, 1.3.3
>
>
> [FLINK-4460|https://issues.apache.org/jira/browse/FLINK-4460] only 
> implemented output to ProcessFunction Context. It would be nice to add 
> support to ProcessWindow and ProcessAllWindow functions as well. [email 
> threads|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/question-on-sideoutput-from-ProcessWindow-function-td15500.html]
> [~aljoscha] I thought this is good warm up task for ppl to learn how window 
> function works in general. Otherwise feel free to assign back to me.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4702: [FLINK-7635][DataStream API][Scala API] Support si...

2017-09-22 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4702#discussion_r140529325
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -774,6 +774,14 @@ public KeyedStateStore windowState() {
public KeyedStateStore globalState() {
return WindowOperator.this.getKeyedStateStore();
}
+
+   public <X> void output(OutputTag<X> outputTag, X value) {
+   if (outputTag == null) {
+   throw new IllegalArgumentException("OutputTag must not be null.");
+   }
+   output.collect(outputTag, new StreamRecord<>(value, cleanupTime(window)));
--- End diff --

I think this should be `window.maxTimestamp()` to match 
`emitWindowContents()`.


---


[GitHub] flink pull request #4702: [FLINK-7635][DataStream API][Scala API] Support si...

2017-09-22 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4702#discussion_r140528890
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
 ---
@@ -212,6 +213,11 @@ public KeyedStateStore windowState() {
public KeyedStateStore globalState() {
return globalState;
}
+
+   @Override
+   public <X> void output(OutputTag<X> outputTag, X value) {
+   throw new UnsupportedOperationException("current watermark is not supported in this context");
--- End diff --

Still has the old message about watermarks.


---


[jira] [Commented] (FLINK-7669) org.apache.flink.api.common.ExecutionConfig cannot be cast to org.apache.flink.api.common.ExecutionConfig

2017-09-22 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176592#comment-16176592
 ] 

Aljoscha Krettek commented on FLINK-7669:
-

And is the fat jar always built against 1.3.2, or do you build it against 
1.4-SNAPSHOT when running on that cluster?

> org.apache.flink.api.common.ExecutionConfig cannot be cast to 
> org.apache.flink.api.common.ExecutionConfig
> -
>
> Key: FLINK-7669
> URL: https://issues.apache.org/jira/browse/FLINK-7669
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.4.0
> Environment: - OS: macOS Sierra 
> - Oracle JDK 1.8
> - Scala 2.11.11
> - sbt 0.13.16
> - Build from trunk code at commit hash 
> {{42cc3a2a9c41dda7cf338db36b45131db9150674}}
> -- started a local flink node 
>Reporter: Raymond Tay
>
> Latest code pulled from trunk threw errors at runtime when i ran a job 
> against it; but when i ran the JAR against the stable version {{1.3.2}} it 
> was OK. Here is the stacktrace. 
> An exception is being thrown :
> {noformat}
> Cluster configuration: Standalone cluster with JobManager at 
> localhost/127.0.0.1:6123
> Using address localhost:6123 to connect to JobManager.
> JobManager web interface address http://localhost:8081
> Starting execution of program
> Submitting job with JobID: 05dd8e60c6fda3b96fc22ef6cf389a23. Waiting for job 
> completion.
> Connected to JobManager at 
> Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-234825544] with leader 
> session id ----.
> 
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Failed to submit job 05dd8e60c6fda3b96fc22ef6cf389a23 
> (Flink Streaming Job)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:479)
>   at 
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:443)
>   at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1501)
>   at 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:629)
>   at 
> org.example.streams.split.SimpleSplitStreams$.main(04splitstreams.scala:53)
>   at 
> org.example.streams.split.SimpleSplitStreams.main(04splitstreams.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:383)
>   at 
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:840)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:285)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1088)
>   at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1135)
>   at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1132)
>   at 
> org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:44)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
>   at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1132)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to 
> submit job 05dd8e60c6fda3b96fc22ef6cf389a23 (Flink Streaming Job)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1358)
>   at 
> 

[jira] [Commented] (FLINK-7670) typo in docs runtime section

2017-09-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176563#comment-16176563
 ] 

ASF GitHub Bot commented on FLINK-7670:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4706
  
+1


> typo in docs runtime section
> 
>
> Key: FLINK-7670
> URL: https://issues.apache.org/jira/browse/FLINK-7670
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.3.2
>Reporter: Kewei SHANG
>Priority: Minor
> Fix For: 1.3.3
>
>
> The following link to the Savepoints page
> [Savepoints](..//setup/savepoints.html)
> should be changed to
> [Savepoints](../setup/savepoints.html)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4706: [FLINK-7670][doc] fix typo in docs runtime section

2017-09-22 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4706
  
+1


---


[jira] [Commented] (FLINK-7669) org.apache.flink.api.common.ExecutionConfig cannot be cast to org.apache.flink.api.common.ExecutionConfig

2017-09-22 Thread Raymond Tay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176560#comment-16176560
 ] 

Raymond Tay commented on FLINK-7669:


Basically, the program is built using {{sbt assembly}}, which produces a 
_fatjar_, and I have 2 installations running locally (version {{1.3.2}}, which I 
downloaded from the website, and a locally built {{1.4.0-SNAPSHOT}}). The 
execution command is like this:

{{/bin/flink run -p 4 JARFILE}}

Another piece of information I wanted to share: I built the 
{{1.4.0-SNAPSHOT}} locally 2 days back and it was alright; this failure only 
happened when I pulled again today
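
Not claiming this is the root cause here, but an error of the form "X cannot be cast to X" 
is the classic symptom of the same class being loaded by two different classloaders. A tiny 
self-contained demo of why such a cast fails (the jar path is a placeholder):

{code:java}
import java.net.URL;
import java.net.URLClassLoader;

public class DuplicateClassLoadingDemo {

	public static void main(String[] args) throws Exception {
		// placeholder: any jar that contains org.apache.flink.api.common.ExecutionConfig
		URL jar = new URL("file:///path/to/flink-core.jar");

		// two isolated classloaders, each loading the class independently
		ClassLoader first = new URLClassLoader(new URL[] {jar}, null);
		ClassLoader second = new URLClassLoader(new URL[] {jar}, null);

		Class<?> c1 = first.loadClass("org.apache.flink.api.common.ExecutionConfig");
		Class<?> c2 = second.loadClass("org.apache.flink.api.common.ExecutionConfig");

		// same fully qualified name, but two distinct Class objects for the JVM
		System.out.println(c1 == c2);                // false
		System.out.println(c1.isAssignableFrom(c2)); // false -> a cast between them throws ClassCastException
	}
}
{code}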

> org.apache.flink.api.common.ExecutionConfig cannot be cast to 
> org.apache.flink.api.common.ExecutionConfig
> -
>
> Key: FLINK-7669
> URL: https://issues.apache.org/jira/browse/FLINK-7669
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.4.0
> Environment: - OS: macOS Sierra 
> - Oracle JDK 1.8
> - Scala 2.11.11
> - sbt 0.13.16
> - Build from trunk code at commit hash 
> {{42cc3a2a9c41dda7cf338db36b45131db9150674}}
> -- started a local flink node 
>Reporter: Raymond Tay
>
> Latest code pulled from trunk threw errors at runtime when i ran a job 
> against it; but when i ran the JAR against the stable version {{1.3.2}} it 
> was OK. Here is the stacktrace. 
> An exception is being thrown :
> {noformat}
> Cluster configuration: Standalone cluster with JobManager at 
> localhost/127.0.0.1:6123
> Using address localhost:6123 to connect to JobManager.
> JobManager web interface address http://localhost:8081
> Starting execution of program
> Submitting job with JobID: 05dd8e60c6fda3b96fc22ef6cf389a23. Waiting for job 
> completion.
> Connected to JobManager at 
> Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-234825544] with leader 
> session id ----.
> 
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Failed to submit job 05dd8e60c6fda3b96fc22ef6cf389a23 
> (Flink Streaming Job)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:479)
>   at 
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:443)
>   at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1501)
>   at 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:629)
>   at 
> org.example.streams.split.SimpleSplitStreams$.main(04splitstreams.scala:53)
>   at 
> org.example.streams.split.SimpleSplitStreams.main(04splitstreams.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:383)
>   at 
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:840)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:285)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1088)
>   at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1135)
>   at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1132)
>   at 
> org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:44)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
>   at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1132)
> Caused by: 

[jira] [Commented] (FLINK-7632) Better documentation and examples on C* sink usage for Pojo and Tuples data types

2017-09-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176559#comment-16176559
 ] 

ASF GitHub Bot commented on FLINK-7632:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4696
  
Nice improvement to the docs, had a few comments.


> Better documentation and examples on C* sink usage for Pojo and Tuples data 
> types
> -
>
> Key: FLINK-7632
> URL: https://issues.apache.org/jira/browse/FLINK-7632
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cassandra Connector, Documentation
>Reporter: Michael Fong
>Assignee: Michael Fong
>
> Cassandra sink supports Pojo and Java Tuple data types. We should improve 
> documentation on its usage as well as some concrete / meaningful examples for 
> both cases. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4696: [FLINK-7632] [document] Overhaul on Cassandra connector d...

2017-09-22 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4696
  
Nice improvement to the docs, had a few comments.


---


[jira] [Commented] (FLINK-7632) Better documentation and examples on C* sink usage for Pojo and Tuples data types

2017-09-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176556#comment-16176556
 ] 

ASF GitHub Bot commented on FLINK-7632:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r140517696
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -78,76 +96,195 @@ Note that that enabling this feature will have an 
adverse impact on latency.
 
 Note: 
The write-ahead log functionality is currently experimental. In many cases it 
is sufficent to use the connector without enabling it. Please report problems 
to the development mailing list.
 
+### Checkpointing and Fault Tolerance
+With checkpointing enabled, Cassandra Sink guarantees at-least-once 
delivery of action requests to C* instance.
 
- Example
+Note:However, current Cassandra Sink implementation 
does not flush the pending mutations  before the checkpoint was triggered. 
Thus, some in-flight mutations might not be replayed when the job recovered. 

+
+More details on [checkpoints docs]({{ site.baseurl 
}}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ 
site.baseurl }}/dev/connectors/guarantees.html)
+
+To enable fault tolerant guarantee, checkpointing of the topology needs to 
be enabled at the execution environment:
 
 
 
 {% highlight java %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-@Override
-public Cluster buildCluster(Cluster.Builder builder) {
-  return builder.addContactPoint("127.0.0.1").build();
-}
-  })
-  .build();
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
 {% endhighlight %}
 
 
 {% highlight scala %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-override def buildCluster(builder: Cluster.Builder): Cluster = {
-  builder.addContactPoint("127.0.0.1").build()
-}
-  })
-  .build()
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+{% endhighlight %}
+
+
+
+## Examples
+
+The Cassandra sinks currently support both Java Tuple and POJO data types, 
and Flink automatically detects which type of input is used. For general use 
case of those streaming data type, please refer to [Supported Data Types]({{ 
site.baseurl }}/dev/api_concepts.html). We show two implementations based on 
[SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java),
 for Pojo and Java Tuple data types respectively.
+
+In all these examples, we assumed the associated Keyspace `example` and 
Table `wordcount` have been created.
+
+
+
+{% highlight sql %}
+CREATE KEYSPACE IF NOT EXISTS example
+WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 
'1'};
+CREATE TABLE IF NOT EXISTS example.wordcount (
+word text,
+count bigint,
+PRIMARY KEY(word)
+);
+{% endhighlight %}
+
+
+
+### Cassandra Sink Example for Streaming Java Tuple Data Type
+While storing the result with Java Tuple data type to a Cassandra sink, it 
is required to set a CQL upsert statement (via setQuery('stmt')) to persist 
each record back to database. With the upsert query cached as 
`PreparedStatement`, Cassandra connector internally converts each Tuple 
elements as parameters to the statement.
+
+For details about `PreparedStatement` and `BoundStatement`, please visit 
[DataStax Java Driver 
manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/)
+
+Please note that if the upsert query were not set, an 
`IllegalArgumentException` would be thrown with the following error message 
`Query must not be null or empty.`
+
+
+
+{% highlight java %}
+// get the execution environment
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// get input data by connecting to the socket
+DataStream<String> text = env.socketTextStream(hostname, port, "\n");
+
+// parse the data, group it, window it, and aggregate the counts
+DataStream<Tuple2<String, Long>> result = text
+
+.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
+@Override
+public void flatMap(String 

[jira] [Commented] (FLINK-7632) Better documentation and examples on C* sink usage for Pojo and Tuples data types

2017-09-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176555#comment-16176555
 ] 

ASF GitHub Bot commented on FLINK-7632:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r140517986
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -78,76 +96,195 @@ Note that that enabling this feature will have an 
adverse impact on latency.
 
 Note: 
The write-ahead log functionality is currently experimental. In many cases it 
is sufficent to use the connector without enabling it. Please report problems 
to the development mailing list.
 
+### Checkpointing and Fault Tolerance
+With checkpointing enabled, Cassandra Sink guarantees at-least-once 
delivery of action requests to C* instance.
 
- Example
+Note:However, current Cassandra Sink implementation 
does not flush the pending mutations  before the checkpoint was triggered. 
Thus, some in-flight mutations might not be replayed when the job recovered. 

+
+More details on [checkpoints docs]({{ site.baseurl 
}}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ 
site.baseurl }}/dev/connectors/guarantees.html)
+
+To enable fault tolerant guarantee, checkpointing of the topology needs to 
be enabled at the execution environment:
 
 
 
 {% highlight java %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-@Override
-public Cluster buildCluster(Cluster.Builder builder) {
-  return builder.addContactPoint("127.0.0.1").build();
-}
-  })
-  .build();
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
 {% endhighlight %}
 
 
 {% highlight scala %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-override def buildCluster(builder: Cluster.Builder): Cluster = {
-  builder.addContactPoint("127.0.0.1").build()
-}
-  })
-  .build()
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+{% endhighlight %}
+
+
+
+## Examples
+
+The Cassandra sinks currently support both Java Tuple and POJO data types, 
and Flink automatically detects which type of input is used. For general use 
case of those streaming data type, please refer to [Supported Data Types]({{ 
site.baseurl }}/dev/api_concepts.html). We show two implementations based on 
[SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java),
 for Pojo and Java Tuple data types respectively.
+
+In all these examples, we assumed the associated Keyspace `example` and 
Table `wordcount` have been created.
+
+
+
+{% highlight sql %}
+CREATE KEYSPACE IF NOT EXISTS example
+WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 
'1'};
+CREATE TABLE IF NOT EXISTS example.wordcount (
+word text,
+count bigint,
+PRIMARY KEY(word)
+);
+{% endhighlight %}
+
+
+
+### Cassandra Sink Example for Streaming Java Tuple Data Type
+While storing the result with Java Tuple data type to a Cassandra sink, it 
is required to set a CQL upsert statement (via setQuery('stmt')) to persist 
each record back to database. With the upsert query cached as 
`PreparedStatement`, Cassandra connector internally converts each Tuple 
elements as parameters to the statement.
+
+For details about `PreparedStatement` and `BoundStatement`, please visit 
[DataStax Java Driver 
manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/)
+
+Please note that if the upsert query were not set, an 
`IllegalArgumentException` would be thrown with the following error message 
`Query must not be null or empty.`
+
+
+
+{% highlight java %}
+// get the execution environment
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// get input data by connecting to the socket
+DataStream<String> text = env.socketTextStream(hostname, port, "\n");
+
+// parse the data, group it, window it, and aggregate the counts
+DataStream<Tuple2<String, Long>> result = text
+
+.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
+@Override
+public void flatMap(String 

[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

2017-09-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r140518123
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -78,76 +96,195 @@ Note that that enabling this feature will have an 
adverse impact on latency.
 
 Note: 
The write-ahead log functionality is currently experimental. In many cases it 
is sufficent to use the connector without enabling it. Please report problems 
to the development mailing list.
 
+### Checkpointing and Fault Tolerance
+With checkpointing enabled, the Cassandra Sink guarantees at-least-once 
delivery of action requests to the C* instance.
 
 - Example
+Note: However, the current Cassandra Sink implementation 
does not flush pending mutations before a checkpoint is triggered. 
Thus, some in-flight mutations might not be replayed when the job recovers. 

+
+More details can be found in the [checkpointing docs]({{ site.baseurl 
}}/dev/stream/state/checkpointing.html) and the [fault tolerance guarantee docs]({{ 
site.baseurl }}/dev/connectors/guarantees.html).
+
+To enable this fault-tolerance guarantee, checkpointing of the topology needs to 
be enabled at the execution environment:
 
 
 
 {% highlight java %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-@Override
-public Cluster buildCluster(Cluster.Builder builder) {
-  return builder.addContactPoint("127.0.0.1").build();
-}
-  })
-  .build();
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
 {% endhighlight %}
 
 
 {% highlight scala %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-override def buildCluster(builder: Cluster.Builder): Cluster = {
-  builder.addContactPoint("127.0.0.1").build()
-}
-  })
-  .build()
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+{% endhighlight %}
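As a small, hedged addition (not in the quoted diff): `enableCheckpointing` also has an 
overload that states the delivery guarantee explicitly, assuming the standard 
`CheckpointingMode` enum from `org.apache.flink.streaming.api`:

{% highlight java %}
// sketch: checkpoint every 5000 msecs and make the guarantee explicit
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
{% endhighlight %}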
+
+
+
+## Examples
+
+The Cassandra sinks currently support both Java Tuple and POJO data types, 
and Flink automatically detects which type of input is used. For the general use 
of these streaming data types, please refer to [Supported Data Types]({{ 
site.baseurl }}/dev/api_concepts.html). We show two implementations based on 
[SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java),
 for the Pojo and Java Tuple data types respectively.
+
+In all these examples, we assume that the associated Keyspace `example` and 
Table `wordcount` have been created.
+
+
+
+{% highlight sql %}
+CREATE KEYSPACE IF NOT EXISTS example
+WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 
'1'};
+CREATE TABLE IF NOT EXISTS example.wordcount (
+word text,
+count bigint,
+PRIMARY KEY(word)
+);
+{% endhighlight %}
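Since the POJO half of the example does not survive in this archive, here is a hedged sketch 
of what a POJO mapped to the `example.wordcount` table could look like, under the assumption 
that the connector's POJO path relies on the DataStax object-mapper annotations (an 
assumption of this sketch, not something stated in the quoted diff):

{% highlight java %}
// sketch only: a POJO annotated for the example.wordcount table
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.Table;

@Table(keyspace = "example", name = "wordcount")
public class WordCount {

    @Column(name = "word")
    private String word = "";

    @Column(name = "count")
    private long count = 0;

    public WordCount() {} // the mapper needs a no-argument constructor

    public String getWord() { return word; }
    public void setWord(String word) { this.word = word; }

    public long getCount() { return count; }
    public void setCount(long count) { this.count = count; }
}
{% endhighlight %}

A stream of such POJOs would then be handed to `CassandraSink.addSink(...)` without a 
`setQuery(...)` call; again, an assumption based on the DataStax mapper rather than on the 
truncated diff.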
+
+
+
+### Cassandra Sink Example for Streaming Java Tuple Data Type
+While storing the result of a Java Tuple data type to a Cassandra sink, it 
is required to set a CQL upsert statement (via setQuery('stmt')) to persist 
each record back to the database. With the upsert query cached as a 
`PreparedStatement`, the Cassandra connector internally converts each Tuple 
element into a parameter of the statement.
+
+For details about `PreparedStatement` and `BoundStatement`, please visit the 
[DataStax Java Driver 
manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/).
+
+Please note that if the upsert query is not set, an 
`IllegalArgumentException` is thrown with the following error message: 
`Query must not be null or empty.`
+
+
+
+{% highlight java %}
+// get the execution environment
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// get input data by connecting to the socket
+DataStream<String> text = env.socketTextStream(hostname, port, "\n");
+
+// parse the data, group it, window it, and aggregate the counts
+DataStream<Tuple2<String, Long>> result = text
+
+.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
+@Override
+public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
+// normalize and split the line
+String[] words = value.toLowerCase().split("\\W+");
+
+// emit the pairs
+for 

[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

2017-09-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r140517823
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -78,76 +96,195 @@ Note that enabling this feature will have an 
adverse impact on latency.
 
 Note: 
The write-ahead log functionality is currently experimental. In many cases it 
is sufficient to use the connector without enabling it. Please report problems 
to the development mailing list.
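As a hedged illustration (not part of the quoted diff) of how the experimental write-ahead 
log mentioned in the note above is switched on, assuming the builder method is named 
`enableWriteAheadLog()` as in the 1.3-era connector:

{% highlight java %}
// sketch: same Tuple2 sink as elsewhere in this thread, with the experimental WAL enabled
CassandraSink.addSink(result)
    .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
    .setClusterBuilder(new ClusterBuilder() {
        @Override
        public Cluster buildCluster(Cluster.Builder builder) {
            return builder.addContactPoint("127.0.0.1").build();
        }
    })
    .enableWriteAheadLog()
    .build();
{% endhighlight %}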
 
+### Checkpointing and Fault Tolerance
+With checkpointing enabled, the Cassandra Sink guarantees at-least-once 
delivery of action requests to the C* instance.
 
 - Example
+Note: However, the current Cassandra Sink implementation 
does not flush pending mutations before a checkpoint is triggered. 
Thus, some in-flight mutations might not be replayed when the job recovers. 

+
+More details can be found in the [checkpointing docs]({{ site.baseurl 
}}/dev/stream/state/checkpointing.html) and the [fault tolerance guarantee docs]({{ 
site.baseurl }}/dev/connectors/guarantees.html).
+
+To enable this fault-tolerance guarantee, checkpointing of the topology needs to 
be enabled at the execution environment:
 
 
 
 {% highlight java %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-@Override
-public Cluster buildCluster(Cluster.Builder builder) {
-  return builder.addContactPoint("127.0.0.1").build();
-}
-  })
-  .build();
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
 {% endhighlight %}
 
 
 {% highlight scala %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-override def buildCluster(builder: Cluster.Builder): Cluster = {
-  builder.addContactPoint("127.0.0.1").build()
-}
-  })
-  .build()
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+{% endhighlight %}
+
+
+
+## Examples
+
+The Cassandra sinks currently support both Java Tuple and POJO data types, 
and Flink automatically detects which type of input is used. For the general use 
of these streaming data types, please refer to [Supported Data Types]({{ 
site.baseurl }}/dev/api_concepts.html). We show two implementations based on 
[SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java),
 for the Pojo and Java Tuple data types respectively.
+
+In all these examples, we assume that the associated Keyspace `example` and 
Table `wordcount` have been created.
+
+
+
+{% highlight sql %}
+CREATE KEYSPACE IF NOT EXISTS example
+WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 
'1'};
+CREATE TABLE IF NOT EXISTS example.wordcount (
+word text,
+count bigint,
+PRIMARY KEY(word)
+);
+{% endhighlight %}
+
+
+
+### Cassandra Sink Example for Streaming Java Tuple Data Type
+While storing the result of a Java Tuple data type to a Cassandra sink, it 
is required to set a CQL upsert statement (via setQuery('stmt')) to persist 
each record back to the database. With the upsert query cached as a 
`PreparedStatement`, the Cassandra connector internally converts each Tuple 
element into a parameter of the statement.
+
+For details about `PreparedStatement` and `BoundStatement`, please visit the 
[DataStax Java Driver 
manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/).
+
+Please note that if the upsert query is not set, an 
`IllegalArgumentException` is thrown with the following error message: 
`Query must not be null or empty.`
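For readers who want the prepare-and-bind cycle spelled out, a small standalone sketch 
against the DataStax 2.1 driver; the contact point and the bound values are illustrative 
assumptions, not taken from the quoted diff:

{% highlight java %}
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;

public class PreparedStatementSketch {
    public static void main(String[] args) throws Exception {
        // connect to a local node (address assumed for illustration)
        try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
             Session session = cluster.connect()) {
            // the upsert query is prepared once and cached as a PreparedStatement ...
            PreparedStatement prepared = session.prepare(
                "INSERT INTO example.wordcount(word, count) values (?, ?);");
            // ... and each record's fields become the bound parameters
            BoundStatement bound = prepared.bind("flink", 1L);
            session.execute(bound);
        }
    }
}
{% endhighlight %}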
+
+
+
+{% highlight java %}
+// get the execution environment
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// get input data by connecting to the socket
+DataStream<String> text = env.socketTextStream(hostname, port, "\n");
+
+// parse the data, group it, window it, and aggregate the counts
+DataStream<Tuple2<String, Long>> result = text
+
+.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
+@Override
+public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
+// normalize and split the line
+String[] words = value.toLowerCase().split("\\W+");
+
+// emit the pairs
+for 

[jira] [Commented] (FLINK-7632) Better documentation and examples on C* sink usage for Pojo and Tuples data types

2017-09-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176553#comment-16176553
 ] 

ASF GitHub Bot commented on FLINK-7632:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r140516418
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -78,76 +96,195 @@ Note that enabling this feature will have an 
adverse impact on latency.
 
 Note: 
The write-ahead log functionality is currently experimental. In many cases it 
is sufficient to use the connector without enabling it. Please report problems 
to the development mailing list.
 
+### Checkpointing and Fault Tolerance
+With checkpointing enabled, the Cassandra Sink guarantees at-least-once 
delivery of action requests to the C* instance.
 
 - Example
+Note: However, the current Cassandra Sink implementation 
does not flush pending mutations before a checkpoint is triggered. 
Thus, some in-flight mutations might not be replayed when the job recovers. 

+
+More details can be found in the [checkpointing docs]({{ site.baseurl 
}}/dev/stream/state/checkpointing.html) and the [fault tolerance guarantee docs]({{ 
site.baseurl }}/dev/connectors/guarantees.html).
+
+To enable this fault-tolerance guarantee, checkpointing of the topology needs to 
be enabled at the execution environment:
--- End diff --

imo this is a bit redundant, we are already linking to the checkpoint docs 
after all.


> Better documentation and examples on C* sink usage for Pojo and Tuples data 
> types
> -
>
> Key: FLINK-7632
> URL: https://issues.apache.org/jira/browse/FLINK-7632
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cassandra Connector, Documentation
>Reporter: Michael Fong
>Assignee: Michael Fong
>
> Cassandra sink supports Pojo and Java Tuple data types. We should improve 
> documentation on its usage as well as some concrete / meaningful examples for 
> both cases. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

2017-09-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r140517182
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -78,76 +96,195 @@ Note that enabling this feature will have an 
adverse impact on latency.
 
 Note: 
The write-ahead log functionality is currently experimental. In many cases it 
is sufficient to use the connector without enabling it. Please report problems 
to the development mailing list.
 
+### Checkpointing and Fault Tolerance
+With checkpointing enabled, the Cassandra Sink guarantees at-least-once 
delivery of action requests to the C* instance.
 
 - Example
+Note: However, the current Cassandra Sink implementation 
does not flush pending mutations before a checkpoint is triggered. 
Thus, some in-flight mutations might not be replayed when the job recovers. 

+
+More details can be found in the [checkpointing docs]({{ site.baseurl 
}}/dev/stream/state/checkpointing.html) and the [fault tolerance guarantee docs]({{ 
site.baseurl }}/dev/connectors/guarantees.html).
+
+To enable this fault-tolerance guarantee, checkpointing of the topology needs to 
be enabled at the execution environment:
 
 
 
 {% highlight java %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-@Override
-public Cluster buildCluster(Cluster.Builder builder) {
-  return builder.addContactPoint("127.0.0.1").build();
-}
-  })
-  .build();
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
 {% endhighlight %}
 
 
 {% highlight scala %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-override def buildCluster(builder: Cluster.Builder): Cluster = {
-  builder.addContactPoint("127.0.0.1").build()
-}
-  })
-  .build()
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+{% endhighlight %}
+
+
+
+## Examples
+
+The Cassandra sinks currently support both Java Tuple and POJO data types, 
and Flink automatically detects which type of input is used. For the general use 
of these streaming data types, please refer to [Supported Data Types]({{ 
site.baseurl }}/dev/api_concepts.html). We show two implementations based on 
[SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java),
 for the Pojo and Java Tuple data types respectively.
+
+In all these examples, we assume that the associated Keyspace `example` and 
Table `wordcount` have been created.
+
+
+
+{% highlight sql %}
+CREATE KEYSPACE IF NOT EXISTS example
+WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 
'1'};
+CREATE TABLE IF NOT EXISTS example.wordcount (
+word text,
+count bigint,
+PRIMARY KEY(word)
+);
+{% endhighlight %}
+
+
+
+### Cassandra Sink Example for Streaming Java Tuple Data Type
+While storing the result of a Java Tuple data type to a Cassandra sink, it 
is required to set a CQL upsert statement (via setQuery('stmt')) to persist 
each record back to the database. With the upsert query cached as a 
`PreparedStatement`, the Cassandra connector internally converts each Tuple 
element into a parameter of the statement.
+
+For details about `PreparedStatement` and `BoundStatement`, please visit the 
[DataStax Java Driver 
manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/).
+
+Please note that if the upsert query is not set, an 
`IllegalArgumentException` is thrown with the following error message: 
`Query must not be null or empty.`
--- End diff --

I would leave this out. It's one of those things that are easily out-dated, 
and don't provide immediate value when reading the docs for the first time. If 
a user stumbles upon this, the error message should be self-explanatory; if it 
isn't, we should change it accordingly.


---


[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

2017-09-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r140516418
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -78,76 +96,195 @@ Note that enabling this feature will have an 
adverse impact on latency.
 
 Note: 
The write-ahead log functionality is currently experimental. In many cases it 
is sufficient to use the connector without enabling it. Please report problems 
to the development mailing list.
 
+### Checkpointing and Fault Tolerance
+With checkpointing enabled, the Cassandra Sink guarantees at-least-once 
delivery of action requests to the C* instance.
 
 - Example
+Note: However, the current Cassandra Sink implementation 
does not flush pending mutations before a checkpoint is triggered. 
Thus, some in-flight mutations might not be replayed when the job recovers. 

+
+More details can be found in the [checkpointing docs]({{ site.baseurl 
}}/dev/stream/state/checkpointing.html) and the [fault tolerance guarantee docs]({{ 
site.baseurl }}/dev/connectors/guarantees.html).
+
+To enable this fault-tolerance guarantee, checkpointing of the topology needs to 
be enabled at the execution environment:
--- End diff --

imo this is a bit redundant, we are already linking to the checkpoint docs 
after all.


---


[jira] [Commented] (FLINK-7632) Better documentation and examples on C* sink usage for Pojo and Tuples data types

2017-09-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176558#comment-16176558
 ] 

ASF GitHub Bot commented on FLINK-7632:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r140517823
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -78,76 +96,195 @@ Note that enabling this feature will have an 
adverse impact on latency.
 
 Note: 
The write-ahead log functionality is currently experimental. In many cases it 
is sufficient to use the connector without enabling it. Please report problems 
to the development mailing list.
 
+### Checkpointing and Fault Tolerance
+With checkpointing enabled, the Cassandra Sink guarantees at-least-once 
delivery of action requests to the C* instance.
 
 - Example
+Note: However, the current Cassandra Sink implementation 
does not flush pending mutations before a checkpoint is triggered. 
Thus, some in-flight mutations might not be replayed when the job recovers. 

+
+More details can be found in the [checkpointing docs]({{ site.baseurl 
}}/dev/stream/state/checkpointing.html) and the [fault tolerance guarantee docs]({{ 
site.baseurl }}/dev/connectors/guarantees.html).
+
+To enable this fault-tolerance guarantee, checkpointing of the topology needs to 
be enabled at the execution environment:
 
 
 
 {% highlight java %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-@Override
-public Cluster buildCluster(Cluster.Builder builder) {
-  return builder.addContactPoint("127.0.0.1").build();
-}
-  })
-  .build();
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
 {% endhighlight %}
 
 
 {% highlight scala %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-override def buildCluster(builder: Cluster.Builder): Cluster = {
-  builder.addContactPoint("127.0.0.1").build()
-}
-  })
-  .build()
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+{% endhighlight %}
+
+
+
+## Examples
+
+The Cassandra sinks currently support both Java Tuple and POJO data types, 
and Flink automatically detects which type of input is used. For the general use 
of these streaming data types, please refer to [Supported Data Types]({{ 
site.baseurl }}/dev/api_concepts.html). We show two implementations based on 
[SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java),
 for the Pojo and Java Tuple data types respectively.
+
+In all these examples, we assume that the associated Keyspace `example` and 
Table `wordcount` have been created.
+
+
+
+{% highlight sql %}
+CREATE KEYSPACE IF NOT EXISTS example
+WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 
'1'};
+CREATE TABLE IF NOT EXISTS example.wordcount (
+word text,
+count bigint,
+PRIMARY KEY(word)
+);
+{% endhighlight %}
+
+
+
+### Cassandra Sink Example for Streaming Java Tuple Data Type
+While storing the result of a Java Tuple data type to a Cassandra sink, it 
is required to set a CQL upsert statement (via setQuery('stmt')) to persist 
each record back to the database. With the upsert query cached as a 
`PreparedStatement`, the Cassandra connector internally converts each Tuple 
element into a parameter of the statement.
+
+For details about `PreparedStatement` and `BoundStatement`, please visit the 
[DataStax Java Driver 
manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/).
+
+Please note that if the upsert query is not set, an 
`IllegalArgumentException` is thrown with the following error message: 
`Query must not be null or empty.`
+
+
+
+{% highlight java %}
+// get the execution environment
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// get input data by connecting to the socket
+DataStream<String> text = env.socketTextStream(hostname, port, "\n");
+
+// parse the data, group it, window it, and aggregate the counts
+DataStream<Tuple2<String, Long>> result = text
+
+.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
+@Override
+public void flatMap(String 

[jira] [Commented] (FLINK-7632) Better documentation and examples on C* sink usage for Pojo and Tuples data types

2017-09-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176554#comment-16176554
 ] 

ASF GitHub Bot commented on FLINK-7632:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r140517182
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -78,76 +96,195 @@ Note that enabling this feature will have an 
adverse impact on latency.
 
 Note: 
The write-ahead log functionality is currently experimental. In many cases it 
is sufficient to use the connector without enabling it. Please report problems 
to the development mailing list.
 
+### Checkpointing and Fault Tolerance
+With checkpointing enabled, the Cassandra Sink guarantees at-least-once 
delivery of action requests to the C* instance.
 
 - Example
+Note: However, the current Cassandra Sink implementation 
does not flush pending mutations before a checkpoint is triggered. 
Thus, some in-flight mutations might not be replayed when the job recovers. 

+
+More details can be found in the [checkpointing docs]({{ site.baseurl 
}}/dev/stream/state/checkpointing.html) and the [fault tolerance guarantee docs]({{ 
site.baseurl }}/dev/connectors/guarantees.html).
+
+To enable this fault-tolerance guarantee, checkpointing of the topology needs to 
be enabled at the execution environment:
 
 
 
 {% highlight java %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-@Override
-public Cluster buildCluster(Cluster.Builder builder) {
-  return builder.addContactPoint("127.0.0.1").build();
-}
-  })
-  .build();
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
 {% endhighlight %}
 
 
 {% highlight scala %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-override def buildCluster(builder: Cluster.Builder): Cluster = {
-  builder.addContactPoint("127.0.0.1").build()
-}
-  })
-  .build()
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+{% endhighlight %}
+
+
+
+## Examples
+
+The Cassandra sinks currently support both Java Tuple and POJO data types, 
and Flink automatically detects which type of input is used. For the general use 
of these streaming data types, please refer to [Supported Data Types]({{ 
site.baseurl }}/dev/api_concepts.html). We show two implementations based on 
[SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java),
 for the Pojo and Java Tuple data types respectively.
+
+In all these examples, we assume that the associated Keyspace `example` and 
Table `wordcount` have been created.
+
+
+
+{% highlight sql %}
+CREATE KEYSPACE IF NOT EXISTS example
+WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 
'1'};
+CREATE TABLE IF NOT EXISTS example.wordcount (
+word text,
+count bigint,
+PRIMARY KEY(word)
+);
+{% endhighlight %}
+
+
+
+### Cassandra Sink Example for Streaming Java Tuple Data Type
+While storing the result of a Java Tuple data type to a Cassandra sink, it 
is required to set a CQL upsert statement (via setQuery('stmt')) to persist 
each record back to the database. With the upsert query cached as a 
`PreparedStatement`, the Cassandra connector internally converts each Tuple 
element into a parameter of the statement.
+
+For details about `PreparedStatement` and `BoundStatement`, please visit the 
[DataStax Java Driver 
manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/).
+
+Please note that if the upsert query is not set, an 
`IllegalArgumentException` is thrown with the following error message: 
`Query must not be null or empty.`
--- End diff --

I would leave this out. It's one of those things that are easily out-dated, 
and don't provide immediate value when reading the docs for the first time. If 
a user stumbles upon this, the error message should be self-explanatory; if it 
isn't, we should change it accordingly.


> Better documentation and examples on C* sink usage for Pojo and Tuples data 
> types
> -
>
> Key: FLINK-7632
> URL: https://issues.apache.org/jira/browse/FLINK-7632
>   

[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

2017-09-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r140517696
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -78,76 +96,195 @@ Note that enabling this feature will have an 
adverse impact on latency.
 
 Note: 
The write-ahead log functionality is currently experimental. In many cases it 
is sufficient to use the connector without enabling it. Please report problems 
to the development mailing list.
 
+### Checkpointing and Fault Tolerance
+With checkpointing enabled, the Cassandra Sink guarantees at-least-once 
delivery of action requests to the C* instance.
 
 - Example
+Note: However, the current Cassandra Sink implementation 
does not flush pending mutations before a checkpoint is triggered. 
Thus, some in-flight mutations might not be replayed when the job recovers. 

+
+More details can be found in the [checkpointing docs]({{ site.baseurl 
}}/dev/stream/state/checkpointing.html) and the [fault tolerance guarantee docs]({{ 
site.baseurl }}/dev/connectors/guarantees.html).
+
+To enable this fault-tolerance guarantee, checkpointing of the topology needs to 
be enabled at the execution environment:
 
 
 
 {% highlight java %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-@Override
-public Cluster buildCluster(Cluster.Builder builder) {
-  return builder.addContactPoint("127.0.0.1").build();
-}
-  })
-  .build();
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
 {% endhighlight %}
 
 
 {% highlight scala %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-override def buildCluster(builder: Cluster.Builder): Cluster = {
-  builder.addContactPoint("127.0.0.1").build()
-}
-  })
-  .build()
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+{% endhighlight %}
+
+
+
+## Examples
+
+The Cassandra sinks currently support both Java Tuple and POJO data types, 
and Flink automatically detects which type of input is used. For the general use 
of these streaming data types, please refer to [Supported Data Types]({{ 
site.baseurl }}/dev/api_concepts.html). We show two implementations based on 
[SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java),
 for the Pojo and Java Tuple data types respectively.
+
+In all these examples, we assume that the associated Keyspace `example` and 
Table `wordcount` have been created.
+
+
+
+{% highlight sql %}
+CREATE KEYSPACE IF NOT EXISTS example
+WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 
'1'};
+CREATE TABLE IF NOT EXISTS example.wordcount (
+word text,
+count bigint,
+PRIMARY KEY(word)
+);
+{% endhighlight %}
+
+
+
+### Cassandra Sink Example for Streaming Java Tuple Data Type
+While storing the result of a Java Tuple data type to a Cassandra sink, it 
is required to set a CQL upsert statement (via setQuery('stmt')) to persist 
each record back to the database. With the upsert query cached as a 
`PreparedStatement`, the Cassandra connector internally converts each Tuple 
element into a parameter of the statement.
+
+For details about `PreparedStatement` and `BoundStatement`, please visit the 
[DataStax Java Driver 
manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/).
+
+Please note that if the upsert query is not set, an 
`IllegalArgumentException` is thrown with the following error message: 
`Query must not be null or empty.`
+
+
+
+{% highlight java %}
+// get the execution environment
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// get input data by connecting to the socket
+DataStream<String> text = env.socketTextStream(hostname, port, "\n");
+
+// parse the data, group it, window it, and aggregate the counts
+DataStream<Tuple2<String, Long>> result = text
+
+.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
+@Override
+public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
+// normalize and split the line
+String[] words = value.toLowerCase().split("\\W+");
+
+// emit the pairs
+for 

[jira] [Commented] (FLINK-7632) Better documentation and examples on C* sink usage for Pojo and Tuples data types

2017-09-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176552#comment-16176552
 ] 

ASF GitHub Bot commented on FLINK-7632:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r140516135
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -78,76 +96,195 @@ Note that enabling this feature will have an 
adverse impact on latency.
 
 Note: 
The write-ahead log functionality is currently experimental. In many cases it 
is sufficient to use the connector without enabling it. Please report problems 
to the development mailing list.
 
+### Checkpointing and Fault Tolerance
+With checkpointing enabled, the Cassandra Sink guarantees at-least-once 
delivery of action requests to the C* instance.
 
- Example
+Note:However, current Cassandra Sink implementation 
does not flush the pending mutations  before the checkpoint was triggered. 
Thus, some in-flight mutations might not be replayed when the job recovered. 

--- End diff --

there's a double space after mutations


> Better documentation and examples on C* sink usage for Pojo and Tuples data 
> types
> -
>
> Key: FLINK-7632
> URL: https://issues.apache.org/jira/browse/FLINK-7632
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cassandra Connector, Documentation
>Reporter: Michael Fong
>Assignee: Michael Fong
>
> Cassandra sink supports Pojo and Java Tuple data types. We should improve 
> documentation on its usage as well as some concrete / meaningful examples for 
> both cases. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

2017-09-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r140516135
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -78,76 +96,195 @@ Note that enabling this feature will have an 
adverse impact on latency.
 
 Note: 
The write-ahead log functionality is currently experimental. In many cases it 
is sufficient to use the connector without enabling it. Please report problems 
to the development mailing list.
 
+### Checkpointing and Fault Tolerance
+With checkpointing enabled, the Cassandra Sink guarantees at-least-once 
delivery of action requests to the C* instance.
 
- Example
+Note:However, current Cassandra Sink implementation 
does not flush the pending mutations  before the checkpoint was triggered. 
Thus, some in-flight mutations might not be replayed when the job recovered. 

--- End diff --

there's a double space after mutations


---


[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...

2017-09-22 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r140517986
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -78,76 +96,195 @@ Note that enabling this feature will have an 
adverse impact on latency.
 
 Note: 
The write-ahead log functionality is currently experimental. In many cases it 
is sufficient to use the connector without enabling it. Please report problems 
to the development mailing list.
 
+### Checkpointing and Fault Tolerance
+With checkpointing enabled, the Cassandra Sink guarantees at-least-once 
delivery of action requests to the C* instance.
 
 - Example
+Note: However, the current Cassandra Sink implementation 
does not flush pending mutations before a checkpoint is triggered. 
Thus, some in-flight mutations might not be replayed when the job recovers. 

+
+More details can be found in the [checkpointing docs]({{ site.baseurl 
}}/dev/stream/state/checkpointing.html) and the [fault tolerance guarantee docs]({{ 
site.baseurl }}/dev/connectors/guarantees.html).
+
+To enable this fault-tolerance guarantee, checkpointing of the topology needs to 
be enabled at the execution environment:
 
 
 
 {% highlight java %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-@Override
-public Cluster buildCluster(Cluster.Builder builder) {
-  return builder.addContactPoint("127.0.0.1").build();
-}
-  })
-  .build();
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
 {% endhighlight %}
 
 
 {% highlight scala %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-override def buildCluster(builder: Cluster.Builder): Cluster = {
-  builder.addContactPoint("127.0.0.1").build()
-}
-  })
-  .build()
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+{% endhighlight %}
+
+
+
+## Examples
+
+The Cassandra sinks currently support both Java Tuple and POJO data types, 
and Flink automatically detects which type of input is used. For the general use 
of these streaming data types, please refer to [Supported Data Types]({{ 
site.baseurl }}/dev/api_concepts.html). We show two implementations based on 
[SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java),
 for the Pojo and Java Tuple data types respectively.
+
+In all these examples, we assume that the associated Keyspace `example` and 
Table `wordcount` have been created.
+
+
+
+{% highlight sql %}
+CREATE KEYSPACE IF NOT EXISTS example
+WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 
'1'};
+CREATE TABLE IF NOT EXISTS example.wordcount (
+word text,
+count bigint,
+PRIMARY KEY(word)
+);
+{% endhighlight %}
+
+
+
+### Cassandra Sink Example for Streaming Java Tuple Data Type
+While storing the result of a Java Tuple data type to a Cassandra sink, it 
is required to set a CQL upsert statement (via setQuery('stmt')) to persist 
each record back to the database. With the upsert query cached as a 
`PreparedStatement`, the Cassandra connector internally converts each Tuple 
element into a parameter of the statement.
+
+For details about `PreparedStatement` and `BoundStatement`, please visit the 
[DataStax Java Driver 
manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/).
+
+Please note that if the upsert query is not set, an 
`IllegalArgumentException` is thrown with the following error message: 
`Query must not be null or empty.`
+
+
+
+{% highlight java %}
+// get the execution environment
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// get input data by connecting to the socket
+DataStream<String> text = env.socketTextStream(hostname, port, "\n");
+
+// parse the data, group it, window it, and aggregate the counts
+DataStream<Tuple2<String, Long>> result = text
+
+.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
+@Override
+public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
+// normalize and split the line
+String[] words = value.toLowerCase().split("\\W+");
+
+// emit the pairs
+for 

[jira] [Commented] (FLINK-7632) Better documentation and examples on C* sink usage for Pojo and Tuples data types

2017-09-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176557#comment-16176557
 ] 

ASF GitHub Bot commented on FLINK-7632:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4696#discussion_r140518123
  
--- Diff: docs/dev/connectors/cassandra.md ---
@@ -78,76 +96,195 @@ Note that enabling this feature will have an 
adverse impact on latency.
 
 Note: 
The write-ahead log functionality is currently experimental. In many cases it 
is sufficient to use the connector without enabling it. Please report problems 
to the development mailing list.
 
+### Checkpointing and Fault Tolerance
+With checkpointing enabled, the Cassandra Sink guarantees at-least-once 
delivery of action requests to the C* instance.
 
 - Example
+Note: However, the current Cassandra Sink implementation 
does not flush pending mutations before a checkpoint is triggered. 
Thus, some in-flight mutations might not be replayed when the job recovers. 

+
+More details can be found in the [checkpointing docs]({{ site.baseurl 
}}/dev/stream/state/checkpointing.html) and the [fault tolerance guarantee docs]({{ 
site.baseurl }}/dev/connectors/guarantees.html).
+
+To enable this fault-tolerance guarantee, checkpointing of the topology needs to 
be enabled at the execution environment:
 
 
 
 {% highlight java %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-@Override
-public Cluster buildCluster(Cluster.Builder builder) {
-  return builder.addContactPoint("127.0.0.1").build();
-}
-  })
-  .build();
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
 {% endhighlight %}
 
 
 {% highlight scala %}
-CassandraSink.addSink(input)
-  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
-  .setClusterBuilder(new ClusterBuilder() {
-override def buildCluster(builder: Cluster.Builder): Cluster = {
-  builder.addContactPoint("127.0.0.1").build()
-}
-  })
-  .build()
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+{% endhighlight %}
+
+
+
+## Examples
+
+The Cassandra sinks currently support both Java Tuple and POJO data types, 
and Flink automatically detects which type of input is used. For the general use 
of these streaming data types, please refer to [Supported Data Types]({{ 
site.baseurl }}/dev/api_concepts.html). We show two implementations based on 
[SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java),
 for the Pojo and Java Tuple data types respectively.
+
+In all these examples, we assume that the associated Keyspace `example` and 
Table `wordcount` have been created.
+
+
+
+{% highlight sql %}
+CREATE KEYSPACE IF NOT EXISTS example
+WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 
'1'};
+CREATE TABLE IF NOT EXISTS example.wordcount (
+word text,
+count bigint,
+PRIMARY KEY(word)
+);
+{% endhighlight %}
+
+
+
+### Cassandra Sink Example for Streaming Java Tuple Data Type
+While storing the result of a Java Tuple data type to a Cassandra sink, it 
is required to set a CQL upsert statement (via setQuery('stmt')) to persist 
each record back to the database. With the upsert query cached as a 
`PreparedStatement`, the Cassandra connector internally converts each Tuple 
element into a parameter of the statement.
+
+For details about `PreparedStatement` and `BoundStatement`, please visit the 
[DataStax Java Driver 
manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/).
+
+Please note that if the upsert query is not set, an 
`IllegalArgumentException` is thrown with the following error message: 
`Query must not be null or empty.`
+
+
+
+{% highlight java %}
+// get the execution environment
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// get input data by connecting to the socket
+DataStream<String> text = env.socketTextStream(hostname, port, "\n");
+
+// parse the data, group it, window it, and aggregate the counts
+DataStream<Tuple2<String, Long>> result = text
+
+.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
+@Override
+public void flatMap(String 

[jira] [Commented] (FLINK-7524) Task "xxx" did not react to cancelling signal, but is stuck in method

2017-09-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176539#comment-16176539
 ] 

ASF GitHub Bot commented on FLINK-7524:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4639#discussion_r140516796
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java 
---
@@ -61,13 +75,14 @@ public final void registerClosable(C closeable) throws 
IOException {
}
 
synchronized (getSynchronizationLock()) {
-   if (closed) {
-   IOUtils.closeQuietly(closeable);
-   throw new IOException("Cannot register 
Closeable, registry is already closed. Closing argument.");
+   if (!closed) {
+   doRegister(closeable, closeableToRef);
+   return;
}
-
-   doRegister(closeable, closeableToRef);
}
+
+   IOUtils.closeQuietly(closeable);
--- End diff --

Makes sense!  
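To spell the agreed pattern out, a generic sketch with illustrative names (not the exact 
Flink code): the decision is taken under the lock, while the potentially blocking close() 
happens outside of it.

{% highlight java %}
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

class SimpleCloseableRegistry {
    private final Object lock = new Object();
    private final List<Closeable> registered = new ArrayList<>();
    private boolean closed;

    void register(Closeable closeable) throws IOException {
        synchronized (lock) {
            if (!closed) {
                registered.add(closeable);
                return;
            }
        }
        // registry already closed: close the argument outside the lock
        closeable.close();
        throw new IOException("Cannot register Closeable, registry is already closed.");
    }
}
{% endhighlight %}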



> Task "xxx" did not react to cancelling signal, but is stuck in method
> -
>
> Key: FLINK-7524
> URL: https://issues.apache.org/jira/browse/FLINK-7524
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Hi,
> I observed the following errors in taskmanager.log 
> {code:java}
> 2017-08-25 17:03:40,141 WARN  org.apache.flink.runtime.taskmanager.Task   
>   - Task 'TriggerWindow(SlidingEventTimeWindows(25920, 
> 360), 
> AggregatingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@f65b6aa2,
>  aggFunction=com.streamingApp$MyAggregateFunction@1f193686}, 
> EventTimeTrigger(), WindowedStream.aggregate(WindowedStream.java:858)) -> 
> Sink: prod_item (2/6)' did not react to cancelling signal, but is stuck in 
> method:
>  
> org.apache.flink.util.AbstractCloseableRegistry.unregisterClosable(AbstractCloseableRegistry.java:84)
> org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.closeAndUnregisterStream(StateSnapshotContextSynchronousImpl.java:137)
> org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.close(StateSnapshotContextSynchronousImpl.java:147)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399)
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157)
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089)
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653)
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589)
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:542)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> java.lang.Thread.run(Thread.java:748)
> ...
> 2017-08-25 17:05:10,139 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Notifying TaskManager about fatal error. Task 
> 'TriggerWindow(SlidingEventTimeWindows(25920, 360), 
> AggregatingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@f65b6aa2,
>  aggFunction=com.streamingApp$MyAggregateFunction@1f193686}, 
> EventTimeTrigger(), WindowedStream.aggregate(WindowedStream.java:858)) -> 
> Sink: prod_item (2/6)' did not react to cancelling signal in the last 30 
> seconds, but is stuck in method:
>  
> org.apache.flink.util.AbstractCloseableRegistry.unregisterClosable(AbstractCloseableRegistry.java:84)
> org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.closeAndUnregisterStream(StateSnapshotContextSynchronousImpl.java:137)
> org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.close(StateSnapshotContextSynchronousImpl.java:147)
> 

[GitHub] flink pull request #4639: [FLINK-7524] Remove potentially blocking behaviour...

2017-09-22 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4639#discussion_r140516796
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java 
---
@@ -61,13 +75,14 @@ public final void registerClosable(C closeable) throws 
IOException {
}
 
synchronized (getSynchronizationLock()) {
-   if (closed) {
-   IOUtils.closeQuietly(closeable);
-   throw new IOException("Cannot register 
Closeable, registry is already closed. Closing argument.");
+   if (!closed) {
+   doRegister(closeable, closeableToRef);
+   return;
}
-
-   doRegister(closeable, closeableToRef);
}
+
+   IOUtils.closeQuietly(closeable);
--- End diff --

Makes sense! 👌 



---


[jira] [Commented] (FLINK-7656) Switch to user ClassLoader when invoking initializeOnMaster finalizeOnMaster

2017-09-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176537#comment-16176537
 ] 

ASF GitHub Bot commented on FLINK-7656:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4690#discussion_r140515705
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
 ---
@@ -84,10 +88,10 @@ public void testConnectMultipleTargets() {
@Test
public void testOutputFormatVertex() {
try {
-   final TestingOutputFormat outputFormat = new 
TestingOutputFormat();
+   final OutputFormat outputFormat = new 
TestingOutputFormat();
final OutputFormatVertex of = new 
OutputFormatVertex("Name");
new 
TaskConfig(of.getConfiguration()).setStubWrapper(new 
UserCodeObjectWrapper(outputFormat));
-   final ClassLoader cl = getClass().getClassLoader();
+   final ClassLoader cl = new FlinkUserCodeClassLoader(new 
URL[0], getClass().getClassLoader());
--- End diff --

This doesn't exist anymore but you can use 
`FlinkUserCodeClassLoaders.childFirst()`/`parentFirst()`.


> Switch to user ClassLoader when invoking initializeOnMaster finalizeOnMaster
> 
>
> Key: FLINK-7656
> URL: https://issues.apache.org/jira/browse/FLINK-7656
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.3.2
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>
> The contract that Flink provides to usercode is that the usercode 
> classloader is the context classloader whenever usercode is called.
> In {{OutputFormatVertex}} usercode is called in the {{initializeOnMaster()}} 
> and {{finalizeOnMaster()}} methods but the context classloader is not set to 
> the usercode classloader.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4690: [FLINK-7656] [runtime] Switch to user classloader ...

2017-09-22 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4690#discussion_r140515705
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
 ---
@@ -84,10 +88,10 @@ public void testConnectMultipleTargets() {
@Test
public void testOutputFormatVertex() {
try {
-   final TestingOutputFormat outputFormat = new 
TestingOutputFormat();
+   final OutputFormat outputFormat = new 
TestingOutputFormat();
final OutputFormatVertex of = new 
OutputFormatVertex("Name");
new 
TaskConfig(of.getConfiguration()).setStubWrapper(new 
UserCodeObjectWrapper(outputFormat));
-   final ClassLoader cl = getClass().getClassLoader();
+   final ClassLoader cl = new FlinkUserCodeClassLoader(new 
URL[0], getClass().getClassLoader());
--- End diff --

This doesn't exist anymore but you can use 
`FlinkUserCodeClassLoaders.childFirst()`/`parentFirst()`.


---


[jira] [Assigned] (FLINK-6444) Add a check that '@VisibleForTesting' methods are only used in tests

2017-09-22 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang reassigned FLINK-6444:
---

Assignee: mingleizhang  (was: Hai Zhou_UTC+8)

> Add a check that '@VisibleForTesting' methods are only used in tests
> 
>
> Key: FLINK-6444
> URL: https://issues.apache.org/jira/browse/FLINK-6444
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Stephan Ewen
>Assignee: mingleizhang
>
> Some methods are annotated with {{@VisibleForTesting}}. These methods should 
> only be called from tests.
> This is currently not enforced / checked during the build. We should add such 
> a check.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7670) typo in docs runtime section

2017-09-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176450#comment-16176450
 ] 

ASF GitHub Bot commented on FLINK-7670:
---

GitHub user keweishang opened a pull request:

https://github.com/apache/flink/pull/4706

[FLINK-7670][doc] fix typo in docs runtime section

The following link to Savepoints page
[Savepoints](..//setup/savepoints.html)
change to
[Savepoints](../setup/savepoints.html)

Otherwise, the link wouldn't work.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/keweishang/flink FLINK-7670

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4706.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4706


commit 098740fceeaed11f0072c9cbf612783b4829e98b
Author: Kewei Shang 
Date:   2017-09-22T13:40:41Z

[FLINK-7670][doc] fix typo in docs runtime section




> typo in docs runtime section
> 
>
> Key: FLINK-7670
> URL: https://issues.apache.org/jira/browse/FLINK-7670
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.3.2
>Reporter: Kewei SHANG
>Priority: Minor
> Fix For: 1.3.3
>
>
> The following link to Savepoints page
> [Savepoints](..//setup/savepoints.html)
> change to
> [Savepoints](../setup/savepoints.html)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4706: [FLINK-7670][doc] fix typo in docs runtime section

2017-09-22 Thread keweishang
GitHub user keweishang opened a pull request:

https://github.com/apache/flink/pull/4706

[FLINK-7670][doc] fix typo in docs runtime section

The following link to Savepoints page
[Savepoints](..//setup/savepoints.html)
change to
[Savepoints](../setup/savepoints.html)

Otherwise, the link wouldn't work.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/keweishang/flink FLINK-7670

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4706.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4706


commit 098740fceeaed11f0072c9cbf612783b4829e98b
Author: Kewei Shang 
Date:   2017-09-22T13:40:41Z

[FLINK-7670][doc] fix typo in docs runtime section




---


[jira] [Commented] (FLINK-6444) Add a check that '@VisibleForTesting' methods are only used in tests

2017-09-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176444#comment-16176444
 ] 

ASF GitHub Bot commented on FLINK-6444:
---

GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/4705

[FLINK-6444] [build] Add a check that '@VisibleForTesting' methods ar…

## What is the purpose of the change

## Brief change log


## Verifying this change



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink flink-6444

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4705.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4705


commit 425c6eaee2d76df2df4f5d2070a33b1b4b9ddbcb
Author: zhangminglei 
Date:   2017-09-22T13:44:18Z

[FLINK-6444] [build] Add a check that '@VisibleForTesting' methods are only 
used in tests




> Add a check that '@VisibleForTesting' methods are only used in tests
> 
>
> Key: FLINK-6444
> URL: https://issues.apache.org/jira/browse/FLINK-6444
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Stephan Ewen
>Assignee: Hai Zhou_UTC+8
>
> Some methods are annotated with {{@VisibleForTesting}}. These methods should 
> only be called from tests.
> This is currently not enforced / checked during the build. We should add such 
> a check.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4705: [FLINK-6444] [build] Add a check that '@VisibleFor...

2017-09-22 Thread zhangminglei
GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/4705

[FLINK-6444] [build] Add a check that '@VisibleForTesting' methods ar…

## What is the purpose of the change

## Brief change log


## Verifying this change



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink flink-6444

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4705.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4705


commit 425c6eaee2d76df2df4f5d2070a33b1b4b9ddbcb
Author: zhangminglei 
Date:   2017-09-22T13:44:18Z

[FLINK-6444] [build] Add a check that '@VisibleForTesting' methods are only 
used in tests




---


[jira] [Commented] (FLINK-7669) org.apache.flink.api.common.ExecutionConfig cannot be cast to org.apache.flink.api.common.ExecutionConfig

2017-09-22 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176428#comment-16176428
 ] 

Aljoscha Krettek commented on FLINK-7669:
-

How are you running your program?

> org.apache.flink.api.common.ExecutionConfig cannot be cast to 
> org.apache.flink.api.common.ExecutionConfig
> -
>
> Key: FLINK-7669
> URL: https://issues.apache.org/jira/browse/FLINK-7669
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.4.0
> Environment: - OS: macOS Sierra 
> - Oracle JDK 1.8
> - Scala 2.11.11
> - sbt 0.13.16
> - Build from trunk code at commit hash 
> {{42cc3a2a9c41dda7cf338db36b45131db9150674}}
> -- started a local flink node 
>Reporter: Raymond Tay
>
> Latest code pulled from trunk threw errors at runtime when i ran a job 
> against it; but when i ran the JAR against the stable version {{1.3.2}} it 
> was OK. Here is the stacktrace. 
> An exception is being thrown :
> {noformat}
> Cluster configuration: Standalone cluster with JobManager at 
> localhost/127.0.0.1:6123
> Using address localhost:6123 to connect to JobManager.
> JobManager web interface address http://localhost:8081
> Starting execution of 
> programhttps://issues.apache.org/jira/issues/?jql=text%20~%20%22org.apache.flink.api.common.ExecutionConfig%20cannot%20be%20cast%20to%22#
> Submitting job with JobID: 05dd8e60c6fda3b96fc22ef6cf389a23. Waiting for job 
> completion.
> Connected to JobManager at 
> Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-234825544] with leader 
> session id ----.
> 
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Failed to submit job 05dd8e60c6fda3b96fc22ef6cf389a23 
> (Flink Streaming Job)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:479)
>   at 
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:443)
>   at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1501)
>   at 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:629)
>   at 
> org.example.streams.split.SimpleSplitStreams$.main(04splitstreams.scala:53)
>   at 
> org.example.streams.split.SimpleSplitStreams.main(04splitstreams.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:383)
>   at 
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:840)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:285)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1088)
>   at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1135)
>   at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1132)
>   at 
> org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:44)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
>   at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1132)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to 
> submit job 05dd8e60c6fda3b96fc22ef6cf389a23 (Flink Streaming Job)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1358)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:484)
>   at 
> 

[jira] [Commented] (FLINK-6444) Add a check that '@VisibleForTesting' methods are only used in tests

2017-09-22 Thread Hai Zhou_UTC+8 (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176416#comment-16176416
 ] 

Hai Zhou_UTC+8 commented on FLINK-6444:
---

I am trying the first approach: implement a SpotBugs detector and configure a SpotBugs rule.
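
For illustration only (this is not the SpotBugs detector mentioned above), a rough, self-contained ASM-based sketch of the kind of bytecode scan such a check has to perform. The class name, the classes directory, and the use of Flink's {{org.apache.flink.annotation.VisibleForTesting}} annotation descriptor are assumptions made for the sketch:

{code:java}
import org.objectweb.asm.AnnotationVisitor;
import org.objectweb.asm.ClassReader;
import org.objectweb.asm.ClassVisitor;
import org.objectweb.asm.MethodVisitor;
import org.objectweb.asm.Opcodes;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Stream;

/** Sketch of a build-time scan for production call sites of @VisibleForTesting methods. */
public class VisibleForTestingCheck {

    // Flink's annotation lives in flink-annotations; adjust if a different annotation is used.
    private static final String ANNOTATION_DESC = "Lorg/apache/flink/annotation/VisibleForTesting;";

    public static void main(String[] args) throws IOException {
        Path productionClasses = Paths.get(args[0]); // e.g. target/classes (not target/test-classes)
        Set<String> annotated = new HashSet<>();     // entries of the form "owner#name#descriptor"

        // Pass 1: record every method that carries the annotation.
        forEachClass(productionClasses, reader -> reader.accept(new ClassVisitor(Opcodes.ASM5) {
            private String owner;

            @Override
            public void visit(int version, int access, String name, String sig, String superName, String[] ifs) {
                this.owner = name;
            }

            @Override
            public MethodVisitor visitMethod(int access, String name, String desc, String sig, String[] ex) {
                return new MethodVisitor(Opcodes.ASM5) {
                    @Override
                    public AnnotationVisitor visitAnnotation(String annotationDesc, boolean visible) {
                        if (ANNOTATION_DESC.equals(annotationDesc)) {
                            annotated.add(owner + "#" + name + "#" + desc);
                        }
                        return null;
                    }
                };
            }
        }, 0));

        // Pass 2: report calls to those methods from other production classes.
        forEachClass(productionClasses, reader -> reader.accept(new ClassVisitor(Opcodes.ASM5) {
            private String caller;

            @Override
            public void visit(int version, int access, String name, String sig, String superName, String[] ifs) {
                this.caller = name;
            }

            @Override
            public MethodVisitor visitMethod(int access, String name, String desc, String sig, String[] ex) {
                return new MethodVisitor(Opcodes.ASM5) {
                    @Override
                    public void visitMethodInsn(int opcode, String owner, String calleeName, String calleeDesc, boolean itf) {
                        // Calls within the declaring class itself are allowed.
                        if (annotated.contains(owner + "#" + calleeName + "#" + calleeDesc) && !owner.equals(caller)) {
                            System.err.printf("%s calls @VisibleForTesting method %s.%s%n", caller, owner, calleeName);
                        }
                    }
                };
            }
        }, 0));
    }

    private static void forEachClass(Path root, Consumer<ClassReader> action) throws IOException {
        try (Stream<Path> files = Files.walk(root)) {
            files.filter(p -> p.toString().endsWith(".class")).forEach(p -> {
                try {
                    action.accept(new ClassReader(Files.readAllBytes(p)));
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
        }
    }
}
{code}

A SpotBugs detector would perform essentially the same invocation-site scan, just inside the SpotBugs plugin API and reported as a bug pattern instead of stderr output.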

> Add a check that '@VisibleForTesting' methods are only used in tests
> 
>
> Key: FLINK-6444
> URL: https://issues.apache.org/jira/browse/FLINK-6444
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Stephan Ewen
>Assignee: Hai Zhou_UTC+8
>
> Some methods are annotated with {{@VisibleForTesting}}. These methods should 
> only be called from tests.
> This is currently not enforced / checked during the build. We should add such 
> a check.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7670) typo in docs runtime section

2017-09-22 Thread Kewei SHANG (JIRA)
Kewei SHANG created FLINK-7670:
--

 Summary: typo in docs runtime section
 Key: FLINK-7670
 URL: https://issues.apache.org/jira/browse/FLINK-7670
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.3.2
Reporter: Kewei SHANG
Priority: Minor
 Fix For: 1.3.3


The following link to the Savepoints page
[Savepoints](..//setup/savepoints.html)
should be changed to
[Savepoints](../setup/savepoints.html)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7537) Add InfluxDB Sink for Flink Streaming

2017-09-22 Thread Hai Zhou_UTC+8 (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7537?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hai Zhou_UTC+8 closed FLINK-7537.
-
Resolution: Fixed

> Add InfluxDB Sink for Flink Streaming
> -
>
> Key: FLINK-7537
> URL: https://issues.apache.org/jira/browse/FLINK-7537
> Project: Flink
>  Issue Type: Wish
>  Components: Streaming Connectors
>Affects Versions: 1.3.0
>Reporter: Hai Zhou_UTC+8
>Assignee: Hai Zhou_UTC+8
> Fix For: 1.3.0
>
>
> An InfluxDBSink implemented via RichSinkFunction.
> [BAHIR-134|https://issues.apache.org/jira/browse/BAHIR-134]
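
For illustration, a minimal sketch of what such a sink could look like, assuming the influxdb-java client and a {{Tuple2<String, Double>}} record of (sensor id, temperature); this is not the Bahir implementation referenced above:

{code:java}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Point;

import java.util.concurrent.TimeUnit;

/** Writes (sensorId, temperature) records to InfluxDB; one connection per parallel sink instance. */
public class InfluxDBSink extends RichSinkFunction<Tuple2<String, Double>> {

    private final String url;
    private final String username;
    private final String password;
    private final String database;

    private transient InfluxDB influxDB;

    public InfluxDBSink(String url, String username, String password, String database) {
        this.url = url;
        this.username = username;
        this.password = password;
        this.database = database;
    }

    @Override
    public void open(Configuration parameters) {
        // Open the connection lazily on the task manager, not on the client.
        influxDB = InfluxDBFactory.connect(url, username, password);
    }

    @Override
    public void invoke(Tuple2<String, Double> value) {
        Point point = Point.measurement("temperature")
                .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
                .tag("sensorId", value.f0)
                .addField("value", value.f1)
                .build();
        influxDB.write(database, "autogen", point); // "autogen" is InfluxDB's default retention policy
    }

    @Override
    public void close() {
        if (influxDB != null) {
            influxDB.close();
        }
    }
}
{code}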



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (FLINK-7600) shorten delay of KinesisProducerConfiguration.setCredentialsRefreshDelay() to avoid updateCredentials Exception

2017-09-22 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai resolved FLINK-7600.

Resolution: Fixed

Thanks for the contribution!

Fixed for {{master}} via 6c1a946562ec8ac4825b871aefdec040cc02aaf2.
Fixed for {{1.3.3}} via cc0de8486b2c83bf03da0f8115df4b5ff72f6ed6.

> shorten delay of KinesisProducerConfiguration.setCredentialsRefreshDelay() to 
> avoid updateCredentials Exception
> ---
>
> Key: FLINK-7600
> URL: https://issues.apache.org/jira/browse/FLINK-7600
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.2
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Minor
> Fix For: 1.4.0, 1.3.3
>
>
> We saw the following warning in the Flink log:
> {code:java}
> 2017-08-11 02:33:24,473 WARN  
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon
>   - Exception during updateCredentials
> java.lang.InterruptedException: sleep interrupted
>   at java.lang.Thread.sleep(Native Method)
>   at 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon$5.run(Daemon.java:316)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> {code}
> According to the discussion in 
> https://github.com/awslabs/amazon-kinesis-producer/issues/10, setting the 
> delay to 100 will fix this issue.
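
For reference, a hedged sketch of the setting in question on the KPL side; the connector applies the equivalent configuration internally, so this is illustrative only and not the actual Flink code:

{code:java}
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;

public class CredentialsRefreshDelayExample {
    public static void main(String[] args) {
        KinesisProducerConfiguration config = new KinesisProducerConfiguration();
        // The KPL daemon sleeps for this delay between credential refreshes; shutting the
        // producer down while it sleeps triggers the InterruptedException logged above.
        // Shortening the delay narrows that window.
        config.setCredentialsRefreshDelay(100);
    }
}
{code}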



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7600) shorten delay of KinesisProducerConfiguration.setCredentialsRefreshDelay() to avoid updateCredentials Exception

2017-09-22 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai closed FLINK-7600.
--

> shorten delay of KinesisProducerConfiguration.setCredentialsRefreshDelay() to 
> avoid updateCredentials Exception
> ---
>
> Key: FLINK-7600
> URL: https://issues.apache.org/jira/browse/FLINK-7600
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.2
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Minor
> Fix For: 1.4.0, 1.3.3
>
>
> We saw the following warning in the Flink log:
> {code:java}
> 2017-08-11 02:33:24,473 WARN  
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon
>   - Exception during updateCredentials
> java.lang.InterruptedException: sleep interrupted
>   at java.lang.Thread.sleep(Native Method)
>   at 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon$5.run(Daemon.java:316)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> {code}
> According to the discussion in 
> https://github.com/awslabs/amazon-kinesis-producer/issues/10, setting the 
> delay to 100 will fix this issue.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7508) switch FlinkKinesisProducer to use KPL's ThreadingMode to ThreadedPool mode rather than Per_Request mode

2017-09-22 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai closed FLINK-7508.
--

> switch FlinkKinesisProducer to use KPL's ThreadingMode to ThreadedPool mode 
> rather than Per_Request mode
> 
>
> Key: FLINK-7508
> URL: https://issues.apache.org/jira/browse/FLINK-7508
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.4.0
>
>
> KinesisProducerLibrary (KPL) 0.10.x used a One-New-Thread-Per-Request model 
> for all requests sent to AWS Kinesis, which is very expensive.
> 0.12.4 introduced a new [ThreadingMode - 
> Pooled|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java#L225],
>  which uses a thread pool. This greatly improves KPL's performance and 
> reduces consumed resources. By default, KPL still uses the per-request mode, 
> so we should explicitly switch FlinkKinesisProducer's KPL threading mode to 
> 'Pooled'.
> This work depends on FLINK-7366 and FLINK-7508
> Benchmarking I did:
> * Environment: a Flink hourly-sliding windowing job on an 18-node EMR 
> cluster with R4.2xlarge instances. Each hourly-sliding window in the Flink 
> job generates about 21 million UserRecords, which means we generated a test 
> load of 21 million UserRecords in the first minute of each hour.
> * Criteria: test KPL throughput per minute. Since the default RecordTTL for 
> KPL is 30 sec, we can be sure that either all UserRecords are sent by KPL 
> within a minute, or we will see UserRecord expiration errors.
> * One-New-Thread-Per-Request model: max throughput is about 2 million 
> UserRecords per minute; it does not go beyond that because CPU utilization 
> reaches 100%, everything stops working, and the Flink job crashes.
> * Thread-Pool model with a pool size of 10: it sends out 21 million 
> UserRecords within 30 sec without any UserRecord expiration errors. The 
> average peak CPU utilization is about 20% - 30%, so 21 million 
> UserRecords/min is not the max throughput of the thread-pool model. We did 
> not go further because 1) this throughput is already a couple of times more 
> than what we really need, and 2) we do not have a quick way of increasing 
> the test load.
> Thus, I propose switching FlinkKinesisProducer to Thread-Pool mode. 
> [~tzulitai] What do you think?
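
For illustration, a sketch of the KPL configuration change being proposed, assuming the {{ThreadingModel}} enum and setters from the KinesisProducerConfiguration class linked above; this is not the actual connector code:

{code:java}
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;

public class PooledThreadingExample {
    public static void main(String[] args) {
        KinesisProducerConfiguration config = new KinesisProducerConfiguration();
        // Reuse a fixed thread pool instead of spawning one new thread per request.
        config.setThreadingModel(KinesisProducerConfiguration.ThreadingModel.POOLED);
        // Pool size used in the benchmark described above.
        config.setThreadPoolSize(10);
    }
}
{code}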



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (FLINK-7508) switch FlinkKinesisProducer to use KPL's ThreadingMode to ThreadedPool mode rather than Per_Request mode

2017-09-22 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai resolved FLINK-7508.

Resolution: Fixed

Thanks for the contribution [~phoenixjiangnan].

Resolved for {{master}} via 637dde889fe2d21ff6990749a750356d20fcd965.

> switch FlinkKinesisProducer to use KPL's ThreadingMode to ThreadedPool mode 
> rather than Per_Request mode
> 
>
> Key: FLINK-7508
> URL: https://issues.apache.org/jira/browse/FLINK-7508
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.4.0
>
>
> KinesisProducerLibrary (KPL) 0.10.x used a One-New-Thread-Per-Request model 
> for all requests sent to AWS Kinesis, which is very expensive.
> 0.12.4 introduced a new [ThreadingMode - 
> Pooled|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java#L225],
>  which uses a thread pool. This greatly improves KPL's performance and 
> reduces consumed resources. By default, KPL still uses the per-request mode, 
> so we should explicitly switch FlinkKinesisProducer's KPL threading mode to 
> 'Pooled'.
> This work depends on FLINK-7366 and FLINK-7508
> Benchmarking I did:
> * Environment: a Flink hourly-sliding windowing job on an 18-node EMR 
> cluster with R4.2xlarge instances. Each hourly-sliding window in the Flink 
> job generates about 21 million UserRecords, which means we generated a test 
> load of 21 million UserRecords in the first minute of each hour.
> * Criteria: test KPL throughput per minute. Since the default RecordTTL for 
> KPL is 30 sec, we can be sure that either all UserRecords are sent by KPL 
> within a minute, or we will see UserRecord expiration errors.
> * One-New-Thread-Per-Request model: max throughput is about 2 million 
> UserRecords per minute; it does not go beyond that because CPU utilization 
> reaches 100%, everything stops working, and the Flink job crashes.
> * Thread-Pool model with a pool size of 10: it sends out 21 million 
> UserRecords within 30 sec without any UserRecord expiration errors. The 
> average peak CPU utilization is about 20% - 30%, so 21 million 
> UserRecords/min is not the max throughput of the thread-pool model. We did 
> not go further because 1) this throughput is already a couple of times more 
> than what we really need, and 2) we do not have a quick way of increasing 
> the test load.
> Thus, I propose switching FlinkKinesisProducer to Thread-Pool mode. 
> [~tzulitai] What do you think?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7656) Switch to user ClassLoader when invoking initializeOnMaster finalizeOnMaster

2017-09-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176316#comment-16176316
 ] 

ASF GitHub Bot commented on FLINK-7656:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4690
  
Can you take a brief look at this change @aljoscha? Thanks!


> Switch to user ClassLoader when invoking initializeOnMaster finalizeOnMaster
> 
>
> Key: FLINK-7656
> URL: https://issues.apache.org/jira/browse/FLINK-7656
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.3.2
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>
> The contract that Flink provides to usercode is that the usercode 
> classloader is the context classloader whenever usercode is called.
> In {{OutputFormatVertex}} usercode is called in the {{initializeOnMaster()}} 
> and {{finalizeOnMaster()}} methods but the context classloader is not set to 
> the usercode classloader.
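
For illustration, the usual pattern for honoring that contract looks roughly like the following. This is a generic sketch with a hypothetical helper class, not the actual OutputFormatVertex change:

{code:java}
public final class UserCodeContext {

    /**
     * Runs a piece of usercode with the given usercode classloader installed as the
     * thread's context classloader, restoring the previous classloader afterwards.
     */
    public static void runWithUserClassLoader(ClassLoader userCodeClassLoader, Runnable userCode) {
        final Thread thread = Thread.currentThread();
        final ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(userCodeClassLoader);
        try {
            userCode.run(); // e.g. the initializeOnMaster() / finalizeOnMaster() hooks
        } finally {
            thread.setContextClassLoader(previous);
        }
    }
}
{code}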



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4690: [FLINK-7656] [runtime] Switch to user classloader before ...

2017-09-22 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4690
  
Can you take a brief look at this change @aljoscha? Thanks!


---


[jira] [Commented] (FLINK-7636) Introduce Flink RelOptTable, and remove tableSource from all TableSourceScan node constructor

2017-09-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176315#comment-16176315
 ] 

ASF GitHub Bot commented on FLINK-7636:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4681#discussion_r140474944
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkRelOptTable.scala
 ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptSchema, RelOptTable}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.prepare.CalcitePrepareImpl
+import java.util.{List => JList}
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan
+import org.apache.calcite.linq4j.tree.Expression
+import org.apache.calcite.plan.RelOptTable.ToRelContext
+import org.apache.calcite.prepare.Prepare.PreparingTable
+import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFactory}
+import org.apache.calcite.rel._
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.calcite.runtime.Hook
+import org.apache.calcite.schema.{StreamableTable, TranslatableTable}
+import org.apache.calcite.sql.SqlAccessType
+import org.apache.calcite.sql.validate.{SqlModality, SqlMonotonicity}
+import org.apache.calcite.sql2rel.InitializerContext
+import org.apache.calcite.util.ImmutableBitSet
+import org.apache.flink.table.plan.stats.FlinkStatistic
+
+import scala.collection.JavaConverters._
+
+/**
+  * FlinkRelOptTable wraps a FlinkTable
+  *
+  * @param schema  the [[RelOptSchema]] this table belongs to
+  * @param rowType the type of rows returned by this table
+  * @param names   the identifier for this table. The identifier must be 
unique with
+  *respect to the Connection producing this table.
+  * @param table   wrapped flink table
+  */
+
+class FlinkRelOptTable protected(
+schema: RelOptSchema,
+rowType: RelDataType,
+names: JList[String],
+table: FlinkTable[_]) extends PreparingTable {
+
+  /**
+* Creates a copy of this Flink RelOptTable with new Flink Table and 
new row type.
+*
+* @param newTablenew flink table
+* @param typeFactory type factory to create new row type of new flink 
table
+* @return The copy of this Flink RelOptTable with new Flink table and 
new row type
+*/
+  def copy(newTable: FlinkTable[_], typeFactory: RelDataTypeFactory): 
FlinkRelOptTable = {
+val newRowType = newTable.getRowType(typeFactory)
+FlinkRelOptTable.create(schema, newRowType, names, newTable)
+  }
+
+  /**
+* Extends a table with the given extra fields, which is not supported 
now.
+*
+* @param extendedFields
--- End diff --

I meant to either remove the `@param extendedFields` or add a comment.


> Introduce Flink RelOptTable,  and remove tableSource from all TableSourceScan 
> node constructor
> --
>
> Key: FLINK-7636
> URL: https://issues.apache.org/jira/browse/FLINK-7636
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: jingzhang
>Assignee: jingzhang
>
> At present, there are two ways to fetch TableSource of a TableSourceScan node 
> (e.g LogicalTableSourceScan, PhysicalTableSourceScan ...):
> 1. 
> {code}
> val relOptTable: RelOptTable = getTable()
> val tableSourceTable = relOptTable.unwrap(classOf[TableSourceTable[_]])
> val tableSource = tableSourceTable.tableSource
> {code}
> The result of getTable() is an instance of RelOptTableImpl now, and it will not 
> change after 

[GitHub] flink pull request #4681: [FLINK-7636][Table API & SQL]Introduce Flink RelOp...

2017-09-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4681#discussion_r140474944
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkRelOptTable.scala
 ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptSchema, RelOptTable}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.prepare.CalcitePrepareImpl
+import java.util.{List => JList}
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan
+import org.apache.calcite.linq4j.tree.Expression
+import org.apache.calcite.plan.RelOptTable.ToRelContext
+import org.apache.calcite.prepare.Prepare.PreparingTable
+import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFactory}
+import org.apache.calcite.rel._
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.calcite.runtime.Hook
+import org.apache.calcite.schema.{StreamableTable, TranslatableTable}
+import org.apache.calcite.sql.SqlAccessType
+import org.apache.calcite.sql.validate.{SqlModality, SqlMonotonicity}
+import org.apache.calcite.sql2rel.InitializerContext
+import org.apache.calcite.util.ImmutableBitSet
+import org.apache.flink.table.plan.stats.FlinkStatistic
+
+import scala.collection.JavaConverters._
+
+/**
+  * FlinkRelOptTable wraps a FlinkTable
+  *
+  * @param schema  the [[RelOptSchema]] this table belongs to
+  * @param rowType the type of rows returned by this table
+  * @param names   the identifier for this table. The identifier must be 
unique with
+  *respect to the Connection producing this table.
+  * @param table   wrapped flink table
+  */
+
+class FlinkRelOptTable protected(
+schema: RelOptSchema,
+rowType: RelDataType,
+names: JList[String],
+table: FlinkTable[_]) extends PreparingTable {
+
+  /**
+* Creates a copy of this Flink RelOptTable with new Flink Table and 
new row type.
+*
+* @param newTablenew flink table
+* @param typeFactory type factory to create new row type of new flink 
table
+* @return The copy of this Flink RelOptTable with new Flink table and 
new row type
+*/
+  def copy(newTable: FlinkTable[_], typeFactory: RelDataTypeFactory): 
FlinkRelOptTable = {
+val newRowType = newTable.getRowType(typeFactory)
+FlinkRelOptTable.create(schema, newRowType, names, newTable)
+  }
+
+  /**
+* Extends a table with the given extra fields, which is not supported 
now.
+*
+* @param extendedFields
--- End diff --

I meant to either remove the `@param extendedFields` or add a comment.


---


[jira] [Commented] (FLINK-7636) Introduce Flink RelOptTable, and remove tableSource from all TableSourceScan node constructor

2017-09-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176254#comment-16176254
 ] 

ASF GitHub Bot commented on FLINK-7636:
---

Github user beyond1920 commented on the issue:

https://github.com/apache/flink/pull/4681
  
@fhueske Thanks for your advice. I have already updated the PR based on your 
suggestion.


> Introduce Flink RelOptTable,  and remove tableSource from all TableSourceScan 
> node constructor
> --
>
> Key: FLINK-7636
> URL: https://issues.apache.org/jira/browse/FLINK-7636
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: jingzhang
>Assignee: jingzhang
>
> At present, there are two ways to fetch the TableSource of a TableSourceScan 
> node (e.g. LogicalTableSourceScan, PhysicalTableSourceScan ...):
> 1. 
> {code}
> val relOptTable: RelOptTable = getTable()
> val tableSourceTable = relOptTable.unwrap(classOf[TableSourceTable[_]])
> val tableSource = tableSourceTable.tableSource
> {code}
> The result of getTable() is an instance of RelOptTableImpl now, and it will 
> not change after the RelNode tree is built.
> 2. Every TableSourceScan currently takes a TableSource as a constructor 
> parameter, so we can fetch the TableSource from it directly later.
>  
> The TableSource obtained in these two ways can differ after applying project 
> push-down (PPD) or filter push-down (FPD), which is very confusing. We hope 
> to fix the problem by introducing FlinkRelOptTable to replace 
> RelOptTableImpl and removing the tableSource parameter from 
> TableSourceScan's constructor. After PPD or FPD, a new FlinkRelOptTable 
> instance that contains a new TableSourceTable will be passed to the 
> TableSourceScan constructor. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4681: [FLINK-7636][Table API & SQL]Introduce Flink RelOptTable,...

2017-09-22 Thread beyond1920
Github user beyond1920 commented on the issue:

https://github.com/apache/flink/pull/4681
  
@fhueske Thanks for your advice. I have already updated the PR based on your 
suggestion.


---


[jira] [Commented] (FLINK-7636) Introduce Flink RelOptTable, and remove tableSource from all TableSourceScan node constructor

2017-09-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176252#comment-16176252
 ] 

ASF GitHub Bot commented on FLINK-7636:
---

Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4681#discussion_r140462913
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkRelOptTable.scala
 ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptSchema, RelOptTable}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.prepare.CalcitePrepareImpl
+import java.util.{List => JList}
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan
+import org.apache.calcite.linq4j.tree.Expression
+import org.apache.calcite.plan.RelOptTable.ToRelContext
+import org.apache.calcite.prepare.Prepare.PreparingTable
+import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFactory}
+import org.apache.calcite.rel._
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.calcite.runtime.Hook
+import org.apache.calcite.schema.{StreamableTable, TranslatableTable}
+import org.apache.calcite.sql.SqlAccessType
+import org.apache.calcite.sql.validate.{SqlModality, SqlMonotonicity}
+import org.apache.calcite.sql2rel.InitializerContext
+import org.apache.calcite.util.ImmutableBitSet
+import org.apache.flink.table.plan.stats.FlinkStatistic
+
+import scala.collection.JavaConverters._
+
+/**
+  * FlinkRelOptTable wraps a FlinkTable
+  *
+  * @param schema  the [[RelOptSchema]] this table belongs to
+  * @param rowType the type of rows returned by this table
+  * @param names   the identifier for this table. The identifier must be 
unique with
+  *respect to the Connection producing this table.
+  * @param table   wrapped flink table
+  */
+
+class FlinkRelOptTable protected(
+schema: RelOptSchema,
+rowType: RelDataType,
+names: JList[String],
+table: FlinkTable[_]) extends PreparingTable {
+
+  /**
+* Creates a copy of this Flink RelOptTable with new Flink Table and 
new row type.
+*
+* @param newTablenew flink table
+* @param typeFactory type factory to create new row type of new flink 
table
+* @return The copy of this Flink RelOptTable with new Flink table and 
new row type
+*/
+  def copy(newTable: FlinkTable[_], typeFactory: RelDataTypeFactory): 
FlinkRelOptTable = {
+val newRowType = newTable.getRowType(typeFactory)
+FlinkRelOptTable.create(schema, newRowType, names, newTable)
+  }
+
+  /**
+* Extends a table with the given extra fields, which is not supported 
now.
+*
+* @param extendedFields
--- End diff --

Do you mean adding a comment for extendedFields?


> Introduce Flink RelOptTable,  and remove tableSource from all TableSourceScan 
> node constructor
> --
>
> Key: FLINK-7636
> URL: https://issues.apache.org/jira/browse/FLINK-7636
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: jingzhang
>Assignee: jingzhang
>
> At present, there are two ways to fetch TableSource of a TableSourceScan node 
> (e.g LogicalTableSourceScan, PhysicalTableSourceScan ...):
> 1. 
> {code}
> val relOptTable: RelOptTable = getTable()
> val tableSourceTable = relOptTable.unwrap(classOf[TableSourceTable[_]])
> val tableSource = tableSourceTable.tableSource
> {code}
> The result of getTable() is an instance of RelOptTableImpl now, and it will not 
> change after the RelNode tree is built.
> 

[GitHub] flink pull request #4681: [FLINK-7636][Table API & SQL]Introduce Flink RelOp...

2017-09-22 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4681#discussion_r140462913
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkRelOptTable.scala
 ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptSchema, RelOptTable}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.prepare.CalcitePrepareImpl
+import java.util.{List => JList}
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan
+import org.apache.calcite.linq4j.tree.Expression
+import org.apache.calcite.plan.RelOptTable.ToRelContext
+import org.apache.calcite.prepare.Prepare.PreparingTable
+import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFactory}
+import org.apache.calcite.rel._
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.calcite.runtime.Hook
+import org.apache.calcite.schema.{StreamableTable, TranslatableTable}
+import org.apache.calcite.sql.SqlAccessType
+import org.apache.calcite.sql.validate.{SqlModality, SqlMonotonicity}
+import org.apache.calcite.sql2rel.InitializerContext
+import org.apache.calcite.util.ImmutableBitSet
+import org.apache.flink.table.plan.stats.FlinkStatistic
+
+import scala.collection.JavaConverters._
+
+/**
+  * FlinkRelOptTable wraps a FlinkTable
+  *
+  * @param schema  the [[RelOptSchema]] this table belongs to
+  * @param rowType the type of rows returned by this table
+  * @param names   the identifier for this table. The identifier must be 
unique with
+  *respect to the Connection producing this table.
+  * @param table   wrapped flink table
+  */
+
+class FlinkRelOptTable protected(
+schema: RelOptSchema,
+rowType: RelDataType,
+names: JList[String],
+table: FlinkTable[_]) extends PreparingTable {
+
+  /**
+* Creates a copy of this Flink RelOptTable with new Flink Table and 
new row type.
+*
+* @param newTablenew flink table
+* @param typeFactory type factory to create new row type of new flink 
table
+* @return The copy of this Flink RelOptTable with new Flink table and 
new row type
+*/
+  def copy(newTable: FlinkTable[_], typeFactory: RelDataTypeFactory): 
FlinkRelOptTable = {
+val newRowType = newTable.getRowType(typeFactory)
+FlinkRelOptTable.create(schema, newRowType, names, newTable)
+  }
+
+  /**
+* Extends a table with the given extra fields, which is not supported 
now.
+*
+* @param extendedFields
--- End diff --

Do you mean adding a comment for extendedFields?


---


[jira] [Commented] (FLINK-7636) Introduce Flink RelOptTable, and remove tableSource from all TableSourceScan node constructor

2017-09-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176251#comment-16176251
 ] 

ASF GitHub Bot commented on FLINK-7636:
---

Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4681#discussion_r140462814
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkRelOptTable.scala
 ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptSchema, RelOptTable}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.prepare.CalcitePrepareImpl
+import java.util.{List => JList}
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan
+import org.apache.calcite.linq4j.tree.Expression
+import org.apache.calcite.plan.RelOptTable.ToRelContext
+import org.apache.calcite.prepare.Prepare.PreparingTable
+import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFactory}
+import org.apache.calcite.rel._
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.calcite.runtime.Hook
+import org.apache.calcite.schema.{StreamableTable, TranslatableTable}
+import org.apache.calcite.sql.SqlAccessType
+import org.apache.calcite.sql.validate.{SqlModality, SqlMonotonicity}
+import org.apache.calcite.sql2rel.InitializerContext
+import org.apache.calcite.util.ImmutableBitSet
+import org.apache.flink.table.plan.stats.FlinkStatistic
+
+import scala.collection.JavaConverters._
+
+/**
+  * FlinkRelOptTable wraps a FlinkTable
+  *
+  * @param schema  the [[RelOptSchema]] this table belongs to
+  * @param rowType the type of rows returned by this table
+  * @param names   the identifier for this table. The identifier must be 
unique with
+  *respect to the Connection producing this table.
+  * @param table   wrapped flink table
+  */
+
+class FlinkRelOptTable protected(
+schema: RelOptSchema,
+rowType: RelDataType,
+names: JList[String],
+table: FlinkTable[_]) extends PreparingTable {
--- End diff --

done


> Introduce Flink RelOptTable,  and remove tableSource from all TableSourceScan 
> node constructor
> --
>
> Key: FLINK-7636
> URL: https://issues.apache.org/jira/browse/FLINK-7636
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: jingzhang
>Assignee: jingzhang
>
> At present, there are two ways to fetch the TableSource of a TableSourceScan 
> node (e.g. LogicalTableSourceScan, PhysicalTableSourceScan ...):
> 1. 
> {code}
> val relOptTable: RelOptTable = getTable()
> val tableSourceTable = relOptTable.unwrap(classOf[TableSourceTable[_]])
> val tableSource = tableSourceTable.tableSource
> {code}
> The result of getTable() is an instance of RelOptTableImpl now, and it will 
> not change after the RelNode tree is built.
> 2. Every TableSourceScan currently takes a TableSource as a constructor 
> parameter, so we can fetch the TableSource from it directly later.
>  
> The TableSource obtained in these two ways can differ after applying project 
> push-down (PPD) or filter push-down (FPD), which is very confusing. We hope 
> to fix the problem by introducing FlinkRelOptTable to replace 
> RelOptTableImpl and removing the tableSource parameter from 
> TableSourceScan's constructor. After PPD or FPD, a new FlinkRelOptTable 
> instance that contains a new TableSourceTable will be passed to the 
> TableSourceScan constructor. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4681: [FLINK-7636][Table API & SQL]Introduce Flink RelOp...

2017-09-22 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4681#discussion_r140462814
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkRelOptTable.scala
 ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptSchema, RelOptTable}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.prepare.CalcitePrepareImpl
+import java.util.{List => JList}
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan
+import org.apache.calcite.linq4j.tree.Expression
+import org.apache.calcite.plan.RelOptTable.ToRelContext
+import org.apache.calcite.prepare.Prepare.PreparingTable
+import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFactory}
+import org.apache.calcite.rel._
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.calcite.runtime.Hook
+import org.apache.calcite.schema.{StreamableTable, TranslatableTable}
+import org.apache.calcite.sql.SqlAccessType
+import org.apache.calcite.sql.validate.{SqlModality, SqlMonotonicity}
+import org.apache.calcite.sql2rel.InitializerContext
+import org.apache.calcite.util.ImmutableBitSet
+import org.apache.flink.table.plan.stats.FlinkStatistic
+
+import scala.collection.JavaConverters._
+
+/**
+  * FlinkRelOptTable wraps a FlinkTable
+  *
+  * @param schema  the [[RelOptSchema]] this table belongs to
+  * @param rowType the type of rows returned by this table
+  * @param names   the identifier for this table. The identifier must be 
unique with
+  *respect to the Connection producing this table.
+  * @param table   wrapped flink table
+  */
+
+class FlinkRelOptTable protected(
+schema: RelOptSchema,
+rowType: RelDataType,
+names: JList[String],
+table: FlinkTable[_]) extends PreparingTable {
--- End diff --

done


---


[GitHub] flink pull request #4681: [FLINK-7636][Table API & SQL]Introduce Flink RelOp...

2017-09-22 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4681#discussion_r140462752
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkRelOptTable.scala
 ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptSchema, RelOptTable}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.prepare.CalcitePrepareImpl
+import java.util.{List => JList}
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan
+import org.apache.calcite.linq4j.tree.Expression
+import org.apache.calcite.plan.RelOptTable.ToRelContext
+import org.apache.calcite.prepare.Prepare.PreparingTable
+import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFactory}
+import org.apache.calcite.rel._
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.calcite.runtime.Hook
+import org.apache.calcite.schema.{StreamableTable, TranslatableTable}
+import org.apache.calcite.sql.SqlAccessType
+import org.apache.calcite.sql.validate.{SqlModality, SqlMonotonicity}
+import org.apache.calcite.sql2rel.InitializerContext
+import org.apache.calcite.util.ImmutableBitSet
+import org.apache.flink.table.plan.stats.FlinkStatistic
+
+import scala.collection.JavaConverters._
+
+/**
+  * FlinkRelOptTable wraps a FlinkTable
+  *
+  * @param schema  the [[RelOptSchema]] this table belongs to
+  * @param rowType the type of rows returned by this table
+  * @param names   the identifier for this table. The identifier must be 
unique with
+  *respect to the Connection producing this table.
+  * @param table   wrapped flink table
+  */
+
+class FlinkRelOptTable protected(
+schema: RelOptSchema,
+rowType: RelDataType,
+names: JList[String],
--- End diff --

done


---


[jira] [Commented] (FLINK-7636) Introduce Flink RelOptTable, and remove tableSource from all TableSourceScan node constructor

2017-09-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176248#comment-16176248
 ] 

ASF GitHub Bot commented on FLINK-7636:
---

Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4681#discussion_r140462752
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkRelOptTable.scala
 ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptSchema, RelOptTable}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.prepare.CalcitePrepareImpl
+import java.util.{List => JList}
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan
+import org.apache.calcite.linq4j.tree.Expression
+import org.apache.calcite.plan.RelOptTable.ToRelContext
+import org.apache.calcite.prepare.Prepare.PreparingTable
+import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFactory}
+import org.apache.calcite.rel._
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.calcite.runtime.Hook
+import org.apache.calcite.schema.{StreamableTable, TranslatableTable}
+import org.apache.calcite.sql.SqlAccessType
+import org.apache.calcite.sql.validate.{SqlModality, SqlMonotonicity}
+import org.apache.calcite.sql2rel.InitializerContext
+import org.apache.calcite.util.ImmutableBitSet
+import org.apache.flink.table.plan.stats.FlinkStatistic
+
+import scala.collection.JavaConverters._
+
+/**
+  * FlinkRelOptTable wraps a FlinkTable
+  *
+  * @param schema  the [[RelOptSchema]] this table belongs to
+  * @param rowType the type of rows returned by this table
+  * @param names   the identifier for this table. The identifier must be 
unique with
+  *respect to the Connection producing this table.
+  * @param table   wrapped flink table
+  */
+
+class FlinkRelOptTable protected(
+schema: RelOptSchema,
+rowType: RelDataType,
+names: JList[String],
--- End diff --

done


> Introduce Flink RelOptTable,  and remove tableSource from all TableSourceScan 
> node constructor
> --
>
> Key: FLINK-7636
> URL: https://issues.apache.org/jira/browse/FLINK-7636
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: jingzhang
>Assignee: jingzhang
>
> At present, there are two ways to fetch the TableSource of a TableSourceScan 
> node (e.g. LogicalTableSourceScan, PhysicalTableSourceScan ...):
> 1. 
> {code}
> val relOptTable: RelOptTable = getTable()
> val tableSourceTable = relOptTable.unwrap(classOf[TableSourceTable[_]])
> val tableSource = tableSourceTable.tableSource
> {code}
> The result of getTable() is an instance of RelOptTableImpl now, and it will 
> not change after the RelNode tree is built.
> 2. Every TableSourceScan currently takes a TableSource as a constructor 
> parameter, so we can fetch the TableSource from it directly later.
>  
> The TableSource obtained in these two ways can differ after applying project 
> push-down (PPD) or filter push-down (FPD), which is very confusing. We hope 
> to fix the problem by introducing FlinkRelOptTable to replace 
> RelOptTableImpl and removing the tableSource parameter from 
> TableSourceScan's constructor. After PPD or FPD, a new FlinkRelOptTable 
> instance that contains a new TableSourceTable will be passed to the 
> TableSourceScan constructor. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

