Re: submit job failed on Yarn HA

2019-02-27 Thread 孙森
Hi Gary:

   I have tried the 1.5.6 version, it shows the same error.

org.apache.flink.client.program.ProgramInvocationException: Could not retrieve 
the execution result.
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:258)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at 
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at 
scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:392)
at 
scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:798)
at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:289)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:215)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1035)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
submit JobGraph.
at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:371)
at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:203)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:795)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException


Best!
Sen

> On 27 Feb 2019, at 21:30, Gary Yao wrote:
> 
> Hi,
> 
> How did you determine "jmhost" and "port"? Actually you do not need to specify
> these manually. If the client is using the same configuration as your cluster,
> the client will look up the leading JM from ZooKeeper.
> 
> If you have already tried omitting the "-m" parameter, you can check in the
> client logs which host is used for the job submission [1]. Note that you need
> to enable logging on DEBUG level.
> 
> The root cause in your stacktrace is a TimeoutException. I would debug this by
> checking if you can establish a TCP connection from the machine you are submitting the job from to the target host/port.

Re: When and where is sum called in flink wordcount?

2019-02-27 Thread Yaoting Gong
@thinktothings

I'm not sure whether I've understood you correctly, but I think you may not be completely clear about Flink's computation flow. The reduce
method inside SumAggregator is what computes the "sum" result you need.

You keep asking when sum is called. Do you mean the sum("count") in the code? That is already invoked when the StreamGraph is built; its purpose is to obtain the SumAggregator object returned internally.

There is an article worth reading; a quick first read is enough: https://www.jianshu.com/p/13070729289c

If I have misunderstood, please ignore this.

On Thu, Feb 28, 2019 at 2:57 PM  wrote:

> --
>
> ). Local environment: Scala WordCount; the program is attached as SocketWindowWordCountLocal.scala
> ). Input data:
>a b a
> ). Window: timeWindow(Time.seconds(20))
> ). [Question] I want to debug the Flink source code and find exactly at which step the sum operation is performed
> -
>
> Debugging so far:
> ). RecordWriter.emit(): at this point the data are already the values after flatMap and map,
> and the function sends the records one by one: (a,1), (b,1), (a,1)
> ). StreamSink.processElement is called and prints the output result
> ). What I don't understand: where, before StreamSink.processElement is called, is sum invoked to aggregate records with the same key?
>
>
> On Thursday, 28 February 2019, 2:47:39 pm GMT+8, Yaoting Gong <
> fall.for.you@gmail.com> wrote:
>
>
> Hello.
> I don't know which project your code is from. I looked at the official Flink example SocketWindowWordCount.scala.
> Stepping into sum from there, you eventually reach a SumAggregator.
>
> On Thu, Feb 28, 2019 at 2:34 PM  wrote:
>
> >  Flink wordCount
> >
> > Local program, code below. I want to step through it, but I could not find where the sum and the aggregated result are produced when the window's time trigger fires. That is the point I want to understand; could you explain it in detail?
> > package
> > com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc
> >
> > import org.apache.flink.configuration.Configuration
> > import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> > import org.apache.flink.streaming.api.windowing.time.Time
> >
> > /**
> >  * nc -lk 1234  input data
> >  */
> > object SocketWindowWordCountLocal {
> >
> >  def main(args: Array[String]): Unit = {
> >
> >
> >val port = 1234
> >// get the execution environment
> >// val env: StreamExecutionEnvironment =
> > StreamExecutionEnvironment.getExecutionEnvironment
> >
> >
> >val configuration : Configuration = new Configuration()
> >val timeout = "10 s"
> >val timeoutHeartbeatPause = "100 s"
> >configuration.setString("akka.ask.timeout",timeout)
> >configuration.setString("akka.lookup.timeout",timeout)
> >configuration.setString("akka.tcp.timeout",timeout)
> >configuration.setString("akka.transport.heartbeat.interval",timeout)
> >
> >
> configuration.setString("akka.transport.heartbeat.pause",timeoutHeartbeatPause)
> >configuration.setString("akka.watch.heartbeat.pause",timeout)
> >configuration.setInteger("heartbeat.interval",1000)
> >configuration.setInteger("heartbeat.timeout",5000)
> >val env:StreamExecutionEnvironment =
> > StreamExecutionEnvironment.createLocalEnvironment(1,configuration)
> >
> >
> >
> >
> >
> >// get input data by connecting to the socket
> >val dataStream = env.socketTextStream("localhost", port, '\n')
> >
> >
> >
> >import org.apache.flink.streaming.api.scala._
> >val textResult = dataStream.flatMap( w => w.split("\\s") ).map( w =>
> > WordWithCount(w,1))
> >  .keyBy("word")
> >  /**
> >* Refresh every 20 seconds, i.e. restart the count.
> >* Benefit: no need to keep aggregating over all the data;
> >* only the incremental data within the interval is needed, which reduces the data volume.
> >*/
> >  .timeWindow(Time.seconds(20))
> >  //.countWindow(3)
> >  //.countWindow(3,1)
> >  //.countWindowAll(3)
> >
> >
> >  .sum("count" )
> >
> >textResult.print().setParallelism(1)
> >
> >
> >
> >if(args == null || args.size ==0){
> >  env.execute("默认作业")
> >
> >  // execution plan
> >  //println(env.getExecutionPlan)
> >  //StreamGraph
> >  //println(env.getStreamGraph.getStreamingPlanAsJSON)
> >
> >
> >
> >  //JsonPlanGenerator.generatePlan(jobGraph)
> >
> >}else{
> >  env.execute(args(0))
> >}
> >
> >println("结束")
> >
> >  }
> >
> >
> >  // Data type for words with count
> >  case class WordWithCount(word: String, count: Long)
> >
> > }
> >
> >
> >On Thursday, 28 February 2019, 2:08:00 pm GMT+8, Yaoting Gong <
> > fall.for.you@gmail.com> wrote:
> >
> >  @Yuan Yifan
> >
> > *Images cannot be posted here.*
> >
> > On Thu, Feb 28, 2019 at 2:03 PM Yuan Yifan  wrote:
> >
> > >
> > > The code you are referring to should be this:
> > >
> > >
> > >
> >
> http://flink-cn.shinonomelab.com/quickstart/setup_quickstart.html#read-the-code
> > >
> > > Actually, SUM should be invoked as each record arrives, but the result is only emitted at the final FireAndPurge.
> > >
> > > Essentially, sum executes a Sum-type aggregate:
> > > its AggregateFunction is:
> > >
> > >
> >
> org.apache.flink.streaming.api.functions.aggregation.SumAggregator#SumAggregator(int,
> > > org.apache.flink.api.common.typeinfo.TypeInformation,
> > > org.apache.flink.api.common.ExecutionConfig)
> > >
> > >
> > > which implements the reduce method:
> > >
> > > So you don't need to care about exactly when it is computed; it may be computed in several places and merged afterwards, but in any case the properties of the reduce computation guarantee that the result is correct.
> > >
> > >
> > >
> > > On 2019-02-28 13:04:59, " " wrote:
> > > > Question: when and where is sum called in flink wordcount?
> > >
> > >
> > >
> > >
> > >
> >
>


Flink performance drops when async checkpoint is slow

2019-02-27 Thread Paul Lam
Hi,

I have a Flink job (version 1.5.3) that consumes from Kafka topic, does some 
transformations and aggregates, and write to two Kafka topics respectively. 
Meanwhile, there’s a custom source that pulls configurations for the 
transformations periodically. The generic job graph is as below.



The job uses FsStateBackend and checkpoints to HDFS, but HDFS’s load is 
unstable, and sometimes HDFS client reports slow read and slow 
waitForAckedSeqno during checkpoints. When that happens, the Flink job's consume
rate drops significantly, and some taskmanagers' CPU usage drops from about 140%
to 1%, all the task threads on that taskmanager are blocked. This situation 
lasts from seconds to a minute. We started a parallel job with everything the 
same except checkpointing disabled, and it runs very steady.
But I think as the checkpointing is async, it should not affect the task 
threads.

There are some additional information that we observed:

-  When the performance drops, jstack shows that the Kafka source and the task 
right after it are blocked requesting a memory buffer (with back pressure close 
to 1), and the last task is blocked at  `SingleInputGate.getNextBufferOrEvent`. 
- The dashboard shows that the buffer during alignment is less than 10 MB, even 
when back pressure is high.

We’ve been struggling with this problem for weeks, and any help is appreciated. 
Thanks a lot!

Best,
Paul Lam



Re: When and where is sum called in flink wordcount?

2019-02-27 Thread  
 --
). Local environment: Scala WordCount; the program is attached as SocketWindowWordCountLocal.scala
). Input data: a b a
). Window: timeWindow(Time.seconds(20))
). [Question] I want to debug the Flink source code and find exactly at which step the sum operation is performed
-
Debugging so far:
). RecordWriter.emit(): at this point the data are already the values after flatMap and map, and the function sends the records one by one: (a,1), (b,1), (a,1)
). StreamSink.processElement is called and prints the output result
). What I don't understand: where, before StreamSink.processElement is called, is sum invoked to aggregate records with the same key?

On Thursday, 28 February 2019, 2:47:39 pm GMT+8, Yaoting Gong 
 wrote:  
 
 Hello.
I don't know which project your code is from. I looked at the official Flink example SocketWindowWordCount.scala.
Stepping into sum from there, you eventually reach a SumAggregator.

On Thu, Feb 28, 2019 at 2:34 PM  wrote:

>  Flink wordCount
> Local program, code below. I want to step through it, but I could not find where the sum and the aggregated result are produced when the window's time trigger fires. That is the point I want to understand; could you explain it in detail?
> package
> com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc
>
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.windowing.time.Time
>
> /**
>  * nc -lk 1234  input data
>  */
> object SocketWindowWordCountLocal {
>
>  def main(args: Array[String]): Unit = {
>
>
>    val port = 1234
>    // get the execution environment
>    // val env: StreamExecutionEnvironment =
> StreamExecutionEnvironment.getExecutionEnvironment
>
>
>    val configuration : Configuration = new Configuration()
>    val timeout = "10 s"
>    val timeoutHeartbeatPause = "100 s"
>    configuration.setString("akka.ask.timeout",timeout)
>    configuration.setString("akka.lookup.timeout",timeout)
>    configuration.setString("akka.tcp.timeout",timeout)
>    configuration.setString("akka.transport.heartbeat.interval",timeout)
>
> configuration.setString("akka.transport.heartbeat.pause",timeoutHeartbeatPause)
>    configuration.setString("akka.watch.heartbeat.pause",timeout)
>    configuration.setInteger("heartbeat.interval",1000)
>    configuration.setInteger("heartbeat.timeout",5000)
>    val env:StreamExecutionEnvironment =
> StreamExecutionEnvironment.createLocalEnvironment(1,configuration)
>
>
>
>
>
>    // get input data by connecting to the socket
>    val dataStream = env.socketTextStream("localhost", port, '\n')
>
>
>
>    import org.apache.flink.streaming.api.scala._
>    val textResult = dataStream.flatMap( w => w.split("\\s") ).map( w =>
> WordWithCount(w,1))
>      .keyBy("word")
>      /**
>        * Refresh every 20 seconds, i.e. restart the count.
>        * Benefit: no need to keep aggregating over all the data;
>        * only the incremental data within the interval is needed, which reduces the data volume.
>        */
>      .timeWindow(Time.seconds(20))
>      //.countWindow(3)
>      //.countWindow(3,1)
>      //.countWindowAll(3)
>
>
>      .sum("count" )
>
>    textResult.print().setParallelism(1)
>
>
>
>    if(args == null || args.size ==0){
>      env.execute("默认作业")
>
>      // execution plan
>      //println(env.getExecutionPlan)
>      //StreamGraph
>      //println(env.getStreamGraph.getStreamingPlanAsJSON)
>
>
>
>      //JsonPlanGenerator.generatePlan(jobGraph)
>
>    }else{
>      env.execute(args(0))
>    }
>
>    println("结束")
>
>  }
>
>
>  // Data type for words with count
>  case class WordWithCount(word: String, count: Long)
>
> }
>
>
>    On Thursday, 28 February 2019, 2:08:00 pm GMT+8, Yaoting Gong <
> fall.for.you@gmail.com> wrote:
>
>  @Yuan Yifan
>
> *Images cannot be posted here.*
>
> On Thu, Feb 28, 2019 at 2:03 PM Yuan Yifan  wrote:
>
> >
> > The code you are referring to should be this:
> >
> >
> >
> http://flink-cn.shinonomelab.com/quickstart/setup_quickstart.html#read-the-code
> >
> > Actually, SUM should be invoked as each record arrives, but the result is only emitted at the final FireAndPurge.
> >
> > Essentially, sum executes a Sum-type aggregate:
> > its AggregateFunction is:
> >
> >
> org.apache.flink.streaming.api.functions.aggregation.SumAggregator#SumAggregator(int,
> > org.apache.flink.api.common.typeinfo.TypeInformation,
> > org.apache.flink.api.common.ExecutionConfig)
> >
> >
> > which implements the reduce method:
> >
> > So you don't need to care about exactly when it is computed; it may be computed in several places and merged afterwards, but in any case the properties of the reduce computation guarantee that the result is correct.
> >
> >
> >
> > On 2019-02-28 13:04:59, " " wrote:
> > > Question: when and where is sum called in flink wordcount?
> >
> >
> >
> >
> >
>
  

Re: Split Stream on a Split Stream

2019-02-27 Thread Taher Koitawala
No particular reason for not using the process function, just wanted to
clarify if that was the correct way to do it. Thanks Knauf.


Regards,
Taher Koitawala
GS Lab Pune
+91 8407979163


On Wed, Feb 27, 2019 at 8:23 PM Konstantin Knauf 
wrote:

> Hi Taher ,
>
> a ProcessFunction is actually the way to do this. When chained to the
> previous operator, the overhead of such a ProcessFunction is negligible.
>
> Any particular reason you don't want to go for a ProcessFunction?
>
> Cheers,
>
> Konstantin
>
> On Wed, Feb 27, 2019 at 8:36 AM Taher Koitawala 
> wrote:
>
>> Hi All,
>>   We are currently working with Flink 1.7.2 version and we are
>> get the below given exception when doing a split on a split.
>>
>> SplitStreamsplitStream=stream1.split(new SomeSplitLogic());
>>
>> DataStream select1=splitStream.select("1");
>> DataStream select2=splitStream.select("2");
>>
>>
>> select2.split(new AnotherSplitLogic()).select("3");
>>
>>
>> Basically the exception is recommending to use SideOutput, however the
>> only way I see to get a side output is by using a process function. Can
>> someone suggest a better way of doing this?
>>
>> Exception :
>> Caused by: java.lang.IllegalStateException:  Consecutive multiple splits
>> are not supported. Splits are deprecated. Please use side-outputs
>>
>>
>> Regards,
>> Taher Koitawala
>> GS Lab Pune
>> +91 8407979163
>>
>
>
> --
>
> Konstantin Knauf | Solutions Architect
>
> +49 160 91394525
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Data Artisans GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>


Re: When and where is sum called in flink wordcount?

2019-02-27 Thread Yaoting Gong
Hello.
I don't know which project your code is from. I looked at the official Flink example SocketWindowWordCount.scala.
Stepping into sum from there, you eventually reach a SumAggregator.

On Thu, Feb 28, 2019 at 2:34 PM  wrote:

>  Flink wordCount
> Local program, code below. I want to step through it, but I could not find where the sum and the aggregated result are produced when the window's time trigger fires. That is the point I want to understand; could you explain it in detail?
> package
> com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc
>
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.windowing.time.Time
>
> /**
>   * nc -lk 1234  input data
>   */
> object SocketWindowWordCountLocal {
>
>   def main(args: Array[String]): Unit = {
>
>
> val port = 1234
> // get the execution environment
>// val env: StreamExecutionEnvironment =
> StreamExecutionEnvironment.getExecutionEnvironment
>
>
> val configuration : Configuration = new Configuration()
> val timeout = "10 s"
> val timeoutHeartbeatPause = "100 s"
> configuration.setString("akka.ask.timeout",timeout)
> configuration.setString("akka.lookup.timeout",timeout)
> configuration.setString("akka.tcp.timeout",timeout)
> configuration.setString("akka.transport.heartbeat.interval",timeout)
>
> configuration.setString("akka.transport.heartbeat.pause",timeoutHeartbeatPause)
> configuration.setString("akka.watch.heartbeat.pause",timeout)
> configuration.setInteger("heartbeat.interval",1000)
> configuration.setInteger("heartbeat.timeout",5000)
> val env:StreamExecutionEnvironment =
> StreamExecutionEnvironment.createLocalEnvironment(1,configuration)
>
>
>
>
>
> // get input data by connecting to the socket
> val dataStream = env.socketTextStream("localhost", port, '\n')
>
>
>
> import org.apache.flink.streaming.api.scala._
> val textResult = dataStream.flatMap( w => w.split("\\s") ).map( w =>
> WordWithCount(w,1))
>   .keyBy("word")
>   /**
> * Refresh every 20 seconds, i.e. restart the count.
> * Benefit: no need to keep aggregating over all the data;
> * only the incremental data within the interval is needed, which reduces the data volume.
> */
>   .timeWindow(Time.seconds(20))
>   //.countWindow(3)
>   //.countWindow(3,1)
>   //.countWindowAll(3)
>
>
>   .sum("count" )
>
> textResult.print().setParallelism(1)
>
>
>
> if(args == null || args.size ==0){
>   env.execute("默认作业")
>
>   // execution plan
>   //println(env.getExecutionPlan)
>   //StreamGraph
>  //println(env.getStreamGraph.getStreamingPlanAsJSON)
>
>
>
>   //JsonPlanGenerator.generatePlan(jobGraph)
>
> }else{
>   env.execute(args(0))
> }
>
> println("结束")
>
>   }
>
>
>   // Data type for words with count
>   case class WordWithCount(word: String, count: Long)
>
> }
>
>
> On Thursday, 28 February 2019, 2:08:00 pm GMT+8, Yaoting Gong <
> fall.for.you@gmail.com> wrote:
>
>  @Yuan Yifan
>
> *Images cannot be posted here.*
>
> On Thu, Feb 28, 2019 at 2:03 PM Yuan Yifan  wrote:
>
> >
> > The code you are referring to should be this:
> >
> >
> >
> http://flink-cn.shinonomelab.com/quickstart/setup_quickstart.html#read-the-code
> >
> > Actually, SUM should be invoked as each record arrives, but the result is only emitted at the final FireAndPurge.
> >
> > Essentially, sum executes a Sum-type aggregate:
> > its AggregateFunction is:
> >
> >
> org.apache.flink.streaming.api.functions.aggregation.SumAggregator#SumAggregator(int,
> > org.apache.flink.api.common.typeinfo.TypeInformation,
> > org.apache.flink.api.common.ExecutionConfig)
> >
> >
> > which implements the reduce method:
> >
> > So you don't need to care about exactly when it is computed; it may be computed in several places and merged afterwards, but in any case the properties of the reduce computation guarantee that the result is correct.
> >
> >
> >
> > On 2019-02-28 13:04:59, " " wrote:
> > > Question: when and where is sum called in flink wordcount?
> >
> >
> >
> >
> >
>


Re:Re: When and where is sum called in flink wordcount?

2019-02-27 Thread Yuan Yifan
If you only want to know where the SUM is done, just look at the reduce function of this class:
org.apache.flink.streaming.api.functions.aggregation.SumAggregator


But if you want to know when the SUM result is emitted, that is much more complicated: a Trigger may be attached downstream, and FireAndPurge may also be invoked "manually" when certain conditions are met, so the result is not necessarily emitted when the window time ends.


For the details, have a look at this code:
org.apache.flink.streaming.api.datastream.WindowedStream#reduce(org.apache.flink.api.common.functions.ReduceFunction,
 org.apache.flink.streaming.api.functions.windowing.WindowFunction, 
org.apache.flink.api.common.typeinfo.TypeInformation)
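
To see the per-element behaviour concretely, here is a minimal sketch (it reuses dataStream, WordWithCount and the imports from the quoted SocketWindowWordCountLocal program and simply replaces .sum("count") with an equivalent reduce; it is not Flink's internal code):

// The reduce function runs once per arriving element (after the first one for a key/window);
// the window trigger only decides when the current aggregate is emitted downstream.
val textResult = dataStream
  .flatMap(w => w.split("\\s")).map(w => WordWithCount(w, 1))
  .keyBy("word")
  .timeWindow(Time.seconds(20))
  .reduce { (a, b) =>
    println(s"reduce called for key '${a.word}'") // printed for every incoming element after the first
    WordWithCount(a.word, a.count + b.count)
  }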






On 2019-02-28 14:34:40, " " wrote:
> Flink wordCount 
> Local program, code below. I want to step through it, but I could not find where the sum and the aggregated result are produced when the window's time trigger fires. That is the point I want to understand; could you explain it in detail?
> package
>  com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc
>
>import org.apache.flink.configuration.Configuration
>import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>import org.apache.flink.streaming.api.windowing.time.Time
>
>/**
>  * nc -lk 1234  input data
>  */
>object SocketWindowWordCountLocal {
>
>  def main(args: Array[String]): Unit = {
>
>
>val port = 1234
>// get the execution environment
>   // val env: StreamExecutionEnvironment = 
> StreamExecutionEnvironment.getExecutionEnvironment
>
>
>val configuration : Configuration = new Configuration()
>val timeout = "10 s"
>val timeoutHeartbeatPause = "100 s"
>configuration.setString("akka.ask.timeout",timeout)
>configuration.setString("akka.lookup.timeout",timeout)
>configuration.setString("akka.tcp.timeout",timeout)
>configuration.setString("akka.transport.heartbeat.interval",timeout)
>
> configuration.setString("akka.transport.heartbeat.pause",timeoutHeartbeatPause)
>configuration.setString("akka.watch.heartbeat.pause",timeout)
>configuration.setInteger("heartbeat.interval",1000)
>configuration.setInteger("heartbeat.timeout",5000)
>val env:StreamExecutionEnvironment = 
> StreamExecutionEnvironment.createLocalEnvironment(1,configuration)
>
>
>
>
>
>// get input data by connecting to the socket
>val dataStream = env.socketTextStream("localhost", port, '\n')
>
>
>
>import org.apache.flink.streaming.api.scala._
>val textResult = dataStream.flatMap( w => w.split("\\s") ).map( w => 
> WordWithCount(w,1))
>  .keyBy("word")
>  /**
>* Refresh every 20 seconds, i.e. restart the count.
>* Benefit: no need to keep aggregating over all the data;
>* only the incremental data within the interval is needed, which reduces the data volume.
>*/
>  .timeWindow(Time.seconds(20))
>  //.countWindow(3)
>  //.countWindow(3,1)
>  //.countWindowAll(3)
>
>
>  .sum("count" )
>
>textResult.print().setParallelism(1)
>
>
>
>if(args == null || args.size ==0){
>  env.execute("默认作业")
>
>  // execution plan
>  //println(env.getExecutionPlan)
>  //StreamGraph
> //println(env.getStreamGraph.getStreamingPlanAsJSON)
>
>
>
>  //JsonPlanGenerator.generatePlan(jobGraph)
>
>}else{
>  env.execute(args(0))
>}
>
>println("结束")
>
>  }
>
>
>  // Data type for words with count
>  case class WordWithCount(word: String, count: Long)
>
>}
>
>
>On Thursday, 28 February 2019, 2:08:00 pm GMT+8, Yaoting Gong 
>  wrote:  
> 
> @Yuan Yifan
>
>*Images cannot be posted here.*
>
>On Thu, Feb 28, 2019 at 2:03 PM Yuan Yifan  wrote:
>
>>
>> The code you are referring to should be this:
>>
>>
>> http://flink-cn.shinonomelab.com/quickstart/setup_quickstart.html#read-the-code
>>
>> Actually, SUM should be invoked as each record arrives, but the result is only emitted at the final FireAndPurge.
>>
>> Essentially, sum executes a Sum-type aggregate:
>> its AggregateFunction is:
>>
>> org.apache.flink.streaming.api.functions.aggregation.SumAggregator#SumAggregator(int,
>> org.apache.flink.api.common.typeinfo.TypeInformation,
>> org.apache.flink.api.common.ExecutionConfig)
>>
>>
>> which implements the reduce method:
>>
>> So you don't need to care about exactly when it is computed; it may be computed in several places and merged afterwards, but in any case the properties of the reduce computation guarantee that the result is correct.
>>
>>
>>
>> On 2019-02-28 13:04:59, " " wrote:
>> > Question: when and where is sum called in flink wordcount?
>>
>>
>>
>>
>>
>  


Re: When and where is sum called in flink wordcount?

2019-02-27 Thread  
 Flink wordCount 
Local program, code below. I want to step through it, but I could not find where the sum and the aggregated result are produced when the window's time trigger fires. That is the point I want to understand; could you explain it in detail?
package
 com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * nc -lk 1234  input data
  */
object SocketWindowWordCountLocal {

  def main(args: Array[String]): Unit = {


val port = 1234
// get the execution environment
   // val env: StreamExecutionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment


val configuration : Configuration = new Configuration()
val timeout = "10 s"
val timeoutHeartbeatPause = "100 s"
configuration.setString("akka.ask.timeout",timeout)
configuration.setString("akka.lookup.timeout",timeout)
configuration.setString("akka.tcp.timeout",timeout)
configuration.setString("akka.transport.heartbeat.interval",timeout)

configuration.setString("akka.transport.heartbeat.pause",timeoutHeartbeatPause)
configuration.setString("akka.watch.heartbeat.pause",timeout)
configuration.setInteger("heartbeat.interval",1000)
configuration.setInteger("heartbeat.timeout",5000)
val env:StreamExecutionEnvironment = 
StreamExecutionEnvironment.createLocalEnvironment(1,configuration)





// get input data by connecting to the socket
val dataStream = env.socketTextStream("localhost", port, '\n')



import org.apache.flink.streaming.api.scala._
val textResult = dataStream.flatMap( w => w.split("\\s") ).map( w => 
WordWithCount(w,1))
  .keyBy("word")
  /**
* Refresh every 20 seconds, i.e. restart the count.
* Benefit: no need to keep aggregating over all the data;
* only the incremental data within the interval is needed, which reduces the data volume.
*/
  .timeWindow(Time.seconds(20))
  //.countWindow(3)
  //.countWindow(3,1)
  //.countWindowAll(3)


  .sum("count" )

textResult.print().setParallelism(1)



if(args == null || args.size ==0){
  env.execute("默认作业")

  // execution plan
  //println(env.getExecutionPlan)
  //StreamGraph
 //println(env.getStreamGraph.getStreamingPlanAsJSON)



  //JsonPlanGenerator.generatePlan(jobGraph)

}else{
  env.execute(args(0))
}

println("结束")

  }


  // Data type for words with count
  case class WordWithCount(word: String, count: Long)

}


On Thursday, 28 February 2019, 2:08:00 pm GMT+8, Yaoting Gong 
 wrote:  
 
 @Yuan Yifan

*Images cannot be posted here.*

On Thu, Feb 28, 2019 at 2:03 PM Yuan Yifan  wrote:

>
> The code you are referring to should be this:
>
>
> http://flink-cn.shinonomelab.com/quickstart/setup_quickstart.html#read-the-code
>
> Actually, SUM should be invoked as each record arrives, but the result is only emitted at the final FireAndPurge.
>
> Essentially, sum executes a Sum-type aggregate:
> its AggregateFunction is:
>
> org.apache.flink.streaming.api.functions.aggregation.SumAggregator#SumAggregator(int,
> org.apache.flink.api.common.typeinfo.TypeInformation,
> org.apache.flink.api.common.ExecutionConfig)
>
>
> which implements the reduce method:
>
> So you don't need to care about exactly when it is computed; it may be computed in several places and merged afterwards, but in any case the properties of the reduce computation guarantee that the result is correct.
>
>
>
> On 2019-02-28 13:04:59, " " wrote:
> > Question: when and where is sum called in flink wordcount?
>
>
>
>
>
  

Re: When and where is sum called in flink wordcount?

2019-02-27 Thread Yuan Yifan


The code you are referring to should be this:


http://flink-cn.shinonomelab.com/quickstart/setup_quickstart.html#read-the-code


Actually, SUM should be invoked as each record arrives, but the result is only emitted at the final FireAndPurge.


Essentially, sum executes a Sum-type aggregate:
its AggregateFunction is:


org.apache.flink.streaming.api.functions.aggregation.SumAggregator#SumAggregator(int,
 org.apache.flink.api.common.typeinfo.TypeInformation, 
org.apache.flink.api.common.ExecutionConfig)




which implements the reduce method:


So you don't need to care about exactly when it is computed; it may be computed in several places and merged afterwards, but in any case the properties of the reduce computation guarantee that the result is correct.
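
For intuition, a rough sketch of that reduce (not the actual Flink SumAggregator source; it only shows the shape of a sum-by-field aggregation as a ReduceFunction):

import org.apache.flink.api.common.functions.ReduceFunction

case class WordWithCount(word: String, count: Long)

// sum("count") behaves like this reducer: add the selected field, keep the first record otherwise.
class SumByCount extends ReduceFunction[WordWithCount] {
  override def reduce(a: WordWithCount, b: WordWithCount): WordWithCount =
    WordWithCount(a.word, a.count + b.count)
}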





On 2019-02-28 13:04:59, " " wrote:
>Question: when and where is sum called in flink wordcount?


Re: When and where is sum called in flink wordcount?

2019-02-27 Thread ????
??window??
??processfuntion


------ Original message ------
From: "??"<1543332...@qq.com>;
Date: Thursday, 28 February 2019, 1:56 PM
To: "user-zh";

Subject: Re: When and where is sum called in flink wordcount?




------ Original message ------
From: " ";
Date: Thursday, 28 February 2019, 1:04 PM
To: "user-zh@flink.apache.org";

Subject: When and where is sum called in flink wordcount?



Question: when and where is sum called in flink wordcount?

Re: When and where is sum called in flink wordcount?

2019-02-27 Thread ????




------ Original message ------
From: " ";
Date: Thursday, 28 February 2019, 1:04 PM
To: "user-zh@flink.apache.org";

Subject: When and where is sum called in flink wordcount?



Question: when and where is sum called in flink wordcount?

When and where is sum called in flink wordcount?

2019-02-27 Thread  
Question: when and where is sum called in flink wordcount?

Re: Does Blink optimize iterations?

2019-02-27 Thread Dreamer




------ Original message ------
From: "Becket Qin";
Date: Thursday, 28 February 2019, 11:07 AM
To: "user-zh";

Subject: Re: Does Blink optimize iterations?



Blink currently has no special optimization for iterations. However, Blink is based on the community 1.5.1 release, so I would suggest testing with Flink 1.5.1 to see whether the difference is still there.

On Thu, Feb 28, 2019 at 10:17 AM Dreamer <1762882...@qq.com> wrote:

> Hi all, I would like to ask whether Blink optimizes Flink's iterations. If so, is the optimization for batch processing or for stream processing, and which optimization techniques are mainly used?
>
> A couple of days ago I compared flink 1.4.2 and Blink using the streaming Iteration example that ships with Blink. The input is pairs of integers below 100, and the job iterates adding them until the sum exceeds 100. With 1,500 generated records both clusters took about 7-8 s, but when I copied the same data 4 times (6,000 records), Blink still took 7-8 s while flink 1.4.2 took 36-40 min. It feels strange: doubling the input step by step from 100 records, the two engines took about the same time, but past a certain size flink 1.4.2 suddenly became extremely slow while Blink did not change. With 6,000 distinct records, both engines take more than 10 hours (still not finished). Is this caused by the cluster configuration or something else?

Re: Does Blink optimize iterations?

2019-02-27 Thread Becket Qin
Blink currently has no special optimization for iterations. However, Blink is based on the community 1.5.1 release, so I would suggest testing with Flink 1.5.1 to see whether the difference is still there.

On Thu, Feb 28, 2019 at 10:17 AM Dreamer <1762882...@qq.com> wrote:

> Hi all, I would like to ask whether Blink optimizes Flink's iterations. If so, is the optimization for batch processing or for stream processing, and which optimization techniques are mainly used?
>
> A couple of days ago I compared flink 1.4.2 and Blink using the streaming Iteration example that ships with Blink. The input is pairs of integers below 100, and the job iterates adding them until the sum exceeds 100. With 1,500 generated records both clusters took about 7-8 s, but when I copied the same data 4 times (6,000 records), Blink still took 7-8 s while flink 1.4.2 took 36-40 min. It feels strange: doubling the input step by step from 100 records, the two engines took about the same time, but past a certain size flink 1.4.2 suddenly became extremely slow while Blink did not change. With 6,000 distinct records, both engines take more than 10 hours (still not finished). Is this caused by the cluster configuration or something else?


Re: How to add unit test for flink sql ?

2019-02-27 Thread Lifei Chen
Hi, Congxian:
 I finally found it.

It is not included in the master branch:
https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java

but it is still available in release-1.7:
https://github.com/apache/flink/blob/release-1.7/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
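
For reference, a minimal standalone sketch of such a test (assumed Flink 1.7 Scala APIs and made-up table/column names; the real JavaSqlITCase follows the same pattern of running the SQL on an in-memory stream and collecting the results):

import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// Collects results into a static list so they survive sink serialization.
object CollectSink {
  val values = new java.util.concurrent.CopyOnWriteArrayList[String]()
}

class CollectSink extends SinkFunction[(Boolean, Row)] {
  override def invoke(value: (Boolean, Row)): Unit = CollectSink.values.add(value.toString)
}

object SqlQueryTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // Bounded in-memory input standing in for the Kafka source.
    val input = env.fromElements(("a", 1L), ("b", 2L), ("a", 3L))
    tEnv.registerDataStream("events", input, 'word, 'cnt)

    // The SQL under test.
    val result = tEnv.sqlQuery("SELECT word, SUM(cnt) AS total FROM events GROUP BY word")

    result.toRetractStream[Row].addSink(new CollectSink)
    env.execute()
    // Assert on CollectSink.values here, e.g. with JUnit assertions.
  }
}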

Best,
Lifei


Lifei Chen wrote on Thursday, 28 February 2019 at 9:57 AM:

> I am using Flink v1.7.1 now, and cannot find the library you suggested,
>  is it deprecated?
>
> Lifei Chen wrote on Thursday, 28 February 2019 at 9:50 AM:
>
>> Thanks, I will try it !
>>
>> Congxian Qiu wrote on Wednesday, 27 February 2019 at 9:17 PM:
>>
>>> Hi, Lifei
>>>
>>> Maybe org.apache.flink.table.runtime.stream.sql.JavaSqlITCase can be
>>> helpful.
>>>
>>> Best,
>>> Congxian
>>>
>>>
>>> Lifei Chen wrote on Wednesday, 27 February 2019 at 4:20 PM:
>>>
 Hi, all:

 I finished a Flink streaming job with Flink SQL, which reads data from
 Kafka and writes back to Elasticsearch.

 I have no idea how to add a unit test for testing sql I wrote,  any
 suggestions?

>>>


Re: How to add unit test for flink sql ?

2019-02-27 Thread Lifei Chen
I am using Flink v1.7.1 now, and cannot find the library you suggested,
 is it deprecated?

Lifei Chen wrote on Thursday, 28 February 2019 at 9:50 AM:

> Thanks, I will try it !
>
> Congxian Qiu wrote on Wednesday, 27 February 2019 at 9:17 PM:
>
>> Hi, Lifei
>>
>> Maybe org.apache.flink.table.runtime.stream.sql.JavaSqlITCase can be
>> helpful.
>>
>> Best,
>> Congxian
>>
>>
>> Lifei Chen wrote on Wednesday, 27 February 2019 at 4:20 PM:
>>
>>> Hi, all:
>>>
>>> I finished a Flink streaming job with Flink SQL, which reads data from
>>> Kafka and writes back to Elasticsearch.
>>>
>>> I have no idea how to add a unit test for testing sql I wrote,  any
>>> suggestions?
>>>
>>


Re: One source is much slower than the other side when join history data

2019-02-27 Thread liujiangang
Thank you very much.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: How to add unit test for flink sql ?

2019-02-27 Thread Lifei Chen
Thanks, I will try it !

Congxian Qiu wrote on Wednesday, 27 February 2019 at 9:17 PM:

> Hi, Lifei
>
> Maybe org.apache.flink.table.runtime.stream.sql.JavaSqlITCase can be
> helpful.
>
> Best,
> Congxian
>
>
> Lifei Chen wrote on Wednesday, 27 February 2019 at 4:20 PM:
>
>> Hi, all:
>>
>> I finished a Flink streaming job with Flink SQL, which reads data from
>> Kafka and writes back to Elasticsearch.
>>
>> I have no idea how to add a unit test for testing sql I wrote,  any
>> suggestions?
>>
>


Re: One source is much slower than the other side when join history data

2019-02-27 Thread Konstantin Knauf
Hi,

this topic has been discussed a lot recently in the community as "Event
Time Alignment/Synchronization" [1,2]. These discussion should provide a
starting point.

Cheers,

Konstantin

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Sharing-state-between-subtasks-td24489.html
[2] https://issues.apache.org/jira/browse/FLINK-10886



On Wed, Feb 27, 2019 at 3:03 AM 刘建刚  wrote:

>   When consuming history data in join operator with eventTime, reading
> data from one source is much slower than the other. As a result, the join
> operator will cache a lot of data from the faster source in order to wait for the
> slower source.
>   The question is: how can I make the difference in the consumers'
> speed small?
>


-- 

Konstantin Knauf | Solutions Architect

+49 160 91394525



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen


Re: event time & watermarks in connected streams with broadcast state

2019-02-27 Thread Konstantin Knauf
Hi Rinat,

to my knowledge your workaround is fine & necessary. You can also emit a
Long.MAX_VALUE instead of the processing time to save some CPU cycles.
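
For example, a small sketch of that variant (it assumes the Rule type and rules stream from your pipeline; emitting Long.MaxValue means the rules stream never holds the combined watermark back):

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

class RuleWatermarkAssigner extends AssignerWithPeriodicWatermarks[Rule] {

  // The rules stream always reports the maximum watermark, so the operator's
  // watermark is driven by the events stream alone.
  override def getCurrentWatermark: Watermark = new Watermark(Long.MaxValue)

  // The extracted timestamp is not used for watermarking here; keep the rule's creation time.
  override def extractTimestamp(rule: Rule, previousElementTimestamp: Long): Long = rule.created
}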

Cheers,

Konstantin



On Wed, Feb 27, 2019 at 9:32 PM Rinat  wrote:

> Hi mates, got some questions about using event time for the flink pipeline.
>
> My pipeline consists of two connected streams, one is a stream with
> business rules and another one is a stream with user events.
>
> I’ve broadcasted stream with business rules and connected it to the stream
> of events, thus I can apply all existing rules to each event.
> For those purposes I’ve implemented a KeyedBroadcastProcessFunction, that
> accumulates broadcast state and applies rules from it to each event.
> In this function I would like to register event time timers.
>
> I’ve specified an AssignerWithPeriodicWatermarks for the stream of events
> that extracts the event timestamp and uses it as timestamp and watermark, but
> still got no success, because the broadcasted stream doesn’t have such an
> assigner and always returns Long.MIN as a watermark value, so Flink uses
> the smallest watermark received from both streams, and the event time doesn’t
> get updated.
>
> How can I solve this problem and use timestamps from event stream as a
> pipeline event time ?
> Here is the configuration of my pipeline.
>
>
>
>
>
>
>
>
>
>
>
>
>
> val bSegments = env
>   .addSource(rules)
>   .broadcast(CustomerJourneyProcessor.RULES_STATE_DESCRIPTOR)
>
> val keyedEvents = env
>   .addSource(events)
>   .assignTimestampsAndWatermarks(eventTimeAssigner)
>   .keyBy { event => event.getId.getGid }
>
> keyedEvents
>   .connect(bSegments)
>   .process(customerJourneyProcessor)
>   .addSink(sink)
>
>
> I’ve found a workaround, that works for me, but I’m not sure, that it’s a
> proper decision.
>
> I can add a timestamp/ watermarks assigner to the stream of rules, that
> will always return *System.currentTime()*, thereby it always will be
> bigger than event timestamp, so, the KeyedBroadcastProcessFunction
> will use events stream timestamp as a watermark.
>
>
>
>
>
>
> class RuleTimestampAssigner extends AssignerWithPeriodicWatermarks[SegmentEvent] {
>
>   override def getCurrentWatermark: Watermark = new Watermark(System.currentTimeMillis())
>
>   override def extractTimestamp(rule: Rule, previousElementTimestamp: Long): Long = rule.created
> }
>
>
> But it looks like a hack and maybe someone can give an advice with the
> more convenient approach.
>
> Thx !
>
> Sincerely yours,
> *Rinat Sharipov*
> Software Engineer at 1DMP CORE Team
>
> email: r.shari...@cleverdata.ru 
> mobile: +7 (925) 416-37-26
>
> CleverDATA
> make your data clever
>
>

-- 

Konstantin Knauf | Solutions Architect

+49 160 91394525



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen


event time & watermarks in connected streams with broadcast state

2019-02-27 Thread Rinat
Hi mates, got some questions about using event time for the flink pipeline.

My pipeline consists of two connected streams, one is a stream with business 
rules and another one is a stream with user events. 

I’ve broadcasted stream with business rules and connected it to the stream of 
events, thus I can apply all existing rules to each event.
For those purposes I’ve implemented a KeyedBroadcastProcessFunction, that 
accumulates broadcast state and applies rules from it to each event.
In this function I would like to register event time timers.

I’ve specified an AssignerWithPeriodicWatermarks for the stream of events that
extracts the event timestamp and uses it as timestamp and watermark, but still got
no success, because the broadcasted stream doesn’t have such an assigner and
always returns Long.MIN as a watermark value, so Flink uses the smallest
watermark received from both streams, and the event time doesn’t get updated.

How can I solve this problem and use timestamps from event stream as a pipeline 
event time ?
Here is the configuration of my pipeline.

val bSegments = env
  .addSource(rules)
  .broadcast(CustomerJourneyProcessor.RULES_STATE_DESCRIPTOR)

val keyedEvents = env
  .addSource(events)
  .assignTimestampsAndWatermarks(eventTimeAssigner)
  .keyBy { event => event.getId.getGid }

keyedEvents
  .connect(bSegments)
  .process(customerJourneyProcessor)
  .addSink(sink)

I’ve found a workaround, that works for me, but I’m not sure, that it’s a 
proper decision.

I can add a timestamp/ watermarks assigner to the stream of rules, that will 
always return System.currentTime(), thereby it always will be bigger than event 
timestamp, so, the KeyedBroadcastProcessFunction
will use events stream timestamp as a watermark.

class RuleTimestampAssigner extends 
AssignerWithPeriodicWatermarks[SegmentEvent] {

  override def getCurrentWatermark: Watermark = new 
Watermark(System.currentTimeMillis())

  override def extractTimestamp(rule: Rule, previousElementTimestamp: Long): 
Long = rule.created
}

But it looks like a hack and maybe someone can give an advice with the more 
convenient approach.

Thx !

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru 
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever



Re: okio and okhttp not shaded in the Flink Uber Jar on EMR

2019-02-27 Thread Austin Cawley-Edwards
Thanks Gary,

I will try to look into why the child-first strategy seems to have failed
for this dependency.

Best,
Austin

On Wed, Feb 27, 2019 at 12:25 PM Gary Yao  wrote:

> Hi,
>
> Actually Flink's inverted class loading feature was designed to mitigate
> problems with different versions of libraries that are not compatible with
> each other [1]. You may want to debug why it does not work for you.
>
> You can also try to use the Hadoop free Flink distribution, and export the
> HADOOP_CLASSPATH variable [2].
>
> Best,
> Gary
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/debugging_classloading.html#inverted-class-loading-and-classloader-resolution-order
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/deployment/hadoop.html#configuring-flink-with-hadoop-classpaths
>
> On Wed, Feb 27, 2019 at 5:23 AM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hi,
>>
>> I recently experienced versioning clashes with the okio and okhttp when
>> trying to deploy a Flink 1.6.0 app to AWS EMR on Hadoop 2.8.4. After
>> investigating and talking to the okio team (see this issue)
>> , I found that both okio and
>> okhttp exist in the Flink uber jar with versions 1.4.0 and 2.4.0,
>> respectively, whereas I'm including versions 2.2.2 and 3.13.1 in my shaded
>> jar. The okio team suggested that Flink should shade the uber jar to fix
>> the issue, but I'm wondering if there is something I can do on my end to
>> have all versions exist simultaneously.
>>
>> From the issue, here are the okio contents of the uber jar:
>>
>> *jar -tf flink-shaded-hadoop2-uber-1.6.0.jar | grep okio*
>>
>> META-INF/maven/com.squareup.okio/
>> META-INF/maven/com.squareup.okio/okio/
>> META-INF/maven/com.squareup.okio/okio/pom.properties
>> META-INF/maven/com.squareup.okio/okio/pom.xml
>> okio/
>> okio/AsyncTimeout$1.class
>> okio/AsyncTimeout$2.class
>> okio/AsyncTimeout$Watchdog.class
>> okio/AsyncTimeout.class
>> okio/Base64.class
>> okio/Buffer$1.class
>> okio/Buffer$2.class
>> okio/Buffer.class
>> okio/BufferedSink.class
>> okio/BufferedSource.class
>> okio/ByteString.class
>> okio/DeflaterSink.class
>> okio/ForwardingSink.class
>> okio/ForwardingSource.class
>> okio/ForwardingTimeout.class
>> okio/GzipSink.class
>> okio/GzipSource.class
>> okio/InflaterSource.class
>> okio/Okio$1.class
>> okio/Okio$2.class
>> okio/Okio$3.class
>> okio/Okio.class
>> okio/RealBufferedSink$1.class
>> okio/RealBufferedSink.class
>> okio/RealBufferedSource$1.class
>> okio/RealBufferedSource.class
>> okio/Segment.class
>> okio/SegmentPool.class
>> okio/SegmentedByteString.class
>> okio/Sink.class
>> okio/Source.class
>> okio/Timeout$1.class
>> okio/Timeout.class
>> okio/Util.class
>>
>> Thank you,
>> Austin Cawley-Edwards
>>
>


Re: long lived standalone job session cluster in kubernetes

2019-02-27 Thread Heath Albritton
Great, my team is eager to get started.  I’m curious what progress has been 
made so far?

-H

> On Feb 26, 2019, at 14:43, Chunhui Shi  wrote:
> 
> Hi Heath and Till, thanks for offering help on reviewing this feature. I just 
> reassigned the JIRAs to myself after offline discussion with Jin. Let us work 
> together to get kubernetes integrated natively with flink. Thanks.
> 
>> On Fri, Feb 15, 2019 at 12:19 AM Till Rohrmann  wrote:
>> Alright, I'll get back to you once the PRs are open. Thanks a lot for your 
>> help :-)
>> 
>> Cheers,
>> Till
>> 
>>> On Thu, Feb 14, 2019 at 5:45 PM Heath Albritton  wrote:
>>> My team and I are keen to help out with testing and review as soon as there 
>>> is a pull request.
>>> 
>>> -H
>>> 
 On Feb 11, 2019, at 00:26, Till Rohrmann  wrote:
 
 Hi Heath,
 
 I just learned that people from Alibaba already made some good progress 
 with FLINK-9953. I'm currently talking to them in order to see how we can 
 merge this contribution into Flink as fast as possible. Since I'm quite 
 busy due to the upcoming release I hope that other community members will 
 help out with the reviewing once the PRs are opened.
 
 Cheers,
 Till
 
> On Fri, Feb 8, 2019 at 8:50 PM Heath Albritton  wrote:
> Has any progress been made on this?  There are a number of folks in
> the community looking to help out.
> 
> 
> -H
> 
> On Wed, Dec 5, 2018 at 10:00 AM Till Rohrmann  
> wrote:
> >
> > Hi Derek,
> >
> > there is this issue [1] which tracks the active Kubernetes integration. 
> > Jin Sun already started implementing some parts of it. There should 
> > also be some PRs open for it. Please check them out.
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-9953
> >
> > Cheers,
> > Till
> >
> > On Wed, Dec 5, 2018 at 6:39 PM Derek VerLee  
> > wrote:
> >>
> >> Sounds good.
> >>
> >> Is someone working on this automation today?
> >>
> >> If not, although my time is tight, I may be able to work on a PR for 
> >> getting us started down the path Kubernetes native cluster mode.
> >>
> >>
> >> On 12/4/18 5:35 AM, Till Rohrmann wrote:
> >>
> >> Hi Derek,
> >>
> >> what I would recommend to use is to trigger the cancel with savepoint 
> >> command [1]. This will create a savepoint and terminate the job 
> >> execution. Next you simply need to respawn the job cluster which you 
> >> provide with the savepoint to resume from.
> >>
> >> [1] 
> >> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#cancel-job-with-savepoint
> >>
> >> Cheers,
> >> Till
> >>
> >> On Tue, Dec 4, 2018 at 10:30 AM Andrey Zagrebin 
> >>  wrote:
> >>>
> >>> Hi Derek,
> >>>
> >>> I think your automation steps look good.
> >>> Recreating deployments should not take long
> >>> and as you mention, this way you can avoid unpredictable old/new 
> >>> version collisions.
> >>>
> >>> Best,
> >>> Andrey
> >>>
> >>> > On 4 Dec 2018, at 10:22, Dawid Wysakowicz  
> >>> > wrote:
> >>> >
> >>> > Hi Derek,
> >>> >
> >>> > I am not an expert in kubernetes, so I will cc Till, who should be 
> >>> > able
> >>> > to help you more.
> >>> >
> >>> > As for the automation for similar process I would recommend having a
> >>> > look at dA platform[1] which is built on top of kubernetes.
> >>> >
> >>> > Best,
> >>> >
> >>> > Dawid
> >>> >
> >>> > [1] https://data-artisans.com/platform-overview
> >>> >
> >>> > On 30/11/2018 02:10, Derek VerLee wrote:
> >>> >>
> >>> >> I'm looking at the job cluster mode, it looks great and I and
> >>> >> considering migrating our jobs off our "legacy" session cluster and
> >>> >> into Kubernetes.
> >>> >>
> >>> >> I do need to ask some questions because I haven't found a lot of
> >>> >> details in the documentation about how it works yet, and I gave up
> >>> >> following the the DI around in the code after a while.
> >>> >>
> >>> >> Let's say I have a deployment for the job "leader" in HA with ZK, 
> >>> >> and
> >>> >> another deployment for the taskmanagers.
> >>> >>
> >>> >> I want to upgrade the code or configuration and start from a
> >>> >> savepoint, in an automated way.
> >>> >>
> >>> >> Best I can figure, I can not just update the deployment resources 
> >>> >> in
> >>> >> kubernetes and allow the containers to restart in an arbitrary 
> >>> >> order.
> >>> >>
> >>> >> Instead, I expect sequencing is important, something along the 
> >>> >> lines
> >>> >> of this:
> >>> >>
> >>> >> 1. issue savepoint command on leader
> >>> >> 2. wait for savepoint
> >>> >> 3. destroy all leader and taskmanager containers
> 

Re: Re: What are blobstore files and why do they keep filling up /tmp directory?

2019-02-27 Thread Kumar Bolar, Harshith
Thanks Till,

It appears to occur when a task manager crashes and restarts: a new blob-store 
directory gets created and the old one remains as is, and this piles up over 
time. Should these *old* blob-stores be manually cleared every time a task 
manager crashes and restarts?

Regards,
Harshith

From: Till Rohrmann 
Date: Tuesday, 26 February 2019 at 4:12 PM
To: Harshith Kumar Bolar 
Cc: user 
Subject: [External] Re: What are blobstore files and why do they keep filling 
up /tmp directory?

Hi Harshith,

the blob store files are necessary to distribute the Flink job in your cluster. 
After the job has been completed, they should be cleaned up. Only in the case 
of cluster crashes might the cleanup not happen. Since Flink 1.4.2 is no 
longer actively supported, I would suggest upgrading to the latest Flink 
version and to check whether the problem still occurs.

Cheers,
Till

On Tue, Feb 26, 2019 at 2:48 AM Kumar Bolar, Harshith 
mailto:hk...@arity.com>> wrote:
Hi all,

We're running Flink on a standalone five node cluster. The /tmp/ directory 
keeps filling with directories starting with blobstore--*. These directories 
are very large (approx 1 GB) and fill up the space very quickly and the jobs 
fail with a "No space left on device" error. The files in these directories 
appear to be some form of binary representation of the jobs that are running on 
the cluster.
What are these files and how do I take care of cleaning them so they don't fill 
up /tmp/ causing jobs to fail?
Flink version: 1.4.2

Thanks,
Harshith


Re: okio and okhttp not shaded in the Flink Uber Jar on EMR

2019-02-27 Thread Gary Yao
Hi,

Actually Flink's inverted class loading feature was designed to mitigate
problems with different versions of libraries that are not compatible with
each other [1]. You may want to debug why it does not work for you.

You can also try to use the Hadoop free Flink distribution, and export the
HADOOP_CLASSPATH variable [2].

Best,
Gary

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/debugging_classloading.html#inverted-class-loading-and-classloader-resolution-order
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/deployment/hadoop.html#configuring-flink-with-hadoop-classpaths

On Wed, Feb 27, 2019 at 5:23 AM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hi,
>
> I recently experienced versioning clashes with the okio and okhttp when
> trying to deploy a Flink 1.6.0 app to AWS EMR on Hadoop 2.8.4. After
> investigating and talking to the okio team (see this issue)
> , I found that both okio and
> okhttp exist in the Flink uber jar with versions 1.4.0 and 2.4.0,
> respectively, whereas I'm including versions 2.2.2 and 3.13.1 in my shaded
> jar. The okio team suggested that Flink should shade the uber jar to fix
> the issue, but I'm wondering if there is something I can do on my end to
> have all versions exist simultaneously.
>
> From the issue, here are the okio contents of the uber jar:
>
> *jar -tf flink-shaded-hadoop2-uber-1.6.0.jar | grep okio*
>
> META-INF/maven/com.squareup.okio/
> META-INF/maven/com.squareup.okio/okio/
> META-INF/maven/com.squareup.okio/okio/pom.properties
> META-INF/maven/com.squareup.okio/okio/pom.xml
> okio/
> okio/AsyncTimeout$1.class
> okio/AsyncTimeout$2.class
> okio/AsyncTimeout$Watchdog.class
> okio/AsyncTimeout.class
> okio/Base64.class
> okio/Buffer$1.class
> okio/Buffer$2.class
> okio/Buffer.class
> okio/BufferedSink.class
> okio/BufferedSource.class
> okio/ByteString.class
> okio/DeflaterSink.class
> okio/ForwardingSink.class
> okio/ForwardingSource.class
> okio/ForwardingTimeout.class
> okio/GzipSink.class
> okio/GzipSource.class
> okio/InflaterSource.class
> okio/Okio$1.class
> okio/Okio$2.class
> okio/Okio$3.class
> okio/Okio.class
> okio/RealBufferedSink$1.class
> okio/RealBufferedSink.class
> okio/RealBufferedSource$1.class
> okio/RealBufferedSource.class
> okio/Segment.class
> okio/SegmentPool.class
> okio/SegmentedByteString.class
> okio/Sink.class
> okio/Source.class
> okio/Timeout$1.class
> okio/Timeout.class
> okio/Util.class
>
> Thank you,
> Austin Cawley-Edwards
>


KeyBy distribution across taskslots

2019-02-27 Thread Aggarwal, Ajay
I couldn’t find reference to it anywhere in the docs, so I thought I will ask 
here.



When I use the KeyBy operator, say KeyBy("customerId"), and some keys (i.e.
customers) are far noisier than others, is there a way to ensure that too
many noisy customers do not land on the same task slot? In general, does Flink
attempt to keep the load balanced across the different task slots assigned to a
KeyBy operator?



I wouldn’t be surprised if the answer is “currently no”. Would like to know if 
something related is planned for the future. Also, I would love to hear from others
who ran into a similar situation and how they addressed it.



Thanks.



Re: okio and okhttp not shaded in the Flink Uber Jar on EMR

2019-02-27 Thread Chesnay Schepler

Short-term I'd try relocating the okio/okhttp dependencies in your jar.

I'm not too keen on adding more relocations to the hadoop jar; I can't 
gauge the possible side-effects.


On 27.02.2019 14:54, Austin Cawley-Edwards wrote:
Following up to add more info, I am building my app with maven based 
on the sample Flink pom.xml


My shade plugin config is:
org.apache.maven.plugins maven-shade-plugin 3.0.0   
 package  shade org.apache.flink:force-shading com.google.code.findbugs:jsr305 org.slf4j:* log4j:*  *:*  META-INF/*.SF META-INF/*.DSA META-INF/*.RSA 
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> 
${main.class}
 


Thanks again and let me know if there is more information I can provide.

Austin

On Tue, Feb 26, 2019 at 10:59 PM Austin Cawley-Edwards 
mailto:austin.caw...@gmail.com>> wrote:


Hi,

I recently experienced versioning clashes with the okio and
okhttp when trying to deploy a Flink 1.6.0 app to AWS EMR on
Hadoop 2.8.4. After investigating and talking to the okio team
(see this issue) , I
found that both okio and okhttp exist in the Flink uber jar with
versions 1.4.0 and 2.4.0, respectively, whereas I'm including
versions 2.2.2 and 3.13.1 in my shaded jar. The okio team
suggested that Flink should shade the uber jar to fix the issue,
but I'm wondering if there is something I can do on my end to have
all versions exist simultaneously.

From the issue, here are the okio contents of the uber jar:

*jar -tf flink-shaded-hadoop2-uber-1.6.0.jar | grep okio*

META-INF/maven/com.squareup.okio/
META-INF/maven/com.squareup.okio/okio/
META-INF/maven/com.squareup.okio/okio/pom.properties
META-INF/maven/com.squareup.okio/okio/pom.xml
okio/
okio/AsyncTimeout$1.class
okio/AsyncTimeout$2.class
okio/AsyncTimeout$Watchdog.class
okio/AsyncTimeout.class
okio/Base64.class
okio/Buffer$1.class
okio/Buffer$2.class
okio/Buffer.class
okio/BufferedSink.class
okio/BufferedSource.class
okio/ByteString.class
okio/DeflaterSink.class
okio/ForwardingSink.class
okio/ForwardingSource.class
okio/ForwardingTimeout.class
okio/GzipSink.class
okio/GzipSource.class
okio/InflaterSource.class
okio/Okio$1.class
okio/Okio$2.class
okio/Okio$3.class
okio/Okio.class
okio/RealBufferedSink$1.class
okio/RealBufferedSink.class
okio/RealBufferedSource$1.class
okio/RealBufferedSource.class
okio/Segment.class
okio/SegmentPool.class
okio/SegmentedByteString.class
okio/Sink.class
okio/Source.class
okio/Timeout$1.class
okio/Timeout.class
okio/Util.class

Thank you,
Austin Cawley-Edwards





Re: Split Stream on a Split Stream

2019-02-27 Thread Konstantin Knauf
Hi Taher ,

a ProcessFunction is actually the way to do this. When chained to the
previous operator, the overhead of such a ProcessFunction is negligible.

Any particular reason you don't want to go for a ProcessFunction?
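
For illustration, a rough side-output sketch (stream type, tags and routing conditions are placeholders, since your split logic isn't shown; a single chained ProcessFunction replaces both consecutive splits):

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Assuming stream1: DataStream[String]
val tag1 = OutputTag[String]("1")
val tag2 = OutputTag[String]("2")
val tag3 = OutputTag[String]("3")

val routed = stream1.process(new ProcessFunction[String, String] {
  override def processElement(value: String,
                              ctx: ProcessFunction[String, String]#Context,
                              out: Collector[String]): Unit = {
    // Placeholder conditions; replace with the real SomeSplitLogic / AnotherSplitLogic.
    if (value.startsWith("a")) ctx.output(tag1, value)
    else if (value.startsWith("b")) { ctx.output(tag2, value); ctx.output(tag3, value) }
    else out.collect(value)
  }
})

val select1 = routed.getSideOutput(tag1)
val select2 = routed.getSideOutput(tag2)
val select3 = routed.getSideOutput(tag3)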

Cheers,

Konstantin

On Wed, Feb 27, 2019 at 8:36 AM Taher Koitawala 
wrote:

> Hi All,
>   We are currently working with Flink 1.7.2 version and we are get
> the below given exception when doing a split on a split.
>
> SplitStreamsplitStream=stream1.split(new SomeSplitLogic());
>
> DataStream select1=splitStream.select("1");
> DataStream select2=splitStream.select("2");
>
>
> select2.split(new AnotherSplitLogic()).select("3");
>
>
> Basically the exception is recommending to use SideOutput, however the
> only way I see to get a side output is by using a process function. Can
> someone suggest a better way of doing this?
>
> Exception :
> Caused by: java.lang.IllegalStateException:  Consecutive multiple splits
> are not supported. Splits are deprecated. Please use side-outputs
>
>
> Regards,
> Taher Koitawala
> GS Lab Pune
> +91 8407979163
>


-- 

Konstantin Knauf | Solutions Architect

+49 160 91394525



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen


Re: okio and okhttp not shaded in the Flink Uber Jar on EMR

2019-02-27 Thread Austin Cawley-Edwards
Following up to add more info, I am building my app with maven based on the
sample Flink pom.xml

My shade plugin config is:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.0.0</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <artifactSet>
                    <excludes>
                        <exclude>org.apache.flink:force-shading</exclude>
                        <exclude>com.google.code.findbugs:jsr305</exclude>
                        <exclude>org.slf4j:*</exclude>
                        <exclude>log4j:*</exclude>
                    </excludes>
                </artifactSet>
                <filters>
                    <filter>
                        <artifact>*:*</artifact>
                        <excludes>
                            <exclude>META-INF/*.SF</exclude>
                            <exclude>META-INF/*.DSA</exclude>
                            <exclude>META-INF/*.RSA</exclude>
                        </excludes>
                    </filter>
                </filters>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>${main.class}</mainClass>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>







Thanks again and let me know if there is more information I can provide.

Austin

On Tue, Feb 26, 2019 at 10:59 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hi,
>
> I recently experienced versioning clashes with the okio and okhttp when
> trying to deploy a Flink 1.6.0 app to AWS EMR on Hadoop 2.8.4. After
> investigating and talking to the okio team (see this issue)
> , I found that both okio and
> okhttp exist in the Flink uber jar with versions 1.4.0 and 2.4.0,
> respectively, whereas I'm including versions 2.2.2 and 3.13.1 in my shaded
> jar. The okio team suggested that Flink should shade the uber jar to fix
> the issue, but I'm wondering if there is something I can do on my end to
> have all versions exist simultaneously.
>
> From the issue, here are the okio contents of the uber jar:
>
> *jar -tf flink-shaded-hadoop2-uber-1.6.0.jar | grep okio*
>
> META-INF/maven/com.squareup.okio/
> META-INF/maven/com.squareup.okio/okio/
> META-INF/maven/com.squareup.okio/okio/pom.properties
> META-INF/maven/com.squareup.okio/okio/pom.xml
> okio/
> okio/AsyncTimeout$1.class
> okio/AsyncTimeout$2.class
> okio/AsyncTimeout$Watchdog.class
> okio/AsyncTimeout.class
> okio/Base64.class
> okio/Buffer$1.class
> okio/Buffer$2.class
> okio/Buffer.class
> okio/BufferedSink.class
> okio/BufferedSource.class
> okio/ByteString.class
> okio/DeflaterSink.class
> okio/ForwardingSink.class
> okio/ForwardingSource.class
> okio/ForwardingTimeout.class
> okio/GzipSink.class
> okio/GzipSource.class
> okio/InflaterSource.class
> okio/Okio$1.class
> okio/Okio$2.class
> okio/Okio$3.class
> okio/Okio.class
> okio/RealBufferedSink$1.class
> okio/RealBufferedSink.class
> okio/RealBufferedSource$1.class
> okio/RealBufferedSource.class
> okio/Segment.class
> okio/SegmentPool.class
> okio/SegmentedByteString.class
> okio/Sink.class
> okio/Source.class
> okio/Timeout$1.class
> okio/Timeout.class
> okio/Util.class
>
> Thank you,
> Austin Cawley-Edwards
>


Re: flink list and flink run commands timeout

2019-02-27 Thread Gary Yao
Hi Sen Sun,

The question is already resolved. You can find the entire email thread here:


http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/flink-list-and-flink-run-commands-timeout-td22826.html

Best,
Gary

On Wed, Feb 27, 2019 at 7:55 AM sen  wrote:

> Hi Aneesha:
>
>   I am also facing the same problem. When I turn on HA on YARN, I get the
> same exception. When I turn off the HA configuration, it works fine.
>   What did you do to deal with the problem?
>
> Thanks!
> Sen Sun
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: submit job failed on Yarn HA

2019-02-27 Thread Gary Yao
Hi,

How did you determine "jmhost" and "port"? Actually you do not need to
specify
these manually. If the client is using the same configuration as your
cluster,
the client will look up the leading JM from ZooKeeper.

If you have already tried omitting the "-m" parameter, you can check in the
client logs which host is used for the job submission [1]. Note that you
need
to enable logging on DEBUG level.
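
For example, in conf/log4j-cli.properties on the machine you submit from
(assuming the default log4j setup that ships with Flink), change the root
logger to DEBUG:

log4j.rootLogger=DEBUG, file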

The root cause in your stacktrace is a TimeoutException. I would debug this
by
checking if you can establish a TCP connection – from the machine you are
submitting the job from, to the target host/port [2].
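
For example (hypothetical host/port; any equivalent tool works):

nc -zv jmhost 8081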

Moreover, you are using a quite dated Flink version. The newest version in
the
1.5 major release is 1.5.6 – so consider upgrading to that or even to 1.7.

Best,
Gary

[1]
https://github.com/apache/flink/blob/3488f8b144a2127497c39b8ed5a48a65b551c57d/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java#L185
[2]
https://stackoverflow.com/questions/4922943/test-from-shell-script-if-remote-tcp-port-is-open

On Wed, Feb 27, 2019 at 8:09 AM 孙森  wrote:

> Hi all:
>
> I run Flink (1.5.1 with Hadoop 2.7) on YARN, and submit a job with
> “/usr/local/flink/bin/flink run -m jmhost:port my.jar”, but the submission
> fails.
> The HA configuration is :
>
>-  high-availability: zookeeper
>-  high-availability.storageDir: hdfs:///flink/ha/
>-  high-availability.zookeeper.quorum:  hdp1:2181,hdp2:2181,hdp3:2181
>-  yarn.application-attempts: 2
>
> The info shown in the client log:
>
>
> 2019-02-27 11:48:38,651 INFO  org.apache.flink.runtime.rest.RestClient
>   - Shutting down rest endpoint.
> 2019-02-27 11:48:38,659 INFO  org.apache.flink.runtime.rest.RestClient
>   - Rest endpoint shutdown complete.
> 2019-02-27 11:48:38,662 INFO  
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
> Stopping ZooKeeperLeaderRetrievalService /leader/rest_server_lock.
> 2019-02-27 11:48:38,665 INFO  
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
> Stopping ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
> 2019-02-27 11:48:38,670 INFO  
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl
>   - backgroundOperationsLoop exiting
> 2019-02-27 11:48:38,689 INFO  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Session: 
> 0x2679c52880c00ee closed
> 2019-02-27 11:48:38,689 INFO  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - 
> EventThread shut down for session: 0x2679c52880c00ee
> 2019-02-27 11:48:38,690 ERROR org.apache.flink.client.cli.CliFrontend 
>   - Error while running the command.
> org.apache.flink.client.program.ProgramInvocationException: Could not 
> retrieve the execution result.
>   at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:257)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
>   at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>   at 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>   at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>   at 
> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>   at scala.App$$anonfun$main$1.apply(App.scala:76)
>   at scala.App$$anonfun$main$1.apply(App.scala:76)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at 
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>   at scala.App$class.main(App.scala:76)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
>   at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:785)
>   at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:279)
>   at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)
>   at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
>   at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1101)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>  

Re: How to add unit test for flink sql ?

2019-02-27 Thread Congxian Qiu
Hi, Lifei

Maybe org.apache.flink.table.runtime.stream.sql.JavaSqlITCase can be
helpful.
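
A minimal sketch along those lines (Flink 1.7 Java Table API assumed; the
table, field names and query are made up), replacing Kafka/Elasticsearch with
an in-memory source and a collecting sink:

import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.junit.Test;

public class MySqlQueryTest {

    // Static because the sink instance is serialized for execution.
    private static final List<Row> RESULTS =
            Collections.synchronizedList(new ArrayList<Row>());

    @Test
    public void testFilterQuery() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // Small in-memory input instead of the Kafka source.
        DataStream<Tuple2<String, Integer>> input =
                env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2));
        tEnv.registerDataStream("MyTable", input, "name, score");

        Table result = tEnv.sqlQuery("SELECT name, score FROM MyTable WHERE score > 1");

        // Collect the result instead of writing to Elasticsearch.
        tEnv.toAppendStream(result, Row.class).addSink(new SinkFunction<Row>() {
            @Override
            public void invoke(Row value) {
                RESULTS.add(value);
            }
        });

        env.execute();

        assertEquals(1, RESULTS.size());
        assertEquals("b", RESULTS.get(0).getField(0));
    }
}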

Best,
Congxian


Lifei Chen wrote on Wed, Feb 27, 2019 at 4:20 PM:

> Hi, all:
>
> I finished a Flink streaming job with Flink SQL, which reads data from
> Kafka and writes back to Elasticsearch.
>
> I have no idea how to add a unit test for the SQL I wrote; any
> suggestions?
>


Re: Test FileUtilsTest.testDeleteDirectory failed when building Flink

2019-02-27 Thread Paul Lam
Hi Chesnay & Fabian,

Thanks for your replies.

I found it should be related to the CI runner. I moved to GitLab CI, which
runs the script as the root user by default, so it is always able to remove
a write-protected file.

Best,
Paul Lam

> On Feb 20, 2019, at 17:08, Chesnay Schepler wrote:
> 
> I ran into a similar issue when I looked at other CI solutions; IMO we
> shouldn't rely on the result of setWritable but instead actually verify
> whether the forbidden operation (i.e. creating/writing files) throws an error.
> 
> Back then I also created a JIRA:
> https://issues.apache.org/jira/browse/FLINK-5970 
> 
> 
> On 18.02.2019 14:10, Fabian Hueske wrote:
>> Hi Paul,
>> 
>> Which components (Flink, JDK, Docker base image, ...) are you upgrading and
>> which versions are you coming from?
>> I think it would be good to check how (and with which options) the JVM in 
>> the container is started.
>> 
>> Best, Fabian
>> 
>> 
>> On Fri, Feb 15, 2019 at 09:50, Paul Lam wrote:
>> Hi all,
>> 
>> Recently we migrated the Flink build to a new Docker image, after which the build
>> job always fails with test errors on local file system permissions.
>> 
>> For example: FileUtilsTest.testDeleteDirectory:129 this should fail with an 
>> exception.
>> 
>> I notice the following statements in the javadoc of 
>> `java.io.File.setWritable`:
>> 
>> > On some platforms it may be possible to start the Java virtual machine 
>> > with special privileges that allow it to modify files that disallow write 
>> > operations.
>> 
>> I think it’s what the test is designed for and where the problem lies. 
>> 
>> Could anyone help me with this? Thanks a lot!
>> 
>> WRT the environment:
>> 
>> - Flink version: 1.7.1
>> - JDK: open jdk 1.8.0_111
>> - OS version: debian 8
>> 
>> Best,
>> Paul Lam
>> 
> 
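
For reference, a rough sketch of the probe-based check Chesnay describes
above (hypothetical test class, not the actual FileUtilsTest code): attempt
the forbidden write and skip the test if the environment cannot enforce
write protection, instead of relying on setWritable's return value.

import static org.junit.Assume.assumeFalse;
import static org.junit.Assume.assumeTrue;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import org.junit.Test;

public class WriteProtectionProbeTest {

    @Test
    public void testRelyingOnWriteProtection() throws IOException {
        File dir = Files.createTempDirectory("probe").toFile();
        assumeTrue("could not change permissions", dir.setWritable(false));

        boolean canStillWrite;
        try {
            // Probe whether the JVM can create a file despite the removed
            // write permission (it usually can when running as root).
            canStillWrite = new File(dir, "probe").createNewFile();
        } catch (IOException e) {
            canStillWrite = false;
        }

        // Skip (rather than fail) when the environment does not enforce
        // write protection; otherwise run the assertions that depend on it.
        assumeFalse("environment does not enforce write protection", canStillWrite);

        // ... assertions that rely on the directory being read-only go here ...

        dir.setWritable(true); // allow cleanup afterwards
    }
}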



Re: Re:submit job failed on Yarn HA

2019-02-27 Thread sen
I set up a YARN cluster using:
./bin/yarn-session.sh -n 10 -tm 8192 -s 32

Then I submit jobs to this cluster. It works fine; I've used it this way for a
long time. In this way, you can submit multiple jobs in one cluster.
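
For example (the client picks up the session's connection details from the
properties file written by yarn-session.sh, so no -m is needed):

./bin/flink run my.jar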




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


How to add unit test for flink sql ?

2019-02-27 Thread Lifei Chen
Hi, all:

I finished a Flink streaming job with Flink SQL, which reads data from Kafka
and writes back to Elasticsearch.

I have no idea how to add a unit test for the SQL I wrote; any suggestions?


Re:submit job failed on Yarn HA

2019-02-27 Thread shengjk1
Hi
 @孙森  

“/usr/local/flink/bin/flink run -m jmhost:port my.jar” does not submit to YARN.
If you want to submit a job to YARN, you should use
“/usr/local/flink/bin/flink run -m yarn-cluster my.jar”.


Please refer to 
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/cli.html


Best,
Shengjk1




On 02/27/2019 15:09, 孙森 wrote:
Hi all:

I run Flink (1.5.1 with Hadoop 2.7) on YARN, and submit a job with
“/usr/local/flink/bin/flink run -m jmhost:port my.jar”, but the submission
fails.
The HA configuration is :
 high-availability: zookeeper
 high-availability.storageDir: hdfs:///flink/ha/
 high-availability.zookeeper.quorum:  hdp1:2181,hdp2:2181,hdp3:2181
 yarn.application-attempts: 2
The info shown in the client log:


2019-02-27 11:48:38,651 INFO  org.apache.flink.runtime.rest.RestClient  
- Shutting down rest endpoint.
2019-02-27 11:48:38,659 INFO  org.apache.flink.runtime.rest.RestClient  
- Rest endpoint shutdown complete.
2019-02-27 11:48:38,662 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Stopping ZooKeeperLeaderRetrievalService /leader/rest_server_lock.
2019-02-27 11:48:38,665 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Stopping ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
2019-02-27 11:48:38,670 INFO  
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl
  - backgroundOperationsLoop exiting
2019-02-27 11:48:38,689 INFO  
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Session: 
0x2679c52880c00ee closed
2019-02-27 11:48:38,689 INFO  
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - 
EventThread shut down for session: 0x2679c52880c00ee
2019-02-27 11:48:38,690 ERROR org.apache.flink.client.cli.CliFrontend   
- Error while running the command.
org.apache.flink.client.program.ProgramInvocationException: Could not retrieve 
the execution result.
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:257)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at 
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at 
scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at 
scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:785)
at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:279)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1101)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1101)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
submit JobGraph.
at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:370)
at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at