Re: Need help using AggregateFunction instead of FoldFunction

2019-12-08 Thread vino yang
Hi dev,

The time parameter of a window can have different semantics.
In a session window, it only specifies the gap of inactivity; the actual size
of the window is driven by the activity of the events.
In a tumbling or sliding window, it specifies the size of the window.

For more details, please see the official documentation.[1]
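
A minimal sketch of the difference, assuming a keyed stream `keyedStream` and
an aggregate function `MyAggregate` (both placeholder names):

import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Session window: 5 seconds is only the allowed gap of inactivity between
// events; the window keeps growing as long as events arrive within that gap.
keyedStream
    .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
    .aggregate(new MyAggregate());

// Tumbling window: 5 seconds is the fixed size of the window itself.
keyedStream
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .aggregate(new MyAggregate());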

Best,
Vino

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/windows.html#session-windows



devinbost  wrote on Friday, Dec 6, 2019 at 10:39 PM:

> I think there might be a bug in
> `.window(EventTimeSessionWindows.withGap(Time.seconds(5)))`
>  (unless I'm just not using it correctly) because I'm able to get output
> when I use the simpler window
> `.timeWindow(Time.seconds(5))`
> However, I don't get any output when I use the session-based window.
>
>
> devinbost wrote
> > I added logging statements everywhere in my code, and I'm able to see my
> > message reach the `add` method in the AggregateFunction that I
> > implemented,
> > but the getResult method is never called.
> >
> > In the code below, I also never see the:
> >  "Ran dataStream. Adding sink next"
> > line appear in my log, and the only log statements from the
> > JsonConcatenator
> > class come from the `add` method, as shown below.
> >
> >
> > DataStream<Tuple2<String, String>> combinedEnvelopes = dataStream
> >     .map(new MapFunction<String, Tuple2<String, String>>() {
> >         @Override
> >         public Tuple2<String, String> map(String incomingMessage) throws Exception {
> >             return mapToTuple(incomingMessage);
> >         }
> >     })
> >     .keyBy(0)
> >     .window(EventTimeSessionWindows.withGap(Time.seconds(20)))
> >     .aggregate(new JsonConcatenator());
> >
> > Logger logger = LoggerFactory.getLogger(StreamJob.class);
> > logger.info("Ran dataStream. Adding sink next");
> >
> > -
> >
> > private static class JsonConcatenator
> >         implements AggregateFunction<Tuple2<String, String>,
> >         Tuple2<String, String>, String> {
> >     Logger logger = LoggerFactory.getLogger(SplinklerJob.class);
> >
> >     @Override
> >     public Tuple2<String, String> createAccumulator() {
> >         return new Tuple2<String, String>("", "");
> >     }
> >
> >     @Override
> >     public Tuple2<String, String> add(Tuple2<String, String> value,
> >             Tuple2<String, String> accumulator) {
> >         logger.info("Running Add on value.f0: " + value.f0 + " and value.f1: " + value.f1);
> >         return new Tuple2<>(value.f0, accumulator.f1 + ", " + value.f1);
> >     }
> >
> >     @Override
> >     public String getResult(Tuple2<String, String> accumulator) {
> >         logger.info("Running getResult on accumulator.f1: " + accumulator.f1);
> >         return "[" + accumulator.f1 + "]";
> >     }
> >
> >     @Override
> >     public Tuple2<String, String> merge(Tuple2<String, String> a,
> >             Tuple2<String, String> b) {
> >         logger.info("Running merge on a.f0: " + a.f0 + " and a.f1: " + a.f1 + " and b.f1: " + b.f1);
> >         return new Tuple2<>(a.f0, a.f1 + ", " + b.f1);
> >     }
> > }
> >
> >
> >
> >
> > Any ideas?
> >
> >
> > Chris Miller-2 wrote
> >> I hit the same problem; as far as I can tell it should be fixed in
> >> Pulsar 2.4.2. The release of this has already passed voting, so I hope it
> >> should be available in a day or two.
> >>
> >> https://github.com/apache/pulsar/pull/5068
> >
> >
> >
> >
> >
> > --
> > Sent from:
> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: [flink-sql] How to specify rowtime when creating a table with tableEnv.sqlUpdate(ddl)?

2019-12-08 Thread JingsongLee
Hi 猫猫,

Defining rowtime in DDL is a newly supported feature; the documentation is still being written. [1]
You can try it out with the code on master. The community is preparing the 1.10 release, and a release version will be available then.

[2] contains a complete usage example, FYI.

[1] https://issues.apache.org/jira/browse/FLINK-14320
[2] 
https://github.com/apache/flink/blob/2ecf7cacbe742099d78c528de962fccf13c14629/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TimeAttributeITCase.scala
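
A rough sketch of the DDL-based rowtime definition, modeled on the test case in
[2] (the event-time column is assumed to be a TIMESTAMP(3), the WATERMARK clause
declares it as rowtime, and the exact syntax may still change before the 1.10
release; table name, field names and connector properties are placeholders):

tableEnv.sqlUpdate(
    "CREATE TABLE user_behavior (" +
    "  userId BIGINT," +
    "  itemId BIGINT," +
    "  behavior VARCHAR," +
    "  ts TIMESTAMP(3)," +
    "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
    ") WITH (" +
    "  'connector.type' = 'filesystem'," +
    "  'connector.path' = 'file:///path/to/UserBehavior.csv'," +
    "  'format.type' = 'csv'" +
    ")");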

Best,
Jingsong Lee


--
From:猫猫 <16770...@qq.com>
Send Time: 2019-12-06 (Friday) 17:52
To: user-zh 
Subject: [flink-sql] How to specify rowtime when creating a table with tableEnv.sqlUpdate(ddl)?

I create the table with tableEnv.sqlUpdate(ddl);


However, I cannot add a rowtime attribute the way I can when registering a table from a stream. I tried adding rowtime to a field definition, but the CREATE TABLE statement then fails with an error.
Does Flink support creating a streaming table this way and applying windows on it?


My example uses a CSV source. I have not tried a Kafka source yet, but I looked through the documentation and did not find a relevant description.


The CREATE TABLE statement is as follows:
CREATE TABLE T_UserBehavior (
  userId BIGINT,
  itemId BIGINT,
  categoryId BIGINT,
  behavior VARCHAR,
  optime BIGINT
) WITH (
  'connector.type' = 'filesystem',  -- required: specify the connector type
  'connector.path' = 'file:///E:\MyGitProject\flink-study\Hot-Item\src\main\resources\UserBehavior-less.csv',  -- required: path to a file or directory
  'format.type' = 'csv',
  'format.fields.0.name' = 'userId',  -- required: define the schema either by using type information
  'format.fields.0.type' = 'BIGINT',
  'format.fields.1.name' = 'itemId',
  'format.fields.1.type' = 'BIGINT',
  'format.fields.2.name' = 'categoryId',
  'format.fields.2.type' = 'BIGINT',
  'format.fields.3.name' = 'behavior',
  'format.fields.3.type' = 'VARCHAR',
  'format.fields.4.name' = 'optime',
  'format.fields.4.type' = 'BIGINT'
);

Re: What is the recommended way to land a Flink real-time data warehouse into Hive?

2019-12-08 Thread JingsongLee
Hi 帅,
- At the moment, Parquet can be supported by adapting the StreamingFileSink.
(Supporting ORC with the StreamingFileSink is currently rather difficult.)
- BulkWriter has nothing to do with batch processing; it is just a concept used by the StreamingFileSink.
- Syncing Hive partitions needs custom code for now; the StreamingFileSink has nothing ready-made for it.

In 1.11, the Table layer will keep working on this area; for streaming warehouses landing in Hive, issues such as data skew and partition visibility will be addressed one by one. [1]

[1] https://issues.apache.org/jira/browse/FLINK-14249
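
For reference, a minimal sketch of writing Parquet with the bulk format that the
current StreamingFileSink already offers (checkpointing must be enabled and files
are rolled on every checkpoint; the POJO class, the stream and the HDFS path are
placeholders, and the Hive partition syncing mentioned above still needs custom work):

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// stream: DataStream<UserBehavior>; UserBehavior: a plain POJO (both assumed).
StreamingFileSink<UserBehavior> parquetSink = StreamingFileSink
    .forBulkFormat(new Path("hdfs:///warehouse/user_behavior"),
                   ParquetAvroWriters.forReflectRecord(UserBehavior.class))
    .build();
stream.addSink(parquetSink);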

Best,
Jingsong Lee


--
From:陈帅 
Send Time: 2019-12-08 (Sunday) 10:04
To:user-zh@flink.apache.org 
Subject: What is the recommended way to land a Flink real-time data warehouse into Hive?

Some people say to write directly into HBase and then join the HBase table from Hive,
but I would rather write files directly to HDFS and load them through a Hive external table. However, there are a few things I do not understand:

1. Writing in a row-oriented format in real time is not a problem, and StreamingFileSink supports it. But if I want to write in a
columnar format (such as Parquet or ORC), there does not seem to be a ready-made streaming
writer at the moment; what is provided officially are
BulkWriters, which only support the batch approach. Is it not recommended to write columnar formats in a streaming fashion?
If it is possible, do I need to define my own StreamingWriter? How is data from
business-level Update and Delete operations usually synced into Hive?

2. After the files are written to HDFS, how can I make Hive automatically discover the newly added partitions? Can this be done within the Flink ETL program?


Re: How to convert a Flink RetractStream into an AppendStream?

2019-12-08 Thread JingsongLee
+1 to lucas.wu

Best,
Jingsong Lee


--
From:lucas.wu 
Send Time: 2019-12-09 (Monday) 11:39
To:user-zh 
Subject: Re: How to convert a Flink RetractStream into an AppendStream?

You can use something like this:
//   val sstream = result4.toRetractStream[Row].filter(_._1 == true).map(_._2)
//   val result5 = tEnv.fromDataStream(sstream)
//   result5.toAppendStream[Row].print()


Original message
From: Jark Wu imj...@gmail.com
To: user-zh user...@flink.apache.org
Sent: 2019-12-08 (Sunday) 11:53
Subject: Re: How to convert a Flink RetractStream into an AppendStream?


Hi, currently Kafka only supports append mode, so it cannot consume a retract stream. In Flink 1.11, the community plans to
support converting a RetractStream into an AppendStream and emitting the output as a changelog. Best, Jark  On Sun, 8 Dec
2019 at 10:08, 陈帅 casel.c...@gmail.com wrote:  When building a real-time data warehouse with Flink, I need to send the
results of a group-by aggregation to Kafka, but the current Kafka sink does not support RetractStream, so it fails with an error.
Is there any way to convert a RetractStream into an AppendStream (for example, ignoring the retraction messages and only sending the updated ones) so that the results can be written to Kafka?

Re: How to convert a Flink RetractStream into an AppendStream?

2019-12-08 Thread JingsongLee
Hi 帅,

You can first convert the RetractStream into a DataStream; that gives you a stream of Tuple2s. Then write a MapFunction/filter to drop the retraction messages, and finally write the resulting DataStream into Kafka.
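
A minimal sketch of that flow (the table environment, the query result table and
the Kafka sink are assumed to already exist; the sink must know how to serialize
Row, e.g. a FlinkKafkaProducer with a custom SerializationSchema):

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RetractToAppend {

    public static void writeUpdatesOnly(StreamTableEnvironment tEnv, Table result,
                                        SinkFunction<Row> kafkaSink) {
        // (flag, row): flag == true means accumulate (insert/update), false means retraction.
        DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(result, Row.class);

        retractStream
            .filter(new FilterFunction<Tuple2<Boolean, Row>>() {
                @Override
                public boolean filter(Tuple2<Boolean, Row> value) {
                    return value.f0; // drop the retraction messages
                }
            })
            .map(new MapFunction<Tuple2<Boolean, Row>, Row>() {
                @Override
                public Row map(Tuple2<Boolean, Row> value) {
                    return value.f1; // keep only the row
                }
            })
            .addSink(kafkaSink);
    }
}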

Best,
Jingsong Lee


--
From:Jark Wu 
Send Time: 2019-12-08 (Sunday) 11:54
To:user-zh 
Subject: Re: How to convert a Flink RetractStream into an AppendStream?

Hi,

Currently Kafka only supports append mode, so it cannot consume a retract stream. In Flink 1.11, the community plans to support
converting a RetractStream into an AppendStream and emitting the output as a changelog.

Best,
Jark

On Sun, 8 Dec 2019 at 10:08, 陈帅  wrote:

> When building a real-time data warehouse with Flink, I need to send the results of a group-by aggregation to Kafka, but the
> current Kafka sink does not support RetractStream, so it fails with an error. Is there any way to convert a RetractStream into
> an AppendStream (for example, ignoring the retraction messages and only sending the updated ones) so that the results can be written to Kafka?
>


Re: [DISCUSS] Adding e2e tests for Flink's Mesos integration

2019-12-08 Thread Yang Wang
Thanks Yangze for starting this discussion.

Just sharing my thoughts.

If the official Mesos Docker image cannot meet our requirements, I
suggest building the image locally.
We have done the same thing for the YARN e2e tests. This way is more flexible
and easier to maintain. However,
I have no idea how long building the Mesos image locally will take. Based
on previous experience with YARN, I
think it should not take too much time.



Best,
Yang

Yangze Guo  wrote on Saturday, Dec 7, 2019 at 4:25 PM:

> Thanks for your feedback!
>
> @Till
> Regarding the time overhead, I think it mainly comes from the network
> transmission. Building the image locally downloads about 260 MB of files
> in total, including the base image and packages. Pulling from
> DockerHub, the compressed size of the image is 347 MB. Thus, I agree
> that it is ok to build the image locally.
>
> @Piyush
> Thank you for offering the help and sharing your usage scenario. At the
> current stage, I think it would be really helpful if you could compress
> the custom image[1] or reduce the time overhead of building it locally.
> Any ideas for improving test coverage will also be appreciated.
>
> [1]
> https://hub.docker.com/layers/karmagyz/mesos-flink/latest/images/sha256-4e1caefea107818aa11374d6ac8a6e889922c81806f5cd791ead141f18ec7e64
>
> Best,
> Yangze Guo
>
> On Sat, Dec 7, 2019 at 3:17 AM Piyush Narang  wrote:
> >
> > +1 from our end as well. At Criteo, we are running some Flink jobs on
> Mesos in production to compute short term features for machine learning.
> We’d love to help out and contribute on this initiative.
> >
> > Thanks,
> > -- Piyush
> >
> >
> > From: Till Rohrmann 
> > Date: Friday, December 6, 2019 at 8:10 AM
> > To: dev 
> > Cc: user 
> > Subject: Re: [DISCUSS] Adding e2e tests for Flink's Mesos integration
> >
> > Big +1 for adding a fully working e2e test for Flink's Mesos
> integration. Ideally we would have it ready for the 1.10 release. The lack
> of such a test has bitten us already multiple times.
> >
> > In general I would prefer to use the official image if possible, since it
> frees us from maintaining our own custom image. Since Java 9 is no longer
> officially supported and we opted for supporting Java 11 (LTS), it might not
> be feasible, though. How much longer would building the custom image take
> compared to downloading it from DockerHub? Maybe it is ok to build the
> image locally. Then we would not have to maintain the image.
> >
> > Cheers,
> > Till
> >
> > On Fri, Dec 6, 2019 at 11:05 AM Yangze Guo  karma...@gmail.com>> wrote:
> > Hi, all,
> >
> > Currently, there is no end-to-end test or IT case for Mesos deployment,
> > while common deployment-related development would inevitably touch
> > the logic of this component. Thus, some work needs to be done to
> > guarantee the experience for both Mesos users and contributors. After
> > offline discussion with Till and Xintong, we have some basic ideas and
> > would like to start a discussion thread on adding end to end tests for
> > Flink's Mesos integration.
> >
> > As a first step, we would like to keep the scope of this contribution
> > relatively small. This may also help us to quickly get some basic
> > test cases that might be helpful for the upcoming 1.10 release.
> >
> > As far as we can think of, what needs to be done is to set up a Mesos
> > framework during the testing and determine which tests need to be
> > included.
> >
> >
> > ** Regarding the Mesos framework, after trying out several approaches,
> > I find that setting up Mesos in Docker is probably what we want. The
> > resources needed for building and setting up Mesos from source are
> > probably not affordable in most scenarios. So, the one open
> > question that is worth discussing is the choice of Docker image. We have
> > come up with two options.
> >
> > - Using official Mesos image[1]
> > The official image was the first alternative that came to our mind,
> > but we ran into a Java version compatibility problem that
> > leads to failures when launching task executors. Flink supports Java 9
> > since version 1.9.0 [2], However, the official Docker image of Mesos
> > is built with a development version of JDK 9, which probably has
> > caused this problem. Unless we want to make Flink to also be
> > compatible with the JDK development version used by the official mesos
> > image, this option does not work out. Besides, according to the
> > official roadmap[5], Java 9 is not a long-term support version, which
> > may bring stability risk in future.
> >
> > - Build a custom image
> > I've already tried building a custom image[3] and successfully ran most
> > of the existing end-to-end test cases with it. The image is built
> > with Ubuntu 16.04, JDK 8 and Mesos 1.7.1. For the mesos e2e test
> > framework, we could either build the image from a Docker file or pull
> > the pre-built image from DockerHub (or other hub services) during the
> > testing.
> > If we decide to publish an image on DockerHub, we 

Re: KeyBy/Rebalance overhead?

2019-12-08 Thread vino yang
Hi Komal,

KeyBy (hash partitioning, a logical partitioning) and rebalance (a physical
partitioning) are both among the partitioning strategies supported by Flink.[1]

Generally speaking, partitioning may cause network communication (network
shuffle) costs, which can add to the overall time. The example you provided
may benefit from operator chaining[2] if you remove the keyBy operation.
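
As an illustration, using the operators from your snippet (names unchanged):

// Without keyBy: map and filter can be chained into the same task, so records
// are handed over locally without serialization or a network transfer.
myPoints.filter(new findDistancefromPOI());

// With keyBy: the hash partitioning between map and filter forces a network
// shuffle (serialization plus transfer between subtasks), which is the extra
// overhead you are measuring.
myPoints.keyBy("oID").filter(new findDistancefromPOI());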

Best,
Vino

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/#datastream-transformations
[2]:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html#tasks-and-operator-chains

Komal Mariam  wrote on Monday, Dec 9, 2019 at 9:11 AM:

> Anyone?
>
> On Fri, 6 Dec 2019 at 19:07, Komal Mariam  wrote:
>
>> Hello everyone,
>>
>> I want to get some insights on the KeyBy (and Rebalance) operations as
>> according to my understanding they partition our tasks over the defined
>> parallelism and thus should make our pipeline faster.
>>
>> I am reading a topic which contains 170,000,000 pre-stored records with
>> 11 Kafka partitions and replication factor of 1.   Hence I use
>> .setStartFromEarliest() to read the stream.
>> My Flink is a 4 node cluster with 3 taskmanagers, each having 10 cores
>> and 1 job manager with 6 cores. (10 task slots per TM hence I set
>> environment parallelism to 30).
>>
>> There are about 10,000 object IDs hence 10,000 keys.  Right now I'm
>> keeping the number of records fixed to get a handle on how fast they're
>> being processed.
>>
>> When I remove keyBy, I get the same results in 39 secs as opposed to 52
>> secs with KeyBy. In fact, even when I vary the parallelism down to 10 or
>> below I still get the same extra overhead of 9 to 13 secs. My data is mostly
>> uniformly distributed on its key so I can rule out skew.  Rebalance
>> likewise has the same latency as keyBy.
>>
>>  What I want to know is what may be causing this overhead? And is there
>> any way to decrease it?
>>
>> Here's the script I'm running for testing purposes:
>> --
>> DataStream<ObjectNode> JSONStream = env.addSource(new FlinkKafkaConsumer<>("data",
>>     new JSONKeyValueDeserializationSchema(false), properties).setStartFromEarliest());
>>
>> DataStream<Point> myPoints = JSONStream.map(new jsonToPoint());
>>
>> myPoints.keyBy("oID").filter(new findDistancefromPOI());
>>
>> public class findDistancefromPOI extends RichFilterFunction<Point> {
>>     public boolean filter(Point input) throws Exception {
>>         Double distance = computeEuclideanDist(16.4199, 89.974, input.X(), input.Y());
>>         return distance > 0;
>>     }
>> }
>>
>> Best Regards,
>> Komal
>>
>


flink on k8s: how to specify the entry point of the user program

2019-12-08 Thread aven . wu
Hi all,
Regarding Flink on k8s:
after reading the official documentation and the Dockerfile, docker-entrypoint.sh, and job-cluster-job.yaml.template files, I have the following questions:
1. In standalone mode, after the jobmanager is started, how does it know the main entry point of the user program (which main method should be executed)?
If it is set when packaging with Maven, how can I avoid setting it at packaging time and instead pass it in on the command line, similar to -c in yarn mode?
2. If the main entry point of the user program is specified in standalone-job.sh, how do I pass user-defined arguments (received in the user program's args[])?

Sent from the Mail app for Windows 10



Sample Code for querying Flink's default metrics

2019-12-08 Thread Pankaj Chand
Hello,

Using Flink on Yarn, I could not understand the documentation for how to
read the default metrics via code. In particular, I want to read
throughput, i.e. CPU usage, Task/Operator's numRecordsOutPerSecond, and
Memory.

Is there any sample code for how to read such default metrics?  Is there
any way to query the default metrics, such as CPU usage and Memory, without
using REST API or Reporters?

Additionally, how do I query Backpressure using code, or is it still only
visually available via the dashboard UI? Consequently, is there any way to
infer Backpressure by querying one (or more) of the Memory metrics of the
TaskManager?

Thank you,

Pankaj


Re: How to convert a Flink RetractStream into an AppendStream?

2019-12-08 Thread lucas.wu
You can use something like this:
//   val sstream = result4.toRetractStream[Row].filter(_._1 == true).map(_._2)
//   val result5 = tEnv.fromDataStream(sstream)
//   result5.toAppendStream[Row].print()


Original message
From: Jark Wu imj...@gmail.com
To: user-zh user...@flink.apache.org
Sent: 2019-12-08 (Sunday) 11:53
Subject: Re: How to convert a Flink RetractStream into an AppendStream?


Hi, currently Kafka only supports append mode, so it cannot consume a retract stream. In Flink 1.11, the community plans to
support converting a RetractStream into an AppendStream and emitting the output as a changelog. Best, Jark  On Sun, 8 Dec
2019 at 10:08, 陈帅 casel.c...@gmail.com wrote:  When building a real-time data warehouse with Flink, I need to send the
results of a group-by aggregation to Kafka, but the current Kafka sink does not support RetractStream, so it fails with an error.
Is there any way to convert a RetractStream into an AppendStream (for example, ignoring the retraction messages and only sending the updated ones) so that the results can be written to Kafka?

Re: User program failures cause JobManager to be shutdown

2019-12-08 Thread Dongwon Kim
Hi Robert and Roman,
Yeah, letting users know System.exit() is called would be much more
appropriate than just intercepting and ignoring.

Best,
Dongwon
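
For what it's worth, a minimal standalone sketch (not part of Flink) of how such
logging could be done: a SecurityManager whose checkExit() only reports the call;
throwing a SecurityException there instead would block the exit:

import java.security.Permission;

public class ExitLoggingSecurityManager extends SecurityManager {

    @Override
    public void checkExit(int status) {
        // Only report; since nothing is thrown, the exit still proceeds.
        new Throwable("System.exit(" + status + ") called by user code").printStackTrace();
    }

    @Override
    public void checkPermission(Permission perm) {
        // Allow everything else.
    }

    public static void main(String[] args) {
        System.setSecurityManager(new ExitLoggingSecurityManager());
        System.exit(1); // prints the stack trace above, then exits
    }
}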

On Sat, Dec 7, 2019 at 11:29 PM Robert Metzger  wrote:

> I guess we could manage the security only when calling the user's main()
> method.
>
> This problem actually exists for all usercode in Flink: You can also kill
> TaskManagers like this.
> If we are going to add something like this to Flink, I would only log that
> System.exit() has been called by the user code, not intercept and ignore
> the call.
>
> On Fri, Dec 6, 2019 at 10:31 AM Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
>> Hi Dongwon,
>>
>> This should work but it could also interfere with Flink itself exiting in
>> case of a fatal error.
>>
>> Regards,
>> Roman
>>
>>
>> On Fri, Dec 6, 2019 at 2:54 AM Dongwon Kim  wrote:
>>
>>> FYI, we've launched a session cluster where multiple jobs are managed by
>>> a job manager. If that happens, all the other jobs also fail because the
>>> job manager is shut down and all the task managers get into chaos (failing
>>> to connect to the job manager).
>>>
>>> I just searched for a way to prevent System.exit() calls from terminating
>>> JVMs and found [1]. Can it be a possible solution to the problem?
>>>
>>> [1]
>>> https://stackoverflow.com/questions/5549720/how-to-prevent-calls-to-system-exit-from-terminating-the-jvm
>>>
>>> Best,
>>> - Dongwon
>>>
>>> On Fri, Dec 6, 2019 at 10:39 AM Dongwon Kim 
>>> wrote:
>>>
 Hi Robert and Roman,

 Thank you for taking a look at this.

 what is your main() method / client doing when it's receiving wrong
> program parameters? Does it call System.exit(), or something like that?
>

 I just found that our HTTP client is programmed to call System.exit(1).
 I should advise people not to call System.exit() in Flink applications.

 p.s. Just out of curiosity, is there no way for the web app to
 intercept System.exit() and prevent the job manager from being shut
 down?

 Best,

 - Dongwon

 On Fri, Dec 6, 2019 at 3:59 AM Robert Metzger 
 wrote:

> Hi Dongwon,
>
> what is your main() method / client doing when it's receiving wrong
> program parameters? Does it call System.exit(), or something like that?
>
> By the way, the http address from the error message is
> publicly available. Not sure if this is internal data or not.
>
> On Thu, Dec 5, 2019 at 6:32 PM Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
>> Hi Dongwon,
>>
>> I wasn't able to reproduce your problem with Flink JobManager 1.9.1
>> with various kinds of errors in the job.
>> I suggest you try it on a fresh Flink installation without any other
>> jobs submitted.
>>
>> Regards,
>> Roman
>>
>>
>> On Thu, Dec 5, 2019 at 3:48 PM Dongwon Kim 
>> wrote:
>>
>>> Hi Roman,
>>>
>>> We're using the latest version 1.9.1 and those two lines are all
>>> I've seen after executing the job on the web ui.
>>>
>>> Best,
>>>
>>> Dongwon
>>>
>>> On Thu, Dec 5, 2019 at 11:36 PM r_khachatryan <
>>> khachatryan.ro...@gmail.com> wrote:
>>>
 Hi Dongwon,

 Could you please provide Flink version you are running and the job
 manager
 logs?

 Regards,
 Roman


 eastcirclek wrote
 > Hi,
 >
 > I tried to run a program by uploading a jar on Flink UI. When I
 > intentionally enter a wrong parameter to my program, JobManager
 dies.
 > Below
 > is all log messages I can get from JobManager; JobManager dies as
 soon as
 > spitting the second line:
 >
 > 2019-12-05 04:47:58,623 WARN
 >>  org.apache.flink.runtime.webmonitor.handlers.JarRunHandler-
 >> Configuring the job submission via query parameters is
 deprecated. Please
 >> migrate to submitting a JSON request instead.
 >>
 >>
 >> *2019-12-05 04:47:59,133 ERROR com.skt.apm.http.HTTPClient
 >>   - Cannot
 >> connect:
 http://52.141.38.11:8380/api/spec/poc_asset_model_01/model/imbalance/models
 >> 
 http://52.141.38.11:8380/api/spec/poc_asset_model_01/model/imbalance/models
 ;:
 >> com.fasterxml.jackson.databind.exc.MismatchedInputException:
 Cannot
 >> deserialize instance of `java.util.ArrayList` out of
 START_OBJECT token
 >> at
 >> [Source:
 >>
 (String)“{”code”:“GB0001”,“resource”:“msg.comm.unknown.error”,“details”:“NullPointerException:
 >> “}”; line: 1, column: 1]2019-12-05 04:47:59,166 INFO
 >>  org.apache.flink.runtime.blob.BlobServer  -
 Stopped
 >> BLOB server at 0.0.0.0:6124 

Re: KeyBy/Rebalance overhead?

2019-12-08 Thread Komal Mariam
Anyone?

On Fri, 6 Dec 2019 at 19:07, Komal Mariam  wrote:

> Hello everyone,
>
> I want to get some insights on the KeyBy (and Rebalance) operations as
> according to my understanding they partition our tasks over the defined
> parallelism and thus should make our pipeline faster.
>
> I am reading a topic which contains 170,000,000 pre-stored records with 11
> Kafka partitions and replication factor of 1.   Hence I use
> .setStartFromEarliest() to read the stream.
> My Flink is a 4 node cluster with 3 taskmanagers, each having 10 cores and
> 1 job manager with 6 cores. (10 task slots per TM hence I set environment
> parallelism to 30).
>
> There are about 10,000 object IDs hence 10,000 keys.  Right now I'm
> keeping the number of records fixed to get a handle on how fast they're
> being processed.
>
> When I remove keyBy, I get the same results in 39 secs as opposed to 52
> secs with KeyBy. In fact, even when I vary the parallelism down to 10 or
> below I still get the same extra overhead of 9 to 13 secs. My data is mostly
> uniformly distributed on its key so I can rule out skew.  Rebalance
> likewise has the same latency as keyBy.
>
>  What I want to know is what may be causing this overhead? And is there
> any way to decrease it?
>
> Here's the script I'm running for testing purposes:
> --
> DataStream<ObjectNode> JSONStream = env.addSource(new FlinkKafkaConsumer<>("data",
>     new JSONKeyValueDeserializationSchema(false), properties).setStartFromEarliest());
>
> DataStream<Point> myPoints = JSONStream.map(new jsonToPoint());
>
> myPoints.keyBy("oID").filter(new findDistancefromPOI());
>
> public class findDistancefromPOI extends RichFilterFunction<Point> {
>     public boolean filter(Point input) throws Exception {
>         Double distance = computeEuclideanDist(16.4199, 89.974, input.X(), input.Y());
>         return distance > 0;
>     }
> }
>
> Best Regards,
> Komal
>


[ANNOUNCE] Weekly Community Update 2019/49

2019-12-08 Thread Konstantin Knauf
Dear community,

happy to share this week's community digest with an update on Flink 1.8.3,
a revival of the n-ary stream operator, a proposal to move our build
infrastructure to Azure pipelines, and quite a few other topics. Enjoy.

Flink Development
==

* [releases] The feature freeze for *Flink 1.10* is tonight.

* [releases] Hequn has published and started a vote on RC3 for *Flink 1.8.3*.
Voting is open until Dec. 10th 2019, 16:00 UTC. No votes so far, but I
assume this will change after the feature freeze. [1]

* [runtime] Piotr has restarted the discussion to add an *n-ary stream
operator* which would help to support multi-broadcast joins in Flink SQL
(think of a star schema). The topic has been discussed before (in 2016) in
the context of side-inputs and there is already an old design document
drafted by Aljoscha to build on top. [2]

* [connectors] Chesnay started a conversation to drop support for *Kafka
0.8/0.9* connectors in the upcoming release. It seems that quite a few
people are in favor of dropping, but Becket also made a valid point to only
deprecate these connectors instead of removing them altogether. [3]

* [connectors] Becket has started the vote on *FLIP-27, the new source
interface.* This has been a long-running topic, but it has not
officially been voted on so far. So there we go. [4,5]

* [state backends] Stephan has proposed to drop support for the *synchronous
mode of the heap statebackend.* One supporting comment so far. [6]

* [hadoop] Craig Foster brought up the topic of *Hadoop 3* support in
Apache Flink. There is currently no one working on this topic, but Marton
Balassi of Cloudera reported that they have been working on this internally
and would be willing to contribute their work back next year. [7]

* [development process] There is currently no *end-to-end test for Flink
deployments on Mesos*. Yangze suggests adding such tests now and has started a
discussion thread on the topic. He is now looking for feedback from Mesos
production users in order to improve the planned test suite. It seems the
community will need to maintain their own Mesos docker images for these
tests due to Java compatibility issues with the official Mesos docker
images. [8]

* [development process] Dawid has asked committers to only vote with
their *apache.org email* addresses and to only count these votes as
binding going forward. Only for apache.org addresses is it possible to
verify the status of the voter. [9]

* [development process] Following the discussion on reducing the build time
of Apache Flink, Robert and others did some experiments to migrate our
build from *Travis CI to Azure Pipelines*. In this thread [10] and wiki
page [11] he presents his results and asks for opinions on how to move
forward. So far there has been a lot of positive feedback to migrate to
Azure pipelines mainly motivated by lower build times (due to additional
sponsored machines) and richer features.

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-1-8-3-release-candidate-3-tp35628.html
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Add-N-Ary-Stream-Operator-tp11341p35554.html
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Drop-Kafka-0-8-0-9-tp35553.html
[4]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
[5]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-27-Refactor-Source-Interface-tp35569.html
[6]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Drop-Heap-Backend-Synchronous-snapshots-tp35621.html
[7]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Re-Building-with-Hadoop-3-tp35522p35528.html
[8]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Adding-e2e-tests-for-Flink-s-Mesos-integration-tp35660p35687.html
[9]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Voting-from-apache-org-addresses-tp35499.html
[10]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Migrate-build-infrastructure-from-Travis-CI-to-Azure-Pipelines-tp35538.html
[11]
https://cwiki.apache.org/confluence/display/FLINK/%5Bpreview%5D+Azure+Pipelines

Notable Bugs
==

* [FLINK-15063] [1.9.1] The scopes of the input/output metrics of the
network stack are interchanged, e.g. the outPoolUsage metric can be found
under the task-level scope "shuffle.netty.input" instead of
"shuffle.netty.output". Fixed for 1.9.2. [12]

* [FLINK-14949] [1.9.1] [1.8.2] A job can get stuck during cancellation,
e.g. if Flink cannot spawn the threads that perform the cancellation.
Fixed for 1.9.2. [13]

[12] https://issues.apache.org/jira/browse/FLINK-15063
[13] https://issues.apache.org/jira/browse/FLINK-14949

Events, Blog Posts, Misc
===

* *Markos* and *Yuan* have published a recap of Flink Forward Asia 2019 on
the Ververica blog including a short 

Re: Emit intermediate accumulator state of a session window

2019-12-08 Thread Rafi Aroch
Hi Chandu,

Maybe you can use a custom trigger:
.trigger(ContinuousEventTimeTrigger.of(Time.minutes(15)))

This would continuously trigger your aggregate every period of time.
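
A rough sketch of how it would plug into the pipeline from your mail (the key
selector, the sink and EventAggregator are placeholders taken from your example;
note that this trigger fires the window repeatedly but does not purge it):

import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;

// Fires the open session window every 15 minutes of event time, so
// getResult() is also called on the intermediate accumulator.
events
    .keyBy(e -> e.getUserId())  // placeholder key selector
    .window(EventTimeSessionWindows.withGap(Time.minutes(20)))
    .trigger(ContinuousEventTimeTrigger.of(Time.minutes(15)))
    .aggregate(new EventAggregator())
    .addSink(sink);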

Thanks,
Rafi


On Thu, Dec 5, 2019 at 1:09 PM Andrey Zagrebin  wrote:

> Hi Chandu,
>
> I am not sure whether using the windowing API is helpful in this case at
> all.
>
> At least, you could try to consume the data not only by windowing but also
> by a custom stateful function.
> You look into the AggregatingState [1]. Then you could do whatever you
> want with the current aggregated value.
> If you still need to do something with the result of windowing, you could
> do it as now or simulate it with timers [2] in that same stateful function.
>
> Best,
> Andrey
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/process_function.html#example
>
> On Tue, Dec 3, 2019 at 12:21 AM chandu soa  wrote:
>
>> *Emit intermediate accumulator (AggregateFunction ACC value) state of a
>> session window when a new event arrives*
>>
>>
>>
>> AggregateFunction#getResult() is called only when the window completes. My
>> need is to emit intermediate accumulator values (the result of
>> AggregateFunction#add()) as well and write them to a sink. Both
>> AggregateFunction#getResult() and ProcessWindowFunction provide the
>> aggregated result only when the window is closed.
>> *Any thoughts on how to emit or stream the intermediate accumulator
>> state as soon as a new event arrives while the window is open? Do I need to
>> implement a custom trigger or assigner?*
>>
>>
>>
>> To give you some background, when a user watches a video we get events:
>> when it is clicked, thereafter every ~15 minutes, and finally when the user
>> closes the video.
>>
>> I need to aggregate them as soon as they arrive and post the result to the
>> destination. For example, if a user is watching a two-hour movie I get events
>> at 15-minute intervals (0, 15, 30, ..., 120); whenever I get an event I need to
>> aggregate the watched percentage so far and write it to the sink (0%, 12.5%,
>> 25%, ..., 100%). The implementation below emits (via getResult()) a single
>> event 20 minutes after the video is watched.
>>
>>
>> .window(EventTimeSessionWindows.withGap(Time.minutes(20)))
>> .aggregate(new EventAggregator())
>> .filter(new FinalFilter())
>> .addSink(...)
>>
>>
>> Appreciate your help.
>>
>>
>> Thanks,
>>
>> chandu
>>
>


Change Flink binding address in local mode

2019-12-08 Thread Andrea Cardaci
Hi,

Flink (or some of its services) listens on three random TCP ports
during the local[1] execution, e.g., 39951, 41009 and 42849.

[1]: 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/local_execution.html#local-environment

The sockets listen on `0.0.0.0`, and since I need to run some
long-running tests on an Internet-facing machine I was wondering how
to make them listen on `localhost` instead, or whether there is anything
else I can do to improve security in this scenario.

Here's what I tried (with little luck):

> Configuration config = new Configuration();
> config.setString("taskmanager.host", "127.0.0.1");
> config.setString("rest.bind-address", "127.0.0.1"); // OK
> config.setString("jobmanager.rpc.address", "127.0.0.1");
> StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(
>     StreamExecutionEnvironment.getDefaultLocalParallelism(), config);

Only the `rest.bind-address` configuration actually changes the
binding address of one of those ports. Are there other parameters that
I'm not aware of, or is this not the right approach in local mode?


Best,
Andrea


unsubscribe

2019-12-08 Thread Deepak Sharma