questions about yarn container(taskmanager)memory allocation

2019-05-20 Thread XiangWei Huang
Hi all,
Currently i am running my flink application in yarn session mode and
using below commnad :
*bin/yarn-session.sh -d -s 3 -jm 1024 -tm 4096*
when taskmanager complete to started,i found the container launching
command is :
* bin/java -Xms2765m -Xmx2765m -XX:MaxDirectMemorySize=1331m ..*
from flink source code,i'v learned how direct memory size and heap memory
size calculated :
container total memory: 4096M
network memory fraction: 0.1
cut off memory fraction: 0.25
networkMemory = 4096*0.1 = 409.6
cut off memory = (4096 - 409.6) * 0.25 = 921.6
*directMemorySize* = 409.6 + 921.6 = 1331.2
*heapsize *= 4096 - 1331.2 = 2764.8

Below is the environment i'm using to run yarn and flink:
*  jdk version: 1.8*
* flink version: 1.6.1*
*OS: centos7*

there are two questions about memory allocation i want to ask:
1. Since the jdk version i an using is 1.8,is it necessary to consider
the *metaspace
memory *into calculation.According to the current way to calculate memory
size(without metaspace memory) may cause
container running beyond physical memory limit.
2. Is the native memory that rocksdb using part of direct memory(limit by
jvm parameter  *MaxDirectMemorySize*),if not how do i control the size it
used.


How to Join a dimension table in flink sql

2018-01-15 Thread XiangWei Huang
Hi all,
   Is it possible to  join records read from a kafka stream with one(or
more) dimension tables which are saved as mysql table using flink streaming
sql.


Re: ExecutionGraph not serializable

2017-11-07 Thread XiangWei Huang
hi Till,

   Sorry,I've made a mistake,i used
*StandaloneClusterClient*#*getJobManagerGateway
*to get  *ActorGateway *to communicate with *JobManager *instead of using
*JobMasterGateway*.
Below is the code i executed for getting ExecuteGraph of a Job.


val flinkConfig = new Configuration()
val flinkCli = new StandaloneClusterClient(flinkConfig)
*val jobManagerGateWay = flinkCli.getJobManagerGateway*
val jobs = jobManagerGateWay.ask(RequestRunningJobsStatus,new
FiniteDuration(10,TimeUnit.SECONDS)).asInstanceOf[Future[RunningJobsStatus]]
val jobsStatus = Await.result(jobs,new
FiniteDuration(10,TimeUnit.SECONDS)).getStatusMessages().asScala.head
val jobId = jobsStatus.getJobId
val timeOut = new FiniteDuration(10,TimeUnit.SECONDS)
*val future = jobManagerGateWay.ask(RequestJob(jobId),timeOut)*
val result = Await.result(future,timeOut)

JobManager threw NotSerializableException  when i executed this code. So i
wonder how is this happened and is there another way to get a job's
ExecutionGraph programmatically.

Best,XiangWei

2017-11-07 17:16 GMT+08:00 Till Rohrmann :

> Hi XiangWei,
>
> how do you use the JobMasterGateway with the actor message RequestJob?
> The JobMasterGateway is a Java interface and does not represent an
> ActorCell to which you can send actor messages. Instead you should call
> JobMasterGateway#requestArchivedExecutionGraph.
>
> Cheers,
> Till
> ​
>
> On Tue, Nov 7, 2017 at 9:58 AM, Fabian Hueske  wrote:
>
>> Hi XiangWei,
>>
>> I don't think this is a public interface, but Till (in CC) might know
>> better.
>>
>> Best,
>> Fabian
>>
>> 2017-11-06 3:27 GMT+01:00 XiangWei Huang :
>>
>>> Hi Flink users,
>>> Flink Jobmanager throw a NotSerializableException when i used
>>> JobMasterGateway to get ExecutionGraph of a specific job with
>>> message *RequestJob(jobID). *Blow is the detail of Exception:
>>>
>>>
>>> [ERROR] [akka.remote.EndpointWriter] - Transient association error 
>>> (association remains live)java.io.NotSerializableException: 
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph
>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>> at 
>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>> at 
>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>> at 
>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>> at 
>>> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129)
>>> at 
>>> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
>>> at 
>>> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
>>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>>> at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
>>> at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
>>> at 
>>> akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:875)
>>> at 
>>> akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:875)
>>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>>> at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:874)
>>> at akka.remote.EndpointWriter.writeSend(Endpoint.scala:769)
>>> at akka.remote.EndpointWriter$$anonfun$4.applyOrElse(Endpoint.scala:744)
>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>>> at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:437)
>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>> at 
>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> at 
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> at 
>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> at 
>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>>
>>> So,is it a bug or the way to get job’s executionGraph is invalid.
>>>
>>>
>>> Best,XiangWei
>>>
>>>
>>>
>>
>


