[jira] [Commented] (FLINK-7669) org.apache.flink.api.common.ExecutionConfig cannot be cast to org.apache.flink.api.common.ExecutionConfig
[ https://issues.apache.org/jira/browse/FLINK-7669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16177433#comment-16177433 ] Raymond Tay commented on FLINK-7669: The fatjar was initially built against 1.4-SNAPSHOT and when it gave those error messages, I downloaded 1.3.2, ran the same fatjar against 1.3.2, and it was alright. I suspect some commit in the last two days caused it. > org.apache.flink.api.common.ExecutionConfig cannot be cast to > org.apache.flink.api.common.ExecutionConfig > - > > Key: FLINK-7669 > URL: https://issues.apache.org/jira/browse/FLINK-7669 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.4.0 > Environment: - OS: macOS Sierra > - Oracle JDK 1.8 > - Scala 2.11.11 > - sbt 0.13.16 > - Built from trunk code at commit hash > {{42cc3a2a9c41dda7cf338db36b45131db9150674}} > -- started a local Flink node >Reporter: Raymond Tay > > Latest code pulled from trunk threw errors at runtime when I ran a job > against it; but when I ran the JAR against the stable version {{1.3.2}} it > was OK. Here is the stacktrace. > An exception is being thrown: > {noformat} > Cluster configuration: Standalone cluster with JobManager at > localhost/127.0.0.1:6123 > Using address localhost:6123 to connect to JobManager. > JobManager web interface address http://localhost:8081 > Starting execution of > program > Submitting job with JobID: 05dd8e60c6fda3b96fc22ef6cf389a23. Waiting for job > completion. > Connected to JobManager at > Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-234825544] with leader > session id ----. 
> > The program finished with the following exception: > org.apache.flink.client.program.ProgramInvocationException: The program > execution failed: Failed to submit job 05dd8e60c6fda3b96fc22ef6cf389a23 > (Flink Streaming Job) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:479) > at > org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:443) > at > org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1501) > at > org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:629) > at > org.example.streams.split.SimpleSplitStreams$.main(04splitstreams.scala:53) > at > org.example.streams.split.SimpleSplitStreams.main(04splitstreams.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:383) > at > org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:840) > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:285) > at > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1088) > at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1135) > at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1132) > at > 
org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:44) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556) > at > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1132) > Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to > submit job 05dd8e60c6fda3b96fc22ef6cf389a23 (Flink Streaming Job) > at >
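The error "ExecutionConfig cannot be cast to ExecutionConfig" is the classic symptom of the same class name being defined by two different classloaders, e.g. a fatjar that bundles Flink classes which the cluster's own classpath also provides. A minimal, self-contained sketch of why such a cast fails; the class names here are hypothetical illustrations, not Flink's:

```java
import java.io.ByteArrayOutputStream;
import java.io.InputStream;

// Demonstrates that class identity in the JVM is (name, defining classloader):
// the same bytes defined by two loaders yield two incompatible classes.
public class ClassIdentityDemo {

    public static class Dummy { }

    // Child-first loader: re-defines Dummy from its .class bytes
    // instead of delegating to the parent, as a fatjar classloader might.
    public static class IsolatingLoader extends ClassLoader {
        public IsolatingLoader() {
            super(ClassIdentityDemo.class.getClassLoader());
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (name.endsWith("Dummy")) {
                String res = name.replace('.', '/') + ".class";
                try (InputStream in = getParent().getResourceAsStream(res)) {
                    ByteArrayOutputStream out = new ByteArrayOutputStream();
                    byte[] buf = new byte[4096];
                    for (int n; (n = in.read(buf)) > 0; ) {
                        out.write(buf, 0, n);
                    }
                    byte[] bytes = out.toByteArray();
                    // Define the same bytes again under THIS loader.
                    return defineClass(name, bytes, 0, bytes.length);
                } catch (Exception e) {
                    throw new ClassNotFoundException(name, e);
                }
            }
            return super.loadClass(name, resolve); // everything else delegates normally
        }
    }

    public static void main(String[] args) throws Exception {
        Class<?> a = Dummy.class;
        Class<?> b = new IsolatingLoader().loadClass(a.getName());
        System.out.println(a.getName().equals(b.getName())); // same fully qualified name
        System.out.println(a == b);                          // but distinct Class objects
        System.out.println(a.isAssignableFrom(b));           // so a cast between them throws ClassCastException
    }
}
```

In the report above, the fatjar built against 1.4-SNAPSHOT likely shipped its own copy of `ExecutionConfig` alongside the cluster's copy; marking the Flink dependencies as provided in the build typically avoids this.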
[jira] [Commented] (FLINK-7632) Better documentation and examples on C* sink usage for Pojo and Tuples data types
[ https://issues.apache.org/jira/browse/FLINK-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16177387#comment-16177387 ] ASF GitHub Bot commented on FLINK-7632: --- Github user mcfongtw commented on a diff in the pull request: https://github.com/apache/flink/pull/4696#discussion_r140620313 --- Diff: docs/dev/connectors/cassandra.md --- @@ -78,76 +96,195 @@ Note that enabling this feature will have an adverse impact on latency. Note: The write-ahead log functionality is currently experimental. In many cases it is sufficient to use the connector without enabling it. Please report problems to the development mailing list. +### Checkpointing and Fault Tolerance +With checkpointing enabled, the Cassandra Sink guarantees at-least-once delivery of action requests to the C* instance. - Example +Note: However, the current Cassandra Sink implementation does not flush pending mutations before a checkpoint is triggered. Thus, some in-flight mutations might not be replayed when the job recovers. 
+ +More details are in the [checkpoints docs]({{ site.baseurl }}/dev/stream/state/checkpointing.html) and [fault tolerance guarantees docs]({{ site.baseurl }}/dev/connectors/guarantees.html) + +To enable this fault-tolerance guarantee, checkpointing of the topology needs to be enabled at the execution environment: {% highlight java %} -CassandraSink.addSink(input) - .setQuery("INSERT INTO example.values (id, counter) values (?, ?);") - .setClusterBuilder(new ClusterBuilder() { -@Override -public Cluster buildCluster(Cluster.Builder builder) { - return builder.addContactPoint("127.0.0.1").build(); -} - }) - .build(); +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.enableCheckpointing(5000); // checkpoint every 5000 msecs {% endhighlight %} {% highlight scala %} -CassandraSink.addSink(input) - .setQuery("INSERT INTO example.values (id, counter) values (?, ?);") - .setClusterBuilder(new ClusterBuilder() { -override def buildCluster(builder: Cluster.Builder): Cluster = { - builder.addContactPoint("127.0.0.1").build() -} - }) - .build() +val env = StreamExecutionEnvironment.getExecutionEnvironment() +env.enableCheckpointing(5000) // checkpoint every 5000 msecs +{% endhighlight %} + + + +## Examples + +The Cassandra sinks currently support both Java Tuple and POJO data types, and Flink automatically detects which type of input is used. For general use of these streaming data types, please refer to [Supported Data Types]({{ site.baseurl }}/dev/api_concepts.html). We show two implementations based on [SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java), for Pojo and Java Tuple data types respectively. + +In all these examples, we assume the associated Keyspace `example` and Table `wordcount` have been created. 
+ + + +{% highlight sql %} +CREATE KEYSPACE IF NOT EXISTS example +WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}; +CREATE TABLE IF NOT EXISTS example.wordcount ( +word text, +count bigint, +PRIMARY KEY(word) +); +{% endhighlight %} + + + +### Cassandra Sink Example for Streaming Java Tuple Data Type +To store results of the Java Tuple data type in a Cassandra sink, a CQL upsert statement (set via setQuery('stmt')) is required to persist each record back to the database. With the upsert query cached as a `PreparedStatement`, the Cassandra connector internally converts each Tuple element into a parameter of the statement. + +For details about `PreparedStatement` and `BoundStatement`, please visit the [DataStax Java Driver manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/) + +Please note that if the upsert query is not set, an `IllegalArgumentException` is thrown with the following error message: `Query must not be null or empty.` + + + +{% highlight java %} +// get the execution environment +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + +// get input data by connecting to the socket +DataStream<String> text = env.socketTextStream(hostname, port, "\n"); + +// parse the data, group it, window it, and aggregate the counts +DataStream<Tuple2<String, Long>> result = text + +.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() { +@Override +public void flatMap(String
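The `flatMap` in the quoted example normalizes each line and emits (word, 1) pairs. Stripped of the Flink API, the core tokenizing logic can be sketched in plain Java; the helper names here are ours, not the connector's or the example's:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the flatMap step of the word-count example:
// lowercase each line, split on non-word characters, emit (word, 1) pairs.
public class WordPairs {

    public static List<Map.Entry<String, Long>> flatMap(String value) {
        List<Map.Entry<String, Long>> out = new ArrayList<>();
        for (String word : value.toLowerCase().split("\\W+")) {
            if (!word.isEmpty()) {          // split can yield an empty leading token
                out.add(new SimpleEntry<>(word, 1L));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(flatMap("To be, or not to be"));
    }
}
```

Each emitted pair then flows into the keyed window aggregation, and finally into the Cassandra sink as the two parameters of the upsert statement.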
[jira] [Commented] (FLINK-7632) Better documentation and examples on C* sink usage for Pojo and Tuples data types
[ https://issues.apache.org/jira/browse/FLINK-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16177386#comment-16177386 ] ASF GitHub Bot commented on FLINK-7632: --- Github user mcfongtw commented on a diff in the pull request: https://github.com/apache/flink/pull/4696#discussion_r140620296 --- Diff: docs/dev/connectors/cassandra.md --- @@ -78,76 +96,195 @@ Note that that enabling this feature will have an adverse impact on latency. Note: The write-ahead log functionality is currently experimental. In many cases it is sufficent to use the connector without enabling it. Please report problems to the development mailing list. +### Checkpointing and Fault Tolerance +With checkpointing enabled, Cassandra Sink guarantees at-least-once delivery of action requests to C* instance. - Example +Note:However, current Cassandra Sink implementation does not flush the pending mutations before the checkpoint was triggered. Thus, some in-flight mutations might not be replayed when the job recovered. 
+ +More details on [checkpoints docs]({{ site.baseurl }}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ site.baseurl }}/dev/connectors/guarantees.html) + +To enable fault tolerant guarantee, checkpointing of the topology needs to be enabled at the execution environment: {% highlight java %} -CassandraSink.addSink(input) - .setQuery("INSERT INTO example.values (id, counter) values (?, ?);") - .setClusterBuilder(new ClusterBuilder() { -@Override -public Cluster buildCluster(Cluster.Builder builder) { - return builder.addContactPoint("127.0.0.1").build(); -} - }) - .build(); +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.enableCheckpointing(5000); // checkpoint every 5000 msecs {% endhighlight %} {% highlight scala %} -CassandraSink.addSink(input) - .setQuery("INSERT INTO example.values (id, counter) values (?, ?);") - .setClusterBuilder(new ClusterBuilder() { -override def buildCluster(builder: Cluster.Builder): Cluster = { - builder.addContactPoint("127.0.0.1").build() -} - }) - .build() +val env = StreamExecutionEnvironment.getExecutionEnvironment() +env.enableCheckpointing(5000) // checkpoint every 5000 msecs +{% endhighlight %} + + + +## Examples + +The Cassandra sinks currently support both Java Tuple and POJO data types, and Flink automatically detects which type of input is used. For general use case of those streaming data type, please refer to [Supported Data Types]({{ site.baseurl }}/dev/api_concepts.html). We show two implementations based on [SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java), for Pojo and Java Tuple data types respectively. + +In all these examples, we assumed the associated Keyspace `example` and Table `wordcount` have been created. 
+ + + +{% highlight sql %} +CREATE KEYSPACE IF NOT EXISTS example +WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}; +CREATE TABLE IF NOT EXISTS example.wordcount ( +word text, +count bigint, +PRIMARY KEY(word) +); +{% endhighlight %} + + + +### Cassandra Sink Example for Streaming Java Tuple Data Type +While storing the result with Java Tuple data type to a Cassandra sink, it is required to set a CQL upsert statement (via setQuery('stmt')) to persist each record back to database. With the upsert query cached as `PreparedStatement`, Cassandra connector internally converts each Tuple elements as parameters to the statement. + +For details about `PreparedStatement` and `BoundStatement`, please visit [DataStax Java Driver manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/) + +Please note that if the upsert query were not set, an `IllegalArgumentException` would be thrown with the following error message `Query must not be null or empty.` + + + +{% highlight java %} +// get the execution environment +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + +// get input data by connecting to the socket +DataStream text = env.socketTextStream(hostname, port, "\n"); + +// parse the data, group it, window it, and aggregate the counts +DataStream> result = text + +.flatMap(new FlatMapFunction >() { +@Override +public void flatMap(String
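The documented precondition (a missing or empty upsert query is rejected with `IllegalArgumentException("Query must not be null or empty.")`) can be mirrored in a small sketch; `checkQuery` is a hypothetical helper written for illustration, not the connector's actual code:

```java
// Sketch of the validation behavior the docs describe for CassandraSink.setQuery:
// reject a null or empty upsert query up front, before any record is written.
public class QueryCheck {

    public static String checkQuery(String query) {
        if (query == null || query.isEmpty()) {
            throw new IllegalArgumentException("Query must not be null or empty.");
        }
        return query;
    }

    public static void main(String[] args) {
        // A valid upsert for the example.wordcount table passes through unchanged.
        System.out.println(checkQuery(
            "INSERT INTO example.wordcount (word, count) VALUES (?, ?);"));
    }
}
```

Failing fast like this surfaces a misconfigured sink at job-construction time rather than on the first record.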
[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...
Github user mcfongtw commented on a diff in the pull request: https://github.com/apache/flink/pull/4696#discussion_r140620313 --- Diff: docs/dev/connectors/cassandra.md --- @@ -78,76 +96,195 @@ Note that that enabling this feature will have an adverse impact on latency. Note: The write-ahead log functionality is currently experimental. In many cases it is sufficent to use the connector without enabling it. Please report problems to the development mailing list. +### Checkpointing and Fault Tolerance +With checkpointing enabled, Cassandra Sink guarantees at-least-once delivery of action requests to C* instance. - Example +Note:However, current Cassandra Sink implementation does not flush the pending mutations before the checkpoint was triggered. Thus, some in-flight mutations might not be replayed when the job recovered. + +More details on [checkpoints docs]({{ site.baseurl }}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ site.baseurl }}/dev/connectors/guarantees.html) + +To enable fault tolerant guarantee, checkpointing of the topology needs to be enabled at the execution environment: {% highlight java %} -CassandraSink.addSink(input) - .setQuery("INSERT INTO example.values (id, counter) values (?, ?);") - .setClusterBuilder(new ClusterBuilder() { -@Override -public Cluster buildCluster(Cluster.Builder builder) { - return builder.addContactPoint("127.0.0.1").build(); -} - }) - .build(); +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.enableCheckpointing(5000); // checkpoint every 5000 msecs {% endhighlight %} {% highlight scala %} -CassandraSink.addSink(input) - .setQuery("INSERT INTO example.values (id, counter) values (?, ?);") - .setClusterBuilder(new ClusterBuilder() { -override def buildCluster(builder: Cluster.Builder): Cluster = { - builder.addContactPoint("127.0.0.1").build() -} - }) - .build() +val env = StreamExecutionEnvironment.getExecutionEnvironment() 
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs +{% endhighlight %} + + + +## Examples + +The Cassandra sinks currently support both Java Tuple and POJO data types, and Flink automatically detects which type of input is used. For general use case of those streaming data type, please refer to [Supported Data Types]({{ site.baseurl }}/dev/api_concepts.html). We show two implementations based on [SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java), for Pojo and Java Tuple data types respectively. + +In all these examples, we assumed the associated Keyspace `example` and Table `wordcount` have been created. + + + +{% highlight sql %} +CREATE KEYSPACE IF NOT EXISTS example +WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}; +CREATE TABLE IF NOT EXISTS example.wordcount ( +word text, +count bigint, +PRIMARY KEY(word) +); +{% endhighlight %} + + + +### Cassandra Sink Example for Streaming Java Tuple Data Type +While storing the result with Java Tuple data type to a Cassandra sink, it is required to set a CQL upsert statement (via setQuery('stmt')) to persist each record back to database. With the upsert query cached as `PreparedStatement`, Cassandra connector internally converts each Tuple elements as parameters to the statement. 
+ +For details about `PreparedStatement` and `BoundStatement`, please visit [DataStax Java Driver manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/) + +Please note that if the upsert query were not set, an `IllegalArgumentException` would be thrown with the following error message `Query must not be null or empty.` + + + +{% highlight java %} +// get the execution environment +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + +// get input data by connecting to the socket +DataStream text = env.socketTextStream(hostname, port, "\n"); + +// parse the data, group it, window it, and aggregate the counts +DataStream> result = text + +.flatMap(new FlatMapFunction >() { +@Override +public void flatMap(String value, Collector > out) { +// normalize and split the line +String[] words = value.toLowerCase().split("\\W+"); + +// emit the pairs +for
[jira] [Commented] (FLINK-7632) Better documentation and examples on C* sink usage for Pojo and Tuples data types
[ https://issues.apache.org/jira/browse/FLINK-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16177385#comment-16177385 ] ASF GitHub Bot commented on FLINK-7632: --- Github user mcfongtw commented on a diff in the pull request: https://github.com/apache/flink/pull/4696#discussion_r140620261 --- Diff: docs/dev/connectors/cassandra.md --- @@ -78,76 +96,195 @@ Note that that enabling this feature will have an adverse impact on latency. Note: The write-ahead log functionality is currently experimental. In many cases it is sufficent to use the connector without enabling it. Please report problems to the development mailing list. +### Checkpointing and Fault Tolerance +With checkpointing enabled, Cassandra Sink guarantees at-least-once delivery of action requests to C* instance. - Example +Note:However, current Cassandra Sink implementation does not flush the pending mutations before the checkpoint was triggered. Thus, some in-flight mutations might not be replayed when the job recovered. + +More details on [checkpoints docs]({{ site.baseurl }}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ site.baseurl }}/dev/connectors/guarantees.html) + +To enable fault tolerant guarantee, checkpointing of the topology needs to be enabled at the execution environment: --- End diff -- Very true. Thanks. > Better documentation and examples on C* sink usage for Pojo and Tuples data > types > - > > Key: FLINK-7632 > URL: https://issues.apache.org/jira/browse/FLINK-7632 > Project: Flink > Issue Type: Sub-task > Components: Cassandra Connector, Documentation >Reporter: Michael Fong >Assignee: Michael Fong > > Cassandra sink supports Pojo and Java Tuple data types. We should improve > documentation on its usage as well as some concrete / meaningful examples for > both cases. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7632) Better documentation and examples on C* sink usage for Pojo and Tuples data types
[ https://issues.apache.org/jira/browse/FLINK-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16177384#comment-16177384 ] ASF GitHub Bot commented on FLINK-7632: --- Github user mcfongtw commented on a diff in the pull request: https://github.com/apache/flink/pull/4696#discussion_r140620229 --- Diff: docs/dev/connectors/cassandra.md --- @@ -78,76 +96,195 @@ Note that that enabling this feature will have an adverse impact on latency. Note: The write-ahead log functionality is currently experimental. In many cases it is sufficent to use the connector without enabling it. Please report problems to the development mailing list. +### Checkpointing and Fault Tolerance +With checkpointing enabled, Cassandra Sink guarantees at-least-once delivery of action requests to C* instance. - Example +Note:However, current Cassandra Sink implementation does not flush the pending mutations before the checkpoint was triggered. Thus, some in-flight mutations might not be replayed when the job recovered. --- End diff -- Good eye. Thanks > Better documentation and examples on C* sink usage for Pojo and Tuples data > types > - > > Key: FLINK-7632 > URL: https://issues.apache.org/jira/browse/FLINK-7632 > Project: Flink > Issue Type: Sub-task > Components: Cassandra Connector, Documentation >Reporter: Michael Fong >Assignee: Michael Fong > > Cassandra sink supports Pojo and Java Tuple data types. We should improve > documentation on its usage as well as some concrete / meaningful examples for > both cases. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...
Github user mcfongtw commented on a diff in the pull request: https://github.com/apache/flink/pull/4696#discussion_r140620296 --- Diff: docs/dev/connectors/cassandra.md --- @@ -78,76 +96,195 @@ Note that that enabling this feature will have an adverse impact on latency. Note: The write-ahead log functionality is currently experimental. In many cases it is sufficent to use the connector without enabling it. Please report problems to the development mailing list. +### Checkpointing and Fault Tolerance +With checkpointing enabled, Cassandra Sink guarantees at-least-once delivery of action requests to C* instance. - Example +Note:However, current Cassandra Sink implementation does not flush the pending mutations before the checkpoint was triggered. Thus, some in-flight mutations might not be replayed when the job recovered. + +More details on [checkpoints docs]({{ site.baseurl }}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ site.baseurl }}/dev/connectors/guarantees.html) + +To enable fault tolerant guarantee, checkpointing of the topology needs to be enabled at the execution environment: {% highlight java %} -CassandraSink.addSink(input) - .setQuery("INSERT INTO example.values (id, counter) values (?, ?);") - .setClusterBuilder(new ClusterBuilder() { -@Override -public Cluster buildCluster(Cluster.Builder builder) { - return builder.addContactPoint("127.0.0.1").build(); -} - }) - .build(); +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.enableCheckpointing(5000); // checkpoint every 5000 msecs {% endhighlight %} {% highlight scala %} -CassandraSink.addSink(input) - .setQuery("INSERT INTO example.values (id, counter) values (?, ?);") - .setClusterBuilder(new ClusterBuilder() { -override def buildCluster(builder: Cluster.Builder): Cluster = { - builder.addContactPoint("127.0.0.1").build() -} - }) - .build() +val env = StreamExecutionEnvironment.getExecutionEnvironment() 
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs +{% endhighlight %} + + + +## Examples + +The Cassandra sinks currently support both Java Tuple and POJO data types, and Flink automatically detects which type of input is used. For general use case of those streaming data type, please refer to [Supported Data Types]({{ site.baseurl }}/dev/api_concepts.html). We show two implementations based on [SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java), for Pojo and Java Tuple data types respectively. + +In all these examples, we assumed the associated Keyspace `example` and Table `wordcount` have been created. + + + +{% highlight sql %} +CREATE KEYSPACE IF NOT EXISTS example +WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}; +CREATE TABLE IF NOT EXISTS example.wordcount ( +word text, +count bigint, +PRIMARY KEY(word) +); +{% endhighlight %} + + + +### Cassandra Sink Example for Streaming Java Tuple Data Type +While storing the result with Java Tuple data type to a Cassandra sink, it is required to set a CQL upsert statement (via setQuery('stmt')) to persist each record back to database. With the upsert query cached as `PreparedStatement`, Cassandra connector internally converts each Tuple elements as parameters to the statement. 
+ +For details about `PreparedStatement` and `BoundStatement`, please visit [DataStax Java Driver manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/) + +Please note that if the upsert query were not set, an `IllegalArgumentException` would be thrown with the following error message `Query must not be null or empty.` + + + +{% highlight java %} +// get the execution environment +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + +// get input data by connecting to the socket +DataStream text = env.socketTextStream(hostname, port, "\n"); + +// parse the data, group it, window it, and aggregate the counts +DataStream> result = text + +.flatMap(new FlatMapFunction >() { +@Override +public void flatMap(String value, Collector > out) { +// normalize and split the line +String[] words = value.toLowerCase().split("\\W+"); + +// emit the pairs +for
[jira] [Commented] (FLINK-7632) Better documentation and examples on C* sink usage for Pojo and Tuples data types
[ https://issues.apache.org/jira/browse/FLINK-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16177383#comment-16177383 ] ASF GitHub Bot commented on FLINK-7632: --- Github user mcfongtw commented on a diff in the pull request: https://github.com/apache/flink/pull/4696#discussion_r140620211 --- Diff: docs/dev/connectors/cassandra.md --- @@ -78,76 +96,195 @@ Note that that enabling this feature will have an adverse impact on latency. Note: The write-ahead log functionality is currently experimental. In many cases it is sufficent to use the connector without enabling it. Please report problems to the development mailing list. +### Checkpointing and Fault Tolerance +With checkpointing enabled, Cassandra Sink guarantees at-least-once delivery of action requests to C* instance. - Example +Note:However, current Cassandra Sink implementation does not flush the pending mutations before the checkpoint was triggered. Thus, some in-flight mutations might not be replayed when the job recovered. 
+ +More details on [checkpoints docs]({{ site.baseurl }}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ site.baseurl }}/dev/connectors/guarantees.html) + +To enable fault tolerant guarantee, checkpointing of the topology needs to be enabled at the execution environment: {% highlight java %} -CassandraSink.addSink(input) - .setQuery("INSERT INTO example.values (id, counter) values (?, ?);") - .setClusterBuilder(new ClusterBuilder() { -@Override -public Cluster buildCluster(Cluster.Builder builder) { - return builder.addContactPoint("127.0.0.1").build(); -} - }) - .build(); +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.enableCheckpointing(5000); // checkpoint every 5000 msecs {% endhighlight %} {% highlight scala %} -CassandraSink.addSink(input) - .setQuery("INSERT INTO example.values (id, counter) values (?, ?);") - .setClusterBuilder(new ClusterBuilder() { -override def buildCluster(builder: Cluster.Builder): Cluster = { - builder.addContactPoint("127.0.0.1").build() -} - }) - .build() +val env = StreamExecutionEnvironment.getExecutionEnvironment() +env.enableCheckpointing(5000) // checkpoint every 5000 msecs +{% endhighlight %} + + + +## Examples + +The Cassandra sinks currently support both Java Tuple and POJO data types, and Flink automatically detects which type of input is used. For general use case of those streaming data type, please refer to [Supported Data Types]({{ site.baseurl }}/dev/api_concepts.html). We show two implementations based on [SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java), for Pojo and Java Tuple data types respectively. + +In all these examples, we assumed the associated Keyspace `example` and Table `wordcount` have been created. 
+ + + +{% highlight sql %} +CREATE KEYSPACE IF NOT EXISTS example +WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}; +CREATE TABLE IF NOT EXISTS example.wordcount ( +word text, +count bigint, +PRIMARY KEY(word) +); +{% endhighlight %} + + + +### Cassandra Sink Example for Streaming Java Tuple Data Type +While storing the result with Java Tuple data type to a Cassandra sink, it is required to set a CQL upsert statement (via setQuery('stmt')) to persist each record back to database. With the upsert query cached as `PreparedStatement`, Cassandra connector internally converts each Tuple elements as parameters to the statement. + +For details about `PreparedStatement` and `BoundStatement`, please visit [DataStax Java Driver manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/) + +Please note that if the upsert query were not set, an `IllegalArgumentException` would be thrown with the following error message `Query must not be null or empty.` + + + +{% highlight java %} +// get the execution environment +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + +// get input data by connecting to the socket +DataStream text = env.socketTextStream(hostname, port, "\n"); + +// parse the data, group it, window it, and aggregate the counts +DataStream> result = text + +.flatMap(new FlatMapFunction >() { +@Override +public void flatMap(String
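The "group it, window it, and aggregate the counts" step of the quoted example sums the 1-counts per word within each window. Ignoring windowing, the per-window aggregation amounts to the following plain-Java sketch (the class and method names are ours):

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of the keyed aggregation in the word-count example:
// sum the counts per word, as the keyed window reduce would within one window.
public class WordCountAggregate {

    public static Map<String, Long> aggregate(Iterable<Map.Entry<String, Long>> pairs) {
        Map<String, Long> counts = new HashMap<>();
        for (Map.Entry<String, Long> p : pairs) {
            counts.merge(p.getKey(), p.getValue(), Long::sum); // accumulate per key
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(aggregate(java.util.List.of(
            Map.entry("to", 1L), Map.entry("be", 1L), Map.entry("to", 1L))));
    }
}
```

Each resulting (word, count) pair is what the Cassandra sink binds to the `(?, ?)` placeholders of the upsert statement, so the `example.wordcount` row for a word is overwritten with the latest count.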
[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...
Github user mcfongtw commented on a diff in the pull request: https://github.com/apache/flink/pull/4696#discussion_r140620211 --- Diff: docs/dev/connectors/cassandra.md --- @@ -78,76 +96,195 @@ Note that that enabling this feature will have an adverse impact on latency. Note: The write-ahead log functionality is currently experimental. In many cases it is sufficent to use the connector without enabling it. Please report problems to the development mailing list. +### Checkpointing and Fault Tolerance +With checkpointing enabled, Cassandra Sink guarantees at-least-once delivery of action requests to C* instance. - Example +Note:However, current Cassandra Sink implementation does not flush the pending mutations before the checkpoint was triggered. Thus, some in-flight mutations might not be replayed when the job recovered. + +More details on [checkpoints docs]({{ site.baseurl }}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ site.baseurl }}/dev/connectors/guarantees.html) + +To enable fault tolerant guarantee, checkpointing of the topology needs to be enabled at the execution environment: {% highlight java %} -CassandraSink.addSink(input) - .setQuery("INSERT INTO example.values (id, counter) values (?, ?);") - .setClusterBuilder(new ClusterBuilder() { -@Override -public Cluster buildCluster(Cluster.Builder builder) { - return builder.addContactPoint("127.0.0.1").build(); -} - }) - .build(); +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.enableCheckpointing(5000); // checkpoint every 5000 msecs {% endhighlight %} {% highlight scala %} -CassandraSink.addSink(input) - .setQuery("INSERT INTO example.values (id, counter) values (?, ?);") - .setClusterBuilder(new ClusterBuilder() { -override def buildCluster(builder: Cluster.Builder): Cluster = { - builder.addContactPoint("127.0.0.1").build() -} - }) - .build() +val env = StreamExecutionEnvironment.getExecutionEnvironment() 
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs +{% endhighlight %} + + + +## Examples + +The Cassandra sinks currently support both Java Tuple and POJO data types, and Flink automatically detects which type of input is used. For the general use of these streaming data types, please refer to [Supported Data Types]({{ site.baseurl }}/dev/api_concepts.html). We show two implementations based on [SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java), for Pojo and Java Tuple data types respectively. + +In all these examples, we assume the associated Keyspace `example` and Table `wordcount` have been created. + + + +{% highlight sql %} +CREATE KEYSPACE IF NOT EXISTS example +WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}; +CREATE TABLE IF NOT EXISTS example.wordcount ( +word text, +count bigint, +PRIMARY KEY(word) +); +{% endhighlight %} + + + +### Cassandra Sink Example for Streaming Java Tuple Data Type +When storing results with the Java Tuple data type in a Cassandra sink, a CQL upsert statement must be set (via setQuery('stmt')) to persist each record back to the database. With the upsert query cached as a `PreparedStatement`, the Cassandra connector internally converts each Tuple element into a parameter of the statement.
+ +For details about `PreparedStatement` and `BoundStatement`, please visit the [DataStax Java Driver manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/) + +Please note that if the upsert query is not set, an `IllegalArgumentException` is thrown with the following error message: `Query must not be null or empty.` + + + +{% highlight java %} +// get the execution environment +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + +// get input data by connecting to the socket +DataStream<String> text = env.socketTextStream(hostname, port, "\n"); + +// parse the data, group it, window it, and aggregate the counts +DataStream<Tuple2<String, Long>> result = text + +.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() { +@Override +public void flatMap(String value, Collector<Tuple2<String, Long>> out) { +// normalize and split the line +String[] words = value.toLowerCase().split("\\W+"); + +// emit the pairs +for
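The positional binding described above — each Tuple field substituted, in order, for a `?` placeholder of the cached upsert — can be sketched without any Cassandra dependency. The `bind` helper below is hypothetical (the real connector delegates to the DataStax driver's `PreparedStatement`/`BoundStatement`); it only illustrates the ordering contract:

```java
import java.util.List;

public class UpsertBindingSketch {
    // Hypothetical stand-in for the connector's internal binding step:
    // each tuple field replaces the matching '?' placeholder, in order,
    // just as the driver binds parameters to a PreparedStatement.
    static String bind(String query, List<Object> tupleFields) {
        StringBuilder bound = new StringBuilder();
        int field = 0;
        for (char c : query.toCharArray()) {
            if (c == '?') {
                Object v = tupleFields.get(field++);
                bound.append(v instanceof String ? "'" + v + "'" : v.toString());
            } else {
                bound.append(c);
            }
        }
        return bound.toString();
    }

    public static void main(String[] args) {
        String upsert = "INSERT INTO example.wordcount (word, count) VALUES (?, ?);";
        // A Tuple2<String, Long> ("hello", 7L) binds as:
        System.out.println(bind(upsert, List.of("hello", 7L)));
        // INSERT INTO example.wordcount (word, count) VALUES ('hello', 7);
    }
}
```

The sketch also makes the failure mode obvious: without a query there is nothing to bind against, which is why the connector rejects a missing `setQuery(...)` with an `IllegalArgumentException`.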
[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...
Github user mcfongtw commented on a diff in the pull request: https://github.com/apache/flink/pull/4696#discussion_r140620229 --- Diff: docs/dev/connectors/cassandra.md --- @@ -78,76 +96,195 @@ Note that enabling this feature will have an adverse impact on latency. Note: The write-ahead log functionality is currently experimental. In many cases it is sufficient to use the connector without enabling it. Please report problems to the development mailing list. +### Checkpointing and Fault Tolerance +With checkpointing enabled, the Cassandra Sink guarantees at-least-once delivery of action requests to the C* instance. - Example +Note: However, the current Cassandra Sink implementation does not flush pending mutations before a checkpoint is triggered. Thus, some in-flight mutations might not be replayed when the job recovers. --- End diff -- Good eye. Thanks ---
[jira] [Commented] (FLINK-7358) Add implicitly converts support for User-defined function
[ https://issues.apache.org/jira/browse/FLINK-7358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16177372#comment-16177372 ] ASF GitHub Bot commented on FLINK-7358: --- Github user sunjincheng121 commented on the issue: https://github.com/apache/flink/pull/4534 @fhueske Because of the 11.11, I did not update this PR in time; sorry about that. Now I have updated the PR and the PR description. Please take a look at the PR. :) Thanks, jincheng > Add implicitly converts support for User-defined function > -- > > Key: FLINK-7358 > URL: https://issues.apache.org/jira/browse/FLINK-7358 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > > Currently, if a user defines a UDF as follows: > {code} > object Func extends ScalarFunction { > def eval(a: Int, b: Long): String = { > ... > } > } > {code} > And if the table schema is (a: Int, b: Int, c: String), then we cannot call > the UDF `Func('a, 'b)`. So > I want to add implicit conversion when we call a UDF. The implicit conversion rule > is: > BYTE_TYPE_INFO -> SHORT_TYPE_INFO -> INT_TYPE_INFO -> LONG_TYPE_INFO -> > FLOAT_TYPE_INFO -> DOUBLE_TYPE_INFO > *Note: > In this JIRA, only the Table API is covered; SQL will be fixed in > https://issues.apache.org/jira/browse/CALCITE-1908.* > What do you think? [~fhueske] -- This message was sent by Atlassian JIRA (v6.4.14#64029)
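The conversion chain proposed in the issue mirrors Java's own primitive widening, which a plain JDK sketch can show (the `eval` name follows the issue's example; the class itself is hypothetical and not Flink code):

```java
public class WideningSketch {
    // Signature takes (int, long), mirroring eval(a: Int, b: Long) in the issue.
    static String eval(int a, long b) {
        return a + ":" + b;
    }

    public static void main(String[] args) {
        byte a = 1; // BYTE widens to INT
        int b = 2;  // INT widens to LONG
        // Java applies the widening chain byte -> short -> int -> long
        // -> float -> double implicitly at the call site; the JIRA proposes
        // the analogous rule when resolving Table API UDF calls over
        // BYTE_TYPE_INFO ... DOUBLE_TYPE_INFO.
        System.out.println(eval(a, b)); // 1:2
    }
}
```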
[GitHub] flink issue #4534: [FLINK-7358][table]Add implicitly converts support for Us...
Github user sunjincheng121 commented on the issue: https://github.com/apache/flink/pull/4534 @fhueske Because of the 11.11, I did not update this PR in time; sorry about that. Now I have updated the PR and the PR description. Please take a look at the PR. :) Thanks, jincheng ---
[GitHub] flink issue #4701: [FLINK-7664] Replace FlinkFutureException by java.util.co...
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4701 LGTM ---
[jira] [Commented] (FLINK-7664) Replace FlinkFutureException by CompletionException
[ https://issues.apache.org/jira/browse/FLINK-7664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16177319#comment-16177319 ] ASF GitHub Bot commented on FLINK-7664: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4701 LGTM > Replace FlinkFutureException by CompletionException > --- > > Key: FLINK-7664 > URL: https://issues.apache.org/jira/browse/FLINK-7664 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > The {{FlinkFutureException}} was introduced to fail in a > {{CompletableFuture}} callback. This can, however, better be done via the > {{CompletionException}}. Therefore, we should remove the > {{FlinkFutureException}} and replace it with the {{CompletionException}} > instead. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
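The replacement pattern under discussion needs only the JDK: a callback wraps its checked failure in `java.util.concurrent.CompletionException`, and `join()` later rethrows it with the cause intact. This is an illustrative sketch, not the actual `JarPlanHandler` code:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class CompletionExceptionSketch {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture
            .supplyAsync(() -> "job-plan")
            .thenApply(plan -> {
                try {
                    // Stand-in for work that may throw a checked exception,
                    // e.g. serializing a JSON plan.
                    return risky(plan);
                } catch (Exception e) {
                    // The pattern FLINK-7664 adopts: no custom
                    // FlinkFutureException, just the JDK's CompletionException.
                    throw new CompletionException(e);
                }
            });

        try {
            future.join();
        } catch (CompletionException e) {
            // join() rethrows the CompletionException; the original cause survives.
            System.out.println("failed with: " + e.getCause().getMessage());
        }
    }

    static String risky(String plan) throws Exception {
        throw new Exception("cannot serialize " + plan);
    }
}
```

A `CompletionException` thrown from inside a callback completes the future exceptionally without further wrapping, which is exactly why a dedicated Flink exception type is redundant here.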
[jira] [Updated] (FLINK-7673) Flink Kinesis connector - remove dependency on the ASL code
[ https://issues.apache.org/jira/browse/FLINK-7673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Radoslaw Gruchalski updated FLINK-7673: --- Description: Currently the Flink Kinesis connector depends on two artifacts which are available under Amazon Software License code. The artifacts are: {noformat} com.amazonaws amazon-kinesis-producer ${aws.kinesis-kpl.version} com.amazonaws amazon-kinesis-client ${aws.kinesis-kcl.version} com.amazonaws aws-java-sdk-dynamodb com.amazonaws aws-java-sdk-cloudwatch {noformat} This prevents the connector being published to Maven Central. I would like to contribute to Flink Kinesis connector by replacing the ASL licensed code with an implementation using AWS SDK for Java. For that, I'd like to understand two things: - what's the correct process for making the contribution (contributor agreement, etc.) - what's the minimum functionality the alternative implementation would have to provide in order to get accepted There are three classes in the connector's source requiring modification: - org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer - org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil - org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer Thank you. was: Currently the Flink Kinesis connector depends on two artifacts which are available under Amazon Software License code. The artifacts are: {noformat} com.amazonaws amazon-kinesis-producer ${aws.kinesis-kpl.version} com.amazonaws amazon-kinesis-client ${aws.kinesis-kcl.version} com.amazonaws aws-java-sdk-dynamodb com.amazonaws aws-java-sdk-cloudwatch {noformat} This prevents the connector being published to Maven Central. I would like to contribute to Flink Kinesis connector by replacing the ASL licensed code with an implementation using AWS SDK for Java. For that, I'd like to understand two things: - what's the correct process for making the contribution (contributor agreement, etc.) 
- what's the minimum functionality the alternative implementation would have to provide in order to get accepted Thank you. > Flink Kinesis connector - remove dependency on the ASL code > --- > > Key: FLINK-7673 > URL: https://issues.apache.org/jira/browse/FLINK-7673 > Project: Flink > Issue Type: Improvement >Reporter: Radoslaw Gruchalski > > Currently the Flink Kinesis connector depends on two artifacts which are > available under Amazon Software License code. The artifacts are: > {noformat} > >com.amazonaws >amazon-kinesis-producer >${aws.kinesis-kpl.version} > > >com.amazonaws >amazon-kinesis-client >${aws.kinesis-kcl.version} > > > >com.amazonaws > > aws-java-sdk-dynamodb > > >com.amazonaws > > aws-java-sdk-cloudwatch > > > > {noformat} > This prevents the connector being published to Maven Central. I would like to > contribute to Flink Kinesis connector by replacing the ASL licensed code with > an implementation using AWS SDK for Java. For that, I'd like to understand > two things: > - what's the correct process for making the contribution (contributor > agreement, etc.) > - what's the minimum functionality
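The {noformat} blocks above lost their XML tags during archiving. A plausible reconstruction of the two ASL-licensed dependencies follows; the property names are taken from the quoted text, while the exact versions, scopes, and the placement of the exclusions inside `amazon-kinesis-client` are assumptions and may differ from the real flink-connector-kinesis pom:

```xml
<!-- Sketch of the ASL-licensed dependencies the issue proposes to remove. -->
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-producer</artifactId>
    <version>${aws.kinesis-kpl.version}</version>
</dependency>
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>${aws.kinesis-kcl.version}</version>
    <exclusions>
        <exclusion>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-dynamodb</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-cloudwatch</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```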
[jira] [Commented] (FLINK-7664) Replace FlinkFutureException by CompletionException
[ https://issues.apache.org/jira/browse/FLINK-7664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16177318#comment-16177318 ] ASF GitHub Bot commented on FLINK-7664: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4701#discussion_r140614369 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java --- @@ -65,7 +65,7 @@ public JarPlanHandler(Executor executor, File jarDirectory) { return writer.toString(); } catch (Exception e) { - throw new FlinkFutureException(e); + throw new CompletionException(e); --- End diff -- Why it's not wrapping a `FlinkException` here? > Replace FlinkFutureException by CompletionException > --- > > Key: FLINK-7664 > URL: https://issues.apache.org/jira/browse/FLINK-7664 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > The {{FlinkFutureException}} was introduced to fail in a > {{CompletableFuture}} callback. This can, however, better be done via the > {{CompletionException}}. Therefore, we should remove the > {{FlinkFutureException}} and replace it with the {{CompletionException}} > instead. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4701: [FLINK-7664] Replace FlinkFutureException by java....
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4701#discussion_r140614369 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java --- @@ -65,7 +65,7 @@ public JarPlanHandler(Executor executor, File jarDirectory) { return writer.toString(); } catch (Exception e) { - throw new FlinkFutureException(e); + throw new CompletionException(e); --- End diff -- Why it's not wrapping a `FlinkException` here? ---
[jira] [Commented] (FLINK-7672) Publish Kinesis connector to Maven Central?
[ https://issues.apache.org/jira/browse/FLINK-7672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16177315#comment-16177315 ] Radoslaw Gruchalski commented on FLINK-7672: https://issues.apache.org/jira/browse/FLINK-7673 > Publish Kinesis connector to Maven Central? > --- > > Key: FLINK-7672 > URL: https://issues.apache.org/jira/browse/FLINK-7672 > Project: Flink > Issue Type: Improvement >Reporter: Radoslaw Gruchalski > > Hi there, just started working with Flink, second time in the last couple of > years. Second time I hit a major roadblock - the connector I try to work with > is not available off the shelf. I wonder, what's the reason for this > connector: > https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/pom.xml > not being Maven Central? It's 21st century, why should I install the jar > manually on every machine where I need to work with the code? Off the top of > my head, my local box, jenkins, customer's deployment. Really? > I do understand this is coming across rather rude, but come on. There's > awesome org.apache.flink organization on Central, so why not every connector > in there? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7673) Flink Kinesis connector - remove dependency on the ASL code
[ https://issues.apache.org/jira/browse/FLINK-7673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Radoslaw Gruchalski updated FLINK-7673: --- Description: Currently the Flink Kinesis connector depends on two artifacts which are available under Amazon Software License code. The artifacts are: {noformat} com.amazonaws amazon-kinesis-producer ${aws.kinesis-kpl.version} com.amazonaws amazon-kinesis-client ${aws.kinesis-kcl.version} com.amazonaws aws-java-sdk-dynamodb com.amazonaws aws-java-sdk-cloudwatch {noformat} This prevents the connector being published to Maven Central. I would like to contribute to Flink Kinesis connector by replacing the ASL licensed code with an implementation using AWS SDK for Java. For that, I'd like to understand two things: - what's the correct process for making the contribution (contributor agreement, etc.) - what's the minimum functionality the alternative implementation would have to provide in order to get accepted Thank you. was: Currently the Flink Kinesis connector depends on two artifacts which are available under Amazon Software License code. The artifacts are: {noformat} com.amazonaws amazon-kinesis-producer ${aws.kinesis-kpl.version} com.amazonaws amazon-kinesis-client ${aws.kinesis-kcl.version} com.amazonaws aws-java-sdk-dynamodb com.amazonaws aws-java-sdk-cloudwatch {/noformat} This prevents the connector being published to Maven Central. I would like to contribute to Flink Kinesis connector by replacing the ASL licensed code with an implementation using AWS SDK for Java. For that, I'd like to understand two things: - what's the correct process for making the contribution (contributor agreement, etc.) - what's the minimum functionality the alternative implementation would have to provide in order to get accepted Thank you. 
> Flink Kinesis connector - remove dependency on the ASL code > --- > > Key: FLINK-7673 > URL: https://issues.apache.org/jira/browse/FLINK-7673 > Project: Flink > Issue Type: Improvement >Reporter: Radoslaw Gruchalski > > Currently the Flink Kinesis connector depends on two artifacts which are > available under Amazon Software License code. The artifacts are: > {noformat} > >com.amazonaws >amazon-kinesis-producer >${aws.kinesis-kpl.version} > > >com.amazonaws >amazon-kinesis-client >${aws.kinesis-kcl.version} > > > >com.amazonaws > > aws-java-sdk-dynamodb > > >com.amazonaws > > aws-java-sdk-cloudwatch > > > > {noformat} > This prevents the connector being published to Maven Central. I would like to > contribute to Flink Kinesis connector by replacing the ASL licensed code with > an implementation using AWS SDK for Java. For that, I'd like to understand > two things: > - what's the correct process for making the contribution (contributor > agreement, etc.) > - what's the minimum functionality the alternative implementation would have > to provide in order to get accepted > Thank you. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7673) Flink Kinesis connector - remove dependency on the ASL code
Radoslaw Gruchalski created FLINK-7673: -- Summary: Flink Kinesis connector - remove dependency on the ASL code Key: FLINK-7673 URL: https://issues.apache.org/jira/browse/FLINK-7673 Project: Flink Issue Type: Improvement Reporter: Radoslaw Gruchalski Currently the Flink Kinesis connector depends on two artifacts which are available under Amazon Software License code. The artifacts are: {noformat} com.amazonaws amazon-kinesis-producer ${aws.kinesis-kpl.version} com.amazonaws amazon-kinesis-client ${aws.kinesis-kcl.version} com.amazonaws aws-java-sdk-dynamodb com.amazonaws aws-java-sdk-cloudwatch {/noformat} This prevents the connector being published to Maven Central. I would like to contribute to Flink Kinesis connector by replacing the ASL licensed code with an implementation using AWS SDK for Java. For that, I'd like to understand two things: - what's the correct process for making the contribution (contributor agreement, etc.) - what's the minimum functionality the alternative implementation would have to provide in order to get accepted Thank you. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7393) Move unit tests of KinesisConfigUtil from FlinkKinesisConsumerTest to KinesisConfigUtilTest
[ https://issues.apache.org/jira/browse/FLINK-7393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16177311#comment-16177311 ] ASF GitHub Bot commented on FLINK-7393: --- GitHub user bowenli86 opened a pull request: https://github.com/apache/flink/pull/4708 [FLINK-7393][kinesis connector] Move unit tests that should belong to KinesisConfigUtil from FlinkKinesisConsumerTest to KinesisConfigUtilTest ## What is the purpose of the change Right now, `FlinkKinesisConsumerTest` has lots of unit tests that actually should belong to `KinesisConfigUtil`, e.g. all the `validateXxxConfiguration()` We need to move those tests out to a new file `KinesisConfigUtilTest` ## Brief change log - Move unit tests that should belong to KinesisConfigUtil from FlinkKinesisConsumerTest to KinesisConfigUtilTest ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. This change is already covered by existing tests, such as *(please describe tests)*. 
## Does this pull request potentially affect one of the following parts: none ## Documentation none You can merge this pull request into a Git repository by running: $ git pull https://github.com/bowenli86/flink FLINK-7223 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4708.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4708 commit 8ba44053ea96220fd74a357ff612bebf65553cb7 Author: Bowen Li Date: 2017-09-22T23:10:56Z FLINK-7393 Move unit tests of KinesisConfigUtil from FlinkKinesisConsumerTest to KinesisConfigUtilTest > Move unit tests of KinesisConfigUtil from FlinkKinesisConsumerTest to > KinesisConfigUtilTest > --- > > Key: FLINK-7393 > URL: https://issues.apache.org/jira/browse/FLINK-7393 > Project: Flink > Issue Type: Test > Components: Kinesis Connector >Affects Versions: 1.3.2 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Minor > Fix For: 1.4.0 > > > Right now, > [{{FlinkKinesisConsumerTest}}|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java] > has lots of tests that actually should belong to {{KinesisConfigUtil}}, e.g. > all the {{validateXxxConfiguration()}} > We need to move those tests out to a new file {{KinesisConfigUtilTest}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4708: [FLINK-7393][kinesis connector] Move unit tests th...
GitHub user bowenli86 opened a pull request: https://github.com/apache/flink/pull/4708 [FLINK-7393][kinesis connector] Move unit tests that should belong to KinesisConfigUtil from FlinkKinesisConsumerTest to KinesisConfigUtilTest ## What is the purpose of the change Right now, `FlinkKinesisConsumerTest` has lots of unit tests that actually should belong to `KinesisConfigUtil`, e.g. all the `validateXxxConfiguration()` We need to move those tests out to a new file `KinesisConfigUtilTest` ## Brief change log - Move unit tests that should belong to KinesisConfigUtil from FlinkKinesisConsumerTest to KinesisConfigUtilTest ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. This change is already covered by existing tests, such as *(please describe tests)*. ## Does this pull request potentially affect one of the following parts: none ## Documentation none You can merge this pull request into a Git repository by running: $ git pull https://github.com/bowenli86/flink FLINK-7223 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4708.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4708 commit 8ba44053ea96220fd74a357ff612bebf65553cb7 Author: Bowen Li Date: 2017-09-22T23:10:56Z FLINK-7393 Move unit tests of KinesisConfigUtil from FlinkKinesisConsumerTest to KinesisConfigUtilTest ---
[jira] [Updated] (FLINK-7393) Move unit tests of KinesisConfigUtil from FlinkKinesisConsumerTest to KinesisConfigUtilTest
[ https://issues.apache.org/jira/browse/FLINK-7393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-7393: Summary: Move unit tests of KinesisConfigUtil from FlinkKinesisConsumerTest to KinesisConfigUtilTest (was: Move FlinkKinesisConsumerTest) > Move unit tests of KinesisConfigUtil from FlinkKinesisConsumerTest to > KinesisConfigUtilTest > --- > > Key: FLINK-7393 > URL: https://issues.apache.org/jira/browse/FLINK-7393 > Project: Flink > Issue Type: Test > Components: Kinesis Connector >Affects Versions: 1.3.2 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Minor > Fix For: 1.4.0 > > > Right now, > [{{FlinkKinesisConsumerTest}}|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java] > has lots of tests that actually should belong to {{KinesisConfigUtil}}, e.g. > all the {{validateXxxConfiguration()}} > We need to move those tests out to a new file {{KinesisConfigUtilTest}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7393) Move FlinkKinesisConsumerTest
[ https://issues.apache.org/jira/browse/FLINK-7393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-7393: Summary: Move FlinkKinesisConsumerTest (was: Refactor FlinkKinesisConsumerTest) > Move FlinkKinesisConsumerTest > -- > > Key: FLINK-7393 > URL: https://issues.apache.org/jira/browse/FLINK-7393 > Project: Flink > Issue Type: Test > Components: Kinesis Connector >Affects Versions: 1.3.2 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Minor > Fix For: 1.4.0 > > > Right now, > [{{FlinkKinesisConsumerTest}}|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java] > has lots of tests that actually should belong to {{KinesisConfigUtil}}, e.g. > all the {{validateXxxConfiguration()}} > We need to move those tests out to a new file {{KinesisConfigUtilTest}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7672) Publish Kinesis connector to Maven Central?
[ https://issues.apache.org/jira/browse/FLINK-7672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16177247#comment-16177247 ] Radoslaw Gruchalski commented on FLINK-7672: I see, the problem is here: https://github.com/awslabs/amazon-kinesis-client and here: https://github.com/awslabs/amazon-kinesis-producer. I guess the correct approach would be to not use these two artifacts in the Kinesis connector. > Publish Kinesis connector to Maven Central? > --- > > Key: FLINK-7672 > URL: https://issues.apache.org/jira/browse/FLINK-7672 > Project: Flink > Issue Type: Improvement >Reporter: Radoslaw Gruchalski > > Hi there, just started working with Flink, second time in the last couple of > years. Second time I hit a major roadblock - the connector I try to work with > is not available off the shelf. I wonder, what's the reason for this > connector: > https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/pom.xml > not being Maven Central? It's 21st century, why should I install the jar > manually on every machine where I need to work with the code? Off the top of > my head, my local box, jenkins, customer's deployment. Really? > I do understand this is coming across rather rude, but come on. There's > awesome org.apache.flink organization on Central, so why not every connector > in there? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7672) Publish Kinesis connector to Maven Central?
[ https://issues.apache.org/jira/browse/FLINK-7672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16177231#comment-16177231 ] Radoslaw Gruchalski commented on FLINK-7672: But the AWS SDK for Java is licensed under Apache 2.0 license: - https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk - https://github.com/aws/aws-sdk-java/blob/master/LICENSE.txt > Publish Kinesis connector to Maven Central? > --- > > Key: FLINK-7672 > URL: https://issues.apache.org/jira/browse/FLINK-7672 > Project: Flink > Issue Type: Improvement >Reporter: Radoslaw Gruchalski > > Hi there, just started working with Flink, second time in the last couple of > years. Second time I hit a major roadblock - the connector I try to work with > is not available off the shelf. I wonder, what's the reason for this > connector: > https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/pom.xml > not being Maven Central? It's 21st century, why should I install the jar > manually on every machine where I need to work with the code? Off the top of > my head, my local box, jenkins, customer's deployment. Really? > I do understand this is coming across rather rude, but come on. There's > awesome org.apache.flink organization on Central, so why not every connector > in there? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7656) Switch to user ClassLoader when invoking initializeOnMaster finalizeOnMaster
[ https://issues.apache.org/jira/browse/FLINK-7656?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-7656. Resolution: Fixed Fix Version/s: 1.3.3 1.4.0 Fixed for 1.3.3 with 9a8b515c03d21023df4939af02a9ae87c9439284 Fixed for 1.4.0 with 4afca4b3a13b61c2754bc839c77ba4d4eb1d2da2 > Switch to user ClassLoader when invoking initializeOnMaster finalizeOnMaster > > > Key: FLINK-7656 > URL: https://issues.apache.org/jira/browse/FLINK-7656 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.3.2 >Reporter: Fabian Hueske >Assignee: Fabian Hueske > Fix For: 1.4.0, 1.3.3 > > > The contract that Flink provides to usercode is that that the usercode > classloader is the context classloader whenever usercode is called. > In {{OutputFormatVertex}} usercode is called in the {{initializeOnMaster()}} > and {{finalizeOnMaster()}} methods but the context classloader is not set to > the usercode classloader. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
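The fix's save/switch/restore pattern can be sketched with the JDK alone; the method name and the `Runnable` stand-in for `initializeOnMaster()`/`finalizeOnMaster()` are hypothetical, not the actual `OutputFormatVertex` code:

```java
public class ContextClassLoaderSketch {
    // The fix wraps the usercode call in exactly this pattern:
    // remember the current context classloader, switch to the user-code
    // classloader for the duration of the call, and always restore.
    static ClassLoader runWithUserClassLoader(ClassLoader userCodeLoader, Runnable usercode) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        try {
            current.setContextClassLoader(userCodeLoader);
            usercode.run(); // usercode sees its own loader as the context loader
            return current.getContextClassLoader();
        } finally {
            current.setContextClassLoader(previous); // always restore
        }
    }

    public static void main(String[] args) {
        ClassLoader userLoader = new ClassLoader() {}; // pretend user-code loader
        ClassLoader seen = runWithUserClassLoader(userLoader,
            () -> System.out.println("usercode running"));
        System.out.println(seen == userLoader); // true
        System.out.println(Thread.currentThread().getContextClassLoader() == userLoader); // false
    }
}
```

Restoring in `finally` matters: without it, a throwing `initializeOnMaster()` would leak the user-code classloader into unrelated master-side work.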
[jira] [Commented] (FLINK-7656) Switch to user ClassLoader when invoking initializeOnMaster finalizeOnMaster
[ https://issues.apache.org/jira/browse/FLINK-7656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16177217#comment-16177217 ] ASF GitHub Bot commented on FLINK-7656: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4690 > Switch to user ClassLoader when invoking initializeOnMaster finalizeOnMaster > > > Key: FLINK-7656 > URL: https://issues.apache.org/jira/browse/FLINK-7656 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.3.2 >Reporter: Fabian Hueske >Assignee: Fabian Hueske > > The contract that Flink provides to usercode is that that the usercode > classloader is the context classloader whenever usercode is called. > In {{OutputFormatVertex}} usercode is called in the {{initializeOnMaster()}} > and {{finalizeOnMaster()}} methods but the context classloader is not set to > the usercode classloader. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4690: [FLINK-7656] [runtime] Switch to user classloader ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4690 ---
[jira] [Commented] (FLINK-7672) Publish Kinesis connector to Maven Central?
[ https://issues.apache.org/jira/browse/FLINK-7672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16177215#comment-16177215 ] Fabian Hueske commented on FLINK-7672: -- AFAIK, calling the API might be considered linking, but I'm not a lawyer. Have you seen other Apache projects publishing Kinesis connector artifacts? > Publish Kinesis connector to Maven Central? > --- > > Key: FLINK-7672 > URL: https://issues.apache.org/jira/browse/FLINK-7672 > Project: Flink > Issue Type: Improvement >Reporter: Radoslaw Gruchalski > > Hi there, just started working with Flink, second time in the last couple of > years. Second time I hit a major roadblock - the connector I try to work with > is not available off the shelf. I wonder, what's the reason for this > connector: > https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/pom.xml > not being Maven Central? It's 21st century, why should I install the jar > manually on every machine where I need to work with the code? Off the top of > my head, my local box, jenkins, customer's deployment. Really? > I do understand this is coming across rather rude, but come on. There's > awesome org.apache.flink organization on Central, so why not every connector > in there? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7672) Publish Kinesis connector to Maven Central?
[ https://issues.apache.org/jira/browse/FLINK-7672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16177188#comment-16177188 ] Radoslaw Gruchalski commented on FLINK-7672: I'm not sure, the artifact uses the sdk but does not include it, thus it does not distribute it. > Publish Kinesis connector to Maven Central? > --- > > Key: FLINK-7672 > URL: https://issues.apache.org/jira/browse/FLINK-7672 > Project: Flink > Issue Type: Improvement >Reporter: Radoslaw Gruchalski > > Hi there, just started working with Flink, second time in the last couple of > years. Second time I hit a major roadblock - the connector I try to work with > is not available off the shelf. I wonder, what's the reason for this > connector: > https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/pom.xml > not being Maven Central? It's 21st century, why should I install the jar > manually on every machine where I need to work with the code? Off the top of > my head, my local box, jenkins, customer's deployment. Really? > I do understand this is coming across rather rude, but come on. There's > awesome org.apache.flink organization on Central, so why not every connector > in there? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (FLINK-7672) Publish Kinesis connector to Maven Central?
[ https://issues.apache.org/jira/browse/FLINK-7672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske resolved FLINK-7672. -- Resolution: Unresolved Unfortunately, the Amazon Software License is Cat X (https://www.apache.org/legal/resolved.html#category-x) and cannot be part of a distributed artifact. I know this is annoying, but this cannot be resolved from our side. > Publish Kinesis connector to Maven Central? > --- > > Key: FLINK-7672 > URL: https://issues.apache.org/jira/browse/FLINK-7672 > Project: Flink > Issue Type: Improvement >Reporter: Radoslaw Gruchalski > > Hi there, just started working with Flink, second time in the last couple of > years. Second time I hit a major roadblock - the connector I try to work with > is not available off the shelf. I wonder, what's the reason for this > connector: > https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/pom.xml > not being Maven Central? It's 21st century, why should I install the jar > manually on every machine where I need to work with the code? Off the top of > my head, my local box, jenkins, customer's deployment. Really? > I do understand this is coming across rather rude, but come on. There's > awesome org.apache.flink organization on Central, so why not every connector > in there? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7672) Publish Kinesis connector to Maven Central?
[ https://issues.apache.org/jira/browse/FLINK-7672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16177172#comment-16177172 ] Fabian Hueske commented on FLINK-7672: -- The Kinesis connector is not published to Maven Central because of a licensing issue with the Amazon Software License. To quote the documentation: bq. The flink-connector-kinesis_2.10 artifact has a dependency on code licensed under the Amazon Software License (ASL). Linking to flink-connector-kinesis will include ASL-licensed code in your application. bq. The flink-connector-kinesis_2.10 artifact is not deployed to Maven Central as part of Flink releases because of the licensing issue. Therefore, you need to build the connector yourself from source.
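Since the connector has to be built from source, the workflow looks roughly like the following. This is a sketch only: the module path is the one from the linked pom.xml, but the exact Maven flags and any activating profile can differ between releases, so check the connector documentation for your version.

```shell
# Clone Flink and install the Kinesis connector into the local ~/.m2 repository.
git clone https://github.com/apache/flink.git
cd flink
# -pl builds just the connector module, -am also builds the modules it depends on;
# some releases additionally gate the module behind a profile such as -Pinclude-kinesis.
mvn clean install -DskipTests -pl flink-connectors/flink-connector-kinesis -am
```

After this, the connector can be referenced as a normal Maven dependency on machines that share that local repository (or an internal repository it is deployed to).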
[jira] [Created] (FLINK-7672) Publish Kinesis connector to Maven Central?
Radoslaw Gruchalski created FLINK-7672: -- Summary: Publish Kinesis connector to Maven Central? Key: FLINK-7672 URL: https://issues.apache.org/jira/browse/FLINK-7672 Project: Flink Issue Type: Improvement Reporter: Radoslaw Gruchalski Hi there, just started working with Flink, second time in the last couple of years. Second time I hit a major roadblock: the connector I try to work with is not available off the shelf. I wonder, what's the reason for this connector: https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/pom.xml not being on Maven Central? It's the 21st century, why should I install the jar manually on every machine where I need to work with the code? Off the top of my head: my local box, Jenkins, customer's deployment. Really? I do understand this is coming across rather rude, but come on. There's an awesome org.apache.flink organization on Central, so why not every connector in there?
[jira] [Created] (FLINK-7671) JobManagerCleanupITCase failed in build
Bowen Li created FLINK-7671: --- Summary: JobManagerCleanupITCase failed in build Key: FLINK-7671 URL: https://issues.apache.org/jira/browse/FLINK-7671 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.4.0 Reporter: Bowen Li Fix For: 1.4.0 {code:java} org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase testBlobServerCleanupCancelledJob(org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase) Time elapsed: 0.27 sec <<< FAILURE! java.lang.AssertionError: assertion failed: expected interface org.apache.flink.runtime.messages.JobManagerMessages$CancellationResponse, found class org.apache.flink.runtime.messages.JobManagerMessages$JobResultFailure at scala.Predef$.assert(Predef.scala:179) at akka.testkit.TestKitBase$class.expectMsgClass_internal(TestKit.scala:424) at akka.testkit.TestKitBase$class.expectMsgClass(TestKit.scala:410) at akka.testkit.TestKit.expectMsgClass(TestKit.scala:718) at akka.testkit.JavaTestKit.expectMsgClass(JavaTestKit.java:397) at org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase$1$1.run(JobManagerCleanupITCase.java:224) at akka.testkit.JavaTestKit$Within$1.apply(JavaTestKit.java:232) at akka.testkit.TestKitBase$class.within(TestKit.scala:296) at akka.testkit.TestKit.within(TestKit.scala:718) at akka.testkit.TestKitBase$class.within(TestKit.scala:310) at akka.testkit.TestKit.within(TestKit.scala:718) at akka.testkit.JavaTestKit$Within.<init>(JavaTestKit.java:230) at org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase$1$1.<init>(JobManagerCleanupITCase.java:134) at org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase$1.<init>(JobManagerCleanupITCase.java:134) at org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase.testBlobServerCleanup(JobManagerCleanupITCase.java:133) at org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase.testBlobServerCleanupCancelledJob(JobManagerCleanupITCase.java:108) {code} in https://travis-ci.org/apache/flink/jobs/278740892
[jira] [Commented] (FLINK-7635) Support sideOutput in ProcessWindowFunciton
[ https://issues.apache.org/jira/browse/FLINK-7635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176945#comment-16176945 ] ASF GitHub Bot commented on FLINK-7635: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4702 Thank you @aljoscha ! I moved the unrelated code changes to PR https://github.com/apache/flink/pull/4707 Just curious: why does Flink's Scala API have its own `OutputTag` class rather than using `org.apache.flink.util.OutputTag`? > Support sideOutput in ProcessWindowFunciton > --- > > Key: FLINK-7635 > URL: https://issues.apache.org/jira/browse/FLINK-7635 > Project: Flink > Issue Type: Improvement > Components: DataStream API, Scala API >Reporter: Chen Qin >Assignee: Bowen Li >Priority: Minor > Fix For: 1.4.0, 1.3.3 > > > [FLINK-4460|https://issues.apache.org/jira/browse/FLINK-4460] only > implemented output to the ProcessFunction Context. It would be nice to add > support to ProcessWindow and ProcessAllWindow functions as well. [email > threads|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/question-on-sideoutput-from-ProcessWindow-function-td15500.html] > [~aljoscha] I thought this was a good warm-up task for people to learn how > window functions work in general. Otherwise, feel free to assign it back to me.
[GitHub] flink issue #4702: [FLINK-7635][DataStream API][Scala API] Support sideOutpu...
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4702 Thank you @aljoscha ! I moved the unrelated code changes to PR https://github.com/apache/flink/pull/4707 Just curious: why does Flink's Scala API have its own `OutputTag` class rather than using `org.apache.flink.util.OutputTag`? ---
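On the `OutputTag` question above: one likely reason (an inference, not confirmed in this thread) is Java type erasure. The Java `OutputTag` is typically instantiated as an anonymous subclass so its element type survives erasure and can be recovered reflectively, whereas the Scala API can pick up the equivalent `TypeInformation` implicitly and therefore uses its own class. A minimal, Flink-free sketch of the anonymous-subclass trick, with a hypothetical `Tag` class standing in for `org.apache.flink.util.OutputTag`:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-in for OutputTag, illustrating why the Java API asks
// for `new OutputTag<String>("name") {}`: the trailing {} creates an
// anonymous subclass, which keeps the type argument visible to reflection.
abstract class Tag<T> {
    private final String id;

    Tag(String id) {
        this.id = id;
    }

    String getId() {
        return id;
    }

    // Reads the T of "class Anonymous extends Tag<T>" back at runtime.
    Type capturedType() {
        ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
        return superType.getActualTypeArguments()[0];
    }
}

public class TagDemo {
    public static void main(String[] args) {
        Tag<String> late = new Tag<String>("late-data") {};
        System.out.println(late.capturedType()); // class java.lang.String
    }
}
```

Without the trailing `{}`, `getGenericSuperclass()` would not carry the type argument, which is why a raw `new Tag<String>("late-data")` style of construction cannot work for this pattern in Java.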
[jira] [Commented] (FLINK-7635) Support sideOutput in ProcessWindowFunciton
[ https://issues.apache.org/jira/browse/FLINK-7635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176933#comment-16176933 ] ASF GitHub Bot commented on FLINK-7635: --- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4702#discussion_r140571380 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java --- @@ -774,6 +774,14 @@ public KeyedStateStore windowState() { public KeyedStateStore globalState() { return WindowOperator.this.getKeyedStateStore(); } + + public void output(OutputTag outputTag, X value) { + if (outputTag == null) { + throw new IllegalArgumentException("OutputTag must not be null."); + } + output.collect(outputTag, new StreamRecord<>(value, cleanupTime(window))); --- End diff -- Makes sense. Fixed
[GitHub] flink pull request #4702: [FLINK-7635][DataStream API][Scala API] Support si...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4702#discussion_r140571380 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java --- @@ -774,6 +774,14 @@ public KeyedStateStore windowState() { public KeyedStateStore globalState() { return WindowOperator.this.getKeyedStateStore(); } + + public void output(OutputTag outputTag, X value) { + if (outputTag == null) { + throw new IllegalArgumentException("OutputTag must not be null."); + } + output.collect(outputTag, new StreamRecord<>(value, cleanupTime(window))); --- End diff -- Make sense. Fixed ---
[GitHub] flink pull request #4707: [hotfix] Add doc for window operators, fix typos a...
GitHub user bowenli86 opened a pull request: https://github.com/apache/flink/pull/4707 [hotfix] Add doc for window operators, fix typos and format code ## What is the purpose of the change Add doc for window operators, fix typos and format code. This is separated from https://github.com/apache/flink/pull/4702 so that the other PR contains only feature-related changes. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Documentation - Does this pull request introduce a new feature? (no) You can merge this pull request into a Git repository by running: $ git pull https://github.com/bowenli86/flink hotfix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4707.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4707 commit fda53bdecfdc203bad009ceca5e3e8123ef3d525 Author: Bowen Li Date: 2017-09-22T18:36:18Z [hotfix] Fix typos and other small things ---
[GitHub] flink issue #4665: [Flink-7611]add metrics to measure the num of data droppe...
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4665 @aljoscha sorry for being late to this CR. I left a couple of comments ---
[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4665#discussion_r140564400 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java --- @@ -231,6 +231,8 @@ public void merge(W mergeResult, // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp if (isSkippedElement && lateDataOutputTag != null && isElementLate(element)) { --- End diff -- the if-else conditions are duplicated and inefficient, and can be further combined as if(isSkippedElement && isElementLate(element)) { if(lateDataOutputTag != null) { sideOutput(element); } this.lostDataCount.inc(); } ---
[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4665#discussion_r140564281 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java --- @@ -405,6 +411,8 @@ public void merge(W mergeResult, // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp if (isSkippedElement && lateDataOutputTag != null && isElementLate(element)) { --- End diff -- the if-else conditions are duplicated and inefficient, and can be further combined as ```java if(isSkippedElement && isElementLate(element)) { if(lateDataOutputTag != null) { sideOutput(element); } this.lostDataCount.inc(); } ``` ---
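The combined condition suggested in the review comments above can be sketched with plain booleans and counters standing in for the operator's fields (all names here are illustrative, not Flink API):

```java
// Sketch of the review suggestion: evaluate the shared guard once, emit to
// the late-data side output only when a tag is configured, and bump the
// dropped-data metric in either case.
class LateDataHandling {
    long lostDataCount = 0;   // stands in for the proposed metric counter
    long sideOutputCount = 0; // stands in for output.collect(lateDataOutputTag, ...)

    void onElement(boolean isSkippedElement, boolean isElementLate, boolean hasLateDataTag) {
        if (isSkippedElement && isElementLate) {
            if (hasLateDataTag) {
                sideOutputCount++;
            }
            lostDataCount++;
        }
    }
}
```

The point of the refactoring is that `isSkippedElement && isElementLate(element)` is evaluated only once, and only the side-output emission depends on whether a late-data `OutputTag` was configured.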
[jira] [Commented] (FLINK-7656) Switch to user ClassLoader when invoking initializeOnMaster finalizeOnMaster
[ https://issues.apache.org/jira/browse/FLINK-7656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176659#comment-16176659 ] ASF GitHub Bot commented on FLINK-7656: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/4690 Thanks for the review @aljoscha. I'll change that and merge the fix. > Switch to user ClassLoader when invoking initializeOnMaster finalizeOnMaster > > > Key: FLINK-7656 > URL: https://issues.apache.org/jira/browse/FLINK-7656 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.3.2 >Reporter: Fabian Hueske >Assignee: Fabian Hueske > > The contract that Flink provides to usercode is that the usercode > classloader is the context classloader whenever usercode is called. > In {{OutputFormatVertex}}, usercode is called in the {{initializeOnMaster()}} > and {{finalizeOnMaster()}} methods, but the context classloader is not set to > the usercode classloader.
[GitHub] flink issue #4690: [FLINK-7656] [runtime] Switch to user classloader before ...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/4690 Thanks for the review @aljoscha. I'll change that and merge the fix. ---
[jira] [Commented] (FLINK-7606) CEP operator leaks state
[ https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176654#comment-16176654 ] Paolo Rendano commented on FLINK-7606: -- Hi [~kkl0u], sorry, this week was hard for me as well. Next week I'll have some time for sure to check your suggestions and run the trial. Regarding my open issue FLINK-7606, I see that [~matteoferrario29] is also losing events in CEP. I'll go deeper next week. > CEP operator leaks state > > > Key: FLINK-7606 > URL: https://issues.apache.org/jira/browse/FLINK-7606 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.3.1 >Reporter: Matteo Ferrario > Attachments: heap-dump1.png, heap-dump2.png, heap-dump3.png > > > The NestedMapsStateTable grows continuously without freeing heap memory. > We created a simple job that processes a stream of messages and uses CEP to > generate an outcome message when a specific pattern is identified. > The messages coming from the stream are grouped by a key defined in a > specific field of the message. > We've also added the "within" clause (set to 5 minutes), indicating that two > incoming messages match the pattern only if they arrive within a certain time > window. > What we've seen is that for every key present in the messages, an NFA object > is instantiated in the NestedMapsStateTable and never deallocated. > The "within" clause didn't help either: we've seen that if we send messages > that don't match the pattern, the memory grows (I suppose the state of the > NFA is updated), and it is not cleaned up even after the 5-minute time > window defined in the "within" clause. > If you need, I can provide more details about the job we've implemented and > also screenshots of the memory leak.
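For context, the "within" clause described in the report looks roughly like this in the CEP Java API of that era. This is a hedged sketch: `Event`, its `getType()` method, and the pattern names are hypothetical, not taken from the reporter's job.

```java
// Two matching events must arrive within 5 minutes of each other;
// the leak report says partial matches outside this window are
// nevertheless not cleaned up from the keyed NFA state.
Pattern<Event, ?> pattern = Pattern.<Event>begin("first")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event e) {
            return e.getType().equals("start");
        }
    })
    .followedBy("second")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event e) {
            return e.getType().equals("end");
        }
    })
    .within(Time.minutes(5));
```

The reported behavior is that per-key NFA state keeps growing even for non-matching elements, i.e. the 5-minute bound does not translate into state cleanup.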
[jira] [Commented] (FLINK-7635) Support sideOutput in ProcessWindowFunciton
[ https://issues.apache.org/jira/browse/FLINK-7635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176606#comment-16176606 ] ASF GitHub Bot commented on FLINK-7635: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4702#discussion_r140529325 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java --- @@ -774,6 +774,14 @@ public KeyedStateStore windowState() { public KeyedStateStore globalState() { return WindowOperator.this.getKeyedStateStore(); } + + public void output(OutputTag outputTag, X value) { + if (outputTag == null) { + throw new IllegalArgumentException("OutputTag must not be null."); + } + output.collect(outputTag, new StreamRecord<>(value, cleanupTime(window))); --- End diff -- I think this should be `window.maxTimestamp()` to match `emitWindowContents()`.
[jira] [Commented] (FLINK-7635) Support sideOutput in ProcessWindowFunciton
[ https://issues.apache.org/jira/browse/FLINK-7635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176605#comment-16176605 ] ASF GitHub Bot commented on FLINK-7635: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4702#discussion_r140528890 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java --- @@ -212,6 +213,11 @@ public KeyedStateStore windowState() { public KeyedStateStore globalState() { return globalState; } + + @Override + public void output(OutputTag outputTag, X value) { + throw new UnsupportedOperationException("current watermark is not supported in this context"); --- End diff -- Still has the old message about watermarks.
[GitHub] flink pull request #4702: [FLINK-7635][DataStream API][Scala API] Support si...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4702#discussion_r140529325 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java --- @@ -774,6 +774,14 @@ public KeyedStateStore windowState() { public KeyedStateStore globalState() { return WindowOperator.this.getKeyedStateStore(); } + + public void output(OutputTag outputTag, X value) { + if (outputTag == null) { + throw new IllegalArgumentException("OutputTag must not be null."); + } + output.collect(outputTag, new StreamRecord<>(value, cleanupTime(window))); --- End diff -- I think this should be `window.maxTimestamp()` to match `emitWindowContents()`. ---
[GitHub] flink pull request #4702: [FLINK-7635][DataStream API][Scala API] Support si...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4702#discussion_r140528890 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java --- @@ -212,6 +213,11 @@ public KeyedStateStore windowState() { public KeyedStateStore globalState() { return globalState; } + + @Override + public void output(OutputTag outputTag, X value) { + throw new UnsupportedOperationException("current watermark is not supported in this context"); --- End diff -- Still has the old message about watermarks. ---
[jira] [Commented] (FLINK-7669) org.apache.flink.api.common.ExecutionConfig cannot be cast to org.apache.flink.api.common.ExecutionConfig
[ https://issues.apache.org/jira/browse/FLINK-7669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176592#comment-16176592 ] Aljoscha Krettek commented on FLINK-7669: - And is the fat JAR always built against 1.3.2, or do you build it against 1.4-SNAPSHOT when running on that cluster?
[jira] [Commented] (FLINK-7670) typo in docs runtime section
[ https://issues.apache.org/jira/browse/FLINK-7670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176563#comment-16176563 ] ASF GitHub Bot commented on FLINK-7670: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/4706 +1 > typo in docs runtime section > > > Key: FLINK-7670 > URL: https://issues.apache.org/jira/browse/FLINK-7670 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.3.2 >Reporter: Kewei SHANG >Priority: Minor > Fix For: 1.3.3 > > > The following link to the Savepoints page: > [Savepoints](..//setup/savepoints.html) > should be changed to: > [Savepoints](../setup/savepoints.html)
[GitHub] flink issue #4706: [FLINK-7670][doc] fix typo in docs runtime section
Github user zentol commented on the issue: https://github.com/apache/flink/pull/4706 +1 ---
[jira] [Commented] (FLINK-7669) org.apache.flink.api.common.ExecutionConfig cannot be cast to org.apache.flink.api.common.ExecutionConfig
[ https://issues.apache.org/jira/browse/FLINK-7669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176560#comment-16176560 ] Raymond Tay commented on FLINK-7669: Basically, the program is built using {{sbt assembly}}, which produces a _fatjar_, and I have two installations running locally (version {{1.3.2}}, which I downloaded from the website, and a locally built {{1.4.0-SNAPSHOT}}). The execution command is: {{/bin/flink run -p 4 JARFILE}} Another piece of information I wanted to share: I built {{1.4.0-SNAPSHOT}} locally two days back and it was alright; this failure only happened when I pulled again today.
[jira] [Commented] (FLINK-7632) Better documentation and examples on C* sink usage for Pojo and Tuples data types
[ https://issues.apache.org/jira/browse/FLINK-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176559#comment-16176559 ] ASF GitHub Bot commented on FLINK-7632: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/4696 Nice improvement to the docs, had a few comments. > Better documentation and examples on C* sink usage for Pojo and Tuples data > types > - > > Key: FLINK-7632 > URL: https://issues.apache.org/jira/browse/FLINK-7632 > Project: Flink > Issue Type: Sub-task > Components: Cassandra Connector, Documentation >Reporter: Michael Fong >Assignee: Michael Fong > > The Cassandra sink supports Pojo and Java Tuple data types. We should improve > the documentation on its usage and add concrete, meaningful examples for > both cases.
[GitHub] flink issue #4696: [FLINK-7632] [document] Overhaul on Cassandra connector d...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/4696 Nice improvement to the docs, had a few comments. ---
[jira] [Commented] (FLINK-7632) Better documentation and examples on C* sink usage for Pojo and Tuples data types
[ https://issues.apache.org/jira/browse/FLINK-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176556#comment-16176556 ]

ASF GitHub Bot commented on FLINK-7632:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4696#discussion_r140517696

    --- Diff: docs/dev/connectors/cassandra.md ---
    @@ -78,76 +96,195 @@ Note that that enabling this feature will have an adverse impact on latency.
     Note: The write-ahead log functionality is currently experimental. In many cases it is sufficent to use the connector without enabling it. Please report problems to the development mailing list.
    +### Checkpointing and Fault Tolerance
    +With checkpointing enabled, Cassandra Sink guarantees at-least-once delivery of action requests to C* instance.
    - Example
    +Note:However, current Cassandra Sink implementation does not flush the pending mutations before the checkpoint was triggered. Thus, some in-flight mutations might not be replayed when the job recovered.
    +
    +More details on [checkpoints docs]({{ site.baseurl }}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ site.baseurl }}/dev/connectors/guarantees.html)
    +
    +To enable fault tolerant guarantee, checkpointing of the topology needs to be enabled at the execution environment:

    {% highlight java %}
    -CassandraSink.addSink(input)
    -  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
    -  .setClusterBuilder(new ClusterBuilder() {
    -    @Override
    -    public Cluster buildCluster(Cluster.Builder builder) {
    -      return builder.addContactPoint("127.0.0.1").build();
    -    }
    -  })
    -  .build();
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +env.enableCheckpointing(5000); // checkpoint every 5000 msecs
    {% endhighlight %}

    {% highlight scala %}
    -CassandraSink.addSink(input)
    -  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
    -  .setClusterBuilder(new ClusterBuilder() {
    -    override def buildCluster(builder: Cluster.Builder): Cluster = {
    -      builder.addContactPoint("127.0.0.1").build()
    -    }
    -  })
    -  .build()
    +val env = StreamExecutionEnvironment.getExecutionEnvironment()
    +env.enableCheckpointing(5000) // checkpoint every 5000 msecs
    +{% endhighlight %}
    +
    +## Examples
    +
    +The Cassandra sinks currently support both Java Tuple and POJO data types, and Flink automatically detects which type of input is used. For general use case of those streaming data type, please refer to [Supported Data Types]({{ site.baseurl }}/dev/api_concepts.html). We show two implementations based on [SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java), for Pojo and Java Tuple data types respectively.
    +
    +In all these examples, we assumed the associated Keyspace `example` and Table `wordcount` have been created.
    +
    +{% highlight sql %}
    +CREATE KEYSPACE IF NOT EXISTS example
    +WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
    +CREATE TABLE IF NOT EXISTS example.wordcount (
    +word text,
    +count bigint,
    +PRIMARY KEY(word)
    +);
    +{% endhighlight %}
    +
    +### Cassandra Sink Example for Streaming Java Tuple Data Type
    +While storing the result with Java Tuple data type to a Cassandra sink, it is required to set a CQL upsert statement (via setQuery('stmt')) to persist each record back to database. With the upsert query cached as `PreparedStatement`, Cassandra connector internally converts each Tuple elements as parameters to the statement.
    +
    +For details about `PreparedStatement` and `BoundStatement`, please visit [DataStax Java Driver manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/)
    +
    +Please note that if the upsert query were not set, an `IllegalArgumentException` would be thrown with the following error message `Query must not be null or empty.`
    +
    +{% highlight java %}
    +// get the execution environment
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +// get input data by connecting to the socket
    +DataStream<String> text = env.socketTextStream(hostname, port, "\n");
    +
    +// parse the data, group it, window it, and aggregate the counts
    +DataStream<Tuple2<String, Long>> result = text
    +  .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
    +    @Override
    +    public void flatMap(String
[jira] [Commented] (FLINK-7632) Better documentation and examples on C* sink usage for Pojo and Tuples data types
[ https://issues.apache.org/jira/browse/FLINK-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176555#comment-16176555 ]

ASF GitHub Bot commented on FLINK-7632:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4696#discussion_r140517986

    --- Diff: docs/dev/connectors/cassandra.md ---
    @@ -78,76 +96,195 @@ Note that that enabling this feature will have an adverse impact on latency.
     Note: The write-ahead log functionality is currently experimental. In many cases it is sufficent to use the connector without enabling it. Please report problems to the development mailing list.
    +### Checkpointing and Fault Tolerance
    +With checkpointing enabled, Cassandra Sink guarantees at-least-once delivery of action requests to C* instance.
    - Example
    +Note:However, current Cassandra Sink implementation does not flush the pending mutations before the checkpoint was triggered. Thus, some in-flight mutations might not be replayed when the job recovered.
    +
    +More details on [checkpoints docs]({{ site.baseurl }}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ site.baseurl }}/dev/connectors/guarantees.html)
    +
    +To enable fault tolerant guarantee, checkpointing of the topology needs to be enabled at the execution environment:

    {% highlight java %}
    -CassandraSink.addSink(input)
    -  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
    -  .setClusterBuilder(new ClusterBuilder() {
    -    @Override
    -    public Cluster buildCluster(Cluster.Builder builder) {
    -      return builder.addContactPoint("127.0.0.1").build();
    -    }
    -  })
    -  .build();
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +env.enableCheckpointing(5000); // checkpoint every 5000 msecs
    {% endhighlight %}

    {% highlight scala %}
    -CassandraSink.addSink(input)
    -  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
    -  .setClusterBuilder(new ClusterBuilder() {
    -    override def buildCluster(builder: Cluster.Builder): Cluster = {
    -      builder.addContactPoint("127.0.0.1").build()
    -    }
    -  })
    -  .build()
    +val env = StreamExecutionEnvironment.getExecutionEnvironment()
    +env.enableCheckpointing(5000) // checkpoint every 5000 msecs
    +{% endhighlight %}
    +
    +## Examples
    +
    +The Cassandra sinks currently support both Java Tuple and POJO data types, and Flink automatically detects which type of input is used. For general use case of those streaming data type, please refer to [Supported Data Types]({{ site.baseurl }}/dev/api_concepts.html). We show two implementations based on [SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java), for Pojo and Java Tuple data types respectively.
    +
    +In all these examples, we assumed the associated Keyspace `example` and Table `wordcount` have been created.
    +
    +{% highlight sql %}
    +CREATE KEYSPACE IF NOT EXISTS example
    +WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
    +CREATE TABLE IF NOT EXISTS example.wordcount (
    +word text,
    +count bigint,
    +PRIMARY KEY(word)
    +);
    +{% endhighlight %}
    +
    +### Cassandra Sink Example for Streaming Java Tuple Data Type
    +While storing the result with Java Tuple data type to a Cassandra sink, it is required to set a CQL upsert statement (via setQuery('stmt')) to persist each record back to database. With the upsert query cached as `PreparedStatement`, Cassandra connector internally converts each Tuple elements as parameters to the statement.
    +
    +For details about `PreparedStatement` and `BoundStatement`, please visit [DataStax Java Driver manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/)
    +
    +Please note that if the upsert query were not set, an `IllegalArgumentException` would be thrown with the following error message `Query must not be null or empty.`
    +
    +{% highlight java %}
    +// get the execution environment
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +// get input data by connecting to the socket
    +DataStream<String> text = env.socketTextStream(hostname, port, "\n");
    +
    +// parse the data, group it, window it, and aggregate the counts
    +DataStream<Tuple2<String, Long>> result = text
    +  .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
    +    @Override
    +    public void flatMap(String
[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4696#discussion_r140518123

    --- Diff: docs/dev/connectors/cassandra.md ---
    @@ -78,76 +96,195 @@ Note that that enabling this feature will have an adverse impact on latency.
     Note: The write-ahead log functionality is currently experimental. In many cases it is sufficent to use the connector without enabling it. Please report problems to the development mailing list.
    +### Checkpointing and Fault Tolerance
    +With checkpointing enabled, Cassandra Sink guarantees at-least-once delivery of action requests to C* instance.
    - Example
    +Note:However, current Cassandra Sink implementation does not flush the pending mutations before the checkpoint was triggered. Thus, some in-flight mutations might not be replayed when the job recovered.
    +
    +More details on [checkpoints docs]({{ site.baseurl }}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ site.baseurl }}/dev/connectors/guarantees.html)
    +
    +To enable fault tolerant guarantee, checkpointing of the topology needs to be enabled at the execution environment:

    {% highlight java %}
    -CassandraSink.addSink(input)
    -  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
    -  .setClusterBuilder(new ClusterBuilder() {
    -    @Override
    -    public Cluster buildCluster(Cluster.Builder builder) {
    -      return builder.addContactPoint("127.0.0.1").build();
    -    }
    -  })
    -  .build();
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +env.enableCheckpointing(5000); // checkpoint every 5000 msecs
    {% endhighlight %}

    {% highlight scala %}
    -CassandraSink.addSink(input)
    -  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
    -  .setClusterBuilder(new ClusterBuilder() {
    -    override def buildCluster(builder: Cluster.Builder): Cluster = {
    -      builder.addContactPoint("127.0.0.1").build()
    -    }
    -  })
    -  .build()
    +val env = StreamExecutionEnvironment.getExecutionEnvironment()
    +env.enableCheckpointing(5000) // checkpoint every 5000 msecs
    +{% endhighlight %}
    +
    +## Examples
    +
    +The Cassandra sinks currently support both Java Tuple and POJO data types, and Flink automatically detects which type of input is used. For general use case of those streaming data type, please refer to [Supported Data Types]({{ site.baseurl }}/dev/api_concepts.html). We show two implementations based on [SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java), for Pojo and Java Tuple data types respectively.
    +
    +In all these examples, we assumed the associated Keyspace `example` and Table `wordcount` have been created.
    +
    +{% highlight sql %}
    +CREATE KEYSPACE IF NOT EXISTS example
    +WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
    +CREATE TABLE IF NOT EXISTS example.wordcount (
    +word text,
    +count bigint,
    +PRIMARY KEY(word)
    +);
    +{% endhighlight %}
    +
    +### Cassandra Sink Example for Streaming Java Tuple Data Type
    +While storing the result with Java Tuple data type to a Cassandra sink, it is required to set a CQL upsert statement (via setQuery('stmt')) to persist each record back to database. With the upsert query cached as `PreparedStatement`, Cassandra connector internally converts each Tuple elements as parameters to the statement.
    +
    +For details about `PreparedStatement` and `BoundStatement`, please visit [DataStax Java Driver manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/)
    +
    +Please note that if the upsert query were not set, an `IllegalArgumentException` would be thrown with the following error message `Query must not be null or empty.`
    +
    +{% highlight java %}
    +// get the execution environment
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +// get input data by connecting to the socket
    +DataStream<String> text = env.socketTextStream(hostname, port, "\n");
    +
    +// parse the data, group it, window it, and aggregate the counts
    +DataStream<Tuple2<String, Long>> result = text
    +  .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
    +    @Override
    +    public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
    +      // normalize and split the line
    +      String[] words = value.toLowerCase().split("\\W+");
    +
    +      // emit the pairs
    +      for
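The quoted `flatMap` breaks off mid-body, but its core — lowercasing the line and splitting on non-word characters, both visible in the diff — can be exercised outside Flink. A stdlib-only sketch (`countWords` is a name introduced here for illustration, not part of the connector or the example):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WordCountCore {
    // Same normalization the quoted flatMap uses: lowercase, split on non-word runs.
    static Map<String, Long> countWords(String line) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String word : line.toLowerCase().split("\\W+")) {
            if (!word.isEmpty()) { // split can yield an empty leading token
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Long> counts = countWords("To be, or not to be");
        System.out.println(counts); // {to=2, be=2, or=1, not=1}
    }
}
```

In the Flink example each `(word, 1L)` pair is emitted to the collector and aggregated by a window instead of a local map, but the tokenization step is the same.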
[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4696#discussion_r140517823

    --- Diff: docs/dev/connectors/cassandra.md ---
    @@ -78,76 +96,195 @@ Note that that enabling this feature will have an adverse impact on latency.
     Note: The write-ahead log functionality is currently experimental. In many cases it is sufficent to use the connector without enabling it. Please report problems to the development mailing list.
    +### Checkpointing and Fault Tolerance
    +With checkpointing enabled, Cassandra Sink guarantees at-least-once delivery of action requests to C* instance.
    - Example
    +Note:However, current Cassandra Sink implementation does not flush the pending mutations before the checkpoint was triggered. Thus, some in-flight mutations might not be replayed when the job recovered.
    +
    +More details on [checkpoints docs]({{ site.baseurl }}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ site.baseurl }}/dev/connectors/guarantees.html)
    +
    +To enable fault tolerant guarantee, checkpointing of the topology needs to be enabled at the execution environment:

    {% highlight java %}
    -CassandraSink.addSink(input)
    -  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
    -  .setClusterBuilder(new ClusterBuilder() {
    -    @Override
    -    public Cluster buildCluster(Cluster.Builder builder) {
    -      return builder.addContactPoint("127.0.0.1").build();
    -    }
    -  })
    -  .build();
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +env.enableCheckpointing(5000); // checkpoint every 5000 msecs
    {% endhighlight %}

    {% highlight scala %}
    -CassandraSink.addSink(input)
    -  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
    -  .setClusterBuilder(new ClusterBuilder() {
    -    override def buildCluster(builder: Cluster.Builder): Cluster = {
    -      builder.addContactPoint("127.0.0.1").build()
    -    }
    -  })
    -  .build()
    +val env = StreamExecutionEnvironment.getExecutionEnvironment()
    +env.enableCheckpointing(5000) // checkpoint every 5000 msecs
    +{% endhighlight %}
    +
    +## Examples
    +
    +The Cassandra sinks currently support both Java Tuple and POJO data types, and Flink automatically detects which type of input is used. For general use case of those streaming data type, please refer to [Supported Data Types]({{ site.baseurl }}/dev/api_concepts.html). We show two implementations based on [SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java), for Pojo and Java Tuple data types respectively.
    +
    +In all these examples, we assumed the associated Keyspace `example` and Table `wordcount` have been created.
    +
    +{% highlight sql %}
    +CREATE KEYSPACE IF NOT EXISTS example
    +WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
    +CREATE TABLE IF NOT EXISTS example.wordcount (
    +word text,
    +count bigint,
    +PRIMARY KEY(word)
    +);
    +{% endhighlight %}
    +
    +### Cassandra Sink Example for Streaming Java Tuple Data Type
    +While storing the result with Java Tuple data type to a Cassandra sink, it is required to set a CQL upsert statement (via setQuery('stmt')) to persist each record back to database. With the upsert query cached as `PreparedStatement`, Cassandra connector internally converts each Tuple elements as parameters to the statement.
    +
    +For details about `PreparedStatement` and `BoundStatement`, please visit [DataStax Java Driver manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/)
    +
    +Please note that if the upsert query were not set, an `IllegalArgumentException` would be thrown with the following error message `Query must not be null or empty.`
    +
    +{% highlight java %}
    +// get the execution environment
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +// get input data by connecting to the socket
    +DataStream<String> text = env.socketTextStream(hostname, port, "\n");
    +
    +// parse the data, group it, window it, and aggregate the counts
    +DataStream<Tuple2<String, Long>> result = text
    +  .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
    +    @Override
    +    public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
    +      // normalize and split the line
    +      String[] words = value.toLowerCase().split("\\W+");
    +
    +      // emit the pairs
    +      for
[jira] [Commented] (FLINK-7632) Better documentation and examples on C* sink usage for Pojo and Tuples data types
[ https://issues.apache.org/jira/browse/FLINK-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176553#comment-16176553 ]

ASF GitHub Bot commented on FLINK-7632:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4696#discussion_r140516418

    --- Diff: docs/dev/connectors/cassandra.md ---
    @@ -78,76 +96,195 @@ Note that that enabling this feature will have an adverse impact on latency.
     Note: The write-ahead log functionality is currently experimental. In many cases it is sufficent to use the connector without enabling it. Please report problems to the development mailing list.
    +### Checkpointing and Fault Tolerance
    +With checkpointing enabled, Cassandra Sink guarantees at-least-once delivery of action requests to C* instance.
    - Example
    +Note:However, current Cassandra Sink implementation does not flush the pending mutations before the checkpoint was triggered. Thus, some in-flight mutations might not be replayed when the job recovered.
    +
    +More details on [checkpoints docs]({{ site.baseurl }}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ site.baseurl }}/dev/connectors/guarantees.html)
    +
    +To enable fault tolerant guarantee, checkpointing of the topology needs to be enabled at the execution environment:
    --- End diff --

    imo this is a bit redundant, we are already linking to the checkpoint docs after all.


> Better documentation and examples on C* sink usage for Pojo and Tuples data
> types
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-7632
>                 URL: https://issues.apache.org/jira/browse/FLINK-7632
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Cassandra Connector, Documentation
>            Reporter: Michael Fong
>            Assignee: Michael Fong
>
> Cassandra sink supports Pojo and Java Tuple data types. We should improve
> documentation on its usage as well as some concrete / meaningful examples for
> both cases.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4696#discussion_r140517182

    --- Diff: docs/dev/connectors/cassandra.md ---
    @@ -78,76 +96,195 @@ Note that that enabling this feature will have an adverse impact on latency.
     Note: The write-ahead log functionality is currently experimental. In many cases it is sufficent to use the connector without enabling it. Please report problems to the development mailing list.
    +### Checkpointing and Fault Tolerance
    +With checkpointing enabled, Cassandra Sink guarantees at-least-once delivery of action requests to C* instance.
    - Example
    +Note:However, current Cassandra Sink implementation does not flush the pending mutations before the checkpoint was triggered. Thus, some in-flight mutations might not be replayed when the job recovered.
    +
    +More details on [checkpoints docs]({{ site.baseurl }}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ site.baseurl }}/dev/connectors/guarantees.html)
    +
    +To enable fault tolerant guarantee, checkpointing of the topology needs to be enabled at the execution environment:

    {% highlight java %}
    -CassandraSink.addSink(input)
    -  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
    -  .setClusterBuilder(new ClusterBuilder() {
    -    @Override
    -    public Cluster buildCluster(Cluster.Builder builder) {
    -      return builder.addContactPoint("127.0.0.1").build();
    -    }
    -  })
    -  .build();
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +env.enableCheckpointing(5000); // checkpoint every 5000 msecs
    {% endhighlight %}

    {% highlight scala %}
    -CassandraSink.addSink(input)
    -  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
    -  .setClusterBuilder(new ClusterBuilder() {
    -    override def buildCluster(builder: Cluster.Builder): Cluster = {
    -      builder.addContactPoint("127.0.0.1").build()
    -    }
    -  })
    -  .build()
    +val env = StreamExecutionEnvironment.getExecutionEnvironment()
    +env.enableCheckpointing(5000) // checkpoint every 5000 msecs
    +{% endhighlight %}
    +
    +## Examples
    +
    +The Cassandra sinks currently support both Java Tuple and POJO data types, and Flink automatically detects which type of input is used. For general use case of those streaming data type, please refer to [Supported Data Types]({{ site.baseurl }}/dev/api_concepts.html). We show two implementations based on [SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java), for Pojo and Java Tuple data types respectively.
    +
    +In all these examples, we assumed the associated Keyspace `example` and Table `wordcount` have been created.
    +
    +{% highlight sql %}
    +CREATE KEYSPACE IF NOT EXISTS example
    +WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
    +CREATE TABLE IF NOT EXISTS example.wordcount (
    +word text,
    +count bigint,
    +PRIMARY KEY(word)
    +);
    +{% endhighlight %}
    +
    +### Cassandra Sink Example for Streaming Java Tuple Data Type
    +While storing the result with Java Tuple data type to a Cassandra sink, it is required to set a CQL upsert statement (via setQuery('stmt')) to persist each record back to database. With the upsert query cached as `PreparedStatement`, Cassandra connector internally converts each Tuple elements as parameters to the statement.
    +
    +For details about `PreparedStatement` and `BoundStatement`, please visit [DataStax Java Driver manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/)
    +
    +Please note that if the upsert query were not set, an `IllegalArgumentException` would be thrown with the following error message `Query must not be null or empty.`
    --- End diff --

    I would leave this out. Its one of those things that are easily out-dated, and don't provide immediate value when reading the docs for the first time. If a user stumbles upon this the error message should be self-explanatory; if it isn't we should change it accordingly.

---
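The behavior under discussion — rejecting a missing upsert query with `IllegalArgumentException: Query must not be null or empty.` — amounts to a simple guard clause. A hypothetical stdlib-only sketch of such a check (not the connector's actual code; only the error message text is taken from the docs quoted above):

```java
public class QueryGuard {
    // Hypothetical guard mirroring the message the docs report:
    // "Query must not be null or empty."
    static String requireQuery(String query) {
        if (query == null || query.isEmpty()) {
            throw new IllegalArgumentException("Query must not be null or empty.");
        }
        return query;
    }

    public static void main(String[] args) {
        // A set query passes through unchanged ...
        System.out.println(requireQuery("INSERT INTO example.wordcount (word, count) VALUES (?, ?);"));
        // ... while a missing one fails fast, before any records are processed.
        try {
            requireQuery(null);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // Query must not be null or empty.
        }
    }
}
```

Failing fast at build time, rather than on the first record, is why the reviewer considers the message self-explanatory enough not to need documenting.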
[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4696#discussion_r140516418 --- Diff: docs/dev/connectors/cassandra.md --- @@ -78,76 +96,195 @@ Note that that enabling this feature will have an adverse impact on latency. Note: The write-ahead log functionality is currently experimental. In many cases it is sufficent to use the connector without enabling it. Please report problems to the development mailing list. +### Checkpointing and Fault Tolerance +With checkpointing enabled, Cassandra Sink guarantees at-least-once delivery of action requests to C* instance. - Example +Note:However, current Cassandra Sink implementation does not flush the pending mutations before the checkpoint was triggered. Thus, some in-flight mutations might not be replayed when the job recovered. + +More details on [checkpoints docs]({{ site.baseurl }}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ site.baseurl }}/dev/connectors/guarantees.html) + +To enable fault tolerant guarantee, checkpointing of the topology needs to be enabled at the execution environment: --- End diff -- imo this is a bit redundant, we are already linking to the checkpoint docs after all. ---
[jira] [Commented] (FLINK-7632) Better documentation and examples on C* sink usage for Pojo and Tuples data types
[ https://issues.apache.org/jira/browse/FLINK-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176558#comment-16176558 ] ASF GitHub Bot commented on FLINK-7632: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4696#discussion_r140517823 --- Diff: docs/dev/connectors/cassandra.md --- @@ -78,76 +96,195 @@ Note that that enabling this feature will have an adverse impact on latency. Note: The write-ahead log functionality is currently experimental. In many cases it is sufficent to use the connector without enabling it. Please report problems to the development mailing list. +### Checkpointing and Fault Tolerance +With checkpointing enabled, Cassandra Sink guarantees at-least-once delivery of action requests to C* instance. - Example +Note:However, current Cassandra Sink implementation does not flush the pending mutations before the checkpoint was triggered. Thus, some in-flight mutations might not be replayed when the job recovered. 
+ +More details on [checkpoints docs]({{ site.baseurl }}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ site.baseurl }}/dev/connectors/guarantees.html) + +To enable fault tolerant guarantee, checkpointing of the topology needs to be enabled at the execution environment: {% highlight java %} -CassandraSink.addSink(input) - .setQuery("INSERT INTO example.values (id, counter) values (?, ?);") - .setClusterBuilder(new ClusterBuilder() { -@Override -public Cluster buildCluster(Cluster.Builder builder) { - return builder.addContactPoint("127.0.0.1").build(); -} - }) - .build(); +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.enableCheckpointing(5000); // checkpoint every 5000 msecs {% endhighlight %} {% highlight scala %} -CassandraSink.addSink(input) - .setQuery("INSERT INTO example.values (id, counter) values (?, ?);") - .setClusterBuilder(new ClusterBuilder() { -override def buildCluster(builder: Cluster.Builder): Cluster = { - builder.addContactPoint("127.0.0.1").build() -} - }) - .build() +val env = StreamExecutionEnvironment.getExecutionEnvironment() +env.enableCheckpointing(5000) // checkpoint every 5000 msecs +{% endhighlight %} + + + +## Examples + +The Cassandra sinks currently support both Java Tuple and POJO data types, and Flink automatically detects which type of input is used. For general use case of those streaming data type, please refer to [Supported Data Types]({{ site.baseurl }}/dev/api_concepts.html). We show two implementations based on [SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java), for Pojo and Java Tuple data types respectively. + +In all these examples, we assumed the associated Keyspace `example` and Table `wordcount` have been created. 
+ + + +{% highlight sql %} +CREATE KEYSPACE IF NOT EXISTS example +WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}; +CREATE TABLE IF NOT EXISTS example.wordcount ( +word text, +count bigint, +PRIMARY KEY(word) +); +{% endhighlight %} + + + +### Cassandra Sink Example for Streaming Java Tuple Data Type +While storing the result with Java Tuple data type to a Cassandra sink, it is required to set a CQL upsert statement (via setQuery('stmt')) to persist each record back to database. With the upsert query cached as `PreparedStatement`, Cassandra connector internally converts each Tuple elements as parameters to the statement. + +For details about `PreparedStatement` and `BoundStatement`, please visit [DataStax Java Driver manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/) + +Please note that if the upsert query were not set, an `IllegalArgumentException` would be thrown with the following error message `Query must not be null or empty.` + + + +{% highlight java %} +// get the execution environment +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + +// get input data by connecting to the socket +DataStream text = env.socketTextStream(hostname, port, "\n"); + +// parse the data, group it, window it, and aggregate the counts +DataStream> result = text + +.flatMap(new FlatMapFunction >() { +@Override +public void flatMap(String
[jira] [Commented] (FLINK-7632) Better documentation and examples on C* sink usage for Pojo and Tuples data types
[ https://issues.apache.org/jira/browse/FLINK-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176554#comment-16176554 ] ASF GitHub Bot commented on FLINK-7632: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4696#discussion_r140517182 --- Diff: docs/dev/connectors/cassandra.md --- @@ -78,76 +96,195 @@ Note that that enabling this feature will have an adverse impact on latency. Note: The write-ahead log functionality is currently experimental. In many cases it is sufficent to use the connector without enabling it. Please report problems to the development mailing list. +### Checkpointing and Fault Tolerance +With checkpointing enabled, Cassandra Sink guarantees at-least-once delivery of action requests to C* instance. - Example +Note:However, current Cassandra Sink implementation does not flush the pending mutations before the checkpoint was triggered. Thus, some in-flight mutations might not be replayed when the job recovered. 
+ +More details on [checkpoints docs]({{ site.baseurl }}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ site.baseurl }}/dev/connectors/guarantees.html) + +To enable fault tolerant guarantee, checkpointing of the topology needs to be enabled at the execution environment: {% highlight java %} -CassandraSink.addSink(input) - .setQuery("INSERT INTO example.values (id, counter) values (?, ?);") - .setClusterBuilder(new ClusterBuilder() { -@Override -public Cluster buildCluster(Cluster.Builder builder) { - return builder.addContactPoint("127.0.0.1").build(); -} - }) - .build(); +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.enableCheckpointing(5000); // checkpoint every 5000 msecs {% endhighlight %} {% highlight scala %} -CassandraSink.addSink(input) - .setQuery("INSERT INTO example.values (id, counter) values (?, ?);") - .setClusterBuilder(new ClusterBuilder() { -override def buildCluster(builder: Cluster.Builder): Cluster = { - builder.addContactPoint("127.0.0.1").build() -} - }) - .build() +val env = StreamExecutionEnvironment.getExecutionEnvironment() +env.enableCheckpointing(5000) // checkpoint every 5000 msecs +{% endhighlight %} + + + +## Examples + +The Cassandra sinks currently support both Java Tuple and POJO data types, and Flink automatically detects which type of input is used. For general use case of those streaming data type, please refer to [Supported Data Types]({{ site.baseurl }}/dev/api_concepts.html). We show two implementations based on [SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java), for Pojo and Java Tuple data types respectively. + +In all these examples, we assumed the associated Keyspace `example` and Table `wordcount` have been created. 
+ + + +{% highlight sql %} +CREATE KEYSPACE IF NOT EXISTS example +WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}; +CREATE TABLE IF NOT EXISTS example.wordcount ( +word text, +count bigint, +PRIMARY KEY(word) +); +{% endhighlight %} + + + +### Cassandra Sink Example for Streaming Java Tuple Data Type +While storing the result with Java Tuple data type to a Cassandra sink, it is required to set a CQL upsert statement (via setQuery('stmt')) to persist each record back to database. With the upsert query cached as `PreparedStatement`, Cassandra connector internally converts each Tuple elements as parameters to the statement. + +For details about `PreparedStatement` and `BoundStatement`, please visit [DataStax Java Driver manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/) + +Please note that if the upsert query were not set, an `IllegalArgumentException` would be thrown with the following error message `Query must not be null or empty.` --- End diff -- I would leave this out. Its one of those things that are easily out-dated, and don't provide immediate value when reading the docs for the first time. If a user stumbles upon this the error message should be self-explanatory; if it isn't we should change it accordingly. > Better documentation and examples on C* sink usage for Pojo and Tuples data > types > - > > Key: FLINK-7632 > URL: https://issues.apache.org/jira/browse/FLINK-7632 >
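The argument check discussed in the quoted docs (`setQuery` rejecting a missing CQL statement) reduces to a simple guard. The following is an illustrative plain-Java sketch, not the Cassandra connector's actual code; only the error message is taken from the text above:

```java
// Illustrative guard, not the Flink Cassandra connector's implementation:
// reject a null or empty CQL statement before it would be cached as a
// PreparedStatement. The error message is the one quoted in the docs.
public class QueryGuard {
    public static String requireQuery(String query) {
        if (query == null || query.isEmpty()) {
            throw new IllegalArgumentException("Query must not be null or empty.");
        }
        return query;
    }
}
```

A caller would validate once, up front: `requireQuery("INSERT INTO example.wordcount (word, count) VALUES (?, ?);")`.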
[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4696#discussion_r140517696 --- Diff: docs/dev/connectors/cassandra.md --- @@ -78,76 +96,195 @@ Note that that enabling this feature will have an adverse impact on latency. Note: The write-ahead log functionality is currently experimental. In many cases it is sufficent to use the connector without enabling it. Please report problems to the development mailing list. +### Checkpointing and Fault Tolerance +With checkpointing enabled, Cassandra Sink guarantees at-least-once delivery of action requests to C* instance. - Example +Note:However, current Cassandra Sink implementation does not flush the pending mutations before the checkpoint was triggered. Thus, some in-flight mutations might not be replayed when the job recovered. + +More details on [checkpoints docs]({{ site.baseurl }}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ site.baseurl }}/dev/connectors/guarantees.html) + +To enable fault tolerant guarantee, checkpointing of the topology needs to be enabled at the execution environment: {% highlight java %} -CassandraSink.addSink(input) - .setQuery("INSERT INTO example.values (id, counter) values (?, ?);") - .setClusterBuilder(new ClusterBuilder() { -@Override -public Cluster buildCluster(Cluster.Builder builder) { - return builder.addContactPoint("127.0.0.1").build(); -} - }) - .build(); +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.enableCheckpointing(5000); // checkpoint every 5000 msecs {% endhighlight %} {% highlight scala %} -CassandraSink.addSink(input) - .setQuery("INSERT INTO example.values (id, counter) values (?, ?);") - .setClusterBuilder(new ClusterBuilder() { -override def buildCluster(builder: Cluster.Builder): Cluster = { - builder.addContactPoint("127.0.0.1").build() -} - }) - .build() +val env = StreamExecutionEnvironment.getExecutionEnvironment() 
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs +{% endhighlight %} + + + +## Examples + +The Cassandra sinks currently support both Java Tuple and POJO data types, and Flink automatically detects which type of input is used. For general use case of those streaming data type, please refer to [Supported Data Types]({{ site.baseurl }}/dev/api_concepts.html). We show two implementations based on [SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java), for Pojo and Java Tuple data types respectively. + +In all these examples, we assumed the associated Keyspace `example` and Table `wordcount` have been created. + + + +{% highlight sql %} +CREATE KEYSPACE IF NOT EXISTS example +WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}; +CREATE TABLE IF NOT EXISTS example.wordcount ( +word text, +count bigint, +PRIMARY KEY(word) +); +{% endhighlight %} + + + +### Cassandra Sink Example for Streaming Java Tuple Data Type +While storing the result with Java Tuple data type to a Cassandra sink, it is required to set a CQL upsert statement (via setQuery('stmt')) to persist each record back to database. With the upsert query cached as `PreparedStatement`, Cassandra connector internally converts each Tuple elements as parameters to the statement. 
+ +For details about `PreparedStatement` and `BoundStatement`, please visit [DataStax Java Driver manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/) + +Please note that if the upsert query were not set, an `IllegalArgumentException` would be thrown with the following error message `Query must not be null or empty.` + + + +{% highlight java %} +// get the execution environment +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + +// get input data by connecting to the socket +DataStream text = env.socketTextStream(hostname, port, "\n"); + +// parse the data, group it, window it, and aggregate the counts +DataStream> result = text + +.flatMap(new FlatMapFunction >() { +@Override +public void flatMap(String value, Collector > out) { +// normalize and split the line +String[] words = value.toLowerCase().split("\\W+"); + +// emit the pairs +for
[jira] [Commented] (FLINK-7632) Better documentation and examples on C* sink usage for Pojo and Tuples data types
[ https://issues.apache.org/jira/browse/FLINK-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176552#comment-16176552 ] ASF GitHub Bot commented on FLINK-7632: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4696#discussion_r140516135 --- Diff: docs/dev/connectors/cassandra.md --- @@ -78,76 +96,195 @@ Note that that enabling this feature will have an adverse impact on latency. Note: The write-ahead log functionality is currently experimental. In many cases it is sufficent to use the connector without enabling it. Please report problems to the development mailing list. +### Checkpointing and Fault Tolerance +With checkpointing enabled, Cassandra Sink guarantees at-least-once delivery of action requests to C* instance. - Example +Note:However, current Cassandra Sink implementation does not flush the pending mutations before the checkpoint was triggered. Thus, some in-flight mutations might not be replayed when the job recovered. --- End diff -- theres a double space after mutations > Better documentation and examples on C* sink usage for Pojo and Tuples data > types > - > > Key: FLINK-7632 > URL: https://issues.apache.org/jira/browse/FLINK-7632 > Project: Flink > Issue Type: Sub-task > Components: Cassandra Connector, Documentation >Reporter: Michael Fong >Assignee: Michael Fong > > Cassandra sink supports Pojo and Java Tuple data types. We should improve > documentation on its usage as well as some concrete / meaningful examples for > both cases. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4696: [FLINK-7632] [document] Overhaul on Cassandra conn...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4696#discussion_r140517986 --- Diff: docs/dev/connectors/cassandra.md --- @@ -78,76 +96,195 @@ Note that that enabling this feature will have an adverse impact on latency. Note: The write-ahead log functionality is currently experimental. In many cases it is sufficent to use the connector without enabling it. Please report problems to the development mailing list. +### Checkpointing and Fault Tolerance +With checkpointing enabled, Cassandra Sink guarantees at-least-once delivery of action requests to C* instance. - Example +Note:However, current Cassandra Sink implementation does not flush the pending mutations before the checkpoint was triggered. Thus, some in-flight mutations might not be replayed when the job recovered. + +More details on [checkpoints docs]({{ site.baseurl }}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ site.baseurl }}/dev/connectors/guarantees.html) + +To enable fault tolerant guarantee, checkpointing of the topology needs to be enabled at the execution environment: {% highlight java %} -CassandraSink.addSink(input) - .setQuery("INSERT INTO example.values (id, counter) values (?, ?);") - .setClusterBuilder(new ClusterBuilder() { -@Override -public Cluster buildCluster(Cluster.Builder builder) { - return builder.addContactPoint("127.0.0.1").build(); -} - }) - .build(); +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.enableCheckpointing(5000); // checkpoint every 5000 msecs {% endhighlight %} {% highlight scala %} -CassandraSink.addSink(input) - .setQuery("INSERT INTO example.values (id, counter) values (?, ?);") - .setClusterBuilder(new ClusterBuilder() { -override def buildCluster(builder: Cluster.Builder): Cluster = { - builder.addContactPoint("127.0.0.1").build() -} - }) - .build() +val env = StreamExecutionEnvironment.getExecutionEnvironment() 
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs +{% endhighlight %} + + + +## Examples + +The Cassandra sinks currently support both Java Tuple and POJO data types, and Flink automatically detects which type of input is used. For general use case of those streaming data type, please refer to [Supported Data Types]({{ site.baseurl }}/dev/api_concepts.html). We show two implementations based on [SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java), for Pojo and Java Tuple data types respectively. + +In all these examples, we assumed the associated Keyspace `example` and Table `wordcount` have been created. + + + +{% highlight sql %} +CREATE KEYSPACE IF NOT EXISTS example +WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}; +CREATE TABLE IF NOT EXISTS example.wordcount ( +word text, +count bigint, +PRIMARY KEY(word) +); +{% endhighlight %} + + + +### Cassandra Sink Example for Streaming Java Tuple Data Type +While storing the result with Java Tuple data type to a Cassandra sink, it is required to set a CQL upsert statement (via setQuery('stmt')) to persist each record back to database. With the upsert query cached as `PreparedStatement`, Cassandra connector internally converts each Tuple elements as parameters to the statement. 
+ +For details about `PreparedStatement` and `BoundStatement`, please visit [DataStax Java Driver manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/) + +Please note that if the upsert query were not set, an `IllegalArgumentException` would be thrown with the following error message `Query must not be null or empty.` + + + +{% highlight java %} +// get the execution environment +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + +// get input data by connecting to the socket +DataStream text = env.socketTextStream(hostname, port, "\n"); + +// parse the data, group it, window it, and aggregate the counts +DataStream> result = text + +.flatMap(new FlatMapFunction >() { +@Override +public void flatMap(String value, Collector > out) { +// normalize and split the line +String[] words = value.toLowerCase().split("\\W+"); + +// emit the pairs +for
[jira] [Commented] (FLINK-7632) Better documentation and examples on C* sink usage for Pojo and Tuples data types
[ https://issues.apache.org/jira/browse/FLINK-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176557#comment-16176557 ] ASF GitHub Bot commented on FLINK-7632: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4696#discussion_r140518123 --- Diff: docs/dev/connectors/cassandra.md --- @@ -78,76 +96,195 @@ Note that that enabling this feature will have an adverse impact on latency. Note: The write-ahead log functionality is currently experimental. In many cases it is sufficent to use the connector without enabling it. Please report problems to the development mailing list. +### Checkpointing and Fault Tolerance +With checkpointing enabled, Cassandra Sink guarantees at-least-once delivery of action requests to C* instance. - Example +Note:However, current Cassandra Sink implementation does not flush the pending mutations before the checkpoint was triggered. Thus, some in-flight mutations might not be replayed when the job recovered. 
+ +More details on [checkpoints docs]({{ site.baseurl }}/dev/stream/state/checkpointing.html) and [fault tolerance guarantee docs]({{ site.baseurl }}/dev/connectors/guarantees.html) + +To enable fault tolerant guarantee, checkpointing of the topology needs to be enabled at the execution environment: {% highlight java %} -CassandraSink.addSink(input) - .setQuery("INSERT INTO example.values (id, counter) values (?, ?);") - .setClusterBuilder(new ClusterBuilder() { -@Override -public Cluster buildCluster(Cluster.Builder builder) { - return builder.addContactPoint("127.0.0.1").build(); -} - }) - .build(); +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.enableCheckpointing(5000); // checkpoint every 5000 msecs {% endhighlight %} {% highlight scala %} -CassandraSink.addSink(input) - .setQuery("INSERT INTO example.values (id, counter) values (?, ?);") - .setClusterBuilder(new ClusterBuilder() { -override def buildCluster(builder: Cluster.Builder): Cluster = { - builder.addContactPoint("127.0.0.1").build() -} - }) - .build() +val env = StreamExecutionEnvironment.getExecutionEnvironment() +env.enableCheckpointing(5000) // checkpoint every 5000 msecs +{% endhighlight %} + + + +## Examples + +The Cassandra sinks currently support both Java Tuple and POJO data types, and Flink automatically detects which type of input is used. For general use case of those streaming data type, please refer to [Supported Data Types]({{ site.baseurl }}/dev/api_concepts.html). We show two implementations based on [SocketWindowWordCount](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java), for Pojo and Java Tuple data types respectively. + +In all these examples, we assumed the associated Keyspace `example` and Table `wordcount` have been created. 
+ + + +{% highlight sql %} +CREATE KEYSPACE IF NOT EXISTS example +WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}; +CREATE TABLE IF NOT EXISTS example.wordcount ( +word text, +count bigint, +PRIMARY KEY(word) +); +{% endhighlight %} + + + +### Cassandra Sink Example for Streaming Java Tuple Data Type +While storing the result with Java Tuple data type to a Cassandra sink, it is required to set a CQL upsert statement (via setQuery('stmt')) to persist each record back to database. With the upsert query cached as `PreparedStatement`, Cassandra connector internally converts each Tuple elements as parameters to the statement. + +For details about `PreparedStatement` and `BoundStatement`, please visit [DataStax Java Driver manual](https://docs.datastax.com/en/developer/java-driver/2.1/manual/statements/prepared/) + +Please note that if the upsert query were not set, an `IllegalArgumentException` would be thrown with the following error message `Query must not be null or empty.` + + + +{% highlight java %} +// get the execution environment +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + +// get input data by connecting to the socket +DataStream text = env.socketTextStream(hostname, port, "\n"); + +// parse the data, group it, window it, and aggregate the counts +DataStream> result = text + +.flatMap(new FlatMapFunction >() { +@Override +public void flatMap(String
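The core of the quoted `flatMap` (normalizing a line and splitting it into words) can be sketched in plain Java without the Flink `DataStream`/`Collector` types:

```java
// Plain-Java sketch of the normalize-and-split step from the quoted
// SocketWindowWordCount-style flatMap; the Flink types and the windowed
// aggregation around it are omitted.
public class WordSplit {
    public static String[] tokenize(String line) {
        // normalize and split the line on runs of non-word characters
        return line.toLowerCase().split("\\W+");
    }
}
```

Each resulting word would then be emitted as a `(word, 1)` pair for the downstream count.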
[jira] [Commented] (FLINK-7524) Task "xxx" did not react to cancelling signal, but is stuck in method
[ https://issues.apache.org/jira/browse/FLINK-7524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176539#comment-16176539 ] ASF GitHub Bot commented on FLINK-7524: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4639#discussion_r140516796 --- Diff: flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java --- @@ -61,13 +75,14 @@ public final void registerClosable(C closeable) throws IOException { } synchronized (getSynchronizationLock()) { - if (closed) { - IOUtils.closeQuietly(closeable); - throw new IOException("Cannot register Closeable, registry is already closed. Closing argument."); + if (!closed) { + doRegister(closeable, closeableToRef); + return; } - - doRegister(closeable, closeableToRef); } + + IOUtils.closeQuietly(closeable); --- End diff -- Makes sense!  > Task "xxx" did not react to cancelling signal, but is stuck in method > - > > Key: FLINK-7524 > URL: https://issues.apache.org/jira/browse/FLINK-7524 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.0 >Reporter: Bowen Li >Priority: Blocker > Fix For: 1.4.0 > > > Hi, > I observed the following errors in taskmanager.log > {code:java} > 2017-08-25 17:03:40,141 WARN org.apache.flink.runtime.taskmanager.Task > - Task 'TriggerWindow(SlidingEventTimeWindows(25920, > 360), > AggregatingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@f65b6aa2, > aggFunction=com.streamingApp$MyAggregateFunction@1f193686}, > EventTimeTrigger(), WindowedStream.aggregate(WindowedStream.java:858)) -> > Sink: prod_item (2/6)' did not react to cancelling signal, but is stuck in > method: > > org.apache.flink.util.AbstractCloseableRegistry.unregisterClosable(AbstractCloseableRegistry.java:84) > org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.closeAndUnregisterStream(StateSnapshotContextSynchronousImpl.java:137) > 
org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.close(StateSnapshotContextSynchronousImpl.java:147) > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:399) > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157) > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089) > org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653) > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589) > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:542) > org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378) > org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281) > org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183) > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213) > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69) > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262) > org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) > java.lang.Thread.run(Thread.java:748) > ... > 2017-08-25 17:05:10,139 INFO org.apache.flink.runtime.taskmanager.Task > - Notifying TaskManager about fatal error. 
Task > 'TriggerWindow(SlidingEventTimeWindows(25920, 360), > AggregatingStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@f65b6aa2, > aggFunction=com.streamingApp$MyAggregateFunction@1f193686}, > EventTimeTrigger(), WindowedStream.aggregate(WindowedStream.java:858)) -> > Sink: prod_item (2/6)' did not react to cancelling signal in the last 30 > seconds, but is stuck in method: > > org.apache.flink.util.AbstractCloseableRegistry.unregisterClosable(AbstractCloseableRegistry.java:84) > org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.closeAndUnregisterStream(StateSnapshotContextSynchronousImpl.java:137) > org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.close(StateSnapshotContextSynchronousImpl.java:147) >
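The change under review in the quoted `AbstractCloseableRegistry` diff (register under the lock while open; if the registry is already closed, close the late-arriving closeable quietly *outside* the lock instead of throwing while holding it) can be illustrated with a minimal stand-in class. This is a sketch of the pattern, not Flink's actual registry:

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

// Minimal stand-in for the pattern in the quoted diff, not Flink's class.
public class SimpleCloseableRegistry {
    private final Object lock = new Object();
    private final Set<Closeable> registered = new HashSet<>();
    private boolean closed;

    public void register(Closeable c) {
        synchronized (lock) {
            if (!closed) {
                registered.add(c);
                return;
            }
        }
        // Registry already closed: close quietly, outside the lock,
        // instead of throwing while the lock is held.
        closeQuietly(c);
    }

    public void close() {
        Set<Closeable> toClose;
        synchronized (lock) {
            closed = true;
            toClose = new HashSet<>(registered);
            registered.clear();
        }
        for (Closeable c : toClose) {
            closeQuietly(c);
        }
    }

    private static void closeQuietly(Closeable c) {
        try {
            c.close();
        } catch (IOException ignored) {
            // best-effort close
        }
    }
}
```

Keeping the `close()` calls outside the synchronized block is what removes the potentially blocking behaviour the PR title refers to.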
[jira] [Commented] (FLINK-7656) Switch to user ClassLoader when invoking initializeOnMaster finalizeOnMaster
[ https://issues.apache.org/jira/browse/FLINK-7656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176537#comment-16176537 ] ASF GitHub Bot commented on FLINK-7656: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4690#discussion_r140515705 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java --- @@ -84,10 +88,10 @@ public void testConnectMultipleTargets() { @Test public void testOutputFormatVertex() { try { - final TestingOutputFormat outputFormat = new TestingOutputFormat(); + final OutputFormat outputFormat = new TestingOutputFormat(); final OutputFormatVertex of = new OutputFormatVertex("Name"); new TaskConfig(of.getConfiguration()).setStubWrapper(new UserCodeObjectWrapper(outputFormat)); - final ClassLoader cl = getClass().getClassLoader(); + final ClassLoader cl = new FlinkUserCodeClassLoader(new URL[0], getClass().getClassLoader()); --- End diff -- This doesn't exist anymore but you can use `FlinkUserCodeClassLoaders.childFirst()`/`parentFirst()`. > Switch to user ClassLoader when invoking initializeOnMaster finalizeOnMaster > > > Key: FLINK-7656 > URL: https://issues.apache.org/jira/browse/FLINK-7656 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.3.2 >Reporter: Fabian Hueske >Assignee: Fabian Hueske > > The contract that Flink provides to usercode is that that the usercode > classloader is the context classloader whenever usercode is called. > In {{OutputFormatVertex}} usercode is called in the {{initializeOnMaster()}} > and {{finalizeOnMaster()}} methods but the context classloader is not set to > the usercode classloader. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
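The contract FLINK-7656 restores (usercode runs with the usercode classloader as the thread's context classloader, restored afterwards) is the usual set/try/finally pattern. A generic sketch; `ContextClassLoaderGuard` and `runWithClassLoader` are hypothetical names, not Flink API:

```java
// Generic set-and-restore sketch of the contract described in the issue;
// the class and method names here are hypothetical.
public class ContextClassLoaderGuard {
    public static void runWithClassLoader(ClassLoader userCodeClassLoader, Runnable userCode) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(userCodeClassLoader);
        try {
            // e.g. initializeOnMaster() / finalizeOnMaster() would run here
            userCode.run();
        } finally {
            // always restore, even if usercode throws
            current.setContextClassLoader(previous);
        }
    }
}
```

The `finally` block is the important part: the previous context classloader must come back even when the usercode call fails.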
[jira] [Assigned] (FLINK-6444) Add a check that '@VisibleForTesting' methods are only used in tests
[ https://issues.apache.org/jira/browse/FLINK-6444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mingleizhang reassigned FLINK-6444: --- Assignee: mingleizhang (was: Hai Zhou_UTC+8) > Add a check that '@VisibleForTesting' methods are only used in tests > > > Key: FLINK-6444 > URL: https://issues.apache.org/jira/browse/FLINK-6444 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Stephan Ewen >Assignee: mingleizhang > > Some methods are annotated with {{@VisibleForTesting}}. These methods should > only be called from tests. > This is currently not enforced / checked during the build. We should add such > a check. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7670) typo in docs runtime section
[ https://issues.apache.org/jira/browse/FLINK-7670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176450#comment-16176450 ] ASF GitHub Bot commented on FLINK-7670: --- GitHub user keweishang opened a pull request: https://github.com/apache/flink/pull/4706 [FLINK-7670][doc] fix typo in docs runtime section The following link to Savepoints page [Savepoints](..//setup/savepoints.html) change to [Savepoints](../setup/savepoints.html) Otherwise, the link wouldn't work. You can merge this pull request into a Git repository by running: $ git pull https://github.com/keweishang/flink FLINK-7670 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4706.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4706 commit 098740fceeaed11f0072c9cbf612783b4829e98b Author: Kewei Shang Date: 2017-09-22T13:40:41Z [FLINK-7670][doc] fix typo in docs runtime section > typo in docs runtime section > > > Key: FLINK-7670 > URL: https://issues.apache.org/jira/browse/FLINK-7670 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.3.2 >Reporter: Kewei SHANG >Priority: Minor > Fix For: 1.3.3 > > > The following link to Savepoints page > [Savepoints](..//setup/savepoints.html) > change to > [Savepoints](../setup/savepoints.html) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6444) Add a check that '@VisibleForTesting' methods are only used in tests
[ https://issues.apache.org/jira/browse/FLINK-6444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176444#comment-16176444 ] ASF GitHub Bot commented on FLINK-6444: --- GitHub user zhangminglei opened a pull request: https://github.com/apache/flink/pull/4705 [FLINK-6444] [build] Add a check that '@VisibleForTesting' methods ar… ## What is the purpose of the change ## Brief change log ## Verifying this change You can merge this pull request into a Git repository by running: $ git pull https://github.com/zhangminglei/flink flink-6444 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4705.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4705 commit 425c6eaee2d76df2df4f5d2070a33b1b4b9ddbcb Author: zhangminglei Date: 2017-09-22T13:44:18Z [FLINK-6444] [build] Add a check that '@VisibleForTesting' methods are only used in tests > Add a check that '@VisibleForTesting' methods are only used in tests > > > Key: FLINK-6444 > URL: https://issues.apache.org/jira/browse/FLINK-6444 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Stephan Ewen >Assignee: Hai Zhou_UTC+8 > > Some methods are annotated with {{@VisibleForTesting}}. These methods should > only be called from tests. > This is currently not enforced / checked during the build. We should add such > a check. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
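For context, the annotation in question is just a marker; the check the ticket asks for would be a separate build-time rule that flags non-test callers of annotated members. A minimal stand-in sketch (not Flink's actual annotation, which lives elsewhere in the codebase):

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Minimal stand-in for a @VisibleForTesting marker. It carries no logic
// itself; a build-time check would flag production-code callers of
// members that carry it.
public class VisibilityDemo {
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.METHOD, ElementType.CONSTRUCTOR, ElementType.FIELD})
    public @interface VisibleForTesting {}

    @VisibleForTesting
    static int internalState() {
        return 42;
    }
}
```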
[jira] [Commented] (FLINK-7669) org.apache.flink.api.common.ExecutionConfig cannot be cast to org.apache.flink.api.common.ExecutionConfig
[ https://issues.apache.org/jira/browse/FLINK-7669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176428#comment-16176428 ] Aljoscha Krettek commented on FLINK-7669: - How are you running your program? > org.apache.flink.api.common.ExecutionConfig cannot be cast to > org.apache.flink.api.common.ExecutionConfig > - > > Key: FLINK-7669 > URL: https://issues.apache.org/jira/browse/FLINK-7669 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.4.0 > Environment: - OS: macOS Sierra > - Oracle JDK 1.8 > - Scala 2.11.11 > - sbt 0.13.16 > - Build from trunk code at commit hash > {{42cc3a2a9c41dda7cf338db36b45131db9150674}} > -- started a local flink node >Reporter: Raymond Tay > > Latest code pulled from trunk threw errors at runtime when i ran a job > against it; but when i ran the JAR against the stable version {{1.3.2}} it > was OK. Here is the stacktrace. > An exception is being thrown : > {noformat} > Cluster configuration: Standalone cluster with JobManager at > localhost/127.0.0.1:6123 > Using address localhost:6123 to connect to JobManager. > JobManager web interface address http://localhost:8081 > Starting execution of program > Submitting job with JobID: 05dd8e60c6fda3b96fc22ef6cf389a23. Waiting for job > completion. > Connected to JobManager at > Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-234825544] with leader > session id ----. 
> > The program finished with the following exception: > org.apache.flink.client.program.ProgramInvocationException: The program > execution failed: Failed to submit job 05dd8e60c6fda3b96fc22ef6cf389a23 > (Flink Streaming Job) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:479) > at > org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:443) > at > org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1501) > at > org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:629) > at > org.example.streams.split.SimpleSplitStreams$.main(04splitstreams.scala:53) > at > org.example.streams.split.SimpleSplitStreams.main(04splitstreams.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:383) > at > org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:840) > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:285) > at > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1088) > at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1135) > at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1132) > at > 
org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:44) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556) > at > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1132) > Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to > submit job 05dd8e60c6fda3b96fc22ef6cf389a23 (Flink Streaming Job) > at > org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1358) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:484) > at >
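The "ExecutionConfig cannot be cast to ExecutionConfig" message is characteristic of the same class being loaded by two different classloaders — for example one copy bundled in the fat jar and one shipped on the cluster classpath. A minimal, self-contained sketch of that failure mode (the `IsolatingLoader` and `Config` classes here are illustrative stand-ins, not Flink code):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class ClassLoaderCastDemo {

    // Stand-in for ExecutionConfig: any ordinary class will do.
    public static class Config { }

    // A loader that defines its OWN copy of one class instead of delegating
    // to its parent -- the same situation as a fat jar bundling a class that
    // the cluster's classpath also provides.
    static class IsolatingLoader extends ClassLoader {
        private final String target;

        IsolatingLoader(String target) {
            super(ClassLoaderCastDemo.class.getClassLoader());
            this.target = target;
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (!name.equals(target)) {
                return super.loadClass(name, resolve);  // delegate everything else
            }
            String path = name.replace('.', '/') + ".class";
            try (InputStream in = getParent().getResourceAsStream(path)) {
                if (in == null) {
                    throw new ClassNotFoundException(name);
                }
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                byte[] buf = new byte[4096];
                int n;
                while ((n = in.read(buf)) != -1) {
                    out.write(buf, 0, n);
                }
                byte[] bytes = out.toByteArray();
                return defineClass(name, bytes, 0, bytes.length);  // second copy of the class
            } catch (IOException e) {
                throw new ClassNotFoundException(name, e);
            }
        }
    }

    // True iff casting an instance of the duplicated class back to the
    // original Config fails -- i.e. "Config cannot be cast to Config".
    public static boolean castAcrossLoadersFails() {
        try {
            Class<?> copy = new IsolatingLoader(Config.class.getName())
                    .loadClass(Config.class.getName());
            Object instance = copy.getDeclaredConstructor().newInstance();
            Config c = (Config) instance;  // same name, different defining loader
            return false;                  // not reached
        } catch (ClassCastException expected) {
            return true;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```

Because a class's runtime identity is the pair (name, defining classloader), the two `Config` classes are unrelated types even though their names match, which is exactly what the exception message looks like.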
[jira] [Commented] (FLINK-6444) Add a check that '@VisibleForTesting' methods are only used in tests
[ https://issues.apache.org/jira/browse/FLINK-6444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176416#comment-16176416 ] Hai Zhou_UTC+8 commented on FLINK-6444: --- I am trying the first approach: implementing a SpotBugs detector and configuring a SpotBugs rule. > Add a check that '@VisibleForTesting' methods are only used in tests > > > Key: FLINK-6444 > URL: https://issues.apache.org/jira/browse/FLINK-6444 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Stephan Ewen >Assignee: Hai Zhou_UTC+8 > > Some methods are annotated with {{@VisibleForTesting}}. These methods should > only be called from tests. > This is currently not enforced / checked during the build. We should add such > a check. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
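As a rough illustration of what such a check has to find, the sketch below declares a stand-in `@VisibleForTesting` annotation (retained at runtime) and enumerates the annotated methods of a class reflectively. A real SpotBugs detector works on bytecode and additionally inspects the *call sites*, which plain reflection cannot do; all names here are hypothetical, not Flink's actual annotation class.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

public class VisibleForTestingScan {

    // Stand-in for Flink's annotation; RUNTIME retention so it can be
    // discovered reflectively (a bytecode-based detector has no such need).
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface VisibleForTesting { }

    // Hypothetical production class with one test-only method.
    public static class Buffer {
        @VisibleForTesting
        int internalSize() { return 0; }

        public void publicApi() { }
    }

    // Enumerates the methods a checker would restrict to test code.
    public static List<String> annotatedMethods(Class<?> cls) {
        List<String> names = new ArrayList<>();
        for (Method m : cls.getDeclaredMethods()) {
            if (m.isAnnotationPresent(VisibleForTesting.class)) {
                names.add(m.getName());
            }
        }
        return names;
    }
}
```

The detector's remaining job is then to flag any non-test class whose bytecode invokes one of the methods this scan would report.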
[jira] [Created] (FLINK-7670) typo in docs runtime section
Kewei SHANG created FLINK-7670: -- Summary: typo in docs runtime section Key: FLINK-7670 URL: https://issues.apache.org/jira/browse/FLINK-7670 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.3.2 Reporter: Kewei SHANG Priority: Minor Fix For: 1.3.3 The following link to the Savepoints page, [Savepoints](..//setup/savepoints.html), should be changed to [Savepoints](../setup/savepoints.html). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7537) Add InfluxDB Sink for Flink Streaming
[ https://issues.apache.org/jira/browse/FLINK-7537?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hai Zhou_UTC+8 closed FLINK-7537. - Resolution: Fixed > Add InfluxDB Sink for Flink Streaming > - > > Key: FLINK-7537 > URL: https://issues.apache.org/jira/browse/FLINK-7537 > Project: Flink > Issue Type: Wish > Components: Streaming Connectors >Affects Versions: 1.3.0 >Reporter: Hai Zhou_UTC+8 >Assignee: Hai Zhou_UTC+8 > Fix For: 1.3.0 > > > InfluxDBSink implemented via RichSinkFunction. > [BAHIR-134|https://issues.apache.org/jira/browse/BAHIR-134] -- This message was sent by Atlassian JIRA (v6.4.14#64029)
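For readers curious what such a connector boils down to, here is a heavily simplified, self-contained sketch of the sink pattern. `SinkFunction` and `FakeInfluxClient` are stand-ins I introduce for illustration only: the real sink extends Flink's `RichSinkFunction` and writes through an actual InfluxDB client.

```java
import java.util.ArrayList;
import java.util.List;

public class InfluxSinkSketch {

    // Stand-in for Flink's sink interface; the real connector extends
    // org.apache.flink.streaming.api.functions.sink.RichSinkFunction.
    public interface SinkFunction<T> {
        void invoke(T value) throws Exception;
    }

    // Hypothetical client recording line-protocol writes; a real sink would
    // hold an InfluxDB client, opened in open() and closed in close().
    public static class FakeInfluxClient {
        public final List<String> written = new ArrayList<>();

        public void write(String lineProtocol) {
            written.add(lineProtocol);
        }
    }

    // One record in, one write out -- the essence of the sink.
    public static class InfluxDBSink implements SinkFunction<String> {
        private final FakeInfluxClient client;

        public InfluxDBSink(FakeInfluxClient client) {
            this.client = client;
        }

        @Override
        public void invoke(String value) {
            client.write(value);
        }
    }
}
```

In a real connector the lifecycle methods matter as much as `invoke`: the client connection belongs in `open()`/`close()`, not in the constructor, because the sink object is serialized and shipped to task managers before it runs.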
[jira] [Resolved] (FLINK-7600) shorten delay of KinesisProducerConfiguration.setCredentialsRefreshDelay() to avoid updateCredentials Exception
[ https://issues.apache.org/jira/browse/FLINK-7600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai resolved FLINK-7600. Resolution: Fixed Thanks for the contribution! Fixed for {{master}} via 6c1a946562ec8ac4825b871aefdec040cc02aaf2. Fixed for {{1.3.3}} via cc0de8486b2c83bf03da0f8115df4b5ff72f6ed6. > shorten delay of KinesisProducerConfiguration.setCredentialsRefreshDelay() to > avoid updateCredentials Exception > --- > > Key: FLINK-7600 > URL: https://issues.apache.org/jira/browse/FLINK-7600 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.3.2 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Minor > Fix For: 1.4.0, 1.3.3 > > > we saw the following warning in Flink log: > {code:java} > 2017-08-11 02:33:24,473 WARN > org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon > - Exception during updateCredentials > java.lang.InterruptedException: sleep interrupted > at java.lang.Thread.sleep(Native Method) > at > org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon$5.run(Daemon.java:316) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > {code} > According to discussion in > https://github.com/awslabs/amazon-kinesis-producer/issues/10, setting the > delay to 100 will fix this issue -- This message was sent by Atlassian JIRA (v6.4.14#64029)
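The warning itself is the usual shutdown-time interrupt of a thread blocked in a long sleep. A self-contained sketch of the pattern (not KPL code; method and class names are illustrative): a refresh loop that sleeps for a short, configurable delay and treats an interrupt as a clean shutdown signal rather than surfacing it as a warning.

```java
public class CredentialsRefresher {

    // A refresh loop in the style of the KPL daemon: sleep for the refresh
    // delay, then refresh. If the delay is long, shutdown almost always lands
    // mid-sleep and the raw InterruptedException shows up as the warning
    // quoted above. Treating the interrupt as a shutdown signal (and keeping
    // the delay short) makes the exit quiet.
    public static int runRefreshLoop(long delayMillis, int maxRefreshes) {
        int refreshes = 0;
        while (refreshes < maxRefreshes) {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();  // restore the flag, exit quietly
                break;
            }
            refreshes++;  // a real daemon would call refreshCredentials() here
        }
        return refreshes;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread daemon = new Thread(() -> runRefreshLoop(100, Integer.MAX_VALUE));
        daemon.start();
        Thread.sleep(250);
        daemon.interrupt();  // "shutdown": no exception escapes the loop
        daemon.join(1000);
    }
}
```

Shortening `setCredentialsRefreshDelay` as the fix does has the same effect from the outside: each sleep is brief, so an interrupt during shutdown rarely catches the thread mid-sleep.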
[jira] [Closed] (FLINK-7600) shorten delay of KinesisProducerConfiguration.setCredentialsRefreshDelay() to avoid updateCredentials Exception
[ https://issues.apache.org/jira/browse/FLINK-7600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai closed FLINK-7600. -- > shorten delay of KinesisProducerConfiguration.setCredentialsRefreshDelay() to > avoid updateCredentials Exception > --- > > Key: FLINK-7600 > URL: https://issues.apache.org/jira/browse/FLINK-7600 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.3.2 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Minor > Fix For: 1.4.0, 1.3.3 > > > we saw the following warning in Flink log: > {code:java} > 2017-08-11 02:33:24,473 WARN > org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon > - Exception during updateCredentials > java.lang.InterruptedException: sleep interrupted > at java.lang.Thread.sleep(Native Method) > at > org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon$5.run(Daemon.java:316) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > {code} > According to discussion in > https://github.com/awslabs/amazon-kinesis-producer/issues/10, setting the > delay to 100 will fix this issue -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7508) switch FlinkKinesisProducer to use KPL's ThreadingMode to ThreadedPool mode rather than Per_Request mode
[ https://issues.apache.org/jira/browse/FLINK-7508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai closed FLINK-7508. -- > switch FlinkKinesisProducer to use KPL's ThreadingMode to ThreadedPool mode > rather than Per_Request mode > > > Key: FLINK-7508 > URL: https://issues.apache.org/jira/browse/FLINK-7508 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.3.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Critical > Fix For: 1.4.0 > > > KinesisProducerLibrary (KPL) 0.10.x had been using a > One-New-Thread-Per-Request model for all requests sent to AWS Kinesis, which > is very expensive. > 0.12.4 introduced a new [ThreadingMode - > Pooled|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java#L225], > which will use a thread pool. This hugely improves KPL's performance and > reduces consumed resources. By default, KPL still uses per-request mode. We > should explicitly switch FlinkKinesisProducer's KPL threading mode to > 'Pooled'. > This work depends on FLINK-7366 and FLINK-7508 > Benchmarking I did: > * Environment: Running a Flink hourly-sliding windowing job on 18-node EMR > cluster with R4.2xlarge instances. Each hourly-sliding window in the Flink > job generates about 21million UserRecords, which means that we generated a > test load of 21million UserRecords at the first minute of each hour. > * Criteria: Test KPL throughput per minute. Since the default RecordTTL for > KPL is 30 sec, we can be sure that either all UserRecords are sent by KPL > within a minute, or we will see UserRecord expiration errors. > * One-New-Thread-Per-Request model: max throughput is about 2million > UserRecords per min; it doesn't go beyond that because CPU utilization goes > to 100%, everything stopped working and that Flink job crashed. 
> * Thread-Pool model with pool size of 10: it sends out 21million UserRecords > within 30 sec without any UserRecord expiration errors. The average peak CPU > utilization is about 20% - 30%. So 21million UserRecords/min is not the max > throughput of thread-pool model. We didn't go any further because 1) this > throughput is already a couple of times more than what we really need, and 2) we > don't have a quick way of increasing the test load. > Thus, I propose switching FlinkKinesisProducer to Thread-Pool mode. > [~tzulitai] What do you think? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (FLINK-7508) switch FlinkKinesisProducer to use KPL's ThreadingMode to ThreadedPool mode rather than Per_Request mode
[ https://issues.apache.org/jira/browse/FLINK-7508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai resolved FLINK-7508. Resolution: Fixed Thanks for the contribution [~phoenixjiangnan]. Resolved for {{master}} via 637dde889fe2d21ff6990749a750356d20fcd965. > switch FlinkKinesisProducer to use KPL's ThreadingMode to ThreadedPool mode > rather than Per_Request mode > > > Key: FLINK-7508 > URL: https://issues.apache.org/jira/browse/FLINK-7508 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.3.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Critical > Fix For: 1.4.0 > > > KinesisProducerLibrary (KPL) 0.10.x had been using a > One-New-Thread-Per-Request model for all requests sent to AWS Kinesis, which > is very expensive. > 0.12.4 introduced a new [ThreadingMode - > Pooled|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java#L225], > which will use a thread pool. This hugely improves KPL's performance and > reduces consumed resources. By default, KPL still uses per-request mode. We > should explicitly switch FlinkKinesisProducer's KPL threading mode to > 'Pooled'. > This work depends on FLINK-7366 and FLINK-7508 > Benchmarking I did: > * Environment: Running a Flink hourly-sliding windowing job on 18-node EMR > cluster with R4.2xlarge instances. Each hourly-sliding window in the Flink > job generates about 21million UserRecords, which means that we generated a > test load of 21million UserRecords at the first minute of each hour. > * Criteria: Test KPL throughput per minute. Since the default RecordTTL for > KPL is 30 sec, we can be sure that either all UserRecords are sent by KPL > within a minute, or we will see UserRecord expiration errors. 
> * One-New-Thread-Per-Request model: max throughput is about 2million > UserRecords per min; it doesn't go beyond that because CPU utilization goes > to 100%, everything stopped working and that Flink job crashed. > * Thread-Pool model with pool size of 10: it sends out 21million UserRecords > within 30 sec without any UserRecord expiration errors. The average peak CPU > utilization is about 20% - 30%. So 21million UserRecords/min is not the max > throughput of thread-pool model. We didn't go any further because 1) this > throughput is already a couple of times more than what we really need, and 2) we > don't have a quick way of increasing the test load. > Thus, I propose switching FlinkKinesisProducer to Thread-Pool mode. > [~tzulitai] What do you think? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
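The throughput difference comes down to thread reuse. A self-contained sketch using plain `java.util.concurrent` (the KPL configuration setters mentioned in the issue are not used here): a fixed pool caps the number of worker threads no matter how many tasks are submitted, whereas the per-request model pays thread creation and teardown for every task.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PooledVsPerRequest {

    // Per-request model (old KPL behaviour): one brand-new thread per task.
    // Thread creation/teardown dominates once the request rate is high.
    public static void perRequest(Runnable task) {
        try {
            Thread t = new Thread(task);
            t.start();
            t.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    // Pooled model: at most `poolSize` worker threads ever run the tasks.
    // Returns how many distinct threads actually executed work.
    public static int distinctWorkerThreads(int tasks, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Set<Long> threadIds = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(tasks);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    threadIds.add(Thread.currentThread().getId());
                    done.countDown();
                });
            }
            done.await();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
        return threadIds.size();
    }
}
```

Submitting 1,000 tasks to a pool of 10 uses at most 10 worker threads; the per-request model would have created 1,000, which is the CPU saturation the benchmark above observed.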
[jira] [Commented] (FLINK-7656) Switch to user ClassLoader when invoking initializeOnMaster finalizeOnMaster
[ https://issues.apache.org/jira/browse/FLINK-7656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176316#comment-16176316 ] ASF GitHub Bot commented on FLINK-7656: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/4690 Can you take a brief look at this change @aljoscha? Thanks! > Switch to user ClassLoader when invoking initializeOnMaster finalizeOnMaster > > > Key: FLINK-7656 > URL: https://issues.apache.org/jira/browse/FLINK-7656 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.3.2 >Reporter: Fabian Hueske >Assignee: Fabian Hueske > > The contract that Flink provides to usercode is that the usercode > classloader is the context classloader whenever usercode is called. > In {{OutputFormatVertex}} usercode is called in the {{initializeOnMaster()}} > and {{finalizeOnMaster()}} methods but the context classloader is not set to > the usercode classloader. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
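The fix described above follows a standard pattern: install the usercode classloader as the thread's context classloader for the duration of the call and restore the previous loader in a finally block. A self-contained sketch (`runWithContextClassLoader` is an illustrative helper, not the actual Flink method):

```java
import java.util.function.Supplier;

public class UserCodeClassLoaderDemo {

    // Runs `usercode` with `userLoader` installed as the thread's context
    // classloader, restoring the previous loader afterwards -- even if the
    // usercode throws.
    public static <T> T runWithContextClassLoader(ClassLoader userLoader,
                                                  Supplier<T> usercode) {
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(userLoader);
        try {
            return usercode.get();
        } finally {
            thread.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        ClassLoader userLoader = new ClassLoader() { };
        ClassLoader seen = runWithContextClassLoader(
                userLoader, () -> Thread.currentThread().getContextClassLoader());
        System.out.println(seen == userLoader);  // the usercode saw its own loader
    }
}
```

Restoring the previous loader in `finally` is the important part: the calling thread may be shared (e.g. an actor or RPC thread), and leaking a usercode classloader into later calls causes exactly the kind of cross-loader confusion seen in FLINK-7669.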
[GitHub] flink issue #4690: [FLINK-7656] [runtime] Switch to user classloader before ...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/4690 Can you take a brief look at this change @aljoscha? Thanks! ---
[jira] [Commented] (FLINK-7636) Introduce Flink RelOptTable, and remove tableSource from all TableSourceScan node constructor
[ https://issues.apache.org/jira/browse/FLINK-7636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176315#comment-16176315 ] ASF GitHub Bot commented on FLINK-7636: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4681#discussion_r140474944 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkRelOptTable.scala --- @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.schema + +import org.apache.calcite.plan.{RelOptCluster, RelOptSchema, RelOptTable} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.prepare.CalcitePrepareImpl +import java.util.{List => JList} + +import com.google.common.collect.ImmutableList +import org.apache.calcite.adapter.enumerable.EnumerableTableScan +import org.apache.calcite.linq4j.tree.Expression +import org.apache.calcite.plan.RelOptTable.ToRelContext +import org.apache.calcite.prepare.Prepare.PreparingTable +import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFactory} +import org.apache.calcite.rel._ +import org.apache.calcite.rel.logical.LogicalTableScan +import org.apache.calcite.runtime.Hook +import org.apache.calcite.schema.{StreamableTable, TranslatableTable} +import org.apache.calcite.sql.SqlAccessType +import org.apache.calcite.sql.validate.{SqlModality, SqlMonotonicity} +import org.apache.calcite.sql2rel.InitializerContext +import org.apache.calcite.util.ImmutableBitSet +import org.apache.flink.table.plan.stats.FlinkStatistic + +import scala.collection.JavaConverters._ + +/** + * FlinkRelOptTable wraps a FlinkTable + * + * @param schema the [[RelOptSchema]] this table belongs to + * @param rowType the type of rows returned by this table + * @param names the identifier for this table. The identifier must be unique with + *respect to the Connection producing this table. + * @param table wrapped flink table + */ + +class FlinkRelOptTable protected( +schema: RelOptSchema, +rowType: RelDataType, +names: JList[String], +table: FlinkTable[_]) extends PreparingTable { + + /** +* Creates a copy of this Flink RelOptTable with new Flink Table and new row type. 
+   *
+   * @param newTable    new flink table
+   * @param typeFactory type factory to create new row type of new flink table
+   * @return The copy of this Flink RelOptTable with new Flink table and new row type
+   */
+  def copy(newTable: FlinkTable[_], typeFactory: RelDataTypeFactory): FlinkRelOptTable = {
+    val newRowType = newTable.getRowType(typeFactory)
+    FlinkRelOptTable.create(schema, newRowType, names, newTable)
+  }
+
+  /**
+   * Extends a table with the given extra fields, which is not supported now.
+   *
+   * @param extendedFields
--- End diff --

I meant to either remove the `@param extendedFields` or add a comment.

> Introduce Flink RelOptTable, and remove tableSource from all TableSourceScan node constructor
> --
>
> Key: FLINK-7636
> URL: https://issues.apache.org/jira/browse/FLINK-7636
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: jingzhang
> Assignee: jingzhang
>
> At present, there are two ways to fetch TableSource of a TableSourceScan node (e.g LogicalTableSourceScan, PhysicalTableSourceScan ...):
> 1.
> {code}
> val relOptTable: RelOptTable = getTable()
> val tableSourceTable = relOptTable.unwrap(classOf[TableSourceTable[_]])
> val tableSouce = tableSourceTable.tableSource
> {code}
> the result of getTable() is instance of RelOptTableImpl now, and it will not change after
[GitHub] flink pull request #4681: [FLINK-7636][Table API & SQL]Introduce Flink RelOp...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4681#discussion_r140474944 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkRelOptTable.scala --- @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.schema + +import org.apache.calcite.plan.{RelOptCluster, RelOptSchema, RelOptTable} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.prepare.CalcitePrepareImpl +import java.util.{List => JList} + +import com.google.common.collect.ImmutableList +import org.apache.calcite.adapter.enumerable.EnumerableTableScan +import org.apache.calcite.linq4j.tree.Expression +import org.apache.calcite.plan.RelOptTable.ToRelContext +import org.apache.calcite.prepare.Prepare.PreparingTable +import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFactory} +import org.apache.calcite.rel._ +import org.apache.calcite.rel.logical.LogicalTableScan +import org.apache.calcite.runtime.Hook +import org.apache.calcite.schema.{StreamableTable, TranslatableTable} +import org.apache.calcite.sql.SqlAccessType +import org.apache.calcite.sql.validate.{SqlModality, SqlMonotonicity} +import org.apache.calcite.sql2rel.InitializerContext +import org.apache.calcite.util.ImmutableBitSet +import org.apache.flink.table.plan.stats.FlinkStatistic + +import scala.collection.JavaConverters._ + +/** + * FlinkRelOptTable wraps a FlinkTable + * + * @param schema the [[RelOptSchema]] this table belongs to + * @param rowType the type of rows returned by this table + * @param names the identifier for this table. The identifier must be unique with + *respect to the Connection producing this table. + * @param table wrapped flink table + */ + +class FlinkRelOptTable protected( +schema: RelOptSchema, +rowType: RelDataType, +names: JList[String], +table: FlinkTable[_]) extends PreparingTable { + + /** +* Creates a copy of this Flink RelOptTable with new Flink Table and new row type. 
+   *
+   * @param newTable    new flink table
+   * @param typeFactory type factory to create new row type of new flink table
+   * @return The copy of this Flink RelOptTable with new Flink table and new row type
+   */
+  def copy(newTable: FlinkTable[_], typeFactory: RelDataTypeFactory): FlinkRelOptTable = {
+    val newRowType = newTable.getRowType(typeFactory)
+    FlinkRelOptTable.create(schema, newRowType, names, newTable)
+  }
+
+  /**
+   * Extends a table with the given extra fields, which is not supported now.
+   *
+   * @param extendedFields
--- End diff --

I meant to either remove the `@param extendedFields` or add a comment.

---
[jira] [Commented] (FLINK-7636) Introduce Flink RelOptTable, and remove tableSource from all TableSourceScan node constructor
[ https://issues.apache.org/jira/browse/FLINK-7636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176254#comment-16176254 ] ASF GitHub Bot commented on FLINK-7636: --- Github user beyond1920 commented on the issue: https://github.com/apache/flink/pull/4681 @fhueske Thanks for your advice. I have already updated the PR based on your suggestion.

> Introduce Flink RelOptTable, and remove tableSource from all TableSourceScan node constructor
> --
>
> Key: FLINK-7636
> URL: https://issues.apache.org/jira/browse/FLINK-7636
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: jingzhang
> Assignee: jingzhang
>
> At present, there are two ways to fetch the TableSource of a TableSourceScan node (e.g. LogicalTableSourceScan, PhysicalTableSourceScan ...):
> 1.
> {code}
> val relOptTable: RelOptTable = getTable()
> val tableSourceTable = relOptTable.unwrap(classOf[TableSourceTable[_]])
> val tableSouce = tableSourceTable.tableSource
> {code}
> the result of getTable() is an instance of RelOptTableImpl now, and it will not change after the RelNode tree is built.
> 2. now all TableSourceScan nodes contain a tableSource as a constructor parameter, so we can fetch the tableSource directly later.
>
> The tableSource obtained via the two ways above can differ after project push-down (PPD) or filter push-down (FPD) is applied, which is very confusing. We hope to fix the problem by introducing FlinkRelOptTable to replace RelOptTableImpl, and removing the tableSource parameter from TableSourceScan's constructor. After PPD or FPD, a new FlinkRelOptTable instance which contains a new TableSourceTable will be passed to the TableSourceScan constructor.
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4681: [FLINK-7636][Table API & SQL]Introduce Flink RelOptTable,...
Github user beyond1920 commented on the issue: https://github.com/apache/flink/pull/4681 @fhueske Thanks for your advice. I have already updated the PR based on your suggestion. ---
[jira] [Commented] (FLINK-7636) Introduce Flink RelOptTable, and remove tableSource from all TableSourceScan node constructor
[ https://issues.apache.org/jira/browse/FLINK-7636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176252#comment-16176252 ] ASF GitHub Bot commented on FLINK-7636: --- Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/4681#discussion_r140462913 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkRelOptTable.scala --- @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.schema + +import org.apache.calcite.plan.{RelOptCluster, RelOptSchema, RelOptTable} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.prepare.CalcitePrepareImpl +import java.util.{List => JList} + +import com.google.common.collect.ImmutableList +import org.apache.calcite.adapter.enumerable.EnumerableTableScan +import org.apache.calcite.linq4j.tree.Expression +import org.apache.calcite.plan.RelOptTable.ToRelContext +import org.apache.calcite.prepare.Prepare.PreparingTable +import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFactory} +import org.apache.calcite.rel._ +import org.apache.calcite.rel.logical.LogicalTableScan +import org.apache.calcite.runtime.Hook +import org.apache.calcite.schema.{StreamableTable, TranslatableTable} +import org.apache.calcite.sql.SqlAccessType +import org.apache.calcite.sql.validate.{SqlModality, SqlMonotonicity} +import org.apache.calcite.sql2rel.InitializerContext +import org.apache.calcite.util.ImmutableBitSet +import org.apache.flink.table.plan.stats.FlinkStatistic + +import scala.collection.JavaConverters._ + +/** + * FlinkRelOptTable wraps a FlinkTable + * + * @param schema the [[RelOptSchema]] this table belongs to + * @param rowType the type of rows returned by this table + * @param names the identifier for this table. The identifier must be unique with + *respect to the Connection producing this table. + * @param table wrapped flink table + */ + +class FlinkRelOptTable protected( +schema: RelOptSchema, +rowType: RelDataType, +names: JList[String], +table: FlinkTable[_]) extends PreparingTable { + + /** +* Creates a copy of this Flink RelOptTable with new Flink Table and new row type. 
+   *
+   * @param newTable    new flink table
+   * @param typeFactory type factory to create new row type of new flink table
+   * @return The copy of this Flink RelOptTable with new Flink table and new row type
+   */
+  def copy(newTable: FlinkTable[_], typeFactory: RelDataTypeFactory): FlinkRelOptTable = {
+    val newRowType = newTable.getRowType(typeFactory)
+    FlinkRelOptTable.create(schema, newRowType, names, newTable)
+  }
+
+  /**
+   * Extends a table with the given extra fields, which is not supported now.
+   *
+   * @param extendedFields
--- End diff --

Do you mean add comment for extendedFields?

> Introduce Flink RelOptTable, and remove tableSource from all TableSourceScan node constructor
> --
>
> Key: FLINK-7636
> URL: https://issues.apache.org/jira/browse/FLINK-7636
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: jingzhang
> Assignee: jingzhang
>
> At present, there are two ways to fetch TableSource of a TableSourceScan node (e.g LogicalTableSourceScan, PhysicalTableSourceScan ...):
> 1.
> {code}
> val relOptTable: RelOptTable = getTable()
> val tableSourceTable = relOptTable.unwrap(classOf[TableSourceTable[_]])
> val tableSouce = tableSourceTable.tableSource
> {code}
> the result of getTable() is instance of RelOptTableImpl now, and it will not change after RelNode tree is built.
>
[GitHub] flink pull request #4681: [FLINK-7636][Table API & SQL]Introduce Flink RelOp...
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4681#discussion_r140462913

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkRelOptTable.scala ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptSchema, RelOptTable}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.prepare.CalcitePrepareImpl
+import java.util.{List => JList}
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan
+import org.apache.calcite.linq4j.tree.Expression
+import org.apache.calcite.plan.RelOptTable.ToRelContext
+import org.apache.calcite.prepare.Prepare.PreparingTable
+import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFactory}
+import org.apache.calcite.rel._
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.calcite.runtime.Hook
+import org.apache.calcite.schema.{StreamableTable, TranslatableTable}
+import org.apache.calcite.sql.SqlAccessType
+import org.apache.calcite.sql.validate.{SqlModality, SqlMonotonicity}
+import org.apache.calcite.sql2rel.InitializerContext
+import org.apache.calcite.util.ImmutableBitSet
+import org.apache.flink.table.plan.stats.FlinkStatistic
+
+import scala.collection.JavaConverters._
+
+/**
+ * FlinkRelOptTable wraps a FlinkTable.
+ *
+ * @param schema  the [[RelOptSchema]] this table belongs to
+ * @param rowType the type of rows returned by this table
+ * @param names   the identifier for this table. The identifier must be unique
+ *                with respect to the Connection producing this table.
+ * @param table   the wrapped Flink table
+ */
+class FlinkRelOptTable protected(
+    schema: RelOptSchema,
+    rowType: RelDataType,
+    names: JList[String],
+    table: FlinkTable[_]) extends PreparingTable {
+
+  /**
+   * Creates a copy of this FlinkRelOptTable with a new FlinkTable and a new row type.
+   *
+   * @param newTable    the new Flink table
+   * @param typeFactory the type factory used to create the row type of the new Flink table
+   * @return a copy of this FlinkRelOptTable with the new Flink table and the new row type
+   */
+  def copy(newTable: FlinkTable[_], typeFactory: RelDataTypeFactory): FlinkRelOptTable = {
+    val newRowType = newTable.getRowType(typeFactory)
+    FlinkRelOptTable.create(schema, newRowType, names, newTable)
+  }
+
+  /**
+   * Extends a table with the given extra fields, which is not supported now.
+   *
+   * @param extendedFields
--- End diff --

Do you mean add comment for extendedFields?

---
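The `copy` method quoted in the diff above is the heart of the immutable-wrapper design: a rule that changes the underlying table (e.g. after push-down) produces a new `FlinkRelOptTable` rather than mutating the existing one. A minimal self-contained Scala sketch of that pattern, using simplified stand-in types rather than the real Flink/Calcite classes:

```scala
// A minimal sketch of the copy-on-change pattern used by FlinkRelOptTable.
// All types here are simplified stand-ins, NOT the real Flink/Calcite classes.
case class RowType(fieldNames: List[String])

// Stand-in for FlinkTable: knows its own row type.
case class SimpleTable(rowType: RowType)

// Stand-in for FlinkRelOptTable: an immutable wrapper around a table.
class SimpleRelOptTable private (val names: List[String], val table: SimpleTable) {
  val rowType: RowType = table.rowType

  // Mirrors FlinkRelOptTable.copy: a new table yields a NEW wrapper carrying
  // the new table's row type; the original wrapper is left untouched.
  def copy(newTable: SimpleTable): SimpleRelOptTable =
    new SimpleRelOptTable(names, newTable)
}

object SimpleRelOptTable {
  def create(names: List[String], table: SimpleTable): SimpleRelOptTable =
    new SimpleRelOptTable(names, table)
}

object CopyDemo {
  def main(args: Array[String]): Unit = {
    val full = SimpleRelOptTable.create(
      List("catalog", "db", "t"), SimpleTable(RowType(List("a", "b", "c"))))
    // Projection push-down builds a projected table and copies the wrapper.
    val projected = full.copy(SimpleTable(RowType(List("a", "c"))))
    println(full.rowType.fieldNames)      // the original wrapper is unchanged
    println(projected.rowType.fieldNames) // the copy sees only the kept fields
  }
}
```

Because the wrapper is never mutated, every consumer that unwraps the table after push-down sees a state consistent with the scan node that references it.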
[jira] [Commented] (FLINK-7636) Introduce Flink RelOptTable, and remove tableSource from all TableSourceScan node constructor
[ https://issues.apache.org/jira/browse/FLINK-7636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16176251#comment-16176251 ]

ASF GitHub Bot commented on FLINK-7636:
---------------------------------------

Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4681#discussion_r140462814

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkRelOptTable.scala ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptSchema, RelOptTable}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.prepare.CalcitePrepareImpl
+import java.util.{List => JList}
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan
+import org.apache.calcite.linq4j.tree.Expression
+import org.apache.calcite.plan.RelOptTable.ToRelContext
+import org.apache.calcite.prepare.Prepare.PreparingTable
+import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFactory}
+import org.apache.calcite.rel._
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.calcite.runtime.Hook
+import org.apache.calcite.schema.{StreamableTable, TranslatableTable}
+import org.apache.calcite.sql.SqlAccessType
+import org.apache.calcite.sql.validate.{SqlModality, SqlMonotonicity}
+import org.apache.calcite.sql2rel.InitializerContext
+import org.apache.calcite.util.ImmutableBitSet
+import org.apache.flink.table.plan.stats.FlinkStatistic
+
+import scala.collection.JavaConverters._
+
+/**
+ * FlinkRelOptTable wraps a FlinkTable.
+ *
+ * @param schema  the [[RelOptSchema]] this table belongs to
+ * @param rowType the type of rows returned by this table
+ * @param names   the identifier for this table. The identifier must be unique
+ *                with respect to the Connection producing this table.
+ * @param table   the wrapped Flink table
+ */
+class FlinkRelOptTable protected(
+    schema: RelOptSchema,
+    rowType: RelDataType,
+    names: JList[String],
+    table: FlinkTable[_]) extends PreparingTable {
--- End diff --

done

> Introduce Flink RelOptTable, and remove tableSource from all TableSourceScan
> node constructor
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-7636
>                 URL: https://issues.apache.org/jira/browse/FLINK-7636
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: jingzhang
>            Assignee: jingzhang
>
> At present, there are two ways to fetch the TableSource of a TableSourceScan
> node (e.g. LogicalTableSourceScan, PhysicalTableSourceScan, ...):
> 1.
> {code}
> val relOptTable: RelOptTable = getTable()
> val tableSourceTable = relOptTable.unwrap(classOf[TableSourceTable[_]])
> val tableSource = tableSourceTable.tableSource
> {code}
> The result of getTable() is currently an instance of RelOptTableImpl, and it
> does not change after the RelNode tree is built.
> 2. Every TableSourceScan currently takes a tableSource as a constructor
> parameter, so the tableSource can be fetched directly later.
> The tableSource obtained via these two ways can differ after projection
> push-down (PPD) or filter push-down (FPD) is applied, which is very
> confusing. We hope to fix the problem by introducing FlinkRelOptTable to
> replace RelOptTableImpl, and by removing the tableSource parameter from the
> TableSourceScan constructor. After PPD or FPD, a new FlinkRelOptTable
> instance containing a new TableSourceTable will be passed to the
> TableSourceScan constructor.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
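The divergence described in the issue can be made concrete with a short sketch. All types below are hypothetical simplified stand-ins, not the real Calcite/Flink API; the point is only to show how the two fetch paths disagree once push-down rewrites the scan but not the catalog table:

```scala
// Sketch of why the two TableSource fetch paths in FLINK-7636 can disagree
// after push-down. All types are hypothetical stand-ins, NOT the real API.
case class TableSource(fields: List[String])
case class TableSourceTable(tableSource: TableSource)

// Stand-in for RelOptTableImpl: the catalog table handed to a scan. In the
// pre-fix design it is NOT replaced when push-down rewrites the scan.
class RelOptTableStub(wrapped: TableSourceTable) {
  def unwrap: TableSourceTable = wrapped
}

// Stand-in for the pre-fix TableSourceScan: it carries the table source both
// indirectly (via relOptTable, way 1) and directly (constructor param, way 2).
class ScanStub(val relOptTable: RelOptTableStub, val tableSource: TableSource)

object DivergenceDemo {
  def main(args: Array[String]): Unit = {
    val original = TableSource(List("a", "b", "c"))
    val catalogTable = new RelOptTableStub(TableSourceTable(original))

    // Projection push-down creates a NEW scan with a projected source but
    // reuses the OLD RelOptTable:
    val pushedDown = new ScanStub(catalogTable, TableSource(List("a")))

    // Way 2 (constructor field) sees the projected source ...
    println(pushedDown.tableSource.fields)
    // ... while way 1 (unwrap the RelOptTable) still sees the original one:
    println(pushedDown.relOptTable.unwrap.tableSource.fields)
  }
}
```

The fix proposed in the issue removes way 2 entirely and makes way 1 reliable: push-down builds a new FlinkRelOptTable (holding a new TableSourceTable) and passes it to the new scan, so unwrapping always yields the current source.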
[GitHub] flink pull request #4681: [FLINK-7636][Table API & SQL]Introduce Flink RelOp...
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4681#discussion_r140462752

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkRelOptTable.scala ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptSchema, RelOptTable}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.prepare.CalcitePrepareImpl
+import java.util.{List => JList}
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan
+import org.apache.calcite.linq4j.tree.Expression
+import org.apache.calcite.plan.RelOptTable.ToRelContext
+import org.apache.calcite.prepare.Prepare.PreparingTable
+import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFactory}
+import org.apache.calcite.rel._
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.calcite.runtime.Hook
+import org.apache.calcite.schema.{StreamableTable, TranslatableTable}
+import org.apache.calcite.sql.SqlAccessType
+import org.apache.calcite.sql.validate.{SqlModality, SqlMonotonicity}
+import org.apache.calcite.sql2rel.InitializerContext
+import org.apache.calcite.util.ImmutableBitSet
+import org.apache.flink.table.plan.stats.FlinkStatistic
+
+import scala.collection.JavaConverters._
+
+/**
+ * FlinkRelOptTable wraps a FlinkTable.
+ *
+ * @param schema  the [[RelOptSchema]] this table belongs to
+ * @param rowType the type of rows returned by this table
+ * @param names   the identifier for this table. The identifier must be unique
+ *                with respect to the Connection producing this table.
+ * @param table   the wrapped Flink table
+ */
+class FlinkRelOptTable protected(
+    schema: RelOptSchema,
+    rowType: RelDataType,
+    names: JList[String],
--- End diff --

done

---