[jira] [Commented] (FLINK-9891) Flink cluster is not shutdown in YARN mode when Flink client is stopped

2018-08-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16577422#comment-16577422
 ] 

ASF GitHub Bot commented on FLINK-9891:
---

yanghua commented on a change in pull request #6540: [FLINK-9891] Added hook to 
shutdown cluster if a session was created in per-job mode.
URL: https://github.com/apache/flink/pull/6540#discussion_r209443343
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
 ##
 @@ -1219,4 +1231,21 @@ public static void 
setJobManagerAddressInConfig(Configuration config, InetSocket
return constructor.newInstance(params);
}
 
+   private static class ClusterShutdownHook extends Thread {
+   private final ClusterClient client;
+
+   ClusterShutdownHook(ClusterClient client) {
 
 Review comment:
   using `@Nonnull` here looks better to me
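   
   For illustration, a minimal sketch of the suggested change (assuming the annotation meant here is `javax.annotation.Nonnull`):
   
   ```
   // requires: import javax.annotation.Nonnull;
   private static class ClusterShutdownHook extends Thread {
           private final ClusterClient client;
   
           ClusterShutdownHook(@Nonnull ClusterClient client) {
                   this.client = client;
           }
   }
   ```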


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink cluster is not shutdown in YARN mode when Flink client is stopped
> ---
>
> Key: FLINK-9891
> URL: https://issues.apache.org/jira/browse/FLINK-9891
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0, 1.5.1
>Reporter: Sergey Krasovskiy
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
>
> We are not using session mode or detached mode. The command to run a Flink job 
> on YARN is:
> {code:java}
> /bin/flink run -m yarn-cluster -yn 1 -yqu flink -yjm 768 -ytm 
> 2048 -j ./flink-quickstart-java-1.0-SNAPSHOT.jar -c org.test.WordCount
> {code}
> Flink CLI logs:
> {code:java}
> Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/opt/flink-streaming/flink-streaming-1.5.1-1.5.1-bin-hadoop27-scala_2.11-1531485329/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/usr/hdp/2.4.2.10-1/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 2018-07-18 12:47:03,747 INFO 
> org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service 
> address: http://hmaster-1.ipbl.rgcloud.net:8188/ws/v1/timeline/
> 2018-07-18 12:47:04,222 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - 
> No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-07-18 12:47:04,222 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - 
> No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-07-18 12:47:04,248 WARN 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Neither the 
> HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink 
> YARN Client needs one of these to be set to properly load the Hadoop 
> configuration for accessing YARN.
> 2018-07-18 12:47:04,409 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: 
> ClusterSpecification{masterMemoryMB=768, taskManagerMemoryMB=2048, 
> numberTaskManagers=1, slotsPerTaskManager=1}
> 2018-07-18 12:47:04,783 WARN 
> org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit 
> local reads feature cannot be used because libhadoop cannot be loaded.
> 2018-07-18 12:47:04,788 WARN 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration 
> directory 
> ('/opt/flink-streaming/flink-streaming-1.5.1-1.5.1-bin-hadoop27-scala_2.11-1531485329/conf')
>  contains both LOG4J and Logback configuration files. Please delete or rename 
> one of them.
> 2018-07-18 12:47:07,846 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application 
> master application_1531474158783_10814
> 2018-07-18 12:47:08,073 INFO 
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application 
> application_1531474158783_10814
> 2018-07-18 12:47:08,074 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster 
> to be allocated
> 2018-07-18 12:47:08,076 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, 
> current state ACCEPTED
> 2018-07-18 12:47:12,864 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has 
> been deployed successfully.
> {code}


[jira] [Commented] (FLINK-9891) Flink cluster is not shutdown in YARN mode when Flink client is stopped

2018-08-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16577421#comment-16577421
 ] 

ASF GitHub Bot commented on FLINK-9891:
---

yanghua commented on a change in pull request #6540: [FLINK-9891] Added hook to 
shutdown cluster if a session was created in per-job mode.
URL: https://github.com/apache/flink/pull/6540#discussion_r209443483
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
 ##
 @@ -249,13 +250,24 @@ protected void run(String[] args) throws Exception {
LOG.info("Could not properly shut down 
the client.", e);
}
} else {
+   final ClusterShutdownHook shutdownHook;
if (clusterId != null) {
client = 
clusterDescriptor.retrieve(clusterId);
+   shutdownHook = null;
} else {
// also in job mode we have to deploy a 
session cluster because the job
// might consist of multiple parts 
(e.g. when using collect)
final ClusterSpecification 
clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
client = 
clusterDescriptor.deploySessionCluster(clusterSpecification);
+   // if not running in detached mode, add 
a shutdown hook to shut down cluster if client exits
+   // there's a race-condition here if cli 
is killed before shutdown hook is installed
+   if (!runOptions.getDetachedMode()) {
+   shutdownHook = new 
ClusterShutdownHook(client);
+   
Runtime.getRuntime().addShutdownHook(shutdownHook);
 
 Review comment:
   In flink-core there is a `ShutdownHookUtil` utility class you could consider using.
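   
   For illustration, a rough sketch of that alternative (assumptions to verify against the actual APIs: the `ShutdownHookUtil.addShutdownHook(AutoCloseable, String, Logger)` signature and a `client.shutDownCluster()` cleanup call):
   
   ```
   import org.apache.flink.util.ShutdownHookUtil;
   
   // registers the cleanup action and returns the installed hook Thread,
   // which can be deregistered again on a normal CLI exit
   final Thread shutdownHook = ShutdownHookUtil.addShutdownHook(
           () -> client.shutDownCluster(),   // hypothetical cleanup call
           CliFrontend.class.getSimpleName(),
           LOG);
   ```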


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink cluster is not shutdown in YARN mode when Flink client is stopped
> ---
>
> Key: FLINK-9891
> URL: https://issues.apache.org/jira/browse/FLINK-9891
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0, 1.5.1
>Reporter: Sergey Krasovskiy
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
>
> We are not using session mode or detached mode. The command to run a Flink job 
> on YARN is:
> {code:java}
> /bin/flink run -m yarn-cluster -yn 1 -yqu flink -yjm 768 -ytm 
> 2048 -j ./flink-quickstart-java-1.0-SNAPSHOT.jar -c org.test.WordCount
> {code}
> Flink CLI logs:
> {code:java}
> Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/opt/flink-streaming/flink-streaming-1.5.1-1.5.1-bin-hadoop27-scala_2.11-1531485329/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/usr/hdp/2.4.2.10-1/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 2018-07-18 12:47:03,747 INFO 
> org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service 
> address: http://hmaster-1.ipbl.rgcloud.net:8188/ws/v1/timeline/
> 2018-07-18 12:47:04,222 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - 
> No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-07-18 12:47:04,222 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - 
> No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-07-18 12:47:04,248 WARN 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Neither the 
> HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink 
> YARN Client needs one of these to be set to properly load the Hadoop 
> configuration for accessing YARN.
> 2018-07-18 12:47:04,409 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: 
> ClusterSpecification{masterMemoryMB=768, taskManagerMemoryMB=2048, 
> numberTaskManagers=1, slotsPerTaskManager=1}


[jira] [Commented] (FLINK-10123) Use ExecutorThreadFactory instead of DefaultThreadFactory in RestServer/Client

2018-08-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16577420#comment-16577420
 ] 

ASF GitHub Bot commented on FLINK-10123:


yanghua commented on a change in pull request #6539: [FLINK-10123] Use 
ExecutorThreadFactory instead of DefaultThreadFactory in RestServer/Client
URL: https://github.com/apache/flink/pull/6539#discussion_r209443308
 
 

 ##
 File path: flink-runtime/src/main/scala/akka/actor/RobustActorSystem.scala
 ##
 @@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package akka.actor
 
 Review comment:
   @zentol here we should use this package.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Use ExecutorThreadFactory instead of DefaultThreadFactory in RestServer/Client
> --
>
> Key: FLINK-10123
> URL: https://issues.apache.org/jira/browse/FLINK-10123
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Instead of using the {{DefaultThreadFactory}} in the 
> {{RestServerEndpoint}}/{{RestClient}} we should use the 
> {{ExecutorThreadFactory}} because it uses the {{FatalExitExceptionHandler}} 
> by default as the uncaught exception handler. This should guard against 
> uncaught exceptions by simply terminating the JVM.
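
For illustration, a minimal self-contained sketch of that guard (a hypothetical factory, not Flink's actual {{ExecutorThreadFactory}}): threads created by it terminate the JVM on an uncaught exception instead of dying silently.

{code:java}
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class FatalExitThreadFactory implements ThreadFactory {

    private final String poolName;
    private final AtomicInteger counter = new AtomicInteger();

    public FatalExitThreadFactory(String poolName) {
        this.poolName = poolName;
    }

    @Override
    public Thread newThread(Runnable runnable) {
        Thread t = new Thread(runnable, poolName + "-thread-" + counter.incrementAndGet());
        // terminate the JVM instead of letting the pool swallow the error
        t.setUncaughtExceptionHandler((thread, error) -> {
            error.printStackTrace();
            Runtime.getRuntime().halt(-17);
        });
        return t;
    }
}
{code}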



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10123) Use ExecutorThreadFactory instead of DefaultThreadFactory in RestServer/Client

2018-08-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16577419#comment-16577419
 ] 

ASF GitHub Bot commented on FLINK-10123:


zentol commented on a change in pull request #6539: [FLINK-10123] Use 
ExecutorThreadFactory instead of DefaultThreadFactory in RestServer/Client
URL: https://github.com/apache/flink/pull/6539#discussion_r209443286
 
 

 ##
 File path: flink-runtime/src/main/scala/akka/actor/RobustActorSystem.scala
 ##
 @@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package akka.actor
 
 Review comment:
   was this copied from akka?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Use ExecutorThreadFactory instead of DefaultThreadFactory in RestServer/Client
> --
>
> Key: FLINK-10123
> URL: https://issues.apache.org/jira/browse/FLINK-10123
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Instead of using the {{DefaultThreadFactory}} in the 
> {{RestServerEndpoint}}/{{RestClient}} we should use the 
> {{ExecutorThreadFactory}} because it uses the {{FatalExitExceptionHandler}} 
> by default as the uncaught exception handler. This should guard against 
> uncaught exceptions by simply terminating the JVM.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)



[jira] [Commented] (FLINK-9941) Flush in ScalaCsvOutputFormat before close method

2018-08-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16577413#comment-16577413
 ] 

ASF GitHub Bot commented on FLINK-9941:
---

buptljy commented on issue #6411: [FLINK-9941][ScalaAPI] Flush in 
ScalaCsvOutputFormat before closing to ensure CI stability
URL: https://github.com/apache/flink/pull/6411#issuecomment-412318470
 
 
   +1


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flush in ScalaCsvOutputFormat before close method
> -
>
> Key: FLINK-9941
> URL: https://issues.apache.org/jira/browse/FLINK-9941
> Project: Flink
>  Issue Type: Improvement
>  Components: Scala API
>Affects Versions: 1.5.1
>Reporter: Rann Tao
>Assignee: buptljy
>Priority: Major
>  Labels: pull-request-available
>
> Because not every stream's close() method flushes, we need to manually call 
> flush() before close() in order to ensure the stability of continuous 
> integration.
> I noticed that CsvOutputFormat (Java API) already does this, as follows:
> {code:java}
> //CsvOutputFormat
> public void close() throws IOException {
> if (wrt != null) {
> this.wrt.flush();
> this.wrt.close();
> }
> super.close();
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)



[jira] [Closed] (FLINK-9693) Possible memory leak in jobmanager retaining archived checkpoints

2018-08-11 Thread Steven Zhen Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu closed FLINK-9693.
-
Resolution: Fixed

Closing this JIRA since [~srichter] has addressed the remaining issues in other JIRAs.

> Possible memory leak in jobmanager retaining archived checkpoints
> -
>
> Key: FLINK-9693
> URL: https://issues.apache.org/jira/browse/FLINK-9693
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.6.0
> Environment: !image.png!!image (1).png!
>Reporter: Steven Zhen Wu
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.4.3, 1.5.3, 1.6.1, 1.7.0
>
> Attachments: 20180725_jm_mem_leak.png, 
> 41K_ExecutionVertex_objs_retained_9GB.png, ExecutionVertexZoomIn.png
>
>
> First, some context about the job
>  * Flink 1.4.1
>  * stand-alone deployment mode
>  * embarrassingly parallel: all operators are chained together
>  * parallelism is over 1,000
>  * stateless except for Kafka source operators. checkpoint size is 8.4 MB.
>  * set "state.backend.fs.memory-threshold" so that only jobmanager writes to 
> S3 to checkpoint
>  * internal checkpoint with 10 checkpoints retained in history
>  
> Summary of the observations
>  * 41,567 ExecutionVertex objects retained 9+ GB of memory
>  * Expanded one ExecutionVertex; it seems to be storing the Kafka offsets for 
> the source operator



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9999) Add ISNUMERIC supported in Table API/SQL

2018-08-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16577392#comment-16577392
 ] 

ASF GitHub Bot commented on FLINK-9999:
---

yanghua commented on a change in pull request #6473: [FLINK-9999] [table] Add 
ISNUMERIC supported in Table API/SQL
URL: https://github.com/apache/flink/pull/6473#discussion_r209441697
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
 ##
 @@ -450,6 +450,63 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
   "")
   }
 
+  @Test
+  def testIsNumeric(): Unit = {
 
 Review comment:
   There is one here:
   
   ```
   testAllApis(
 "0.123a".isNumeric(),
 "'0.123a'.isNumeric",
 "ISNUMERIC('0.123a')",
 "0")
   ```
   
I haven't added many cases at the moment, though more coverage is always welcome. 
I want to wait until I can confirm that this function is necessary.
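   
   As a rough illustration of the intended semantics only (plain Java, not the actual Table API implementation): the string is numeric iff it parses as a number.
   
   ```
   // semantic sketch: ISNUMERIC(s) == 1 iff s parses as a number
   static int isNumeric(String s) {
       try {
           Double.parseDouble(s);        // accepts "1", "-2.5", "1e3", ...
           return 1;
       } catch (NumberFormatException | NullPointerException e) {
           return 0;                     // e.g. "0.123a" -> 0
       }
   }
   ```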


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add ISNUMERIC supported in Table API/SQL
> 
>
> Key: FLINK-9999
> URL: https://issues.apache.org/jira/browse/FLINK-9999
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The ISNUMERIC function is used to verify whether an expression is a valid numeric type.
> documentation : 
> https://docs.microsoft.com/en-us/sql/t-sql/functions/isnumeric-transact-sql?view=sql-server-2017



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)



[jira] [Commented] (FLINK-9999) Add ISNUMERIC supported in Table API/SQL

2018-08-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16577390#comment-16577390
 ] 

ASF GitHub Bot commented on FLINK-9999:
---

yanghua commented on a change in pull request #6473: [FLINK-9999] [table] Add 
ISNUMERIC supported in Table API/SQL
URL: https://github.com/apache/flink/pull/6473#discussion_r209441509
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
 ##
 @@ -358,6 +358,29 @@ case class Rpad(text: Expression, len: Expression, pad: 
Expression)
   }
 }
 
+case class IsNumeric(child: Expression) extends UnaryExpression with 
InputTypeSpec {
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = 
Seq(STRING_TYPE_INFO)
+
+  override private[flink] def resultType: TypeInformation[_] = INT_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult = {
+if (child.resultType == STRING_TYPE_INFO) {
 
 Review comment:
   Here I treat it as a scalar function on string input; the value represented 
by the string can be an int/bigint/double and so on.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add ISNUMERIC supported in Table API/SQL
> 
>
> Key: FLINK-9999
> URL: https://issues.apache.org/jira/browse/FLINK-9999
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The ISNUMERIC function is used to verify whether an expression is a valid numeric type.
> documentation : 
> https://docs.microsoft.com/en-us/sql/t-sql/functions/isnumeric-transact-sql?view=sql-server-2017



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)



[jira] [Commented] (FLINK-7964) Add Apache Kafka 1.0/1.1 connectors

2018-08-11 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16577387#comment-16577387
 ] 

vinoyang commented on FLINK-7964:
-

Hi [~yew1eb], are you still working on this issue? What is the current 
progress? I am adding a Kafka 2.0 connector, but upgrading from 0.11 to 2.0 has 
hit a minor problem; I suspect the version jump is simply too large. If you are 
finished, I think we can upgrade to the lower version first and then to the 
higher version. If you haven't made any substantial progress, I can also help 
upgrade the lower version of the Kafka connector and then upgrade to 2.0.

> Add Apache Kafka 1.0/1.1 connectors
> ---
>
> Key: FLINK-7964
> URL: https://issues.apache.org/jira/browse/FLINK-7964
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Hai Zhou
>Assignee: Hai Zhou
>Priority: Major
> Fix For: 1.7.0
>
>
> Kafka 1.0.0 is no mere bump of the version number. The Apache Kafka Project 
> Management Committee has packed a number of valuable enhancements into the 
> release. Here is a summary of a few of them:
> * Since its introduction in version 0.10, the Streams API has become hugely 
> popular among Kafka users, including the likes of Pinterest, Rabobank, 
> Zalando, and The New York Times. In 1.0, the API continues to evolve at a 
> healthy pace. To begin with, the builder API has been improved (KIP-120). A 
> new API has been added to expose the state of active tasks at runtime 
> (KIP-130). The new cogroup API makes it much easier to deal with partitioned 
> aggregates with fewer StateStores and fewer moving parts in your code 
> (KIP-150). Debuggability gets easier with enhancements to the print() and 
> writeAsText() methods (KIP-160). And if that’s not enough, check out KIP-138 
> and KIP-161 too. For more on streams, check out the Apache Kafka Streams 
> documentation, including some helpful new tutorial videos.
> * Operating Kafka at scale requires that the system remain observable, and to 
> make that easier, we’ve made a number of improvements to metrics. These are 
> too many to summarize without becoming tedious, but Connect metrics have been 
> significantly improved (KIP-196), a litany of new health check metrics are 
> now exposed (KIP-188), and we now have a global topic and partition count 
> (KIP-168). Check out KIP-164 and KIP-187 for even more.
> * We now support Java 9, leading, among other things, to significantly faster 
> TLS and CRC32C implementations. Over-the-wire encryption will be faster now, 
> which will keep Kafka fast and compute costs low when encryption is enabled.
> * In keeping with the security theme, KIP-152 cleans up the error handling on 
> Simple Authentication Security Layer (SASL) authentication attempts. 
> Previously, some authentication error conditions were indistinguishable from 
> broker failures and were not logged in a clear way. This is cleaner now.
> * Kafka can now tolerate disk failures better. Historically, JBOD storage 
> configurations have not been recommended, but the architecture has 
> nevertheless been tempting: after all, why not rely on Kafka’s own 
> replication mechanism to protect against storage failure rather than using 
> RAID? With KIP-112, Kafka now handles disk failure more gracefully. A single 
> disk failure in a JBOD broker will not bring the entire broker down; rather, 
> the broker will continue serving any log files that remain on functioning 
> disks.
> * Since release 0.11.0, the idempotent producer (which is the producer used 
> in the presence of a transaction, which of course is the producer we use for 
> exactly-once processing) required max.in.flight.requests.per.connection to be 
> equal to one. As anyone who has written or tested a wire protocol can attest, 
> this put an upper bound on throughput. Thanks to KAFKA-5949, this can now be 
> as large as five, relaxing the throughput constraint quite a bit.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9891) Flink cluster is not shutdown in YARN mode when Flink client is stopped

2018-08-11 Thread Sebastian Klemke (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16577359#comment-16577359
 ] 

Sebastian Klemke commented on FLINK-9891:
-

Thanks [~till.rohrmann] for the detailed explanation. Unfortunately, this bug 
limits our upgrade possibilities: our batch jobs (DataSet API) are run by a 
service that orchestrates complex batch flows. This service needs to cancel 
running applications if an SLA timeout kicks in. So far, we have relied on the 
Flink CLI in per-job, attached mode to manage the YARN application life-cycle, 
also in the case of the orchestrator cancelling a job.
[~suez1224]: Is it possible to merge our PR to the 1.5 branch, the 1.6 branch, 
and master, and release 1.5.3 + 1.6.1?

> Flink cluster is not shutdown in YARN mode when Flink client is stopped
> ---
>
> Key: FLINK-9891
> URL: https://issues.apache.org/jira/browse/FLINK-9891
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0, 1.5.1
>Reporter: Sergey Krasovskiy
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
>
> We are not using session mode or detached mode. The command to run a Flink job 
> on YARN is:
> {code:java}
> /bin/flink run -m yarn-cluster -yn 1 -yqu flink -yjm 768 -ytm 
> 2048 -j ./flink-quickstart-java-1.0-SNAPSHOT.jar -c org.test.WordCount
> {code}
> Flink CLI logs:
> {code:java}
> Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/opt/flink-streaming/flink-streaming-1.5.1-1.5.1-bin-hadoop27-scala_2.11-1531485329/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/usr/hdp/2.4.2.10-1/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 2018-07-18 12:47:03,747 INFO 
> org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service 
> address: http://hmaster-1.ipbl.rgcloud.net:8188/ws/v1/timeline/
> 2018-07-18 12:47:04,222 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - 
> No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-07-18 12:47:04,222 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - 
> No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-07-18 12:47:04,248 WARN 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Neither the 
> HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink 
> YARN Client needs one of these to be set to properly load the Hadoop 
> configuration for accessing YARN.
> 2018-07-18 12:47:04,409 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: 
> ClusterSpecification{masterMemoryMB=768, taskManagerMemoryMB=2048, 
> numberTaskManagers=1, slotsPerTaskManager=1}
> 2018-07-18 12:47:04,783 WARN 
> org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit 
> local reads feature cannot be used because libhadoop cannot be loaded.
> 2018-07-18 12:47:04,788 WARN 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration 
> directory 
> ('/opt/flink-streaming/flink-streaming-1.5.1-1.5.1-bin-hadoop27-scala_2.11-1531485329/conf')
>  contains both LOG4J and Logback configuration files. Please delete or rename 
> one of them.
> 2018-07-18 12:47:07,846 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application 
> master application_1531474158783_10814
> 2018-07-18 12:47:08,073 INFO 
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application 
> application_1531474158783_10814
> 2018-07-18 12:47:08,074 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster 
> to be allocated
> 2018-07-18 12:47:08,076 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, 
> current state ACCEPTED
> 2018-07-18 12:47:12,864 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has 
> been deployed successfully.
> {code}
> Job Manager logs:
> {code:java}
> 2018-07-18 12:47:09,913 INFO 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - 
> 
> 2018-07-18 12:47:09,915 INFO 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Starting 
> YarnSessionClusterEntrypoint (Version: 1.5.1, Rev:3488f8b, Date:10.07.2018 @ 
> 11:51:27 GMT)
> ...
> {code}
> Issues:
>  # Flink job is running as a Flink session
>  # Ctrl+C or 'stop' doesn't stop a job and YARN cluster

[jira] [Commented] (FLINK-9891) Flink cluster is not shutdown in YARN mode when Flink client is stopped

2018-08-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16577358#comment-16577358
 ] 

ASF GitHub Bot commented on FLINK-9891:
---

packet23 opened a new pull request #6540: [FLINK-9891] Added hook to shutdown 
cluster if a session was created in per-job mode.
URL: https://github.com/apache/flink/pull/6540
 
 
   ## What is the purpose of the change
   
   This change minimizes the probability of a per-job YARN session cluster 
surviving after the client has exited. It restores the CLI behaviour of Flink 
1.4 and below.
   
   
   ## Brief change log
   
 - when a per-job YARN cluster is deployed, a shutdown hook is installed
 - when the shutdown hook is called, it terminates the deployed cluster
 - when the CLI terminates normally, the shutdown hook is run by the CLI and 
then deregistered
 - otherwise, the Java runtime will call the shutdown hook (see the sketch 
after this list)
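   
   A minimal sketch of that install/remove pattern (hypothetical names, not the exact PR code):
   
   ```
   Thread shutdownHook = new Thread(() -> client.shutDownCluster());
   Runtime.getRuntime().addShutdownHook(shutdownHook);
   try {
       runJob();                                  // attached execution
   } finally {
       // normal termination: deregister the hook first, then run the cleanup
       // ourselves; on an abnormal exit the Java runtime runs the hook instead
       if (Runtime.getRuntime().removeShutdownHook(shutdownHook)) {
           shutdownHook.run();
       }
   }
   ```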
   
   
   ## Verifying this change
   
   We verified the change manually by submitting a DataSet API application to 
Yarn and killing the client before the job terminated.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not documented
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink cluster is not shutdown in YARN mode when Flink client is stopped
> ---
>
> Key: FLINK-9891
> URL: https://issues.apache.org/jira/browse/FLINK-9891
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0, 1.5.1
>Reporter: Sergey Krasovskiy
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
>
> We are not using session mode or detached mode. The command to run a Flink job 
> on YARN is:
> {code:java}
> /bin/flink run -m yarn-cluster -yn 1 -yqu flink -yjm 768 -ytm 
> 2048 -j ./flink-quickstart-java-1.0-SNAPSHOT.jar -c org.test.WordCount
> {code}
> Flink CLI logs:
> {code:java}
> Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/opt/flink-streaming/flink-streaming-1.5.1-1.5.1-bin-hadoop27-scala_2.11-1531485329/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/usr/hdp/2.4.2.10-1/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 2018-07-18 12:47:03,747 INFO 
> org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service 
> address: http://hmaster-1.ipbl.rgcloud.net:8188/ws/v1/timeline/
> 2018-07-18 12:47:04,222 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - 
> No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-07-18 12:47:04,222 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - 
> No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-07-18 12:47:04,248 WARN 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Neither the 
> HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink 
> YARN Client needs one of these to be set to properly load the Hadoop 
> configuration for accessing YARN.
> 2018-07-18 12:47:04,409 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: 
> ClusterSpecification{masterMemoryMB=768, taskManagerMemoryMB=2048, 
> numberTaskManagers=1, slotsPerTaskManager=1}
> 2018-07-18 12:47:04,783 WARN 
> org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit 
> local reads feature cannot be used because libhadoop cannot be loaded.
> 2018-07-18 12:47:04,788 WARN 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration 
> directory 
> ('/opt/flink-streaming/flink-streaming-1.5.1-1.5.1-bin-hadoop27-scala_2.11-1531485329/conf')
>  contains both LOG4J and Logback configuration files. Please delete or rename 
> one of them.

[jira] [Updated] (FLINK-9891) Flink cluster is not shutdown in YARN mode when Flink client is stopped

2018-08-11 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9891:
--
Labels: pull-request-available  (was: )

> Flink cluster is not shutdown in YARN mode when Flink client is stopped
> ---
>
> Key: FLINK-9891
> URL: https://issues.apache.org/jira/browse/FLINK-9891
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0, 1.5.1
>Reporter: Sergey Krasovskiy
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
>
> We are not using session mode or detached mode. The command to run a Flink job 
> on YARN is:
> {code:java}
> /bin/flink run -m yarn-cluster -yn 1 -yqu flink -yjm 768 -ytm 
> 2048 -j ./flink-quickstart-java-1.0-SNAPSHOT.jar -c org.test.WordCount
> {code}
> Flink CLI logs:
> {code:java}
> Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/opt/flink-streaming/flink-streaming-1.5.1-1.5.1-bin-hadoop27-scala_2.11-1531485329/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/usr/hdp/2.4.2.10-1/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 2018-07-18 12:47:03,747 INFO 
> org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service 
> address: http://hmaster-1.ipbl.rgcloud.net:8188/ws/v1/timeline/
> 2018-07-18 12:47:04,222 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - 
> No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-07-18 12:47:04,222 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - 
> No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-07-18 12:47:04,248 WARN 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Neither the 
> HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink 
> YARN Client needs one of these to be set to properly load the Hadoop 
> configuration for accessing YARN.
> 2018-07-18 12:47:04,409 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: 
> ClusterSpecification{masterMemoryMB=768, taskManagerMemoryMB=2048, 
> numberTaskManagers=1, slotsPerTaskManager=1}
> 2018-07-18 12:47:04,783 WARN 
> org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit 
> local reads feature cannot be used because libhadoop cannot be loaded.
> 2018-07-18 12:47:04,788 WARN 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration 
> directory 
> ('/opt/flink-streaming/flink-streaming-1.5.1-1.5.1-bin-hadoop27-scala_2.11-1531485329/conf')
>  contains both LOG4J and Logback configuration files. Please delete or rename 
> one of them.
> 2018-07-18 12:47:07,846 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application 
> master application_1531474158783_10814
> 2018-07-18 12:47:08,073 INFO 
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application 
> application_1531474158783_10814
> 2018-07-18 12:47:08,074 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster 
> to be allocated
> 2018-07-18 12:47:08,076 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, 
> current state ACCEPTED
> 2018-07-18 12:47:12,864 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has 
> been deployed successfully.
> {code}
> Job Manager logs:
> {code:java}
> 2018-07-18 12:47:09,913 INFO 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - 
> 
> 2018-07-18 12:47:09,915 INFO 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Starting 
> YarnSessionClusterEntrypoint (Version: 1.5.1, Rev:3488f8b, Date:10.07.2018 @ 
> 11:51:27 GMT)
> ...
> {code}
> Issues:
>  # Flink job is running as a Flink session
>  # Ctrl+C or 'stop' doesn't stop a job and YARN cluster
>  # Cancel job via the Job Manager web UI doesn't stop the Flink cluster. To kill the 
> cluster we need to run: yarn application -kill 
> We also tried to run a flink job with 'mode: legacy' and we have the same 
> issues:
>  # Add property 'mode: legacy' to ./conf/flink-conf.yaml
>  # Execute the following command:
> {code:java}
> /bin/flink run -m yarn-cluster -yn 1 -yqu flink -yjm 768 -ytm 
> 2048 -j ./flink-quickstart-java-1.0-SNAPSHOT.jar -c org.test.WordCount
> {code}
> Flink CLI logs:


[jira] [Commented] (FLINK-9999) Add ISNUMERIC supported in Table API/SQL

2018-08-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16577231#comment-16577231
 ] 

ASF GitHub Bot commented on FLINK-9999:
---

walterddr commented on a change in pull request #6473: [FLINK-9999] [table] Add 
ISNUMERIC supported in Table API/SQL
URL: https://github.com/apache/flink/pull/6473#discussion_r209430968
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
 ##
 @@ -450,6 +450,63 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
   "")
   }
 
+  @Test
+  def testIsNumeric(): Unit = {
 
 Review comment:
   Maybe add invalid argument cases to `ScalarFunctionsValidationTest` as well?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add ISNUMERIC supported in Table API/SQL
> 
>
> Key: FLINK-9999
> URL: https://issues.apache.org/jira/browse/FLINK-9999
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The ISNUMERIC function is used to verify whether an expression is a valid numeric type.
> documentation : 
> https://docs.microsoft.com/en-us/sql/t-sql/functions/isnumeric-transact-sql?view=sql-server-2017



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9999) Add ISNUMERIC supported in Table API/SQL

2018-08-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16577232#comment-16577232
 ] 

ASF GitHub Bot commented on FLINK-9999:
---

walterddr commented on a change in pull request #6473: [FLINK-9999] [table] Add 
ISNUMERIC supported in Table API/SQL
URL: https://github.com/apache/flink/pull/6473#discussion_r209430879
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
 ##
 @@ -358,6 +358,29 @@ case class Rpad(text: Expression, len: Expression, pad: 
Expression)
   }
 }
 
+case class IsNumeric(child: Expression) extends UnaryExpression with 
InputTypeSpec {
+
+  override private[flink] def expectedTypes: Seq[TypeInformation[_]] = 
Seq(STRING_TYPE_INFO)
+
+  override private[flink] def resultType: TypeInformation[_] = INT_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult = {
+if (child.resultType == STRING_TYPE_INFO) {
 
 Review comment:
   I wasn't sure if there is a SQL standard for `isNumeric`, but some SQL dialects 
such as Microsoft Transact-SQL do support `isNumeric(expression)` where the 
expression can actually be of a numeric type (INT, BIGINT, DOUBLE, etc.). Maybe 
@twalthr has more context?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add ISNUMERIC supported in Table API/SQL
> 
>
> Key: FLINK-9999
> URL: https://issues.apache.org/jira/browse/FLINK-9999
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The ISNUMERIC function is used to verify whether an expression is a valid numeric type.
> documentation : 
> https://docs.microsoft.com/en-us/sql/t-sql/functions/isnumeric-transact-sql?view=sql-server-2017



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)



[jira] [Commented] (FLINK-9664) FlinkML Quickstart Loading Data section example doesn't work as described

2018-08-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16577223#comment-16577223
 ] 

ASF GitHub Bot commented on FLINK-9664:
---

walterddr commented on a change in pull request #6425: [FLINK-9664][Doc] fixing 
documentation in ML quick start
URL: https://github.com/apache/flink/pull/6425#discussion_r209430609
 
 

 ##
 File path: docs/dev/libs/ml/quickstart.md
 ##
 @@ -129,6 +129,10 @@ and the [test set 
here](http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/b
 This is an astroparticle binary classification dataset, used by Hsu et al. 
[[3]](#hsu) in their
 practical Support Vector Machine (SVM) guide. It contains 4 numerical 
features, and the class label.
 
+Before importing the training and test datasets, note that Flink's SVM only
+supports binary labels of `+1.0` and `-1.0`. A conversion is therefore needed
+after downloading the svmguide1 dataset, since it is labelled using `1`s and `0`s.
+
 
 Review comment:
   Thanks for the detailed explanation, @azagrebin. Sorry for the previous 
confusion. I updated the document; please take another look when you have time 
:-)
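
   For readers following along, the label conversion described above can be 
sketched as follows (a minimal example; the local path is a placeholder and the 
svmguide1 file is assumed to have been downloaded already):

{code:scala}
import org.apache.flink.api.scala._
import org.apache.flink.ml.MLUtils
import org.apache.flink.ml.common.LabeledVector

val env = ExecutionEnvironment.getExecutionEnvironment

// read the libSVM-formatted svmguide1 file, which is labelled with 1.0 and 0.0
val raw = MLUtils.readLibSVM(env, "/tmp/svmguide1")

// map the {0.0, 1.0} labels to the {-1.0, +1.0} labels Flink's SVM expects
val training = raw.map { lv =>
  LabeledVector(if (lv.label > 0.0) 1.0 else -1.0, lv.vector)
}
{code}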


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> FlinkML Quickstart Loading Data section example doesn't work as described
> -------------------------------------------------------------------------
>
> Key: FLINK-9664
> URL: https://issues.apache.org/jira/browse/FLINK-9664
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Machine Learning Library
>Affects Versions: 1.5.0
>Reporter: Mano Swerts
>Assignee: Rong Rong
>Priority: Major
>  Labels: documentation-update, machine_learning, ml, 
> pull-request-available
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> The ML documentation example isn't complete: 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/libs/ml/quickstart.html#loading-data]
> The referenced section loads data from an astroparticle binary classification 
> dataset to showcase SVM. The dataset uses 0 and 1 as labels, which doesn't 
> produce correct results: the SVM predictor expects labels of -1 and 1 to 
> predict correctly. The documentation, however, doesn't mention that, so the 
> example doesn't work and gives no clue as to why.
> The documentation should be updated with an explicit mention of the -1 and 1 
> labels and a mapping function that shows the conversion of the labels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16577207#comment-16577207
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Aitozi commented on issue #6205: [FLINK-9642]Reduce the count to deal with 
state during a CEP process
URL: https://github.com/apache/flink/pull/6205#issuecomment-412280321
 
 
   I have fixed the places you pointed out, and added a test to cover changes 
to the elements in both the state and the cache. Please take another look, 
thanks. @dawidwys 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Reduce the count to deal with state during a CEP process 
> ---------------------------------------------------------
>
> Key: FLINK-9642
> URL: https://issues.apache.org/jira/browse/FLINK-9642
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.6.0
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>  Labels: pull-request-available
>
> With the rework of the shared buffer in FLINK-9418, the lock & release 
> operations now work directly against RocksDB state, unlike the previous 
> version, which read the whole shared buffer state into memory. I think we can 
> add a cache or variable in the shared buffer to hold the Lockable objects and 
> track the reference changes within a single NFA processing pass; this would 
> reduce the state accesses when events point to the same NodeId. The 
> accumulated result can then be flushed to the MapState at the end of 
> processing. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9642) Reduce the count to deal with state during a CEP process

2018-08-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16577090#comment-16577090
 ] 

ASF GitHub Bot commented on FLINK-9642:
---

Aitozi commented on a change in pull request #6205: [FLINK-9642]Reduce the 
count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205#discussion_r209419821
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ##
 @@ -161,40 +100,30 @@ public void init(
eventsCount.putAll(maxIds);
}
 
-   /**
-* Stores given value (value + timestamp) under the given state. It 
assigns a preceding element
-* relation to the previous entry.
-*
-* @param stateName      name of the state that the event should be assigned to
-* @param eventId        unique id of event assigned by this SharedBuffer
-* @param previousNodeId id of previous entry (might be null if start of new run)
-* @param version        version of the previous relation
-* @return assigned id of this element
-* @throws Exception Thrown if the system cannot access the state.
-*/
-   public NodeId put(
-   final String stateName,
-   final EventId eventId,
-   @Nullable final NodeId previousNodeId,
-   final DeweyNumber version) throws Exception {
+   public SharedBufferAccessor<V> getAccessor() {
+   return new SharedBufferAccessor<>(this);
+   }
 
-   if (previousNodeId != null) {
-   lockNode(previousNodeId);
+   public void advanceTime(long timestamp) throws Exception {
 
 Review comment:
   This needs to be called from the nfa package, so I think it should be public.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Reduce the count to deal with state during a CEP process 
> ---------------------------------------------------------
>
> Key: FLINK-9642
> URL: https://issues.apache.org/jira/browse/FLINK-9642
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.6.0
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>  Labels: pull-request-available
>
> With the rework of the shared buffer in FLINK-9418, the lock & release 
> operations now work directly against RocksDB state, unlike the previous 
> version, which read the whole shared buffer state into memory. I think we can 
> add a cache or variable in the shared buffer to hold the Lockable objects and 
> track the reference changes within a single NFA processing pass; this would 
> reduce the state accesses when events point to the same NodeId. The 
> accumulated result can then be flushed to the MapState at the end of 
> processing. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