ExecutionGraph not serializable

2017-11-05 Thread XiangWei Huang
Hi Flink users,
Flink Jobmanager throw a NotSerializableException when i used 
JobMasterGateway to get ExecutionGraph of a specific job with 
message RequestJob(jobID). Blow is the detail of Exception:


[ERROR] [akka.remote.EndpointWriter] - Transient association error (association 
remains live)
java.io.NotSerializableException: 
org.apache.flink.runtime.executiongraph.ExecutionGraph
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at 
akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129)
at 
akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
at 
akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
at 
akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:875)
at 
akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:875)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:874)
at akka.remote.EndpointWriter.writeSend(Endpoint.scala:769)
at akka.remote.EndpointWriter$$anonfun$4.applyOrElse(Endpoint.scala:744)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:437)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

So,is it a bug or the way to get job’s executionGraph is invalid.

Best,XiangWei



Re: got Warn message - "the expected leader session ID did not equal the received leader session ID " when using LocalFlinkMiniCluster to interpret scala code

2017-09-27 Thread XiangWei Huang
Hi Till,

I’ve found that a StandaloneMiniCluster doesn’t startup web fronted 
when it is running.so,how can i cancel a running job on it with restful method.

Cheers,
Till

> 在 2017年9月20日,15:43,Till Rohrmann  写道:
> 
> Hi XiangWei,
> 
> programmatically there is no nice tooling yet to cancel jobs on a dedicated 
> cluster. What you can do is to use Flink's REST API to issue a cancel command 
> [1]. You have to send a GET request to the target URL `/jobs/:jobid/cancel`. 
> In the future we will improve the programmatic job control which will allow 
> you to do these kind of things more easily.
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/rest_api.html#job-cancellation
>  
> <https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/rest_api.html#job-cancellation>
> 
> Cheers,
> Till
> 
> On Wed, Sep 20, 2017 at 5:46 AM, XiangWei Huang  <mailto:xw.huang...@gmail.com>> wrote:
> Hi Till,
>
>  Thanks for your answer,it worked when i use StandaloneMiniCluster,but 
> another problem is that i can’t find a way to cancel
> a running Flink job without shutting down the cluster,for 
> LocalFlinkMiniCluster i can do  it with below code :
> 
>for (job <- cluster.getCurrentlyRunningJobsJava()) {
>   cluster.stopJob(job)
>}
> 
>Is it possible to cancel a running Flink job without shutting down a 
> StandaloneMiniCluster ?
> 
> Best Regards,
> XiangWei
> 
> 
> 
>> 在 2017年9月14日,16:58,Till Rohrmann > <mailto:trohrm...@apache.org>> 写道:
>> 
>> Hi XiangWei,
>> 
>> the problem is that the LocalFlinkMiniCluster can no longer be used in 
>> combination with a RemoteExecutionEnvironment. The reason is that the 
>> LocalFlinkMiniCluster uses now an internal leader election service and 
>> assigns leader ids to its components. Since this is an internal service it 
>> is not possible to retrieve this information like it is the case with the 
>> ZooKeeper based leader election services.
>> 
>> Long story short, the Flink Scala shell currently does not work with a 
>> LocalFlinkMiniCluster and would have to be fixed to work properly together 
>> with a local execution environment. Until then, I recommend starting a local 
>> standalone cluster and let the code run there.
>> 
>> Cheers,
>> Till
>> 
>> 
>> On Wed, Sep 13, 2017 at 6:21 AM, XiangWei Huang > <mailto:xw.huang...@gmail.com>> wrote:
>> dear all,
>> 
>> Below is the code i execute:
>> 
>> import java.io <http://java.io/>._
>> import java.net <http://java.net/>.{URL, URLClassLoader}
>> import java.nio.charset.Charset
>> import java.util.Collections
>> import java.util.concurrent.at 
>> <http://java.util.concurrent.at/>omic.AtomicBoolean
>> 
>> import com.netease.atom.common.util.logging.Logging
>> import com.netease.atom.interpreter.Code.Code
>> import com.netease.atom.interpreter.{Code, Interpreter, InterpreterResult, 
>> InterpreterUtils}
>> import io.netty.buffer._
>> import org.apache.flink.api.sc 
>> <http://org.apache.flink.api.sc/>ala.FlinkILoop
>> import org.apache.flink.client.CliFrontend
>> import org.apache.flink.client.cli.CliFrontendParser
>> import org.apache.flink.client.program.ClusterClient
>> import org.apache.flink.configuration.{QueryableStateOptions, Configuration, 
>> ConfigConstants, GlobalConfiguration}
>> import org.apache.flink.runtime.akka.AkkaUtils
>> import org.apache.flink.runtime.minicluster.{StandaloneMiniCluster, 
>> LocalFlinkMiniCluster}
>> 
>> import scala.Console
>> import scala.beans.BeanProperty
>> import scala.collection.JavaConversions._
>> import scala.collection.mutable
>> import scala.collection.mutable.{ArrayBuffer, ListBuffer}
>> import scala.runtime.AbstractFunction0
>> import scala.tools.nsc.Settings
>> import scala.tools.nsc.interpreter.{IMain, JPrintWriter, Results}
>> 
>> class FlinkInterpreter extends Interpreter {
>>   private var bufferedReader: Option[BufferedReader] = None
>>   private var jprintWriter: JPrintWriter = _
>>   private val config = new Configuration;
>>   private var cluster: LocalFlinkMiniCluster = _
>>   @BeanProperty var imain: IMain = _
>>   @BeanProperty var flinkILoop: FlinkILoop = _
>>   private var out: ByteBufOutputStream = null
>>   private var outBuf: ByteBuf = null
>>   private var in: ByteBufInputStream = _
>>   private var isRunning: AtomicBoolean = new AtomicBoolean(false)
>> 
>>   override def isOpen:

Re: got Warn message - "the expected leader session ID did not equal the received leader session ID " when using LocalFlinkMiniCluster to interpret scala code

2017-09-19 Thread XiangWei Huang
Hi Till,
   
 Thanks for your answer,it worked when i use StandaloneMiniCluster,but 
another problem is that i can’t find a way to cancel
a running Flink job without shutting down the cluster,for LocalFlinkMiniCluster 
i can do  it with below code :

   for (job <- cluster.getCurrentlyRunningJobsJava()) {
  cluster.stopJob(job)
   }

   Is it possible to cancel a running Flink job without shutting down a 
StandaloneMiniCluster ?

Best Regards,
XiangWei



> 在 2017年9月14日,16:58,Till Rohrmann  写道:
> 
> Hi XiangWei,
> 
> the problem is that the LocalFlinkMiniCluster can no longer be used in 
> combination with a RemoteExecutionEnvironment. The reason is that the 
> LocalFlinkMiniCluster uses now an internal leader election service and 
> assigns leader ids to its components. Since this is an internal service it is 
> not possible to retrieve this information like it is the case with the 
> ZooKeeper based leader election services.
> 
> Long story short, the Flink Scala shell currently does not work with a 
> LocalFlinkMiniCluster and would have to be fixed to work properly together 
> with a local execution environment. Until then, I recommend starting a local 
> standalone cluster and let the code run there.
> 
> Cheers,
> Till
> 
> 
> On Wed, Sep 13, 2017 at 6:21 AM, XiangWei Huang  <mailto:xw.huang...@gmail.com>> wrote:
> dear all,
> 
> Below is the code i execute:
> 
> import java.io <http://java.io/>._
> import java.net <http://java.net/>.{URL, URLClassLoader}
> import java.nio.charset.Charset
> import java.util.Collections
> import java.util.concurrent.atomic.AtomicBoolean
> 
> import com.netease.atom.common.util.logging.Logging
> import com.netease.atom.interpreter.Code.Code
> import com.netease.atom.interpreter.{Code, Interpreter, InterpreterResult, 
> InterpreterUtils}
> import io.netty.buffer._
> import org.apache.flink.api.scala.FlinkILoop
> import org.apache.flink.client.CliFrontend
> import org.apache.flink.client.cli.CliFrontendParser
> import org.apache.flink.client.program.ClusterClient
> import org.apache.flink.configuration.{QueryableStateOptions, Configuration, 
> ConfigConstants, GlobalConfiguration}
> import org.apache.flink.runtime.akka.AkkaUtils
> import org.apache.flink.runtime.minicluster.{StandaloneMiniCluster, 
> LocalFlinkMiniCluster}
> 
> import scala.Console
> import scala.beans.BeanProperty
> import scala.collection.JavaConversions._
> import scala.collection.mutable
> import scala.collection.mutable.{ArrayBuffer, ListBuffer}
> import scala.runtime.AbstractFunction0
> import scala.tools.nsc.Settings
> import scala.tools.nsc.interpreter.{IMain, JPrintWriter, Results}
> 
> class FlinkInterpreter extends Interpreter {
>   private var bufferedReader: Option[BufferedReader] = None
>   private var jprintWriter: JPrintWriter = _
>   private val config = new Configuration;
>   private var cluster: LocalFlinkMiniCluster = _
>   @BeanProperty var imain: IMain = _
>   @BeanProperty var flinkILoop: FlinkILoop = _
>   private var out: ByteBufOutputStream = null
>   private var outBuf: ByteBuf = null
>   private var in: ByteBufInputStream = _
>   private var isRunning: AtomicBoolean = new AtomicBoolean(false)
> 
>   override def isOpen: Boolean = {
> isRunning.get()
>   }
> 
>   def startLocalMiniCluster(): (String, Int, LocalFlinkMiniCluster) = {
> config.toMap.toMap.foreach(println)
> config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1)
> config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
> config.setInteger(ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER, 1)
> config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
> config.setBoolean(QueryableStateOptions.SERVER_ENABLE.key(), true)
> config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
> val localCluster = new LocalFlinkMiniCluster(config, false)
> localCluster.start(true)
> val port = 
> AkkaUtils.getAddress(localCluster.jobManagerActorSystems.get.head).port
> println(s"Starting local Flink cluster (host: localhost,port: 
> ${localCluster.getLeaderRPCPort}).\n")
> ("localhost", localCluster.getLeaderRPCPort, localCluster)
>   }
> 
>  
>   /**
>* Start flink cluster and create interpreter
>*/
>   override def open: Unit = {
> outBuf = ByteBufAllocator.DEFAULT.heapBuffer(20480)
> out = new ByteBufOutputStream(outBuf)
> in = new ByteBufInputStream(outBuf)
> //val (host, port, yarnCluster) = 
> deployNewYarnCluster(YarnConfig(Option(1), None, None, None, Option(1), None))
> val (host, port, localCluster) = startLocalMiniCluster()
> this.cluster = localCluster
> val

got Warn message - "the expected leader session ID did not equal the received leader session ID " when using LocalFlinkMiniCluster to interpret scala code

2017-09-12 Thread XiangWei Huang
dear all,

Below is the code i execute:

import java.io._
import java.net.{URL, URLClassLoader}
import java.nio.charset.Charset
import java.util.Collections
import java.util.concurrent.atomic.AtomicBoolean

import com.netease.atom.common.util.logging.Logging
import com.netease.atom.interpreter.Code.Code
import com.netease.atom.interpreter.{Code, Interpreter, InterpreterResult, 
InterpreterUtils}
import io.netty.buffer._
import org.apache.flink.api.scala.FlinkILoop
import org.apache.flink.client.CliFrontend
import org.apache.flink.client.cli.CliFrontendParser
import org.apache.flink.client.program.ClusterClient
import org.apache.flink.configuration.{QueryableStateOptions, Configuration, 
ConfigConstants, GlobalConfiguration}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.minicluster.{StandaloneMiniCluster, 
LocalFlinkMiniCluster}

import scala.Console
import scala.beans.BeanProperty
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.runtime.AbstractFunction0
import scala.tools.nsc.Settings
import scala.tools.nsc.interpreter.{IMain, JPrintWriter, Results}

class FlinkInterpreter extends Interpreter {
  private var bufferedReader: Option[BufferedReader] = None
  private var jprintWriter: JPrintWriter = _
  private val config = new Configuration;
  private var cluster: LocalFlinkMiniCluster = _
  @BeanProperty var imain: IMain = _
  @BeanProperty var flinkILoop: FlinkILoop = _
  private var out: ByteBufOutputStream = null
  private var outBuf: ByteBuf = null
  private var in: ByteBufInputStream = _
  private var isRunning: AtomicBoolean = new AtomicBoolean(false)

  override def isOpen: Boolean = {
isRunning.get()
  }

  def startLocalMiniCluster(): (String, Int, LocalFlinkMiniCluster) = {
config.toMap.toMap.foreach(println)
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
config.setInteger(ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER, 1)
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
config.setBoolean(QueryableStateOptions.SERVER_ENABLE.key(), true)
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
val localCluster = new LocalFlinkMiniCluster(config, false)
localCluster.start(true)
val port = 
AkkaUtils.getAddress(localCluster.jobManagerActorSystems.get.head).port
println(s"Starting local Flink cluster (host: localhost,port: 
${localCluster.getLeaderRPCPort}).\n")
("localhost", localCluster.getLeaderRPCPort, localCluster)
  }

 
  /**
   * Start flink cluster and create interpreter
   */
  override def open: Unit = {
outBuf = ByteBufAllocator.DEFAULT.heapBuffer(20480)
out = new ByteBufOutputStream(outBuf)
in = new ByteBufInputStream(outBuf)
//val (host, port, yarnCluster) = 
deployNewYarnCluster(YarnConfig(Option(1), None, None, None, Option(1), None))
val (host, port, localCluster) = startLocalMiniCluster()
this.cluster = localCluster
val conf = cluster.configuration
println(s"Connecting to Flink cluster (host:$host,port:$port)...")
flinkILoop = new FlinkILoop(host, port, conf, None)
val settings = new Settings()
settings.usejavacp.value = true
settings.Yreplsync.value = true
flinkILoop.settings_$eq(settings)
flinkILoop.createInterpreter()
imain = flinkILoop.intp
FlinkInterpreter.ourClassloader = imain.classLoader
val benv = flinkILoop.scalaBenv
val senv = flinkILoop.scalaSenv
benv.getConfig.disableSysoutLogging()
senv.getConfig.disableSysoutLogging()
// import libraries
imain.interpret("import scala.tools.nsc.io._")
//imain.interpret("import Properties.userHome")
imain.interpret("import scala.compat.Platform.EOL")
imain.interpret("import org.apache.flink.api.scala._")
imain.interpret("import org.apache.flink.api.common.functions._")
isRunning.set(true)
  }

  override def interpret(line: String): InterpreterResult = {
if (line == null || line.trim.length == 0) {
  return new InterpreterResult(Code.SUCCESS)
}
interpret(line.split("\n"))
  }

  /**
   * Interprete code
   * @param lines
   * @return
   */
  def interpret(lines: Array[String]): InterpreterResult = {
val imain: IMain = getImain
val linesToRun: Array[String] = new Array[String](lines.length + 1)
for (i <- 0 until lines.length) {
  linesToRun(i) = lines(i)
}
linesToRun(lines.length) = "print(\"\")"
System.setOut(new PrintStream(out))
out.buffer().clear()
var r: Code = null
var incomplete: String = ""
var inComment: Boolean = false
for (l <- 0 until linesToRun.length) {
  val s: String = linesToRun(l)
  var continuation: Boolean = false
  if (l + 1 < linesToRun.length) {
val nextLine: String = linesToRun(l + 1).trim
if (nextLine.isEmpty ||
nextLine.st

Re: a lot of connections in state "CLOSE_WAIT"

2017-08-08 Thread XiangWei Huang
It seems so,thanks for your reply Chesnay.

> 在 2017年8月8日,22:23,Chesnay Schepler [via Apache Flink User Mailing List 
> archive.]  写道:
> 
> FLINK-7368 may be the reason for this behavior.
> 
> On 31.07.2017 03:54, XiangWei Huang wrote:
>> 1. yes and yes.
>> 2. Yes,it was shown correctly.
>> 3.I wasn’t modify this setting.
>> 
>>> 在 2017年7月26日,18:06,Chesnay Schepler [via Apache Flink User Mailing List 
>>> archive.] <[hidden email] 
>>> > 写道:
>>> 
>>> So this only happens when you select a metric? Without a selected metric 
>>> everything works fine?
>>> 
>>> Are the metrics you selected shown correctly?
>>> 
>>> Did you modify the "jobmanager.web.refresh-interval" setting? (maybe check 
>>> the flink-conf-yaml for the current setting)
>>> 
>>> On 26.07.2017 04:57, XiangWei Huang wrote:
>>>> hi,
>>>> 
>>>> The browser i am using is Google Chrome  with version 59.0.3071.115 and 
>>>> the issue persists when i tried Firefox.
>>>> 
>>>> Regards,
>>>> XiangWei
>>>>> 在 2017年7月25日,17:48,Chesnay Schepler >>>> href="x-msg://6/user/SendEmail.jtp?type=node&node=14463&i=0 
>>>>> " 
>>>>> target="_top" rel="nofollow" link="external" class="">[hidden email] 写道:
>>>>> 
>>>>> Hello,
>>>>> 
>>>>> Could you tell us which browser you are using, including the version?
>>>>> (and maybe try out if the issue persists with a different one)
>>>>> 
>>>>> Regards,
>>>>> Chesnay
>>>>> 
>>>>> On 25.07.2017 05:20, XiangWei Huang wrote:
>>>>>> hi,
>>>>>> 
>>>>>> Sorry for replying so late.
>>>>>> I have met this issue again and the list is constantly keep growing even 
>>>>>> if
>>>>>> i close the page ,until the website is been unavailable.
>>>>>> 
>>>>>> This issue appeared each time i add  metrics for a job from web ui.
>>>>>> 
>>>>>> by the way ,the version of Flink is 1.3.1
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> Regards,
>>>>>> XiangWei
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> --
>>>>>> View this message in context: 
>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/a-lot-of-connections-in-state-CLOSE-WAIT-tp14046p14422.html
>>>>>>  
>>>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/a-lot-of-connections-in-state-CLOSE-WAIT-tp14046p14422.html>
>>>>>> Sent from the Apache Flink User Mailing List archive. mailing list 
>>>>>> archive at Nabble.com <http://nabble.com/>.
>>>>>> 
>>> 
>>> 
>>> 
>>> If you reply to this email, your message will be added to the discussion 
>>> below:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/a-lot-of-connections-in-state-CLOSE-WAIT-tp14046p14463.html
>>>  
>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/a-lot-of-connections-in-state-CLOSE-WAIT-tp14046p14463.html>
>>> To unsubscribe from a lot of connections in state "CLOSE_WAIT", click here 
>>> .
>>> NAML 
>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>
>> 
>> View this message in context: Re: a lot of connections in state "CLOSE_WAIT" 
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/a-lot-of-connections-in-state-CLOSE-WAIT-tp14046p14539.html>
>> Sent from the Apache Flink User Mailing List archive. mailing list archive 
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/> at 
>> Nabble.com <http://nabble.com/>.
> 
> 
> 
> If you reply to this email, your message will be added to the discussion 
> below:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/a-lot-of-connections-in-state-CLOSE-WAIT-tp14046p14740.html
>  
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/a-lot-of-connections-in-state-CLOSE-WAIT-tp14046p14740.html>
> To unsubscribe from a lot of connections in state "CLOSE_WAIT", click here 
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&node=14046&code=eHcuaHVhbmcuaHpAZ21haWwuY29tfDE0MDQ2fDEyODY0MDcwODE=>.
> NAML 
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>


Re: a lot of connections in state "CLOSE_WAIT"

2017-07-30 Thread XiangWei Huang
1. yes and yes.
2. Yes,it was shown correctly.
3.I wasn’t modify this setting.

> 在 2017年7月26日,18:06,Chesnay Schepler [via Apache Flink User Mailing List 
> archive.]  写道:
> 
> So this only happens when you select a metric? Without a selected metric 
> everything works fine?
> 
> Are the metrics you selected shown correctly?
> 
> Did you modify the "jobmanager.web.refresh-interval" setting? (maybe check 
> the flink-conf-yaml for the current setting)
> 
> On 26.07.2017 04:57, XiangWei Huang wrote:
>> hi,
>> 
>> The browser i am using is Google Chrome  with version 59.0.3071.115 and the 
>> issue persists when i tried Firefox.
>> 
>> Regards,
>> XiangWei
>>> 在 2017年7月25日,17:48,Chesnay Schepler [hidden email] 
>>>  写道:
>>> 
>>> Hello,
>>> 
>>> Could you tell us which browser you are using, including the version?
>>> (and maybe try out if the issue persists with a different one)
>>> 
>>> Regards,
>>> Chesnay
>>> 
>>> On 25.07.2017 05:20, XiangWei Huang wrote:
>>>> hi,
>>>> 
>>>> Sorry for replying so late.
>>>> I have met this issue again and the list is constantly keep growing even if
>>>> i close the page ,until the website is been unavailable.
>>>> 
>>>> This issue appeared each time i add  metrics for a job from web ui.
>>>> 
>>>> by the way ,the version of Flink is 1.3.1
>>>> 
>>>> 
>>>> 
>>>> Regards,
>>>> XiangWei
>>>> 
>>>> 
>>>> 
>>>> 
>>>> --
>>>> View this message in context: 
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/a-lot-of-connections-in-state-CLOSE-WAIT-tp14046p14422.html
>>>>  
>>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/a-lot-of-connections-in-state-CLOSE-WAIT-tp14046p14422.html>
>>>> Sent from the Apache Flink User Mailing List archive. mailing list archive 
>>>> at Nabble.com.
>>>> 
> 
> 
> 
> If you reply to this email, your message will be added to the discussion 
> below:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/a-lot-of-connections-in-state-CLOSE-WAIT-tp14046p14463.html
>  
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/a-lot-of-connections-in-state-CLOSE-WAIT-tp14046p14463.html>
> To unsubscribe from a lot of connections in state "CLOSE_WAIT", click here 
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&node=14046&code=eHcuaHVhbmcuaHpAZ21haWwuY29tfDE0MDQ2fDEyODY0MDcwODE=>.
> NAML 
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/a-lot-of-connections-in-state-CLOSE-WAIT-tp14046p14539.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.

Re: a lot of connections in state "CLOSE_WAIT"

2017-07-25 Thread XiangWei Huang
hi,

The browser i am using is Google Chrome  with version 59.0.3071.115 and the 
issue persists when i tried Firefox.

Regards,
XiangWei
> 在 2017年7月25日,17:48,Chesnay Schepler  写道:
> 
> Hello,
> 
> Could you tell us which browser you are using, including the version?
> (and maybe try out if the issue persists with a different one)
> 
> Regards,
> Chesnay
> 
> On 25.07.2017 05:20, XiangWei Huang wrote:
>> hi,
>> 
>> Sorry for replying so late.
>> I have met this issue again and the list is constantly keep growing even if
>> i close the page ,until the website is been unavailable.
>> 
>> This issue appeared each time i add  metrics for a job from web ui.
>> 
>> by the way ,the version of Flink is 1.3.1
>> 
>> 
>> 
>> Regards,
>> XiangWei
>> 
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/a-lot-of-connections-in-state-CLOSE-WAIT-tp14046p14422.html
>> Sent from the Apache Flink User Mailing List archive. mailing list archive 
>> at Nabble.com.
>> 
> 



Re: a lot of connections in state "CLOSE_WAIT"

2017-07-24 Thread XiangWei Huang
hi, 

Sorry for replying so late. 
I have met this issue again and the list is constantly keep growing even if
i close the page ,until the website is been unavailable. 

This issue appeared each time i add  metrics for a job from web ui. 

by the way ,the version of Flink is 1.3.1 



Regards, 
XiangWei 




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/a-lot-of-connections-in-state-CLOSE-WAIT-tp14046p14422.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Flink will delete all jars uploaded when restart jobmanager

2017-06-13 Thread XiangWei Huang
Hi,
When restart flink jobmanager jars which uploaded by user from web ui will be 
deleted .
Is there anyway to avoid this.

Can't kill a job which contains a while loop in the main method before it be submitted

2017-06-13 Thread XiangWei Huang
Hi,
I met a problem when use jedis in flink.When using jedis to get a connection to 
redis  if the redis server is not available then jedis will keep trying and 
never end,the problem is that the job’s status is not set to RUNNING by flink, 
that means it can’t be killed by flink.The only way to break this look is to 
restart jobmanager.
Is there another way to solve this without restart jobmanager.