How should I choose the deployment model for a production environment?

2019-03-06 Thread 126

Hi all,
 I want to deploy my Flink cluster in a production environment. How should I
choose the deployment model?
 Standalone cluster?
 YARN session? (Start a long-running Flink cluster on YARN)
 yarn-cluster? (Run a single Flink job on YARN)

 Thanks

Re: How to monitor Apache Flink in AWS EMR (ElasticMapReduce)?

2019-03-06 Thread Yun Tang
Hi Jack

How about extracting flink-metrics-prometheus-1.6.1.jar from the downloaded
distribution tarball (https://archive.apache.org/dist/flink/flink-1.6.1/) and
uploading it to `/usr/lib/flink/lib` on EMR?

Otherwise, I believe setting up a customized Flink cluster on EMR [1] should
work if there is no more convenient solution.

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/aws.html#custom-emr-installation
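
The first suggestion above could be scripted roughly as follows. This is only a sketch: the function name `install_optional_jar` is made up for illustration, and it assumes the distribution tarball keeps optional jars under a top-level `<flink-dir>/opt/` directory. Whether the jar is needed on every node or only where the cluster is launched has not been verified here.

```python
import shutil
import tarfile
from pathlib import Path


def install_optional_jar(tarball, jar_name, lib_dir):
    """Copy opt/<jar_name> out of a Flink distribution tarball into lib_dir.

    Hypothetical helper: the '/opt/' location inside the archive is an
    assumption about the tarball layout, not something EMR provides.
    """
    lib_dir = Path(lib_dir)
    lib_dir.mkdir(parents=True, exist_ok=True)
    with tarfile.open(tarball) as archive:
        for member in archive.getmembers():
            # Match e.g. flink-1.6.1/opt/flink-metrics-prometheus-1.6.1.jar
            if member.isfile() and member.name.endswith("/opt/" + jar_name):
                target = lib_dir / jar_name
                with archive.extractfile(member) as src, open(target, "wb") as dst:
                    shutil.copyfileobj(src, dst)
                return target
    raise FileNotFoundError(jar_name + " not found under opt/ in " + str(tarball))
```

On the EMR node it would be called with the downloaded tarball (the file name is a placeholder), `"flink-metrics-prometheus-1.6.1.jar"`, and `"/usr/lib/flink/lib"`, followed by a restart of the Flink session so the new jar is picked up.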


Best
Yun Tang

From: Jack Tuck 
Sent: Thursday, March 7, 2019 3:39
To: user@flink.apache.org
Subject: How to monitor Apache Flink in AWS EMR (ElasticMapReduce)?


I currently have Flink set up and a job running on EMR, and I'm now trying
to add monitoring by sending metrics to Prometheus.

I have come across an issue with running Flink on EMR. I'm using Terraform to
provision EMR (I run Ansible afterwards to download and run a job). Out of the
box, it does not look like EMR's Flink distribution includes the optional jars
(flink-metrics-prometheus, flink-cep, etc.).

Looking at Flink's documentation, it says

> "In order to use this reporter you must copy
> `/opt/flink-metrics-prometheus-1.6.1.jar` into the `/lib` folder of your
> Flink distribution"

https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/metrics.html#prometheuspushgateway-orgapacheflinkmetricsprometheusprometheuspushgatewayreporter

But when logging into the EMR master node, neither /etc/flink nor
/usr/lib/flink has a directory called `opt`, and I cannot see
`flink-metrics-prometheus-1.6.1.jar` anywhere.

I know Flink has other optional libs you'd usually have to copy if you want to
use them, such as flink-cep, but I'm not sure how to do this when using EMR.

This is the exception I get, which I believe is because it cannot find the
metrics jar on its classpath.

```
java.lang.ClassNotFoundException: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:264)
    at org.apache.flink.runtime.metrics.MetricRegistryImpl.(MetricRegistryImpl.java:144)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createMetricRegistry(ClusterEntrypoint.java:419)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:276)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:227)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:191)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:190)
    at org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint.main(YarnSessionClusterEntrypoint.java:137)
```



EMR resource in terraform

```
resource "aws_emr_cluster" "emr_flink" {
  name          = "ce-emr-flink-arn"
  release_label = "emr-5.20.0" # 5.21.0 is not found, could be a region thing
  applications  = ["Flink"]

  ec2_attributes {
    key_name                          = "ce_test"
    subnet_id                         = "${aws_subnet.ce_test_subnet_public.id}"
    instance_profile                  = "${aws_iam_instance_profile.emr_profile.arn}"
    emr_managed_master_security_group = "${aws_security_group.allow_all_vpc.id}"
    emr_managed_slave_security_group  = "${aws_security_group.allow_all_vpc.id}"
    additional_master_security_groups = "${aws_security_group.external_connectivity.id}"
    additional_slave_security_groups  = "${aws_security_group.external_connectivity.id}"
  }

  ebs_root_volume_size = 100
  master_instance_type = "m4.xlarge"
  core_instance_type   = "m4.xlarge"
  core_instance_count  = 2

  service_role = "${aws_iam_role.iam_emr_service_role.arn}"

  configurations_json = <
```

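Re: Under what circumstances does Flink produce out-of-order data?

2019-03-06 Thread 戴嘉诚
You could look into triggers. The default trigger behaves the way you
observed. If you want output in real time, you can change the trigger to
ContinuousEventTimeTrigger and then set your time interval.

From: 刘 文
Sent: March 6, 2019 22:55
To: user-zh@flink.apache.org
Cc: qcx978132...@gmail.com
Subject: Re: Under what circumstances does Flink produce out-of-order data?

1. While verifying EventTime processing with watermarks, I found that data sent to the socket is not output promptly, or is not output at all.
2. Testing shows that the previous window is only triggered and closed once the getCurrentWatermark() timestamp for the data just sent > TimeWindow + maxOutOfOrderness.
3. However, the latest records cannot be processed promptly, or cannot be processed at all.
4. How should this problem be handled?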
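
The behaviour described in the points above can be reproduced with a small plain-Python model. It is a simplified sketch, not Flink code: it assumes 3-second tumbling windows and a 2-second maxOutOfOrderness as in the program quoted in this thread, recomputes the watermark after every element (Flink emits periodic watermarks on a timer), and uses the hypothetical function name `simulate_tumbling_windows`. A window fires once the watermark reaches the window's last timestamp (end minus 1 ms).

```python
from collections import defaultdict


def simulate_tumbling_windows(timestamps, window_size_ms=3000,
                              max_out_of_orderness_ms=2000):
    """Feed event timestamps in arrival order; return (fired, pending)."""
    pending = defaultdict(list)  # window start -> buffered timestamps
    fired = []                   # (window start, window end, timestamps)
    max_ts = float("-inf")
    for ts in timestamps:
        watermark = max_ts - max_out_of_orderness_ms
        start = ts - ts % window_size_ms
        if start + window_size_ms - 1 <= watermark:
            continue  # late element: the watermark already passed its window
        pending[start].append(ts)
        max_ts = max(max_ts, ts)
        # The watermark trails the largest timestamp seen so far.
        watermark = max_ts - max_out_of_orderness_ms
        for s in sorted(pending):
            if s + window_size_ms - 1 <= watermark:
                fired.append((s, s + window_size_ms, pending.pop(s)))
    return fired, dict(pending)
```

Feeding `[1000, 2000, 4000, 5000]` fires only the window `[0, 3000)`, and only when the event at 5000 arrives; the events at 4000 and 5000 stay buffered in `[3000, 6000)` until some later event with a timestamp of at least 7999 pushes the watermark far enough. In this model, the most recent records therefore produce no output while no newer data is arriving.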









---

> On March 6, 2019, at 10:29 PM, 刘 文 wrote:
> 
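> I now understand this problem a little better, so I have written it up as a
> document for everyone's reference.
> ---
> 
> Flink 1.7.2, analyzing streaming data by business timestamp (source code analysis):
> https://github.com/opensourceteams/flink-maven-scala/blob/master/md/miniCluster/Flink-EventTime-watermark.md
> 
> ---
> 
> Flink 1.7.2, analyzing streaming data by business timestamp (source code analysis)
> 
> Source code
> 
> https://github.com/opensourceteams/flink-maven-scala 
> 
> Overview
> 
> By default, Flink's processing time is based on the time at which the Window
> receives the data emitted by the Source, i.e. the time at which the Flink
> program receives it. In real business scenarios that process periodic data
> (analyzing the data of every 5 minutes, or of every hour), the time that
> matters is the time at which the event occurred at the business source, so
> Flink's EventTime and Watermark are used to handle this.
> Set the Env to EventTime.
> Call assignTimestampsAndWatermarks on the data stream. The extractTimestamp()
> function of AssignerWithPeriodicWatermarks extracts the actual business time,
> and getCurrentWatermark returns the latest time. This is evaluated for every
> element, and the maximum is used as the computation time. If the current time
> is greater than the previous time interval + the delay configured here, the
> previous Window is closed, i.e. the Window covering that period is processed.
> This program uses the specified business time as the time for the statistics.
>  
> Program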
> 
> package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc.eventtime
> 
> import java.util.{Date, Properties}
> 
> import com.alibaba.fastjson.JSON
> import com.opensourceteams.module.bigdata.flink.common.ConfigurationUtil
> import org.apache.flink.api.common.serialization.SimpleStringSchema
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
> import org.apache.flink.streaming.api.watermark.Watermark
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
> import org.apache.flink.util.Collector
> 
> 
> object SockWordCountRun {
> 
>   def main(args: Array[String]): Unit = {
> 
>     // get the execution environment
>     // val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
> 
>     val configuration: Configuration = ConfigurationUtil.getConfiguration(true)
> 
>     val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1, configuration)
> 
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> 
>     import org.apache.flink.streaming.api.scala._
>     val dataStream = env.socketTextStream("localhost", 1234, '\n')
> 
>     // .setParallelism(3)
> 
>     dataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[String] {
> 
>       val maxOutOfOrderness = 2 * 1000L // 2 seconds
>       var currentMaxTimestamp: Long = _
>       var currentTimestamp: Long = _
> 
>       // the watermark trails the largest timestamp seen so far by maxOutOfOrderness
>       override def getCurrentWatermark: Watermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness)
> 
>       override def extractTimestamp(element: String, previousElementTimestamp: Long): Long = {
>         val jsonObject = JSON.parseObject(element)
> 
>         // event time comes from the "extract_data_time" field of the JSON record
>         val timestamp = jsonObject.getLongValue("extract_data_time")
>         currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
>         currentTimestamp = timestamp
> 
>         /*  println("===watermark begin===")
>         println()
>         println(new Date(currentMaxTimestamp - 20 * 1000))
>         println(jsonObject)
>         println("===watermark end===")
>         println()*/
>         timestamp
>       }
> 
>     })
>       .timeWindowAll(Time.seconds(3))
> 
>       .process(new ProcessAllWindowFunction[String, String, TimeWindow]() {
>         override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
> 
>           println()
>           println("Start submitting window")
>           println(new Date())
>           for (e <- elements) out.collect(e)
> 

Error when executing in sql-client batch mode

2019-03-06 Thread yuess_coder
I submitted the following job in sql-client:


create table csv_source1(
id varchar,
name varchar
) with (
type ='csv',
path = '/Users/IdeaProjects/github/apache-flink/build-target/bin/test1.csv'
);




create table csv_sink(
id varchar,
name varchar
) with (
type ='csv',
path = '/Users/IdeaProjects/github/apache-flink/build-target/bin/test4.csv'
);


insert into csv_sink  select t1.name,t1.id from csv_source1 t1




The error is a NullPointerException at line 1300 of the class
org.apache.flink.table.api.TableEnvironment. It fails in execution batch mode
but works in execution streaming mode. How can I execute this SQL in batch
mode?

Re: [DISCUSS] Create a Flink ecosystem website

2019-03-06 Thread Ufuk Celebi
I like Shaoxuan's idea to keep this a static site first. We could then
iterate on this and make it a dynamic thing. Of course, if we have the
resources in the community to quickly start with a dynamic site, I'm
not opposed.

– Ufuk

On Wed, Mar 6, 2019 at 2:31 PM Robert Metzger  wrote:
>
> Awesome! Thanks a lot for looking into this Becket! The VMs hosted by Infra
> look suitable.
>
> @Shaoxuan: There is actually already a static page. It used to be linked,
> but has been removed from the navigation bar for some reason. This is the
> page: https://flink.apache.org/ecosystem.html
> We could update the page and add it back to the navigation bar for the
> coming weeks. What do you think?
>
> I would actually like to push for a dynamic page right away.
>
> I know it's kind of a bold move, but how do you feel about sending the
> owners of spark-packages.org a short note, if they are interested in
> sharing the source? We could maintain the code together in a public repo.
> If they are not interested in sharing, or we decide not to ask in the first
> place, I'm happy to write down a short description of the requirements,
> maybe some mockups. We could then see if we find somebody here in the
> community who's willing to implement it.
> Given the number of people who are eager to contribute, I believe we will
> be able to find somebody pretty soon.
>
>
> On Wed, Mar 6, 2019 at 3:49 AM Becket Qin  wrote:
>
> > Forgot to provide the link...
> >
> > [1] https://www.apache.org/dev/services.html#blogs (Apache infra services)
> > [2] https://www.apache.org/dev/freebsd-jails (FreeBSD Jail provided by
> > Apache Infra)
> >
> > On Wed, Mar 6, 2019 at 10:46 AM Becket Qin  wrote:
> >
> >> Hi Robert,
> >>
> >> Thanks for the feedback. These are good points. We should absolutely
> >> shoot for a dynamic website to support more interactions in the community.
> >> There might be a few things to solve:
> >> 1. The website code itself. An open source solution would be great. TBH,
> >> I do not have much experience on building a website. It'll be great if
> >> someone could help comment on the solution here.
> >> 2. The hardware to host the website. Apache Infra provides a few
> >> services[1] that Apache projects can leverage. I did not see database
> >> service, but maybe we can run a simple MySQL db in FreeBSD jail[2].
> >>
> >> @Bowen & vino, thanks for the positive feedback!
> >>
> >> @Shaoxuan Wang 
> >> Thanks for the suggestion. That sounds reasonable to me. We probably need
> >> a page in the Flink official site anyways, even just provide links it to
> >> the ecosystem website. So listing the connectors in that static page seems
> >> something we could start with while we are working on the dynamic pages.
> >>
> >> Thanks,
> >>
> >> Jiangjie (Becket) Qin
> >>
> >> On Wed, Mar 6, 2019 at 10:40 AM Shaoxuan Wang 
> >> wrote:
> >>
> >>> Hi Becket and Robert,
> >>>
> >>> I like this idea!  Let us roll this out with Flink connectors at the
> >>> first beginning. We can start with a static page, and upgrade it when we
> >>> find a better solution for dynamic one with rich functions.
> >>>
> >>> Regards,
> >>> Shaoxuan
> >>>
> >>>
> >>> On Wed, Mar 6, 2019 at 1:36 AM Robert Metzger 
> >>> wrote:
> >>>
>  Hey Becket,
> 
>  This is a great idea!
>  For this to be successful, we need to make sure the page is placed
>  prominently so that the people submitting something will get attention for
>  their contributions.
>  I think a dynamic site would probably be better, if we want features
>  such as up and downvoting or comments.
>  I would also like this to be hosted on Apache infra, and endorsed by
>  the community.
> 
>  Does anybody here know any existing software that we could use?
>  The only thing I was able to find is AUR: https://aur.archlinux.org/
>  (which is a community packages site for Arch Linux. The source code of this
>  portal is open source, but the layout and structure are not an ideal fit for
>  our requirements)
> 
>  Best,
>  Robert
> 
> 
> 
>  On Tue, Mar 5, 2019 at 12:03 PM Becket Qin 
>  wrote:
> 
> > Hi folks,
> >
> > I would like to start a discussion thread about creating a Flink
> > ecosystem website. The website aims to help contributors who have 
> > developed
> > projects around Flink share their work with the community.
> >
> > Please see the following doc for more details.
> >
> > https://docs.google.com/document/d/12oCItoLbKrLGuwEUFcCfigezIR2hW3925j1hh3kGp4A/edit#
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> 


Schema Evolution on Dynamic Schema

2019-03-06 Thread shkob1
Hey,

My job is built on SQL that is injected as an input to the job. So let's take
an example of

Select a,max(b) as MaxB,max(c) as MaxC FROM registered_table GROUP BY a

(side note: to keep the state from growing indefinitely, I'm transforming
to a retract stream and filtering based on a custom trigger)

To get the output in JSON format, I basically created a way to
dynamically generate a class and register it with the class loader, so when
transforming to the retract stream I'm doing something like:

Table result = tableEnv.sqlQuery(sqlExpression);
tableEnv.toRetractStream(result, Row.class, config)
    .filter(tuple -> tuple.f0)
    .map(new RowToDynamicClassMapper(sqlSelectFields))
    .addSink(..)

This actually works pretty well (though I do need to make sure to register
the dynamic class with the class loader whenever the state is loaded).

I'm now looking into "schema evolution", which basically means what happens
when the query is changed (say max(c) is removed, and maybe max(d) is
added). I don't know whether that fits the classic "schema evolution" feature
or whether it should be thought about differently. I would be happy to get
some thoughts.

Thanks!









How to monitor Apache Flink in AWS EMR (ElasticMapReduce)?

2019-03-06 Thread Jack Tuck
I currently have Flink set up and a job running on EMR, and I'm now trying
to add monitoring by sending metrics to Prometheus.

I have come across an issue with running Flink on EMR. I'm using Terraform to
provision EMR (I run Ansible afterwards to download and run a job). Out of the
box, it does not look like EMR's Flink distribution includes the optional jars
(flink-metrics-prometheus, flink-cep, etc.).

Looking at Flink's documentation, it says
> "In order to use this reporter you must copy
> `/opt/flink-metrics-prometheus-1.6.1.jar` into the `/lib` folder of your
> Flink distribution"
https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/metrics.html#prometheuspushgateway-orgapacheflinkmetricsprometheusprometheuspushgatewayreporter

But when logging into the EMR master node, neither /etc/flink nor
/usr/lib/flink has a directory called `opt`, and I cannot see
`flink-metrics-prometheus-1.6.1.jar` anywhere.

I know Flink has other optional libs you'd usually have to copy if you want to
use them, such as flink-cep, but I'm not sure how to do this when using EMR.

This is the exception I get, which I believe is because it cannot find the
metrics jar on its classpath.
```
java.lang.ClassNotFoundException: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:264)
    at org.apache.flink.runtime.metrics.MetricRegistryImpl.(MetricRegistryImpl.java:144)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createMetricRegistry(ClusterEntrypoint.java:419)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:276)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:227)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:191)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:190)
    at org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint.main(YarnSessionClusterEntrypoint.java:137)
```

EMR resource in terraform
```
resource "aws_emr_cluster" "emr_flink" {
  name          = "ce-emr-flink-arn"
  release_label = "emr-5.20.0" # 5.21.0 is not found, could be a region thing
  applications  = ["Flink"]

  ec2_attributes {
    key_name                          = "ce_test"
    subnet_id                         = "${aws_subnet.ce_test_subnet_public.id}"
    instance_profile                  = "${aws_iam_instance_profile.emr_profile.arn}"
    emr_managed_master_security_group = "${aws_security_group.allow_all_vpc.id}"
    emr_managed_slave_security_group  = "${aws_security_group.allow_all_vpc.id}"
    additional_master_security_groups = "${aws_security_group.external_connectivity.id}"
    additional_slave_security_groups  = "${aws_security_group.external_connectivity.id}"
  }

  ebs_root_volume_size = 100
  master_instance_type = "m4.xlarge"
  core_instance_type   = "m4.xlarge"
  core_instance_count  = 2

  service_role = "${aws_iam_role.iam_emr_service_role.arn}"

  configurations_json = <
```

Re: Task slot sharing: force reallocation

2019-03-06 Thread Le Xu
1.3.2 -- should I update to the latest version?

Thanks,

Le

On Wed, Mar 6, 2019 at 4:24 AM Till Rohrmann  wrote:

> Which version of Flink are you using?
>
> On Tue, Mar 5, 2019 at 10:58 PM Le Xu  wrote:
>
>> Hi Till:
>>
>> Thanks for the reply. The setup of the jobs is roughly as follows: For a
>> cluster with N machines, we deploy X simple map/reduce style jobs (the job
>> DAG and settings are exactly the same, except that they consume different
>> data). Each job has N mappers (they are evenly distributed, one mapper on
>> each machine). There are X mappers on each machine (as there are X jobs in
>> total). Each job has only one reducer, which all mappers point to. What I'm
>> observing is that all reducers are allocated to machine 1 (where mapper 1
>> of every job is allocated). It does make sense, since the reducer and
>> mapper 1 are in the same slot group. The original purpose of the question
>> was to find out whether it is possible to explicitly specify that the
>> reducer can be co-located with another mapper (such as mapper 2, so the
>> reducer of job 2 can be placed on machine 2). I'm just trying to figure out
>> whether it is at all possible without using a more expensive approach
>> (through YARN, for example). But if it is not possible, I will see if I can
>> move to job mode, as Piotr suggests.
>>
>> Thanks,
>>
>> Le
>>
>> On Tue, Mar 5, 2019 at 9:24 AM Till Rohrmann 
>> wrote:
>>
>>> Hard to tell whether this is related to FLINK-11815.
>>>
>>> To me the setup is not fully clear. Let me try to sum it up: According
>>> to Le Xu's description there are n jobs running on a session cluster. I
>>> assume that every TaskManager has n slots. The observed behaviour is that
>>> every job allocates the slot for the first mapper and chained sink from the
>>> first TM, right? Since Flink does not give strict guarantees for the slot
>>> allocation this is possible, however it should be highly unlikely or at
>>> least change when re-executing the same setup. At the moment there is no
>>> functionality in place to control the task-slot assignment.
>>>
>>> Chaining only affects which task will be grouped together and executed
>>> by the same Task (being executed by the same thread). Separate tasks can
>>> still be executed in the same slot if they have the same slot sharing
>>> group. This means that there can be multiple threads running in each slot.
>>>
>>> For me it would be helpful to get more information about the actual job
>>> deployments.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Mar 5, 2019 at 12:00 PM Piotr Nowojski 
>>> wrote:
>>>
 Hi Le,

 As I wrote, you can try running Flink in job mode, which spawns
 separate clusters per each job.

 Till, is this issue covered by FLINK-11815? Is this the same as:

 > Known issues:
 > 1. (…)
 > 2. if task slots are registered before slot request, the code have a
 tendency to group requests together on the same machine because we
 are using a LinkedHashMap

 ?

 Piotrek

 On 4 Mar 2019, at 21:08, Le Xu  wrote:

 Thanks Piotr.

 I didn't realize that the email attachment isn't working so the example
 I was referring to was this figure from Flink website:
 https://ci.apache.org/projects/flink/flink-docs-stable/fig/slot_sharing.svg

 So I am trying to run multiple jobs concurrently in a cluster -- the jobs are
 identical and the DAG looks very similar to the one in the figure. Each
 machine holds one map task from each job. I end up with X sinks on
 machine 1 (X being the number of jobs). I assumed this was caused by
 operator chaining (all sinks are chained to mapper 1, so they all end up on
 machine 1). But I also tried disabling chaining and still got the same
 result. Somehow, even when the sink and the map belong to different
 threads, they are still placed in the same slot.

 My goal was to see whether it is possible to have sinks evenly
 distributed across the cluster (instead of all on machine 1). One way to do
 this is to see whether the sink can be chained to one of the other mappers
 -- the other way is to see if we can change the placement of the mapper
 altogether (like placing map 1 of job 2 on machine 2, map 1 of job 3 on
 machine 3, so we end up with sinks spread evenly throughout the cluster).

 Thanks.

 Le

 On Mon, Mar 4, 2019 at 6:49 AM Piotr Nowojski 
 wrote:

> Hi,
>
> Are you asking the question if that’s the behaviour or you have
> actually observed this issue? I’m not entirely sure, but I would guess 
> that
> the Sink tasks would be distributed randomly across the cluster, but maybe
> I’m mixing this issue with resource allocations for Task Managers. Maybe
> Till will know something more about this?
>
> One thing that might have solve/workaround the issue is to run 

Re: Job continuously failing after Checkpoint Restore

2019-03-06 Thread Yun Tang
Hi Laura

From the exception stack, there are two possible causes of this NPE: either
the KafkaTopicPartition is null, or the topic field of that
KafkaTopicPartition from the union state is null. Whatever the cause, the
problem likely lies in the KryoSerializer used to de/serialize the
KafkaTopicPartition class. Gordon (in CC), who is an expert on serialization,
might offer more insights.

Before further discussion, could you please provide more information:

  1.  What version of Kafka are you using?
  2.  Have you ever encountered this problem before?
  3.  Did you change anything before resuming your job?
  4.  If you try to restore checkpoint-60 again by submitting another job, do
you hit this NPE again?

Best
Yun Tang

From: Laura Uzcátegui 
Sent: Wednesday, March 6, 2019 21:35
To: user
Subject: Job continuously failing after Checkpoint Restore

Hi,

We are currently running a Flink Job that has 3 operators.

Source ---> Filter ---> Sink

As soon as the job is started it tries to recover from the latest Checkpoint


[05-Mar-2019 13:09:55.365 UTC] INFO  Restoring from 
latest valid checkpoint: Checkpoint 60 @ 1551788864502 for 
fd697c91437216e773bb862cbae56e0f.


Then, during operator initialization, specifically in the Source operator,
which reads from Kafka topics using a regex pattern, the job starts to fail
with the following exception:


[05-Mar-2019 13:10:11.756 UTC] INFO  Job Data Lake Ingestion (fd697c91437216e773bb862cbae56e0f) switched from state RUNNING to FAILING.
java.lang.NullPointerException
 at org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition$Comparator.compare(KafkaTopicPartition.java:126)
 at org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition$Comparator.compare(KafkaTopicPartition.java:123)
 at java.util.TreeMap.put(Unknown Source)
 at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:724)
 at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
 at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
 at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
 at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
 at java.lang.Thread.run(Unknown Source)


I was wondering if anyone has seen this before?


My assumption will be


We are currently running with the following settings:


  *   Flink version: 1.4.2
  *   Docker image with the job embedded
  *   Job Parallelism : 8

Cheers,

Laura


Re: [1.7.1] job stuck in suspended state

2019-03-06 Thread Till Rohrmann
Hi Steven,

I think I found the problem. It is caused by a JobMaster which takes a long
time to suspend the job and multiple leader changes. So what happens after
the first leadership revoking and regaining is that the Dispatcher recovers
the submitted job but waits to execute it because the JobMaster from the
previous leader session has not been stopped yet. This is done by
scheduling some futures with the current main thread executor. Now what
happens next is another cycle of leadership revoking and regaining (3rd
time that the Dispatcher receives the leadership). Due to this, we create a
new main thread executor and invalidate the previous one. As a consequence
the created future from the 2nd leadership will never be executed. This
alone is not a problem but since we serialize the leadership gaining
operations, we actually wait for this future to complete before starting
the job recovery belonging to the 3rd leadership.
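
The sequence described above can be illustrated with a toy model in plain Python. It is not Flink code: `MainThreadExecutor`, `demo`, and the log messages are invented for illustration, and the model only assumes what the paragraph states, namely that work queued on an invalidated executor never runs and that the next leadership's recovery is chained behind the previous future.

```python
from concurrent.futures import Future


class MainThreadExecutor:
    """Toy stand-in for the executor created per leadership session."""

    def __init__(self):
        self.valid = True
        self.queue = []

    def submit(self, fn):
        future = Future()
        self.queue.append((fn, future))
        return future

    def run_pending(self):
        if not self.valid:
            return  # work queued on an invalidated executor is never run
        queue, self.queue = self.queue, []
        for fn, future in queue:
            future.set_result(fn())


def demo():
    log = []
    # 2nd leadership: recovery is queued, waiting for the old JobMaster to stop.
    executor_2 = MainThreadExecutor()
    recovery_2 = executor_2.submit(lambda: log.append("recovered in session 2"))
    # 3rd leadership: a new executor is created and the previous one invalidated.
    executor_2.valid = False
    executor_3 = MainThreadExecutor()
    # Leadership operations are serialized: session 3 waits for recovery_2.
    recovery_2.add_done_callback(
        lambda _: executor_3.submit(lambda: log.append("recovered in session 3")))
    executor_2.run_pending()
    executor_3.run_pending()
    return recovery_2.done(), log
```

`demo()` returns `(False, [])`: the future from the 2nd leadership never completes, so the recovery chained behind it for the 3rd leadership is never even submitted, which is the stuck state reported in this thread.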

Long story short: FLINK-11537 should make this problem very unlikely to
occur because the JobMaster termination is now almost instantaneous.
However it does not fully fix the underlying problem. Here is the JIRA
issue for fixing the problem:
https://issues.apache.org/jira/browse/FLINK-11843

Cheers,
Till

On Wed, Mar 6, 2019 at 1:21 PM Till Rohrmann  wrote:

> Hi Steven,
>
> a quick update from my side after looking through the logs. The problem
> seems to be that the Dispatcher does not start recovering the jobs after
> regaining the leadership it had lost before. I cannot yet tell why
> this is happening and I will try to debug the problem further.
>
> If you manage to reproduce the problem, could you maybe run the cluster
> with DEBUG log levels and send me the logs again? If I don't manage to
> figure out how this problem happens until then, it would be super helpful.
>
> Cheers,
> Till
>
> On Mon, Mar 4, 2019 at 7:44 PM Steven Wu  wrote:
>
>> Till,
>>
>> I will send you the complete log offline. We don't know how to reliably
>> reproduce the problem, but it did happen quite frequently, about once every
>> couple of days. Let me see if I can cherry-pick the fix/commit to the 1.7
>> branch.
>>
>> Thanks,
>> Steven
>>
>>
>> On Mon, Mar 4, 2019 at 5:55 AM Till Rohrmann 
>> wrote:
>>
>>> Hi Steven,
>>>
>>> is this the tail of the logs or are there other statements following? I
>>> think your problem could indeed be related to FLINK-11537. Is it possible
>>> to somehow reliably reproduce this problem? If yes, then you could try out
>>> the RC for Flink 1.8.0 which should be published in the next days.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Sat, Mar 2, 2019 at 12:47 AM Steven Wu  wrote:
>>>
 We have observed that a job sometimes gets stuck in the suspended state, and
 no job restart/recovery is attempted once the job is suspended.
 * it is a high-parallelism job (close to 2,000)
 * there were a few job restarts before this
 * there were long GC pauses during the period
 * ZooKeeper timeouts, probably caused by the long GC pauses

 Is it related to https://issues.apache.org/jira/browse/FLINK-11537?

 I pasted some logs at the end.

 Thanks,
 Steven

 2019-02-28 19:04:36,357 WARN org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/tmp/jaas-6664341082794720643.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it.
 2019-02-28 19:04:36,357 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening socket connection to server 100.82.141.106/100.82.141.106:2181
 2019-02-28 19:04:36,357 ERROR org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  - Authentication failed
 2019-02-28 19:04:36,357 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket connection established to 100.82.141.106/100.82.141.106:2181, initiating session
 2019-02-28 19:04:36,359 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session establishment complete on server 100.82.141.106/100.82.141.106:2181, sessionid = 0x365ef9c4fe7f1f2, negotiated timeout = 4
 2019-02-28 19:04:36,359 INFO org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager - State change: RECONNECTED
 2019-02-28 19:04:36,359 INFO org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  - Connection to ZooKeeper was reconnected. Leader election can be restarted.
 2019-02-28 19:04:36,359 INFO org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  - Connection to ZooKeeper was reconnected. Leader election can be restarted.
 2019-02-28 19:04:36,359 INFO
 

Joining two streams of different priorities

2019-03-06 Thread Aggarwal, Ajay
My main input stream (inputStream1) gets processed using a pipeline that looks 
like below

inputStream1
    .keyBy("some-key")
    .window(TumblingEventTimeWindows.of(Time.seconds(Properties.WINDOW_SIZE)))
    .process(new MyProcessWindowFunction());


Note that for each key, the only place I want the main processing to happen is
in the above ProcessWindowFunction. That’s because I have a requirement to
process events associated with a key serially (one at a time) and in order by
eventTime.

However I have 2 sources of events that I need to join:

  *   inputStream1 : normal priority events (using eventTime)
  *   inputStream2:  higher priority events

I somehow need to join these 2 streams so that all processing per key still 
happens in the same ProcessWindowFunction. But I want to treat events from 
inputStream2 with higher priority. So if the pipeline is backed up a little 
with events from inputStream1 and a new event shows up in inputStream2, I want 
that event to make it to the ProcessWindowFunction faster. Is there any way to 
make that happen? A Window Join will not give me the desired behavior since it 
will join elements of the two streams that lie in the same window.
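
To make the desired ordering concrete, here is a minimal sketch in plain Java
(no Flink dependency). It assumes both inputs have already been merged into
one buffer per key and that every event carries a flag saying which stream it
came from. The Event class and its field names are hypothetical and not taken
from the job above. It only illustrates "higher priority first, then by event
time"; it does not show how to get the events into the ProcessWindowFunction
sooner.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

final class PriorityOrderingSketch {

    /** Hypothetical event type; the field names are assumptions. */
    static final class Event {
        final String id;
        final boolean highPriority; // true for inputStream2, false for inputStream1
        final long eventTime;

        Event(String id, boolean highPriority, long eventTime) {
            this.id = id;
            this.highPriority = highPriority;
            this.eventTime = eventTime;
        }
    }

    /** High-priority events first; within one priority, ascending event time. */
    static final Comparator<Event> ORDER =
            Comparator.comparing((Event e) -> !e.highPriority)
                    .thenComparingLong(e -> e.eventTime);

    /** Returns the ids of the buffered events in the order they would be processed. */
    static List<String> processingOrder(List<Event> buffered) {
        PriorityQueue<Event> queue = new PriorityQueue<>(ORDER);
        queue.addAll(buffered);
        List<String> order = new ArrayList<>();
        while (!queue.isEmpty()) {
            order.add(queue.poll().id);
        }
        return order;
    }

    public static void main(String[] args) {
        List<Event> buffered = new ArrayList<>();
        buffered.add(new Event("a", false, 10L));
        buffered.add(new Event("b", false, 20L));
        buffered.add(new Event("c", true, 30L));
        System.out.println(processingOrder(buffered));
    }
}
```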

Thanks!

Ajay




Re: Problems with restoring from savepoint

2019-03-06 Thread Павел Поцелуев
Thanks. We'll try it with 1.8.0 and let you know.

---
Best regards,
Pavel Potseluev
Software developer, Yandex.Classifieds LLC

06.03.2019, 16:44, "Tzu-Li (Gordon) Tai" :

Hi Pavel,

As you already discovered, this problem occurs still because in 1.7.x, the
KryoSerializer is still using the deprecated TypeSerializerConfigSnapshot as
its snapshot, which relies on the serializer being Java-serialized into
savepoints as state metadata.

In 1.8.0, all Flink's built-in serializers, including the KryoSerializer,
have been upgraded to use the new abstraction (i.e. TypeSerializerSnapshot),
which doesn't rely on Java serialization anymore.
So, essentially, you won't bump into this problem anymore after upgrading to
the upcoming 1.8.0.

Please note that this problem only fully goes away once you have a savepoint
taken with 1.8.0. When restoring from a 1.7.1 savepoint (or any version
earlier than 1.8.0), Java-deserialization of the serializer still occurs, so
you will need to keep that workaround of adding the serialVersionUID around
until you fully upgrade to 1.8.0 savepoints.

I think the first release candidate for 1.8.0 will be available soon.
Would be interesting if you can try that out and let me know how this works
out for you with the release candidate!

Cheers,
Gordon

On Wed, Mar 6, 2019 at 5:06 PM Pavel Potseluev  wrote:

Hi!

We use flink-1.7.1 and have some problems with restoring from savepoint. We
use custom kryo serializer which relies on protobuf representation of our
model classes. It had been working fine but when we made some change in our
model class it broke because of changed serialVersionUID. We can see this
message in the log:

Caused by: java.lang.IllegalStateException: Could not Java-deserialize TypeSerializer while restoring checkpoint metadata for serializer snapshot 'org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer$KryoSerializerConfigSnapshot'. Please update to the TypeSerializerSnapshot interface that removes Java Serialization to avoid this problem in the future.

I found that method snapshotConfiguration of
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer returns
instance of KryoSerializerConfigSnapshot. And this class for some reason
extends deprecated TypeSerializerConfigSnapshot which relies on java
serialization.

Of course we have fixed our problem just by adding special serialVersionUID
to our class. But it seems strange to have problems with java serialization
while our serializer doesn't use this mechanism. Do you plan to fix this
problem?

Full stack trace below:

java.lang.Exception: Exception while creating StreamOperatorStateContext.
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for StreamGroupedReduce_5d41c2bc0b6f18591a40bd21a3e516cd_(2/2) from any of the 1 provided restore options.
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:284)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
	... 5 more
Caused by: java.lang.IllegalStateException: Could not Java-deserialize TypeSerializer while restoring checkpoint metadata for serializer snapshot 'org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer$KryoSerializerConfigSnapshot'. Please update to the TypeSerializerSnapshot interface that removes Java Serialization to avoid this problem in the future.
	at org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.restoreSerializer(TypeSerializerConfigSnapshot.java:138)
	at org.apache.flink.runtime.state.StateSerializerProvider$RestoredStateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:212)
	at org.apache.flink.runtime.state.StateSerializerProvider$RestoredStateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:188)
	at org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.getStateSerializer(RegisteredKeyValueStateBackendMetaInfo.java:135)
	at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.getStateSerializer(CopyOnWriteStateTable.java:541)
	at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.createV2PlusReader(StateTableByKeyGroupReaders.java:69)
	at 

Re: Under what circumstances does Flink produce out-of-order data?

2019-03-06 Thread 刘 文
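1) While verifying EventTime processing with watermarks, I found that data
sent to the socket is output late or not output at all.
2) Testing shows that the previous window is only closed once the timestamp
returned by getCurrentWatermark() for the newly sent data is greater than
TimeWindow + maxOutOfOrderness.
3) But the latest records cannot be processed in time, or cannot be processed
at all.
4) How should this problem be handled?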









---

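> On March 6, 2019, at 10:29 PM, 刘 文  wrote:
> 
> I understand this problem a bit better now; I have written it up as a
> document for everyone's reference.
> ———
> 
> Flink 1.7.2: source code analysis of streaming data processing by business
> timestamp:
> https://github.com/opensourceteams/flink-maven-scala/blob/master/md/miniCluster/Flink-EventTime-watermark.md
>  
> 
> 
> 
> ———
> 
> 
> 
> Flink 1.7.2: source code analysis of streaming data processing by business
> timestamp
> 
>  
> Source code
> 
> https://github.com/opensourceteams/flink-maven-scala 
> 
>  
> Overview
> 
> By default Flink uses processing time, i.e. the time at which the window
> receives the data emitted by the source, so calculations are based on when
> the Flink program receives the data. Real workloads that analyse periodic
> data (for example the data within every 5 minutes or every hour) need the
> time at which the event occurred at the business source, so Flink's
> EventTime and Watermark are used to handle this.
> Set the environment's time characteristic to EventTime.
> Call assignTimestampsAndWatermarks on the data stream. extractTimestamp()
> in AssignerWithPeriodicWatermarks extracts the actual business time, and
> getCurrentWatermark returns the latest time; this is evaluated for every
> element and the maximum is used as the computation time. If the current
> time is greater than the previous time interval plus the delay configured
> here, the previous window is closed, i.e. the window for that period is
> processed.
> This program uses the specified business time as the time for the
> statistics.
>  
> Program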
> 
> package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc.eventtime
> 
> import java.util.{Date, Properties}
> 
> import com.alibaba.fastjson.JSON
> import com.opensourceteams.module.bigdata.flink.common.ConfigurationUtil
> import org.apache.flink.api.common.serialization.SimpleStringSchema
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
> import org.apache.flink.streaming.api.watermark.Watermark
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
> import org.apache.flink.util.Collector
> 
> 
> object SockWordCountRun {
> 
>   def main(args: Array[String]): Unit = {
> 
>     // get the execution environment
>     // val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
> 
>     val configuration: Configuration = ConfigurationUtil.getConfiguration(true)
> 
>     val env: StreamExecutionEnvironment =
>       StreamExecutionEnvironment.createLocalEnvironment(1, configuration)
> 
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> 
>     import org.apache.flink.streaming.api.scala._
>     val dataStream = env.socketTextStream("localhost", 1234, '\n')
>     // .setParallelism(3)
> 
>     dataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[String] {
> 
>       val maxOutOfOrderness = 2 * 1000L // 2 seconds
>       var currentMaxTimestamp: Long = _
>       var currentTimestamp: Long = _
> 
>       // the watermark trails the largest timestamp seen so far
>       override def getCurrentWatermark: Watermark =
>         new Watermark(currentMaxTimestamp - maxOutOfOrderness)
> 
>       override def extractTimestamp(element: String, previousElementTimestamp: Long): Long = {
>         val jsonObject = JSON.parseObject(element)
> 
>         val timestamp = jsonObject.getLongValue("extract_data_time")
>         currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
>         currentTimestamp = timestamp
> 
>         /*  println("===watermark begin===")
>         println()
>         println(new Date(currentMaxTimestamp - 20 * 1000))
>         println(jsonObject)
>         println("===watermark end===")
>         println()*/
>         timestamp
>       }
> 
>     })
>       .timeWindowAll(Time.seconds(3))
>       .process(new ProcessAllWindowFunction[String, String, TimeWindow]() {
>         override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
>           println()
>           println("start emitting window")
>           println(new Date())
>           for (e <- elements) out.collect(e)
>           println("finished emitting window")
>           println(new Date())
>           println()
>         }
>       })
>       .print()
>     //.setParallelism(3)
> 
> 
> 
> 
> 
> 
> 

DataStream EventTime last data cannot be output?

2019-03-06 Thread 刘 文
DataStream EventTime last data cannot be output ?


While verifying EventTime processing with watermarks, I found that data sent 
to the socket is output late or not output at all.
1) Testing shows that the previous window is only closed once the timestamp 
returned by getCurrentWatermark() for the newly sent data is greater than 
TimeWindow + maxOutOfOrderness.
2) But the latest records cannot be processed in time, or cannot be processed 
at all.
3) How can I deal with this problem?
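
The arithmetic behind this observation can be sketched in plain Java. This is
a minimal sketch, assuming tumbling windows with offset 0, non-negative
timestamps, a watermark equal to the largest timestamp seen minus
maxOutOfOrderness (as in the program below), and a window that fires once the
watermark reaches the last millisecond of the window, which is my
understanding of Flink's default event-time trigger. The class and method
names are made up for illustration.

```java
final class WatermarkSketch {

    /** Start of the tumbling window that contains the given timestamp. */
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    /** True once the watermark has reached the last millisecond of the window. */
    static boolean fires(long maxSeenTimestamp, long windowStart,
                         long windowSize, long maxOutOfOrderness) {
        long watermark = maxSeenTimestamp - maxOutOfOrderness;
        long windowMaxTimestamp = windowStart + windowSize - 1;
        return watermark >= windowMaxTimestamp;
    }

    /** Smallest event timestamp that makes the given window fire. */
    static long firstFiringTimestamp(long windowStart, long windowSize,
                                     long maxOutOfOrderness) {
        return windowStart + windowSize - 1 + maxOutOfOrderness;
    }

    public static void main(String[] args) {
        long windowSize = 3000L;        // Time.seconds(3)
        long maxOutOfOrderness = 2000L; // 2 * 1000L
        long start = windowStart(1000L, windowSize);
        System.out.println("window start: " + start);
        System.out.println("fires at 4998: " + fires(4998L, start, windowSize, maxOutOfOrderness));
        System.out.println("fires at 4999: " + fires(4999L, start, windowSize, maxOutOfOrderness));
    }
}
```

Under these assumptions the window [0, 3000) is only emitted after a record
with a timestamp of at least 4999 arrives, and that record already belongs to
the next window. The newest window therefore stays open until an even later
record pushes the watermark forward.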



The following is the Flink program (Flink 1.7.2):
---



package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc.eventtime

import java.util.{Date, Properties}

import com.alibaba.fastjson.JSON
import com.opensourceteams.module.bigdata.flink.common.ConfigurationUtil
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector


object SockWordCountRun {

  def main(args: Array[String]): Unit = {

    // get the execution environment
    // val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val configuration: Configuration = ConfigurationUtil.getConfiguration(true)

    val env: StreamExecutionEnvironment =
      StreamExecutionEnvironment.createLocalEnvironment(1, configuration)

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    import org.apache.flink.streaming.api.scala._
    val dataStream = env.socketTextStream("localhost", 1234, '\n')
    // .setParallelism(3)

    dataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[String] {

      val maxOutOfOrderness = 2 * 1000L // 2 seconds
      var currentMaxTimestamp: Long = _
      var currentTimestamp: Long = _

      // the watermark trails the largest timestamp seen so far
      override def getCurrentWatermark: Watermark =
        new Watermark(currentMaxTimestamp - maxOutOfOrderness)

      override def extractTimestamp(element: String, previousElementTimestamp: Long): Long = {
        val jsonObject = JSON.parseObject(element)

        val timestamp = jsonObject.getLongValue("extract_data_time")
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
        currentTimestamp = timestamp

        /*  println("===watermark begin===")
        println()
        println(new Date(currentMaxTimestamp - 20 * 1000))
        println(jsonObject)
        println("===watermark end===")
        println()*/
        timestamp
      }

    })
      .timeWindowAll(Time.seconds(3))
      .process(new ProcessAllWindowFunction[String, String, TimeWindow]() {
        override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
          println()
          println("start emitting window")
          println(new Date())
          for (e <- elements) out.collect(e)
          println("finished emitting window")
          println(new Date())
          println()
        }
      })
      .print()
    //.setParallelism(3)

    println("== execution plan below ==")
    println("Visualizer URL (works best in Firefox): https://flink.apache.org/visualizer")
    // execution plan
    println(env.getStreamGraph.getStreamingPlanAsJSON)
    println("== end of execution plan JSON ==\n")

    env.execute("Socket watermark job")

    println("finished")

  }


  // Data type for words with count
  case class WordWithCount(word: String, count: Long) {
    //override def toString: String = Thread.currentThread().getName + word + " : " + count
  }


  // Longer timeouts so that the local cluster survives pauses while debugging
  def getConfiguration(isDebug: Boolean = false): Configuration = {

    val configuration: Configuration = new Configuration()

    if (isDebug) {
      val timeout = "10 s"
      val timeoutHeartbeatPause = "100 s"
      configuration.setString("akka.ask.timeout", timeout)
      configuration.setString("akka.lookup.timeout", timeout)
      configuration.setString("akka.tcp.timeout", timeout)
      configuration.setString("akka.transport.heartbeat.interval", timeout)
      configuration.setString("akka.transport.heartbeat.pause", timeoutHeartbeatPause)
      configuration.setString("akka.watch.heartbeat.pause", timeout)
      configuration.setInteger("heartbeat.interval", 1000)
      configuration.setInteger("heartbeat.timeout", 5000)
    }

    configuration
  }

}





best   

Re: Broadcast state with WindowedStream

2019-03-06 Thread Aggarwal, Ajay
Still looking for ideas as to how I can use broadcast state in my use case.

From: "Aggarwal, Ajay" 
Date: Monday, March 4, 2019 at 4:52 PM
To: "user@flink.apache.org" 
Subject: Re: Broadcast state with WindowedStream

It sort of makes sense that broadcast state is not available with 
WindowedStream. But if I need some dynamic global state in 
MyProcessWindowFunction  what are my options?

Ajay

From: "Aggarwal, Ajay" 
Date: Monday, March 4, 2019 at 4:36 PM
To: "user@flink.apache.org" 
Subject: Broadcast state with WindowedStream


Is it possible to use broadcast state with windowing? My job looks like below

inputStream
    .keyBy("some-key")
    .window(TumblingEventTimeWindows.of(Time.seconds(Properties.WINDOW_SIZE)))
    .process(new MyProcessWindowFunction());

I wanted to introduce broadcast state that MyProcessWindowFunction can make use 
of.  The example here 
(https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html)
 covers connecting a Keyed and non-Keyed stream with the BroadcastStream. But 
in my case I need to connect a WindowedStream with a BroadcastStream, which 
doesn’t seem to be supported.

Ajay


4 Apache Events in 2019: DC Roadshow soon; next up Chicago, Las Vegas, and Berlin!

2019-03-06 Thread Rich Bowen
Dear Apache Enthusiast,

(You’re receiving this because you are subscribed to one or more user
mailing lists for an Apache Software Foundation project.)

TL;DR:
 * Apache Roadshow DC is in 3 weeks. Register now at
https://apachecon.com/usroadshowdc19/
 * Registration for Apache Roadshow Chicago is open.
http://apachecon.com/chiroadshow19
 * The CFP for ApacheCon North America is now open.
https://apachecon.com/acna19
 * Save the date: ApacheCon Europe will be held in Berlin, October 22nd
through 24th.  https://apachecon.com/aceu19


Registration is open for two Apache Roadshows; these are smaller events
with a more focused program and regional community engagement:

Our Roadshow event in Washington DC takes place in under three weeks, on
March 25th. We’ll be hosting a day-long event at the Fairfax campus of
George Mason University. The roadshow is a full day of technical talks
(two tracks) and an open source job fair featuring AWS, Bloomberg, dito,
GridGain, Linode, and Security University. More details about the
program, the job fair, and to register, visit
https://apachecon.com/usroadshowdc19/

Apache Roadshow Chicago will be held May 13-14th at a number of venues
in Chicago’s Logan Square neighborhood. This event will feature sessions
in AdTech, FinTech and Insurance, startups, “Made in Chicago”, Project
Shark Tank (innovations from the Apache Incubator), community diversity,
and more. It’s a great way to learn about various Apache projects “at
work” while playing at a brewery, a beercade, and a neighborhood bar.
Sign up today at https://www.apachecon.com/chiroadshow19/

We’re delighted to announce that the Call for Presentations (CFP) is now
open for ApacheCon North America in Las Vegas, September 9-13th! As the
official conference series of the ASF, ApacheCon North America will
feature over a dozen Apache project summits, including Cassandra,
Cloudstack, Tomcat, Traffic Control, and more. We’re looking for talks
in a wide variety of categories -- anything related to ASF projects and
the Apache development process. The CFP closes at midnight on May 26th.
In addition, the ASF will be celebrating its 20th Anniversary during the
event. For more details and to submit a proposal for the CFP, visit
https://apachecon.com/acna19/ . Registration will be opening soon.

Be sure to mark your calendars for ApacheCon Europe, which will be held
in Berlin, October 22-24th at the KulturBrauerei, a landmark of Berlin's
industrial history. In addition to innovative content from our projects,
we are collaborating with the Open Source Design community
(https://opensourcedesign.net/) to offer a track on design this year.
The CFP and registration will open soon at https://apachecon.com/aceu19/ .

Sponsorship opportunities are available for all events, with details
listed on each event’s site at http://apachecon.com/.

We look forward to seeing you!

Rich, for the ApacheCon Planners
@apachecon



Re: Setting source vs sink vs window parallelism with data increase

2019-03-06 Thread Padarn Wilson
Thanks a lot for your suggestion. I’ll dig into it and update for the
mailing list if I find anything useful.

Padarn

On Wed, 6 Mar 2019 at 6:03 PM, Piotr Nowojski  wrote:

> Re-adding user mailing list.
>
>
> Hi,
>
> If it is a GC issue, only GC logs or a JVM memory profiler (like
> Oracle’s Mission Control) can lead you to the solution. Once you confirm
> that it’s a GC issue, there are numerous resources online on how to analyse
> the cause of the problem. CPU profiling and Flink metrics are of limited
> use for that, since GC issues caused by one thread can cause
> performance bottlenecks in other, unrelated places.
>
> If it’s not a GC issue, you can use Flink metrics (like the number of
> buffered input/output data) to find the task that’s causing the bottleneck.
> Then you can use a CPU profiler to analyse why that is happening.
>
> Piotrek
>
> On 6 Mar 2019, at 02:52, Padarn Wilson  wrote:
>
> Hi Piotr,
>
> Thanks for your feedback. Makes sense about the checkpoint barriers - this
> definitely could be the cause of a problem.
>
> I would advise profiling your job to find out what’s going on.
>
>
> Agreed. Outside of inspecting the Flink metrics, do you have suggestions
> for tools with which to do this?
>
> The main thing I'm trying to pin down is:
> 1) Is it the downstream processing from the expansion of records that
> causes a problem, or
> 2) Is it the shuffle of all the records after the expansion which is
> taking a long time - if so, is there anything I can do to mitigate this
> other than trying to ensure less shuffling?
>
> Thanks,
> Padarn
>
>
> On Tue, Mar 5, 2019 at 7:08 PM Piotr Nowojski  wrote:
>
>> Hi,
>>
>> Do you mind elaborating on this? What technology would you propose as an
>> alternative, and why would this increase checkpointing time?
>>
>>
>> The problem is that when Flink starts a checkpoint and injects checkpoint
>> barriers, those checkpoint barriers travel through the job graph. The
>> quicker they can do that the better. How long that takes depends on the
>> amount of data buffered ahead of the checkpoint barriers (currently all such
>> records must be processed before the checkpoint barrier is passed downstream).
>> The more buffered records there are and the more time it takes to process
>> them, the longer the checkpoint takes. Obviously, if one stage in the
>> job multiplies the number of records, it can in a way multiply the
>> amount of “buffered work” that needs to be processed before the checkpoint
>> barriers pass through.
>>
>> However, that might not be the case for you. To analyse what’s going on you
>> would need to look at various Flink metrics, like checkpoint times, back
>> pressured tasks, the state of the output/input buffers of the tasks, etc.
>> Those are secondary issues, though. First of all you should try to
>> pinpoint the cause of the long GC pauses. If it comes from your code, you
>> should fix this first. If that either isn’t the issue or doesn’t solve it,
>> generally speaking I would advise profiling your job to find out what’s
>> going on.
>>
>> Piotrek
>>
>> On 5 Mar 2019, at 02:00, Padarn Wilson  wrote:
>>
>> Hi Piotr,
>>
>> Thanks for your response. I am using Flink 1.7.1 (intend to upgrade to
>> 1.7.2 shortly - there is some S3 fix I'd like to take advantage of).
>>
>> Generally speaking Flink might not be the best if your records fan out,
>>> this may significantly increase checkpointing time.
>>
>>
>> Do you mind elaborating on this? What technology would you propose as an
>> alternative, and why would this increase checkpointing time?
>>
>> However you might want to first identify what’s causing long GC times.
>>>
>>
>> My current plan is to try and enable GC logs and see if I can get
>> something meaningful from them.
>>
>> Thanks a lot,
>> Padarn
>>
>>
>> On Mon, Mar 4, 2019 at 7:16 PM Piotr Nowojski 
>> wrote:
>>
>>> Hi,
>>>
>>> What Flink version are you using?
>>>
>>> Generally speaking Flink might not be the best if your records fan out,
>>> this may significantly increase checkpointing time.
>>>
>>> However you might want to first identify what’s causing the long GC times.
>>> If there are long GC pauses, this should be the first thing to fix.
>>>
>>> Piotrek
>>>
>>> On 2 Mar 2019, at 08:19, Padarn Wilson  wrote:
>>>
>>> Hi all again - following up on this, I think I've identified my problem
>>> as being something else, but would appreciate it if anyone can offer advice.
>>>
>>> After running my stream for some time, I see that my garbage collector
>>> for the old generation starts to take a very long time:
>>> 
>>> Here the *purple line is young generation time*; this is ever
>>> increasing, but grows slowly, while the *blue is old generation*.
>>> This in itself is not a problem, but as soon as the next checkpoint is
>>> triggered after this happens you see the following:
>>> 
>>> It looks like the checkpoint hits a cap, but this is only because the
>>> checkpoints start to time out and fail (these are the alignment times per
>>> operator).
>>>
>>> I do 

Re: Problems with restoring from savepoint

2019-03-06 Thread Tzu-Li (Gordon) Tai
Hi Pavel,

As you already discovered, this problem still occurs because in 1.7.x the
KryoSerializer still uses the deprecated TypeSerializerConfigSnapshot
as its snapshot, which relies on the serializer being Java-serialized into
savepoints as state metadata.

In 1.8.0, all Flink's built-in serializers, including the KryoSerializer,
have been upgraded to use the new abstraction (i.e.
TypeSerializerSnapshot), which doesn't rely on Java serialization anymore.
So, essentially, you won't bump into this problem anymore after upgrading
to the upcoming 1.8.0.

Please note that this problem only fully goes away once you have a
savepoint taken with 1.8.0. When restoring from a 1.7.1 savepoint (or any
version earlier than 1.8.0), Java-deserialization of the serializer still
occurs, so you will need to keep that workaround of adding the
serialVersionUID around until you fully upgrade to 1.8.0 savepoints.
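
For reference, the workaround of pinning the serialVersionUID looks roughly
like the following in plain Java. This is only a sketch: ModelRecord and its
fields are hypothetical placeholders, and which class actually needs the
explicit UID depends on what ends up being Java-serialized in your job.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

final class SerialVersionUidSketch {

    /** Hypothetical model class; the name and fields are placeholders. */
    static final class ModelRecord implements Serializable {
        // Declared explicitly so that adding or changing fields later does not
        // change the UID that Java deserialization checks against.
        private static final long serialVersionUID = 1L;

        String name;
        int count;
    }

    /** Returns the UID that Java serialization will use for ModelRecord. */
    static long declaredUid() {
        return ObjectStreamClass.lookup(ModelRecord.class).getSerialVersionUID();
    }

    public static void main(String[] args) {
        System.out.println("serialVersionUID = " + declaredUid());
    }
}
```

Without the explicit field the JVM derives the UID from the class structure,
which is why a change to the class can break Java deserialization of
previously written data.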

I think the first release candidate for 1.8.0 will be available soon.
Would be interesting if you can try that out and let me know how this works
out for you with the release candidate!

Cheers,
Gordon

On Wed, Mar 6, 2019 at 5:06 PM Pavel Potseluev 
wrote:

> Hi!
>
> We use flink-1.7.1 and have some problems with restoring from a savepoint.
> We use a custom Kryo serializer which relies on the protobuf representation of
> our model classes. It had been working fine, but when we made a change to
> our model class it broke because of a changed serialVersionUID. We can see
> this message in the log:
>
>
> Caused by: java.lang.IllegalStateException: Could not Java-deserialize
> TypeSerializer while restoring checkpoint metadata for serializer snapshot
> 'org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer$KryoSerializerConfigSnapshot'.
> Please update to the TypeSerializerSnapshot interface that removes Java
> Serialization to avoid this problem in the future.
>
>
> I found that the method *snapshotConfiguration* of
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer returns an
> instance of KryoSerializerConfigSnapshot. And this class for some reason
> extends the deprecated TypeSerializerConfigSnapshot, which relies on Java
> serialization.
>
>
> Of course we have fixed our problem just by adding an explicit
> serialVersionUID to our class. But it seems strange to have problems with
> Java serialization while our serializer doesn't use this mechanism. Do you
> plan to fix this problem?
>
> Full stack trace below:
>
>
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>   at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for StreamGroupedReduce_5d41c2bc0b6f18591a40bd21a3e516cd_(2/2) from any of the 1 provided restore options.
>   at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>   at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:284)
>   at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
>   ... 5 more
> Caused by: java.lang.IllegalStateException: Could not Java-deserialize TypeSerializer while restoring checkpoint metadata for serializer snapshot 'org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer$KryoSerializerConfigSnapshot'. Please update to the TypeSerializerSnapshot interface that removes Java Serialization to avoid this problem in the future.
>   at org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.restoreSerializer(TypeSerializerConfigSnapshot.java:138)
>   at org.apache.flink.runtime.state.StateSerializerProvider$RestoredStateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:212)
>   at org.apache.flink.runtime.state.StateSerializerProvider$RestoredStateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:188)
>   at org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.getStateSerializer(RegisteredKeyValueStateBackendMetaInfo.java:135)
>   at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.getStateSerializer(CopyOnWriteStateTable.java:541)
>   at 
> 

Re: Using Flink in an university course

2019-03-06 Thread Wouter Zorgdrager
Hi all,

Thanks for the input. Much appreciated.

Regards,
Wouter

On Mon, Mar 4, 2019 at 20:40, Addison Higham wrote:

> Hi there,
>
> As far as a runtime for students, it seems like docker is your best bet.
> However, you could have them instead package a jar using some interface
> (for example, see
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/packaging.html,
> which details the `Program` interface) and then execute it inside a custom
> runner. That *might* result in something less prone to breakage as it would
> need to conform to an interface, but it may require a fair amount of custom
> code to reduce the boiler plate to build up a program plan as well as the
> custom runner. The code for how flink loads a jar and turns it into
> something it can execute is mostly encapsulated
> in org.apache.flink.client.program.PackagedProgram, which might be a good
> thing to read and understand if you go down this route.
>
> If you want to give more insight, you could build some tooling to traverse
> the underlying graphs that the students build up in their data stream
> application. For example, calling
> `StreamExecutionEnvironment.getStreamGraph` after the data stream is built
> will get a graph of the current job, which you can then use to traverse a
> graph and see which operators and edges are in use. This is very similar to
> the process flink uses to build the job DAG it renders in the UI. I am not
> sure what you could do as an automated analysis, but the StreamGraph API is
> quite low level and exposes a lot of information about the program.
>
> Hopefully that is a little bit helpful. Good luck and sounds like a fun
> course!
>
>
> On Mon, Mar 4, 2019 at 7:16 AM Wouter Zorgdrager <
> w.d.zorgdra...@tudelft.nl> wrote:
>
>> Hey all,
>>
>> Thanks for the replies. The issues we were running into (which are not
>> specific to Docker):
>> - Students who changed the template incorrectly caused the container to fail.
>> - We give full points if the output matches our solutions (and none
>> otherwise), but it would be nice if we could give partial grades per
>> assignment (and better feedback). This would require looking not only at
>> the results but also at the operators used. The pitfall is that in many
>> cases a correct solution can be achieved in multiple ways. I came across a
>> Flink test library [1] which allows testing Flink code more extensively, but
>> it seems to be available only for Java.
>>
>> In retrospect, I do think using Docker is a good approach as Fabian
>> confirms. However, the way we currently assess student solutions might be
>> improved. I assume that in your trainings manual feedback is given, but
>> unfortunately this is quite difficult for so many students.
>>
>> Cheers,
>> Wouter
>>
>> 1: https://github.com/ottogroup/flink-spector
>>
>>
>> On Mon, Mar 4, 2019 at 14:39, Fabian Hueske wrote:
>>
>>> Hi Wouter,
>>>
>>> We are using Docker Compose (Flink JM, Flink TM, Kafka, Zookeeper)
>>> setups for our trainings and it is working very well.
>>> We have an additional container that feeds a Kafka topic via the
>>> commandline producer to simulate a somewhat realistic behavior.
>>> Of course, you can do it without Kafka and use some kind of data
>>> generating source that reads from a file that is replaced for evaluation.
>>>
>>> The biggest benefit that I see with using Docker is that the students
>>> have an environment that is close to the grading situation for development and
>>> testing.
>>> You do not need to provide infrastructure but everyone is running it
>>> locally in a well-defined context.
>>>
>>> So, as Joern said, what problems do you see with Docker?
>>>
>>> Best,
>>> Fabian
>>>
>>> On Mon, Mar 4, 2019 at 13:44, Jörn Franke <
>>> jornfra...@gmail.com> wrote:
>>>
>>>> It would help to understand the current issues that you have with this
>>>> approach. I used a similar approach (not with Flink, but a similar big data
>>>> technology) some years ago.
>>>>
>>>> > On 04.03.2019 at 11:32, Wouter Zorgdrager <
>>>> > w.d.zorgdra...@tudelft.nl> wrote:
>>>> >
>>>> > Hi all,
>>>> >
>>>> > I'm working on a setup to use Apache Flink in an assignment for a Big
>>>> > Data (bachelor) university course and I'm interested in your view on this.
>>>> > To sketch the situation:
>>>> > - > 200 students follow this course
>>>> > - students have to write some (simple) Flink applications using the
>>>> > DataStream API; the focus is on writing the transformation code
>>>> > - students need to write Scala code
>>>> > - we provide a dataset and a template (Scala class) with function
>>>> > signatures and a detailed description per application.
>>>> > e.g.: def assignment_one(input: DataStream[Event]):
>>>> > DataStream[(String, Int)] = ???
>>>> > - we provide some setup code like parsing of data and setting up the
>>>> > streaming environment
>>>> > - assignments need to be auto-graded, based on correct results
>>>> >
>>>> > In last year's course edition we approached this by a custom Docker
>>>> > container. This 

Re: [DISCUSS] Create a Flink ecosystem website

2019-03-06 Thread Robert Metzger
Awesome! Thanks a lot for looking into this Becket! The VMs hosted by Infra
look suitable.

@Shaoxuan: There is actually already a static page. It used to be linked,
but has been removed from the navigation bar for some reason. This is the
page: https://flink.apache.org/ecosystem.html
We could update the page and add it back to the navigation bar for the
coming weeks. What do you think?

I would actually like to push for a dynamic page right away.

I know it's kind of a bold move, but how do you feel about sending the
owners of spark-packages.org a short note, if they are interested in
sharing the source? We could maintain the code together in a public repo.
If they are not interested in sharing, or we decide not to ask in the first
place, I'm happy to write down a short description of the requirements,
maybe some mockups. We could then see if we find somebody here in the
community who's willing to implement it.
Given the number of people who are eager to contribute, I believe we will
be able to find somebody pretty soon.


On Wed, Mar 6, 2019 at 3:49 AM Becket Qin  wrote:

> Forgot to provide the link...
>
> [1] https://www.apache.org/dev/services.html#blogs (Apache infra services)
> [2] https://www.apache.org/dev/freebsd-jails (FreeBSD Jail provided by
> Apache Infra)
>
> On Wed, Mar 6, 2019 at 10:46 AM Becket Qin  wrote:
>
>> Hi Robert,
>>
>> Thanks for the feedback. These are good points. We should absolutely
>> shoot for a dynamic website to support more interactions in the community.
>> There might be a few things to solve:
>> 1. The website code itself. An open source solution would be great. TBH,
>> I do not have much experience with building websites. It would be great if
>> someone could comment on the solution here.
>> 2. The hardware to host the website. Apache Infra provides a few
>> services[1] that Apache projects can leverage. I did not see a database
>> service, but maybe we can run a simple MySQL db in a FreeBSD jail[2].
>>
>> @Bowen & vino, thanks for the positive feedback!
>>
>> @Shaoxuan Wang 
>> Thanks for the suggestion. That sounds reasonable to me. We probably need
>> a page on the official Flink site anyway, even if it just provides links to
>> the ecosystem website. So listing the connectors on that static page seems
>> like something we could start with while we work on the dynamic pages.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Wed, Mar 6, 2019 at 10:40 AM Shaoxuan Wang 
>> wrote:
>>
>>> Hi Becket and Robert,
>>>
>>> I like this idea!  Let us roll this out with Flink connectors at the
>>> beginning. We can start with a static page and upgrade it when we
>>> find a better solution for a dynamic one with rich functions.
>>>
>>> Regards,
>>> Shaoxuan
>>>
>>>
>>> On Wed, Mar 6, 2019 at 1:36 AM Robert Metzger 
>>> wrote:
>>>
>>>> Hey Becket,
>>>>
>>>> This is a great idea!
>>>> For this to be successful, we need to make sure the page is placed
>>>> prominently so that the people submitting something will get attention for
>>>> their contributions.
>>>> I think a dynamic site would probably be better, if we want features
>>>> such as up and downvoting or comments.
>>>> I would also like this to be hosted on Apache infra, and endorsed by
>>>> the community.
>>>>
>>>> Does anybody here know any existing software that we could use?
>>>> The only thing I was able to find is AUR: https://aur.archlinux.org/
>>>> (which is a community packages site for Arch Linux. The source code of this
>>>> portal is open source, but the layout and structure are not an ideal fit for
>>>> our requirements)
>>>>
>>>> Best,
>>>> Robert
>>>>
>>>>
>>>>
>>>> On Tue, Mar 5, 2019 at 12:03 PM Becket Qin
>>>> wrote:
>>>>
>>>>> Hi folks,
>>>>>
>>>>> I would like to start a discussion thread about creating a Flink
>>>>> ecosystem website. The website aims to help contributors who have
>>>>> developed projects around Flink share their work with the community.
>>>>>
>>>>> Please see the following doc for more details.
>>>>>
>>>>> https://docs.google.com/document/d/12oCItoLbKrLGuwEUFcCfigezIR2hW3925j1hh3kGp4A/edit#
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jiangjie (Becket) Qin
>>>>>



Re: How to check validity or completeness of created checkpoint/savepoint

2019-03-06 Thread Chesnay Schepler
The existence of a _metadata file is a good indicator that Flink has 
finished writing the checkpoint/savepoint; IIRC we use this in our 
tests. I'm not aware of any other mechanism.
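
A minimal sketch of that check in Java, assuming the checkpoint or savepoint directory is reachable as a local (or mounted) path. The class and method names are hypothetical and not part of Flink; for HDFS or S3 the same test would have to go through the respective filesystem client. Note that the presence of `_metadata` only suggests that Flink finished writing; it does not validate the contents.

```java
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper, not part of Flink. */
final class CheckpointCompleteness {

    private CheckpointCompleteness() {}

    /**
     * Returns true if the given checkpoint/savepoint directory contains a
     * regular file named "_metadata".
     */
    static boolean hasMetadataFile(Path checkpointDir) {
        return Files.isRegularFile(checkpointDir.resolve("_metadata"));
    }
}
```

A caller could run this against a retained checkpoint directory (for example one ending in chk-<n>) before passing it to a resume command.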


On 06.03.2019 10:21, Parth Sarathy wrote:

Hi,
   I am running flink 1.7.2 and working on resuming a job from a retained
checkpoint / savepoint.
I want to enquire if there is any reliable method which can be used to know
the validity or completeness of the checkpoint / savepoint created by flink.

Thanks,
Parth Sarathy



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: [1.7.1] job stuck in suspended state

2019-03-06 Thread Till Rohrmann
Hi Steven,

a quick update from my side after looking through the logs. The problem
seems to be that the Dispatcher does not start recovering the jobs when it
regains leadership after having lost it. I cannot yet tell why
this is happening, and I will try to debug the problem further.

If you manage to reproduce the problem, could you run the cluster
with DEBUG log level and send me the logs again? If I have not managed to
figure out how this problem happens by then, they would be super helpful.

Cheers,
Till

On Mon, Mar 4, 2019 at 7:44 PM Steven Wu  wrote:

> Till,
>
> I will send you the complete log offline. We don't know how to reliably
> reproduce the problem, but it did happen quite frequently, about once every
> couple of days. Let me see if I can cherry-pick the fix/commit to the 1.7
> branch.
>
> Thanks,
> Steven
>
>
> On Mon, Mar 4, 2019 at 5:55 AM Till Rohrmann  wrote:
>
>> Hi Steven,
>>
>> is this the tail of the logs or are there other statements following? I
>> think your problem could indeed be related to FLINK-11537. Is it possible
>> to somehow reliably reproduce this problem? If yes, then you could try out
>> the RC for Flink 1.8.0 which should be published in the next days.
>>
>> Cheers,
>> Till
>>
>> On Sat, Mar 2, 2019 at 12:47 AM Steven Wu  wrote:
>>
>>> We have observed that a job sometimes gets stuck in the suspended state, and
>>> no job restart/recovery is attempted once the job is suspended.
>>> * it is a high-parallelism job (close to 2,000)
>>> * there were a few job restarts before this
>>> * there were long GC pauses during the period
>>> * ZooKeeper timed out, probably because of the long GC pauses
>>>
>>> Is it related to https://issues.apache.org/jira/browse/FLINK-11537?
>>>
>>> I pasted some logs at the end.
>>>
>>> Thanks,
>>> Steven
>>>
>>> 2019-02-28 19:04:36,357 WARN
>>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - SASL
>>> configuration failed: javax.security.auth.login.LoginException: No JAAS
>>> configuration section named 'Client' was found in specified JAAS
>>> configuration file: '/tmp/jaas-6664341082794720643.conf'. Will
>>> continue connection to Zookeeper server without SASL authentication, if
>>> Zookeeper server allows it.
>>> 2019-02-28 19:04:36,357 INFO
>>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  -
>>> Opening socket connection to server 100.82.141.106/100.82.141.106:2181
>>> 2019-02-28 19:04:36,357 ERROR
>>> org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  -
>>> Authentication failed
>>> 2019-02-28 19:04:36,357 INFO
>>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket
>>> connection established to 100.82.141.106/100.82.141.106:2181,
>>> initiating session
>>> 2019-02-28 19:04:36,359 INFO
>>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  -
>>> Session establishment complete on server
>>> 100.82.141.106/100.82.141.106:2181, sessionid = 0x365ef9c4fe7f1f2,
>>> negotiated timeout = 4
>>> 2019-02-28 19:04:36,359 INFO
>>> org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager
>>> - State change: RECONNECTED
>>> 2019-02-28 19:04:36,359 INFO
>>> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  -
>>> Connection to ZooKeeper was reconnected. Leader election can be restarted.
>>> 2019-02-28 19:04:36,359 INFO
>>> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  -
>>> Connection to ZooKeeper was reconnected. Leader election can be restarted.
>>> 2019-02-28 19:04:36,359 INFO
>>> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  -
>>> Connection to ZooKeeper was reconnected. Leader election can be restarted.
>>> 2019-02-28 19:04:36,359 INFO
>>> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  -
>>> Connection to ZooKeeper was reconnected. Leader election can be restarted.
>>> 2019-02-28 19:04:36,359 INFO
>>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>>> Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.
>>> 2019-02-28 19:04:36,359 INFO
>>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>>> Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.
>>> 2019-02-28 19:04:36,359 INFO
>>> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
>>> ZooKeeper connection RECONNECTED. Changes to the submitted job graphs are
>>> monitored again.
>>> 2019-02-28 19:04:36,360 INFO
>>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>>> Connection to ZooKeeper was reconnected. Leader retrieval can be restarte
>>> ...
>>> 2019-02-28 19:05:09,400 INFO
>>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>>> Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
>>> 2019-02-28 19:05:09,400 INFO
>>> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
>>> 

Re: RMQSource synchronous message ack

2019-03-06 Thread Chesnay Schepler
The acknowledgement has to be synchronous since Flink assumes that after 
notifyCheckpointComplete() all data has been persisted to external 
systems. For example, if records 1 to 100 were passed to the sink and a 
checkpoint is triggered and completes, then on restart Flink would continue 
with record 101. But if the sink does not synchronously wait for all updates 
to be persisted, the checkpoint may finish while an update (say for record 99) 
is still being sent asynchronously. If that update is lost, Flink will 
_still_ resume from record 101.
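
The argument can be made concrete with a small toy model in Java. This is not Flink code: the class, the method, and the choice of record 99 as the in-flight update are illustrative assumptions that only mirror the example above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Toy model, not Flink code: records 1..100 are sent before a checkpoint completes. */
final class AckOrderingSketch {

    private AckOrderingSketch() {}

    /**
     * Simulates a crash right after the checkpoint completes and returns the
     * records that never reached the external system. The restart position
     * is record 101 in both cases, so anything returned here is never resent.
     */
    static List<Integer> lostAfterRestart(boolean waitForPendingUpdates) {
        TreeSet<Integer> persisted = new TreeSet<>();
        List<Integer> pending = new ArrayList<>();

        for (int record = 1; record <= 100; record++) {
            if (record == 99) {
                pending.add(record);   // update still in flight
            } else {
                persisted.add(record); // update already persisted
            }
        }

        if (waitForPendingUpdates) {
            // Synchronous behaviour: block until every update is persisted
            // before the checkpoint is allowed to complete.
            persisted.addAll(pending);
        }

        // The checkpoint completes here; a crash now discards whatever is
        // still in flight.
        pending.clear();

        List<Integer> lost = new ArrayList<>();
        for (int record = 1; record <= 100; record++) {
            if (!persisted.contains(record)) {
                lost.add(record);
            }
        }
        return lost;
    }
}
```

With waitForPendingUpdates set to true nothing is lost; with false, record 99 is missing after the restart even though the checkpoint was reported as complete.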


On 05.03.2019 15:07, Gabriel Candal wrote:

Hi,

Recently I've opened a Stack Overflow question about latency spikes 
(~500ms) after a checkpoint operation, even though the operation itself 
was relatively fast (~50ms).


I've come to realize that the cause for the latency was that the job 
was waiting for the RMQSource to acknowledgeSessionIDs during 
notifyCheckpointComplete.


I've noticed that the Kafka connectors do the equivalent operation 
(committing offsets) asynchronously, at least from 0.9 onwards. My 
question to you is: can you see any reason why this 
acknowledgement has to be synchronous for RabbitMQ?


I believe it should be ok, given that those messages are already 
reflected in the checkpointed state, but I'm not sure if there are any 
negative consequences correctness-wise.


Thanks,





RE: Checkpoints and catch-up burst (heavy back pressure)

2019-03-06 Thread LINZ, Arnaud
Hi,
I like the idea, will give it a try.
Thanks,
Arnaud

From: Stephen Connolly 
Sent: Tuesday, March 5, 2019 13:55
To: LINZ, Arnaud 
Cc: zhijiang ; user 
Subject: Re: Checkpoints and catch-up burst (heavy back pressure)



On Tue, 5 Mar 2019 at 12:48, Stephen Connolly 
<stephen.alan.conno...@gmail.com> wrote:


On Fri, 1 Mar 2019 at 13:05, LINZ, Arnaud 
<al...@bouyguestelecom.fr> wrote:
Hi,

I think I should go into more detail to explain my use case.
I have one non-parallel source (parallelism = 1) that lists binary files in an 
HDFS directory. The data set emitted by the source is a set of file names, not 
file contents. These filenames are rebalanced and sent to workers (parallelism 
= 15) that use a flatmapper that opens the file, reads it, decodes it, and 
sends records (forward mode) to the sinks (with a few 1-to-1 mappings 
in between). So the flatmap operation is a time-consuming one, as the files are 
more than 200Mb large each; the flatmapper will emit millions of records to the 
sink for one source record (filename).

The rebalancing, occurring at the file name level, does not use much I/O, and I 
cannot use one-to-one mode at that point if I want some parallelism since I 
have only one source.

I did not put file decoding directly in the sources because I have no good way 
to distribute files to sources without a controller (the input directory is unique, 
filenames are random and cannot be “attributed” to one particular source 
instance easily).

Crazy idea: If you know the task number and the number of tasks, you can hash 
the filename using a shared algorithm (e.g. md5 or sha1 or crc32) and then just 
check whether hash modulo number of tasks == task number.

That would let you list the files in parallel without sharing state, which 
would allow file decoding directly in the sources.

If you extend RichParallelSourceFunction you will have:

    int index = getRuntimeContext().getIndexOfThisSubtask();
    int count = getRuntimeContext().getNumberOfParallelSubtasks();

then a hash function like:

    // DigestUtils is presumably the one from Apache Commons Codec
    private static int hash(String string) {
        int result = 0;
        // fold the SHA-1 digest bytes into a single int
        for (byte b : DigestUtils.sha1(string)) {
            result = result * 31 + b;
        }
        return result;
    }

and just compare the filename like so:

    for (String filename : listFiles()) {
        // floorMod keeps the bucket non-negative even if hash() is negative
        if (Math.floorMod(hash(filename), count) != index) {
            continue; // another subtask owns this file
        }
        // this is our file
        ...
    }

Note: if you know the file name patterns, you should tune the hash function to 
distribute them evenly. The SHA1 with prime reduction of the bytes is ok for 
general levelling... but may be poor over 15 buckets with your typical data set 
of filenames.
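
For reference, the same idea can be sketched as a self-contained Java class that needs only the JDK. It swaps the DigestUtils call for java.security.MessageDigest; the class and method names are hypothetical, and in a real source the index and parallelism would come from the runtime context as shown above.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper illustrating hash-based file ownership; not Flink API. */
final class FileOwnership {

    private FileOwnership() {}

    /** Folds the SHA-1 digest of the name into an int (may be negative). */
    static int hash(String name) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            int result = 0;
            for (byte b : sha1.digest(name.getBytes(StandardCharsets.UTF_8))) {
                result = result * 31 + b;
            }
            return result;
        } catch (NoSuchAlgorithmException e) {
            // SHA-1 is one of the digests every Java platform must provide.
            throw new IllegalStateException(e);
        }
    }

    /** Index of the subtask that owns the file, in [0, parallelism). */
    static int ownerOf(String filename, int parallelism) {
        return Math.floorMod(hash(filename), parallelism);
    }

    /** Files from the listing that the given subtask should read. */
    static List<String> filesFor(int subtaskIndex, int parallelism, List<String> listing) {
        List<String> mine = new ArrayList<>();
        for (String filename : listing) {
            if (ownerOf(filename, parallelism) == subtaskIndex) {
                mine.add(filename);
            }
        }
        return mine;
    }
}
```

Because every subtask applies the same function to the same listing, each file is claimed by exactly one subtask without any coordination. How evenly the files spread over 15 subtasks still depends on the actual file names.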


Alternatively, I could have used a dispatcher daemon, separate from the 
streaming app, that distributes files to various directories, each directory 
being associated with a Flink source instance, and put the file reading & 
decoding directly in the source, but that seemed more complex to code and 
operate than the filename source. Would it have been better from the 
checkpointing perspective?

About the ungraceful source sleep(), is there a way, programmatically, to know 
the “load” of the app, or to determine if checkpointing takes too much time, so 
that I can sleep only when needed?

Thanks,
Arnaud

From: zhijiang <wangzhijiang...@aliyun.com>
Sent: Friday, March 1, 2019 04:59
To: user <user@flink.apache.org>; LINZ, Arnaud 
<al...@bouyguestelecom.fr>
Subject: Re: Checkpoints and catch-up burst (heavy back pressure)

Hi Arnaud,

Thanks for the further feedback!

For option 1: if a 40min timeout still does not help, that indicates it might 
take more time to finish a checkpoint in your case. I have also seen scenarios 
where catching up on data made a single checkpoint take several hours. If the 
current checkpoint expires because of the timeout, the next triggered 
checkpoint might also fail because of the timeout. So it seems better to wait 
for the current checkpoint to finish rather than expire it, unless you cannot 
tolerate such a long time for some reason, such as worrying that a failover 
during this time would have to restore more data.

For option 2: The default network settings should make sense. Lower values 
might cause a performance regression, and higher values would increase the 
in-flight buffers and delay checkpoints even more.

For option 3: If resources are limited, this will still not work on your side.

Sleeping for some time in the source, as you mentioned, is an option and might 
work in your case, although it does not seem to be a graceful way.

I think there is no data skew causing backpressure in your case, because you 
used the rebalance mode as mentioned. Another option might be to use the forward 
mode, which would be better than rebalance mode if possible in your case. Because 
the source and downstream task are one-to-one in forward mode, the total 
in-flight buffers are 2+2+8 for one single downstream task before the barrier. If 
in rebalance mode, the total in-flight 

Re: EXT :Re: Flink 1.7.1 Inaccessible

2019-03-06 Thread Seye Jin
You will have to copy the link in its entirety; Gmail is not recognizing it
correctly:
http://mail-archives.apache.org/mod_mbox/flink-user/201709.mbox/<
533686a2-71ee-4356-8961-68cf3f858...@expedia.com>

On Wed, Mar 6, 2019, 5:26 AM Till Rohrmann  wrote:

> Hmm this is strange. Retrieving more information from the logs would be
> helpful to better understand the problem.
>
> The link to the related discussion does not work. Maybe you could repost
> it.
>
> Cheers,
> Till
>
> On Wed, Mar 6, 2019 at 4:32 AM Seye Jin  wrote:
>
>>
>> Hi Till, there were no warn or error log messages. We have been using
>> Flink for a long time now and never experienced this issue (we just migrated
>> to 1.7 from 1.4 though). It was a critical app, and after multiple attempts to
>> resolve it, we updated the *high-availability.cluster-id* and attached
>> the TMs to a new JM (even though we sadly lost state).
>>
>> @nick we are indeed running Flink and ZooKeeper in Docker, and we verified
>> that it could resolve the hostname. Plus it got a new leader id, and it even
>> acknowledged registering the jobs running on the cluster (even though
>> checkpoints were not getting triggered).
>>
>> We are keeping a close eye on this issue and trying to replicate and sift
>> through kibana logs and will post here if we find anything.
>>
>> P.S: it kind of looks similar to this that happened a while back (
>> http://mail-archives.apache.org/mod_mbox/flink-user/201709.mbox/<
>> 533686a2-71ee-4356-8961-68cf3f858...@expedia.com>
>> )
>>
>>
>> On Mon, Mar 4, 2019, 12:38 PM Martin, Nick  wrote:
>>
>>> Seye, are you running Flink and Zookeeper in Docker? I’ve had problems
>>> with Jobmanagers not resolving the hostnames for Zookeeper when starting a
>>> stack on Docker.
>>>
>>>
>>>
>>> *From:* Till Rohrmann [mailto:trohrm...@apache.org]
>>> *Sent:* Monday, March 04, 2019 7:02 AM
>>> *To:* Seye Jin 
>>> *Cc:* user 
>>> *Subject:* EXT :Re: Flink 1.7.1 Inaccessible
>>>
>>>
>>>
>>> Hi Seye,
>>>
>>>
>>>
>>> usually, Flink's web UI should be accessible after a successful leader
>>> election. Could you share with us the cluster logs to see what's going on?
>>> Without this information it is hard to tell what's going wrong.
>>>
>>>
>>>
>>> What you could also do is to check the ZooKeeper znode which represents
>>> the cluster id (if you are using Yarn it should be something like
>>> /flink/application_...). There you could check the contents of the leader
>>> znode of the web ui (leader/rest_server_lock). It should contain the
>>> address of the current leader if there is one.
>>>
>>>
>>>
>>> Cheers,
>>>
>>> Till
>>>
>>>
>>>
>>> On Sat, Mar 2, 2019 at 5:29 AM Seye Jin  wrote:
>>>
>>> I am getting "service temporarily unavailable due to an ongoing leader
>>> election" when I try to access Flink UI. The jobmanager has HA configured,
>>> I have tried to restart jobmanager multiple times but no luck. I also tried
>>> submitting my job from console but I also get the same message.
>>>
>>> When I view logs during JM restart I see no errors, it even says
>>> "jobmanager was granted leadership with ..."
>>>
>>> Any hints to try and remediate this issue will be much appreciated. I
>>> have multiple stateful applications running so I cannot start a new
>>> cluster(since I am unable to do a savepoint also).
>>>
>>> Thanks
>>>
>>>
>>>
>>>
>>> --
>>>
>>> Notice: This e-mail is intended solely for use of the individual or
>>> entity to which it is addressed and may contain information that is
>>> proprietary, privileged and/or exempt from disclosure under applicable law.
>>> If the reader is not the intended recipient or agent responsible for
>>> delivering the message to the intended recipient, you are hereby notified
>>> that any dissemination, distribution or copying of this communication is
>>> strictly prohibited. This communication may also contain data subject to
>>> U.S. export laws. If so, data subject to the International Traffic in Arms
>>> Regulation cannot be disseminated, distributed, transferred, or copied,
>>> whether incorporated or in its original form, to foreign nationals residing
>>> in the U.S. or abroad, absent the express prior approval of the U.S.
>>> Department of State. Data subject to the Export Administration Act may not
>>> be disseminated, distributed, transferred or copied contrary to U. S.
>>> Department of Commerce regulations. If you have received this communication
>>> in error, please notify the sender by reply e-mail and destroy the e-mail
>>> message and any physical copies made of the communication.
>>>  Thank you.
>>> *
>>>

Re: EXT :Re: Flink 1.7.1 Inaccessible

2019-03-06 Thread Till Rohrmann
Hmm this is strange. Retrieving more information from the logs would be
helpful to better understand the problem.

The link to the related discussion does not work. Maybe you could repost it.

Cheers,
Till

On Wed, Mar 6, 2019 at 4:32 AM Seye Jin  wrote:

>
> Hi Till, there were no warn or error log messages. We have been using
> Flink for a long time now and never experienced this issue (we just migrated
> to 1.7 from 1.4 though). It was a critical app, and after multiple attempts to
> resolve it, we updated the *high-availability.cluster-id* and attached
> the TMs to a new JM (even though we sadly lost state).
>
> @nick we are indeed running Flink and ZooKeeper in Docker, and we verified
> that it could resolve the hostname. Plus it got a new leader id, and it even
> acknowledged registering the jobs running on the cluster (even though
> checkpoints were not getting triggered).
>
> We are keeping a close eye on this issue and trying to replicate and sift
> through kibana logs and will post here if we find anything.
>
> P.S: it kind of looks similar to this that happened a while back (
> http://mail-archives.apache.org/mod_mbox/flink-user/201709.mbox/<
> 533686a2-71ee-4356-8961-68cf3f858...@expedia.com>
> )
>
>
> On Mon, Mar 4, 2019, 12:38 PM Martin, Nick  wrote:
>
>> Seye, are you running Flink and Zookeeper in Docker? I’ve had problems
>> with Jobmanagers not resolving the hostnames for Zookeeper when starting a
>> stack on Docker.
>>
>>
>>
>> *From:* Till Rohrmann [mailto:trohrm...@apache.org]
>> *Sent:* Monday, March 04, 2019 7:02 AM
>> *To:* Seye Jin 
>> *Cc:* user 
>> *Subject:* EXT :Re: Flink 1.7.1 Inaccessible
>>
>>
>>
>> Hi Seye,
>>
>>
>>
>> usually, Flink's web UI should be accessible after a successful leader
>> election. Could you share with us the cluster logs to see what's going on?
>> Without this information it is hard to tell what's going wrong.
>>
>>
>>
>> What you could also do is to check the ZooKeeper znode which represents
>> the cluster id (if you are using Yarn it should be something like
>> /flink/application_...). There you could check the contents of the leader
>> znode of the web ui (leader/rest_server_lock). It should contain the
>> address of the current leader if there is one.
>>
>>
>>
>> Cheers,
>>
>> Till
>>
>>
>>
>> On Sat, Mar 2, 2019 at 5:29 AM Seye Jin  wrote:
>>
>> I am getting "service temporarily unavailable due to an ongoing leader
>> election" when I try to access Flink UI. The jobmanager has HA configured,
>> I have tried to restart jobmanager multiple times but no luck. I also tried
>> submitting my job from console but I also get the same message.
>>
>> When I view logs during JM restart I see no errors, it even says
>> "jobmanager was granted leadership with ..."
>>
>> Any hints to try and remediate this issue will be much appreciated. I
>> have multiple stateful applications running so I cannot start a new
>> cluster(since I am unable to do a savepoint also).
>>
>> Thanks
>>
>>
>>
>>

Re: Task slot sharing: force reallocation

2019-03-06 Thread Till Rohrmann
Which version of Flink are you using?

On Tue, Mar 5, 2019 at 10:58 PM Le Xu  wrote:

> Hi Till:
>
> Thanks for the reply. The setup of the jobs is roughly as follows: For a
> cluster with N machines, we deploy X simple map/reduce style jobs (the job
> DAG and settings are exactly the same, except that they consume different
> data). Each job has N mappers (they are evenly distributed, one mapper on
> each machine). There are X mappers on each machine (as there are X jobs in
> total). Each job has only one reducer, to which all mappers point. What I'm
> observing is that all reducers are allocated to machine 1 (where mapper
> 1 of every job is allocated).  It does make sense since the reducer and
> mapper 1 are in the same slot group. The original purpose of the question
> is to find out whether it is possible to explicitly specify that the reducer
> can be co-located with another mapper (such as mapper 2, so the reducer of
> job 2 can be placed on machine 2). Just trying to figure out if it is at all
> possible without using a more expensive approach (through YARN for example).
> But if it is not possible I will see if I can move to job mode as Piotr
> suggests.
>
> Thanks,
>
> Le
>
> On Tue, Mar 5, 2019 at 9:24 AM Till Rohrmann  wrote:
>
>> Hard to tell whether this is related to FLINK-11815.
>>
>> To me the setup is not fully clear. Let me try to sum it up: According to
>> Le Xu's description there are n jobs running on a session cluster. I assume
>> that every TaskManager has n slots. The observed behaviour is that every
>> job allocates the slot for the first mapper and chained sink from the first
>> TM, right? Since Flink does not give strict guarantees for the slot
>> allocation this is possible, however it should be highly unlikely or at
>> least change when re-executing the same setup. At the moment there is no
>> functionality in place to control the task-slot assignment.
>>
>> Chaining only affects which tasks will be grouped together and executed by
>> the same Task (that is, by the same thread). Separate tasks can still
>> be executed in the same slot if they have the same slot sharing group. This
>> means that there can be multiple threads running in each slot.
>>
>> For me it would be helpful to get more information about the actual job
>> deployments.
>>
>> Cheers,
>> Till
>>
>> On Tue, Mar 5, 2019 at 12:00 PM Piotr Nowojski wrote:
>>
>>> Hi Le,
>>>
>>> As I wrote, you can try running Flink in job mode, which spawns a separate
>>> cluster for each job.
>>>
>>> Till, is this issue covered by FLINK-11815? Is this the same as:
>>>
>>> > Known issues:
>>> > 1. (…)
>>> > 2. if task slots are registered before slot request, the code have a
>>> tendency to group requests together on the same machine because we
>>> are using a LinkedHashMap
>>>
>>> ?
>>>
>>> Piotrek
>>>
>>> On 4 Mar 2019, at 21:08, Le Xu  wrote:
>>>
>>> Thanks Piotr.
>>>
>>> I didn't realize that the email attachment isn't working so the example
>>> I was referring to was this figure from Flink website:
>>> https://ci.apache.org/projects/flink/flink-docs-stable/fig/slot_sharing.svg
>>>
>>> So I am trying to run multiple jobs concurrently in a cluster. The jobs are
>>> identical and the DAG looks very similar to the one in the figure. Each
>>> machine holds one map task from each job. I end up with X sinks on
>>> machine 1 (X being the number of jobs). I assumed this was caused by
>>> operator chaining (all sinks are chained to mapper 1, so they all end up on
>>> machine 1). However, I also tried disabling chaining and still got the same
>>> result. Somehow, even when the sink and the map belong to different
>>> threads, they are still placed in the same slot.
>>>
>>> My goal was to see whether it is possible to have sinks evenly
>>> distributed across the cluster (instead of all on machine 1). One way to do
>>> this is to see if the sink can be chained to one of the other mappers.
>>> The other way is to see if we can change the placement of the mappers
>>> altogether (like placing map 1 of job 2 on machine 2, map 1 of job 3 on
>>> machine 3, so that the sinks end up spread evenly throughout the cluster).
>>>
>>> Thanks.
>>>
>>> Le
>>>
>>> On Mon, Mar 4, 2019 at 6:49 AM Piotr Nowojski wrote:
>>>
>>>> Hi,
>>>>
>>>> Are you asking whether this is the expected behaviour, or have you
>>>> actually observed this issue? I’m not entirely sure, but I would guess that
>>>> the Sink tasks would be distributed randomly across the cluster, but maybe
>>>> I’m mixing this issue up with resource allocation for Task Managers. Maybe
>>>> Till will know something more about this?
>>>>
>>>> One thing that might solve or work around the issue is to run those
>>>> jobs in job mode (one cluster per job) rather than in cluster mode, since
>>>> containers for Task Managers are created/requested randomly.
>>>>
>>>> Piotrek
>>>>
>>>> On 2 Mar 2019, at 23:53, Le Xu wrote:
>>>>
>>>> Hello!


Re: Setting source vs sink vs window parallelism with data increase

2019-03-06 Thread Piotr Nowojski
Re-adding user mailing list.


Hi,

If it is a GC issue, only GC logs or a JVM memory profiler (like Oracle’s 
Mission Control) can lead you to the solution. Once you confirm that it’s a GC 
issue, there are numerous resources online on how to analyse the cause of the 
problem. CPU profiling and Flink metrics are of little use for that, since GC 
pressure caused by one thread can create performance bottlenecks in other, 
unrelated places.

If it’s not a GC issue, you can use Flink metrics (like the amount of buffered 
input/output data) to find the Task that’s causing a bottleneck. Then you can use 
a CPU profiler to analyse why that is happening.
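
As a rough illustration of the metrics-based approach, the sketch below picks a likely bottleneck from per-task buffer usage. The task names and numbers are made up, and the `inPoolUsage` / `outPoolUsage` keys are assumed to mirror Flink's network buffer pool usage metrics. The heuristic (input buffers full, output buffers not) is a common rule of thumb, not something Flink computes for you.

```python
# Hypothetical snapshot of per-task buffer pool usage (0.0 = empty, 1.0 = full),
# listed in topological order from source to sink. All values are invented.
SAMPLE_USAGE = [
    ("source",  {"inPoolUsage": 0.0, "outPoolUsage": 1.0}),
    ("flatMap", {"inPoolUsage": 1.0, "outPoolUsage": 1.0}),
    ("window",  {"inPoolUsage": 1.0, "outPoolUsage": 0.1}),
    ("sink",    {"inPoolUsage": 0.1, "outPoolUsage": 0.0}),
]


def find_bottleneck(usage, threshold=0.9):
    """Return the first task whose input buffers are (almost) full while its
    output buffers are not. Such a task receives data faster than it can
    process it, and the tasks upstream of it are back pressured."""
    for name, metrics in usage:
        input_full = metrics["inPoolUsage"] >= threshold
        output_full = metrics["outPoolUsage"] >= threshold
        if input_full and not output_full:
            return name
    return None


if __name__ == "__main__":
    print(find_bottleneck(SAMPLE_USAGE))
```

Once a candidate task is found this way, a CPU profiler attached to the TaskManager running it is the next step.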

Piotrek

> On 6 Mar 2019, at 02:52, Padarn Wilson  wrote:
> 
> Hi Piotr,
> 
> Thanks for your feedback. Makes sense about the checkpoint barriers - this 
> definitely could be the cause of a problem.
> 
> I would advise profiling your job to find out what’s going on.
> 
> Agreed. Outside of inspecting the Flink metrics, do you have suggestions for 
> tools with which to do this?
> 
> The main thing I'm trying to pin down is:
> 1) Is it the downstream processing from the expansion of records that causes 
> a problem, or 
> 2) Is it the shuffle of all the records after the expansion that is taking a 
> long time? If so, is there anything I can do to mitigate this other than 
> trying to shuffle less?
> 
> Thanks,
> Padarn
> 
> 
> On Tue, Mar 5, 2019 at 7:08 PM Piotr Nowojski wrote:
> Hi,
> 
>> Do you mind elaborating on this? What technology would you propose as an 
>> alternative, and why would this increase checkpointing time? 
> 
> The problem is that when Flink starts a checkpoint and injects checkpoint 
> barriers, those barriers travel through the Job Graph. The quicker 
> they can do that, the better. How long it takes depends on the amount of 
> data buffered ahead of the checkpoint barriers (currently all such records must 
> be processed before the checkpoint barrier is passed downstream). The more 
> buffered records there are, and the more time it takes to process them, the 
> longer the checkpoint takes. Obviously, if one stage in the job is 
> multiplying the number of records, it can in a way multiply the amount of 
> “buffered work” that needs to be processed before checkpoint barriers pass 
> through.
> 
> However, that might not be the case for you. To analyse what’s going on you 
> would need to look at various Flink metrics, like checkpoint times, back 
> pressured tasks, the state of the output/input buffers of the tasks, etc. 
> Those are secondary issues, though. First of all you should try to pinpoint the 
> cause of the long GC pauses. If it comes from your code, you should fix this 
> first. If that either isn’t the issue or doesn’t solve it, generally speaking 
> I would advise profiling your job to find out what’s going on.
> 
> Piotrek
> 
>> On 5 Mar 2019, at 02:00, Padarn Wilson wrote:
>> 
>> Hi Piotr,
>> 
>> Thanks for your response. I am using Flink 1.7.1 (intend to upgrade to 1.7.2 
>> shortly - there is some S3 fix I'd like to take advantage of).
>> 
>> Generally speaking, Flink might not be the best fit if your records fan out; 
>> this may significantly increase checkpointing time. 
>> 
>> Do you mind elaborating on this? What technology would you propose as an 
>> alternative, and why would this increase checkpointing time? 
>> 
>> However you might want to first identify what’s causing long GC times.
>> 
>> My current plan is to try and enable GC logs and see if I can get something 
>> meaningful from them. 
>> 
>> Thanks a lot,
>> Padarn 
>> 
>> 
>> On Mon, Mar 4, 2019 at 7:16 PM Piotr Nowojski wrote:
>> Hi,
>> 
>> What Flink version are you using?
>> 
>> Generally speaking, Flink might not be the best fit if your records fan out; 
>> this may significantly increase checkpointing time. 
>> 
>> However, you might want to first identify what’s causing the long GC times. If 
>> there are long GC pauses, this should be the first thing to fix.
>> 
>> Piotrek
>> 
>>> On 2 Mar 2019, at 08:19, Padarn Wilson wrote:
>>> 
>>> Hi all again - following up on this I think I've identified my problem as 
>>> being something else, but would appreciate if anyone can offer advice.
>>> 
>>> After running my stream for some time, I see that my garbage collector for 
>>> the old generation starts to take a very long time:
>>> 
>>> Here the purple line is young generation time, which is ever increasing but 
>>> grows slowly, while the blue line is old generation time.
>>> This in itself is not a problem, but as soon as the next checkpoint is 
>>> triggered after this happens you see the following:
>>> 
>>> It looks like the checkpoint hits a cap, but this is only because the 
>>> checkpoints start to time out and fail (these are the alignment times per 
>>> operator).
>>> 
>>> I do notice that my state is growing quite large over time, but I don't 
>>> 

Re: Under what circumstances does Flink produce out-of-order data?

2019-03-06 Thread Congxian Qiu
Hi,
For Kafka, messages within a single partition are guaranteed to be in order, but the relative order of messages between partition A and partition B cannot be guaranteed.
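
A toy simulation can make the Kafka ordering guarantee concrete. This is only a sketch: it models partitions as plain Python lists and a consumer that reads from whichever partition happens to be ready, with no real Kafka or Flink involved. The function names and the round-robin assignment are assumptions made for illustration.

```python
import random


def produce(num_messages, num_partitions):
    """Assign messages 1..num_messages round-robin to partitions.
    Each partition keeps its messages in the order they were sent."""
    partitions = [[] for _ in range(num_partitions)]
    for i in range(1, num_messages + 1):
        partitions[(i - 1) % num_partitions].append(i)
    return partitions


def consume(partitions, seed):
    """Read all partitions concurrently: at each step, take the next
    message from a randomly chosen partition that still has data."""
    rng = random.Random(seed)
    cursors = [0] * len(partitions)
    output = []
    while True:
        ready = [p for p, c in enumerate(cursors) if c < len(partitions[p])]
        if not ready:
            return output
        p = rng.choice(ready)
        output.append((p, partitions[p][cursors[p]]))
        cursors[p] += 1


def ordered_within_partitions(output):
    """Check that messages from the same partition appear in increasing order."""
    last_seen = {}
    for partition, message in output:
        if message < last_seen.get(partition, 0):
            return False
        last_seen[partition] = message
    return True


if __name__ == "__main__":
    result = consume(produce(20, 3), seed=1)
    for partition, message in result:
        print(f"{partition + 1}> Message_{message}")
    print("ordered within each partition:", ordered_within_partitions(result))
```

Every run keeps each partition's messages in order, while the interleaving across partitions depends on which reader gets ahead, which is the pattern visible in the output quoted below.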

Best, Congxian
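On Mar 5, 2019, 18:35 +0800, 刘 文 wrote:
> I have a question: people talk about out-of-order problems in Flink, but I do not understand under what circumstances they arise.
> Could someone give me a scenario that produces out-of-order data?
> Below, I read data from Kafka with a parallelism of three.
> The output is as follows (20 records in total):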
>
> 3> Message_3
> 1> Message_1
> 2> Message_2
> 1> Message_4
> 2> Message_5
> 3> Message_6
> 2> Message_8
> 1> Message_7
> 2> Message_11
> 3> Message_9
> 2> Message_14
> 1> Message_10
> 2> Message_17
> 3> Message_12
> 2> Message_20
> 1> Message_13
> 3> Message_15
> 1> Message_16
> 3> Message_18
> 1> Message_19


How to check validity or completeness of created checkpoint/savepoint

2019-03-06 Thread Parth Sarathy
Hi,
  I am running Flink 1.7.2 and working on resuming a job from a retained
checkpoint / savepoint. 
Is there any reliable method for checking the validity or completeness of a
checkpoint / savepoint created by Flink?

Thanks,
Parth Sarathy





Problems with restoring from savepoint

2019-03-06 Thread Pavel Potseluev
Hi!

We use Flink 1.7.1 and have some problems with restoring from a savepoint. We use a custom Kryo serializer which relies on the protobuf representation of our model classes. It had been working fine, but when we made a change in one of our model classes, restoring broke because of a changed serialVersionUID. We can see this message in the log:

Caused by: java.lang.IllegalStateException: Could not Java-deserialize TypeSerializer while restoring checkpoint metadata for serializer snapshot 'org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer$KryoSerializerConfigSnapshot'. Please update to the TypeSerializerSnapshot interface that removes Java Serialization to avoid this problem in the future.

I found that the method snapshotConfiguration of org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer returns an instance of KryoSerializerConfigSnapshot, and that this class for some reason extends the deprecated TypeSerializerConfigSnapshot, which relies on Java serialization.

Of course, we have fixed our problem just by adding an explicit serialVersionUID to our class. But it seems strange to have problems with Java serialization when our serializer doesn't use this mechanism. Do you plan to fix this problem?

Full stack trace below:

java.lang.Exception: Exception while creating StreamOperatorStateContext.
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for StreamGroupedReduce_5d41c2bc0b6f18591a40bd21a3e516cd_(2/2) from any of the 1 provided restore options.
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:284)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
	... 5 more
Caused by: java.lang.IllegalStateException: Could not Java-deserialize TypeSerializer while restoring checkpoint metadata for serializer snapshot 'org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer$KryoSerializerConfigSnapshot'. Please update to the TypeSerializerSnapshot interface that removes Java Serialization to avoid this problem in the future.
	at org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.restoreSerializer(TypeSerializerConfigSnapshot.java:138)
	at org.apache.flink.runtime.state.StateSerializerProvider$RestoredStateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:212)
	at org.apache.flink.runtime.state.StateSerializerProvider$RestoredStateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:188)
	at org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.getStateSerializer(RegisteredKeyValueStateBackendMetaInfo.java:135)
	at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.getStateSerializer(CopyOnWriteStateTable.java:541)
	at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.createV2PlusReader(StateTableByKeyGroupReaders.java:69)
	at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.readerForVersion(StateTableByKeyGroupReaders.java:60)
	at org.apache.flink.runtime.state.heap.StateTable.keyGroupReader(StateTable.java:199)
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readKeyGroupStateData(HeapKeyedStateBackend.java:491)
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateHandleStateData(HeapKeyedStateBackend.java:453)
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:410)
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:358)
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:104)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
	... 7 more
Caused by: java.io.InvalidClassException: ru.yandex.vertis.moderation.model.ModerationRequest$UpsertMetadata; local class incompatible: stream classdesc serialVersionUID = -30736003445323259, local class serialVersionUID = -2856495280913794838
	at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:687)
	at 

Re: submit job failed on Yarn HA

2019-03-06 Thread 孙森
Hi Gary:
  Yes, it’s the second case: the client host is different from the one on 
which the session cluster was started. I tried using "flink run -yid", and it 
really works.
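
For anyone scripting this, the submission can be sketched as below. This is only an illustration: the application id and jar name are hypothetical placeholders, the helper `build_flink_run_command` is not part of Flink, and the id check simply assumes the usual `application_<timestamp>_<sequence>` shape of YARN application ids.

```python
import re

# YARN application ids normally look like application_<clusterTimestamp>_<sequenceNumber>.
_APP_ID_PATTERN = re.compile(r"^application_\d+_\d+$")


def build_flink_run_command(application_id, jar_path, flink_bin="flink"):
    """Build the argument list for submitting a job to an existing YARN
    session via the -yid option, so the CLI does not depend on the local
    YARN properties file."""
    if not _APP_ID_PATTERN.match(application_id):
        raise ValueError(f"not a YARN application id: {application_id!r}")
    return [flink_bin, "run", "-yid", application_id, jar_path]


if __name__ == "__main__":
    # Hypothetical id and jar; replace with the values of your own session cluster.
    cmd = build_flink_run_command("application_1551000000000_0001", "my-job.jar")
    print(" ".join(cmd))
    # To actually submit the job: subprocess.run(cmd, check=True)
```

The id of a running session can be looked up with `yarn application -list` on any host that has YARN client access.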

Best!
Sen

> On Mar 6, 2019, at 3:19 PM, Gary Yao wrote:
> 
> Hi Sen,
> 
> I took a look at your CLI logs again, and saw that it uses the "default" Flink
> namespace in ZooKeeper:
> 
> 2019-02-28 11:18:05,255 INFO  org.apache.flink.runtime.util.ZooKeeperUtils  - Using '/flink/default' as Zookeeper namespace.
> 
> However, since you are using YARN, the Flink namespace in ZooKeeper should
> include the YARN applicationId. Normally, the CLI tries to resolve the
> applicationId from a local "YARN properties" file [1], which is generated
> after a successful submission of a session cluster (using Flink's
> bin/yarn-session.sh) [2]. In your case that file does not exist, maybe
> because it got deleted, or because the host from which you are submitting
> the job is different from the one on which the session cluster was started.
> 
> If you submit the job with the -yid (or --yarnapplicationId) option,
> the CLI should use the correct namespace in ZooKeeper.
> Just submit the job normally without removing the ZooKeeper configuration from
> flink-conf.yaml, and without specifying host:port manually with the "-m"
> option. Let me know if this works for you.
> 
> Best,
> Gary
> 
> [1] 
> https://github.com/apache/flink/blob/e2579e39602ab7d3e906a185353dd413aca58317/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java#L236
>  
> 
> 
> [2] 
> https://github.com/apache/flink/blob/e2579e39602ab7d3e906a185353dd413aca58317/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java#L622-L625
>  
> 
> 
> [3] 
> https://github.com/apache/flink/blob/e2579e39602ab7d3e906a185353dd413aca58317/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java#L603-L606
>  
> 
> 
> On Wed, Mar 6, 2019 at 3:58 AM 孙森 wrote:
> Hi Gary:
>
>   Thanks very much! I tried it the way you described, and it works. 
> I hope that the bug can be fixed as soon as possible.
> Best!
> Sen
> 
>> On Mar 5, 2019, at 3:15 PM, Gary Yao wrote:
>> 
>> Hi Sen,
>> 
>> In that email I meant that you should disable the ZooKeeper configuration in
>> the CLI, because the CLI had trouble resolving the leader from ZooKeeper.
>> What you should have done is:
>> 
>> 1. Start the cluster normally with ZooKeeper enabled
>> 2. Edit flink-conf.yaml to remove ZooKeeper config
>> 3. Submit the job to your cluster with -m flag.
>> 
>> Best,
>> Gary
>> 
>> On Tue, Mar 5, 2019 at 8:08 AM 孙森 wrote:
>> Hi Gary:
>> 
>>   The ZooKeeper setting is missing because job submission fails when it is present.
>> <屏幕快照 2019-03-05 下午3.07.21.png>
>> 
>> 
>> Best
>> Sen
>> 
>>> On Mar 5, 2019, at 3:02 PM, Gary Yao wrote:
>>> 
>>> Hi Sen,
>>> 
>>> I don't see
>>> 
>>> high-availability: zookeeper
>>> 
>>> in your Flink configuration. However, this is mandatory for an HA setup. By
>>> default "none" is used, and the ZK configuration is ignored. The log also
>>> hints that you are using StandaloneLeaderElectionService instead of the
>>> ZooKeeper implementation (note that the leaderSessionID consists only of 0s
>>> [1][2]):
>>> 
>>> 2019-03-05 11:23:53,883 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - http://hdp3:60179 was granted leadership with leaderSessionID=----
>>> 
>>> Did you accidentally delete the "high-availability" config from your
>>> flink-conf.yaml?
>>> 
>>> You probably also want to increase the number of yarn.application-attempts
>>> [3].
>>> 
>>> Best,
>>> Gary
>>> 
>>> 
>>> [1] 
>>> https://github.com/apache/flink/blob/dcd8c74b504046802cebf278b718e4753928a260/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java#L48
>>>  
>>> 
>>> [2] 
>>> https://github.com/apache/flink/blob/dcd8c74b504046802cebf278b718e4753928a260/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java#L52-L57
>>>  
>>>